""" 可转债入口函数 """ import json import logging import multiprocessing import threading import time from code_attribute import target_codes_manager, gpcode_manager, code_market_manager, history_k_data_util from huaxin_client import l2_client_for_cb, trade_client_for_cb from huaxin_client.client_network import SendResponseSkManager from log_module import async_log_util, log_export from records import huaxin_trade_record_manager from trade import huaxin_trade_api, huaxin_trade_data_update, huaxin_sell_util from utils import middle_api_protocol, outside_api_command_manager, constant, tool, huaxin_util, socket_util, sell_util, \ output_util middle_api_protocol.SERVER_PORT = 10008 middle_api_protocol.SERVER_HOST = "43.138.167.68" constant.LOG_DIR = "logs_cb" from log_module.log import logger_debug, logger_trade, printlog import concurrent.futures __cancel_sell_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=8) def __send_response(data_bytes): sk = SendResponseSkManager.create_send_response_sk(addr=middle_api_protocol.SERVER_HOST, port=middle_api_protocol.SERVER_PORT) try: data_bytes = socket_util.load_header(data_bytes) sk.sendall(data_bytes) result, header_str = socket_util.recv_data(sk) result = json.loads(result) if result["code"] != 0: raise Exception(result['msg']) finally: sk.close() def send_response(data, _client_id, _request_id): data_bytes = json.dumps({"type": "response", "data": data, "client_id": _client_id, "request_id": _request_id}).encode('utf-8') for i in range(3): try: __send_response(data_bytes) printlog("发送数据成功") break except Exception as e1: logging.exception(e1) # 撤长期没有成交的单 def __cancel_not_deal_order(code, order_ref, timeout=3): time.sleep(timeout) # 撤买单 huaxin_trade_api.cancel_order(1, code, "", orderRef=order_ref) def command_callback(client_id, request_id, data): """ 命令回调 :param client_id: :param request_id: :param data: json格式数据 :return: """ type_ = data.get('type') if type_ == outside_api_command_manager.API_TYPE_TRADE: try: trade_type = data["trade_type"] if trade_type == outside_api_command_manager.TRADE_TYPE_ORDER: code = data["code"] direction = data["direction"] volume = data["volume"] price_type = data["price_type"] price = data["price"] sinfo = data["sinfo"] if direction == 2: # price_type: 0-价格笼子 1-跌停价 2-涨停价 3-现价 4-买5价 async_log_util.info(logger_trade, f"API卖: 接收数据-{data}") current_price = None market_info = code_market_manager.get_market_info(code) if market_info: current_price = market_info.price else: price_results = history_k_data_util.HistoryKDatasUtils.get_gp_current_info([code]) current_price = price_results[0]["price"] limit_down_price = target_codes_manager.get_limit_down_price(code) limit_up_price = target_codes_manager.get_limit_up_price(code) order_ref = huaxin_util.create_order_ref() try: result = huaxin_sell_util.start_sell(code, volume, price_type, limit_up_price, limit_down_price, current_price, blocking=True, request_id=request_id, order_ref=order_ref) async_log_util.info(logger_trade, f"API卖结果: {result}") send_response(result, client_id, request_id) except Exception as e: if str(e).find("超时") >= 0: send_response({"code": 0, "data": {"orderRef": order_ref}}, client_id, request_id) else: raise e finally: huaxin_trade_data_update.add_position_list() huaxin_trade_data_update.add_money_list() huaxin_trade_data_update.add_deal_list() else: if not price: limit_down_price = target_codes_manager.get_limit_down_price(code) limit_up_price = target_codes_manager.get_limit_up_price(code) price = sell_util.get_sell_price(price_type, limit_up_price, limit_down_price, None) if not price: raise Exception("尚未获取到买入价格") # 获取买1金额 price = round(float(price), 3) order_ref = huaxin_util.create_order_ref() result = huaxin_trade_api.order(direction, code, volume, price, sinfo=sinfo, order_ref=order_ref, blocking=True, request_id=request_id) # 2s内没成交就撤单 __cancel_sell_thread_pool.submit(__cancel_not_deal_order, code, order_ref) else: result = huaxin_trade_api.order(direction, code, volume, price, sinfo=sinfo, blocking=True, request_id=request_id) huaxin_trade_data_update.add_position_list() huaxin_trade_data_update.add_money_list() huaxin_trade_data_update.add_deal_list() send_response({"code": 0, "data": result}, client_id, request_id) except Exception as e: logger_debug.exception(e) send_response({"code": 1, "msg": str(e)}, client_id, request_id) elif type_ == "get_code_position_info": # 查询此仓 code = data.get("code") if code: results = huaxin_trade_record_manager.PositionManager().list_by_day(tool.get_now_date_str("%Y%m%d"), code) else: results = huaxin_trade_record_manager.PositionManager().get_from_cache() for r in results: cb_code = r["securityID"] underlying_code = target_codes_manager.get_underlying_code(cb_code) cb_market = code_market_manager.get_market_info(cb_code) underlying_market = code_market_manager.get_market_info(underlying_code) if cb_market: r["marketInfo"] = {"code": cb_market.code, "name": r["securityName"], "rate": f"{round(cb_market.rate * 100, 2)}%", "price": cb_market.price, "lastVolume": cb_market.total_bid_volume // 100, "buy1Money": output_util.money_desc(cb_market.buy1_price * cb_market.buy1_volume), "preClosePrice": cb_market.pre_close_price} if underlying_market: if not gpcode_manager.CodesNameManager().get_code_name(underlying_market.code): # 异步请求名称 threading.Thread( target=lambda: gpcode_manager.CodesNameManager().request_code_name(underlying_market.code), daemon=True).start() r["underlyingMarketInfo"] = {"code": underlying_market.code, "name": gpcode_manager.CodesNameManager().get_code_name( underlying_market.code), "rate": f"{round(underlying_market.rate * 100, 2)}%", "price": underlying_market.price, "lastVolume": underlying_market.total_bid_volume // 100, "buy1Money": output_util.money_desc( underlying_market.buy1_price * underlying_market.buy1_volume), "preClosePrice": underlying_market.pre_close_price} # 获取买点与卖点 buys = huaxin_trade_record_manager.DealRecordManager().list_buy_by_code_cache(cb_code) sells = huaxin_trade_record_manager.DealRecordManager().list_sell_by_code_cache(cb_code) r["buy_list"] = [{"price": str(x["price"]), "tradeTime": x["tradeTime"], "volume": x["volume"]} for x in buys] r["sell_list"] = [{"price": str(x["price"]), "tradeTime": x["tradeTime"], "volume": x["volume"]} for x in sells] send_response({"code": 0, "data": results}, client_id, request_id) elif type_ == "refresh_trade_data": # 刷新交易数据 ctype = data.get("ctype") if ctype == "money": huaxin_trade_data_update.add_money_list() elif ctype == "position_list": huaxin_trade_data_update.add_position_list() elif ctype == "deal_list": huaxin_trade_data_update.add_deal_list() elif ctype == "delegate_list": huaxin_trade_data_update.add_delegate_list("手动刷新") send_response({"code": 0, "data": {}}, client_id, request_id) elif type_ == outside_api_command_manager.API_TYPE_COMMON_REQUEST: # 常规接口 ctype = data['ctype'] if ctype == 'get_account_money': # 获取账户资金 result = huaxin_trade_record_manager.MoneyManager.get_data() send_response({"code": 0, "data": result}, client_id, request_id) def test(): time.sleep(5) print("获取资金:", huaxin_trade_api.get_money()) # print("获取成交:", huaxin_trade_api.get_deal_list()) # print("下单:", huaxin_trade_api.order(1, "127075", 10, 140.5, blocking=True)) __deal_codes = set() def read_l2_results(trade_call_back_queue): while True: try: result = trade_call_back_queue.get() if result: async_log_util.info(logger_trade, f"正股涨停,准备买入可转债:{result}") # 获取可以买的代码 code, trade_time = result[0], result[1] if code in __deal_codes: async_log_util.info(logger_trade, f"已经下单过:{result}") continue # 获取股票代码的可转债代码 cb_code = target_codes_manager.get_cb_code(code) # 获取可转债的涨停价 limit_up_price = target_codes_manager.get_limit_up_price(cb_code) if limit_up_price: async_log_util.info(logger_trade, f"准备下单:{cb_code}-{limit_up_price}") # 买入20股 result = huaxin_trade_api.order(1, cb_code, 20, round(float(limit_up_price), 3), blocking=True) __deal_codes.add(code) async_log_util.info(logger_trade, f"可转债下单结果:{result}") huaxin_trade_data_update.add_position_list() huaxin_trade_data_update.add_money_list() huaxin_trade_data_update.add_deal_list() except Exception as e: logger_debug.exception(e) time.sleep(1) finally: pass def __read_market_data(queue_market: multiprocessing.Queue): while True: try: result = queue_market.get() if result: # 代码, 最近的价格, 涨幅, 买1价, 买1量, 成交总量 code_market_manager.set_market_info(result) except Exception as e: logger_debug.exception(e) time.sleep(1) finally: pass def __init_data(): """ 初始化参数 :return: """ try: market_dict = log_export.load_latest_market_info() for k in market_dict: code_market_manager.set_market_info(market_dict[k], with_log=False) except Exception as e: logger_debug.exception(e) if __name__ == '__main__': # ===========初始化数据========== try: target_codes_manager.load_valid_codes_info() except Exception as e: logger_debug.exception(e) __init_data() trade_call_back_queue = multiprocessing.Queue() # 华鑫交易数据更新 huaxin_trade_data_update.run() # 仿真交易不运行交易客户端 # # ===========运行交易外部API========== # # # # # 策略与交易通信队列 # # 交易结果读取, 交易命令队列与交易查询队列设置为同一个 # queue_strategy_r_trade_w, queue_strategy_w_trade_r, queue_strategy_w_trade_r_for_query = multiprocessing.Queue(), multiprocessing.Queue(), multiprocessing.Queue() # huaxin_trade_api.run_trade(queue_strategy_r_trade_w, queue_strategy_w_trade_r, queue_strategy_w_trade_r_for_query) # # # # ===========运行交易端========== # tradeProcess = multiprocessing.Process( # target=trade_client_for_cb.run, # args=(queue_strategy_w_trade_r, queue_strategy_w_trade_r_for_query, queue_strategy_r_trade_w,)) # tradeProcess.start() # ===========运行本地API接口========== # middle_api_protocol.SERVER_HOST = "192.168.3.122" manager = outside_api_command_manager.NewApiCommandManager() manager.init(middle_api_protocol.SERVER_HOST, middle_api_protocol.SERVER_PORT, command_callback, [("trade_cb", 50)]) manager.run(blocking=False) # threading.Thread(target=test, daemon=True).start() # ===========读取根据L2制定的买入策略========== threading.Thread(target=read_l2_results, args=(trade_call_back_queue,), daemon=True).start() # ===========异步日志持久化========== threading.Thread(target=async_log_util.run_sync, daemon=True).start() # 运行L2数据监听队列 queue_market = multiprocessing.Queue() threading.Thread(target=__read_market_data, args=(queue_market,), daemon=True).start() l2_client_for_cb.run(trade_call_back_queue, queue_market)