Administrator
2023-11-02 eb33b717023d9871bd74e6dce47a065228cffefc
huaxin_client/l2_client.py
@@ -6,12 +6,15 @@
import queue
import threading
import time
from typing import List
from huaxin_client import command_manager, l2_data_transform_protocol
from huaxin_client import constant
from huaxin_client import l2_data_manager
import lev2mdapi
from huaxin_client.code_queue_distribute_manager import CodeQueueDistributeManager
from huaxin_client.command_manager import L2ActionCallback
from huaxin_client.l2_data_manager import L2DataUploadManager
from log_module import log, async_log_util
from log_module.log import logger_local_huaxin_l2_subscript, logger_system, logger_local_huaxin_l2_transaction, \
    logger_local_huaxin_g_cancel, logger_l2_codes_subscript
@@ -40,7 +43,6 @@
SZ_Securities = [b"002456", b"002849", b"002281", b"002336", b"000936", b"000920", b"000757", b"002896", b"002725",
                 b"000952", b"000526", b"000753", b"000681", b"002088", b"002436"]
SZ_Bond_Securities = [b"100303", b"109559", b"112617"]
spi = None
set_codes_data_queue = queue.Queue()
market_code_dict = {}
@@ -56,10 +58,11 @@
    # 买入的大单订单号
    def __init__(self, api):
    def __init__(self, api, l2_data_upload_manager):
        lev2mdapi.CTORATstpLev2MdSpi.__init__(self)
        self.__api = api
        self.is_login = False
        self.l2_data_upload_manager = l2_data_upload_manager
    def __split_codes(self, codes):
        szse_codes = []
@@ -124,12 +127,16 @@
        add_codes = codes - self.subscripted_codes
        del_codes = self.subscripted_codes - codes
        print("add del codes", add_codes, del_codes)
        for c in codes:
            l2_data_manager.add_target_code(c)
        for c in del_codes:
            l2_data_manager.del_target_code(c)
        for c in add_codes:
            l2_data_manager.run_upload_task(c, l2_data_callback)
        try:
            for c in del_codes:
                self.l2_data_upload_manager.release_distributed_upload_queue(c)
                l2_data_manager.del_target_code(c)
            for c in codes:
                self.l2_data_upload_manager.distribute_upload_queue(c)
                l2_data_manager.add_target_code(c)
        except Exception as e:
            logger_system.error(f"L2代码分配上传队列出错:{str(e)}")
            logger_system.exception(e)
        self.__subscribe(add_codes)
        self.__unsubscribe(del_codes)
@@ -268,8 +275,7 @@
                     (pDepthMarketData['AskPrice5'], pDepthMarketData['AskVolume5'])
                 ]}
            market_code_dict[pDepthMarketData['SecurityID']] = time.time()
            l2_data_manager.add_market_data(d)
            self.l2_data_upload_manager.add_market_data(d)
        except:
            pass
@@ -289,12 +295,6 @@
        min_volume, limit_up_price = self.codes_volume_and_price_dict.get(code)
        # 输出逐笔成交数据
        if pTransaction['ExecType'] == b"2":
            # G撤数据暂时注释
            # transaction_big_order_nos = l2_data_manager.get_latest_transaction_order_nos(code)
            # if transaction_big_order_nos and pTransaction['BuyNo'] in transaction_big_order_nos:
            #     # 正在成交的订单撤单了
            #     l2_data_manager.trading_order_canceled(code, pTransaction['BuyNo'])
            #     async_log_util.info(logger_local_huaxin_g_cancel, f"G撤撤单:{code} - {pTransaction['BuyNo']}")
            if min_volume is None:
                # 默认筛选50w
                if pTransaction['TradePrice'] * pTransaction['Volume'] < 500000:
@@ -320,8 +320,7 @@
                item["Side"] = "2"
            # 深证撤单
            print("逐笔委托", item)
            l2_data_manager.add_l2_order_detail(item, 0, True)
            self.l2_data_upload_manager.add_l2_order_detail(item, 0, True)
        else:
            if abs(pTransaction['TradePrice'] - limit_up_price) < 0.201:
                # 涨停价
@@ -338,7 +337,7 @@
                #     return
                # self.__last_transaction_keys_dict[code] = key
                # print("逐笔成交", item)
                l2_data_manager.add_transaction_detail(item)
                self.l2_data_upload_manager.add_transaction_detail(item)
    def OnRtnOrderDetail(self, pOrderDetail):
        can_listen = False
@@ -346,7 +345,8 @@
        start_time = 0
        if code in self.special_code_volume_for_order_dict:
            start_time = time.time()
            if self.special_code_volume_for_order_dict[code][0] == pOrderDetail['Volume'] or constant.SHADOW_ORDER_VOLUME == pOrderDetail['Volume']:
            if self.special_code_volume_for_order_dict[code][0] == pOrderDetail[
                'Volume'] or constant.SHADOW_ORDER_VOLUME == pOrderDetail['Volume']:
                # 监控目标订单与影子订单
                if self.special_code_volume_for_order_dict[code][1] > time.time():
                    # 特殊量监听
@@ -354,13 +354,6 @@
                else:
                    self.special_code_volume_for_order_dict.pop(code)
        if not can_listen:
            # 暂时注释掉G撤相关数据产生
            # if pOrderDetail['OrderStatus'] == b'D':
            #     transaction_big_order_nos = l2_data_manager.get_latest_transaction_order_nos(code)
            #     if transaction_big_order_nos and pOrderDetail['OrderNO'] in transaction_big_order_nos:
            #         # 正在成交的订单撤单了
            #         l2_data_manager.trading_order_canceled(code, pOrderDetail['OrderNO'])
            #         async_log_util.info(logger_local_huaxin_g_cancel, f"G撤撤单:{code} - {pOrderDetail['OrderNO']}")
            min_volume, limit_up_price = self.codes_volume_and_price_dict.get(code)
            if min_volume is None:
                # 默认筛选50w
@@ -376,7 +369,7 @@
                "OrderTime": pOrderDetail['OrderTime'], "MainSeq": pOrderDetail['MainSeq'],
                "SubSeq": pOrderDetail['SubSeq'], "OrderNO": pOrderDetail['OrderNO'],
                "OrderStatus": pOrderDetail['OrderStatus'].decode()}
        l2_data_manager.add_l2_order_detail(item, start_time)
        self.l2_data_upload_manager.add_l2_order_detail(item, start_time)
    def OnRtnBondMarketData(self, pDepthMarketData, FirstLevelBuyNum, FirstLevelBuyOrderVolumes, FirstLevelSellNum,
                            FirstLevelSellOrderVolumes):
@@ -481,7 +474,7 @@
class MyL2ActionCallback(L2ActionCallback):
    def OnSetL2Position(self, client_id, request_id, codes_data):
    def OnSetL2Position(self, codes_data):
        print("L2订阅数量:", len(codes_data))
        logger_l2_codes_subscript.info("华鑫L2代码处理队列获取到数据:数量-{}", len(codes_data))
        try:
@@ -490,7 +483,7 @@
            logging.exception(e)
def __init_l2():
def __init_l2(l2_data_upload_manager):
    print(lev2mdapi.CTORATstpLev2MdApi_GetApiVersion())
    # case 1: Tcp方式
    # g_SubMode=lev2mdapi.TORA_TSTP_MST_TCP
@@ -502,7 +495,7 @@
    # case 2非缓存模式
    api = lev2mdapi.CTORATstpLev2MdApi_CreateTstpLev2MdApi(g_SubMode, False)
    global spi
    spi = Lev2MdSpi(api)
    spi = Lev2MdSpi(api, l2_data_upload_manager)
    api.RegisterSpi(spi)
    # -------------------正式模式-------------------------------------
    if g_SubMode != lev2mdapi.TORA_TSTP_MST_MCAST:
@@ -522,32 +515,21 @@
    api.Init()
def __receive_from_pipe_trade(queue_trade_w_l2_r: multiprocessing.Queue):
def __receive_from_queue_trade(queue_trade_w_l2_r: multiprocessing.Queue):
    logger_system.info(f"l2_client __receive_from_pipe_trade 线程ID:{tool.get_thread_id()}")
    while True:
        try:
            value = queue_trade_w_l2_r.get()
            if value:
                value = value.decode("utf-8")
                if type(value) == bytes:
                    value = value.decode("utf-8")
                data = json.loads(value)
                if data["type"] == "listen_volume":
                _type = data["type"]
                if _type == "listen_volume":
                    volume = data["data"]["volume"]
                    code = data["data"]["code"]
                    spi.set_code_special_watch_volume(code, volume)
        except Exception as e:
            logging.exception(e)
def __receive_from_pipe_strategy(pipe_):
    logger_system.info(f"l2_client __receive_from_pipe_strategy 线程ID:{tool.get_thread_id()}")
    while True:
        # print("__receive_from_pipe_strategy")
        try:
            val = pipe_.recv()
            if val:
                print("L2客户端接受到数据")
                data = json.loads(val)
                if data["data"]["type"] == "l2_cmd":
                elif _type == "l2_cmd":
                    l2CommandManager.process_command(command_manager.CLIENT_TYPE_CMD_L2, None, data)
        except Exception as e:
            logging.exception(e)
@@ -556,28 +538,24 @@
pipe_strategy = None
def run(queue_trade_w_l2_r: multiprocessing.Queue, _pipe_strategy,
        _l2_data_callback: l2_data_transform_protocol.L2DataCallBack) -> None:
def run(queue_r: multiprocessing.Queue, order_queues: List[multiprocessing.Queue],
        transaction_queues: List[multiprocessing.Queue], market_queue: multiprocessing.Queue) -> None:
    logger_system.info("L2进程ID:{}", os.getpid())
    logger_system.info(f"l2_client 线程ID:{tool.get_thread_id()}")
    try:
        log.close_print()
        if queue_trade_w_l2_r is not None:
            t1 = threading.Thread(target=lambda: __receive_from_pipe_trade(queue_trade_w_l2_r), daemon=True)
        if queue_r is not None:
            t1 = threading.Thread(target=lambda: __receive_from_queue_trade(queue_r), daemon=True)
            t1.start()
        if _pipe_strategy is not None:
            global pipe_strategy
            pipe_strategy = _pipe_strategy
            t1 = threading.Thread(target=__receive_from_pipe_strategy, args=(_pipe_strategy,), daemon=True)
            t1.start()
        __init_l2()
        global l2_data_callback
        l2_data_callback = _l2_data_callback
        l2_data_manager.run_upload_common(l2_data_callback)
        l2_data_manager.run_upload_trading_canceled(l2_data_callback)
        # 初始化
        order_queue_distribute_manager = CodeQueueDistributeManager(order_queues)
        transaction_queue_distribute_manager = CodeQueueDistributeManager(transaction_queues)
        l2_data_upload_manager = L2DataUploadManager(order_queue_distribute_manager,
                                                     transaction_queue_distribute_manager, market_queue)
        __init_l2(l2_data_upload_manager)
        l2_data_manager.run_upload_common()
        l2_data_manager.run_log()
        l2_data_manager.run_upload_daemon(l2_data_callback)
        # l2_data_manager.run_test(l2_data_callback)
        global l2CommandManager
        l2CommandManager = command_manager.L2CommandManager()
        l2CommandManager.init(MyL2ActionCallback())
@@ -589,6 +567,6 @@
if __name__ == "__main__":
    run(None, None, None)
    # run(None, None, None)
    # spi.set_codes_data([("000333", 12000)])
    input()