Administrator
2023-08-31 d47fbd65ab8197348ad293b7948fcdd2f8995594
huaxin_client/l2_data_manager.py
@@ -23,6 +23,7 @@
tmep_order_detail_queue_dict = {}
tmep_transaction_queue_dict = {}
target_codes = set()
target_codes_add_time = {}
common_queue = queue.Queue()
trading_canceled_queue = queue.Queue()
log_buy_no_queue = queue.Queue()
@@ -30,6 +31,18 @@
buy_order_nos_dict = {}
# 最近的大单成交单号
latest_big_order_transaction_orders_dict = {}
def add_target_code(code):
    target_codes.add(code)
    # 记录代码加入时间
    target_codes_add_time[code] = time.time()
def del_target_code(code):
    target_codes.discard(code)
    if code in target_codes_add_time:
        target_codes_add_time.pop(code)
# 获取最近的大单成交订单号
@@ -257,8 +270,13 @@
            pass
__upload_order_threads = {}
__upload_trans_threads = {}
# 运行上传任务
def run_upload_task(code: str, l2_data_callback: L2DataCallBack) -> None:
    try:
    # 如果代码没有在目标代码中就不需要运行
    if code not in target_codes:
        return
@@ -266,10 +284,14 @@
    if code not in order_detail_upload_active_time_dict or time.time() - order_detail_upload_active_time_dict[code] > 2:
        t = threading.Thread(target=lambda: __run_upload_order(code, l2_data_callback), daemon=True)
        t.start()
            __upload_order_threads[code] = t
    if code not in transaction_upload_active_time_dict or time.time() - transaction_upload_active_time_dict[code] > 2:
        t = threading.Thread(target=lambda: __run_upload_trans(code, l2_data_callback), daemon=True)
        t.start()
            __upload_trans_threads[code] = t
    finally:
        pass
def run_upload_common(l2_data_callback: L2DataCallBack):
@@ -290,6 +312,29 @@
    t.start()
# 运行守护线程
def run_upload_daemon(_l2_data_callback):
    def upload_daemon():
        while True:
            try:
                for code in target_codes_add_time:
                    # 目标代码加入2s之后启动守护
                    if time.time() - target_codes_add_time[code] > 2:
                        if code not in __upload_order_threads or not __upload_order_threads[code].is_alive():
                            t = threading.Thread(target=lambda: __run_upload_order(code, _l2_data_callback), daemon=True)
                            t.start()
                            __upload_order_threads[code] = t
                        if code not in __upload_trans_threads or not __upload_trans_threads[code].is_alive():
                            t = threading.Thread(target=lambda: __run_upload_trans(code, _l2_data_callback), daemon=True)
                            t.start()
                            __upload_trans_threads[code] = t
                    time.sleep(3)
            except:
                pass
    t = threading.Thread(target=lambda: upload_daemon(), daemon=True)
    t.start()
def __test(_l2_data_callback):
    code = "002073"
    if code not in tmep_order_detail_queue_dict: