From 87a68117b9957219f17dc7830cb2b33b88a9d1d8 Mon Sep 17 00:00:00 2001 From: Administrator <admin@example.com> Date: 星期三, 30 八月 2023 01:54:10 +0800 Subject: [PATCH] L2进程与策略进程合并 --- huaxin_client/l2_data_manager.py | 33 +++++++++++++-------------------- 1 files changed, 13 insertions(+), 20 deletions(-) diff --git a/huaxin_client/l2_data_manager.py b/huaxin_client/l2_data_manager.py index cf959e7..aa59f02 100644 --- a/huaxin_client/l2_data_manager.py +++ b/huaxin_client/l2_data_manager.py @@ -7,11 +7,12 @@ import random import threading import time -from huaxin_client import socket_util, l2_data_transaction_protocol +from huaxin_client import socket_util, l2_data_transform_protocol from huaxin_client.client_network import SendResponseSkManager # 娲诲姩鏃堕棿 +from huaxin_client.l2_data_transform_protocol import L2DataCallBack from log_module import log_export from log_module.log import logger_local_huaxin_l2_error, logger_local_huaxin_l2_upload, logger_local_huaxin_l2_buy_no, \ logger_local_huaxin_g_cancel, hx_logger_contact_debug @@ -161,13 +162,9 @@ # 寰幆璇诲彇涓婁紶鏁版嵁 -def __run_upload_order(code, pipe): +def __run_upload_order(code: str, l2_data_callback: L2DataCallBack) -> None: if code not in tmep_order_detail_queue_dict: tmep_order_detail_queue_dict[code] = queue.Queue() - tag = l2_data_transaction_protocol.get_mmap_tag_name_for_l2_order(code) - # with contextlib.closing( - # mmap.mmap(-1, 1000 * 100, tag, - # access=mmap.ACCESS_WRITE)) as _mmap: if True: while True: # print("order task") @@ -182,7 +179,8 @@ udatas.append(temp) if udatas: start_time = time.time() - upload_data(code, "l2_order", udatas) + # upload_data(code, "l2_order", udatas) + l2_data_callback.OnL2Order(code, udatas, int(time.time() * 1000)) # l2_data_transaction_protocol.send_l2_order_detail(pipe, _mmap, code, udatas) use_time = int((time.time() - start_time) * 1000) if use_time > 20: @@ -255,13 +253,13 @@ # 杩愯涓婁紶浠诲姟 -def run_upload_task(code, pipe_strategy): +def run_upload_task(code: str, l2_data_callback: L2DataCallBack) -> None: # 濡傛灉浠g爜娌℃湁鍦ㄧ洰鏍囦唬鐮佷腑灏变笉闇�瑕佽繍琛� if code not in target_codes: return # 濡傛灉鏈�杩戠殑娲诲姩鏃堕棿灏忎簬2s灏变笉闇�瑕佽繍琛� if code not in order_detail_upload_active_time_dict or time.time() - order_detail_upload_active_time_dict[code] > 2: - t = threading.Thread(target=lambda: __run_upload_order(code, pipe_strategy), daemon=True) + t = threading.Thread(target=lambda: __run_upload_order(code, l2_data_callback), daemon=True) t.start() if code not in transaction_upload_active_time_dict or time.time() - transaction_upload_active_time_dict[code] > 2: @@ -287,12 +285,12 @@ t.start() -def __test(pipe_strategy): +def __test(_l2_data_callback): code = "002073" if code not in tmep_order_detail_queue_dict: tmep_order_detail_queue_dict[code] = queue.Queue() target_codes.add(code) - t = threading.Thread(target=lambda: __run_upload_order(code, pipe_strategy), daemon=True) + t = threading.Thread(target=lambda: __run_upload_order(code, _l2_data_callback), daemon=True) t.start() while True: try: @@ -303,15 +301,10 @@ pass -def run_test(pipe_strage): - t = threading.Thread(target=lambda: __test(pipe_strage), daemon=True) +def run_test(_l2_data_callback): + t = threading.Thread(target=lambda: __test(_l2_data_callback), daemon=True) t.start() + def test(): - # upload_data("000798", "trading_order_canceled", 30997688, new_sk=True) - code = "000333" - tag = l2_data_transaction_protocol.get_mmap_tag_name_for_l2_order(code) - with contextlib.closing( - mmap.mmap(-1, 1000 * 100, tag, - access=mmap.ACCESS_WRITE)) as _mmap: - pass \ No newline at end of file + pass -- Gitblit v1.8.0