| | |
| | | """ |
| | | 命令管理器 |
| | | """ |
| | | import concurrent.futures |
| | | import json |
| | | import logging |
| | | import threading |
| | |
| | | class TradeCommandManager: |
| | | trade_client_dict = {} |
| | | _instance = None |
| | | process_command_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=50) |
| | | |
| | | def __new__(cls, *args, **kwargs): |
| | | if not cls._instance: |
| | |
| | | val = pipe_strategy.recv() |
| | | if val: |
| | | val = json.loads(val) |
| | | print("run_process_command",val) |
| | | print("run_process_command", val) |
| | | _type = val["type"] |
| | | _data = val["data"] |
| | | # 查看是否是设置L2的代码 |
| | |
| | | cls.pipe_l2.send( |
| | | json.dumps({"type": "set_l2_codes", "data": _data})) |
| | | else: |
| | | t1 = threading.Thread(target=lambda: cls.__process_command(_type, None, val), daemon=True) |
| | | t1.start() |
| | | cls.process_command_thread_pool.submit(cls.__process_command, _type, None, val) |
| | | except Exception as e: |
| | | logger_local_huaxin_trade_debug.exception(e) |
| | | logging.exception(e) |
| | |
| | | |
| | | |
| | | # 上传数据 |
| | | def upload_data(code, _type, datas): |
| | | def upload_data(code, _type, datas, new_sk=False): |
| | | uid = random.randint(0, 100000) |
| | | key = f"{_type}_{code}" |
| | | fdata = json.dumps( |
| | | {"type": _type, "data": {"code": code, "data": datas, "time": round(time.time() * 1000)}}) |
| | | # print("数据长度:", len(datas), len(fdata), f"{fdata[:20]}...{fdata[-20:]}", ) |
| | | # print("请求开始", uid, len(datas), len(fdata), f"{fdata[:20]}...{fdata[-20:]}") |
| | | result = None |
| | | start_time = time.time() |
| | | # logger_local_huaxin_l2_upload.info(f"{code} 上传数据开始-{_type}") |
| | | try: |
| | | result = send_response(key, fdata.encode('utf-8')) |
| | | if new_sk: |
| | | sk = SendResponseSkManager.create_send_response_sk() |
| | | result = __send_response(sk, fdata.encode('utf-8')) |
| | | else: |
| | | result = send_response(key, fdata.encode('utf-8')) |
| | | except Exception as e: |
| | | logging.exception(e) |
| | | finally: |
| | |
| | | temp = trading_canceled_queue.get() |
| | | if temp: |
| | | logger_local_huaxin_g_cancel.info(f"准备上报:{temp}") |
| | | upload_data(temp[0], "trading_order_canceled", temp[1]) |
| | | upload_data(temp[0], "trading_order_canceled", temp[1], new_sk=True) |
| | | logger_local_huaxin_g_cancel.info(f"上报成功:{temp}") |
| | | except Exception as e: |
| | | logger_local_huaxin_l2_error.exception(e) |
| | |
| | | |
| | | |
| | | if __name__ == "__main__": |
| | | code = "603809" |
| | | target_codes.add(code) |
| | | run_upload_task(code) |
| | | while True: |
| | | for i in range(0, 5): |
| | | add_l2_order_detail({"SecurityID": code, "Price": 11.28, "Volume": 500, "Side": "2", "OrderType": "\u0000", |
| | | "OrderTime": 14591555, "MainSeq": 1, "SubSeq": 11050942, "OrderNO": 10692868, |
| | | "OrderStatus": "A"}, False) |
| | | time.sleep(0.001) |
| | | upload_data("000798", "trading_order_canceled", 30997688, new_sk=True) |
| | |
| | | |
| | | # 分析L2数据传输时间 |
def analyze_l2_data_transformation(path_):
    """Scan an L2 order-detail log file and print the slow entries.

    Every line is parsed as
    ``<create time> | <field> | <prefix> - <code>#<label>:<elapsed>#<list literal>``
    where ``<create time>`` has the form ``%Y-%m-%d %H:%M:%S.<millis>``.
    For each line whose elapsed value is greater than 100 the function prints
    the create time, the number of records, the elapsed value, the code and
    the difference between the create time (in milliseconds) and the smallest
    and largest ``d[10]`` found among records longer than 10 items
    (``d[10]`` is presumably a millisecond timestamp -- confirm with the
    producer of the log).

    Lines that cannot be parsed are printed unchanged and skipped.

    :param path_: path of the UTF-8 encoded log file to analyse.
    """
    with open(path_, 'r', encoding="utf-8") as f:
        for line in f:
            try:
                fields = line.split("|")
                create_time = fields[0].strip()
                payload = fields[2].split(" - ")[1].strip()
                contents = payload.split("#")
                code = contents[0]
                use_time = int(contents[1].strip().split(":")[1])
                # NOTE(review): eval() executes whatever expression the log
                # line contains. Only run this on logs you produced yourself;
                # consider ast.literal_eval if the payload is always a plain
                # literal.
                l2_data = eval(contents[2])
                # Only records that carry an 11th field take part in the
                # min/max search.
                timed = [d for d in l2_data if len(d) > 10]
                if not timed:
                    continue
                # min()/max() return the first extreme item, matching a
                # strict "<" / ">" scan.
                min_time_data = min(timed, key=lambda d: d[10])
                max_time_data = max(timed, key=lambda d: d[10])
                dt = datetime.datetime.strptime(create_time.split(".")[0], "%Y-%m-%d %H:%M:%S")
                create_timestamp = int(dt.timestamp() * 1000) + int(create_time.split(".")[1])
                if use_time > 100:
                    print(create_time, f"数量:{len(l2_data)}", f"耗时:{use_time}", code,
                          create_timestamp - min_time_data[10],
                          create_timestamp - max_time_data[10])
            except Exception:
                # Best effort: echo the offending line and keep going, but let
                # KeyboardInterrupt / SystemExit propagate.
                print(line)
| | | |
| | | |
if __name__ == "__main__":
    # Run the analysis over each hard-coded order-detail log, in date order.
    for _log_path in (
        "D:\\logs\\huaxin_l2\\orderdetail.2023-08-23.log",
        "D:\\logs\\huaxin_l2\\orderdetail.2023-08-24.log",
    ):
        analyze_l2_data_transformation(_log_path)
| | |
| | | from huaxin_client import l2_data_manager |
| | | from log_module import async_log_util |
| | | from log_module.log import logger_debug |
| | | |
if __name__ == "__main__":
    def _run_manual_check():
        """Manual check: add one log entry, run the log task, upload one record.

        NOTE(review): presumably run_sync() processes the entry added just
        before it -- confirm in async_log_util.
        """
        async_log_util.add_log(logger_debug, "error", "测试错误")
        async_log_util.run_sync()
        # new_sk=True is passed through to upload_data unchanged.
        l2_data_manager.upload_data("000798", "trading_order_canceled", 30997688, new_sk=True)

    _run_manual_check()