From d47fbd65ab8197348ad293b7948fcdd2f8995594 Mon Sep 17 00:00:00 2001 From: Administrator <admin@example.com> Date: 星期四, 31 八月 2023 13:16:00 +0800 Subject: [PATCH] L2数据上传任务守护线程 --- huaxin_client/l2_data_manager.py | 65 +++++++++++++++++++++++++++----- 1 files changed, 55 insertions(+), 10 deletions(-) diff --git a/huaxin_client/l2_data_manager.py b/huaxin_client/l2_data_manager.py index 13ad2a3..d6552bc 100644 --- a/huaxin_client/l2_data_manager.py +++ b/huaxin_client/l2_data_manager.py @@ -23,6 +23,7 @@ tmep_order_detail_queue_dict = {} tmep_transaction_queue_dict = {} target_codes = set() +target_codes_add_time = {} common_queue = queue.Queue() trading_canceled_queue = queue.Queue() log_buy_no_queue = queue.Queue() @@ -30,6 +31,18 @@ buy_order_nos_dict = {} # 鏈�杩戠殑澶у崟鎴愪氦鍗曞彿 latest_big_order_transaction_orders_dict = {} + + +def add_target_code(code): + target_codes.add(code) + # 璁板綍浠g爜鍔犲叆鏃堕棿 + target_codes_add_time[code] = time.time() + + +def del_target_code(code): + target_codes.discard(code) + if code in target_codes_add_time: + target_codes_add_time.pop(code) # 鑾峰彇鏈�杩戠殑澶у崟鎴愪氦璁㈠崟鍙� @@ -257,19 +270,28 @@ pass +__upload_order_threads = {} +__upload_trans_threads = {} + + # 杩愯涓婁紶浠诲姟 def run_upload_task(code: str, l2_data_callback: L2DataCallBack) -> None: - # 濡傛灉浠g爜娌℃湁鍦ㄧ洰鏍囦唬鐮佷腑灏变笉闇�瑕佽繍琛� - if code not in target_codes: - return - # 濡傛灉鏈�杩戠殑娲诲姩鏃堕棿灏忎簬2s灏变笉闇�瑕佽繍琛� - if code not in order_detail_upload_active_time_dict or time.time() - order_detail_upload_active_time_dict[code] > 2: - t = threading.Thread(target=lambda: __run_upload_order(code, l2_data_callback), daemon=True) - t.start() + try: + # 濡傛灉浠g爜娌℃湁鍦ㄧ洰鏍囦唬鐮佷腑灏变笉闇�瑕佽繍琛� + if code not in target_codes: + return + # 濡傛灉鏈�杩戠殑娲诲姩鏃堕棿灏忎簬2s灏变笉闇�瑕佽繍琛� + if code not in order_detail_upload_active_time_dict or time.time() - order_detail_upload_active_time_dict[code] > 2: + t = threading.Thread(target=lambda: __run_upload_order(code, l2_data_callback), daemon=True) + t.start() + __upload_order_threads[code] = t - if code not in transaction_upload_active_time_dict or time.time() - transaction_upload_active_time_dict[code] > 2: - t = threading.Thread(target=lambda: __run_upload_trans(code, l2_data_callback), daemon=True) - t.start() + if code not in transaction_upload_active_time_dict or time.time() - transaction_upload_active_time_dict[code] > 2: + t = threading.Thread(target=lambda: __run_upload_trans(code, l2_data_callback), daemon=True) + t.start() + __upload_trans_threads[code] = t + finally: + pass def run_upload_common(l2_data_callback: L2DataCallBack): @@ -290,6 +312,29 @@ t.start() +# 杩愯瀹堟姢绾跨▼ +def run_upload_daemon(_l2_data_callback): + def upload_daemon(): + while True: + try: + for code in target_codes_add_time: + # 鐩爣浠g爜鍔犲叆2s涔嬪悗鍚姩瀹堟姢 + if time.time() - target_codes_add_time[code] > 2: + if code not in __upload_order_threads or not __upload_order_threads[code].is_alive(): + t = threading.Thread(target=lambda: __run_upload_order(code, _l2_data_callback), daemon=True) + t.start() + __upload_order_threads[code] = t + if code not in __upload_trans_threads or not __upload_trans_threads[code].is_alive(): + t = threading.Thread(target=lambda: __run_upload_trans(code, _l2_data_callback), daemon=True) + t.start() + __upload_trans_threads[code] = t + time.sleep(3) + except: + pass + t = threading.Thread(target=lambda: upload_daemon(), daemon=True) + t.start() + + def __test(_l2_data_callback): code = "002073" if code not in tmep_order_detail_queue_dict: -- Gitblit v1.8.0