From 25a69761838501a775896eb8a2234cb8ee0a446d Mon Sep 17 00:00:00 2001 From: Administrator <admin@example.com> Date: 星期二, 14 十一月 2023 17:27:13 +0800 Subject: [PATCH] bug修复 --- huaxin_client/l2_client.py | 107 +++++++++++++++++++++++++++++++++++++++++++++++++---- 1 files changed, 99 insertions(+), 8 deletions(-) diff --git a/huaxin_client/l2_client.py b/huaxin_client/l2_client.py index 005f3c3..c4230e5 100644 --- a/huaxin_client/l2_client.py +++ b/huaxin_client/l2_client.py @@ -56,7 +56,7 @@ class Lev2MdSpi(lev2mdapi.CTORATstpLev2MdSpi): latest_codes_set = set() - codes_volume_and_price_dict = {} + special_code_volume_for_order_dict = {} # 宸茬粡璁㈤槄鐨勪唬鐮� subscripted_codes = set() @@ -70,6 +70,7 @@ self.__api = api self.is_login = False self.l2_data_upload_manager = l2_data_upload_manager + self.codes_volume_and_price_dict = {} def __split_codes(self, codes): szse_codes = [] @@ -131,7 +132,7 @@ code = d[0] codes.add(code) self.codes_volume_and_price_dict[code] = (d[1], d[2]) - self.l2_data_upload_manager.set_order_fileter_condition(code, d[1], d[2]) + self.l2_data_upload_manager.set_order_fileter_condition(code, d[1], float(d[2])) add_codes = codes - self.subscripted_codes del_codes = self.subscripted_codes - codes print("add del codes", add_codes, del_codes) @@ -185,11 +186,14 @@ def set_code_special_watch_volume(self, code, volume): # 鏈夋晥鏈熶负3s # self.special_code_volume_for_order_dict[code] = (volume, time.time() + 3) - min_volume, limit_up_price = self.codes_volume_and_price_dict.get(code) - self.l2_data_upload_manager.set_order_fileter_condition(code, min_volume, limit_up_price, - {volume, constant.SHADOW_ORDER_VOLUME}, time.time() + 3) + d = self.codes_volume_and_price_dict.get(code) + if d: + min_volume, limit_up_price = d[0], d[1] + self.l2_data_upload_manager.set_order_fileter_condition(code, min_volume, limit_up_price, + {volume, constant.SHADOW_ORDER_VOLUME}, + time.time() + 3) - async_log_util.info(logger_local_huaxin_l2_subscript, f"璁剧疆涓嬪崟閲忕洃鍚細{code}-{volume}") + async_log_util.info(logger_local_huaxin_l2_subscript, f"璁剧疆涓嬪崟閲忕洃鍚細{code}-{volume}") def OnFrontConnected(self): print("OnFrontConnected") @@ -551,6 +555,42 @@ def run(queue_r: multiprocessing.Queue, order_queues: List[multiprocessing.Queue], transaction_queues: List[multiprocessing.Queue], market_queue: multiprocessing.Queue) -> None: + # def test_add_codes(): + # time.sleep(5) + # # if value: + # # if type(value) == bytes: + # # value = value.decode("utf-8") + # # data = json.loads(value) + # # _type = data["type"] + # # if _type == "listen_volume": + # # volume = data["data"]["volume"] + # # code = data["data"]["code"] + # # spi.set_code_special_watch_volume(code, volume) + # # elif _type == "l2_cmd": + # # l2CommandManager.process_command(command_manager.CLIENT_TYPE_CMD_L2, None, data) + # + # demo_datas = [("603002", int(50 * 10000 / 6.35), 6.35), ("002654", int(50 * 10000 / 15.59), 15.59), + # ("603701", int(50 * 10000 / 14.28), 14.28), ("002908", int(50 * 10000 / 12.78), 12.78)] + # + # queue_r.put_nowait(json.dumps({"type": "l2_cmd", "data": [demo_datas[0]]})) + # time.sleep(1) + # + # spi.l2_data_upload_manager.add_l2_order_detail( + # {'SecurityID': '603002', 'Price': 6.35, 'Volume': 275000, 'Side': "1", 'OrderType': '0', + # 'OrderTime': '13000015', + # 'MainSeq': 2, 'SubSeq': 6739147, 'OrderNO': 5512466, 'OrderStatus': 'D'}, 0) + # spi.l2_data_upload_manager.add_l2_order_detail( + # {'SecurityID': '603002', 'Price': 6.35, 'Volume': 200, 'Side': "1", 'OrderType': '0', + # 'OrderTime': '13000015', + # 'MainSeq': 2, 'SubSeq': 6739148, 'OrderNO': 5512467, 'OrderStatus': 'D'}, 0) + # queue_r.put_nowait(json.dumps({"type": "listen_volume", "data": {"code": "603002", "volume": 100}})) + # time.sleep(0.1) + # spi.l2_data_upload_manager.add_l2_order_detail( + # {'SecurityID': '603002', 'Price': 6.35, 'Volume': 100, 'Side': "1", 'OrderType': '0', + # 'OrderTime': '13000015', + # 'MainSeq': 2, 'SubSeq': 6739148, 'OrderNO': 5512467, 'OrderStatus': 'D'}, 0) + + logger_system.info("L2杩涚▼ID锛歿}", os.getpid()) logger_system.info(f"l2_client 绾跨▼ID:{tool.get_thread_id()}") try: @@ -567,6 +607,8 @@ __init_l2(l2_data_upload_manager) l2_data_manager.run_upload_common() l2_data_manager.run_log() + # 娴嬭瘯 + # threading.Thread(target=lambda: test_add_codes(),daemon=True).start() global l2CommandManager l2CommandManager = command_manager.L2CommandManager() l2CommandManager.init(MyL2ActionCallback()) @@ -577,7 +619,56 @@ time.sleep(2) +def test(): + def test_add_codes(): + time.sleep(5) + # if value: + # if type(value) == bytes: + # value = value.decode("utf-8") + # data = json.loads(value) + # _type = data["type"] + # if _type == "listen_volume": + # volume = data["data"]["volume"] + # code = data["data"]["code"] + # spi.set_code_special_watch_volume(code, volume) + # elif _type == "l2_cmd": + # l2CommandManager.process_command(command_manager.CLIENT_TYPE_CMD_L2, None, data) + + demo_datas = [("603002", int(50 * 10000 / 6.35), 6.35), ("002654", int(50 * 10000 / 15.59), 15.59), + ("603701", int(50 * 10000 / 14.28), 14.28), ("002908", int(50 * 10000 / 12.78), 12.78)] + + queue_r.put_nowait(json.dumps({"type": "l2_cmd", "data": [demo_datas[0]]})) + time.sleep(1) + + spi.l2_data_upload_manager.add_l2_order_detail( + {'SecurityID': '603002', 'Price': 6.35, 'Volume': 275000, 'Side': "1", 'OrderType': '0', + 'OrderTime': '13000015', + 'MainSeq': 2, 'SubSeq': 6739147, 'OrderNO': 5512466, 'OrderStatus': 'D'}, 0) + spi.l2_data_upload_manager.add_l2_order_detail( + {'SecurityID': '603002', 'Price': 6.35, 'Volume': 200, 'Side': "1", 'OrderType': '0', + 'OrderTime': '13000015', + 'MainSeq': 2, 'SubSeq': 6739148, 'OrderNO': 5512467, 'OrderStatus': 'D'}, 0) + queue_r.put_nowait(json.dumps({"type": "listen_volume", "data": {"code": "603002", "volume": 100}})) + time.sleep(0.1) + spi.l2_data_upload_manager.add_l2_order_detail( + {'SecurityID': '603002', 'Price': 6.35, 'Volume': 100, 'Side': "1", 'OrderType': '0', + 'OrderTime': '13000015', + 'MainSeq': 2, 'SubSeq': 6739148, 'OrderNO': 5512467, 'OrderStatus': 'D'}, 0) + + + + + queue_r = multiprocessing.Queue() + order_queues = [] + transaction_queues = [] + market_queue = multiprocessing.Queue() + for i in range(20): + order_queues.append(multiprocessing.Queue()) + transaction_queues.append(multiprocessing.Queue()) + threading.Thread(target=test_add_codes).start() + + run(queue_r, order_queues, transaction_queues, market_queue) + + if __name__ == "__main__": - # run(None, None, None) - # spi.set_codes_data([("000333", 12000)]) input() -- Gitblit v1.8.0