Administrator
2024-04-18 d5cfa6434b6650c9bbc2a08bea06b83edd6b7398
L2市场行情单独订阅
4个文件已修改
3个文件已添加
319 ■■■■■ 已修改文件
huaxin_client/l2_client.py 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l2_data_manager.py 17 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l2_market_client.py 234 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_market_main.py 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_market_main.spec 50 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
log_module/log.py 8 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utils/middle_api_protocol.py 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l2_client.py
@@ -369,6 +369,7 @@
            time.sleep(10)
def run(queue_r: multiprocessing.Queue, data_callbacks: list) -> None:
    logger_system.info("L2进程ID:{}", os.getpid())
    logger_system.info(f"l2_client 线程ID:{tool.get_thread_id()}")
huaxin_client/l2_data_manager.py
@@ -6,10 +6,13 @@
import time
# 活动时间
from code_atrribute import gpcode_manager
from huaxin_client.code_queue_distribute_manager import CodeDataCallbackDistributeManager
from log_module.async_log_util import huaxin_l2_log
from log_module.log import logger_local_huaxin_l2_error
import collections
from utils import middle_api_protocol
# Module-level dictionaries, created empty at import time.
# NOTE(review): the names suggest they hold the last upload-active time for
# order-detail and transaction data respectively; the code that fills them is
# not visible in this hunk — confirm before relying on the key/value layout.
order_detail_upload_active_time_dict = {}
transaction_upload_active_time_dict = {}
@@ -177,4 +180,16 @@
def add_subscript_codes(codes):
    """Publish the given L2 subscription codes.

    The codes are put on ``common_queue`` tagged ``"l2_subscript_codes"``.
    When ``codes`` is non-empty, each code is paired with the name returned
    by ``gpcode_manager.CodesNameManager.get_code_name`` and the resulting
    ``(code, name)`` list is wrapped by
    ``middle_api_protocol.load_l2_position_subscript_codes`` and sent with
    ``middle_api_protocol.request``.

    :param codes: iterable of security codes.
    """
    # Add to the upload queue
    # NOTE(review): this statement and the commented-out copy right below it
    # look like the "before" and "after" versions of one line in a diff view;
    # confirm whether the queue put is still meant to run.
    common_queue.put(('', "l2_subscript_codes", list(codes)))
    # common_queue.put(('', "l2_subscript_codes", list(codes)))
    # Upload the data
    if codes:
        # Build (code, name) pairs for the request payload.
        fresults = []
        for code in codes:
            code_name = gpcode_manager.CodesNameManager.get_code_name(code)
            fresults.append((code, code_name))
        fdata = middle_api_protocol.load_l2_position_subscript_codes(fresults)
        middle_api_protocol.request(fdata)
# Manual check when this module is executed directly: publish one sample code.
if __name__ == "__main__":
    add_subscript_codes(["000333"])
huaxin_client/l2_market_client.py
New file
@@ -0,0 +1,234 @@
# -*- coding: utf-8 -*-
import json
import logging
import multiprocessing
import os
import queue
import threading
import time
import concurrent.futures
from typing import List
from huaxin_client import command_manager, l2_data_transform_protocol, l1_subscript_codes_manager
from huaxin_client import constant
from huaxin_client import l2_data_manager
import lev2mdapi
from huaxin_client.code_queue_distribute_manager import CodeQueueDistributeManager, CodeDataCallbackDistributeManager
from huaxin_client.command_manager import L2ActionCallback
from huaxin_client.l2_data_manager import L2DataUploadManager
from huaxin_client.l2_data_transform_protocol import L2DataCallBack
from log_module import log, async_log_util
from log_module.async_log_util import huaxin_l2_log
from log_module.log import logger_local_huaxin_l2_subscript, logger_system, logger_local_huaxin_l2_transaction, \
    logger_local_huaxin_g_cancel, logger_l2_codes_subscript, logger_local_huaxin_l2_market
from utils import tool
# ### Type B ###
# NOTE(review): "Type B" is a literal translation of the original label; what
# it distinguishes (e.g. an environment or line type) is not shown here.
# TCP front address; registered via RegisterFront when the subscription mode
# is not multicast.
Front_Address = "tcp://10.0.1.101:6900"
# Multicast address registered via RegisterMulticast in multicast mode.
Multicast_Address = "udp://224.224.2.19:7889"
# Second multicast address; only referenced from commented-out code in this file.
Multicast_Address2 = "udp://224.224.224.234:7890"
# Local interface address passed as the second argument to RegisterMulticast.
Local_Interface_Address = "192.168.84.75"
# NOTE(review): the two globals below are not referenced in the visible part
# of this file — confirm whether they are still needed.
set_codes_data_queue = queue.Queue()
market_code_dict = {}
class Lev2MdSpi(lev2mdapi.CTORATstpLev2MdSpi):
    """SPI callback handler for the Level-2 market-data API.

    Responsibilities visible in this class:

    * log in once the front connection is established;
    * keep the market-data subscription in sync with the code list handed to
      :meth:`set_codes_data` (subscribing new codes, unsubscribing dropped ones);
    * persist the latest requested code list to ``constant.L2_CODES_INFO_PATH``
      and re-apply it after a successful login;
    * write every received market-data snapshot to the L2 market log.
    """

    # Code set applied by the most recent successful subscription update.
    latest_codes_set = set()
    special_code_volume_for_order_dict = {}
    # Codes that are currently subscribed.
    subscripted_codes = set()

    def __init__(self, api, l2_data_upload_manager: L2DataUploadManager):
        """Store the API handle and the upload manager; start in the logged-out state."""
        lev2mdapi.CTORATstpLev2MdSpi.__init__(self)
        self.__api = api
        self.is_login = False
        self.l2_data_upload_manager = l2_data_upload_manager

    def __split_codes(self, codes):
        """Split codes by exchange and encode them to bytes.

        Codes starting with ``"60"`` go to the SSE list, codes starting with
        ``"00"`` go to the SZSE list; every other code is ignored.

        :return: ``(sse_codes, szse_codes)``, two lists of ``bytes``.
        """
        szse_codes = []
        sse_codes = []
        for code in codes:
            if code.startswith("00"):
                szse_codes.append(code.encode())
            elif code.startswith("60"):
                sse_codes.append(code.encode())
        return sse_codes, szse_codes

    def __unsubscribe(self, _codes):
        """Unsubscribe market data for the given codes, one call per exchange."""
        sh, sz = self.__split_codes(_codes)
        logger_local_huaxin_l2_subscript.info(f"取消订阅上证:{sh}")
        logger_local_huaxin_l2_subscript.info(f"取消订阅深证:{sz}")
        if sh:
            # Unsubscribe SSE market data
            self.__api.UnSubscribeMarketData(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
        if sz:
            # Unsubscribe SZSE market data
            self.__api.UnSubscribeMarketData(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)

    def __subscribe(self, _codes):
        """Subscribe market data for the given codes and log each call's result."""
        sh, sz = self.__split_codes(_codes)
        logger_local_huaxin_l2_subscript.info(f"订阅上证:{sh}")
        logger_local_huaxin_l2_subscript.info(f"订阅深证:{sz}")
        if sh:
            # Subscribe SSE market data
            result = self.__api.SubscribeMarketData(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
            logger_local_huaxin_l2_subscript.info(f"市场订阅结果sh:{result}")
        if sz:
            # Subscribe SZSE market data
            result = self.__api.SubscribeMarketData(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)
            logger_local_huaxin_l2_subscript.info(f"市场订阅结果sz:{result}")

    def __process_codes_data(self, codes, from_cache=False, delay=0.0):
        """Bring the subscription in line with ``codes``.

        :param codes: iterable of code strings that should be subscribed.
        :param from_cache: True when ``codes`` was restored from the cache
            file; an empty restored list is then treated as "nothing to do".
        :param delay: seconds to wait before touching the subscription.
        :raises Exception: if not logged in and ``constant.TEST`` is falsy.
        """
        codes = set(codes)
        if not self.is_login and not constant.TEST:
            raise Exception("L2尚未登录")
        if from_cache and not codes:
            # No usable cache for today: leave the subscription untouched.
            return
        if delay > 0:
            time.sleep(delay)
        add_codes = codes - self.subscripted_codes
        del_codes = self.subscripted_codes - codes
        print("add del codes", add_codes, del_codes)
        self.__subscribe(add_codes)
        self.__unsubscribe(del_codes)
        # Remember the latest code list
        self.latest_codes_set = codes
        # Track what is subscribed so the next call computes a real diff;
        # without this nothing would ever be unsubscribed.
        self.subscripted_codes = set(codes)

    # Subscribe codes: [code, ...]
    def set_codes_data(self, codes):
        """Apply a new code list and always persist it.

        Subscription errors (including "not logged in yet") are logged and
        swallowed; the list is saved regardless so it can be re-applied after
        the next successful login.
        """
        try:
            self.__process_codes_data(codes)
        except Exception as e:
            logging.exception(e)
            logger_l2_codes_subscript.exception(e)
        finally:
            # Save a copy of the latest data
            self.__set_latest_datas(codes)

    @classmethod
    def __set_latest_datas(cls, codes_data):
        """Write ``[today, codes]`` as one JSON line to the cache file."""
        # A set is not JSON-serialisable, so always store a list.
        data_str = json.dumps([tool.get_now_date_str(), list(codes_data or [])])
        with open(constant.L2_CODES_INFO_PATH, mode='w') as f:
            f.write(data_str)

    @classmethod
    def __get_latest_datas(cls):
        """Return the cached code list if it was saved today, else ``[]``.

        A missing, empty or malformed cache file also yields ``[]``; the
        problem is logged instead of raised.
        """
        if not os.path.exists(constant.L2_CODES_INFO_PATH):
            return []
        try:
            with open(constant.L2_CODES_INFO_PATH, mode='r') as f:
                data_json = json.loads(f.readline())
            if data_json[0] == tool.get_now_date_str():
                return data_json[1] or []
        except (OSError, ValueError, IndexError, KeyError, TypeError) as e:
            logger_l2_codes_subscript.exception(e)
        return []

    def __restore_cached_codes(self, delay=6.0):
        """Re-apply today's cached code list; run on a worker thread after login."""
        try:
            self.__process_codes_data(self.__get_latest_datas(), from_cache=True, delay=delay)
        except Exception as e:
            # Log here: an exception escaping a thread target would otherwise
            # only reach the default thread excepthook.
            logging.exception(e)
            logger_l2_codes_subscript.exception(e)

    def OnFrontConnected(self):
        """Front connected: send a logout, wait one second, then request login."""
        print("OnFrontConnected")
        logger_system.info(f"l2_client OnFrontConnected 线程ID:{tool.get_thread_id()}")
        logout_req = lev2mdapi.CTORATstpUserLogoutField()
        self.__api.ReqUserLogout(logout_req, 1)
        time.sleep(1)
        # Request login
        login_req = lev2mdapi.CTORATstpReqUserLoginField()
        self.__api.ReqUserLogin(login_req, 2)

    def OnRspUserLogin(self, pRspInfo_or_login, pRspInfo, nRequestID, bIsLast):
        """Login response: on success mark logged in and restore cached codes."""
        print("OnRspUserLogin: ErrorID[%d] ErrorMsg[%s] RequestID[%d] IsLast[%d]" % (
            pRspInfo['ErrorID'], pRspInfo['ErrorMsg'], nRequestID, bIsLast))
        if pRspInfo['ErrorID'] == 0:
            print("----L2行情登录成功----")
            self.is_login = True
            logger_system.info("L2行情登录成功")
            # NOTE(review): assumes subscriptions do not survive a re-login, so
            # the tracked set is cleared and everything is subscribed again —
            # confirm against the vendor API documentation.
            self.subscripted_codes = set()
            # Apply the initial values
            threading.Thread(target=self.__restore_cached_codes, daemon=True).start()

    def OnRtnMarketData(self, pDepthMarketData, FirstLevelBuyNum, FirstLevelBuyOrderVolumes, FirstLevelSellNum,
                        FirstLevelSellOrderVolumes):
        """Market-data snapshot callback: log the snapshot as a dict.

        Logged fields: timestamp, security ID, last price, total traded
        volume/value, total ask volume, average ask price, bid levels 1-5 and
        ask levels 1-5 as ``(price, volume)`` tuples.
        """
        try:
            d = {"dataTimeStamp": pDepthMarketData['DataTimeStamp'], "securityID": pDepthMarketData['SecurityID'],
                 "lastPrice": pDepthMarketData['LastPrice'],
                 "totalVolumeTrade": pDepthMarketData['TotalVolumeTrade'],
                 "totalValueTrade": pDepthMarketData['TotalValueTrade'],
                 "totalAskVolume": pDepthMarketData['TotalAskVolume'],
                 "avgAskPrice": pDepthMarketData["AvgAskPrice"],
                 "buy": [(pDepthMarketData[f'BidPrice{level}'], pDepthMarketData[f'BidVolume{level}'])
                         for level in range(1, 6)],
                 "sell": [(pDepthMarketData[f'AskPrice{level}'], pDepthMarketData[f'AskVolume{level}'])
                          for level in range(1, 6)]}
            huaxin_l2_log.info(logger_local_huaxin_l2_market, f"{d}")
        except Exception as e:
            # Never let a bad snapshot break the callback, but leave a trace
            # instead of silently discarding the error.
            logger_local_huaxin_l2_market.exception(e)
def __init_l2(l2_data_upload_manager):
    """Create the Level-2 market-data API, attach the SPI and start it.

    The created SPI is stored in the module-level ``spi`` global.

    :param l2_data_upload_manager: upload manager handed to ``Lev2MdSpi``.
    """
    global spi
    print(lev2mdapi.CTORATstpLev2MdApi_GetApiVersion())
    # Subscription mode: multicast. The TCP alternative would be
    # lev2mdapi.TORA_TSTP_MST_TCP.
    sub_mode = lev2mdapi.TORA_TSTP_MST_MCAST
    # Second argument True selects cached mode (False would be non-cached).
    api = lev2mdapi.CTORATstpLev2MdApi_CreateTstpLev2MdApi(sub_mode, True)
    spi = Lev2MdSpi(api, l2_data_upload_manager)
    api.RegisterSpi(spi)
    # ------------------- production mode -------------------
    if sub_mode == lev2mdapi.TORA_TSTP_MST_MCAST:
        # Receive market data from a single multicast address.
        api.RegisterMulticast(Multicast_Address, Local_Interface_Address, "")
    else:
        api.RegisterFront(Front_Address)
    # Other options left unused by the original: registering
    # Multicast_Address2 as well to receive from several multicast addresses,
    # or the efvi variant
    # RegisterMulticast(Multicast_Address, Local_Interface_Address, "", "enp101s0f0", 4096, True).
    # Run without CPU core binding.
    api.Init()
# Thread pool with three workers, created at import time.
# NOTE(review): not used anywhere in the visible part of this file; the name
# suggests it is meant for L2 command handling — confirm it is still needed.
__l2_cmd_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=3)
# NOTE(review): placeholder global, never assigned in the visible code.
pipe_strategy = None
def run(data_callbacks=None) -> None:
    """Start the standalone L2 market-data client and block forever.

    Initialises the API/SPI, requests a subscription for every code returned
    by ``l1_subscript_codes_manager.get_codes()`` and starts the async log
    writer thread. Start-up errors are logged to ``logger_system``; the
    function then keeps the process alive with an idle loop.

    :param data_callbacks: optional list of data callbacks for
        ``CodeDataCallbackDistributeManager``; defaults to an empty list.
        (The original body read an undefined ``data_callbacks`` name, so the
        resulting NameError was swallowed and nothing was ever initialised.)
    """
    logger_system.info("L2进程ID:{}", os.getpid())
    logger_system.info(f"l2_client 线程ID:{tool.get_thread_id()}")
    try:
        log.close_print()
        # Initialise
        callbacks = [] if data_callbacks is None else data_callbacks
        data_callback_distribute_manager = CodeDataCallbackDistributeManager(callbacks)
        l2_data_upload_manager = L2DataUploadManager(data_callback_distribute_manager)
        __init_l2(l2_data_upload_manager)
        # Subscribe L2
        codes_sh, codes_sz = l1_subscript_codes_manager.get_codes()
        codes = {code.decode("utf-8") for code in codes_sh}
        codes.update(code.decode("utf-8") for code in codes_sz)
        # Pass a list: the SPI persists the codes as JSON and a set cannot be
        # serialised.
        spi.set_codes_data(sorted(codes))
        threading.Thread(target=huaxin_l2_log.run_sync, daemon=True).start()
    except Exception as e:
        logger_system.exception(e)
    # Keep the main thread alive; the API delivers data through SPI callbacks.
    while True:
        time.sleep(2)
# Allow starting the market-data client by executing this module directly.
if __name__ == "__main__":
    run()
l2_market_main.py
New file
@@ -0,0 +1,5 @@
from huaxin_client import l2_market_client, l2_data_manager
# Entry point of the standalone L2 market-data program: run the client.
if __name__ == "__main__":
    l2_market_client.run()
    # l2_data_manager.add_subscript_codes(["000333"])
l2_market_main.spec
New file
@@ -0,0 +1,50 @@
# -*- mode: python ; coding: utf-8 -*-
# PyInstaller build specification for the l2_market_main program.
# No bytecode encryption cipher is configured.
block_cipher = None
# Analyse the entry script; no extra search paths, binaries, data files,
# hidden imports, hooks or excludes are configured.
a = Analysis(
    ['l2_market_main.py'],
    pathex=[],
    binaries=[],
    datas=[],
    hiddenimports=[],
    hookspath=[],
    hooksconfig={},
    runtime_hooks=[],
    excludes=[],
    win_no_prefer_redirects=False,
    win_private_assemblies=False,
    noarchive=False,
    cipher=block_cipher,
)
# Archive of the pure-Python modules found by the analysis.
pyz = PYZ(a.pure, a.zipped_data, cipher=block_cipher)
# Executable named 'l2_market_main': console application, UPX enabled,
# binaries excluded from the executable itself (they are gathered below).
exe = EXE(
    pyz,
    a.scripts,
    [],
    exclude_binaries=True,
    name='l2_market_main',
    debug=False,
    bootloader_ignore_signals=False,
    strip=False,
    upx=True,
    console=True,
    disable_windowed_traceback=False,
    argv_emulation=False,
    target_arch=None,
    codesign_identity=None,
    entitlements_file=None,
)
# Collect the executable together with its binaries, zip files and data files
# under the name 'l2_market_main'.
coll = COLLECT(
    exe,
    a.binaries,
    a.zipfiles,
    a.datas,
    strip=False,
    upx=True,
    upx_exclude=[],
    name='l2_market_main',
)
log_module/log.py
@@ -269,6 +269,13 @@
        logger.add(self.get_local_huaxin_path("l2", "subscript"),
                   filter=lambda record: record["extra"].get("name") == "local_huaxin_subscript",
                   rotation="00:00", compression="zip", enqueue=True)
        logger.add(self.get_local_huaxin_path("l2", "market"),
                   filter=lambda record: record["extra"].get("name") == "local_huaxin_l2_market",
                   rotation="00:00", compression="zip", enqueue=True)
        logger.add(self.get_local_huaxin_path("contact", "debug"),
                   filter=lambda record: record["extra"].get("name") == "local_huaxin_debug",
                   rotation="00:00", compression="zip", enqueue=True)
@@ -416,6 +423,7 @@
# Module-level logger handles, each obtained from __mylogger.get_logger(<name>).
# NOTE(review): the sink configuration in this file filters records on
# record["extra"]["name"]; presumably get_logger binds that name — confirm
# against the get_logger implementation, which is not visible here.
logger_local_huaxin_l2_upload = __mylogger.get_logger("local_huaxin_upload")
logger_local_huaxin_l2_error = __mylogger.get_logger("local_huaxin_error")
logger_local_huaxin_l2_subscript = __mylogger.get_logger("local_huaxin_subscript")
# Logger for L2 market-data snapshots.
logger_local_huaxin_l2_market = __mylogger.get_logger("local_huaxin_l2_market")
logger_local_huaxin_contact_debug = __mylogger.get_logger("local_huaxin_debug")
logger_local_huaxin_trade_debug = __mylogger.get_logger("local_huaxin_trade_debug")
logger_local_huaxin_l1 = __mylogger.get_logger("local_huaxin_l1_show_info")
utils/middle_api_protocol.py
@@ -62,8 +62,8 @@
# ------------------------------L2订阅列表------------------------------------
def load_l2_subscript_codes(datas):
    """Build the request payload for an L2 subscription-code list.

    :param datas: the code data to embed in the payload.
    :return: dict with ``type`` and nested ``ctype`` set to
        ``"l2_subscript_codes"`` and ``datas`` under ``data.data``.
    """
    fdata = {"type": "l2_subscript_codes", "data": {"ctype": "l2_subscript_codes", "data": datas}}
    # The payload was built but never returned, so callers received None.
    return fdata
def load_l2_position_subscript_codes(datas):
    """Build the request payload for the L2 position subscription-code list.

    :param datas: the code data to embed in the payload.
    :return: dict whose ``type`` and nested ``ctype`` are both
        ``"l2_position_subscript_codes"`` and whose ``data.data`` is ``datas``.
    """
    message_kind = "l2_position_subscript_codes"
    inner = {"ctype": message_kind, "data": datas}
    return {"type": message_kind, "data": inner}