""" 可转债入口函数 """ import json import logging import multiprocessing import threading import time from code_attribute import target_codes_manager, gpcode_manager, code_market_manager, history_k_data_util from db.redis_manager_delegate import RedisUtils from huaxin_client import l2_client_for_cb, trade_client_for_cb from huaxin_client.client_network import SendResponseSkManager from log_module import async_log_util, log_export from records import huaxin_trade_record_manager from third_data import kpl_data_manager, kpl_util from third_data.kpl_data_manager import PullTask, KPLCodeJXBlockManager, KPLLimitUpDataRecordManager from trade import huaxin_trade_api, huaxin_trade_data_update, huaxin_sell_util, backtest_trade from trade.buy_strategy import BuyStrategyDataManager from trade.trade_manager import CodeTradeStateManager from utils import middle_api_protocol, outside_api_command_manager, constant, tool, huaxin_util, socket_util, sell_util, \ output_util, l2_huaxin_util, output_data_util middle_api_protocol.SERVER_PORT = 10008 middle_api_protocol.SERVER_HOST = "43.138.167.68" constant.LOG_DIR = "logs_cb" from log_module.log import logger_debug, logger_trade, printlog import concurrent.futures __cancel_sell_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=8) __BuyStrategyDataManager = BuyStrategyDataManager() def __send_response(data_bytes): sk = SendResponseSkManager.create_send_response_sk(addr=middle_api_protocol.SERVER_HOST, port=middle_api_protocol.SERVER_PORT) try: data_bytes = socket_util.load_header(data_bytes) sk.sendall(data_bytes) result, header_str = socket_util.recv_data(sk) result = json.loads(result) if result["code"] != 0: raise Exception(result['msg']) finally: sk.close() def send_response(data, _client_id, _request_id): data_bytes = json.dumps({"type": "response", "data": data, "client_id": _client_id, "request_id": _request_id}).encode('utf-8') for i in range(3): try: __send_response(data_bytes) printlog("发送数据成功") break except Exception as e1: logging.exception(e1) # 撤长期没有成交的单 def __cancel_not_deal_order(code, order_ref, timeout=3): time.sleep(timeout) # 撤买单 huaxin_trade_api.cancel_order(1, code, "", orderRef=order_ref) def command_callback(client_id, request_id, data): """ 命令回调 :param client_id: :param request_id: :param data: json格式数据 :return: """ type_ = data.get('type') if type_ == outside_api_command_manager.API_TYPE_TRADE: try: trade_type = data["trade_type"] if trade_type == outside_api_command_manager.TRADE_TYPE_ORDER: code = data["code"] direction = data["direction"] volume = data["volume"] price_type = data["price_type"] price = data["price"] sinfo = data["sinfo"] if direction == 2: # price_type: 0-价格笼子 1-跌停价 2-涨停价 3-现价 4-买5价 async_log_util.info(logger_trade, f"API卖: 接收数据-{data}") current_price = None market_info = code_market_manager.get_market_info(code) if market_info: current_price = market_info.price else: price_results = history_k_data_util.HistoryKDatasUtils.get_gp_current_info([code]) current_price = price_results[0]["price"] limit_down_price = target_codes_manager.get_limit_down_price(code) limit_up_price = target_codes_manager.get_limit_up_price(code) order_ref = huaxin_util.create_order_ref() try: result = huaxin_sell_util.start_sell(code, volume, price_type, limit_up_price, limit_down_price, current_price, blocking=True, request_id=request_id, order_ref=order_ref) async_log_util.info(logger_trade, f"API卖结果: {result}") send_response(result, client_id, request_id) except Exception as e: if str(e).find("超时") >= 0: send_response({"code": 0, "data": {"orderRef": order_ref}}, client_id, request_id) else: raise e finally: huaxin_trade_data_update.add_position_list() huaxin_trade_data_update.add_money_list() huaxin_trade_data_update.add_deal_list() else: if not price: limit_down_price = target_codes_manager.get_limit_down_price(code) limit_up_price = target_codes_manager.get_limit_up_price(code) price = sell_util.get_sell_price(price_type, limit_up_price, limit_down_price, None) if not price: raise Exception("尚未获取到买入价格") # 获取买1金额 price = round(float(price), 3) order_ref = huaxin_util.create_order_ref() result = huaxin_trade_api.order(direction, code, volume, price, sinfo=sinfo, order_ref=order_ref, blocking=True, request_id=request_id) # 2s内没成交就撤单 __cancel_sell_thread_pool.submit(__cancel_not_deal_order, code, order_ref) else: result = huaxin_trade_api.order(direction, code, volume, price, sinfo=sinfo, blocking=True, request_id=request_id) huaxin_trade_data_update.add_position_list() huaxin_trade_data_update.add_money_list() huaxin_trade_data_update.add_deal_list() send_response({"code": 0, "data": result}, client_id, request_id) except Exception as e: logger_debug.exception(e) send_response({"code": 1, "msg": str(e)}, client_id, request_id) elif type_ == "get_code_position_info": try: # 查询此仓 code = data.get("code") if code: results = huaxin_trade_record_manager.PositionManager().list_by_day(tool.get_now_date_str("%Y%m%d"), code) else: results = huaxin_trade_record_manager.PositionManager().get_from_cache() if constant.backtest_mode_info: results.clear() position_dict = backtest_trade.position_dict for p in position_dict: translation = position_dict[p] underlying_code = translation["SecurityID"] # TODO 获取当时的价格 translation["buy_list"] = [{"price": str(translation['TradePrice']), "tradeTime": l2_huaxin_util.convert_time(translation['OrderTime']), "volume": 10}] cb_code = target_codes_manager.get_cb_code(underlying_code) underlying_market = code_market_manager.get_market_info(underlying_code) results.append( {"securityID": cb_code, "securityName": gpcode_manager.CodesNameManager().get_code_name(cb_code), "buy_list": translation["buy_list"], "sell_list": [], "currentPosition": 10, "id": cb_code}) for r in results: cb_code = r["securityID"] underlying_code = target_codes_manager.get_underlying_code(cb_code) cb_market = code_market_manager.get_market_info(cb_code) underlying_market = code_market_manager.get_market_info(underlying_code) if cb_market: r["marketInfo"] = {"code": cb_market.code, "name": r["securityName"], "rate": f"{round(cb_market.rate * 100, 2)}%", "price": cb_market.price, "lastVolume": cb_market.total_bid_volume // 100, "buy1Money": output_util.money_desc( cb_market.buy1_price * cb_market.buy1_volume), "preClosePrice": cb_market.pre_close_price} if underlying_market: if not gpcode_manager.CodesNameManager().get_code_name(underlying_market.code): # 异步请求名称 threading.Thread( target=lambda: gpcode_manager.CodesNameManager().request_code_name(underlying_market.code), daemon=True).start() r["underlyingMarketInfo"] = {"code": underlying_market.code, "name": gpcode_manager.CodesNameManager().get_code_name( underlying_market.code), "rate": f"{round(underlying_market.rate * 100, 2)}%", "price": underlying_market.price, "lastVolume": underlying_market.total_bid_volume // 100, "buy1Money": output_util.money_desc( underlying_market.buy1_price * underlying_market.buy1_volume), "preClosePrice": underlying_market.pre_close_price} r["underlyingDetailInfo"] = output_data_util.load_code_detail_info(underlying_market.code) r["underlyingDetailInfo"]["price"] = underlying_market.price # 获取买点与卖点 if "buy_list" not in r: buys = huaxin_trade_record_manager.DealRecordManager().list_buy_by_code_cache(cb_code) r["buy_list"] = [{"price": str(x["price"]), "tradeTime": x["tradeTime"], "volume": x["volume"]} for x in buys] if "sell_list" not in r: sells = huaxin_trade_record_manager.DealRecordManager().list_sell_by_code_cache(cb_code) r["sell_list"] = [{"price": str(x["price"]), "tradeTime": x["tradeTime"], "volume": x["volume"]} for x in sells] send_response({"code": 0, "data": results}, client_id, request_id) except Exception as e: logger_debug.exception(e) send_response({"code": 1, "msg": str(e)}, client_id, request_id) elif type_ == "refresh_trade_data": # 刷新交易数据 ctype = data.get("ctype") if ctype == "money": huaxin_trade_data_update.add_money_list() elif ctype == "position_list": huaxin_trade_data_update.add_position_list() elif ctype == "deal_list": huaxin_trade_data_update.add_deal_list() elif ctype == "delegate_list": huaxin_trade_data_update.add_delegate_list("手动刷新") send_response({"code": 0, "data": {}}, client_id, request_id) elif type_ == outside_api_command_manager.API_TYPE_COMMON_REQUEST: # 常规接口 ctype = data['ctype'] if ctype == 'get_account_money': # 获取账户资金 result = huaxin_trade_record_manager.MoneyManager.get_data() send_response({"code": 0, "data": result}, client_id, request_id) elif ctype == 'set_backtest_mode': try: # 设置回测模式 date = data.get("date") mode = data.get("mode") if not date: date = tool.get_now_date_str() if mode: # 开始回撤 constant.backtest_mode_info = (True, date) threading.Thread(target=lambda: backtest_trade.start_backtest(date), daemon=True).start() else: # 结束回撤 constant.backtest_mode_info = None send_response({"code": 0, "data": {}}, client_id, request_id) except Exception as e: send_response({"code": 0, "data": {}, "msg": str(e)}, client_id, request_id) def test(): time.sleep(5) print("获取资金:", huaxin_trade_api.get_money()) # print("获取成交:", huaxin_trade_api.get_deal_list()) # print("下单:", huaxin_trade_api.order(1, "127075", 10, 140.5, blocking=True)) def read_l2_results(trade_call_back_queue): while True: try: result = trade_call_back_queue.get() if result: async_log_util.info(logger_trade, f"正股涨停,准备买入可转债:{result}") # 获取可以买的代码 code, trade_time = result[0], result[1] # 获取股票代码的可转债代码 cb_code = target_codes_manager.get_cb_code(code) if CodeTradeStateManager().get_trade_state(cb_code) == CodeTradeStateManager.TRADE_STATE_ALREADY_BUY: async_log_util.info(logger_trade, f"已经下单过:{result}") continue # 获取可转债的涨停价 market_info = code_market_manager.get_market_info(cb_code) limit_up_price = target_codes_manager.get_limit_up_price(cb_code) if market_info: buy_price = round(min(float(market_info.price * 1.02), float(limit_up_price)), 3) async_log_util.info(logger_trade, f"准备下单:{cb_code}-{buy_price}") # 买入20股 result = huaxin_trade_api.order(1, cb_code, 20, buy_price, blocking=True) async_log_util.info(logger_trade, f"可转债下单结果:{result}") huaxin_trade_data_update.add_position_list() huaxin_trade_data_update.add_money_list() huaxin_trade_data_update.add_deal_list() except Exception as e: logger_debug.exception(e) time.sleep(1) finally: pass def __read_market_data(queue_market: multiprocessing.Queue): while True: try: result = queue_market.get() if result: # (代码, 最近的价格, 涨幅, 买1价, 买1量, 成交总量, 买入量, 卖出量, 昨日收盘价, 时间戳) code_market_manager.set_market_info(result) code = result[0] # 正股需要加载板块 if code.find("11") != 0 and code.find("12") != 0: limit_up_price = tool.get_limit_up_price(code, result[8]) KPLCodeJXBlockManager().load_jx_blocks(result[0], result[3], float(limit_up_price), KPLLimitUpDataRecordManager.get_current_reasons()) except Exception as e: logger_debug.exception(e) time.sleep(1) finally: pass def __init_data(): """ 初始化参数 :return: """ try: market_dict = log_export.load_latest_market_info() for k in market_dict: code_market_manager.set_market_info(market_dict[k], with_log=False) except Exception as e: logger_debug.exception(e) def __kpl_limit_up_callback(results): if results: result_list = kpl_util.parseLimitUpData(results) kpl_data_manager.KPLLimitUpDataRecordManager.save_record(tool.get_now_date_str(), result_list) if __name__ == '__main__': # ===========初始化数据========== try: target_codes_manager.load_valid_codes_info() except Exception as e: logger_debug.exception(e) __init_data() trade_call_back_queue = multiprocessing.Queue() # 华鑫交易数据更新 huaxin_trade_data_update.run() # 定时拉取开盘啦涨停数据 threading.Thread(target=lambda: PullTask.run_limit_up_task(__kpl_limit_up_callback), daemon=True).start() # 仿真交易不运行交易客户端 # # ===========运行交易外部API========== # # # # # 策略与交易通信队列 # # 交易结果读取, 交易命令队列与交易查询队列设置为同一个 # queue_strategy_r_trade_w, queue_strategy_w_trade_r, queue_strategy_w_trade_r_for_query = multiprocessing.Queue(), multiprocessing.Queue(), multiprocessing.Queue() # huaxin_trade_api.run_trade(queue_strategy_r_trade_w, queue_strategy_w_trade_r, queue_strategy_w_trade_r_for_query) # # # # ===========运行交易端========== # tradeProcess = multiprocessing.Process( # target=trade_client_for_cb.run, # args=(queue_strategy_w_trade_r, queue_strategy_w_trade_r_for_query, queue_strategy_r_trade_w,)) # tradeProcess.start() # ===========运行本地API接口========== # middle_api_protocol.SERVER_HOST = "192.168.3.122" manager = outside_api_command_manager.NewApiCommandManager() manager.init(middle_api_protocol.SERVER_HOST, middle_api_protocol.SERVER_PORT, command_callback, [("trade_cb", 50)]) manager.run(blocking=False) # threading.Thread(target=test, daemon=True).start() # ===========读取根据L2制定的买入策略========== threading.Thread(target=read_l2_results, args=(trade_call_back_queue,), daemon=True).start() # ===========异步日志持久化========== threading.Thread(target=async_log_util.run_sync, daemon=True).start() # ===========Redis缓存====================== threading.Thread(target=RedisUtils.run_loop, daemon=True).start() # 运行L2数据监听队列 queue_market = multiprocessing.Queue() threading.Thread(target=__read_market_data, args=(queue_market,), daemon=True).start() l2_client_for_cb.run(trade_call_back_queue, queue_market)