Administrator
2023-11-02 eb33b717023d9871bd74e6dce47a065228cffefc
huaxin_client/l2_data_manager.py
@@ -1,19 +1,18 @@
# -*- coding: utf-8 -*-
import json
import logging
import multiprocessing
import queue
import random
import threading
import time
from huaxin_client import socket_util, l2_data_transform_protocol
from huaxin_client import socket_util
from huaxin_client.client_network import SendResponseSkManager
# 活动时间
from huaxin_client.l2_data_transform_protocol import L2DataCallBack
from log_module import log_export, async_log_util
from log_module.log import logger_local_huaxin_l2_error, logger_local_huaxin_l2_upload, logger_local_huaxin_l2_buy_no, \
    logger_local_huaxin_g_cancel, hx_logger_contact_debug, logger_system, logger_local_huaxin_l2_orderdetail
from huaxin_client.code_queue_distribute_manager import CodeQueueDistributeManager
from log_module import  async_log_util
from log_module.log import logger_local_huaxin_l2_error, logger_system
from utils import tool
order_detail_upload_active_time_dict = {}
@@ -24,12 +23,51 @@
target_codes = set()
target_codes_add_time = {}
common_queue = queue.Queue()
trading_canceled_queue = queue.Queue()
log_buy_no_queue = queue.Queue()
# 买入订单号的字典
buy_order_nos_dict = {}
# 最近的大单成交单号
latest_big_order_transaction_orders_dict = {}
# L2上传数据管理器
class L2DataUploadManager:
    def __init__(self, order_queue_distribute_manager: CodeQueueDistributeManager,
                 transaction_queue_distribute_manager: CodeQueueDistributeManager,
                 market_data_queue: multiprocessing.Queue):
        self.order_queue_distribute_manager = order_queue_distribute_manager
        self.transaction_queue_distribute_manager = transaction_queue_distribute_manager
        self.market_data_queue = market_data_queue
    # 添加委托详情
    def add_l2_order_detail(self, data, start_time, istransaction=False):
        code = data["SecurityID"]
        queue_info = self.order_queue_distribute_manager.get_distributed_queue(code)
        if not queue_info:
            return
        queue_info[1].put_nowait(
            (data['SecurityID'], data['Price'], data['Volume'], data['Side'], data['OrderType'], data['OrderTime'],
             data['MainSeq'], data['SubSeq'], data['OrderNO'], data['OrderStatus'], time.time(), start_time))
    # 添加逐笔成交
    def add_transaction_detail(self, data):
        code = data["SecurityID"]
        queue_info = self.transaction_queue_distribute_manager.get_distributed_queue(code)
        if not queue_info:
            return
        # 判断是否为大单成交
        queue_info[1].put_nowait((data['SecurityID'], data['TradePrice'], data['TradeVolume'],
                                  data['OrderTime'], data['MainSeq'], data['SubSeq'], data['BuyNo'],
                                  data['SellNo'], data['ExecType']))
    def add_market_data(self, data):
        # 加入上传队列
        self.market_data_queue.put_nowait(data)
    # 分配上传队列
    def distribute_upload_queue(self, code):
        self.order_queue_distribute_manager.distribute_queue(code)
        self.transaction_queue_distribute_manager.distribute_queue(code)
    # 释放已经分配的队列
    def release_distributed_upload_queue(self, code):
        self.order_queue_distribute_manager.release_distribute_queue(code)
        self.transaction_queue_distribute_manager.release_distribute_queue(code)
def add_target_code(code):
@@ -42,89 +80,6 @@
    target_codes.discard(code)
    if code in target_codes_add_time:
        target_codes_add_time.pop(code)
# 获取最近的大单成交订单号
def get_latest_transaction_order_nos(code):
    return latest_big_order_transaction_orders_dict.get(code)
# 正在成交的订单撤单了
def trading_order_canceled(code_, order_no):
    trading_canceled_queue.put((code_, order_no))
# 添加委托详情
def add_l2_order_detail(data, start_time, istransaction=False):
    code = data["SecurityID"]
    # 异步日志记录
    if code not in tmep_order_detail_queue_dict:
        tmep_order_detail_queue_dict[code] = queue.Queue()
    # 原来的格式
    # {"SecurityID": pOrderDetail['SecurityID'], "Price": pOrderDetail['Price'],
    #                 "Volume": pOrderDetail['Volume'],
    #                 "Side": pOrderDetail['Side'].decode(), "OrderType": pOrderDetail['OrderType'].decode(),
    #                 "OrderTime": pOrderDetail['OrderTime'], "MainSeq": pOrderDetail['MainSeq'],
    #                 "SubSeq": pOrderDetail['SubSeq'], "OrderNO": pOrderDetail['OrderNO'],
    #                 "OrderStatus": pOrderDetail['OrderStatus'].decode()}
    # 用于G撤的数据,暂时注释
    # if data['Side'] == "1":
    #     # 记录所有买入的订单号
    #     if data['SecurityID'] not in buy_order_nos_dict:
    #         buy_order_nos_dict[data['SecurityID']] = set()
    #     buy_order_nos_dict[data['SecurityID']].add(data['OrderNO'])
    #     # 买入订单号需要记录日志
    #     async_log_util.huaxin_l2_log.info(logger_local_huaxin_l2_buy_no, f"{data['SecurityID']}#{data['OrderNO']}")
    tmep_order_detail_queue_dict[code].put(
        (data['SecurityID'], data['Price'], data['Volume'], data['Side'], data['OrderType'], data['OrderTime'],
         data['MainSeq'], data['SubSeq'], data['OrderNO'], data['OrderStatus'], time.time(), start_time))
# 添加逐笔成交
def add_transaction_detail(data):
    code = data["SecurityID"]
    if code not in tmep_transaction_queue_dict:
        tmep_transaction_queue_dict[code] = queue.Queue()
    # 原来的格式
    #  item = {"SecurityID": pTransaction['SecurityID'], "TradePrice": pTransaction['TradePrice'],
    #                     "TradeVolume": pTransaction['TradeVolume'],
    #                     "OrderTime": pTransaction['TradeTime'], "MainSeq": pTransaction['MainSeq'],
    #                     "SubSeq": pTransaction['SubSeq'], "BuyNo": pTransaction['BuyNo'], "SellNo": pTransaction['SellNo'],
    #                     "ExecType": pTransaction['ExecType'].decode()}
    # 判断是否为大单成交
    code = data['SecurityID']
    # G撤相关数据操作暂时注释
    # if code in buy_order_nos_dict:
    #     if data['BuyNo'] in buy_order_nos_dict[code]:
    #         try:
    #             temp_list = latest_big_order_transaction_orders_dict.get(code)
    #             if not temp_list:
    #                 temp_list = []
    #             if temp_list:
    #                 if temp_list[-1] != data['BuyNo']:
    #                     # 不加入重复订单号
    #                     temp_list.append(data['BuyNo'])
    #                     if len(temp_list) > 10:
    #                         # 最多加10个订单号
    #                         temp_list = temp_list[-10:]
    #             else:
    #                 temp_list.append(data['BuyNo'])
    #             latest_big_order_transaction_orders_dict[code] = temp_list
    #         except:
    #             pass
    tmep_transaction_queue_dict[code].put((data['SecurityID'], data['TradePrice'], data['TradeVolume'],
                                           data['OrderTime'], data['MainSeq'], data['SubSeq'], data['BuyNo'],
                                           data['SellNo'], data['ExecType']))
def add_market_data(data):
    code = data['securityID']
    # 加入上传队列
    common_queue.put((code, "l2_market_data", data))
def add_subscript_codes(codes):
    print("add_subscript_codes", codes)
@@ -165,7 +120,6 @@
# 上传数据
def upload_data(code, _type, datas, new_sk=False):
    uid = random.randint(0, 100000)
    key = f"{_type}_{code}"
    fdata = json.dumps(
        {"type": _type, "data": {"code": code, "data": datas, "time": round(time.time() * 1000)}})
@@ -180,76 +134,16 @@
        logging.exception(e)
    finally:
        pass
        # print("请求结束", uid, result)
        # logger_local_huaxin_l2_upload.info(
        #     f"{code} 上传数据耗时-{_type}: {round((time.time() - start_time) * 1000, 1)} 数据量:{len(datas)}")
    # print("上传结果", result)
# 循环读取上传数据
def __run_upload_order(code: str, l2_data_callback: L2DataCallBack) -> None:
    if code not in tmep_order_detail_queue_dict:
        tmep_order_detail_queue_dict[code] = queue.Queue()
    if True:
        while True:
            # print("order task")
            try:
                if code not in target_codes:
                    break
                order_detail_upload_active_time_dict[code] = time.time()
                udatas = []
                while not tmep_order_detail_queue_dict[code].empty():
                    temp = tmep_order_detail_queue_dict[code].get()
                    udatas.append(temp)
                if udatas:
                    # start_time = time.time()
                    # upload_data(code, "l2_order", udatas)
                    l2_data_callback.OnL2Order(code, udatas, int(time.time() * 1000))
                    # l2_data_transaction_protocol.send_l2_order_detail(pipe, _mmap, code, udatas)
                    # use_time = int((time.time() - start_time) * 1000)
                    # if use_time > 10:
                    #     async_log_util.info(logger_local_huaxin_l2_upload, f"{code}-上传代码耗时:{use_time}ms")
                else:
                    # 没有数据的时候需等待,有数据时不需等待
                    time.sleep(0.001)
            except Exception as e:
                hx_logger_contact_debug.exception(e)
                logger_local_huaxin_l2_error.error(f"上传订单数据出错:{str(e)}")
                pass
def __run_upload_trans(code, l2_data_callback: L2DataCallBack):
    if code not in tmep_transaction_queue_dict:
        tmep_transaction_queue_dict[code] = queue.Queue()
    while True:
        # print("trans task")
        try:
            if code not in target_codes:
                break
            transaction_upload_active_time_dict[code] = time.time()
            udatas = []
            while not tmep_transaction_queue_dict[code].empty():
                temp = tmep_transaction_queue_dict[code].get()
                udatas.append(temp)
            if udatas:
                # upload_data(code, "l2_trans", udatas)
                l2_data_callback.OnL2Transaction(code, udatas)
            time.sleep(0.01)
        except Exception as e:
            logger_local_huaxin_l2_error.error(f"上传成交数据出错:{str(e)}")
def __run_upload_common(l2_data_callback: L2DataCallBack):
def __run_upload_common():
    print("__run_upload_common")
    logger_system.info(f"l2_client __run_upload_common 线程ID:{tool.get_thread_id()}")
    while True:
        try:
            while not common_queue.empty():
                temp = common_queue.get()
                if temp[1] == "l2_market_data":
                    l2_data_callback.OnMarketData(temp[0], temp[2])
                else:
                    upload_data(temp[0], temp[1], temp[2])
                upload_data(temp[0], temp[1], temp[2])
        except Exception as e:
            logger_local_huaxin_l2_error.exception(e)
@@ -258,121 +152,30 @@
            time.sleep(0.01)
def __run_upload_trading_canceled(l2_data_callback: L2DataCallBack):
    print("__run_upload_trading_canceled")
    logger_system.info(f"l2_client __run_upload_trading_canceled 线程ID:{tool.get_thread_id()}")
    while True:
        try:
            temp = trading_canceled_queue.get()
            if temp:
                logger_local_huaxin_g_cancel.info(f"准备上报:{temp}")
                # upload_data(temp[0], "trading_order_canceled", temp[1], new_sk=True)
                l2_data_callback.OnTradingOrderCancel(temp[0], temp[1])
                logger_local_huaxin_g_cancel.info(f"上报成功:{temp}")
        except Exception as e:
            logger_local_huaxin_l2_error.exception(e)
            logger_local_huaxin_l2_error.error(f"上传普通数据出错:{str(e)}")
def __run_log():
    print("__run_log")
    logger_system.info(f"l2_client __run_log 线程ID:{tool.get_thread_id()}")
    async_log_util.huaxin_l2_log.run_sync()
__upload_order_threads = {}
__upload_trans_threads = {}
# 运行上传任务
def run_upload_task(code: str, l2_data_callback: L2DataCallBack) -> None:
    try:
        # 如果代码没有在目标代码中就不需要运行
        if code not in target_codes:
            return
        # 如果最近的活动时间小于2s就不需要运行
        if code not in order_detail_upload_active_time_dict or time.time() - order_detail_upload_active_time_dict[
            code] > 2:
            t = threading.Thread(target=lambda: __run_upload_order(code, l2_data_callback), daemon=True)
            t.start()
            __upload_order_threads[code] = t
        if code not in transaction_upload_active_time_dict or time.time() - transaction_upload_active_time_dict[
            code] > 2:
            t = threading.Thread(target=lambda: __run_upload_trans(code, l2_data_callback), daemon=True)
            t.start()
            __upload_trans_threads[code] = t
    finally:
        pass
def run_upload_common(l2_data_callback: L2DataCallBack):
    t = threading.Thread(target=lambda: __run_upload_common(l2_data_callback), daemon=True)
    t.start()
def run_upload_trading_canceled(l2_data_callback: L2DataCallBack):
    t = threading.Thread(target=lambda: __run_upload_trading_canceled(l2_data_callback), daemon=True)
# 采用socket传输数据
def run_upload_common():
    t = threading.Thread(target=lambda: __run_upload_common(), daemon=True)
    t.start()
def run_log():
    # G撤相关数据,暂时注释
    # fdatas = log_export.load_huaxin_local_buy_no()
    # global buy_order_nos_dict
    # buy_order_nos_dict = fdatas
    t = threading.Thread(target=lambda: __run_log(), daemon=True)
    t.start()
# 运行守护线程
def run_upload_daemon(_l2_data_callback):
    def upload_daemon():
        logger_system.info(f"l2_client upload_daemon 线程ID:{tool.get_thread_id()}")
        while True:
            try:
                for code in target_codes_add_time:
                    # 目标代码加入2s之后启动守护
                    if time.time() - target_codes_add_time[code] > 2:
                        if code not in __upload_order_threads or not __upload_order_threads[code].is_alive():
                            t = threading.Thread(target=lambda: __run_upload_order(code, _l2_data_callback),
                                                 daemon=True)
                            t.start()
                            __upload_order_threads[code] = t
                            logger_local_huaxin_l2_upload.info(f"重新创建L2订单上传线程:{code}")
                        if code not in __upload_trans_threads or not __upload_trans_threads[code].is_alive():
                            t = threading.Thread(target=lambda: __run_upload_trans(code, _l2_data_callback),
                                                 daemon=True)
                            t.start()
                            __upload_trans_threads[code] = t
                            logger_local_huaxin_l2_upload.info(f"重新创建L2成交上传线程:{code}")
            except:
                pass
            finally:
                time.sleep(3)
    t = threading.Thread(target=lambda: upload_daemon(), daemon=True)
    t.start()
def __test(_l2_data_callback):
def __test():
    code = "002073"
    if code not in tmep_order_detail_queue_dict:
        tmep_order_detail_queue_dict[code] = queue.Queue()
    target_codes.add(code)
    t = threading.Thread(target=lambda: __run_upload_order(code, _l2_data_callback), daemon=True)
    t.start()
    while True:
        try:
            tmep_order_detail_queue_dict[code].put_nowait(
                ['002073', 0.0, 88100, '1', '2', 103831240, 2011, 18190761, 18069131, 'D', 1693276711224])
            time.sleep(5)
        except:
            pass
    pass
def run_test(_l2_data_callback):
    t = threading.Thread(target=lambda: __test(_l2_data_callback), daemon=True)
def run_test():
    t = threading.Thread(target=lambda: __test(), daemon=True)
    t.start()