Administrator
2023-08-30 87a68117b9957219f17dc7830cb2b33b88a9d1d8
huaxin_client/l2_data_manager.py
@@ -7,11 +7,12 @@
import random
import threading
import time
from huaxin_client import socket_util, l2_data_transaction_protocol
from huaxin_client import socket_util, l2_data_transform_protocol
from huaxin_client.client_network import SendResponseSkManager
# 活动时间
from huaxin_client.l2_data_transform_protocol import L2DataCallBack
from log_module import log_export
from log_module.log import logger_local_huaxin_l2_error, logger_local_huaxin_l2_upload, logger_local_huaxin_l2_buy_no, \
    logger_local_huaxin_g_cancel, hx_logger_contact_debug
@@ -161,13 +162,9 @@
# 循环读取上传数据
def __run_upload_order(code, pipe):
def __run_upload_order(code: str, l2_data_callback: L2DataCallBack) -> None:
    if code not in tmep_order_detail_queue_dict:
        tmep_order_detail_queue_dict[code] = queue.Queue()
    tag = l2_data_transaction_protocol.get_mmap_tag_name_for_l2_order(code)
    # with contextlib.closing(
    #         mmap.mmap(-1, 1000 * 100, tag,
    #                   access=mmap.ACCESS_WRITE)) as _mmap:
    if True:
        while True:
            # print("order task")
@@ -182,7 +179,8 @@
                    udatas.append(temp)
                if udatas:
                    start_time = time.time()
                    upload_data(code, "l2_order", udatas)
                    # upload_data(code, "l2_order", udatas)
                    l2_data_callback.OnL2Order(code,  udatas, int(time.time() * 1000))
                    # l2_data_transaction_protocol.send_l2_order_detail(pipe, _mmap, code, udatas)
                    use_time = int((time.time() - start_time) * 1000)
                    if use_time > 20:
@@ -255,13 +253,13 @@
# 运行上传任务
def run_upload_task(code, pipe_strategy):
def run_upload_task(code: str, l2_data_callback: L2DataCallBack) -> None:
    # 如果代码没有在目标代码中就不需要运行
    if code not in target_codes:
        return
    # 如果最近的活动时间小于2s就不需要运行
    if code not in order_detail_upload_active_time_dict or time.time() - order_detail_upload_active_time_dict[code] > 2:
        t = threading.Thread(target=lambda: __run_upload_order(code, pipe_strategy), daemon=True)
        t = threading.Thread(target=lambda: __run_upload_order(code, l2_data_callback), daemon=True)
        t.start()
    if code not in transaction_upload_active_time_dict or time.time() - transaction_upload_active_time_dict[code] > 2:
@@ -287,12 +285,12 @@
    t.start()
def __test(pipe_strategy):
def __test(_l2_data_callback):
    code = "002073"
    if code not in tmep_order_detail_queue_dict:
        tmep_order_detail_queue_dict[code] = queue.Queue()
    target_codes.add(code)
    t = threading.Thread(target=lambda: __run_upload_order(code, pipe_strategy), daemon=True)
    t = threading.Thread(target=lambda: __run_upload_order(code, _l2_data_callback), daemon=True)
    t.start()
    while True:
        try:
@@ -303,15 +301,10 @@
            pass
def run_test(pipe_strage):
    t = threading.Thread(target=lambda: __test(pipe_strage), daemon=True)
def run_test(_l2_data_callback):
    """Start the module's ``__test`` routine on a background daemon thread.

    The call returns as soon as the thread has been started; it does not
    join the thread or wait for ``__test`` to finish.

    :param _l2_data_callback: object forwarded unchanged to ``__test``;
        presumably an ``L2DataCallBack`` implementation -- TODO confirm
        against callers.
    """
    # daemon=True so this test thread never keeps the interpreter alive at exit.
    t = threading.Thread(target=lambda: __test(_l2_data_callback), daemon=True)
    t.start()
def test():
    # upload_data("000798", "trading_order_canceled", 30997688, new_sk=True)
    code = "000333"
    tag = l2_data_transaction_protocol.get_mmap_tag_name_for_l2_order(code)
    with contextlib.closing(
            mmap.mmap(-1, 1000 * 100, tag,
                      access=mmap.ACCESS_WRITE)) as _mmap:
        pass
    pass