Administrator
2025-03-07 4a2ac13b0ac30601fbd0fcf63ac533011d17dfb7
bug修复/新版L2订阅初步改造
3个文件已修改
4个文件已添加
1475 ■■■■■ 已修改文件
huaxin_client/code_queue_distribute_manager.py 87 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l2_client_v2.py 721 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l2_data_manager_v2.py 471 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_data_manager_new.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/subscript/l2_subscript_manager.py 87 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.py 41 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utils/shared_memery_util.py 66 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/code_queue_distribute_manager.py
@@ -3,6 +3,8 @@
"""
import copy
import zmq
from log_module.log import logger_local_huaxin_l2_error
@@ -113,3 +115,88 @@
        """
        codes = self.distibuted_code_callback_dict.keys()
        return copy.deepcopy(codes)
# 数据通道分配
class CodeDataChannelDistributeManager:
    """
    Data channel allocation: hands out one data channel per security code.

    Every channel is stored as ``(channel_id, channel_data)`` where
    ``channel_id`` is ``id(channel_data)`` and ``channel_data`` is the entry
    passed in by the caller.
    """

    def __init__(self, channel_list: list):
        """
        Tag every channel with an id and connect one zmq REQ socket per notify address.

        Each entry is a pair: (order-detail shared memory, transaction shared memory),
        and each half carries its own notify address.
        @param channel_list:[((no, multiprocessing.Array, zmq_address),(no, multiprocessing.Array, zmq_address))]
        """
        self.channel_list = [(id(channel_data), channel_data) for channel_data in channel_list]
        # code -> (channel_id, channel_data)
        self.distibuted_code_channel_dict = {}
        # zmq address -> connected REQ socket
        self.zmq_host_socket_dict = {}
        self.zmq_context = zmq.Context()
        for channel in channel_list:
            # Index 2 of each half of the pair is the zmq address
            for endpoint in (channel[0], channel[1]):
                host = endpoint[2]
                socket = self.zmq_context.socket(zmq.REQ)
                socket.connect(host)
                self.zmq_host_socket_dict[host] = socket

    def get_zmq_socket(self, host):
        """Return the socket connected to ``host``, or None when the address is unknown."""
        return self.zmq_host_socket_dict.get(host)

    def get_available_channel(self):
        """Return the first ``(channel_id, channel_data)`` that is not assigned to any code, or None."""
        used_channel_ids = {info[0] for info in self.distibuted_code_channel_dict.values()}
        for channel in self.channel_list:
            if channel[0] not in used_channel_ids:
                return channel
        return None

    def distribute_channel(self, code, target_codes=None):
        """
        Assign a channel to ``code`` (idempotent for an already assigned code).

        When every channel is taken and ``target_codes`` is given, assignments of
        codes that are not in ``target_codes`` are released and the freed channel
        is reused for ``code``.
        @param code: code that needs a channel
        @param target_codes: codes that are still wanted; anything else may be released
        @return: (channel_id, channel_data)
        @raise Exception: when no channel is free even after releasing stale assignments
        """
        if code in self.distibuted_code_channel_dict:
            return self.distibuted_code_channel_dict.get(code)
        channel_info = self.get_available_channel()
        if not channel_info and target_codes:
            # Drop assignments that are no longer among the target codes, then try again
            stale_codes = [c for c in self.distibuted_code_channel_dict if c not in target_codes]
            for stale_code in stale_codes:
                self.release_distribute_channel(stale_code)
            if stale_codes:
                channel_info = self.get_available_channel()
        if not channel_info:
            used_channel_ids = {info[0] for info in self.distibuted_code_channel_dict.values()}
            logger_local_huaxin_l2_error.info(f"已经分配的代码:{self.distibuted_code_channel_dict.keys()}")
            logger_local_huaxin_l2_error.info(f"已经分配的callbackid:{used_channel_ids}")
            raise Exception("无可用的回调对象")
        self.distibuted_code_channel_dict[code] = channel_info
        return channel_info

    def get_distributed_channel(self, code):
        """Return the ``channel_data`` assigned to ``code``, or None when it has no channel."""
        channel_info = self.distibuted_code_channel_dict.get(code)
        if channel_info is None:
            return None
        return channel_info[1]

    def release_distribute_channel(self, code):
        """Free the channel held by ``code``; unknown codes are ignored."""
        self.distibuted_code_channel_dict.pop(code, None)

    def get_free_channel_count(self):
        """Return the number of channels that are not assigned to any code."""
        return len(self.channel_list) - len(self.distibuted_code_channel_dict)

    def get_distributed_codes(self):
        """
        Return the codes that currently hold a channel.

        @return: an independent set, safe to keep while assignments change
        """
        # copy.deepcopy() cannot copy a dict_keys view (TypeError), so build a new set instead
        return set(self.distibuted_code_channel_dict)
huaxin_client/l2_client_v2.py
New file
@@ -0,0 +1,721 @@
# -*- coding: utf-8 -*-
import json
import logging
import multiprocessing
import os
import queue
import threading
import time
import concurrent.futures
from huaxin_client import command_manager
from huaxin_client import constant
from huaxin_client import l2_data_manager_v2
import lev2mdapi
from huaxin_client.code_queue_distribute_manager import CodeDataCallbackDistributeManager, \
    CodeDataChannelDistributeManager
from huaxin_client.command_manager import L2ActionCallback
from huaxin_client.l2_data_manager_v2 import L2DataUploadManager
from log_module import log, async_log_util
from log_module.async_log_util import huaxin_l2_log
from log_module.log import logger_local_huaxin_l2_subscript, logger_system, logger_l2_codes_subscript, logger_debug
from utils import tool
### Type B environment (defaults) ###
# Front_Address is only registered when the subscription mode is not multicast (see __init_l2)
Front_Address = "tcp://10.0.1.101:6900"
Multicast_Address = "udp://224.224.2.19:7889"
# Second multicast group; only referenced from commented-out code in __init_l2
Multicast_Address2 = "udp://224.224.224.234:7890"
Local_Interface_Address = constant.LOCAL_IP
### Type A environment: overrides the defaults above when constant.IS_A is truthy ###
if constant.IS_A:
    Front_Address = "tcp://10.0.1.101:6900"
    Multicast_Address = "udp://224.224.22.3:8889"
    Multicast_Address2 = "udp://224.224.224.231:4889"
    Local_Interface_Address = "172.16.22.111"
# When True, SSE codes are (un)subscribed through the NGTS tick API instead of the
# separate order-detail / transaction subscriptions (see Lev2MdSpi.__subscribe)
ENABLE_NGST = True
class Lev2MdSpi(lev2mdapi.CTORATstpLev2MdSpi):
    """
    Callback handler (SPI) for the level-2 market data API.

    Keeps track of which codes are subscribed and forwards order-detail,
    transaction and snapshot callbacks to the ``L2DataUploadManager``.
    """
    # Codes handled by the latest completed __process_codes_data call
    latest_codes_set = set()
    special_code_volume_for_order_dict = {}
    # Codes whose subscription was acknowledged; class-level, so shared by every instance
    subscripted_codes = set()

    def __init__(self, api, l2_data_upload_manager: L2DataUploadManager):
        """
        @param api: market data API object used for login and (un)subscription
        @param l2_data_upload_manager: receiver of every data callback
        """
        lev2mdapi.CTORATstpLev2MdSpi.__init__(self)
        self.__api = api
        self.is_login = False
        self.l2_data_upload_manager = l2_data_upload_manager
        # code -> (d[1], d[2], d[3], d[4], d[5]) of the latest codes_data entry
        self.codes_volume_and_price_dict = {}

    def __split_codes(self, codes):
        """
        Split codes by market and encode them to bytes for the API.

        Codes that belong to neither market are dropped.
        @return: (sse_codes, szse_codes)
        """
        szse_codes = []
        sse_codes = []
        for code in codes:
            market_type = tool.get_market_type(code)
            if market_type == tool.MARKET_TYPE_SZSE:
                szse_codes.append(code.encode())
            elif market_type == tool.MARKET_TYPE_SSE:
                sse_codes.append(code.encode())
        return sse_codes, szse_codes

    def __unsubscribe(self, _codes):
        """Cancel the tick and snapshot subscriptions of ``_codes``."""
        sh, sz = self.__split_codes(_codes)
        logger_local_huaxin_l2_subscript.info(f"取消订阅上证:{sh}")
        logger_local_huaxin_l2_subscript.info(f"取消订阅深证:{sz}")
        if sh:
            if ENABLE_NGST:
                result = self.__api.UnSubscribeNGTSTick(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
                # The original message said "订阅结果" although this is the unsubscribe call
                logger_local_huaxin_l2_subscript.info(f"逐笔NGTS取消订阅结果sh:{result}")
            else:
                # Cancel order-detail subscription
                self.__api.UnSubscribeOrderDetail(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
                # Cancel transaction subscription
                self.__api.UnSubscribeTransaction(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
            self.__api.UnSubscribeMarketData(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
        if sz:
            self.__api.UnSubscribeOrderDetail(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)
            self.__api.UnSubscribeTransaction(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)
            self.__api.UnSubscribeMarketData(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)

    def subscribe_codes(self, _codes):
        """Public entry point to (re-)subscribe ``_codes``."""
        self.__subscribe(_codes)

    def __subscribe(self, _codes):
        """Subscribe tick and snapshot data for ``_codes`` and log every API result."""
        sh, sz = self.__split_codes(_codes)
        logger_local_huaxin_l2_subscript.info(f"订阅上证:{sh}")
        logger_local_huaxin_l2_subscript.info(f"订阅深证:{sz}")
        if sh:
            if ENABLE_NGST:
                result = self.__api.SubscribeNGTSTick(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
                logger_local_huaxin_l2_subscript.info(f"逐笔NGTS订阅结果sh:{result}")
            else:
                # Subscribe order detail
                result = self.__api.SubscribeOrderDetail(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
                logger_local_huaxin_l2_subscript.info(f"逐笔委托订阅结果sh:{result}")
                # Subscribe transactions
                result = self.__api.SubscribeTransaction(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
                logger_local_huaxin_l2_subscript.info(f"逐笔成交订阅结果sh:{result}")
            result = self.__api.SubscribeMarketData(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
            logger_local_huaxin_l2_subscript.info(f"市场订阅结果sh:{result}")
        if sz:
            result = self.__api.SubscribeOrderDetail(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)
            logger_local_huaxin_l2_subscript.info(f"逐笔委托订阅结果sz:{result}")
            result = self.__api.SubscribeTransaction(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)
            logger_local_huaxin_l2_subscript.info(f"逐笔成交订阅结果sz:{result}")
            result = self.__api.SubscribeMarketData(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)
            logger_local_huaxin_l2_subscript.info(f"市场订阅结果sz:{result}")

    def __process_codes_data(self, codes_data, from_cache=False, delay=0.0):
        """
        Apply a new target list: update filters, allocate upload queues and adjust subscriptions.

        @param codes_data: entries indexed as (code, min_volume, limit_up_price, shadow_price,
            buy_volume, special_volumes), i.e. the arguments of set_order_fileter_condition
        @param from_cache: True when the data was restored from the cache file; such data
            never replaces data that was already set
        @param delay: seconds to wait before processing
        @raise Exception: when not logged in (unless constant.TEST is set)
        """
        if from_cache and self.codes_volume_and_price_dict:
            return
        if not self.is_login and not constant.TEST:
            raise Exception("L2尚未登录")
        if delay > 0:
            time.sleep(delay)
            # Fresh data may have arrived while waiting; cached data must not overwrite it
            if from_cache and self.codes_volume_and_price_dict:
                return
        codes = set()
        for d in codes_data:
            code = d[0]
            codes.add(code)
            self.codes_volume_and_price_dict[code] = (d[1], d[2], d[3], d[4], d[5])
            self.l2_data_upload_manager.set_order_fileter_condition(code, d[1], round(float(d[2]), 2), d[3], d[4], d[5])
        logger_l2_codes_subscript.info("华鑫L2订阅总数:{}", len(codes))
        add_codes = codes - self.subscripted_codes
        del_codes = self.subscripted_codes - codes
        print("add del codes", add_codes, del_codes)
        try:
            for c in del_codes:
                self.l2_data_upload_manager.release_distributed_upload_queue(c)
                l2_data_manager_v2.del_target_code(c)
            for c in codes:
                self.l2_data_upload_manager.distribute_upload_queue(c, codes)
                l2_data_manager_v2.add_target_code(c)
        except Exception as e:  # TODO clean up allocations that were not released
            logger_system.error(f"L2代码分配上传队列出错:{str(e)}")
            logger_system.exception(e)
        self.__subscribe(add_codes)
        self.__unsubscribe(del_codes)
        if add_codes:
            logger_system.info(f"新增L2订阅代码数量({'缓存' if from_cache else ''}):{len(add_codes)}")
            for c in add_codes:
                logger_l2_codes_subscript.info(f"l2委托数据过滤条件:{c} - {self.codes_volume_and_price_dict.get(c)}")
        logger_l2_codes_subscript.info("华鑫L2订阅结束,add-{} del-{}", len(add_codes), len(del_codes))
        # Remember the latest code list
        self.latest_codes_set = codes

    def set_codes_data(self, codes_data):
        """
        Apply ``codes_data`` (see __process_codes_data for the entry layout).

        Processing errors are logged, and the data is written to the cache file
        whether or not processing succeeded.
        """
        try:
            self.__process_codes_data(codes_data)
        except Exception as e:
            logger_l2_codes_subscript.exception(e)
        finally:
            # Keep a copy of the latest data
            self.__set_latest_datas(codes_data)

    @classmethod
    def __set_latest_datas(cls, codes_data):
        """Write ``[today, codes_data]`` as JSON to constant.L2_CODES_INFO_PATH."""
        data_str = json.dumps([tool.get_now_date_str(), codes_data])
        with open(constant.L2_CODES_INFO_PATH, mode='w') as f:
            f.write(data_str)

    @classmethod
    def __get_latest_datas(cls):
        """
        Read the cached codes data written today.

        @return: the cached list, or [] when the file is missing, unreadable as JSON
            or was written on another day
        """
        if not os.path.exists(constant.L2_CODES_INFO_PATH):
            return []
        try:
            with open(constant.L2_CODES_INFO_PATH, mode='r') as f:
                data_json = json.loads(f.readline())
            if data_json[0] == tool.get_now_date_str():
                return data_json[1]
        except (ValueError, IndexError, KeyError, TypeError) as e:
            # An empty or truncated cache file must not kill the restore thread
            logger_l2_codes_subscript.exception(e)
        return []

    def OnFrontConnected(self):
        """Connection established: send a logout request, wait one second, then request login."""
        print("OnFrontConnected")
        logger_system.info(f"l2_client OnFrontConnected 线程ID:{tool.get_thread_id()}")
        logout_req = lev2mdapi.CTORATstpUserLogoutField()
        self.__api.ReqUserLogout(logout_req, 1)
        time.sleep(1)
        # Request login
        login_req = lev2mdapi.CTORATstpReqUserLoginField()
        self.__api.ReqUserLogin(login_req, 2)

    def OnRspUserLogin(self, pRspUserLogin, pRspInfo, nRequestID, bIsLast):
        """Login response: on success mark the session logged in and restore cached codes after 09:20:00."""
        print("OnRspUserLogin: ErrorID[%d] ErrorMsg[%s] RequestID[%d] IsLast[%d]" % (
            pRspInfo['ErrorID'], pRspInfo['ErrorMsg'], nRequestID, bIsLast))
        if pRspInfo['ErrorID'] == 0:
            print("----L2行情登录成功----")
            self.is_login = True
            logger_system.info("L2行情登录成功")
            # Restore the initial values from the cache file on a background thread
            if tool.trade_time_sub(tool.get_now_time_str(), "09:20:00") > 0:
                threading.Thread(
                    target=lambda: self.__process_codes_data(self.__get_latest_datas(), from_cache=True, delay=60),
                    daemon=True).start()

    def OnRspSubMarketData(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
        print("OnRspSubMarketData")

    def OnRspSubIndex(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
        print("OnRspSubIndex")

    def OnRspSubTransaction(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
        print("OnRspSubTransaction")

    def OnRspSubOrderDetail(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
        """Order-detail subscription response: record the code on success and publish the set on the last response."""
        print("OnRspSubOrderDetail", pRspInfo)
        print("订阅结果:", pSpecificSecurity["ExchangeID"], pSpecificSecurity["SecurityID"], pRspInfo["ErrorID"],
              pRspInfo["ErrorMsg"])
        async_log_util.info(logger_local_huaxin_l2_subscript,
                            f"订阅结果:{pSpecificSecurity['SecurityID']} {pRspInfo['ErrorID']} {pRspInfo['ErrorMsg']}")
        if pRspInfo["ErrorID"] == 0:
            print("订阅成功")
            self.subscripted_codes.add(pSpecificSecurity['SecurityID'])
            # Start the watchdog clock for this code
            SubscriptDefend.set_l2_market_update(pSpecificSecurity['SecurityID'])
        if bIsLast == 1:
            print("订阅响应结束", self.subscripted_codes)
            l2_data_manager_v2.add_subscript_codes(self.subscripted_codes)

    def OnRspUnSubOrderDetail(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
        """Order-detail unsubscribe response: forget the code and publish the set on the last response."""
        print("OnRspUnSubOrderDetail", bIsLast)
        try:
            code = pSpecificSecurity['SecurityID']
            self.subscripted_codes.discard(code)
            if bIsLast == 1:
                print("取消订阅响应结束", self.subscripted_codes)
                l2_data_manager_v2.add_subscript_codes(self.subscripted_codes)
        except Exception as e:
            logging.exception(e)

    def OnRspSubNGTSTick(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
        """NGTS tick subscription response: same bookkeeping as OnRspSubOrderDetail."""
        async_log_util.info(logger_local_huaxin_l2_subscript,
                            f"NGTS订阅结果:{pSpecificSecurity['SecurityID']} {pRspInfo['ErrorID']} {pRspInfo['ErrorMsg']}")
        if pRspInfo["ErrorID"] == 0:
            print("订阅成功")
            self.subscripted_codes.add(pSpecificSecurity['SecurityID'])
            # Start the watchdog clock for this code
            SubscriptDefend.set_l2_market_update(pSpecificSecurity['SecurityID'])
        if bIsLast == 1:
            print("订阅响应结束", self.subscripted_codes)
            l2_data_manager_v2.add_subscript_codes(self.subscripted_codes)

    def OnRspUnSubNGTSTick(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
        """NGTS tick unsubscribe response: forget the code and publish the set on the last response."""
        try:
            code = pSpecificSecurity['SecurityID']
            logger_local_huaxin_l2_subscript.info(f"NGTS取消订阅:{code}")
            self.subscripted_codes.discard(code)
            if bIsLast == 1:
                print("取消订阅响应结束", self.subscripted_codes)
                l2_data_manager_v2.add_subscript_codes(self.subscripted_codes)
        except Exception as e:
            logging.exception(e)

    def OnRspSubBondMarketData(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
        print("OnRspSubBondMarketData")

    def OnRspSubBondTransaction(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
        print("OnRspSubBondTransaction")

    def OnRspSubBondOrderDetail(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
        print("OnRspSubBondOrderDetail")

    def OnRspSubXTSMarketData(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
        print("OnRspSubXTSMarketData")

    def OnRspSubXTSTick(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
        print("OnRspSubXTSTick")

    def OnRtnMarketData(self, pDepthMarketData, FirstLevelBuyNum, FirstLevelBuyOrderVolumes, FirstLevelSellNum,
                        FirstLevelSellOrderVolumes):
        """
        Snapshot callback: forward the snapshot to the upload manager and feed the watchdog.

        Levels 1-5 are always included; levels 6-10 are appended until the first
        level with a zero/empty volume.
        """
        try:
            buys = [(pDepthMarketData[f'BidPrice{i}'], pDepthMarketData[f'BidVolume{i}']) for i in range(1, 6)]
            for i in range(6, 11):
                if not pDepthMarketData[f"BidVolume{i}"]:
                    break
                buys.append((pDepthMarketData[f'BidPrice{i}'], pDepthMarketData[f'BidVolume{i}']))
            sells = [(pDepthMarketData[f'AskPrice{i}'], pDepthMarketData[f'AskVolume{i}']) for i in range(1, 6)]
            for i in range(6, 11):
                if not pDepthMarketData[f"AskVolume{i}"]:
                    break
                sells.append((pDepthMarketData[f'AskPrice{i}'], pDepthMarketData[f'AskVolume{i}']))
            d = {"dataTimeStamp": pDepthMarketData['DataTimeStamp'], "securityID": pDepthMarketData['SecurityID'],
                 "lastPrice": pDepthMarketData['LastPrice'],
                 "totalVolumeTrade": pDepthMarketData['TotalVolumeTrade'],
                 "totalValueTrade": pDepthMarketData['TotalValueTrade'],
                 "totalAskVolume": pDepthMarketData['TotalAskVolume'],
                 "avgAskPrice": pDepthMarketData["AvgAskPrice"],
                 "buy": buys,
                 "sell": sells}
            self.l2_data_upload_manager.add_market_data(d)
            SubscriptDefend.set_l2_market_update(pDepthMarketData['SecurityID'])
        except Exception as e:
            # Keep the callback best-effort, but leave a trace instead of failing silently
            logger_local_huaxin_l2_subscript.exception(e)

    def OnRtnIndex(self, pIndex):
        # Print index quote data
        print(
            "OnRtnIndex SecurityID[%s] LastIndex[%.2f] LowIndex[%.2f] HighIndex[%.2f] TotalVolumeTraded[%d] Turnover[%.2f]" % (
                pIndex['SecurityID'],
                pIndex['LastIndex'],
                pIndex['LowIndex'],
                pIndex['HighIndex'],
                pIndex['TotalVolumeTraded'],
                pIndex['Turnover']))

    def OnRtnTransaction(self, pTransaction):
        """
        Transaction callback.

        ExecType b"2" is a cancellation and is forwarded as an order-detail item with
        OrderStatus "D"; everything else is forwarded as a trade.
        """
        if pTransaction['ExecType'] == b"2":
            # Cancellation
            item = {"SecurityID": pTransaction['SecurityID'], "Price": pTransaction['TradePrice'],
                    "Volume": pTransaction['TradeVolume'],
                    "OrderType": "2",
                    "OrderTime": pTransaction['TradeTime'], "MainSeq": pTransaction['MainSeq'],
                    "SubSeq": pTransaction['SubSeq'],
                    "OrderStatus": "D"}
            buyNo = pTransaction['BuyNo']
            sellNo = pTransaction['SellNo']
            if buyNo > 0:
                # Buy side
                item["OrderNO"] = buyNo
                item["Side"] = "1"
            elif sellNo > 0:
                # Sell side
                item["OrderNO"] = sellNo
                item["Side"] = "2"
            self.l2_data_upload_manager.add_l2_order_detail(item, 0, True)
        else:
            # Trade
            item = {"SecurityID": pTransaction['SecurityID'], "TradePrice": pTransaction['TradePrice'],
                    "TradeVolume": pTransaction['TradeVolume'],
                    "OrderTime": pTransaction['TradeTime'], "MainSeq": pTransaction['MainSeq'],
                    "SubSeq": pTransaction['SubSeq'], "BuyNo": pTransaction['BuyNo'],
                    "SellNo": pTransaction['SellNo'],
                    "ExecType": pTransaction['ExecType'].decode()}
            self.l2_data_upload_manager.add_transaction_detail(item)

    def OnRtnOrderDetail(self, pOrderDetail):
        """Order-detail callback: decode the byte fields and forward the order."""
        # On SSE, OrderStatus == b"D" means the order was cancelled
        item = {"SecurityID": pOrderDetail['SecurityID'], "Price": pOrderDetail['Price'],
                "Volume": pOrderDetail['Volume'],
                "Side": pOrderDetail['Side'].decode(), "OrderType": pOrderDetail['OrderType'].decode(),
                "OrderTime": pOrderDetail['OrderTime'], "MainSeq": pOrderDetail['MainSeq'],
                "SubSeq": pOrderDetail['SubSeq'], "OrderNO": pOrderDetail['OrderNO'],
                "OrderStatus": pOrderDetail['OrderStatus'].decode()}
        self.l2_data_upload_manager.add_l2_order_detail(item, 0)

    def OnRtnNGTSTick(self, pTick):
        """
        Combined order/trade tick stream for SSE stocks.

        @param pTick: tick record; TickType b'T' is a trade, b'A' / b'D' are order events
        @return:
        """
        try:
            if pTick['TickType'] == b'T':
                # Trade
                item = {"SecurityID": pTick['SecurityID'], "TradePrice": pTick['Price'],
                        "TradeVolume": pTick['Volume'],
                        "OrderTime": pTick['TickTime'], "MainSeq": pTick['MainSeq'],
                        "SubSeq": pTick['SubSeq'], "BuyNo": pTick['BuyNo'],
                        "SellNo": pTick['SellNo'],
                        "ExecType": '1'}
                self.l2_data_upload_manager.add_transaction_detail(item)
            elif pTick['TickType'] == b'A' or pTick['TickType'] == b'D':
                # Order event; the tick type doubles as OrderType and OrderStatus
                # (presumably 'A' = new order, 'D' = cancellation - confirm against the API docs)
                item = {"SecurityID": pTick['SecurityID'], "Price": pTick['Price'],
                        "Volume": pTick['Volume'],
                        "Side": pTick['Side'].decode(), "OrderType": pTick['TickType'].decode(),
                        "OrderTime": pTick['TickTime'], "MainSeq": pTick['MainSeq'],
                        "SubSeq": pTick['SubSeq'], "OrderNO": '',
                        "OrderStatus": pTick['TickType'].decode()}
                if pTick['Side'] == b'1':
                    item['OrderNO'] = pTick['BuyNo']
                elif pTick['Side'] == b'2':
                    item['OrderNO'] = pTick['SellNo']
                self.l2_data_upload_manager.add_l2_order_detail(item, 0)
        except Exception as e:
            logger_local_huaxin_l2_subscript.exception(e)

    def OnRtnBondMarketData(self, pDepthMarketData, FirstLevelBuyNum, FirstLevelBuyOrderVolumes, FirstLevelSellNum,
                            FirstLevelSellOrderVolumes):
        # Print the snapshot (the volume slot previously received TotalValueTrade by mistake)
        print(
            "OnRtnBondMarketData SecurityID[%s] LastPrice[%.2f] TotalVolumeTrade[%d] TotalValueTrade[%.2f] BidPrice1[%.2f] BidVolume1[%d] AskPrice1[%.2f] AskVolume1[%d]" % (
                pDepthMarketData['SecurityID'],
                pDepthMarketData['LastPrice'],
                pDepthMarketData['TotalVolumeTrade'],
                pDepthMarketData['TotalValueTrade'],
                pDepthMarketData['BidPrice1'],
                pDepthMarketData['BidVolume1'],
                pDepthMarketData['AskPrice1'],
                pDepthMarketData['AskVolume1']))
        # Print the first orders queued at the best bid
        for buy_index in range(0, FirstLevelBuyNum):
            print("first level buy  [%d] : [%d]" % (buy_index, FirstLevelBuyOrderVolumes[buy_index]))
        # Print the first orders queued at the best ask
        for sell_index in range(0, FirstLevelSellNum):
            print("first level sell [%d] : [%d]" % (sell_index, FirstLevelSellOrderVolumes[sell_index]))

    def OnRtnBondTransaction(self, pTransaction):
        # Print the trade; ExecType is a char field, so it is formatted with %s (was %d)
        print(
            "OnRtnBondTransaction SecurityID[%s] TradePrice[%.2f] TradeVolume[%d] TradeTime[%d] MainSeq[%d] SubSeq[%d] BuyNo[%d] SellNo[%d] ExecType[%s]" % (
                pTransaction['SecurityID'],
                pTransaction['TradePrice'],
                pTransaction['TradeVolume'],
                pTransaction['TradeTime'],
                pTransaction['MainSeq'],
                pTransaction['SubSeq'],
                pTransaction['BuyNo'],
                pTransaction['SellNo'],
                pTransaction['ExecType'],
            ))

    def OnRtnBondOrderDetail(self, pOrderDetail):
        # Print the order detail
        print(
            "OnRtnBondOrderDetail SecurityID[%s] Price[%.2f] Volume[%d] Side[%s] OrderType[%s] OrderTime[%d] MainSeq[%d] SubSeq[%d]" % (
                pOrderDetail['SecurityID'],
                pOrderDetail['Price'],
                pOrderDetail['Volume'],
                pOrderDetail['Side'],
                pOrderDetail['OrderType'],
                pOrderDetail['OrderTime'],
                pOrderDetail['MainSeq'],
                pOrderDetail['SubSeq']))

    def OnRtnXTSMarketData(self, pDepthMarketData, FirstLevelBuyNum, FirstLevelBuyOrderVolumes, FirstLevelSellNum,
                           FirstLevelSellOrderVolumes):
        # Print the snapshot (the volume slot previously received TotalValueTrade by mistake)
        print(
            "OnRtnXTSMarketData SecurityID[%s] LastPrice[%.2f] TotalVolumeTrade[%d] TotalValueTrade[%.2f] BidPrice1[%.2f] BidVolume1[%d] AskPrice1[%.2f] AskVolume1[%d]" % (
                pDepthMarketData['SecurityID'],
                pDepthMarketData['LastPrice'],
                pDepthMarketData['TotalVolumeTrade'],
                pDepthMarketData['TotalValueTrade'],
                pDepthMarketData['BidPrice1'],
                pDepthMarketData['BidVolume1'],
                pDepthMarketData['AskPrice1'],
                pDepthMarketData['AskVolume1']))
        # Print the first orders queued at the best bid
        for buy_index in range(0, FirstLevelBuyNum):
            print("first level buy  [%d] : [%d]" % (buy_index, FirstLevelBuyOrderVolumes[buy_index]))
        # Print the first orders queued at the best ask
        for sell_index in range(0, FirstLevelSellNum):
            print("first level sell [%d] : [%d]" % (sell_index, FirstLevelSellOrderVolumes[sell_index]))
class SubscriptDefend:
    """
    Subscription watchdog.

    When a subscribed code has produced no callback data for a while,
    the code is subscribed again.
    """
    # code -> time.time() of the latest subscription ack / snapshot callback
    __l2_market_update_time = {}
    # Seconds without an update before a code counts as stale
    _STALE_SECONDS = 15

    @classmethod
    def set_l2_market_update(cls, code):
        """Record that ``code`` just received data."""
        cls.__l2_market_update_time[code] = time.time()

    @staticmethod
    def _is_watch_time(now_time):
        """
        Tell whether the watchdog should check subscriptions at ``now_time``.

        @param now_time: wall-clock time as an HHMMSS integer
        @return: True inside 09:30:15-11:29:45 or 13:00:15-14:56:45 (bounds included)
        """
        return 93015 <= now_time <= 112945 or 130015 <= now_time <= 145645

    @classmethod
    def _get_stale_codes(cls, codes, now=None):
        """
        Return the codes whose last recorded update is older than the stale limit.

        Codes that never recorded an update are not reported.
        @param codes: codes to check
        @param now: reference timestamp, defaults to time.time()
        """
        if now is None:
            now = time.time()
        stale_codes = []
        for code in codes:
            update_time = cls.__l2_market_update_time.get(code)
            if update_time and now - update_time > cls._STALE_SECONDS:
                stale_codes.append(code)
        return stale_codes

    @classmethod
    def run(cls):
        """Poll forever (every 15 seconds) and re-subscribe stale codes during watch hours."""
        while True:
            try:
                if not cls._is_watch_time(tool.get_now_time_as_int()):
                    continue
                # The global spi is created by __init_l2(); this thread may be running before that
                current_spi = globals().get("spi")
                if current_spi is None:
                    continue
                # Work on a snapshot: API callbacks add/remove codes from another thread
                codes = cls._get_stale_codes(list(current_spi.subscripted_codes))
                if codes:
                    logger_debug.info(f"重新订阅:{codes}")
                    current_spi.subscribe_codes(codes)
            except Exception as e:
                # Keep the watchdog alive, but do not hide the failure
                logger_debug.exception(e)
            finally:
                time.sleep(15)
class MyL2ActionCallback(L2ActionCallback):
    """L2 command callback that forwards position updates to the global ``spi``."""

    def OnSetL2Position(self, codes_data):
        """
        Log how many entries arrived and pass them to ``spi.set_codes_data``.

        An exception raised while applying the data is logged, not propagated.
        """
        received_count = len(codes_data)
        huaxin_l2_log.info(logger_l2_codes_subscript, "华鑫L2代码处理队列获取到数据:数量-{}", received_count)
        try:
            spi.set_codes_data(codes_data)
        except Exception as error:
            logging.exception(error)
def __init_l2(l2_data_upload_manager):
    """
    Create the level-2 API object, attach the global ``spi`` handler and start the API.

    @param l2_data_upload_manager: upload manager handed to the ``Lev2MdSpi`` handler
    """
    global spi
    print(lev2mdapi.CTORATstpLev2MdApi_GetApiVersion())
    # Subscription mode: multicast here; lev2mdapi.TORA_TSTP_MST_TCP would select TCP
    sub_mode = lev2mdapi.TORA_TSTP_MST_MCAST
    use_multicast = sub_mode == lev2mdapi.TORA_TSTP_MST_MCAST
    # Second argument: True = cached mode, False = non-cached mode
    md_api = lev2mdapi.CTORATstpLev2MdApi_CreateTstpLev2MdApi(sub_mode, True)
    spi = Lev2MdSpi(md_api, l2_data_upload_manager)
    md_api.RegisterSpi(spi)
    # ------------------- production mode -------------------
    if use_multicast:
        # Receive from a single multicast address. Alternatives kept from the original notes:
        #   several groups at once: also register Multicast_Address2 the same way
        #   efvi mode: RegisterMulticast(Multicast_Address, Local_Interface_Address, "", "enp101s0f0", 4096, True)
        md_api.RegisterMulticast(Multicast_Address, Local_Interface_Address, "")
    else:
        md_api.RegisterFront(Front_Address)
    # Start without CPU core binding
    md_api.Init()
# Pool that executes l2_cmd messages so the receive loop is never blocked by a command
__l2_cmd_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=3)


def __process_l2_cmd(data):
    """Run one l2_cmd message on a pool thread and log any failure instead of losing it in the Future."""
    try:
        l2CommandManager.process_command(command_manager.CLIENT_TYPE_CMD_L2, None, data)
    except Exception as e:
        logging.exception(e)


def __receive_from_queue_trade(queue_trade_w_l2_r: multiprocessing.Queue):
    """
    Receive loop for messages from the trade side; never returns.

    Each message is a JSON document (str or UTF-8 bytes). Messages whose ``type`` is
    ``"l2_cmd"`` are handed to the command thread pool; other types are ignored.
    @param queue_trade_w_l2_r: queue written by the trade side and read here
    """
    logger_system.info(f"l2_client __receive_from_pipe_trade 线程ID:{tool.get_thread_id()}")
    while True:
        try:
            value = queue_trade_w_l2_r.get()
            if not value:
                continue
            if isinstance(value, bytes):
                value = value.decode("utf-8")
            data = json.loads(value)
            if data["type"] == "l2_cmd":
                __start_time = time.time()
                # Pass data as an argument: a lambda closing over the loop variable could
                # run after the next message has already replaced it
                __l2_cmd_thread_pool.submit(__process_l2_cmd, data)
                use_time = time.time() - __start_time
                if use_time > 0.005:
                    huaxin_l2_log.info(logger_local_huaxin_l2_subscript, f"l2_cmd耗时:{use_time}s")
        except Exception as e:
            logging.exception(e)
# NOTE(review): not referenced anywhere else in the visible part of this file - confirm it is still needed
pipe_strategy = None
def test_add_codes(queue_r):
    """
    Manual test driver: queue one l2_cmd with demo data, then keep feeding fake orders.

    Never returns; intended to run on its own thread.
    @param queue_r: queue read by __receive_from_queue_trade
    """
    time.sleep(10)
    # Entry layout read by Lev2MdSpi.__process_codes_data:
    # (code, min_volume, limit_up_price, shadow_price, buy_volume, special_volumes)
    # The sixth field was missing, which made d[5] raise IndexError.
    demo_datas = [("603002", int(50 * 10000 / 6.35), 6.35, 6.00, 200, []),
                  ("002654", int(50 * 10000 / 15.59), 15.59, 15.3, 200, []),
                  ("603701", int(50 * 10000 / 14.28), 14.28, 14.00, 200, []),
                  ("002908", int(50 * 10000 / 12.78), 12.78, 12.00, 200, [])]
    queue_r.put_nowait(json.dumps({"type": "l2_cmd", "data": [demo_datas[0]]}))
    time.sleep(10)
    while True:
        try:
            spi.l2_data_upload_manager.add_l2_order_detail(
                {'SecurityID': '603002', 'Price': 6.35, 'Volume': 275000, 'Side': "1", 'OrderType': '0',
                 'OrderTime': '13000015',
                 'MainSeq': 2, 'SubSeq': 6739147, 'OrderNO': 5512466, 'OrderStatus': 'D'}, 0)
            spi.l2_data_upload_manager.add_l2_order_detail(
                {'SecurityID': '603002', 'Price': 6.35, 'Volume': 200, 'Side': "1", 'OrderType': '0',
                 'OrderTime': '13000015',
                 'MainSeq': 2, 'SubSeq': 6739148, 'OrderNO': 5512467, 'OrderStatus': 'D'}, 0)
            time.sleep(0.1)
            spi.l2_data_upload_manager.add_l2_order_detail(
                {'SecurityID': '603002', 'Price': 6.35, 'Volume': 100, 'Side': "1", 'OrderType': '0',
                 'OrderTime': '13000015',
                 'MainSeq': 2, 'SubSeq': 6739148, 'OrderNO': 5512467, 'OrderStatus': 'D'}, 0)
        except Exception as e:
            logging.exception(e)
        finally:
            time.sleep(10)
def run(queue_r: multiprocessing.Queue, queue_data_callback: multiprocessing.Queue, channel_list: list) -> None:
    """
    Entry point of the L2 process: initialise everything, then keep the process alive forever.

    @param queue_r: command queue from the trade side; no receive thread is started when None
    @param queue_data_callback: low-frequency data callback queue (not used in this function body)
    @param channel_list: [((no, multiprocessing.Array, zmq_address),(no, multiprocessing.Array, zmq_address))]
    @return:
    """
    global l2CommandManager
    logger_system.info("L2进程ID:{}", os.getpid())
    logger_system.info(f"l2_client 线程ID:{tool.get_thread_id()}")
    try:
        log.close_print()
        # Initialise the channel allocator, the upload manager and the market data API
        data_channel_distribute_manager = CodeDataChannelDistributeManager(channel_list)
        l2_data_upload_manager = L2DataUploadManager(data_channel_distribute_manager)
        __init_l2(l2_data_upload_manager)
        l2_data_manager_v2.run_upload_common()
        l2_data_manager_v2.run_log()
        # TODO test
        # threading.Thread(target=lambda: test_add_codes(queue_r), daemon=True).start()
        l2CommandManager = command_manager.L2CommandManager()
        l2CommandManager.init(MyL2ActionCallback())
        # Start the workers only now: both use the globals created above (spi, l2CommandManager);
        # commands sent earlier simply wait in the queue
        if queue_r is not None:
            threading.Thread(target=__receive_from_queue_trade, args=(queue_r,), daemon=True).start()
        # Subscription watchdog
        threading.Thread(target=SubscriptDefend.run, daemon=True).start()
        logger_system.info("L2订阅服务启动成功")
    except Exception as e:
        logger_system.exception(e)
    # All workers are daemon threads, so the main thread has to stay alive
    while True:
        time.sleep(2)
def test():
    """
    Manual smoke test: feed demo commands/orders from a helper thread and call run().

    NOTE(review): this function looks stale relative to the rest of the file:
    - run() is called with four positional arguments but is defined with three
      (queue_r, queue_data_callback, channel_list), so the call raises TypeError;
    - the demo tuples have three fields while Lev2MdSpi.__process_codes_data reads d[0]..d[5];
    - the "listen_volume" message type is not handled by __receive_from_queue_trade.
    """
    def test_add_codes():
        # Runs on its own thread while run() blocks the caller
        time.sleep(5)
        # if value:
        #     if type(value) == bytes:
        #         value = value.decode("utf-8")
        #     data = json.loads(value)
        #     _type = data["type"]
        #     if _type == "listen_volume":
        #         volume = data["data"]["volume"]
        #         code = data["data"]["code"]
        #         spi.set_code_special_watch_volume(code, volume)
        #     elif _type == "l2_cmd":
        #         l2CommandManager.process_command(command_manager.CLIENT_TYPE_CMD_L2, None, data)
        demo_datas = [("603002", int(50 * 10000 / 6.35), 6.35), ("002654", int(50 * 10000 / 15.59), 15.59),
                      ("603701", int(50 * 10000 / 14.28), 14.28), ("002908", int(50 * 10000 / 12.78), 12.78)]
        # Only the first demo entry is sent
        queue_r.put_nowait(json.dumps({"type": "l2_cmd", "data": [demo_datas[0]]}))
        time.sleep(1)
        spi.l2_data_upload_manager.add_l2_order_detail(
            {'SecurityID': '603002', 'Price': 6.35, 'Volume': 275000, 'Side': "1", 'OrderType': '0',
             'OrderTime': '13000015',
             'MainSeq': 2, 'SubSeq': 6739147, 'OrderNO': 5512466, 'OrderStatus': 'D'}, 0)
        spi.l2_data_upload_manager.add_l2_order_detail(
            {'SecurityID': '603002', 'Price': 6.35, 'Volume': 200, 'Side': "1", 'OrderType': '0',
             'OrderTime': '13000015',
             'MainSeq': 2, 'SubSeq': 6739148, 'OrderNO': 5512467, 'OrderStatus': 'D'}, 0)
        queue_r.put_nowait(json.dumps({"type": "listen_volume", "data": {"code": "603002", "volume": 100}}))
        time.sleep(0.1)
        spi.l2_data_upload_manager.add_l2_order_detail(
            {'SecurityID': '603002', 'Price': 6.35, 'Volume': 100, 'Side': "1", 'OrderType': '0',
             'OrderTime': '13000015',
             'MainSeq': 2, 'SubSeq': 6739148, 'OrderNO': 5512467, 'OrderStatus': 'D'}, 0)
    queue_r = multiprocessing.Queue()
    order_queues = []
    transaction_queues = []
    market_queue = multiprocessing.Queue()
    # 20 queue pairs, matching an older queue-based signature of run() - TODO confirm
    for i in range(20):
        order_queues.append(multiprocessing.Queue())
        transaction_queues.append(multiprocessing.Queue())
    threading.Thread(target=test_add_codes).start()
    # NOTE(review): argument count does not match run(queue_r, queue_data_callback, channel_list)
    run(queue_r, order_queues, transaction_queues, market_queue)
if __name__ == "__main__":
    # Only waits for a line on stdin; neither run() nor test() is started from here
    input()
huaxin_client/l2_data_manager_v2.py
New file
@@ -0,0 +1,471 @@
# -*- coding: utf-8 -*-
import json
import logging
import marshal
import queue
import threading
import time
import msgpack
import constant
from huaxin_client import socket_util
from huaxin_client.client_network import SendResponseSkManager
# 活动时间
from huaxin_client.code_queue_distribute_manager import CodeDataChannelDistributeManager
from log_module import async_log_util
from log_module.async_log_util import huaxin_l2_log
from log_module.log import logger_local_huaxin_l2_error, logger_system, logger_local_huaxin_l2_subscript, \
    logger_local_huaxin_l2_special_volume, logger_debug
from utils import tool, shared_memery_util
import collections
import zmq
# Module-level state shared by the helpers in this file.
order_detail_upload_active_time_dict = {}
transaction_upload_active_time_dict = {}
# Temporary data
tmep_order_detail_queue_dict = {}
tmep_transaction_queue_dict = {}
# Codes registered through add_target_code() and the time each one was added.
target_codes = set()
target_codes_add_time = {}
# Bounded queue of (code, type, data) tuples; add_subscript_codes() fills it
# and __run_upload_common() drains it.
common_queue = queue.Queue(maxsize=1000)
# L2 upload data manager
class L2DataUploadManager:
    """
    Buffers per-code L2 order and transaction records and forwards them in batches.

    Producers append raw records to per-code deques (add_l2_order_detail /
    add_transaction_detail). For every distributed code two daemon threads drain
    those deques, drop the records rejected by the filters, write each batch into
    the shared-memory array of the code's channel and notify the consumer through
    the channel's ZMQ socket.
    """
    TYPE_DELEGATE = 1
    TYPE_TRANSACTION = 2
    TYPE_MARKET = 3

    def __init__(self, data_channel_distribute_manager: "CodeDataChannelDistributeManager"):
        """
        @param data_channel_distribute_manager: allocator mapping a code to its channel pair;
               index 0 of the pair is used for orders, index 1 for transactions
        """
        self.data_channel_distribute_manager = data_channel_distribute_manager
        # Per-code buffers and bookkeeping
        self.temp_order_queue_dict = {}
        self.temp_transaction_queue_dict = {}
        self.temp_log_queue_dict = {}
        self.filter_order_condition_dict = {}
        self.upload_l2_data_task_dict = {}
        # Codes whose worker thread is currently idle-polling its buffer
        self.l2_order_codes = set()
        self.l2_transaction_codes = set()

    # Set the order filter condition
    def set_order_fileter_condition(self, code, min_volume, limit_up_price, shadow_price, buy_volume, special_volumes):
        """
        Store the filter condition used by the order/transaction filters for a code.

        @param code: code the condition applies to
        @param min_volume: orders with at least this volume always pass
        @param limit_up_price: limit-up price
        @param shadow_price: shadow order price (stored, not read by the filters in this class)
        @param buy_volume: orders with exactly this volume always pass
        @param special_volumes: extra buy volumes that pass; a falsy value means none
        """
        if not special_volumes:
            special_volumes = set()
        try:
            # (min volume, limit-up price, shadow price, buy volume, deprecated, special volume set)
            self.filter_order_condition_dict[code] = [(min_volume, limit_up_price, shadow_price, buy_volume,
                                                       int(min_volume) // 50, set(special_volumes))]
            huaxin_l2_log.info(logger_local_huaxin_l2_subscript,
                               f"({code})常规过滤条件设置:{self.filter_order_condition_dict[code]}")
        except Exception as e:
            logger_debug.error(f"{str(e)} - min_volume-{min_volume}")

    # Filter orders
    def __filter_order(self, item):
        """
        Return the order record if it should be uploaded, otherwise None.

        item is the tuple built by add_l2_order_detail: item[0] code, item[1] price,
        item[2] volume, item[3] side. Codes without a condition pass unfiltered.
        """
        filter_condition = self.filter_order_condition_dict.get(item[0])
        if not filter_condition:
            return item
        condition = filter_condition[0]
        # Large enough volume
        if item[2] >= condition[0]:
            return item
        # Volume equal to the configured buy volume
        if item[2] == condition[3]:
            return item
        if item[3] != '1':
            # Side other than '1' (sell / sell cancel): keep only orders at the limit-up price
            if abs(item[1] - condition[1]) < 0.001:
                return item
        elif item[2] in condition[5]:
            # Side '1': keep the special volumes
            return item
        return None

    # Filter transactions
    def __filter_transaction(self, item):
        """
        Return the transaction record if its price (item[1]) is within 0.201 of the
        limit-up price, otherwise None. Codes without a condition pass unfiltered.
        """
        filter_condition = self.filter_order_condition_dict.get(item[0])
        if not filter_condition:
            return item
        if abs(item[1] - filter_condition[0][1]) < 0.201:
            return item
        return None

    # Add an order detail record
    def add_l2_order_detail(self, data, start_time=0, istransaction=False):
        """
        Append one order record to the code's buffer; ignored when the code has no buffer.

        @param data: dict with SecurityID, Price, Volume, Side, OrderType, OrderTime,
                     MainSeq, SubSeq, OrderNO and OrderStatus
        @param start_time: caller supplied timestamp stored as the last tuple element
        @param istransaction: unused, kept for interface compatibility
        """
        code = data["SecurityID"]
        # Records are buffered instead of being pushed directly; the worker thread uploads them.
        q: collections.deque = self.temp_order_queue_dict.get(code)
        if q is not None:
            q.append(
                (data['SecurityID'], data['Price'], data['Volume'], data['Side'], data['OrderType'], data['OrderTime'],
                 data['MainSeq'], data['SubSeq'], data['OrderNO'], data['OrderStatus'], time.time(), start_time))

    # Add a tick-by-tick transaction record
    def add_transaction_detail(self, data):
        """
        Append one transaction record to the code's buffer; ignored when the code has no buffer.

        @param data: dict with SecurityID, TradePrice, TradeVolume, OrderTime, MainSeq,
                     SubSeq, BuyNo, SellNo and ExecType
        """
        code = data["SecurityID"]
        # Records are buffered instead of being pushed directly; the worker thread uploads them.
        q: collections.deque = self.temp_transaction_queue_dict.get(code)
        if q is not None:
            q.append((data['SecurityID'], data['TradePrice'], data['TradeVolume'],
                      data['OrderTime'], data['MainSeq'], data['SubSeq'], data['BuyNo'],
                      data['SellNo'], data['ExecType'], time.time()))

    def add_market_data(self, data):
        """Forward a market-data record for data['securityID'] to the code's channel object."""
        code = data['securityID']
        # TODO switch to sending through zmq
        # NOTE(review): the worker threads index the result of get_distributed_channel()
        # like a tuple, so calling OnMarketData on it looks like a leftover from the
        # callback based version - confirm before relying on this method.
        callback = self.data_channel_distribute_manager.get_distributed_channel(code)
        if callback:
            callback.OnMarketData(code, data)

    # Allocate the upload queues
    def distribute_upload_queue(self, code, _target_codes=None):
        """
        Allocate a channel and buffers for a code and start its worker threads.

        @param code: code
        @param _target_codes: all target codes, passed through to the channel allocator
        @return:
        """
        if not self.data_channel_distribute_manager.get_distributed_channel(code):
            self.data_channel_distribute_manager.distribute_channel(code, _target_codes)
        if code not in self.temp_order_queue_dict:
            self.temp_order_queue_dict[code] = collections.deque()
        if code not in self.temp_transaction_queue_dict:
            self.temp_transaction_queue_dict[code] = collections.deque()
        if code not in self.temp_log_queue_dict:
            self.temp_log_queue_dict[code] = queue.Queue(maxsize=1000)
        if code not in self.upload_l2_data_task_dict:
            t1 = threading.Thread(target=self.__run_upload_order_task, args=(code,), daemon=True)
            t1.start()
            t2 = threading.Thread(target=self.__run_upload_transaction_task, args=(code,), daemon=True)
            t2.start()
            # The log task (__run_log_task) is intentionally not started.
            self.upload_l2_data_task_dict[code] = (t1, t2)

    # Release the queues that were allocated
    def release_distributed_upload_queue(self, code):
        """Release the code's channel and drop its buffers; the worker threads stop once they notice."""
        self.data_channel_distribute_manager.release_distribute_channel(code)
        order_queue = self.temp_order_queue_dict.get(code)
        if order_queue is not None:
            # Pending records are discarded on purpose before the buffer is removed.
            order_queue.clear()
            self.temp_order_queue_dict.pop(code, None)
        transaction_queue = self.temp_transaction_queue_dict.get(code)
        if transaction_queue is not None:
            transaction_queue.clear()
            self.temp_transaction_queue_dict.pop(code, None)
        self.temp_log_queue_dict.pop(code, None)
        self.upload_l2_data_task_dict.pop(code, None)

    def __upload_l2_data(self, code, _queue, datas):
        """Put a marshal-encoded [code, datas, timestamp] message on _queue without blocking."""
        _queue.put_nowait(marshal.dumps([code, datas, time.time()]))

    def __send_via_channel(self, code, channel_index, msg_type, payload):
        """Write payload into the channel's shared memory, notify the consumer and wait for its reply."""
        shared_memery_number, m_array, zmq_host = \
            self.data_channel_distribute_manager.get_distributed_channel(code)[channel_index]
        # Fill the shared memory with the data
        shared_memery_util.set_datas(m_array, payload)
        # Tell the consumer which shared memory to read
        _socket = self.data_channel_distribute_manager.get_zmq_socket(zmq_host)
        _socket.send(msgpack.packb({"type": msg_type, "data": {"memery_number": shared_memery_number}}))
        _socket.recv_string()

    # Process order data and upload it
    def __run_upload_order_task(self, code):
        """
        Worker loop: drain, filter and upload the order buffer of a code.

        Returns when the buffer this worker was started for is no longer the one
        registered for the code (released, or released and re-created).
        """
        q: collections.deque = self.temp_order_queue_dict.get(code)
        if q is None:
            # Released before this worker started: without a buffer the loop below
            # would raise on every iteration and never reach its exit check.
            self.l2_order_codes.discard(code)
            return
        temp_list = []
        while True:
            try:
                while q:
                    # Pre-processing: drop the records that are of no use
                    data = self.__filter_order(q.popleft())
                    if data:
                        temp_list.append(data)
                if temp_list:
                    begin_time = time.time()
                    last_data = temp_list[-1]
                    self.__send_via_channel(code, 0, self.TYPE_DELEGATE, (code, temp_list, time.time()))
                    use_time = time.time() - begin_time
                    if use_time > 0.01:
                        # Record uploads that took longer than 10ms
                        huaxin_l2_log.info(logger_local_huaxin_l2_error, f"耗时:{use_time}s  结束数据:{last_data}")
                else:
                    current = self.temp_order_queue_dict.get(code)
                    if current is not q:
                        # A replaced buffer belongs to a newer worker, which owns the idle marker.
                        if current is None:
                            self.l2_order_codes.discard(code)
                        break
                    self.l2_order_codes.add(code)
                    time.sleep(0.001)
            except Exception as e:
                logging.exception(e)
            finally:
                # A batch is dropped after one attempt, whether or not the upload worked.
                temp_list.clear()

    # Process transaction data and upload it
    def __run_upload_transaction_task(self, code):
        """
        Worker loop: drain, filter and upload the transaction buffer of a code.

        Returns when the buffer this worker was started for is no longer the one
        registered for the code (released, or released and re-created).
        """
        q: collections.deque = self.temp_transaction_queue_dict.get(code)
        if q is None:
            # Released before this worker started; see __run_upload_order_task.
            self.l2_transaction_codes.discard(code)
            return
        temp_list = []
        while True:
            try:
                while q:
                    data = self.__filter_transaction(q.popleft())
                    if data:
                        temp_list.append(data)
                if temp_list:
                    self.__send_via_channel(code, 1, self.TYPE_TRANSACTION, (code, temp_list))
                else:
                    current = self.temp_transaction_queue_dict.get(code)
                    if current is not q:
                        if current is None:
                            self.l2_transaction_codes.discard(code)
                        break
                    self.l2_transaction_codes.add(code)
                    time.sleep(0.001)
            except Exception as e:
                # Failures used to be swallowed silently; log them like the order worker does.
                logging.exception(e)
            finally:
                temp_list.clear()

    def __run_log_task(self, code):
        """Worker loop: log every record put on the code's log queue until that queue is removed."""
        q: queue.Queue = self.temp_log_queue_dict.get(code)
        while True:
            try:
                temp = q.get(timeout=10)
                huaxin_l2_log.info(logger_local_huaxin_l2_special_volume,
                                   f"{temp}")
            except Exception:
                # Covers the get() timeout as well as a missing queue.
                time.sleep(0.02)
            finally:
                if code not in self.temp_log_queue_dict:
                    break
class L2DataUploadProtocolManager:
    """Hands out one ZMQ REQ client per code from a fixed list of IPC hosts."""

    # ipchosts: addresses using the IPC protocol
    def __init__(self, ipchosts):
        """
        @param ipchosts: list of IPC addresses; one REQ socket is connected to each
                         (no sockets are created when constant.is_windows() is true)
        """
        self.ipchosts = ipchosts
        # Every client, keyed by host
        self.socket_client_dict = {}
        # Client assigned to each code, format: {code: (host, socket)}
        self.code_socket_client_dict = {}
        self.rlock = threading.RLock()
        context = zmq.Context()
        if not constant.is_windows():
            for host in self.ipchosts:
                client = context.socket(zmq.REQ)
                client.connect(host)
                self.socket_client_dict[host] = client

    # Find a host that is not assigned yet
    def __get_available_ipchost(self):
        """Return (host, socket) of the first unassigned host; raise Exception when none is left."""
        if len(self.code_socket_client_dict) >= len(self.socket_client_dict):
            raise Exception("无可用host")
        used_hosts = {info[0] for info in self.code_socket_client_dict.values()}
        for host, client in self.socket_client_dict.items():
            if host not in used_hosts:
                return host, client
        raise Exception("无可用host")

    # Assign a host
    def distribute_upload_host(self, code):
        """Assign a free host to the code; does nothing when the code already has one."""
        if code in self.code_socket_client_dict:
            return
        with self.rlock:
            host_info = self.__get_available_ipchost()
            if host_info:
                self.code_socket_client_dict[code] = host_info

    def release_distributed_upload_host(self, code):
        """Give the code's host back; does nothing when the code has none."""
        if code not in self.code_socket_client_dict:
            return
        with self.rlock:
            self.code_socket_client_dict.pop(code, None)

    def upload_data_as_json(self, code, data):
        """
        Send data (marshal encoded) through the code's socket and wait for the reply.

        Raises Exception when no host has been assigned to the code.
        """
        if code not in self.code_socket_client_dict:
            raise Exception("尚未分配host")
        _host, client = self.code_socket_client_dict[code]
        client.send(marshal.dumps(data))
        client.recv_string()
def add_target_code(code):
    """Register code as a target code and remember when it was added."""
    # Record the time the code was added
    target_codes_add_time[code] = time.time()
    target_codes.add(code)
def del_target_code(code):
    """Remove code from the target codes together with its recorded add time."""
    target_codes_add_time.pop(code, None)
    target_codes.discard(code)
def add_subscript_codes(codes):
    """Queue the subscribed codes for upload as an "l2_subscript_codes" message."""
    # Goes onto the common upload queue; the code slot of the message stays empty
    message = ('', "l2_subscript_codes", list(codes))
    common_queue.put(message)
def __send_response(sk, msg):
    """
    Send msg (with the header added by socket_util.load_header) over sk and read the reply.

    Returns True only when a reply was received and its JSON "code" field equals 0.
    """
    sk.sendall(socket_util.load_header(msg))
    reply, _header_str = socket_util.recv_data(sk)
    if not reply:
        return False
    return json.loads(reply).get("code") == 0
# Send a message
def send_response(type, msg):
    """
    Send msg over the socket registered for type, trying a second time on failure.

    A reset connection or broken pipe replaces the socket and sends once more;
    errors raised by that last attempt propagate to the caller.
    """
    try:
        sk = SendResponseSkManager.get_send_response_sk(type)
        # The second call only runs when the first attempt was not acknowledged
        return __send_response(sk, msg) or __send_response(sk, msg)
    except (ConnectionResetError, BrokenPipeError):
        SendResponseSkManager.del_send_response_sk(type)
        fresh_sk = SendResponseSkManager.get_send_response_sk(type)
        return __send_response(fresh_sk, msg)
# Upload data
def upload_data(code, _type, datas, new_sk=False):
    """
    Serialize one message as JSON and send it; send errors are logged, not raised.

    @param code: code the data belongs to (may be empty)
    @param _type: message type, also part of the key of the reused socket
    @param datas: JSON-serializable payload
    @param new_sk: send over a newly created socket instead of the one reused per type/code
    """
    key = f"{_type}_{code}"
    fdata = json.dumps(
        {"type": _type, "data": {"code": code, "data": datas, "time": round(time.time() * 1000)}})
    one_shot_sk = None
    try:
        if new_sk:
            one_shot_sk = SendResponseSkManager.create_send_response_sk()
            __send_response(one_shot_sk, fdata.encode('utf-8'))
        else:
            send_response(key, fdata.encode('utf-8'))
    except Exception as e:
        logging.exception(e)
    finally:
        # The one-shot socket was never closed before and leaked on every call.
        # NOTE(review): assumes create_send_response_sk() returns a socket that is
        # not tracked or reused elsewhere - confirm against SendResponseSkManager.
        if one_shot_sk is not None:
            try:
                one_shot_sk.close()
            except Exception as e:
                logging.exception(e)
def __run_upload_common():
    """Loop forever: upload everything waiting on common_queue, then pause for 10ms."""
    thread_id = tool.get_thread_id()
    logger_system.info(f"l2_client __run_upload_common 线程ID:{thread_id}")
    while True:
        try:
            while not common_queue.empty():
                item = common_queue.get()
                # Queue items are (code, type, data)
                upload_data(item[0], item[1], item[2])
        except Exception as e:
            logger_local_huaxin_l2_error.exception(e)
            logger_local_huaxin_l2_error.error(f"上传普通数据出错:{str(e)}")
        finally:
            time.sleep(0.01)
def __run_log():
    """Log the current thread id, then hand the thread over to huaxin_l2_log.run_sync()."""
    thread_id = tool.get_thread_id()
    logger_system.info(f"l2_client __run_log 线程ID:{thread_id}")
    async_log_util.huaxin_l2_log.run_sync()
# Data is transferred over a socket
def run_upload_common():
    """Start the common upload loop on a daemon thread."""
    threading.Thread(target=lambda: __run_upload_common(), daemon=True).start()
def run_log():
    """Start the log loop on a daemon thread."""
    threading.Thread(target=lambda: __run_log(), daemon=True).start()
def __test():
    """Placeholder for a data distribution test; it currently does nothing."""
def run_test():
    """Run the placeholder test function on a daemon thread."""
    threading.Thread(target=lambda: __test(), daemon=True).start()
def test():
    """Manual smoke test: connect to 70 IPC hosts, assign one to a code and upload a sample message."""
    ipclist = [f"ipc://l2order{i}.ipc" for i in range(70)]
    manager = L2DataUploadProtocolManager(ipclist)
    code = "000333"
    manager.distribute_upload_host(code)
    manager.upload_data_as_json(code, {"test": "test"})
l2/l2_data_manager_new.py
@@ -1474,7 +1474,7 @@
                RDCancelBigNumComputer().set_watch_indexes(code, radical_result[4])
            return
        else:
            radical_result = cls.__compute_radical_order_begin_pos_for_many_sell(code, start_index, end_index)
            radical_result = cls.__compute_radical_order_begin_pos_for_many_sell(code, compute_start_index, compute_end_index)
            if radical_result[0]:
                buy_single_index, buy_exec_index = radical_result[0][0], radical_result[0][1]
                buy_volume_rate = cls.volume_rate_info[code][0]
l2/subscript/l2_subscript_manager.py
New file
@@ -0,0 +1,87 @@
"""
L2订阅管理
"""
import math
import multiprocessing
import random
class TargetCodeProcessManager:
    """
    Process management for target codes: spreads the codes over communication queues.
    """

    def __init__(self, com_queues: list, max_code_count_per_queue_list):
        """
        Initialise the manager.
        @param com_queues: list<multiprocessing.Queue> communication queues
        @param max_code_count_per_queue_list: maximum number of codes per queue, in the same
               order as com_queues; a single int is applied to every queue
        """
        if isinstance(max_code_count_per_queue_list, int):
            # A scalar limit used to fail with "'int' object is not subscriptable"
            max_code_count_per_queue_list = [max_code_count_per_queue_list] * len(com_queues)
        self.__com_queues = com_queues
        self.__max_code_count_per_queue_dict = {id(q): max_code_count_per_queue_list[i]
                                                for i, q in enumerate(com_queues)}
        # Mapping from queue ID to queue object
        self.__com_queue_id_object_dict = {id(q): q for q in com_queues}
        # Codes of each queue ID, format: {"queue ID": {"code1", "code2"}}
        self.__queue_codes = {id(q): set() for q in com_queues}
        # Queue ID each code was assigned to
        self.__code_queue_dict = {}

    def add_codes(self, codes: set):
        """
        Make the assigned codes equal to codes: codes that are no longer present are
        removed, codes that stay keep their queue and new codes go to the first queue
        that is not full.

        A queue counts as full at min(its own limit, ceil(len(codes) / queue count)).
        @param codes: the complete set of codes that should be assigned
        @return: the set of new codes that did not fit into any queue (empty when all fit)
        """
        add_codes = codes - self.__code_queue_dict.keys()
        del_codes = self.__code_queue_dict.keys() - codes
        # Remove codes
        for code in del_codes:
            queue_id = self.__code_queue_dict.pop(code)
            self.__queue_codes[queue_id].discard(code)
        unassigned = set()
        if not add_codes or not self.__com_queues:
            return unassigned
        # The even share only depends on the totals, so it is computed once
        even_share = math.ceil(len(codes) / len(self.__com_queues))
        # Assign a queue to every new code
        for code in add_codes:
            # Look for a queue that is not full
            for queue_id, assigned in self.__queue_codes.items():
                limit = min(self.__max_code_count_per_queue_dict.get(queue_id), even_share)
                if len(assigned) >= limit:
                    # Queue is full
                    continue
                assigned.add(code)
                self.__code_queue_dict[code] = queue_id
                break
            else:
                # Every queue is full: report the code instead of dropping it silently
                unassigned.add(code)
        return unassigned

    def get_queues_with_codes(self):
        """
        Get the codes assigned to each queue.
        @return: [(queue object, {set of codes})]
        """
        return [(self.__com_queue_id_object_dict.get(queue_id), assigned)
                for queue_id, assigned in self.__queue_codes.items()]
if __name__ == "__main__":
    # Self-check: feed shrinking random code sets to the manager and verify that the
    # codes spread over the queues are exactly the codes that were requested.
    queues = [multiprocessing.Queue() for _ in range(7)]
    # The constructor indexes its second argument per queue, so pass one limit per queue
    manager = TargetCodeProcessManager(queues, [10] * len(queues))
    for count in (70, 60, 50, 10):
        codes = set()
        for _ in range(count):
            code = random.randint(1, 1000000)
            code = str(code).zfill(6)
            codes.add(code)
        print(codes)
        manager.add_codes(codes)
        results = manager.get_queues_with_codes()
        fcodes = set()
        for r in results:
            fcodes |= r[1]
        if codes - fcodes or fcodes - codes:
            print("订阅出错")
        print(results)
main.py
@@ -1,8 +1,11 @@
"""
GUI管理
"""
import math
import psutil
import constant
from code_attribute import gpcode_manager
from log_module import log
from log_module.log import logger_l2_trade, logger_system
@@ -14,13 +17,14 @@
from task import task_manager
from third_data import hx_qc_value_util
from third_data.code_plate_key_manager import KPLPlateForbiddenManager
from utils import shared_memery_util
logger_system.info("程序启动Pre:{}", os.getpid())
import huaxin_client.trade_client
import huaxin_client.l2_client
import huaxin_client.l1_client
from huaxin_client import l2_market_client
from huaxin_client import l2_market_client, l2_client_v2
from servers import server_util, huaxin_trade_server, server
@@ -74,6 +78,41 @@
# The guard compares against '__main__1', a value __name__ does not normally take,
# so the test call below stays disabled.
if __name__ == '__main__1':
    huaxin_client.l2_client.test()
def __create_l2_subscript():
    """
    Create the L2 data channels and start the L2 client processes.

    One channel (order part + deal part) is created per code slot, up to
    constant.HUAXIN_L2_MAX_CODES_COUNT; each part is
    [shared memory number, shared array, ZMQ IPC address]. The channels are split
    as evenly as possible over the L2 processes.
    """
    channel_list = []
    for i in range(constant.HUAXIN_L2_MAX_CODES_COUNT):
        # Create the shared arrays and ZMQ addresses for orders and deals.
        # The addresses must be plain strings: a trailing comma here would turn
        # them into 1-tuples, which cannot be passed to socket.connect().
        delegate_ipc_addr = f"ipc://order_{i}.ipc"
        deal_ipc_addr = f"ipc://deal_{i}.ipc"
        delegate = [0, shared_memery_util.create_array(), delegate_ipc_addr]
        delegate[0] = shared_memery_util.get_number(delegate[1])
        deal = [0, shared_memery_util.create_array(), deal_ipc_addr]
        deal[0] = shared_memery_util.get_number(deal[1])
        channel_list.append((delegate, deal))

    # Number of L2 processes
    l2_process_count = 8

    base_channel_count = len(channel_list) // l2_process_count
    left_count = len(channel_list) % l2_process_count
    index = 0
    # ======Grouping======
    # Number of channels in each group (collected, not used further in this function)
    channel_count_list = []
    for i in range(l2_process_count):
        # The first left_count processes take one extra channel
        channel_count = base_channel_count + (1 if i < left_count else 0)
        channel_count_list.append(channel_count)
        # Channels served by this process
        channels = channel_list[index:index + channel_count]
        index += channel_count
        # Subscription signal queue, data callback queue (for data with a low callback frequency)
        sub_single_queue, data_callback_queue = multiprocessing.Queue(), multiprocessing.Queue()
        l2_process = multiprocessing.Process(target=l2_client_v2.run,
                                             args=(sub_single_queue, data_callback_queue, channels,))
        l2_process.start()
if __name__ == '__main__':
    # 可绑定16-31之间的核
    try:
utils/shared_memery_util.py
New file
@@ -0,0 +1,66 @@
"""
共享内存工具
"""
import multiprocessing
import msgpack
# Number of header bytes holding the zero-padded decimal length of the content
__HEADER_CONTENT_LENGTH = 6
# Number of header bytes holding the zero-padded decimal number of the array
__HEADER_NUMBER_LENGTH = 4
# Number given to the most recently created array (create_array increments it)
__start_number = 0
def create_array(length=100 * 10000):
    """
    Create a shared-memory char array and stamp its number into the header.

    Every call takes the next number from a per-process counter and stores it,
    zero-padded to __HEADER_NUMBER_LENGTH decimal digits, in the first bytes.
    @param length: total size of the array in bytes (header included)
    @return: the multiprocessing.Array('c', length) that was created
    @raise ValueError: when the next number no longer fits into the header field
    """
    global __start_number
    next_number = __start_number + 1
    number_bytes = str(next_number).zfill(__HEADER_NUMBER_LENGTH).encode()
    if len(number_bytes) > __HEADER_NUMBER_LENGTH:
        # Writing only the leading digits would give two arrays the same number
        raise ValueError(f"shared memory number {next_number} does not fit into "
                         f"{__HEADER_NUMBER_LENGTH} digits")
    arr = multiprocessing.Array('c', length)
    arr[:__HEADER_NUMBER_LENGTH] = number_bytes
    # The number is only consumed once the array really exists
    __start_number = next_number
    return arr
def get_number(arr: multiprocessing.Array):
    """
    Read the number stored in the first __HEADER_NUMBER_LENGTH bytes of the array.
    @param arr: shared array whose header carries a decimal number
    @return: the number as int
    """
    number_field = arr[:__HEADER_NUMBER_LENGTH]
    return int(number_field.decode())
def set_datas(arr: multiprocessing.Array, datas):
    """
    Serialize datas with msgpack and store it in the shared array.

    Layout: number field, then the content length as zero-padded decimal digits,
    then the packed content.
    @param arr: shared array to write into
    @param datas: msgpack-serializable data
    @return:
    @raise ValueError: when the packed data does not fit into the array or its length
                       has more digits than the length field; the array is left untouched
    """
    st = msgpack.packb(datas)
    length = len(st)
    content_start = __HEADER_NUMBER_LENGTH + __HEADER_CONTENT_LENGTH
    # Validate before touching the header, so a rejected write cannot leave a new
    # length in front of the old content.
    if length >= 10 ** __HEADER_CONTENT_LENGTH or content_start + length > len(arr):
        raise ValueError(f"packed data of {length} bytes does not fit into the shared memory "
                         f"({len(arr) - content_start} bytes available)")
    arr[__HEADER_NUMBER_LENGTH:content_start] = str(length).zfill(__HEADER_CONTENT_LENGTH).encode()
    arr[content_start:content_start + length] = st
def read_datas(arr: multiprocessing.Array):
    """
    Read the data stored in the shared array.
    @param arr: shared array holding a length field followed by msgpack content
    @return: the data as unpacked by msgpack
    """
    content_start = __HEADER_NUMBER_LENGTH + __HEADER_CONTENT_LENGTH
    size = int(arr[__HEADER_NUMBER_LENGTH:content_start].decode())
    packed = arr[content_start:content_start + size]
    return msgpack.unpackb(packed)