"""
|
可转债入口函数
|
"""
|
import datetime
|
import json
|
import logging
|
import multiprocessing
|
import threading
|
import time
|
|
from code_attribute import target_codes_manager, gpcode_manager, code_market_manager, history_k_data_util
|
from code_attribute.gpcode_manager import CodesNameManager
|
from code_attribute.limit_up_time_manager import FirstLimitUpTimeManager
|
from db.redis_manager_delegate import RedisUtils
|
from huaxin_client import l2_client_for_cb
|
from huaxin_client.client_network import SendResponseSkManager
|
from log_module import async_log_util, log_export
|
from records import huaxin_trade_record_manager
|
from third_data import kpl_data_manager, kpl_util
|
from third_data.kpl_data_manager import PullTask, KPLCodeJXBlockManager, KPLLimitUpDataRecordManager
|
from trade import huaxin_trade_api, huaxin_trade_data_update, huaxin_sell_util, backtest_trade, buy_strategy, \
|
sell_strategy
|
from trade.buy_strategy import BuyStrategyDataManager, StrategyBuyOrderRefManager
|
from trade.trade_manager import CodeTradeStateManager
|
from trade.trade_settings import WantBuyCodesManager, TradeStateManager
|
from utils import middle_api_protocol, outside_api_command_manager, constant, tool, huaxin_util, socket_util, sell_util, \
|
output_util, l2_huaxin_util, output_data_util
|
|
# Endpoint of the middle API server used by __send_response and by the
# command manager created in the __main__ block.
middle_api_protocol.SERVER_PORT = 10008
middle_api_protocol.SERVER_HOST = "43.138.167.68"

# NOTE(review): LOG_DIR is assigned before log_module.log is imported,
# presumably because the loggers read constant.LOG_DIR at import time —
# confirm before reordering these lines.
constant.LOG_DIR = "logs_cb"
from log_module.log import logger_debug, logger_trade, printlog
import concurrent.futures

# Worker pool that command_callback uses to run delayed order cancellations.
__cancel_sell_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=8)
# Shared instance handed to sell_strategy.process_market_info by __read_market_data.
__BuyStrategyDataManager = BuyStrategyDataManager()
|
|
|
def __send_response(data_bytes):
    """
    Send one payload to the middle API server and validate the reply.

    :param data_bytes: encoded payload; it is passed through
        socket_util.load_header before being written to the socket
    :return: None
    :raises Exception: when the decoded reply has a "code" other than 0;
        the exception text is the reply's "msg"
    """
    connection = SendResponseSkManager.create_send_response_sk(
        addr=middle_api_protocol.SERVER_HOST,
        port=middle_api_protocol.SERVER_PORT)
    try:
        connection.sendall(socket_util.load_header(data_bytes))
        raw_reply, _header = socket_util.recv_data(connection)
        reply = json.loads(raw_reply)
        if reply["code"] == 0:
            return
        raise Exception(reply['msg'])
    finally:
        # Release the socket whether the exchange succeeded or failed.
        connection.close()
|
|
|
def send_response(data, _client_id, _request_id):
    """
    Wrap ``data`` in a response envelope and send it, trying up to three times.

    Failures are logged with ``logging.exception`` and never propagate; after
    the third failed attempt the function simply returns.

    :param data: JSON-serialisable response body
    :param _client_id: id of the client the response belongs to
    :param _request_id: id of the request being answered
    :return: None
    """
    envelope = {"type": "response", "data": data, "client_id": _client_id,
                "request_id": _request_id}
    payload = json.dumps(envelope).encode('utf-8')
    attempts_left = 3
    while attempts_left > 0:
        attempts_left -= 1
        try:
            __send_response(payload)
            printlog("发送数据成功")
            return
        except Exception as send_error:
            logging.exception(send_error)
|
|
|
# Cancel an order that has stayed unfilled for too long
def __cancel_not_deal_order(code, order_ref, timeout=3):
    """
    Sleep for ``timeout`` seconds, then request cancellation of a buy order.

    :param code: security code of the order
    :param order_ref: order reference, forwarded as ``orderRef``
    :param timeout: seconds to wait before the cancel request is sent
    :return: None
    """
    time.sleep(timeout)
    # The original comment marks this call as cancelling a buy order;
    # direction 1 is also what read_l2_results uses when buying.
    buy_direction = 1
    empty_third_argument = ""
    huaxin_trade_api.cancel_order(buy_direction, code, empty_third_argument, orderRef=order_ref)
|
|
|
def _refresh_trade_snapshots():
    """Trigger a refresh of the position, money and deal lists."""
    huaxin_trade_data_update.add_position_list()
    huaxin_trade_data_update.add_money_list()
    huaxin_trade_data_update.add_deal_list()


def _api_sell(client_id, request_id, data, code, volume, price_type):
    """Execute an API sell order (direction == 2) and reply to the client."""
    # price_type: 0-price cage 1-limit-down price 2-limit-up price 3-current price 4-bid-5 price
    async_log_util.info(logger_trade, f"API卖: 接收数据-{data}")
    market_info = code_market_manager.get_market_info(code)
    if market_info:
        current_price = market_info.price
    else:
        # No cached quote for this code: query the current price on demand.
        price_results = history_k_data_util.HistoryKDatasUtils.get_gp_current_info([code])
        current_price = price_results[0]["price"]
    limit_down_price = target_codes_manager.get_limit_down_price(code)
    limit_up_price = target_codes_manager.get_limit_up_price(code)
    order_ref = huaxin_util.create_order_ref()
    try:
        result = huaxin_sell_util.start_sell(code, volume, price_type, limit_up_price,
                                             limit_down_price,
                                             current_price, blocking=True, request_id=request_id,
                                             order_ref=order_ref)
        async_log_util.info(logger_trade, f"API卖结果: {result}")
        send_response(result, client_id, request_id)
    except Exception as e:
        if "超时" in str(e):
            # A timeout is answered as success carrying only the order
            # reference, exactly as the original code did.
            send_response({"code": 0, "data": {"orderRef": order_ref}}, client_id, request_id)
        else:
            # Bare raise keeps the original traceback for the caller's log.
            raise
    finally:
        _refresh_trade_snapshots()


def _api_buy(client_id, request_id, code, direction, volume, price_type, price, sinfo):
    """Execute an API order for any direction other than 2 and reply to the client."""
    if not price:
        limit_down_price = target_codes_manager.get_limit_down_price(code)
        limit_up_price = target_codes_manager.get_limit_up_price(code)
        price = sell_util.get_sell_price(price_type, limit_up_price, limit_down_price, None)
        if not price:
            raise Exception("尚未获取到买入价格")
        price = round(float(price), 3)
        order_ref = huaxin_util.create_order_ref()
        result = huaxin_trade_api.order(direction, code, volume, price,
                                        sinfo=sinfo, order_ref=order_ref,
                                        blocking=True, request_id=request_id)
        # Schedule a cancel in case the order is still unfilled after
        # __cancel_not_deal_order's default delay (3 seconds).
        __cancel_sell_thread_pool.submit(__cancel_not_deal_order, code, order_ref)
    else:
        result = huaxin_trade_api.order(direction, code, volume, price,
                                        sinfo=sinfo,
                                        blocking=True, request_id=request_id)
    _refresh_trade_snapshots()
    send_response({"code": 0, "data": result}, client_id, request_id)


def _handle_trade_command(client_id, request_id, data):
    """Handle API_TYPE_TRADE requests; any failure is reported with code 1."""
    try:
        trade_type = data["trade_type"]
        if trade_type != outside_api_command_manager.TRADE_TYPE_ORDER:
            # Only order requests are handled; other trade types get no reply.
            return
        # All fields are read up front so that a missing key fails the same
        # way for both directions.
        code = data["code"]
        direction = data["direction"]
        volume = data["volume"]
        price_type = data["price_type"]
        price = data["price"]
        sinfo = data["sinfo"]
        if direction == 2:
            _api_sell(client_id, request_id, data, code, volume, price_type)
        else:
            _api_buy(client_id, request_id, code, direction, volume, price_type, price, sinfo)
    except Exception as e:
        logger_debug.exception(e)
        send_response({"code": 1, "msg": str(e)}, client_id, request_id)


def _describe_market(market, name):
    """Build the market summary dict sent to the client for one quote object."""
    sign = '+' if market.rate > 0.0001 else ''
    return {"code": market.code, "name": name,
            "rate": f"{sign}{round(market.rate * 100, 2)}%",
            "price": market.price, "lastVolume": market.total_bid_volume // 100,
            "buy1Money": output_util.money_desc(market.buy1_price * market.buy1_volume),
            "preClosePrice": market.pre_close_price}


def _merge_deals_by_order(deals):
    """Group deal records by orderSysID; return (first_record, total_volume) pairs in first-seen order."""
    grouped = {}
    for deal in deals:
        grouped.setdefault(deal["orderSysID"], []).append(deal)
    return [(records[0], sum(record["volume"] for record in records))
            for records in grouped.values()]


def _load_backtest_positions():
    """Build position entries from backtest_trade.position_dict."""
    positions = []
    position_dict = backtest_trade.position_dict
    for key in position_dict:
        translation = position_dict[key]
        underlying_code = translation["SecurityID"]
        # TODO: use the price at that moment
        translation["buy_list"] = [{"price": str(translation['TradePrice']),
                                    "tradeTime": l2_huaxin_util.convert_time(translation['OrderTime']),
                                    "volume": 10}]
        cb_code = target_codes_manager.get_cb_code(underlying_code)
        positions.append(
            {"securityID": cb_code,
             "securityName": gpcode_manager.CodesNameManager().get_code_name(cb_code),
             "buy_list": translation["buy_list"], "sell_list": [], "currentPosition": 10, "id": cb_code})
    return positions


def _enrich_position(position):
    """Attach market info, underlying info and buy/sell points to one position dict (in place)."""
    cb_code = position["securityID"]
    underlying_code = target_codes_manager.get_underlying_code(cb_code)
    cb_market = code_market_manager.get_market_info(cb_code)
    underlying_market = code_market_manager.get_market_info(underlying_code)
    if cb_market:
        position["marketInfo"] = _describe_market(cb_market, position["securityName"])
    position["createTime"] = 0
    if underlying_market:
        names = gpcode_manager.CodesNameManager()
        if not names.get_code_name(underlying_market.code):
            # Request the name asynchronously. The code is passed through
            # ``args`` so the thread gets this position's code; a lambda
            # would read the variable when it runs, which may be after the
            # surrounding loop has moved on to another position.
            threading.Thread(target=names.request_code_name,
                             args=(underlying_market.code,),
                             daemon=True).start()
        position["underlyingMarketInfo"] = _describe_market(
            underlying_market,
            gpcode_manager.CodesNameManager().get_code_name(underlying_market.code))
        position["underlyingDetailInfo"] = output_data_util.load_code_detail_info(underlying_market.code)
        position["underlyingDetailInfo"]["price"] = underlying_market.price

    # Buy and sell points, merged per orderSysID
    if "buy_list" not in position:
        buys = huaxin_trade_record_manager.DealRecordManager().list_buy_by_code_cache(cb_code)
        position["buy_list"] = [
            {"price": str(first["price"]), "tradeTime": first["tradeTime"], "volume": total,
             "type": StrategyBuyOrderRefManager().get_strategy_type(first["orderRef"])}
            for first, total in _merge_deals_by_order(buys)]
        if buys:
            position["createTime"] = int(buys[0]["tradeTime"].replace(":", ""))

    if "sell_list" not in position:
        sells = huaxin_trade_record_manager.DealRecordManager().list_sell_by_code_cache(cb_code)
        position["sell_list"] = [
            {"price": str(first["price"]), "tradeTime": first["tradeTime"], "volume": total}
            for first, total in _merge_deals_by_order(sells)]


def _handle_position_query(client_id, request_id, data):
    """Handle "get_code_position_info": reply with the enriched position list."""
    try:
        code = data.get("code")
        if code:
            results = huaxin_trade_record_manager.PositionManager().list_by_day(
                tool.get_now_date_str("%Y%m%d"), code)
        else:
            results = huaxin_trade_record_manager.PositionManager().get_from_cache()
        if constant.backtest_mode_info:
            # Use a fresh list instead of clearing the fetched one, so a list
            # that might be owned by the position cache is never emptied.
            results = _load_backtest_positions()
        for position in results:
            _enrich_position(position)
        send_response({"code": 0, "data": results}, client_id, request_id)
    except Exception as e:
        logger_debug.exception(e)
        send_response({"code": 1, "msg": str(e)}, client_id, request_id)


def _handle_refresh_trade_data(client_id, request_id, data):
    """Handle "refresh_trade_data": refresh the list named by ``ctype`` and acknowledge."""
    ctype = data.get("ctype")
    if ctype == "money":
        huaxin_trade_data_update.add_money_list()
    elif ctype == "position_list":
        huaxin_trade_data_update.add_position_list()
    elif ctype == "deal_list":
        huaxin_trade_data_update.add_deal_list()
    elif ctype == "delegate_list":
        huaxin_trade_data_update.add_delegate_list("手动刷新")
    send_response({"code": 0, "data": {}}, client_id, request_id)


def _handle_backtest_mode(client_id, request_id, data):
    """Switch backtest mode on (starting the backtest in a daemon thread) or off."""
    try:
        date = data.get("date")
        mode = data.get("mode")
        if not date:
            date = tool.get_now_date_str()
        if mode:
            # Start backtest
            constant.backtest_mode_info = (True, date)
            threading.Thread(target=backtest_trade.start_backtest, args=(date,), daemon=True).start()
        else:
            # Stop backtest
            constant.backtest_mode_info = None
        send_response({"code": 0, "data": {}}, client_id, request_id)
    except Exception as e:
        # Report failure with code 1 like every other handler; the original
        # replied with code 0 here, which hid the error from the client.
        send_response({"code": 1, "data": {}, "msg": str(e)}, client_id, request_id)


def _handle_want_buy_codes(client_id, request_id, data):
    """Add, list or delete want-buy codes for both buy strategies."""
    strategies = (buy_strategy.STRATEGY_TYPE_LIMIT_UP,
                  buy_strategy.STRATEGY_TYPE_RISE_HIGH_WITH_BLOCKS)
    try:
        operate = data["operate"]
        if operate == outside_api_command_manager.OPERRATE_ADD:
            code = data["code"]
            for strategy in strategies:
                WantBuyCodesManager().add_code(code, strategy)
            send_response({"code": 0, "data": {}}, client_id, request_id)
        elif operate == outside_api_command_manager.OPERRATE_GET:
            # Entries are split on "-" and the first part is used as the code.
            codes = {entry.split("-")[0] for entry in WantBuyCodesManager().list_code()}
            # Call get_code_name on an instance, as every other call site in
            # this file does, rather than on the class itself.
            names = CodesNameManager()
            code_infos = [(code, names.get_code_name(code)) for code in codes]
            send_response({"code": 0, "data": code_infos}, client_id, request_id)
        elif operate == outside_api_command_manager.OPERRATE_DELETE:
            code = data["code"]
            for strategy in strategies:
                WantBuyCodesManager().remove_code(code, strategy)
            send_response({"code": 0, "data": {}}, client_id, request_id)
    except Exception as e:
        send_response({"code": 1, "msg": str(e)}, client_id, request_id)


def _handle_trade_state(client_id, request_id, data):
    """Set or query whether buying is currently allowed."""
    try:
        operate = data["operate"]
        if operate == outside_api_command_manager.OPERRATE_SET:
            if data["state"] > 0:
                TradeStateManager().open_buy()
            else:
                TradeStateManager().close_buy()
            send_response({"code": 0, "data": {}}, client_id, request_id)
        elif operate == outside_api_command_manager.OPERRATE_GET:
            state = 1 if TradeStateManager().is_can_buy() else 0
            send_response({"code": 0, "data": {"state": state}}, client_id, request_id)
    except Exception as e:
        send_response({"code": 1, "msg": str(e)}, client_id, request_id)


def _handle_common_request(client_id, request_id, data):
    """Handle API_TYPE_COMMON_REQUEST by dispatching on ``data['ctype']``."""
    ctype = data['ctype']
    if ctype == 'get_account_money':
        # Account money
        result = huaxin_trade_record_manager.MoneyManager.get_data()
        send_response({"code": 0, "data": result}, client_id, request_id)
    elif ctype == 'set_backtest_mode':
        _handle_backtest_mode(client_id, request_id, data)
    elif ctype == "want_buy_codes":
        _handle_want_buy_codes(client_id, request_id, data)
    elif ctype == "trade_state":
        _handle_trade_state(client_id, request_id, data)


def command_callback(client_id, request_id, data):
    """
    Command callback: dispatch one request from the outside API by ``data['type']``.

    Handled types are API_TYPE_TRADE, "get_code_position_info",
    "refresh_trade_data" and API_TYPE_COMMON_REQUEST. Replies are sent with
    ``send_response``; unknown types (and unknown sub-operations) get no reply.

    :param client_id: id of the requesting client, echoed in the reply
    :param request_id: id of the request, echoed in the reply
    :param data: decoded JSON request (accessed with ``get`` and ``[]``)
    :return: None
    """
    type_ = data.get('type')
    if type_ == outside_api_command_manager.API_TYPE_TRADE:
        _handle_trade_command(client_id, request_id, data)
    elif type_ == "get_code_position_info":
        _handle_position_query(client_id, request_id, data)
    elif type_ == "refresh_trade_data":
        _handle_refresh_trade_data(client_id, request_id, data)
    elif type_ == outside_api_command_manager.API_TYPE_COMMON_REQUEST:
        _handle_common_request(client_id, request_id, data)
|
|
|
def test():
    """Manual smoke check: wait five seconds, then print the account money query result."""
    time.sleep(5)

    money = huaxin_trade_api.get_money()
    print("获取资金:", money)
    # Further manual checks, kept disabled:
    # print("deals:", huaxin_trade_api.get_deal_list())
    # print("order:", huaxin_trade_api.order(1, "127075", 10, 140.5, blocking=True))
|
|
|
def __get_buy_money(strategy_type):
    """
    Return the amount of money to spend on one buy, based on the current time.

    The base amount steps down through the day (40000 before 10:30:00,
    30000 before 11:30:00, 20000 before 14:00:00, otherwise 10000) and is then
    scaled to 80% for STRATEGY_TYPE_RISE_HIGH_WITH_BLOCKS, 20% for
    STRATEGY_TYPE_LIMIT_UP, or left unscaled for any other strategy type.

    :param strategy_type: one of the buy_strategy.STRATEGY_TYPE_* values
    :return: amount of money as a number
    """
    # Current time with ":" removed, compared numerically against the bounds
    # below (presumably HHMMSS — depends on tool.get_now_time_str's format).
    now_value = int(tool.get_now_time_str().replace(":", ""))
    # (exclusive upper bound, base amount), checked in order
    brackets = ((103000, 40000), (113000, 30000), (140000, 20000))
    money = 10000
    for upper_bound, amount in brackets:
        if now_value < upper_bound:
            money = amount
            break

    if strategy_type == buy_strategy.STRATEGY_TYPE_RISE_HIGH_WITH_BLOCKS:
        return int(money * 0.8)
    if strategy_type == buy_strategy.STRATEGY_TYPE_LIMIT_UP:
        return int(money * 0.2)
    return money
|
|
|
def _try_buy_convertible_bond(code, buy_info):
    """
    Place a buy order for the convertible bond of ``code`` if all checks pass.

    :param code: underlying stock code taken from the L2 result
    :param buy_info: sequence whose item 0 is the buy flag and item 1 the strategy type
    :return: None
    """
    if not TradeStateManager().is_can_buy():
        return
    strategy_type = buy_info[1]
    # Convertible bond code for this stock code
    cb_code = target_codes_manager.get_cb_code(code)
    if not WantBuyCodesManager().is_in_cache(cb_code, strategy_type):
        already_bought = CodeTradeStateManager().get_trade_state(
            cb_code, strategy_type) == CodeTradeStateManager.TRADE_STATE_ALREADY_BUY
        if already_bought:
            # Already bought and not on the want-buy list
            return
        underlying_code = target_codes_manager.get_underlying_code(cb_code)
        if FirstLimitUpTimeManager().get_first_limit_up_time(underlying_code):
            # The underlying already hit limit-up earlier and the bond is not
            # on the want-buy list
            return

    market_info = code_market_manager.get_market_info(cb_code)
    if not market_info:
        # The buy price is derived from the latest quote. Without one the
        # original code raised AttributeError on market_info.price, which
        # aborted the remaining signals and slept for a second; skip instead.
        async_log_util.info(logger_trade, f"可转债尚无行情数据，跳过下单:{cb_code}")
        return
    if market_info.rate > 0.139:
        async_log_util.info(logger_trade, f"可转债涨幅过高::{cb_code}-{market_info.rate}")
        return

    limit_up_price = target_codes_manager.get_limit_up_price(cb_code)
    volume = int(__get_buy_money(strategy_type) / float(limit_up_price))
    # Round the volume down to a multiple of 10
    volume = (volume // 10) * 10
    # Bid 2% above the latest price, capped at the limit-up price
    buy_price = round(min(float(market_info.price * 1.02), float(limit_up_price)), 3)
    async_log_util.info(logger_trade, f"准备下单:{cb_code}-{buy_price}-{buy_info}")
    order_result = huaxin_trade_api.order(1, cb_code, volume, buy_price, blocking=True)
    if isinstance(order_result, dict) and order_result['code'] == 0:
        order_ref = order_result['data']['orderRef']
        StrategyBuyOrderRefManager().add(order_ref, strategy_type)
        CodeTradeStateManager().set_trade_state(cb_code, strategy_type,
                                                CodeTradeStateManager.TRADE_STATE_ALREADY_BUY)
        # Remove from the want-buy list
        WantBuyCodesManager().remove_code(cb_code, strategy_type)
    async_log_util.info(logger_trade, f"可转债下单结果:{order_result}")
    huaxin_trade_data_update.add_position_list()
    huaxin_trade_data_update.add_money_list()
    huaxin_trade_data_update.add_deal_list()


def read_l2_results(trade_call_back_queue):
    """
    Consume L2 strategy results forever and place buy orders for flagged signals.

    Each queue item is indexed as ``item[0]`` (stock code) and ``item[3]``
    (list of buy infos). Any exception is logged to ``logger_debug`` and
    followed by a one-second pause before the loop continues.

    :param trade_call_back_queue: queue the L2 client puts results on
    :return: never returns
    """
    while True:
        try:
            l2_result = trade_call_back_queue.get()
            if not l2_result:
                continue
            code = l2_result[0]
            for buy_info in l2_result[3]:
                if buy_info[0]:
                    _try_buy_convertible_bond(code, buy_info)
        except Exception as e:
            logger_debug.exception(e)
            time.sleep(1)
|
|
|
def __read_market_data(queue_market: multiprocessing.Queue):
    """
    Consume market snapshots forever, caching each one and routing it by code type.

    Snapshot layout (from the original comment): (code, latest price, rate,
    buy-1 price, buy-1 volume, total volume, buy volume, sell volume,
    previous close, timestamp).

    Any exception in one iteration is logged to ``logger_debug`` and followed
    by a one-second pause.

    :param queue_market: queue the L2 client puts market snapshots on
    :return: never returns
    """
    while True:
        try:
            snapshot = queue_market.get()
            if not snapshot:
                continue
            code_market_manager.set_market_info(snapshot)

            # Snapshots stamped before 09:24:55 are cached but not processed
            # further. A timestamp that cannot be converted is treated as
            # "not too early" (best effort, as in the original).
            too_early = False
            try:
                snapshot_time = l2_huaxin_util.convert_time(snapshot[9])
                too_early = int(snapshot_time.replace(":", "")) < int("092455")
            except Exception:
                pass
            if too_early:
                continue

            code = snapshot[0]
            if not tool.is_stock(code):
                # Convertible bond code
                sell_strategy.process_market_info(snapshot, __BuyStrategyDataManager)
                continue

            # Stock code: blocks are only loaded once the rate exceeds 5%
            limit_up_price = tool.get_limit_up_price(code, snapshot[8])
            if snapshot[2] > 0.05:
                KPLCodeJXBlockManager().load_jx_blocks(snapshot[0], snapshot[3],
                                                       float(limit_up_price),
                                                       KPLLimitUpDataRecordManager.get_current_reasons())
            FirstLimitUpTimeManager().process(snapshot)
        except Exception as loop_error:
            logger_debug.exception(loop_error)
            time.sleep(1)
|
|
|
def __init_data():
    """
    Initialise the market cache from the latest exported market info.

    Every entry returned by ``log_export.load_latest_market_info`` is stored
    via ``code_market_manager.set_market_info`` with logging disabled. Any
    exception is logged to ``logger_debug`` and swallowed.

    :return: None
    """
    try:
        latest_market = log_export.load_latest_market_info()
        for key in latest_market:
            entry = latest_market[key]
            code_market_manager.set_market_info(entry, with_log=False)
    except Exception as init_error:
        logger_debug.exception(init_error)
|
|
|
def __kpl_limit_up_callback(results):
    """
    Parse pulled limit-up data and save it under today's date.

    :param results: raw limit-up data; a falsy value is ignored
    :return: None
    """
    if not results:
        return
    parsed_records = kpl_util.parseLimitUpData(results)
    today = tool.get_now_date_str()
    kpl_data_manager.KPLLimitUpDataRecordManager.save_record(today, parsed_records)
|
|
|
if __name__ == '__main__':
    # =========== Initialise data ===========
    try:
        target_codes_manager.load_valid_codes_info()
    except Exception as load_error:
        logger_debug.exception(load_error)

    __init_data()

    trade_call_back_queue = multiprocessing.Queue()

    # Huaxin trade data updates
    huaxin_trade_data_update.run()

    # Periodically pull limit-up data
    threading.Thread(target=PullTask.run_limit_up_task, args=(__kpl_limit_up_callback,),
                     daemon=True).start()

    # Simulated trading does not run the trade client, so the block below
    # stays disabled.
    # # =========== Run the external trade API ===========
    # # Queues between strategy and trade; the command queue and the query
    # # queue are read by the same trade process.
    # queue_strategy_r_trade_w, queue_strategy_w_trade_r, queue_strategy_w_trade_r_for_query = multiprocessing.Queue(), multiprocessing.Queue(), multiprocessing.Queue()
    # huaxin_trade_api.run_trade(queue_strategy_r_trade_w, queue_strategy_w_trade_r, queue_strategy_w_trade_r_for_query)
    #
    # # =========== Run the trade client ===========
    # tradeProcess = multiprocessing.Process(
    #     target=trade_client_for_cb.run,
    #     args=(queue_strategy_w_trade_r, queue_strategy_w_trade_r_for_query, queue_strategy_r_trade_w,))
    # tradeProcess.start()

    # =========== Run the local API interface ===========
    # middle_api_protocol.SERVER_HOST = "192.168.3.122"
    manager = outside_api_command_manager.NewApiCommandManager()
    manager.init(middle_api_protocol.SERVER_HOST, middle_api_protocol.SERVER_PORT,
                 command_callback, [("trade_cb", 50)])
    manager.run(blocking=False)

    # threading.Thread(target=test, daemon=True).start()

    queue_market = multiprocessing.Queue()
    # Daemon threads, started in this order:
    #   1. buy strategy driven by L2 results
    #   2. asynchronous log persistence
    #   3. Redis cache loop
    #   4. L2 market data listener
    background_jobs = (
        (read_l2_results, (trade_call_back_queue,)),
        (async_log_util.run_sync, ()),
        (RedisUtils.run_loop, ()),
        (__read_market_data, (queue_market,)),
    )
    for job, job_args in background_jobs:
        threading.Thread(target=job, args=job_args, daemon=True).start()

    l2_client_for_cb.run(trade_call_back_queue, queue_market)
|