Administrator
2023-08-24 52685828db3535cd2640632ba5b91e41cf747064
交易通道处理采用线程池
4个文件已修改
50 ■■■■ 已修改文件
huaxin_client/command_manager.py 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l2_data_manager.py 22 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
log_module/log_analyse.py 19 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/test.py 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/command_manager.py
@@ -2,6 +2,7 @@
"""
命令管理器
"""
import concurrent.futures
import json
import logging
import threading
@@ -63,6 +64,7 @@
class TradeCommandManager:
    trade_client_dict = {}
    _instance = None
    process_command_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=50)
    def __new__(cls, *args, **kwargs):
        if not cls._instance:
@@ -126,8 +128,7 @@
                        cls.pipe_l2.send(
                            json.dumps({"type": "set_l2_codes", "data": _data}))
                    else:
                        t1 = threading.Thread(target=lambda: cls.__process_command(_type, None, val), daemon=True)
                        t1.start()
                        cls.process_command_thread_pool.submit(cls.__process_command, _type, None, val)
            except Exception as e:
                logger_local_huaxin_trade_debug.exception(e)
                logging.exception(e)
huaxin_client/l2_data_manager.py
@@ -136,17 +136,17 @@
# 上传数据
def upload_data(code, _type, datas):
def upload_data(code, _type, datas, new_sk=False):
    uid = random.randint(0, 100000)
    key = f"{_type}_{code}"
    fdata = json.dumps(
        {"type": _type, "data": {"code": code, "data": datas, "time": round(time.time() * 1000)}})
    # print("数据长度:", len(datas), len(fdata), f"{fdata[:20]}...{fdata[-20:]}", )
    # print("请求开始", uid, len(datas), len(fdata), f"{fdata[:20]}...{fdata[-20:]}")
    result = None
    start_time = time.time()
    # logger_local_huaxin_l2_upload.info(f"{code} 上传数据开始-{_type}")
    try:
        if new_sk:
            sk = SendResponseSkManager.create_send_response_sk()
            result = __send_response(sk, fdata.encode('utf-8'))
        else:
        result = send_response(key, fdata.encode('utf-8'))
    except Exception as e:
        logging.exception(e)
@@ -226,7 +226,7 @@
            temp = trading_canceled_queue.get()
            if temp:
                logger_local_huaxin_g_cancel.info(f"准备上报:{temp}")
                upload_data(temp[0], "trading_order_canceled", temp[1])
                upload_data(temp[0], "trading_order_canceled", temp[1], new_sk=True)
                logger_local_huaxin_g_cancel.info(f"上报成功:{temp}")
        except Exception as e:
            logger_local_huaxin_l2_error.exception(e)
@@ -278,12 +278,4 @@
if __name__ == "__main__":
    code = "603809"
    target_codes.add(code)
    run_upload_task(code)
    while True:
        for i in range(0, 5):
            add_l2_order_detail({"SecurityID": code, "Price": 11.28, "Volume": 500, "Side": "2", "OrderType": "\u0000",
                                 "OrderTime": 14591555, "MainSeq": 1, "SubSeq": 11050942, "OrderNO": 10692868,
                                 "OrderStatus": "A"}, False)
        time.sleep(0.001)
    upload_data("000798", "trading_order_canceled", 30997688, new_sk=True)
log_module/log_analyse.py
@@ -62,16 +62,19 @@
# 分析L2数据传输时间
def analyze_l2_data_transformation(path_):
    with open(path_, 'r') as f:
    with open(path_, 'r', encoding="utf-8") as f:
        while True:
            line = f.readline()
            if not line:
                break
            try:
            datas = line.split("|")
            create_time = datas[0].strip()
            data = datas[2].split("-")[1].strip()
            code = data.split("#")[0]
            l2_data = data.split("#")[1]
                contents = data.split("#")
                code = contents[0]
                use_time = int(contents[1].strip().split(":")[1])
                l2_data = contents[2]
            l2_data = eval(l2_data)
            max_time_data = None
            min_time_data = None
@@ -88,12 +91,16 @@
            if max_time_data and min_time_data:
                dt = datetime.datetime.strptime(create_time.split(".")[0], "%Y-%m-%d %H:%M:%S")
                create_timestamp = int(dt.timestamp() * 1000) + int(create_time.split(".")[1])
                if create_timestamp - min_time_data[10] > 20:
                    print(create_time, f"数量:{len(l2_data)}", code, create_timestamp - min_time_data[10],
                    if use_time > 100:
                        print(create_time, f"数量:{len(l2_data)}", f"耗时:{use_time}", code,
                              create_timestamp - min_time_data[10],
                          create_timestamp - max_time_data[10])
            except:
                print(line)
                pass
        pass
if __name__ == "__main__":
    analyze_l2_data_transformation("D:\\logs\\huaxin_l2\\orderdetail.2023-08-23.log")
    analyze_l2_data_transformation("D:\\logs\\huaxin_l2\\orderdetail.2023-08-24.log")
test/test.py
@@ -1,6 +1,6 @@
from huaxin_client import l2_data_manager
from log_module import async_log_util
from log_module.log import logger_debug
if __name__ == "__main__":
    async_log_util.add_log(logger_debug, "error", "测试错误")
    async_log_util.run_sync()
    l2_data_manager.upload_data("000798", "trading_order_canceled", 30997688, new_sk=True)