Administrator
2023-08-30 c265bb186a6b6e2a31689d599be59b5b6f10cf42
huaxin_client/l2_data_manager.py
@@ -180,7 +180,7 @@
                if udatas:
                    start_time = time.time()
                    # upload_data(code, "l2_order", udatas)
                    l2_data_callback.OnL2Order(code,  udatas, int(time.time() * 1000))
                    l2_data_callback.OnL2Order(code, udatas, int(time.time() * 1000))
                    # l2_data_transaction_protocol.send_l2_order_detail(pipe, _mmap, code, udatas)
                    use_time = int((time.time() - start_time) * 1000)
                    if use_time > 20:
@@ -194,7 +194,7 @@
                pass
def __run_upload_trans(code):
def __run_upload_trans(code, l2_data_callback: L2DataCallBack):
    if code not in tmep_transaction_queue_dict:
        tmep_transaction_queue_dict[code] = queue.Queue()
    while True:
@@ -208,33 +208,38 @@
                temp = tmep_transaction_queue_dict[code].get()
                udatas.append(temp)
            if udatas:
                upload_data(code, "l2_trans", udatas)
                # upload_data(code, "l2_trans", udatas)
                l2_data_callback.OnL2Transaction(code, udatas)
            time.sleep(0.01)
        except Exception as e:
            logger_local_huaxin_l2_error.error(f"上传成交数据出错:{str(e)}")
def __run_upload_common():
def __run_upload_common(l2_data_callback: L2DataCallBack):
    print("__run_upload_common")
    while True:
        try:
            while not common_queue.empty():
                temp = common_queue.get()
                upload_data(temp[0], temp[1], temp[2])
                if temp[1] == "l2_market_data":
                    l2_data_callback.OnMarketData(temp[0], temp[2])
                else:
                    upload_data(temp[0], temp[1], temp[2])
            time.sleep(0.01)
        except Exception as e:
            logger_local_huaxin_l2_error.exception(e)
            logger_local_huaxin_l2_error.error(f"上传普通数据出错:{str(e)}")
def __run_upload_trading_canceled():
def __run_upload_trading_canceled(l2_data_callback: L2DataCallBack):
    print("__run_upload_trading_canceled")
    while True:
        try:
            temp = trading_canceled_queue.get()
            if temp:
                logger_local_huaxin_g_cancel.info(f"准备上报:{temp}")
                upload_data(temp[0], "trading_order_canceled", temp[1], new_sk=True)
                # upload_data(temp[0], "trading_order_canceled", temp[1], new_sk=True)
                l2_data_callback.OnTradingOrderCancel(temp[0], temp[1])
                logger_local_huaxin_g_cancel.info(f"上报成功:{temp}")
        except Exception as e:
            logger_local_huaxin_l2_error.exception(e)
@@ -263,17 +268,17 @@
        t.start()
    if code not in transaction_upload_active_time_dict or time.time() - transaction_upload_active_time_dict[code] > 2:
        t = threading.Thread(target=lambda: __run_upload_trans(code), daemon=True)
        t = threading.Thread(target=lambda: __run_upload_trans(code, l2_data_callback), daemon=True)
        t.start()
def run_upload_common():
    t = threading.Thread(target=lambda: __run_upload_common(), daemon=True)
def run_upload_common(l2_data_callback: L2DataCallBack):
    t = threading.Thread(target=lambda: __run_upload_common(l2_data_callback), daemon=True)
    t.start()
def run_upload_trading_canceled():
    t = threading.Thread(target=lambda: __run_upload_trading_canceled(), daemon=True)
def run_upload_trading_canceled(l2_data_callback: L2DataCallBack):
    t = threading.Thread(target=lambda: __run_upload_trading_canceled(l2_data_callback), daemon=True)
    t.start()