""" 可转债入口函数 """ import multiprocessing import threading import time from code_attribute import target_codes_manager from huaxin_client import l2_client_for_cb, trade_client_for_cb from log_module import async_log_util from trade import huaxin_trade_api, huaxin_trade_data_update from utils import middle_api_protocol, outside_api_command_manager, constant constant.LOG_DIR = "logs_cb" from log_module.log import logger_debug, logger_trade def command_callback(client_id, request_id, data): """ 命令回调 :param client_id: :param request_id: :param data: json格式数据 :return: """ type_ = data.get('type') if type_ == outside_api_command_manager.API_TYPE_TRADE: # 交易 pass elif type_ == "get_code_position_info": # 查询此仓 pass elif type_ == "get_code_position_info": # 查询此仓 pass elif type_ == outside_api_command_manager.API_TYPE_COMMON_REQUEST: # 常规接口 ctype = data['ctype'] if ctype == 'get_account_money': # 获取账户资金 pass logger_debug.info(f"接收到命令:{request_id} - f{client_id} - {data}") def test(): time.sleep(5) # print("获取持仓:", huaxin_trade_api.get_position_list()) # print("获取资金:", huaxin_trade_api.get_money()) # print("获取成交:", huaxin_trade_api.get_deal_list()) print("下单:", huaxin_trade_api.order(1, "110060", 10, 140.5, blocking=True)) def read_l2_results(trade_call_back_queue): while True: try: result = trade_call_back_queue.get() if result: async_log_util.info(logger_trade, f"正股涨停,准备买入可转债:{result}") # 获取可以买的代码 code, trade_time = result[0], result[1] # 获取股票代码的可转债代码 cb_code = target_codes_manager.get_cb_code(code) # 获取可转债的涨停价 limit_up_price = target_codes_manager.get_limit_up_price(cb_code) if limit_up_price: async_log_util.info(logger_trade, f"准备下单:{cb_code}-{limit_up_price}") # 买入20股 huaxin_trade_api.order(1, cb_code, 20, round(float(limit_up_price), 3), blocking=False) except Exception as e: logger_debug.exception(e) time.sleep(1) finally: pass if __name__ == '__main__': # ===========初始化数据========== try: target_codes_manager.load_valid_codes_info() except Exception as e: logger_debug.exception(e) trade_call_back_queue = multiprocessing.Queue() middle_api_protocol.SERVER_PORT = 10008 middle_api_protocol.SERVER_HOST = "43.138.167.68" # ===========运行交易外部API========== # 策略与交易通信队列 # 交易结果读取, 交易命令队列与交易查询队列设置为同一个 queue_strategy_r_trade_w, queue_strategy_w_trade_r = multiprocessing.Queue(), multiprocessing.Queue() huaxin_trade_api.run_trade(queue_strategy_r_trade_w, queue_strategy_w_trade_r, queue_strategy_w_trade_r) # 华鑫交易数据更新 huaxin_trade_data_update.run() # ===========运行交易端========== tradeProcess = multiprocessing.Process( target=trade_client_for_cb.run, args=(queue_strategy_w_trade_r, queue_strategy_r_trade_w,)) tradeProcess.start() # ===========运行本地API接口========== # middle_api_protocol.SERVER_HOST = "192.168.3.122" manager = outside_api_command_manager.NewApiCommandManager() manager.init(middle_api_protocol.SERVER_HOST, middle_api_protocol.SERVER_PORT, command_callback, [("trade_cb", 20)]) manager.run(blocking=False) # threading.Thread(target=test, daemon=True).start() # ===========读取根据L2制定的买入策略========== threading.Thread(target=read_l2_results, args=(trade_call_back_queue,), daemon=True).start() # ===========异步日志持久化========== threading.Thread(target=async_log_util.run_sync, daemon=True).start() # 运行L2数据监听队列 l2_client_for_cb.run(trade_call_back_queue)