| | |
| | | # -*- coding: utf-8 -*- |
| | | import json |
| | | import logging |
| | | import multiprocessing |
| | | import queue |
| | | import random |
| | | import threading |
| | | import time |
| | | from huaxin_client import socket_util, l2_data_transform_protocol |
| | | from huaxin_client import socket_util |
| | | |
| | | from huaxin_client.client_network import SendResponseSkManager |
| | | |
| | | # 活动时间 |
| | | from huaxin_client.l2_data_transform_protocol import L2DataCallBack |
| | | from log_module import log_export, async_log_util |
| | | from log_module.log import logger_local_huaxin_l2_error, logger_local_huaxin_l2_upload, logger_local_huaxin_l2_buy_no, \ |
| | | logger_local_huaxin_g_cancel, hx_logger_contact_debug, logger_system, logger_local_huaxin_l2_orderdetail |
| | | from huaxin_client.code_queue_distribute_manager import CodeQueueDistributeManager |
| | | from log_module import async_log_util |
| | | from log_module.log import logger_local_huaxin_l2_error, logger_system |
| | | from utils import tool |
| | | |
| | | order_detail_upload_active_time_dict = {} |
| | |
| | | target_codes = set() |
| | | target_codes_add_time = {} |
| | | common_queue = queue.Queue() |
| | | trading_canceled_queue = queue.Queue() |
| | | log_buy_no_queue = queue.Queue() |
| | | # 买入订单号的字典 |
| | | buy_order_nos_dict = {} |
| | | # 最近的大单成交单号 |
| | | latest_big_order_transaction_orders_dict = {} |
| | | |
| | | |
| | | # L2上传数据管理器 |
| | | class L2DataUploadManager: |
| | | def __init__(self, order_queue_distribute_manager: CodeQueueDistributeManager, |
| | | transaction_queue_distribute_manager: CodeQueueDistributeManager, |
| | | market_data_queue: multiprocessing.Queue): |
| | | self.order_queue_distribute_manager = order_queue_distribute_manager |
| | | self.transaction_queue_distribute_manager = transaction_queue_distribute_manager |
| | | self.market_data_queue = market_data_queue |
| | | |
| | | # 添加委托详情 |
| | | def add_l2_order_detail(self, data, start_time, istransaction=False): |
| | | code = data["SecurityID"] |
| | | queue_info = self.order_queue_distribute_manager.get_distributed_queue(code) |
| | | if not queue_info: |
| | | return |
| | | queue_info[1].put_nowait( |
| | | (data['SecurityID'], data['Price'], data['Volume'], data['Side'], data['OrderType'], data['OrderTime'], |
| | | data['MainSeq'], data['SubSeq'], data['OrderNO'], data['OrderStatus'], time.time(), start_time)) |
| | | |
| | | # 添加逐笔成交 |
| | | def add_transaction_detail(self, data): |
| | | code = data["SecurityID"] |
| | | queue_info = self.transaction_queue_distribute_manager.get_distributed_queue(code) |
| | | if not queue_info: |
| | | return |
| | | # 判断是否为大单成交 |
| | | queue_info[1].put_nowait((data['SecurityID'], data['TradePrice'], data['TradeVolume'], |
| | | data['OrderTime'], data['MainSeq'], data['SubSeq'], data['BuyNo'], |
| | | data['SellNo'], data['ExecType'])) |
| | | |
| | | def add_market_data(self, data): |
| | | # 加入上传队列 |
| | | self.market_data_queue.put_nowait(data) |
| | | |
| | | # 分配上传队列 |
| | | def distribute_upload_queue(self, code): |
| | | self.order_queue_distribute_manager.distribute_queue(code) |
| | | self.transaction_queue_distribute_manager.distribute_queue(code) |
| | | |
| | | # 释放已经分配的队列 |
| | | def release_distributed_upload_queue(self, code): |
| | | self.order_queue_distribute_manager.release_distribute_queue(code) |
| | | self.transaction_queue_distribute_manager.release_distribute_queue(code) |
| | | |
| | | |
| | | def add_target_code(code): |
| | |
| | | target_codes.discard(code) |
| | | if code in target_codes_add_time: |
| | | target_codes_add_time.pop(code) |
| | | |
| | | |
| | | # 获取最近的大单成交订单号 |
| | | def get_latest_transaction_order_nos(code): |
| | | return latest_big_order_transaction_orders_dict.get(code) |
| | | |
| | | |
| | | # 正在成交的订单撤单了 |
| | | def trading_order_canceled(code_, order_no): |
| | | trading_canceled_queue.put((code_, order_no)) |
| | | |
| | | |
| | | # 添加委托详情 |
| | | def add_l2_order_detail(data, start_time, istransaction=False): |
| | | code = data["SecurityID"] |
| | | # 异步日志记录 |
| | | if code not in tmep_order_detail_queue_dict: |
| | | tmep_order_detail_queue_dict[code] = queue.Queue() |
| | | # 原来的格式 |
| | | # {"SecurityID": pOrderDetail['SecurityID'], "Price": pOrderDetail['Price'], |
| | | # "Volume": pOrderDetail['Volume'], |
| | | # "Side": pOrderDetail['Side'].decode(), "OrderType": pOrderDetail['OrderType'].decode(), |
| | | # "OrderTime": pOrderDetail['OrderTime'], "MainSeq": pOrderDetail['MainSeq'], |
| | | # "SubSeq": pOrderDetail['SubSeq'], "OrderNO": pOrderDetail['OrderNO'], |
| | | # "OrderStatus": pOrderDetail['OrderStatus'].decode()} |
| | | |
| | | # 用于G撤的数据,暂时注释 |
| | | # if data['Side'] == "1": |
| | | # # 记录所有买入的订单号 |
| | | # if data['SecurityID'] not in buy_order_nos_dict: |
| | | # buy_order_nos_dict[data['SecurityID']] = set() |
| | | # buy_order_nos_dict[data['SecurityID']].add(data['OrderNO']) |
| | | # # 买入订单号需要记录日志 |
| | | # async_log_util.huaxin_l2_log.info(logger_local_huaxin_l2_buy_no, f"{data['SecurityID']}#{data['OrderNO']}") |
| | | |
| | | tmep_order_detail_queue_dict[code].put( |
| | | (data['SecurityID'], data['Price'], data['Volume'], data['Side'], data['OrderType'], data['OrderTime'], |
| | | data['MainSeq'], data['SubSeq'], data['OrderNO'], data['OrderStatus'], time.time(), start_time)) |
| | | |
| | | |
| | | # 添加逐笔成交 |
| | | def add_transaction_detail(data): |
| | | code = data["SecurityID"] |
| | | if code not in tmep_transaction_queue_dict: |
| | | tmep_transaction_queue_dict[code] = queue.Queue() |
| | | # 原来的格式 |
| | | # item = {"SecurityID": pTransaction['SecurityID'], "TradePrice": pTransaction['TradePrice'], |
| | | # "TradeVolume": pTransaction['TradeVolume'], |
| | | # "OrderTime": pTransaction['TradeTime'], "MainSeq": pTransaction['MainSeq'], |
| | | # "SubSeq": pTransaction['SubSeq'], "BuyNo": pTransaction['BuyNo'], "SellNo": pTransaction['SellNo'], |
| | | # "ExecType": pTransaction['ExecType'].decode()} |
| | | |
| | | # 判断是否为大单成交 |
| | | code = data['SecurityID'] |
| | | # G撤相关数据操作暂时注释 |
| | | # if code in buy_order_nos_dict: |
| | | # if data['BuyNo'] in buy_order_nos_dict[code]: |
| | | # try: |
| | | # temp_list = latest_big_order_transaction_orders_dict.get(code) |
| | | # if not temp_list: |
| | | # temp_list = [] |
| | | # if temp_list: |
| | | # if temp_list[-1] != data['BuyNo']: |
| | | # # 不加入重复订单号 |
| | | # temp_list.append(data['BuyNo']) |
| | | # if len(temp_list) > 10: |
| | | # # 最多加10个订单号 |
| | | # temp_list = temp_list[-10:] |
| | | # else: |
| | | # temp_list.append(data['BuyNo']) |
| | | # latest_big_order_transaction_orders_dict[code] = temp_list |
| | | # except: |
| | | # pass |
| | | tmep_transaction_queue_dict[code].put((data['SecurityID'], data['TradePrice'], data['TradeVolume'], |
| | | data['OrderTime'], data['MainSeq'], data['SubSeq'], data['BuyNo'], |
| | | data['SellNo'], data['ExecType'])) |
| | | |
| | | |
| | | def add_market_data(data): |
| | | code = data['securityID'] |
| | | # 加入上传队列 |
| | | common_queue.put((code, "l2_market_data", data)) |
| | | |
| | | |
| | | def add_subscript_codes(codes): |
| | | print("add_subscript_codes", codes) |
| | |
| | | |
| | | # 上传数据 |
| | | def upload_data(code, _type, datas, new_sk=False): |
| | | uid = random.randint(0, 100000) |
| | | key = f"{_type}_{code}" |
| | | fdata = json.dumps( |
| | | {"type": _type, "data": {"code": code, "data": datas, "time": round(time.time() * 1000)}}) |
| | |
| | | logging.exception(e) |
| | | finally: |
| | | pass |
| | | # print("请求结束", uid, result) |
| | | # logger_local_huaxin_l2_upload.info( |
| | | # f"{code} 上传数据耗时-{_type}: {round((time.time() - start_time) * 1000, 1)} 数据量:{len(datas)}") |
| | | # print("上传结果", result) |
| | | |
| | | |
| | | # 循环读取上传数据 |
| | | def __run_upload_order(code: str, l2_data_callback: L2DataCallBack) -> None: |
| | | if code not in tmep_order_detail_queue_dict: |
| | | tmep_order_detail_queue_dict[code] = queue.Queue() |
| | | if True: |
| | | while True: |
| | | # print("order task") |
| | | try: |
| | | if code not in target_codes: |
| | | break |
| | | order_detail_upload_active_time_dict[code] = time.time() |
| | | udatas = [] |
| | | while not tmep_order_detail_queue_dict[code].empty(): |
| | | temp = tmep_order_detail_queue_dict[code].get() |
| | | udatas.append(temp) |
| | | if udatas: |
| | | # start_time = time.time() |
| | | # upload_data(code, "l2_order", udatas) |
| | | l2_data_callback.OnL2Order(code, udatas, int(time.time() * 1000)) |
| | | # l2_data_transaction_protocol.send_l2_order_detail(pipe, _mmap, code, udatas) |
| | | # use_time = int((time.time() - start_time) * 1000) |
| | | # if use_time > 10: |
| | | # async_log_util.info(logger_local_huaxin_l2_upload, f"{code}-上传代码耗时:{use_time}ms") |
| | | else: |
| | | # 没有数据的时候需等待,有数据时不需等待 |
| | | time.sleep(0.001) |
| | | except Exception as e: |
| | | hx_logger_contact_debug.exception(e) |
| | | logger_local_huaxin_l2_error.error(f"上传订单数据出错:{str(e)}") |
| | | pass |
| | | |
| | | |
| | | def __run_upload_trans(code, l2_data_callback: L2DataCallBack): |
| | | if code not in tmep_transaction_queue_dict: |
| | | tmep_transaction_queue_dict[code] = queue.Queue() |
| | | while True: |
| | | # print("trans task") |
| | | try: |
| | | if code not in target_codes: |
| | | break |
| | | transaction_upload_active_time_dict[code] = time.time() |
| | | udatas = [] |
| | | while not tmep_transaction_queue_dict[code].empty(): |
| | | temp = tmep_transaction_queue_dict[code].get() |
| | | udatas.append(temp) |
| | | if udatas: |
| | | # upload_data(code, "l2_trans", udatas) |
| | | l2_data_callback.OnL2Transaction(code, udatas) |
| | | time.sleep(0.01) |
| | | except Exception as e: |
| | | logger_local_huaxin_l2_error.error(f"上传成交数据出错:{str(e)}") |
| | | |
| | | |
| | | def __run_upload_common(l2_data_callback: L2DataCallBack): |
| | | def __run_upload_common(): |
| | | print("__run_upload_common") |
| | | logger_system.info(f"l2_client __run_upload_common 线程ID:{tool.get_thread_id()}") |
| | | while True: |
| | | try: |
| | | while not common_queue.empty(): |
| | | temp = common_queue.get() |
| | | if temp[1] == "l2_market_data": |
| | | l2_data_callback.OnMarketData(temp[0], temp[2]) |
| | | else: |
| | | upload_data(temp[0], temp[1], temp[2]) |
| | | upload_data(temp[0], temp[1], temp[2]) |
| | | |
| | | except Exception as e: |
| | | logger_local_huaxin_l2_error.exception(e) |
| | |
| | | time.sleep(0.01) |
| | | |
| | | |
| | | def __run_upload_trading_canceled(l2_data_callback: L2DataCallBack): |
| | | print("__run_upload_trading_canceled") |
| | | logger_system.info(f"l2_client __run_upload_trading_canceled 线程ID:{tool.get_thread_id()}") |
| | | while True: |
| | | try: |
| | | temp = trading_canceled_queue.get() |
| | | if temp: |
| | | logger_local_huaxin_g_cancel.info(f"准备上报:{temp}") |
| | | # upload_data(temp[0], "trading_order_canceled", temp[1], new_sk=True) |
| | | l2_data_callback.OnTradingOrderCancel(temp[0], temp[1]) |
| | | logger_local_huaxin_g_cancel.info(f"上报成功:{temp}") |
| | | except Exception as e: |
| | | logger_local_huaxin_l2_error.exception(e) |
| | | logger_local_huaxin_l2_error.error(f"上传普通数据出错:{str(e)}") |
| | | |
| | | |
| | | def __run_log(): |
| | | print("__run_log") |
| | | logger_system.info(f"l2_client __run_log 线程ID:{tool.get_thread_id()}") |
| | | async_log_util.huaxin_l2_log.run_sync() |
| | | |
| | | |
| | | __upload_order_threads = {} |
| | | __upload_trans_threads = {} |
| | | |
| | | |
| | | # 运行上传任务 |
| | | def run_upload_task(code: str, l2_data_callback: L2DataCallBack) -> None: |
| | | try: |
| | | # 如果代码没有在目标代码中就不需要运行 |
| | | if code not in target_codes: |
| | | return |
| | | # 如果最近的活动时间小于2s就不需要运行 |
| | | if code not in order_detail_upload_active_time_dict or time.time() - order_detail_upload_active_time_dict[ |
| | | code] > 2: |
| | | t = threading.Thread(target=lambda: __run_upload_order(code, l2_data_callback), daemon=True) |
| | | t.start() |
| | | __upload_order_threads[code] = t |
| | | |
| | | if code not in transaction_upload_active_time_dict or time.time() - transaction_upload_active_time_dict[ |
| | | code] > 2: |
| | | t = threading.Thread(target=lambda: __run_upload_trans(code, l2_data_callback), daemon=True) |
| | | t.start() |
| | | __upload_trans_threads[code] = t |
| | | finally: |
| | | pass |
| | | |
| | | |
| | | def run_upload_common(l2_data_callback: L2DataCallBack): |
| | | t = threading.Thread(target=lambda: __run_upload_common(l2_data_callback), daemon=True) |
| | | t.start() |
| | | |
| | | |
| | | def run_upload_trading_canceled(l2_data_callback: L2DataCallBack): |
| | | t = threading.Thread(target=lambda: __run_upload_trading_canceled(l2_data_callback), daemon=True) |
| | | # 采用socket传输数据 |
| | | def run_upload_common(): |
| | | t = threading.Thread(target=lambda: __run_upload_common(), daemon=True) |
| | | t.start() |
| | | |
| | | |
| | | def run_log(): |
| | | # G撤相关数据,暂时注释 |
| | | # fdatas = log_export.load_huaxin_local_buy_no() |
| | | # global buy_order_nos_dict |
| | | # buy_order_nos_dict = fdatas |
| | | t = threading.Thread(target=lambda: __run_log(), daemon=True) |
| | | t.start() |
| | | |
| | | |
| | | # 运行守护线程 |
| | | def run_upload_daemon(_l2_data_callback): |
| | | def upload_daemon(): |
| | | logger_system.info(f"l2_client upload_daemon 线程ID:{tool.get_thread_id()}") |
| | | while True: |
| | | try: |
| | | for code in target_codes_add_time: |
| | | # 目标代码加入2s之后启动守护 |
| | | if time.time() - target_codes_add_time[code] > 2: |
| | | if code not in __upload_order_threads or not __upload_order_threads[code].is_alive(): |
| | | t = threading.Thread(target=lambda: __run_upload_order(code, _l2_data_callback), |
| | | daemon=True) |
| | | t.start() |
| | | __upload_order_threads[code] = t |
| | | logger_local_huaxin_l2_upload.info(f"重新创建L2订单上传线程:{code}") |
| | | if code not in __upload_trans_threads or not __upload_trans_threads[code].is_alive(): |
| | | t = threading.Thread(target=lambda: __run_upload_trans(code, _l2_data_callback), |
| | | daemon=True) |
| | | t.start() |
| | | __upload_trans_threads[code] = t |
| | | logger_local_huaxin_l2_upload.info(f"重新创建L2成交上传线程:{code}") |
| | | except: |
| | | pass |
| | | finally: |
| | | time.sleep(3) |
| | | |
| | | t = threading.Thread(target=lambda: upload_daemon(), daemon=True) |
| | | t.start() |
| | | |
| | | |
| | | def __test(_l2_data_callback): |
| | | def __test(): |
| | | code = "002073" |
| | | if code not in tmep_order_detail_queue_dict: |
| | | tmep_order_detail_queue_dict[code] = queue.Queue() |
| | | target_codes.add(code) |
| | | t = threading.Thread(target=lambda: __run_upload_order(code, _l2_data_callback), daemon=True) |
| | | t.start() |
| | | while True: |
| | | try: |
| | | tmep_order_detail_queue_dict[code].put_nowait( |
| | | ['002073', 0.0, 88100, '1', '2', 103831240, 2011, 18190761, 18069131, 'D', 1693276711224]) |
| | | time.sleep(5) |
| | | except: |
| | | pass |
| | | pass |
| | | |
| | | |
| | | def run_test(_l2_data_callback): |
| | | t = threading.Thread(target=lambda: __test(_l2_data_callback), daemon=True) |
| | | def run_test(): |
| | | t = threading.Thread(target=lambda: __test(), daemon=True) |
| | | t.start() |
| | | |
| | | |