Administrator
2024-05-08 0287e8511b9fa75f5714c74e26cddd61f6dcac6c
可转债正股代码订阅
4个文件已添加
490 ■■■■■ 已修改文件
cb_main.py 7 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
cb_main.spec 50 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l2_client_for_cb.py 428 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/test.py 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
cb_main.py
New file
@@ -0,0 +1,7 @@
"""
可转债入口函数
"""
from huaxin_client import l2_client_for_cb
if __name__ == '__main__':
    l2_client_for_cb.run()
cb_main.spec
New file
@@ -0,0 +1,50 @@
# -*- mode: python ; coding: utf-8 -*-
block_cipher = None
a = Analysis(
    ['cb_main.py'],
    pathex=[],
    binaries=[],
    datas=[],
    hiddenimports=[],
    hookspath=[],
    hooksconfig={},
    runtime_hooks=[],
    excludes=[],
    win_no_prefer_redirects=False,
    win_private_assemblies=False,
    noarchive=False,
    cipher=block_cipher,
)
pyz = PYZ(a.pure, a.zipped_data, cipher=block_cipher)
exe = EXE(
    pyz,
    a.scripts,
    [],
    exclude_binaries=True,
    name='cb_main',
    debug=False,
    bootloader_ignore_signals=False,
    strip=False,
    upx=True,
    console=True,
    disable_windowed_traceback=False,
    argv_emulation=False,
    target_arch=None,
    codesign_identity=None,
    entitlements_file=None,
)
coll = COLLECT(
    exe,
    a.binaries,
    a.zipfiles,
    a.datas,
    strip=False,
    upx=True,
    upx_exclude=[],
    name='cb_main',
)
huaxin_client/l2_client_for_cb.py
New file
@@ -0,0 +1,428 @@
# -*- coding: utf-8 -*-
"""
可转债正股L2订阅
"""
import decimal
import json
import logging
import multiprocessing
import os
import queue
import threading
import time
import concurrent.futures
from code_atrribute.history_k_data_util import JueJinHttpApi, JueJinApi
from huaxin_client import command_manager
from huaxin_client import constant
from huaxin_client import l2_data_manager
import lev2mdapi
from huaxin_client.code_queue_distribute_manager import CodeDataCallbackDistributeManager
from huaxin_client.command_manager import L2ActionCallback
from huaxin_client.l2_data_manager import L2DataUploadManager
from log_module import log, async_log_util
from log_module.async_log_util import huaxin_l2_log
from log_module.log import logger_local_huaxin_l2_subscript, logger_system, logger_l2_codes_subscript, \
    logger_local_huaxin_l2_transaction
from utils import tool
###B类###
Front_Address = "tcp://10.0.1.101:6900"
Multicast_Address = "udp://224.224.2.19:7889"
Multicast_Address2 = "udp://224.224.224.234:7890"
Local_Interface_Address = constant.LOCAL_IP
g_SubMarketData = False
g_SubTransaction = False
g_SubOrderDetail = False
g_SubXTSTick = False
g_SubXTSMarketData = False
g_SubNGTSTick = False
g_SubBondMarketData = False
g_SubBondTransaction = False
g_SubBondOrderDetail = False
SH_Securities = []
SH_XTS_Securities = []
SZ_Securities = []
SZ_Bond_Securities = []
set_codes_data_queue = queue.Queue()
market_code_dict = {}
class Lev2MdSpi(lev2mdapi.CTORATstpLev2MdSpi):
    latest_codes_set = set()
    special_code_volume_for_order_dict = {}
    # 已经订阅的代码
    subscripted_codes = set()
    # 代码的上次成交的订单唯一索引
    __last_transaction_keys_dict = {}
    limit_up_price_dict = {}
    # 买入的大单订单号
    def __init__(self, api, ):
        lev2mdapi.CTORATstpLev2MdSpi.__init__(self)
        self.__api = api
        self.is_login = False
    def __split_codes(self, codes):
        szse_codes = []
        sse_codes = []
        for code in codes:
            if code.find("00") == 0:
                szse_codes.append(code.encode())
            elif code.find("60") == 0:
                sse_codes.append(code.encode())
        return sse_codes, szse_codes
    # 新增订阅
    # 取消订阅
    def __unsubscribe(self, _codes):
        sh, sz = self.__split_codes(_codes)
        logger_local_huaxin_l2_subscript.info(f"取消订阅上证:{sh}")
        logger_local_huaxin_l2_subscript.info(f"取消订阅深证:{sz}")
        if sh:
            # 取消订阅逐笔成交
            self.__api.UnSubscribeTransaction(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
        if sz:
            self.__api.UnSubscribeTransaction(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)
    def __subscribe(self, _codes):
        sh, sz = self.__split_codes(_codes)
        logger_local_huaxin_l2_subscript.info(f"订阅上证:{sh}")
        logger_local_huaxin_l2_subscript.info(f"订阅深证:{sz}")
        if sh:
            # 订阅逐笔成交
            result = self.__api.SubscribeTransaction(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
            logger_local_huaxin_l2_subscript.info(f"逐笔成交订阅结果sh:{result}")
        if sz:
            result = self.__api.SubscribeTransaction(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)
            logger_local_huaxin_l2_subscript.info(f"逐笔成交订阅结果sz:{result}")
    def __process_codes_data(self, codes, from_cache=False, delay=0.0):
        codes = set(codes)
        if not self.is_login and not constant.TEST:
            raise Exception("L2尚未登录")
        if delay > 0:
            time.sleep(delay)
        add_codes = codes - self.subscripted_codes
        del_codes = self.subscripted_codes - codes
        print("add del codes", add_codes, del_codes)
        try:
            for c in del_codes:
                l2_data_manager.del_target_code(c)
            for c in codes:
                l2_data_manager.add_target_code(c)
        except Exception as e:
            logger_system.error(f"L2代码分配上传队列出错:{str(e)}")
            logger_system.exception(e)
        self.__subscribe(add_codes)
        self.__unsubscribe(del_codes)
        if add_codes:
            logger_system.info(f"新增L2订阅代码数量({'缓存' if from_cache else ''}):{len(add_codes)}")
        logger_l2_codes_subscript.info("华鑫L2订阅结束,add-{} del-{}", len(add_codes), len(del_codes))
        # 设置最近的代码列表
        self.latest_codes_set = codes
    # 订阅代码,[代码,...]
    def set_codes_data(self, codes):
        try:
            self.__process_codes_data(codes)
        except Exception as e:
            logging.exception(e)
            logger_l2_codes_subscript.exception(e)
        finally:
            # 保存一份最新的数据
            self.__set_latest_datas(codes)
    @classmethod
    def __set_latest_datas(cls, codes_data):
        data_str = json.dumps([tool.get_now_date_str(), codes_data])
        with open(constant.L2_CODES_INFO_PATH, mode='w') as f:
            f.write(data_str)
    @classmethod
    def __get_latest_datas(cls):
        if os.path.exists(constant.L2_CODES_INFO_PATH):
            with open(constant.L2_CODES_INFO_PATH, mode='r') as f:
                str_ = f.readline()
                data_json = json.loads(str_)
                if data_json[0] == tool.get_now_date_str():
                    return data_json[1]
        return []
    def OnFrontConnected(self):
        logger_system.info(f"l2_client OnFrontConnected 线程ID:{tool.get_thread_id()}")
        logout_req = lev2mdapi.CTORATstpUserLogoutField()
        self.__api.ReqUserLogout(logout_req, 1)
        time.sleep(1)
        # 请求登录
        login_req = lev2mdapi.CTORATstpReqUserLoginField()
        self.__api.ReqUserLogin(login_req, 2)
    def OnRspUserLogin(self, pRspUserLogin, pRspInfo, nRequestID, bIsLast):
        if pRspInfo['ErrorID'] == 0:
            print("----L2行情登录成功----")
            self.is_login = True
            logger_system.info(f"L2行情登录成功")
            # 初始设置值
            threading.Thread(
                target=lambda: self.__process_codes_data(self.__get_latest_datas(), from_cache=True, delay=6.0),
                daemon=True).start()
    def OnRspSubTransaction(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
        async_log_util.info(logger_local_huaxin_l2_subscript,
                            f"订阅结果:{pSpecificSecurity['SecurityID']} {pRspInfo['ErrorID']} {pRspInfo['ErrorMsg']}")
        if pRspInfo["ErrorID"] == 0:
            self.subscripted_codes.add(pSpecificSecurity['SecurityID'])
        if bIsLast == 1:
            l2_data_manager.add_subscript_codes(self.subscripted_codes)
    def OnRspUnSubTransaction(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
        try:
            code = pSpecificSecurity['SecurityID']
            self.subscripted_codes.discard(code)
            if bIsLast == 1:
                l2_data_manager.add_subscript_codes(self.subscripted_codes)
        except Exception as e:
            logging.exception(e)
    def OnRtnTransaction(self, pTransaction):
        # 输出逐笔成交数据
        if pTransaction['ExecType'] == b"2":
            # 撤单
            item = {"SecurityID": pTransaction['SecurityID'], "Price": pTransaction['TradePrice'],
                    "Volume": pTransaction['TradeVolume'],
                    "OrderType": "2",
                    "OrderTime": pTransaction['TradeTime'], "MainSeq": pTransaction['MainSeq'],
                    "SubSeq": pTransaction['SubSeq'],
                    "OrderStatus": "D"}
            buyNo = pTransaction['BuyNo']
            sellNo = pTransaction['SellNo']
            if buyNo > 0:
                # 买
                item["OrderNO"] = buyNo
                item["Side"] = "1"
            elif sellNo > 0:
                # 卖
                item["OrderNO"] = sellNo
                item["Side"] = "2"
        else:
            item = {"SecurityID": pTransaction['SecurityID'], "TradePrice": pTransaction['TradePrice'],
                    "TradeVolume": pTransaction['TradeVolume'],
                    "OrderTime": pTransaction['TradeTime'], "MainSeq": pTransaction['MainSeq'],
                    "SubSeq": pTransaction['SubSeq'], "BuyNo": pTransaction['BuyNo'],
                    "SellNo": pTransaction['SellNo'],
                    "ExecType": pTransaction['ExecType'].decode()}
            if pTransaction['TradePrice'] == self.limit_up_price_dict.get(pTransaction['SecurityID']):
                # TODO 成交价是涨停价才输出
                huaxin_l2_log.info(logger_local_huaxin_l2_transaction, f"{item}")
            else:
                pass
class MyL2ActionCallback(L2ActionCallback):
    def OnSetL2Position(self, codes):
        huaxin_l2_log.info(logger_l2_codes_subscript, "华鑫L2代码处理队列获取到数据:数量-{}", len(codes))
        try:
            spi.set_codes_data(codes)
        except Exception as e:
            logging.exception(e)
def __init_l2():
    print(lev2mdapi.CTORATstpLev2MdApi_GetApiVersion())
    # case 1: Tcp方式
    # g_SubMode=lev2mdapi.TORA_TSTP_MST_TCP
    # case 2: 组播方式
    g_SubMode = lev2mdapi.TORA_TSTP_MST_MCAST
    # case 1缓存模式
    api = lev2mdapi.CTORATstpLev2MdApi_CreateTstpLev2MdApi(g_SubMode, True)
    # case 2非缓存模式
    # api = lev2mdapi.CTORATstpLev2MdApi_CreateTstpLev2MdApi(g_SubMode, False)
    global spi
    spi = Lev2MdSpi(api)
    api.RegisterSpi(spi)
    # -------------------正式模式-------------------------------------
    if g_SubMode != lev2mdapi.TORA_TSTP_MST_MCAST:
        api.RegisterFront(Front_Address)
    else:
        # case 1 从一个组播地址收取行情
        api.RegisterMulticast(Multicast_Address, Local_Interface_Address, "")
        # api.RegisterMulticast(Multicast_Address2, Local_Interface_Address, "")
    # case 2:注册多个组播地址同时收行情
    # api.RegisterMulticast(Multicast_Address, Local_Interface_Address, "");
    # api.RegisterMulticast(Multicast_Address2, Local_Interface_Address, "");
    # case 3:efvi模式收行情
    # api.RegisterMulticast(Multicast_Address, Local_Interface_Address, "", "enp101s0f0",4096, True);
    # case 1 不绑核运行
    api.Init()
__l2_cmd_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=3)
def __receive_from_queue_trade(queue_trade_w_l2_r: multiprocessing.Queue):
    logger_system.info(f"l2_client __receive_from_pipe_trade 线程ID:{tool.get_thread_id()}")
    while True:
        try:
            value = queue_trade_w_l2_r.get()
            if value:
                if type(value) == bytes:
                    value = value.decode("utf-8")
                data = json.loads(value)
                _type = data["type"]
                if _type == "l2_cmd":
                    huaxin_l2_log.info(logger_local_huaxin_l2_subscript, f"订阅代码:{data}")
                    __start_time = time.time()
                    # 线程池
                    __l2_cmd_thread_pool.submit(
                        lambda: l2CommandManager.process_command(command_manager.CLIENT_TYPE_CMD_L2, None, data))
                    use_time = time.time() - __start_time
                    if use_time > 0.005:
                        huaxin_l2_log.info(logger_local_huaxin_l2_subscript, f"l2_cmd耗时:{use_time}s")
        except Exception as e:
            logging.exception(e)
pipe_strategy = None
def test_add_codes(queue_r):
    time.sleep(10)
    # if value:
    #     if type(value) == bytes:
    #         value = value.decode("utf-8")
    #     data = json.loads(value)
    #     _type = data["type"]
    #     if _type == "listen_volume":
    #         volume = data["data"]["volume"]
    #         code = data["data"]["code"]
    #         spi.set_code_special_watch_volume(code, volume)
    #     elif _type == "l2_cmd":
    #         l2CommandManager.process_command(command_manager.CLIENT_TYPE_CMD_L2, None, data)
    time.sleep(2)
    demo_datas = ["603002",
                  "002654",
                  "603701",
                  "002908"]
    queue_r.put_nowait(json.dumps({"type": "l2_cmd", "data": demo_datas}))
    time.sleep(10)
    while True:
        try:
            spi.l2_data_upload_manager.add_l2_order_detail(
                {'SecurityID': '603002', 'Price': 6.35, 'Volume': 275000, 'Side': "1", 'OrderType': '0',
                 'OrderTime': '13000015',
                 'MainSeq': 2, 'SubSeq': 6739147, 'OrderNO': 5512466, 'OrderStatus': 'D'}, 0)
            spi.l2_data_upload_manager.add_l2_order_detail(
                {'SecurityID': '603002', 'Price': 6.35, 'Volume': 200, 'Side': "1", 'OrderType': '0',
                 'OrderTime': '13000015',
                 'MainSeq': 2, 'SubSeq': 6739148, 'OrderNO': 5512467, 'OrderStatus': 'D'}, 0)
            # queue_r.put_nowait(json.dumps({"type": "listen_volume", "data": {"code": "603002", "volume": 100}}))
            time.sleep(0.1)
            spi.l2_data_upload_manager.add_l2_order_detail(
                {'SecurityID': '603002', 'Price': 6.35, 'Volume': 100, 'Side': "1", 'OrderType': '0',
                 'OrderTime': '13000015',
                 'MainSeq': 2, 'SubSeq': 6739148, 'OrderNO': 5512467, 'OrderStatus': 'D'}, 0)
        except Exception as e:
            logging.exception(e)
        finally:
            time.sleep(10)
def get_subscript_codes():
    """
    获取需要订阅的代码
    :return:
    """
    results = JueJinHttpApi.get_exchanges_codes("SHSE,SZSE", sec_types=[8], skip_suspended=True, skip_st=True,
                                                fields="symbol, sec_type, sec_id,sec_name, underlying_symbol, delisted_date")
    fresults = []
    for r in results:
        tool.get_now_date_str()
        if int(tool.get_now_date_str('%Y%m%d')) >= int(r['delisted_date'].strftime('%Y%m%d')):
            continue
        fresults.append(r)
    print(len(fresults))
    return [x['underlying_symbol'].split('.')[1] for x in fresults]
def get_pre_price(codes):
    """
    获取昨日收盘价
    :param codes:
    :return:
    """
    symbols = JueJinApi.get_juejin_code_list_with_prefix(codes)
    results = JueJinHttpApi.get_instruments(symbols, 'pre_close,sec_id')
    return {x['sec_id']: round(x['pre_close'], 2) for x in results}
def __init_data():
    """
    初始化数据
    :return:
    """
    codes = None
    # 获取目标代码
    for i in range(3):
        try:
            codes = get_subscript_codes()
            if codes:
                break
        except:
            time.sleep(5)
    if codes:
        # 设置订阅代码
        spi.set_codes_data(codes)
        # 获取目标代码的收盘价
        pre_price_dict = None
        for i in range(3):
            try:
                pre_price_dict = get_pre_price(codes)
                if pre_price_dict:
                    break
            except:
                time.sleep(5)
        if pre_price_dict:
            for k in pre_price_dict:
                limit_up_price = tool.to_price(decimal.Decimal(str(pre_price_dict[k])) * decimal.Decimal("1.1"))
                Lev2MdSpi.limit_up_price_dict[k] = round(float(limit_up_price), 2)
def run() -> None:
    logger_system.info("可转债L2进程ID:{}", os.getpid())
    logger_system.info(f"可转债l2_client 线程ID:{tool.get_thread_id()}")
    try:
        log.close_print()
        # 初始化
        __init_l2()
        __init_data()
        threading.Thread(target=huaxin_l2_log.run_sync, daemon=True).start()
        # TODO 测试
        # threading.Thread(target=lambda: test_add_codes(queue_r), daemon=True).start()
        global l2CommandManager
        l2CommandManager = command_manager.L2CommandManager()
        logger_system.info("可转债L2订阅服务启动成功")
    except Exception as e:
        logger_system.exception(e)
    while True:
        time.sleep(2)
if __name__ == "__main__":
    run()
test/test.py
New file
@@ -0,0 +1,5 @@
from huaxin_client import l2_client_for_cb
if __name__ == "__main__":
    codes = l2_client_for_cb.get_subscript_codes()
    print(l2_client_for_cb.get_pre_price(codes))