From eb33b717023d9871bd74e6dce47a065228cffefc Mon Sep 17 00:00:00 2001 From: Administrator <admin@example.com> Date: 星期四, 02 十一月 2023 11:23:09 +0800 Subject: [PATCH] L2进程与策略进程分开 --- huaxin_client/l2_client.py | 108 +++++++++++++++++++++-------------------------------- 1 files changed, 43 insertions(+), 65 deletions(-) diff --git a/huaxin_client/l2_client.py b/huaxin_client/l2_client.py index 4da4698..74d425c 100644 --- a/huaxin_client/l2_client.py +++ b/huaxin_client/l2_client.py @@ -6,12 +6,15 @@ import queue import threading import time +from typing import List from huaxin_client import command_manager, l2_data_transform_protocol from huaxin_client import constant from huaxin_client import l2_data_manager import lev2mdapi +from huaxin_client.code_queue_distribute_manager import CodeQueueDistributeManager from huaxin_client.command_manager import L2ActionCallback +from huaxin_client.l2_data_manager import L2DataUploadManager from log_module import log, async_log_util from log_module.log import logger_local_huaxin_l2_subscript, logger_system, logger_local_huaxin_l2_transaction, \ logger_local_huaxin_g_cancel, logger_l2_codes_subscript @@ -40,7 +43,6 @@ SZ_Securities = [b"002456", b"002849", b"002281", b"002336", b"000936", b"000920", b"000757", b"002896", b"002725", b"000952", b"000526", b"000753", b"000681", b"002088", b"002436"] SZ_Bond_Securities = [b"100303", b"109559", b"112617"] -spi = None set_codes_data_queue = queue.Queue() market_code_dict = {} @@ -56,10 +58,11 @@ # 涔板叆鐨勫ぇ鍗曡鍗曞彿 - def __init__(self, api): + def __init__(self, api, l2_data_upload_manager): lev2mdapi.CTORATstpLev2MdSpi.__init__(self) self.__api = api self.is_login = False + self.l2_data_upload_manager = l2_data_upload_manager def __split_codes(self, codes): szse_codes = [] @@ -124,12 +127,16 @@ add_codes = codes - self.subscripted_codes del_codes = self.subscripted_codes - codes print("add del codes", add_codes, del_codes) - for c in codes: - l2_data_manager.add_target_code(c) - for c in del_codes: - l2_data_manager.del_target_code(c) - for c in add_codes: - l2_data_manager.run_upload_task(c, l2_data_callback) + try: + for c in del_codes: + self.l2_data_upload_manager.release_distributed_upload_queue(c) + l2_data_manager.del_target_code(c) + for c in codes: + self.l2_data_upload_manager.distribute_upload_queue(c) + l2_data_manager.add_target_code(c) + except Exception as e: + logger_system.error(f"L2浠g爜鍒嗛厤涓婁紶闃熷垪鍑洪敊:{str(e)}") + logger_system.exception(e) self.__subscribe(add_codes) self.__unsubscribe(del_codes) @@ -268,8 +275,7 @@ (pDepthMarketData['AskPrice5'], pDepthMarketData['AskVolume5']) ]} market_code_dict[pDepthMarketData['SecurityID']] = time.time() - - l2_data_manager.add_market_data(d) + self.l2_data_upload_manager.add_market_data(d) except: pass @@ -289,12 +295,6 @@ min_volume, limit_up_price = self.codes_volume_and_price_dict.get(code) # 杈撳嚭閫愮瑪鎴愪氦鏁版嵁 if pTransaction['ExecType'] == b"2": - # G鎾ゆ暟鎹殏鏃舵敞閲� - # transaction_big_order_nos = l2_data_manager.get_latest_transaction_order_nos(code) - # if transaction_big_order_nos and pTransaction['BuyNo'] in transaction_big_order_nos: - # # 姝e湪鎴愪氦鐨勮鍗曟挙鍗曚簡 - # l2_data_manager.trading_order_canceled(code, pTransaction['BuyNo']) - # async_log_util.info(logger_local_huaxin_g_cancel, f"G鎾ゆ挙鍗曪細{code} - {pTransaction['BuyNo']}") if min_volume is None: # 榛樿绛涢��50w if pTransaction['TradePrice'] * pTransaction['Volume'] < 500000: @@ -320,8 +320,7 @@ item["Side"] = "2" # 娣辫瘉鎾ゅ崟 print("閫愮瑪濮旀墭", item) - - l2_data_manager.add_l2_order_detail(item, 0, True) + self.l2_data_upload_manager.add_l2_order_detail(item, 0, True) else: if abs(pTransaction['TradePrice'] - limit_up_price) < 0.201: # 娑ㄥ仠浠� @@ -338,7 +337,7 @@ # return # self.__last_transaction_keys_dict[code] = key # print("閫愮瑪鎴愪氦", item) - l2_data_manager.add_transaction_detail(item) + self.l2_data_upload_manager.add_transaction_detail(item) def OnRtnOrderDetail(self, pOrderDetail): can_listen = False @@ -346,7 +345,8 @@ start_time = 0 if code in self.special_code_volume_for_order_dict: start_time = time.time() - if self.special_code_volume_for_order_dict[code][0] == pOrderDetail['Volume'] or constant.SHADOW_ORDER_VOLUME == pOrderDetail['Volume']: + if self.special_code_volume_for_order_dict[code][0] == pOrderDetail[ + 'Volume'] or constant.SHADOW_ORDER_VOLUME == pOrderDetail['Volume']: # 鐩戞帶鐩爣璁㈠崟涓庡奖瀛愯鍗� if self.special_code_volume_for_order_dict[code][1] > time.time(): # 鐗规畩閲忕洃鍚� @@ -354,13 +354,6 @@ else: self.special_code_volume_for_order_dict.pop(code) if not can_listen: - # 鏆傛椂娉ㄩ噴鎺塆鎾ょ浉鍏虫暟鎹骇鐢� - # if pOrderDetail['OrderStatus'] == b'D': - # transaction_big_order_nos = l2_data_manager.get_latest_transaction_order_nos(code) - # if transaction_big_order_nos and pOrderDetail['OrderNO'] in transaction_big_order_nos: - # # 姝e湪鎴愪氦鐨勮鍗曟挙鍗曚簡 - # l2_data_manager.trading_order_canceled(code, pOrderDetail['OrderNO']) - # async_log_util.info(logger_local_huaxin_g_cancel, f"G鎾ゆ挙鍗曪細{code} - {pOrderDetail['OrderNO']}") min_volume, limit_up_price = self.codes_volume_and_price_dict.get(code) if min_volume is None: # 榛樿绛涢��50w @@ -376,7 +369,7 @@ "OrderTime": pOrderDetail['OrderTime'], "MainSeq": pOrderDetail['MainSeq'], "SubSeq": pOrderDetail['SubSeq'], "OrderNO": pOrderDetail['OrderNO'], "OrderStatus": pOrderDetail['OrderStatus'].decode()} - l2_data_manager.add_l2_order_detail(item, start_time) + self.l2_data_upload_manager.add_l2_order_detail(item, start_time) def OnRtnBondMarketData(self, pDepthMarketData, FirstLevelBuyNum, FirstLevelBuyOrderVolumes, FirstLevelSellNum, FirstLevelSellOrderVolumes): @@ -481,7 +474,7 @@ class MyL2ActionCallback(L2ActionCallback): - def OnSetL2Position(self, client_id, request_id, codes_data): + def OnSetL2Position(self, codes_data): print("L2璁㈤槄鏁伴噺锛�", len(codes_data)) logger_l2_codes_subscript.info("鍗庨懌L2浠g爜澶勭悊闃熷垪鑾峰彇鍒版暟鎹細鏁伴噺-{}", len(codes_data)) try: @@ -490,7 +483,7 @@ logging.exception(e) -def __init_l2(): +def __init_l2(l2_data_upload_manager): print(lev2mdapi.CTORATstpLev2MdApi_GetApiVersion()) # case 1: Tcp鏂瑰紡 # g_SubMode=lev2mdapi.TORA_TSTP_MST_TCP @@ -502,7 +495,7 @@ # case 2闈炵紦瀛樻ā寮� api = lev2mdapi.CTORATstpLev2MdApi_CreateTstpLev2MdApi(g_SubMode, False) global spi - spi = Lev2MdSpi(api) + spi = Lev2MdSpi(api, l2_data_upload_manager) api.RegisterSpi(spi) # -------------------姝e紡妯″紡------------------------------------- if g_SubMode != lev2mdapi.TORA_TSTP_MST_MCAST: @@ -522,32 +515,21 @@ api.Init() -def __receive_from_pipe_trade(queue_trade_w_l2_r: multiprocessing.Queue): +def __receive_from_queue_trade(queue_trade_w_l2_r: multiprocessing.Queue): logger_system.info(f"l2_client __receive_from_pipe_trade 绾跨▼ID:{tool.get_thread_id()}") while True: try: value = queue_trade_w_l2_r.get() if value: - value = value.decode("utf-8") + if type(value) == bytes: + value = value.decode("utf-8") data = json.loads(value) - if data["type"] == "listen_volume": + _type = data["type"] + if _type == "listen_volume": volume = data["data"]["volume"] code = data["data"]["code"] spi.set_code_special_watch_volume(code, volume) - except Exception as e: - logging.exception(e) - - -def __receive_from_pipe_strategy(pipe_): - logger_system.info(f"l2_client __receive_from_pipe_strategy 绾跨▼ID:{tool.get_thread_id()}") - while True: - # print("__receive_from_pipe_strategy") - try: - val = pipe_.recv() - if val: - print("L2瀹㈡埛绔帴鍙楀埌鏁版嵁") - data = json.loads(val) - if data["data"]["type"] == "l2_cmd": + elif _type == "l2_cmd": l2CommandManager.process_command(command_manager.CLIENT_TYPE_CMD_L2, None, data) except Exception as e: logging.exception(e) @@ -556,28 +538,24 @@ pipe_strategy = None -def run(queue_trade_w_l2_r: multiprocessing.Queue, _pipe_strategy, - _l2_data_callback: l2_data_transform_protocol.L2DataCallBack) -> None: +def run(queue_r: multiprocessing.Queue, order_queues: List[multiprocessing.Queue], + transaction_queues: List[multiprocessing.Queue], market_queue: multiprocessing.Queue) -> None: logger_system.info("L2杩涚▼ID锛歿}", os.getpid()) logger_system.info(f"l2_client 绾跨▼ID:{tool.get_thread_id()}") try: log.close_print() - if queue_trade_w_l2_r is not None: - t1 = threading.Thread(target=lambda: __receive_from_pipe_trade(queue_trade_w_l2_r), daemon=True) + if queue_r is not None: + t1 = threading.Thread(target=lambda: __receive_from_queue_trade(queue_r), daemon=True) t1.start() - if _pipe_strategy is not None: - global pipe_strategy - pipe_strategy = _pipe_strategy - t1 = threading.Thread(target=__receive_from_pipe_strategy, args=(_pipe_strategy,), daemon=True) - t1.start() - __init_l2() - global l2_data_callback - l2_data_callback = _l2_data_callback - l2_data_manager.run_upload_common(l2_data_callback) - l2_data_manager.run_upload_trading_canceled(l2_data_callback) + + # 鍒濆鍖� + order_queue_distribute_manager = CodeQueueDistributeManager(order_queues) + transaction_queue_distribute_manager = CodeQueueDistributeManager(transaction_queues) + l2_data_upload_manager = L2DataUploadManager(order_queue_distribute_manager, + transaction_queue_distribute_manager, market_queue) + __init_l2(l2_data_upload_manager) + l2_data_manager.run_upload_common() l2_data_manager.run_log() - l2_data_manager.run_upload_daemon(l2_data_callback) - # l2_data_manager.run_test(l2_data_callback) global l2CommandManager l2CommandManager = command_manager.L2CommandManager() l2CommandManager.init(MyL2ActionCallback()) @@ -589,6 +567,6 @@ if __name__ == "__main__": - run(None, None, None) + # run(None, None, None) # spi.set_codes_data([("000333", 12000)]) input() -- Gitblit v1.8.0