Administrator
2025-02-27 cb824ff6c70b0d9bbb28dd4cb648c98766745071
l2_test.py
@@ -2,6 +2,7 @@
import json
import logging
import multiprocessing
import queue
import socketserver
import threading
import time
@@ -14,7 +15,7 @@
from huaxin_client import l2_client_test, l1_subscript_codes_manager
from log_module.log import logger_local_huaxin_l2_transaction_big_order, logger_system
from third_data.custom_block_in_money_manager import CodeInMoneyManager, BlockInMoneyRankManager
from utils import tool
from utils import tool, middle_api_protocol
import urllib.parse as urlparse
from urllib.parse import parse_qs
@@ -88,6 +89,22 @@
        logger_system.error(f"端口服务器:{port} 启动失败")
def __run_upload_big_order_task(_queue: queue.Queue):
    # 运行上传大单任务
    while True:
        try:
            datas = []
            while not _queue.empty():
                datas.append(_queue.get())
            if datas:
                # 上传数据
                middle_api_protocol.request(middle_api_protocol.get_big_orders(datas))
        except:
            pass
        finally:
            time.sleep(1)
def run():
    codes_sh, codes_sz = l1_subscript_codes_manager.get_codes()
    codes = [x.decode() for x in codes_sh]
@@ -98,6 +115,8 @@
    page_size = int(len(codes) / cpu_count) + 1
    big_order_queue = multiprocessing.Queue()
    # 大单上传队列
    big_order_upload_queue = queue.Queue()
    for i in range(cpu_count):
        process = multiprocessing.Process(target=l2_client_test.run,
@@ -106,10 +125,13 @@
        process.start()
        # 绑核运行
        psutil.Process(process.pid).cpu_affinity([i])
    threading.Thread(target=__run_upload_big_order_task, args=(big_order_upload_queue,), daemon=True).start()
    while True:
        try:
            data = big_order_queue.get()
            CodeInMoneyManager().add_data(data)
            # 添加上传数据
            big_order_upload_queue.put_nowait(data)
            logger_local_huaxin_l2_transaction_big_order.info(f"{data}")
        except:
            pass