Administrator
2024-05-21 f5d3bd2bb47e7d1983158d281e2f059ac65a1bd6
真实下单位置修复/深证下单方式修改/9:25之前L2订阅首板开1代码
3个文件已修改
1个文件已添加
312 ■■■■■ 已修改文件
huaxin_client/l2_market_client.py 294 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/huaxin/huaxin_delegate_postion_manager.py 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_data_manager_new.py 7 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.py 7 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l2_market_client.py
New file
@@ -0,0 +1,294 @@
# -*- coding: utf-8 -*-
import decimal
import json
import logging
import multiprocessing
import os
import queue
import time
import concurrent.futures
from huaxin_client import  l1_subscript_codes_manager
from huaxin_client import constant
import lev2mdapi
from huaxin_client.l2_data_manager import L2DataUploadManager
from log_module.async_log_util import huaxin_l2_log
from log_module.log import logger_local_huaxin_l2_subscript, logger_system, logger_l2_codes_subscript, hx_logger_l2_market_data
from utils import tool
### Class B ###
# NOTE(review): "Class B" presumably names the network/account category these
# addresses belong to -- confirm with the deployment documentation.
# TCP front address; registered only when the subscription mode is not multicast.
Front_Address = "tcp://10.0.1.101:6900"
# Multicast address registered by __init_l2 in multicast mode.
Multicast_Address = "udp://224.224.2.19:7889"
# Second multicast address; only referenced from commented-out code in this file.
Multicast_Address2 = "udp://224.224.224.234:7890"
# Local interface address passed to RegisterMulticast.
Local_Interface_Address = constant.LOCAL_IP
# Not referenced anywhere else in the visible part of this file.
set_codes_data_queue = queue.Queue()
# SecurityID -> tuple describing a code currently at its limit-up price.
# Written by Lev2MdSpi.OnRtnMarketData and read by run().
market_code_dict = {}
class Lev2MdSpi(lev2mdapi.CTORATstpLev2MdSpi):
    """Level-2 market-data callback handler.

    After a successful login it subscribes market data for the codes returned
    by ``l1_subscript_codes_manager.get_codes()``. Every market-data callback
    checks whether the code sits at its limit-up price (pre-close * 1.1) and
    keeps the module-level ``market_code_dict`` in sync with that state.
    """
    latest_codes_set = set()
    special_code_volume_for_order_dict = {}
    # Codes that are already subscribed.
    # NOTE(review): nothing in the visible code ever adds to this set, so the
    # diff in __process_codes_data always subscribes every requested code and
    # never unsubscribes anything -- confirm whether that is intended.
    subscripted_codes = set()
    # Codes currently at their limit-up price (shared at class level).
    __limit_up_codes = set()

    def __init__(self, api, l2_data_upload_manager: L2DataUploadManager):
        """Store the API handle and the upload manager.

        :param api: the Level-2 market-data API object used for requests.
        :param l2_data_upload_manager: kept on the instance; not used by the
            methods of this class.
        """
        lev2mdapi.CTORATstpLev2MdSpi.__init__(self)
        self.__api = api
        self.is_login = False
        self.l2_data_upload_manager = l2_data_upload_manager

    def __split_codes(self, codes):
        """Split codes by exchange and encode them to bytes.

        Codes starting with "60" go to the SSE list and codes starting with
        "00" go to the SZSE list; any other code is dropped.

        :param codes: iterable of code strings.
        :return: ``(sse_codes, szse_codes)`` as lists of ``bytes``.
        """
        szse_codes = []
        sse_codes = []
        for code in codes:
            if code.startswith("00"):
                szse_codes.append(code.encode())
            elif code.startswith("60"):
                sse_codes.append(code.encode())
        return sse_codes, szse_codes

    def __unsubscribe(self, _codes):
        """Cancel the market-data subscription for the given codes."""
        sh, sz = self.__split_codes(_codes)
        logger_local_huaxin_l2_subscript.info(f"取消订阅上证:{sh}")
        logger_local_huaxin_l2_subscript.info(f"取消订阅深证:{sz}")
        if sh:
            self.__api.UnSubscribeMarketData(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
        if sz:
            self.__api.UnSubscribeMarketData(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)

    def __subscribe(self, _codes):
        """Subscribe market data for the given codes and log the API results."""
        sh, sz = self.__split_codes(_codes)
        logger_local_huaxin_l2_subscript.info(f"订阅上证:{sh}")
        logger_local_huaxin_l2_subscript.info(f"订阅深证:{sz}")
        if sh:
            result = self.__api.SubscribeMarketData(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
            logger_local_huaxin_l2_subscript.info(f"市场订阅结果sh:{result}")
        if sz:
            result = self.__api.SubscribeMarketData(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)
            logger_local_huaxin_l2_subscript.info(f"市场订阅结果sz:{result}")

    def __process_codes_data(self, codes):
        """Apply a new target code list.

        Subscribes codes missing from ``subscripted_codes`` and unsubscribes
        codes that are no longer requested.

        :param codes: iterable of code strings.
        :raises Exception: when not logged in and ``constant.TEST`` is falsy.
        """
        codes = set(codes)
        if not self.is_login and not constant.TEST:
            raise Exception("L2尚未登录")
        add_codes = codes - self.subscripted_codes
        del_codes = self.subscripted_codes - codes
        print("add del codes", add_codes, del_codes)
        self.__subscribe(add_codes)
        self.__unsubscribe(del_codes)
        # Remember the most recent target code list
        self.latest_codes_set = codes

    # Subscribe codes: [code, ...]
    def set_codes_data(self, codes):
        """Apply a new target code list; failures are logged, never raised."""
        print("订阅代码数量:", len(codes))
        try:
            self.__process_codes_data(codes)
        except Exception as e:
            logging.exception(e)
            logger_l2_codes_subscript.exception(e)

    @classmethod
    def __set_latest_datas(cls, codes_data):
        """Persist ``[today, codes_data]`` as JSON to ``constant.L2_CODES_INFO_PATH``."""
        data_str = json.dumps([tool.get_now_date_str(), codes_data])
        with open(constant.L2_CODES_INFO_PATH, mode='w') as f:
            f.write(data_str)

    @classmethod
    def __get_latest_datas(cls):
        """Return the persisted codes data if it was written today, else ``[]``."""
        if os.path.exists(constant.L2_CODES_INFO_PATH):
            with open(constant.L2_CODES_INFO_PATH, mode='r') as f:
                str_ = f.readline()
                data_json = json.loads(str_)
                if data_json[0] == tool.get_now_date_str():
                    return data_json[1]
        return []

    def OnFrontConnected(self):
        """Connection callback: send a logout request, wait 1s, then log in."""
        print("OnFrontConnected")
        logger_system.info(f"l2_client OnFrontConnected 线程ID:{tool.get_thread_id()}")
        logout_req = lev2mdapi.CTORATstpUserLogoutField()
        self.__api.ReqUserLogout(logout_req, 1)
        time.sleep(1)
        # Request login
        login_req = lev2mdapi.CTORATstpReqUserLoginField()
        self.__api.ReqUserLogin(login_req, 2)

    def OnRspUserLogin(self, pRspUserLogin, pRspInfo, nRequestID, bIsLast):
        """Login callback: on success subscribe every L1-managed code."""
        print("OnRspUserLogin: ErrorID[%d] ErrorMsg[%s] RequestID[%d] IsLast[%d]" % (
            pRspInfo['ErrorID'], pRspInfo['ErrorMsg'], nRequestID, bIsLast))
        if pRspInfo['ErrorID'] == 0:
            print("----L2行情登录成功----")
            self.is_login = True
            logger_system.info("L2行情登录成功")
            # Subscribe L2 for the codes managed by the L1 subscription manager
            codes_sh, codes_sz = l1_subscript_codes_manager.get_codes()
            codes = set()
            for code in codes_sh:
                codes.add(code.decode("utf-8"))
            for code in codes_sz:
                codes.add(code.decode("utf-8"))
            self.set_codes_data(codes)

    def OnRtnMarketData(self, pDepthMarketData, FirstLevelBuyNum, FirstLevelBuyOrderVolumes, FirstLevelSellNum,
                        FirstLevelSellOrderVolumes):
        """Market-data callback: maintain the limit-up set and ``market_code_dict``.

        A code counts as limit-up when its last price or its bid-1 price is
        within 0.001 of ``pre-close * 1.1`` (rounded by ``tool.to_price``).
        Limit-up codes get an entry
        ``(code, last price, 0.1, total bid volume, time.time(), bid-1 price,
        bid-1 volume)`` in ``market_code_dict``; other codes are removed.
        Errors are logged and never propagate out of the callback.
        """
        try:
            # Fields are read with the subscript form throughout, matching the
            # other callbacks in this class.
            security_id = pDepthMarketData['SecurityID']
            pre_close_price = pDepthMarketData['PreClosePrice']
            last_price = pDepthMarketData['LastPrice']
            total_bid_volume = pDepthMarketData['TotalBidVolume']
            d = {"dataTimeStamp": pDepthMarketData['DataTimeStamp'], "securityID": security_id,
                 "preClosePrice": pre_close_price,
                 "lastPrice": last_price,
                 "totalBidVolume": total_bid_volume,
                 "avgBidPrice": pDepthMarketData['AvgBidPrice'],
                 "totalAskVolume": pDepthMarketData['TotalAskVolume'],
                 "avgAskPrice": pDepthMarketData["AvgAskPrice"]
                 }
            limit_up_count = len(self.__limit_up_codes)
            # Limit-up price: pre-close price * 1.1
            limit_up_price = float(
                tool.to_price(decimal.Decimal(str(pre_close_price)) * decimal.Decimal("1.1")))
            bid_price_1 = pDepthMarketData['BidPrice1']
            is_limit_up = (abs(limit_up_price - last_price) < 0.001
                           or abs(limit_up_price - bid_price_1) < 0.001)
            if is_limit_up:
                huaxin_l2_log.info(hx_logger_l2_market_data, f"{d}")
                self.__limit_up_codes.add(security_id)
                # The third item is a fixed 0.1; run() compares it against
                # constant.L1_MIN_RATE.
                market_code_dict[security_id] = (
                    security_id, last_price, 0.1, total_bid_volume,
                    time.time(),
                    bid_price_1, pDepthMarketData['BidVolume1'])
            else:
                self.__limit_up_codes.discard(security_id)
                market_code_dict.pop(security_id, None)
            if limit_up_count != len(self.__limit_up_codes):
                huaxin_l2_log.info(hx_logger_l2_market_data, f"涨停代码:{self.__limit_up_codes}")
        except Exception as e:
            # Keep the callback best-effort, but record the failure instead of
            # hiding it.
            logging.exception(e)
            hx_logger_l2_market_data.exception(e)
def __init_l2(l2_data_upload_manager):
    """Create the Level-2 market-data API, attach the SPI and start it.

    The subscription mode is fixed to multicast and the API is created with
    the cache flag set to True. The created SPI is published through the
    module-level global ``spi``.

    :param l2_data_upload_manager: forwarded to ``Lev2MdSpi``.
    """
    global spi
    print(lev2mdapi.CTORATstpLev2MdApi_GetApiVersion())
    # Subscription mode: multicast (TORA_TSTP_MST_TCP would be the TCP alternative)
    sub_mode = lev2mdapi.TORA_TSTP_MST_MCAST
    # Second argument True selects cache mode; False would be non-cache mode
    api = lev2mdapi.CTORATstpLev2MdApi_CreateTstpLev2MdApi(sub_mode, True)
    spi = Lev2MdSpi(api, l2_data_upload_manager)
    api.RegisterSpi(spi)
    # ------------------- production mode -------------------
    if sub_mode == lev2mdapi.TORA_TSTP_MST_MCAST:
        # Receive market data from a single multicast address
        api.RegisterMulticast(Multicast_Address, Local_Interface_Address, "")
    else:
        api.RegisterFront(Front_Address)
    # Run without CPU core binding
    api.Init()
# Thread pool with 3 workers; not referenced elsewhere in the visible part of this file.
__l2_cmd_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=3)
# Not referenced elsewhere in the visible part of this file.
pipe_strategy = None
# Codes contained in the most recent upload; maintained by __upload_codes_info
# to detect newly added codes.
__latest_subscript_codes = set()
def __upload_codes_info(queue_l1_w_strategy_r: multiprocessing.Queue, datas):
    """Push the current target codes to the strategy queue and log new ones.

    Does nothing outside trade time (``tool.is_trade_time()``).

    :param queue_l1_w_strategy_r: queue receiving the JSON message; may be
        ``None``, in which case nothing is enqueued.
    :param datas: sequence of records whose first item is the code.
    """
    if not tool.is_trade_time():
        return
    # Build and upload the message
    request_id = f"sb_{int(time.time() * 1000)}"
    payload = {
        "type": "set_target_codes",
        "data": {"data": datas},
        "request_id": request_id,
        "time": round(time.time() * 1000, 0),
    }
    message = json.dumps(payload)
    if queue_l1_w_strategy_r is not None:
        queue_l1_w_strategy_r.put_nowait(message)
    # Work out which codes are new compared with the previous upload
    current_codes = {record[0] for record in datas}
    new_codes = current_codes - __latest_subscript_codes
    __latest_subscript_codes.clear()
    __latest_subscript_codes.update(current_codes)
    if new_codes:
        hx_logger_l2_market_data.info(f"({request_id})新增加订阅的代码:{new_codes}")
def run(queue_l1_w_strategy_r) -> None:
    """Process entry point: start the L2 client and upload limit-up codes.

    Initialises the Level-2 API, then every 2 seconds uploads the entries of
    ``market_code_dict`` whose rate is at least ``constant.L1_MIN_RATE``
    (at most 1000, highest rate first). The loop ends once the current time
    reaches 09:25:00, so only call-auction data is processed.

    :param queue_l1_w_strategy_r: queue handed to ``__upload_codes_info``;
        may be ``None``.
    """
    logger_system.info("L2进程ID:{}", os.getpid())
    logger_system.info(f"l2_client 线程ID:{tool.get_thread_id()}")
    try:
        __init_l2(None)
    except Exception as e:
        logger_system.exception(e)
    while True:
        if tool.trade_time_sub(tool.get_now_time_str(), "09:25:00") >= 0:
            # Only call-auction data is read
            break
        try:
            # Records are (code, price, rate, volume, time, ...).
            # Snapshot the values in one call: the market-data callback
            # mutates market_code_dict while this loop is running.
            snapshot = list(market_code_dict.values())
            # Drop records whose rate is below the configured minimum
            flist = [d for d in snapshot if d[2] >= constant.L1_MIN_RATE]
            flist.sort(key=lambda x: x[2], reverse=True)
            datas = flist[:1000]
            print("代码数量:", len(datas))
            __upload_codes_info(queue_l1_w_strategy_r, datas)
        except Exception as e:
            # Keep polling, but record the failure instead of hiding it
            logger_system.exception(e)
        finally:
            time.sleep(2)
# Allow running this module directly; no strategy queue is attached in that case.
if __name__ == "__main__":
    run(None)
l2/huaxin/huaxin_delegate_postion_manager.py
@@ -163,8 +163,8 @@
        else:
            # TODO 需要矫正是否撤单
            real_place_index_info = real_place_index, RELIABILITY_TYPE_REAL
    elif tool.trade_time_sub(datas[-1]['val']['time'], exec_data['val']['time']) >= estimate_time_space:
        # 下单超过2s
    elif tool.trade_time_sub(datas[-1]['val']['time'], exec_data['val']['time']) >= estimate_time_space and time.time() - order_time >5:
        # 下单超过2s且绝对时间超过5S以上才会估算真实下单位置
        estimate_index = __compute_estimate_order_position(code, exec_data["index"], shadow_price)
        if estimate_index:
            real_place_index_info = estimate_index, RELIABILITY_TYPE_ESTIMATE
l2/l2_data_manager_new.py
@@ -1774,6 +1774,7 @@
                tool.to_time_with_ms(total_datas[end_index]['val']['time'], total_datas[end_index]['val']['tms']))
            trade_price_info = HuaXinSellOrderStatisticManager.get_latest_trade_price_info(code)
            limit_up_price = gpcode_manager.get_limit_up_price(code)
            # p
            is_limit_up = False
            if limit_up_price and trade_price_info and abs(trade_price_info[0] - float(limit_up_price)) < 0.001:
                is_limit_up = True
@@ -1781,13 +1782,13 @@
                # 不是板上放量
                # 判断最近有没有涨停卖数据
                limit_up_sell_count = L2TradeSingleDataProcessor.get_latest_limit_up_sell_order_count(code)
                if limit_up_sell_count == 0 and not single:
                    # 如果没有涨停卖数据而且还没有成交买入信号,就按照原来的总卖额计算
                if (limit_up_sell_count == 0 or active_buy_blocks) and not single:
                    # 如果没有涨停卖数据/激进下单而且还没有成交买入信号,就按照原来的总卖额计算
                    threshold_money, sell_1_price = refer_sell_data[1], refer_sell_data[3][0]
                    for i in range(start_index - 1, -1, -1):
                        val = total_datas[i]["val"]
                        if tool.compare_time(val["time"], refer_sell_data[0]) < 0:
                            # 将本s的计算上去
                            # 读取的L2的总卖额是不包含当前s的数据,所以需要将当前s的数据纳入计算
                            break
                        if L2DataUtil.is_sell(val):
                            threshold_money += val["num"] * int(float(val["price"]) * 100)
main.py
@@ -12,6 +12,7 @@
import huaxin_client.l2_client
import huaxin_client.l1_client
import huaxin_client.l1_client_for_trade
from huaxin_client import l2_market_client
from log_module import log
from log_module.log import logger_l2_trade, logger_system, logger_local_huaxin_l1
@@ -135,6 +136,10 @@
                                                 args=(queue_l1_trade_w_strategy_r, queue_l1_trade_r_strategy_w,))
        l1TradeProcess.start()
        l2MarketProcess = multiprocessing.Process(target=l2_market_client.run,
                                                  args=(queue_l1_w_strategy_r,))
        l2MarketProcess.start()
        # 交易进程
        tradeProcess = multiprocessing.Process(
            target=huaxin_client.trade_client.run,
@@ -154,7 +159,7 @@
        # 主进程
        createTradeServer(pss_strategy, queue_strategy_r_trade_w, queue_l1_w_strategy_r, queue_strategy_w_trade_r,
                          queue_strategy_w_trade_r_for_read, queue_l1_trade_r_strategy_w,
                          queue_l1_trade_w_strategy_r,(order_ipc_addr, cancel_order_ipc_addr))
                          queue_l1_trade_w_strategy_r, (order_ipc_addr, cancel_order_ipc_addr))
        # 将tradeServer作为主进程
        l1Process.join()