Administrator
2023-08-29 770624f05f8dd627be15442f15c82033010a63cc
huaxin_client/l2_data_manager.py
@@ -1,18 +1,20 @@
# -*- coding: utf-8 -*-
import contextlib
import json
import logging
import mmap
import queue
import random
import threading
import time
from huaxin_client import socket_util
from huaxin_client import socket_util, l2_data_transaction_protocol
from huaxin_client.client_network import SendResponseSkManager
# 活动时间
from log_module import log_export
from log_module.log import logger_local_huaxin_l2_error, logger_local_huaxin_l2_upload, logger_local_huaxin_l2_buy_no, \
    logger_local_huaxin_g_cancel
    logger_local_huaxin_g_cancel, hx_logger_contact_debug
order_detail_upload_active_time_dict = {}
transaction_upload_active_time_dict = {}
@@ -159,31 +161,36 @@
# 循环读取上传数据
def __run_upload_order(code):
def __run_upload_order(code, pipe):
    if code not in tmep_order_detail_queue_dict:
        tmep_order_detail_queue_dict[code] = queue.Queue()
    while True:
        # print("order task")
        try:
            if code not in target_codes:
                break
            order_detail_upload_active_time_dict[code] = time.time()
            udatas = []
            while not tmep_order_detail_queue_dict[code].empty():
                temp = tmep_order_detail_queue_dict[code].get()
                udatas.append(temp)
            if udatas:
                start_time = time.time()
                upload_data(code, "l2_order", udatas)
                use_time = int((time.time() - start_time) * 1000)
                if use_time > 20:
                    logger_local_huaxin_l2_upload.info(f"{code}-上传代码耗时:{use_time}ms")
            time.sleep(0.01)
    with contextlib.closing(mmap.mmap(-1, 1000 * 100, tagname=l2_data_transaction_protocol.get_mmap_tag_name_for_l2_order(code), access=mmap.ACCESS_WRITE)) as _mmap:
        while True:
            # print("order task")
            try:
                if code not in target_codes:
                    break
                # 打开共享内存
                order_detail_upload_active_time_dict[code] = time.time()
                udatas = []
                while not tmep_order_detail_queue_dict[code].empty():
                    temp = tmep_order_detail_queue_dict[code].get()
                    udatas.append(temp)
                if udatas:
                    start_time = time.time()
                    # upload_data(code, "l2_order", udatas)
                    l2_data_transaction_protocol.send_l2_order_detail(pipe, _mmap, code, udatas)
                    use_time = int((time.time() - start_time) * 1000)
                    if use_time > 20:
                        logger_local_huaxin_l2_upload.info(f"{code}-上传代码耗时:{use_time}ms")
        except Exception as e:
            logger_local_huaxin_l2_error.error(f"上传订单数据出错:{str(e)}")
            pass
                time.sleep(0.01)
            except Exception as e:
                hx_logger_contact_debug.exception(e)
                logger_local_huaxin_l2_error.error(f"上传订单数据出错:{str(e)}")
                pass
def __run_upload_trans(code):
@@ -245,13 +252,13 @@
# 运行上传任务
def run_upload_task(code):
def run_upload_task(code, pipe_strategy):
    # 如果代码没有在目标代码中就不需要运行
    if code not in target_codes:
        return
    # 如果最近的活动时间小于2s就不需要运行
    if code not in order_detail_upload_active_time_dict or time.time() - order_detail_upload_active_time_dict[code] > 2:
        t = threading.Thread(target=lambda: __run_upload_order(code), daemon=True)
        t = threading.Thread(target=lambda: __run_upload_order(code, pipe_strategy), daemon=True)
        t.start()
    if code not in transaction_upload_active_time_dict or time.time() - transaction_upload_active_time_dict[code] > 2:
@@ -278,9 +285,13 @@
def __test():
    code = "002073"
    if code not in tmep_order_detail_queue_dict:
        tmep_order_detail_queue_dict[code] = queue.Queue()
    target_codes.add(code)
    while True:
        try:
            trading_order_canceled("603106", 6114878)
            tmep_order_detail_queue_dict[code].put_nowait(['002073', 0.0, 88100, '1', '2', 103831240, 2011, 18190761, 18069131, 'D', 1693276711224])
            time.sleep(5)
        except:
            pass