| | |
| | | |
| | | class MdSpi(xmdapi.CTORATstpXMdSpi): |
| | | def __init__(self, api): |
| | | self.codes_sh, self.codes_sz = get_level1_codes() |
| | | for i in range(3): |
| | | try: |
| | | self.codes_sh, self.codes_sz = get_level1_codes() |
| | | break |
| | | except: |
| | | time.sleep(2) |
| | | xmdapi.CTORATstpXMdSpi.__init__(self) |
| | | self.__api = api |
| | | |
| | |
| | | {"type": type_, "data": {"data": datas}}) |
| | | if pipe_l2 is not None: |
| | | pipe_l2.send(fdata) |
| | | |
| | | |
| | | |
| | | def run(pipe_l2): |
| | |
| | | api.Init() |
| | | |
| | | |
| | | def __receive_from_pipe(pipe): |
| | | def __receive_from_pipe_trade(pipe): |
| | | while True: |
| | | try: |
| | | value = pipe.recv() |
| | |
| | | volume = data["data"]["volume"] |
| | | code = data["data"]["code"] |
| | | spi.set_code_special_watch_volume(code, volume) |
| | | elif data["type"] == "set_l2_codes": |
| | | except: |
| | | pass |
| | | |
| | | |
def __receive_from_pipe_strategy(pipe):
    """Read strategy commands from ``pipe`` and forward L2 code updates.

    Blocks on ``pipe.recv()`` in a loop. Every non-empty message is parsed as
    JSON; when its ``"type"`` field equals ``"set_l2_codes"`` the ``"data"``
    field is passed to ``l2CommandManager.process_command`` together with
    ``command_manager.CLIENT_TYPE_CMD_L2``. Messages of any other type are
    ignored.

    Args:
        pipe: Object exposing ``recv()``. A message may be a ``str`` or
            UTF-8 encoded ``bytes``/``bytearray``.

    Returns:
        None. The loop ends when ``recv()`` raises ``EOFError`` or
        ``OSError``; a failure while handling a single message is logged and
        the loop keeps running.
    """
    # Function-scope import so this block does not rely on a module-level
    # ``import logging`` being present.
    import logging

    while True:
        try:
            message = pipe.recv()
        except (EOFError, OSError):
            # Once the pipe is closed every further recv() fails immediately,
            # so continuing would turn this loop into a busy spin.
            logging.exception("strategy pipe closed, receiver stopped")
            return

        if not message:
            continue

        try:
            # Only byte payloads need decoding; a str has no ``decode`` and
            # used to be dropped silently through the AttributeError.
            if isinstance(message, (bytes, bytearray)):
                message = message.decode("utf-8")
            command = json.loads(message)
            if command["type"] == "set_l2_codes":
                l2CommandManager.process_command(
                    command_manager.CLIENT_TYPE_CMD_L2, None, command["data"])
        except Exception:
            # Best effort per message: record the failure instead of hiding
            # it, then wait for the next command.
            logging.exception("failed to handle strategy pipe message")
| | | |
| | | |
| | | def run(pipe_trade=None): |
| | | def run(pipe_trade=None, pipe_strategy=None): |
| | | __init_l2() |
| | | if pipe_trade is not None: |
| | | t1 = threading.Thread(target=lambda: __receive_from_pipe(pipe_trade), daemon=True) |
| | | t1 = threading.Thread(target=lambda: __receive_from_pipe_trade(pipe_trade), daemon=True) |
| | | t1.start() |
| | | if pipe_strategy is not None: |
| | | t1 = threading.Thread(target=lambda: __receive_from_pipe_strategy(pipe_strategy), daemon=True) |
| | | t1.start() |
| | | l2_data_manager.run_upload_common() |
| | | global l2CommandManager |
| | |
| | | # from huaxin_api import trade_client, l2_client, l1_client |
| | | |
| | | |
| | | def createTradeServer(pipe_server, pipe_trade, pipe_l1): |
| | | def createTradeServer(pipe_server, pipe_trade, pipe_l1, pipe_l2): |
| | | # 初始化参数 |
| | | global_data_loader.init() |
| | | |
| | |
| | | t1.start() |
| | | |
| | | # 交易接口服务 |
| | | t1 = threading.Thread(target=trade_api_server.run, args=(pipe_server,), daemon=True) |
| | | t1 = threading.Thread(target=trade_api_server.run, args=(pipe_server, pipe_l2), daemon=True) |
| | | t1.start() |
| | | |
| | | # redis后台服务 |
| | |
| | | pst_trade, pst_strategy = multiprocessing.Pipe() |
| | | # 交易与l2之间的通信 |
| | | ptl2_trade, ptl2_l2 = multiprocessing.Pipe() |
| | | # l1与trade间的通信 |
| | | pl1t_l1, pl1t_trade = multiprocessing.Pipe() |
| | | # 策略与l2之间的通信 |
| | | psl2_strategy, psl2_l2 = multiprocessing.Pipe() |
| | | |
| | | # l1与策略间的通信 |
| | | pl1t_l1, pl1t_strategy = multiprocessing.Pipe() |
| | | |
| | | serverProcess = multiprocessing.Process(target=createServer, args=(pss_server,)) |
| | | serverProcess.start() |
| | | |
| | | tradeServerProcess = multiprocessing.Process(target=createTradeServer, args=(pss_strategy, pst_strategy, pl1t_trade,)) |
| | | tradeServerProcess = multiprocessing.Process(target=createTradeServer, |
| | | args=(pss_strategy, pst_strategy, pl1t_strategy, psl2_strategy)) |
| | | tradeServerProcess.start() |
| | | |
| | | # 等待服务器启动 |
| | | time.sleep(10) |
| | | |
| | | # 交易进程与L2进程 |
| | | tradeProcess = multiprocessing.Process(target=huaxin_api.trade_client.run, args=(ptl2_trade, pst_trade)) |
| | | l2Process = multiprocessing.Process(target=huaxin_api.l2_client.run, args=(ptl2_l2,)) |
| | | l2Process = multiprocessing.Process(target=huaxin_api.l2_client.run, args=(ptl2_l2, psl2_l2,)) |
| | | tradeProcess.start() |
| | | l2Process.start() |
| | | # L1订阅数据 |
| | |
| | | "data": data, |
| | | "request_id": request_id} |
| | | root_data = socket_util.encryp_client_params_sign(root_data) |
| | | pipe_trade.send(json.dumps(root_data).encode(encoding='utf-8')) |
| | | pipe_trade.send(json.dumps(root_data)) |
| | | hx_logger_trade_loop.info("请求发送成功:request_id-{}", request_id) |
| | | except BrokenPipeError as e: |
| | | hx_logger_trade_loop.info("请求发送异常:request_id-{} error-{}", request_id, str(e)) |
| | |
| | | |
| | | from trade.huaxin import huaxin_trade_api, huaxin_trade_record_manager, \ |
| | | huaxin_trade_data_update |
| | | from trade.huaxin.huaxin_trade_api import ClientSocketManager |
| | | from trade.trade_manager import TradeTargetCodeModeManager |
| | | from utils import socket_util, tool, huaxin_util, data_export_util |
| | | |
| | |
| | | codes = [d[0] for d in datas] |
| | | for code in codes: |
| | | block_info.init_code(code) |
| | | result = huaxin_trade_api.set_l2_codes_data(datas) |
| | | print("设置L2代码结果:", result) |
| | | root_data = {"type": ClientSocketManager.CLIENT_TYPE_CMD_L2, |
| | | "data": datas, |
| | | "request_id": f"{ ClientSocketManager.CLIENT_TYPE_CMD_L2}_{round(time.time()*1000)}"} |
| | | root_data = socket_util.encryp_client_params_sign(root_data) |
| | | pipe_l2.send(json.dumps(root_data)) |
| | | print("设置L2代码结束") |
| | | except Exception as e: |
| | | logging.exception(e) |
| | | finally: |
| | |
| | | time.sleep(1) |
| | | |
| | | |
| | | def run(pipe): |
| | | def run(pipe_server, pipe_l2_): |
| | | global pipe_l2 |
| | | pipe_l2 = pipe_l2_ |
| | | |
| | | print("create TradeApiServer") |
| | | # 拉取交易信息 |
| | | huaxin_trade_data_update.run() |
| | |
| | | t1 = threading.Thread(target=lambda: __set_target_codes(), daemon=True) |
| | | t1.start() |
| | | |
| | | t1 = threading.Thread(target=lambda: __read_sync_task(pipe), daemon=True) |
| | | t1 = threading.Thread(target=lambda: __read_sync_task(pipe_server), daemon=True) |
| | | t1.start() |
| | | |
| | | laddr = "0.0.0.0", 10009 |