Administrator
4 天以前 48fb7a00951f91bdc707e5dd2d196e5bccb752c3
huaxin_client/l2_client.py
@@ -6,26 +6,25 @@
import queue
import threading
import time
from typing import List
import concurrent.futures
from huaxin_client import command_manager, l2_data_transform_protocol
from huaxin_client import command_manager
from huaxin_client import constant
from huaxin_client import l2_data_manager
import lev2mdapi
from huaxin_client.code_queue_distribute_manager import CodeQueueDistributeManager
from huaxin_client.code_queue_distribute_manager import CodeDataCallbackDistributeManager
from huaxin_client.command_manager import L2ActionCallback
from huaxin_client.l2_data_manager import L2DataUploadManager
from log_module import log, async_log_util
from log_module.async_log_util import huaxin_l2_log
from log_module.log import logger_local_huaxin_l2_subscript, logger_system, logger_local_huaxin_l2_transaction, \
    logger_local_huaxin_g_cancel, logger_l2_codes_subscript
from log_module.log import logger_local_huaxin_l2_subscript, logger_system, logger_l2_codes_subscript, logger_debug
from utils import tool
###B类###
Front_Address = "tcp://10.0.1.101:6900"
Multicast_Address = "udp://224.224.2.19:7889"
Multicast_Address2 = "udp://224.224.224.234:7890"
Local_Interface_Address = "192.168.84.75"
Local_Interface_Address = constant.LOCAL_IP
###A类###
if constant.IS_A:
@@ -51,8 +50,9 @@
SZ_Securities = [b"002456", b"002849", b"002281", b"002336", b"000936", b"000920", b"000757", b"002896", b"002725",
                 b"000952", b"000526", b"000753", b"000681", b"002088", b"002436"]
SZ_Bond_Securities = [b"100303", b"109559", b"112617"]
set_codes_data_queue = queue.Queue()
market_code_dict = {}
set_codes_data_queue = queue.Queue(maxsize=1000)
ENABLE_NGST = True
class Lev2MdSpi(lev2mdapi.CTORATstpLev2MdSpi):
@@ -77,9 +77,10 @@
        szse_codes = []
        sse_codes = []
        for code in codes:
            if code.find("00") == 0:
            market_type = tool.get_market_type(code)
            if market_type == tool.MARKET_TYPE_SZSE:
                szse_codes.append(code.encode())
            elif code.find("60") == 0:
            elif market_type == tool.MARKET_TYPE_SSE:
                sse_codes.append(code.encode())
        return sse_codes, szse_codes
@@ -91,27 +92,38 @@
        logger_local_huaxin_l2_subscript.info(f"取消订阅上证:{sh}")
        logger_local_huaxin_l2_subscript.info(f"取消订阅深证:{sz}")
        if sh:
            # 取消订阅逐笔委托
            self.__api.UnSubscribeOrderDetail(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
            # 取消订阅逐笔成交
            self.__api.UnSubscribeTransaction(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
            if ENABLE_NGST:
                result = self.__api.UnSubscribeNGTSTick(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
                logger_local_huaxin_l2_subscript.info(f"逐笔NGTS订阅结果sh:{result}")
            else:
                # 取消订阅逐笔委托
                self.__api.UnSubscribeOrderDetail(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
                # 取消订阅逐笔成交
                self.__api.UnSubscribeTransaction(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
            self.__api.UnSubscribeMarketData(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
        if sz:
            self.__api.UnSubscribeOrderDetail(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)
            self.__api.UnSubscribeTransaction(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)
            self.__api.UnSubscribeMarketData(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)
    def subscribe_codes(self, _codes):
        self.__subscribe(_codes)
    def __subscribe(self, _codes):
        sh, sz = self.__split_codes(_codes)
        logger_local_huaxin_l2_subscript.info(f"订阅上证:{sh}")
        logger_local_huaxin_l2_subscript.info(f"订阅深证:{sz}")
        if sh:
            # 订阅逐笔委托
            result = self.__api.SubscribeOrderDetail(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
            logger_local_huaxin_l2_subscript.info(f"逐笔委托订阅结果sh:{result}")
            # 订阅逐笔成交
            result = self.__api.SubscribeTransaction(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
            logger_local_huaxin_l2_subscript.info(f"逐笔成交订阅结果sh:{result}")
            if ENABLE_NGST:
                result = self.__api.SubscribeNGTSTick(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
                logger_local_huaxin_l2_subscript.info(f"逐笔NGTS订阅结果sh:{result}")
            else:
                # 订阅逐笔委托
                result = self.__api.SubscribeOrderDetail(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
                logger_local_huaxin_l2_subscript.info(f"逐笔委托订阅结果sh:{result}")
                # 订阅逐笔成交
                result = self.__api.SubscribeTransaction(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
                logger_local_huaxin_l2_subscript.info(f"逐笔成交订阅结果sh:{result}")
            result = self.__api.SubscribeMarketData(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
            logger_local_huaxin_l2_subscript.info(f"市场订阅结果sh:{result}")
@@ -125,6 +137,9 @@
    def __process_codes_data(self, codes_data, from_cache=False, delay=0.0):
        if from_cache and self.codes_volume_and_price_dict:
            return
        if not self.is_login and not constant.TEST:
            raise Exception("L2尚未登录")
        if delay > 0:
@@ -133,8 +148,9 @@
        for d in codes_data:
            code = d[0]
            codes.add(code)
            self.codes_volume_and_price_dict[code] = (d[1], d[2])
            self.l2_data_upload_manager.set_order_fileter_condition(code, d[1], float(d[2]))
            self.codes_volume_and_price_dict[code] = (d[1], d[2], d[3], d[4], d[5])
            self.l2_data_upload_manager.set_order_fileter_condition(code, d[1], round(float(d[2]), 2), d[3], d[4], d[5])
        logger_l2_codes_subscript.info("华鑫L2订阅总数:{}", len(codes))
        add_codes = codes - self.subscripted_codes
        del_codes = self.subscripted_codes - codes
        print("add del codes", add_codes, del_codes)
@@ -143,9 +159,9 @@
                self.l2_data_upload_manager.release_distributed_upload_queue(c)
                l2_data_manager.del_target_code(c)
            for c in codes:
                self.l2_data_upload_manager.distribute_upload_queue(c)
                self.l2_data_upload_manager.distribute_upload_queue(c, codes)
                l2_data_manager.add_target_code(c)
        except Exception as e:
        except Exception as e:  # TODO 清除原来还没释放掉的数据
            logger_system.error(f"L2代码分配上传队列出错:{str(e)}")
            logger_system.exception(e)
        self.__subscribe(add_codes)
@@ -153,6 +169,8 @@
        if add_codes:
            logger_system.info(f"新增L2订阅代码数量({'缓存' if from_cache else ''}):{len(add_codes)}")
            for c in add_codes:
                logger_l2_codes_subscript.info(f"l2委托数据过滤条件:{c} - {self.codes_volume_and_price_dict.get(c)}")
        logger_l2_codes_subscript.info("华鑫L2订阅结束,add-{} del-{}", len(add_codes), len(del_codes))
@@ -185,17 +203,6 @@
                    return data_json[1]
        return []
    def set_code_special_watch_volume(self, code, volume):
        # 有效期为3s
        # self.special_code_volume_for_order_dict[code] = (volume, time.time() + 3)
        d = self.codes_volume_and_price_dict.get(code)
        if d:
            min_volume, limit_up_price = d[0], d[1]
            self.l2_data_upload_manager.set_order_fileter_condition(code, min_volume, limit_up_price,
                                                                    {volume, constant.SHADOW_ORDER_VOLUME},
                                                                    time.time() + 3)
            huaxin_l2_log.info(logger_local_huaxin_l2_subscript, f"设置下单量监听:{code}-{volume}")
    def OnFrontConnected(self):
        print("OnFrontConnected")
        logger_system.info(f"l2_client OnFrontConnected 线程ID:{tool.get_thread_id()}")
@@ -212,10 +219,12 @@
        if pRspInfo['ErrorID'] == 0:
            print("----L2行情登录成功----")
            self.is_login = True
            logger_system.info(f"L2行情登录成功")
            # 初始设置值
            threading.Thread(
                target=lambda: self.__process_codes_data(self.__get_latest_datas(), from_cache=True, delay=6.0),
                daemon=True).start()
            if tool.trade_time_sub(tool.get_now_time_str(), "09:20:00") > 0:
                threading.Thread(
                    target=lambda: self.__process_codes_data(self.__get_latest_datas(), from_cache=True, delay=60),
                    daemon=True).start()
    def OnRspSubMarketData(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
        print("OnRspSubMarketData")
@@ -236,6 +245,8 @@
        if pRspInfo["ErrorID"] == 0:
            print("订阅成功")
            self.subscripted_codes.add(pSpecificSecurity['SecurityID'])
            # 初始化
            SubscriptDefend.set_l2_market_update(pSpecificSecurity['SecurityID'])
        if bIsLast == 1:
            print("订阅响应结束", self.subscripted_codes)
            l2_data_manager.add_subscript_codes(self.subscripted_codes)
@@ -244,6 +255,29 @@
        print("OnRspUnSubOrderDetail", bIsLast)
        try:
            code = pSpecificSecurity['SecurityID']
            self.subscripted_codes.discard(code)
            if bIsLast == 1:
                print("取消订阅响应结束", self.subscripted_codes)
                l2_data_manager.add_subscript_codes(self.subscripted_codes)
        except Exception as e:
            logging.exception(e)
    def OnRspSubNGTSTick(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
        async_log_util.info(logger_local_huaxin_l2_subscript,
                            f"NGTS订阅结果:{pSpecificSecurity['SecurityID']} {pRspInfo['ErrorID']} {pRspInfo['ErrorMsg']}")
        if pRspInfo["ErrorID"] == 0:
            print("订阅成功")
            self.subscripted_codes.add(pSpecificSecurity['SecurityID'])
            # 初始化
            SubscriptDefend.set_l2_market_update(pSpecificSecurity['SecurityID'])
        if bIsLast == 1:
            print("订阅响应结束", self.subscripted_codes)
            l2_data_manager.add_subscript_codes(self.subscripted_codes)
    def OnRspUnSubNGTSTick(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
        try:
            code = pSpecificSecurity['SecurityID']
            logger_local_huaxin_l2_subscript.info(f"NGTS取消订阅:{code}")
            self.subscripted_codes.discard(code)
            if bIsLast == 1:
                print("取消订阅响应结束", self.subscripted_codes)
@@ -266,34 +300,42 @@
    def OnRspSubXTSTick(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
        print("OnRspSubXTSTick")
    # 4.0.5版本接口
    def OnRspSubNGTSTick(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
        print("OnRspSubNGTSTick")
    def OnRtnMarketData(self, pDepthMarketData, FirstLevelBuyNum, FirstLevelBuyOrderVolumes, FirstLevelSellNum,
                        FirstLevelSellOrderVolumes):
        # 传入:时间,现价,成交总量,买1,买2,买3,买4,买5,卖1,卖2,卖3,卖4,卖5
        try:
            buys = [(pDepthMarketData['BidPrice1'], pDepthMarketData['BidVolume1']),
                    (pDepthMarketData['BidPrice2'], pDepthMarketData['BidVolume2']),
                    (pDepthMarketData['BidPrice3'], pDepthMarketData['BidVolume3']),
                    (pDepthMarketData['BidPrice4'], pDepthMarketData['BidVolume4']),
                    (pDepthMarketData['BidPrice5'], pDepthMarketData['BidVolume5'])]
            for i in range(6, 11):
                if not pDepthMarketData[f"BidVolume{i}"]:
                    break
                buys.append((pDepthMarketData[f'BidPrice{i}'], pDepthMarketData[f'BidVolume{i}']))
            sells = [
                (pDepthMarketData['AskPrice1'], pDepthMarketData['AskVolume1']),
                (pDepthMarketData['AskPrice2'], pDepthMarketData['AskVolume2']),
                (pDepthMarketData['AskPrice3'], pDepthMarketData['AskVolume3']),
                (pDepthMarketData['AskPrice4'], pDepthMarketData['AskVolume4']),
                (pDepthMarketData['AskPrice5'], pDepthMarketData['AskVolume5'])
            ]
            for i in range(6, 11):
                if not pDepthMarketData[f"AskVolume{i}"]:
                    break
                sells.append((pDepthMarketData[f'AskPrice{i}'], pDepthMarketData[f'AskVolume{i}']))
            d = {"dataTimeStamp": pDepthMarketData['DataTimeStamp'], "securityID": pDepthMarketData['SecurityID'],
                 "lastPrice": pDepthMarketData['LastPrice'],
                 "totalVolumeTrade": pDepthMarketData['TotalVolumeTrade'],
                 "totalValueTrade": pDepthMarketData['TotalValueTrade'],
                 "totalAskVolume": pDepthMarketData['TotalAskVolume'],
                 "avgAskPrice": pDepthMarketData["AvgAskPrice"],
                 "buy": [(pDepthMarketData['BidPrice1'], pDepthMarketData['BidVolume1']),
                         (pDepthMarketData['BidPrice2'], pDepthMarketData['BidVolume2']),
                         (pDepthMarketData['BidPrice3'], pDepthMarketData['BidVolume3']),
                         (pDepthMarketData['BidPrice4'], pDepthMarketData['BidVolume4']),
                         (pDepthMarketData['BidPrice5'], pDepthMarketData['BidVolume5'])],
                 "sell": [
                     (pDepthMarketData['AskPrice1'], pDepthMarketData['AskVolume1']),
                     (pDepthMarketData['AskPrice2'], pDepthMarketData['AskVolume2']),
                     (pDepthMarketData['AskPrice3'], pDepthMarketData['AskVolume3']),
                     (pDepthMarketData['AskPrice4'], pDepthMarketData['AskVolume4']),
                     (pDepthMarketData['AskPrice5'], pDepthMarketData['AskVolume5'])
                 ]}
            market_code_dict[pDepthMarketData['SecurityID']] = time.time()
                 "buy": buys,
                 "sell": sells}
            self.l2_data_upload_manager.add_market_data(d)
            SubscriptDefend.set_l2_market_update(pDepthMarketData['SecurityID'])
        except:
            pass
@@ -313,12 +355,6 @@
        # min_volume, limit_up_price = self.codes_volume_and_price_dict.get(code)
        # 输出逐笔成交数据
        if pTransaction['ExecType'] == b"2":
            # if min_volume is None:
            #     # 默认筛选50w
            #     if pTransaction['TradePrice'] * pTransaction['Volume'] < 500000:
            #         return
            # elif pTransaction['TradeVolume'] < min_volume:
            #     return
            # 撤单
            item = {"SecurityID": pTransaction['SecurityID'], "Price": pTransaction['TradePrice'],
                    "Volume": pTransaction['TradeVolume'],
@@ -356,28 +392,6 @@
            self.l2_data_upload_manager.add_transaction_detail(item)
    def OnRtnOrderDetail(self, pOrderDetail):
        # can_listen = False
        # code = str(pOrderDetail['SecurityID'])
        # start_time = 0
        # if code in self.special_code_volume_for_order_dict:
        #     start_time = time.time()
        #     if self.special_code_volume_for_order_dict[code][0] == pOrderDetail[
        #         'Volume'] or constant.SHADOW_ORDER_VOLUME == pOrderDetail['Volume']:
        #         # 监控目标订单与影子订单
        #         if self.special_code_volume_for_order_dict[code][1] > time.time():
        #             # 特殊量监听
        #             can_listen = True
        #         else:
        #             self.special_code_volume_for_order_dict.pop(code)
        # if not can_listen:
        #     min_volume, limit_up_price = self.codes_volume_and_price_dict.get(code)
        #     if min_volume is None:
        #         # 默认筛选50w
        #         if pOrderDetail['Price'] * pOrderDetail['Volume'] < 500000:
        #             return
        #     elif pOrderDetail['Volume'] < min_volume:
        #         return
        # 输出逐笔委托数据
        # 上证OrderStatus=b"D"表示撤单
        item = {"SecurityID": pOrderDetail['SecurityID'], "Price": pOrderDetail['Price'],
                "Volume": pOrderDetail['Volume'],
@@ -386,6 +400,38 @@
                "SubSeq": pOrderDetail['SubSeq'], "OrderNO": pOrderDetail['OrderNO'],
                "OrderStatus": pOrderDetail['OrderStatus'].decode()}
        self.l2_data_upload_manager.add_l2_order_detail(item, 0)
    def OnRtnNGTSTick(self, pTick):
        """
        上证股票的逐笔委托与成交
        @param pTick:
        @return:
        """
        try:
            if pTick['TickType'] == b'T':
                # 成交
                item = {"SecurityID": pTick['SecurityID'], "TradePrice": pTick['Price'],
                        "TradeVolume": pTick['Volume'],
                        "OrderTime": pTick['TickTime'], "MainSeq": pTick['MainSeq'],
                        "SubSeq": pTick['SubSeq'], "BuyNo": pTick['BuyNo'],
                        "SellNo": pTick['SellNo'],
                        "ExecType": '1'}
                self.l2_data_upload_manager.add_transaction_detail(item)
            elif pTick['TickType'] == b'A' or pTick['TickType'] == b'D':
                # 撤单
                item = {"SecurityID": pTick['SecurityID'], "Price": pTick['Price'],
                        "Volume": pTick['Volume'],
                        "Side": pTick['Side'].decode(), "OrderType": pTick['TickType'].decode(),
                        "OrderTime": pTick['TickTime'], "MainSeq": pTick['MainSeq'],
                        "SubSeq": pTick['SubSeq'], "OrderNO": '',
                        "OrderStatus": pTick['TickType'].decode()}
                if pTick['Side'] == b'1':
                    item['OrderNO'] = pTick['BuyNo']
                elif pTick['Side'] == b'2':
                    item['OrderNO'] = pTick['SellNo']
                self.l2_data_upload_manager.add_l2_order_detail(item, 0)
        except Exception as e:
            logger_local_huaxin_l2_subscript.exception(e)
    def OnRtnBondMarketData(self, pDepthMarketData, FirstLevelBuyNum, FirstLevelBuyOrderVolumes, FirstLevelSellNum,
                            FirstLevelSellOrderVolumes):
@@ -459,40 +505,50 @@
        for sell_index in range(0, FirstLevelSellNum):
            print("first level sell [%d] : [%d]" % (sell_index, FirstLevelSellOrderVolumes[sell_index]))
    def OnRtnXTSTick(self, pTick):
        # 输出上海债券逐笔数据’
        print(
            "OnXTSTick TickType[%s] SecurityID[%s] Price[%.2f] Volume[%d] TickTime[%d] MainSeq[%d] SubSeq[%d] BuyNo[%d] SellNo[%d]" % (
                pTick['TickType'],
                pTick['SecurityID'],
                pTick['Price'],
                pTick['Volume'],
                pTick['TickTime'],
                pTick['MainSeq'],
                pTick['SubSeq'],
                pTick['BuyNo'],
                pTick['SellNo']))
    def OnRtnNGTSTick(self, pTick):
        # 输出上海股基逐笔数据’
        print(
            "OnRtnNGTSTick TickType[%s] SecurityID[%s] Price[%.2f] Volume[%d] TickTime[%d] MainSeq[%d] SubSeq[%d] BuyNo[%d] SellNo[%d]" % (
                pTick['TickType'],
                pTick['SecurityID'],
                pTick['Price'],
                pTick['Volume'],
                pTick['TickTime'],
                pTick['MainSeq'],
                pTick['SubSeq'],
                pTick['BuyNo'],
                pTick['SellNo']))
class SubscriptDefend:
    """
    订阅守护
    定义:当订阅的代码超过一定时间没有回调数据时重新订阅
    """
    __l2_market_update_time = {}
    @classmethod
    def set_l2_market_update(cls, code):
        cls.__l2_market_update_time[code] = time.time()
    @classmethod
    def run(cls):
        while True:
            try:
                now_time = tool.get_now_time_as_int()
                if now_time < int("093015"):
                    continue
                if int("112945") < now_time < int("130015"):
                    continue
                if int("145645") < now_time:
                    continue
                if spi.subscripted_codes:
                    codes = []
                    for code in spi.subscripted_codes:
                        # 获取上次更新时间
                        update_time = cls.__l2_market_update_time.get(code)
                        if update_time and time.time() - update_time > 15:
                            # 需要重新订阅
                            codes.append(code)
                    if codes:
                        logger_debug.info(f"重新订阅:{codes}")
                        spi.subscribe_codes(codes)
            except:
                pass
            finally:
                time.sleep(15)
class MyL2ActionCallback(L2ActionCallback):
    def OnSetL2Position(self, codes_data):
        print("L2订阅数量:", len(codes_data))
        logger_l2_codes_subscript.info("华鑫L2代码处理队列获取到数据:数量-{}", len(codes_data))
        huaxin_l2_log.info(logger_l2_codes_subscript, "华鑫L2代码处理队列获取到数据:数量-{}", len(codes_data))
        try:
            spi.set_codes_data(codes_data)
        except Exception as e:
@@ -507,9 +563,9 @@
    g_SubMode = lev2mdapi.TORA_TSTP_MST_MCAST
    # case 1缓存模式
    # api = lev2mdapi.CTORATstpLev2MdApi_CreateTstpLev2MdApi(g_SubMode, True)
    api = lev2mdapi.CTORATstpLev2MdApi_CreateTstpLev2MdApi(g_SubMode, True)
    # case 2非缓存模式
    api = lev2mdapi.CTORATstpLev2MdApi_CreateTstpLev2MdApi(g_SubMode, False)
    # api = lev2mdapi.CTORATstpLev2MdApi_CreateTstpLev2MdApi(g_SubMode, False)
    global spi
    spi = Lev2MdSpi(api, l2_data_upload_manager)
    api.RegisterSpi(spi)
@@ -532,6 +588,9 @@
    api.Init()
__l2_cmd_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=3)
def __receive_from_queue_trade(queue_trade_w_l2_r: multiprocessing.Queue):
    logger_system.info(f"l2_client __receive_from_pipe_trade 线程ID:{tool.get_thread_id()}")
    while True:
@@ -542,12 +601,15 @@
                    value = value.decode("utf-8")
                data = json.loads(value)
                _type = data["type"]
                if _type == "listen_volume":
                    volume = data["data"]["volume"]
                    code = data["data"]["code"]
                    spi.set_code_special_watch_volume(code, volume)
                elif _type == "l2_cmd":
                    l2CommandManager.process_command(command_manager.CLIENT_TYPE_CMD_L2, None, data)
                if _type == "l2_cmd":
                    __start_time = time.time()
                    # 线程池
                    __l2_cmd_thread_pool.submit(
                        lambda: l2CommandManager.process_command(command_manager.CLIENT_TYPE_CMD_L2, None, data))
                    use_time = time.time() - __start_time
                    if use_time > 0.005:
                        huaxin_l2_log.info(logger_local_huaxin_l2_subscript, f"l2_cmd耗时:{use_time}s")
        except Exception as e:
            logging.exception(e)
@@ -555,43 +617,50 @@
pipe_strategy = None
def run(queue_r: multiprocessing.Queue, order_queues: List[multiprocessing.Queue],
        transaction_queues: List[multiprocessing.Queue], market_queue: multiprocessing.Queue) -> None:
    # def test_add_codes():
    #     time.sleep(5)
    #     # if value:
    #     #     if type(value) == bytes:
    #     #         value = value.decode("utf-8")
    #     #     data = json.loads(value)
    #     #     _type = data["type"]
    #     #     if _type == "listen_volume":
    #     #         volume = data["data"]["volume"]
    #     #         code = data["data"]["code"]
    #     #         spi.set_code_special_watch_volume(code, volume)
    #     #     elif _type == "l2_cmd":
    #     #         l2CommandManager.process_command(command_manager.CLIENT_TYPE_CMD_L2, None, data)
    #
    #     demo_datas = [("603002", int(50 * 10000 / 6.35), 6.35), ("002654", int(50 * 10000 / 15.59), 15.59),
    #                   ("603701", int(50 * 10000 / 14.28), 14.28), ("002908", int(50 * 10000 / 12.78), 12.78)]
    #
    #     queue_r.put_nowait(json.dumps({"type": "l2_cmd", "data": [demo_datas[0]]}))
    #     time.sleep(1)
    #
    #     spi.l2_data_upload_manager.add_l2_order_detail(
    #         {'SecurityID': '603002', 'Price': 6.35, 'Volume': 275000, 'Side': "1", 'OrderType': '0',
    #          'OrderTime': '13000015',
    #          'MainSeq': 2, 'SubSeq': 6739147, 'OrderNO': 5512466, 'OrderStatus': 'D'}, 0)
    #     spi.l2_data_upload_manager.add_l2_order_detail(
    #         {'SecurityID': '603002', 'Price': 6.35, 'Volume': 200, 'Side': "1", 'OrderType': '0',
    #          'OrderTime': '13000015',
    #          'MainSeq': 2, 'SubSeq': 6739148, 'OrderNO': 5512467, 'OrderStatus': 'D'}, 0)
    #     queue_r.put_nowait(json.dumps({"type": "listen_volume", "data": {"code": "603002", "volume": 100}}))
    #     time.sleep(0.1)
    #     spi.l2_data_upload_manager.add_l2_order_detail(
    #         {'SecurityID': '603002', 'Price': 6.35, 'Volume': 100, 'Side': "1", 'OrderType': '0',
    #          'OrderTime': '13000015',
    #          'MainSeq': 2, 'SubSeq': 6739148, 'OrderNO': 5512467, 'OrderStatus': 'D'}, 0)
def test_add_codes(queue_r):
    time.sleep(10)
    # if value:
    #     if type(value) == bytes:
    #         value = value.decode("utf-8")
    #     data = json.loads(value)
    #     _type = data["type"]
    #     if _type == "listen_volume":
    #         volume = data["data"]["volume"]
    #         code = data["data"]["code"]
    #         spi.set_code_special_watch_volume(code, volume)
    #     elif _type == "l2_cmd":
    #         l2CommandManager.process_command(command_manager.CLIENT_TYPE_CMD_L2, None, data)
    demo_datas = [("603002", int(50 * 10000 / 6.35), 6.35, 6.00, 200),
                  ("002654", int(50 * 10000 / 15.59), 15.59, 15.3, 200),
                  ("603701", int(50 * 10000 / 14.28), 14.28, 14.00, 200),
                  ("002908", int(50 * 10000 / 12.78), 12.78, 12.00, 200)]
    queue_r.put_nowait(json.dumps({"type": "l2_cmd", "data": [demo_datas[0]]}))
    time.sleep(10)
    while True:
        try:
            spi.l2_data_upload_manager.add_l2_order_detail(
                {'SecurityID': '603002', 'Price': 6.35, 'Volume': 275000, 'Side': "1", 'OrderType': '0',
                 'OrderTime': '13000015',
                 'MainSeq': 2, 'SubSeq': 6739147, 'OrderNO': 5512466, 'OrderStatus': 'D'}, 0)
            spi.l2_data_upload_manager.add_l2_order_detail(
                {'SecurityID': '603002', 'Price': 6.35, 'Volume': 200, 'Side': "1", 'OrderType': '0',
                 'OrderTime': '13000015',
                 'MainSeq': 2, 'SubSeq': 6739148, 'OrderNO': 5512467, 'OrderStatus': 'D'}, 0)
            # queue_r.put_nowait(json.dumps({"type": "listen_volume", "data": {"code": "603002", "volume": 100}}))
            time.sleep(0.1)
            spi.l2_data_upload_manager.add_l2_order_detail(
                {'SecurityID': '603002', 'Price': 6.35, 'Volume': 100, 'Side': "1", 'OrderType': '0',
                 'OrderTime': '13000015',
                 'MainSeq': 2, 'SubSeq': 6739148, 'OrderNO': 5512467, 'OrderStatus': 'D'}, 0)
        except Exception as e:
            logging.exception(e)
        finally:
            time.sleep(10)
def run(queue_r: multiprocessing.Queue, data_callbacks: list) -> None:
    logger_system.info("L2进程ID:{}", os.getpid())
    logger_system.info(f"l2_client 线程ID:{tool.get_thread_id()}")
    try:
@@ -599,17 +668,16 @@
        if queue_r is not None:
            t1 = threading.Thread(target=lambda: __receive_from_queue_trade(queue_r), daemon=True)
            t1.start()
        # 订阅守护
        threading.Thread(target=SubscriptDefend.run, daemon=True).start()
        # 初始化
        order_queue_distribute_manager = CodeQueueDistributeManager(order_queues)
        transaction_queue_distribute_manager = CodeQueueDistributeManager(transaction_queues)
        l2_data_upload_manager = L2DataUploadManager(order_queue_distribute_manager,
                                                     transaction_queue_distribute_manager, market_queue)
        data_callback_distribute_manager = CodeDataCallbackDistributeManager(data_callbacks)
        l2_data_upload_manager = L2DataUploadManager(data_callback_distribute_manager)
        __init_l2(l2_data_upload_manager)
        l2_data_manager.run_upload_common()
        l2_data_manager.run_log()
        # 测试
        # threading.Thread(target=lambda: test_add_codes(),daemon=True).start()
        # TODO 测试
        # threading.Thread(target=lambda: test_add_codes(queue_r), daemon=True).start()
        global l2CommandManager
        l2CommandManager = command_manager.L2CommandManager()
        l2CommandManager.init(MyL2ActionCallback())
@@ -656,13 +724,13 @@
             'OrderTime': '13000015',
             'MainSeq': 2, 'SubSeq': 6739148, 'OrderNO': 5512467, 'OrderStatus': 'D'}, 0)
    queue_r = multiprocessing.Queue()
    queue_r = multiprocessing.Queue(maxsize=1024)
    order_queues = []
    transaction_queues = []
    market_queue = multiprocessing.Queue()
    market_queue = multiprocessing.Queue(maxsize=1024)
    for i in range(20):
        order_queues.append(multiprocessing.Queue())
        transaction_queues.append(multiprocessing.Queue())
        order_queues.append(multiprocessing.Queue(maxsize=1024))
        transaction_queues.append(multiprocessing.Queue(maxsize=1024))
    threading.Thread(target=test_add_codes).start()
    run(queue_r, order_queues, transaction_queues, market_queue)