Administrator
2023-11-14 5715545bec1d88fe9cc4ea79db0a5d1148694590
L2数据后置过滤
3个文件已修改
219 ■■■■■ 已修改文件
huaxin_client/l2_client.py 99 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l2_data_manager.py 69 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_data_listen_manager.py 51 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l2_client.py
@@ -65,7 +65,7 @@
    # 买入的大单订单号
    def __init__(self, api, l2_data_upload_manager):
    def __init__(self, api, l2_data_upload_manager: L2DataUploadManager):
        lev2mdapi.CTORATstpLev2MdSpi.__init__(self)
        self.__api = api
        self.is_login = False
@@ -131,6 +131,7 @@
            code = d[0]
            codes.add(code)
            self.codes_volume_and_price_dict[code] = (d[1], d[2])
            self.l2_data_upload_manager.set_order_fileter_condition(code, d[1], d[2])
        add_codes = codes - self.subscripted_codes
        del_codes = self.subscripted_codes - codes
        print("add del codes", add_codes, del_codes)
@@ -183,7 +184,11 @@
    def set_code_special_watch_volume(self, code, volume):
        # Registers a short-lived watch on `code` for orders whose volume is `volume`
        # or constant.SHADOW_ORDER_VOLUME, and logs the request.
        # NOTE(review): this text comes from a rendered diff; the live assignment below and
        # its commented-out copy are probably the before/after of one line - confirm which
        # of the two is active in the real file.
        # Valid for 3 s
        self.special_code_volume_for_order_dict[code] = (volume, time.time() + 3)
        # self.special_code_volume_for_order_dict[code] = (volume, time.time() + 3)
        # NOTE(review): .get(code) yields None for a code that is not in the dict, and
        # unpacking None raises TypeError - confirm callers only pass subscribed codes.
        min_volume, limit_up_price = self.codes_volume_and_price_dict.get(code)
        # Hand the special volumes and their expiry timestamp (now + 3 s) to the upload manager.
        self.l2_data_upload_manager.set_order_fileter_condition(code, min_volume,limit_up_price,
                                                                {volume, constant.SHADOW_ORDER_VOLUME}, time.time() + 3)
        async_log_util.info(logger_local_huaxin_l2_subscript, f"设置下单量监听:{code}-{volume}")
    def OnFrontConnected(self):
@@ -299,15 +304,15 @@
    def OnRtnTransaction(self, pTransaction):
        code = str(pTransaction['SecurityID'])
        min_volume, limit_up_price = self.codes_volume_and_price_dict.get(code)
        # min_volume, limit_up_price = self.codes_volume_and_price_dict.get(code)
        # 输出逐笔成交数据
        if pTransaction['ExecType'] == b"2":
            if min_volume is None:
                # 默认筛选50w
                if pTransaction['TradePrice'] * pTransaction['Volume'] < 500000:
                    return
            elif pTransaction['TradeVolume'] < min_volume:
                return
            # if min_volume is None:
            #     # 默认筛选50w
            #     if pTransaction['TradePrice'] * pTransaction['Volume'] < 500000:
            #         return
            # elif pTransaction['TradeVolume'] < min_volume:
            #     return
            # 撤单
            item = {"SecurityID": pTransaction['SecurityID'], "Price": pTransaction['TradePrice'],
                    "Volume": pTransaction['TradeVolume'],
@@ -327,45 +332,45 @@
                item["Side"] = "2"
            self.l2_data_upload_manager.add_l2_order_detail(item, 0, True)
        else:
            if abs(pTransaction['TradePrice'] - limit_up_price) < 0.201:
                # 涨停价
                # 成交
                item = {"SecurityID": pTransaction['SecurityID'], "TradePrice": pTransaction['TradePrice'],
                        "TradeVolume": pTransaction['TradeVolume'],
                        "OrderTime": pTransaction['TradeTime'], "MainSeq": pTransaction['MainSeq'],
                        "SubSeq": pTransaction['SubSeq'], "BuyNo": pTransaction['BuyNo'],
                        "SellNo": pTransaction['SellNo'],
                        "ExecType": pTransaction['ExecType'].decode()}
                # 暂时注释掉同1单号至多上传1次
                # key = f"{item['SecurityID']}_{item['TradePrice']}_{item['BuyNo']}"
                # if self.__last_transaction_keys_dict.get(code) == key:
                #     return
                # self.__last_transaction_keys_dict[code] = key
                # print("逐笔成交", item)
                self.l2_data_upload_manager.add_transaction_detail(item)
            # if abs(pTransaction['TradePrice'] - limit_up_price) < 0.201:
            # 涨停价
            # 成交
            item = {"SecurityID": pTransaction['SecurityID'], "TradePrice": pTransaction['TradePrice'],
                    "TradeVolume": pTransaction['TradeVolume'],
                    "OrderTime": pTransaction['TradeTime'], "MainSeq": pTransaction['MainSeq'],
                    "SubSeq": pTransaction['SubSeq'], "BuyNo": pTransaction['BuyNo'],
                    "SellNo": pTransaction['SellNo'],
                    "ExecType": pTransaction['ExecType'].decode()}
            # 暂时注释掉同1单号至多上传1次
            # key = f"{item['SecurityID']}_{item['TradePrice']}_{item['BuyNo']}"
            # if self.__last_transaction_keys_dict.get(code) == key:
            #     return
            # self.__last_transaction_keys_dict[code] = key
            # print("逐笔成交", item)
            self.l2_data_upload_manager.add_transaction_detail(item)
    def OnRtnOrderDetail(self, pOrderDetail):
        can_listen = False
        code = str(pOrderDetail['SecurityID'])
        start_time = 0
        if code in self.special_code_volume_for_order_dict:
            start_time = time.time()
            if self.special_code_volume_for_order_dict[code][0] == pOrderDetail[
                'Volume'] or constant.SHADOW_ORDER_VOLUME == pOrderDetail['Volume']:
                # 监控目标订单与影子订单
                if self.special_code_volume_for_order_dict[code][1] > time.time():
                    # 特殊量监听
                    can_listen = True
                else:
                    self.special_code_volume_for_order_dict.pop(code)
        if not can_listen:
            min_volume, limit_up_price = self.codes_volume_and_price_dict.get(code)
            if min_volume is None:
                # 默认筛选50w
                if pOrderDetail['Price'] * pOrderDetail['Volume'] < 500000:
                    return
            elif pOrderDetail['Volume'] < min_volume:
                return
        # can_listen = False
        # code = str(pOrderDetail['SecurityID'])
        # start_time = 0
        # if code in self.special_code_volume_for_order_dict:
        #     start_time = time.time()
        #     if self.special_code_volume_for_order_dict[code][0] == pOrderDetail[
        #         'Volume'] or constant.SHADOW_ORDER_VOLUME == pOrderDetail['Volume']:
        #         # 监控目标订单与影子订单
        #         if self.special_code_volume_for_order_dict[code][1] > time.time():
        #             # 特殊量监听
        #             can_listen = True
        #         else:
        #             self.special_code_volume_for_order_dict.pop(code)
        # if not can_listen:
        #     min_volume, limit_up_price = self.codes_volume_and_price_dict.get(code)
        #     if min_volume is None:
        #         # 默认筛选50w
        #         if pOrderDetail['Price'] * pOrderDetail['Volume'] < 500000:
        #             return
        #     elif pOrderDetail['Volume'] < min_volume:
        #         return
        # 输出逐笔委托数据
        # 上证OrderStatus=b"D"表示撤单
        item = {"SecurityID": pOrderDetail['SecurityID'], "Price": pOrderDetail['Price'],
@@ -374,7 +379,7 @@
                "OrderTime": pOrderDetail['OrderTime'], "MainSeq": pOrderDetail['MainSeq'],
                "SubSeq": pOrderDetail['SubSeq'], "OrderNO": pOrderDetail['OrderNO'],
                "OrderStatus": pOrderDetail['OrderStatus'].decode()}
        self.l2_data_upload_manager.add_l2_order_detail(item, start_time)
        self.l2_data_upload_manager.add_l2_order_detail(item, 0)
    def OnRtnBondMarketData(self, pDepthMarketData, FirstLevelBuyNum, FirstLevelBuyOrderVolumes, FirstLevelSellNum,
                            FirstLevelSellOrderVolumes):
huaxin_client/l2_data_manager.py
@@ -38,19 +38,22 @@
        self.temp_transaction_queue_dict = {}
        self.filter_order_condition_dict = {}
    # TODO 需要调用
    # 设置订单过滤条件
    def set_order_fileter_condition(self, code, min_volume, special_volumes=None, special_volumes_expire_time=None):
    def set_order_fileter_condition(self, code, min_volume, limit_up_price, special_volumes=None,
                                    special_volumes_expire_time=None):
        if special_volumes is None:
            special_volumes = set()
        self.filter_order_condition_dict[code] = [min_volume, special_volumes, special_volumes_expire_time]
        if code in self.filter_order_condition_dict and not special_volumes and not special_volumes_expire_time:
            self.filter_order_condition_dict[code][0] = (min_volume, limit_up_price)
        else:
            self.filter_order_condition_dict[code] = [min_volume, special_volumes, special_volumes_expire_time]
    # 过滤订单
    def __filter_order(self, item):
        filter_condition = self.filter_order_condition_dict.get(item[0])
        if filter_condition:
            # item[2]为量
            if item[2] >= filter_condition[0]:
            if item[2] >= filter_condition[0][0]:
                return item
            if filter_condition[1] and item[2] in filter_condition[1]:
                if filter_condition[2] and time.time() > filter_condition[2]:
@@ -62,26 +65,48 @@
            return None
        return item
    # Filter transactions: keep only trades priced close to the limit-up price
    def __filter_transaction(self, item):
        filter_condition = self.filter_order_condition_dict.get(item[0])
        if filter_condition:
            # item[2]为量
            if abs(item[1] - filter_condition[0][1]) < 0.201:
                return item
            return None
        return item
    # 添加委托详情
    def add_l2_order_detail(self, data, start_time, istransaction=False):
    def add_l2_order_detail(self, data, start_time=0, istransaction=False):
        code = data["SecurityID"]
        queue_info = self.order_queue_distribute_manager.get_distributed_queue(code)
        if not queue_info:
            return
        queue_info[1].put_nowait(
            (data['SecurityID'], data['Price'], data['Volume'], data['Side'], data['OrderType'], data['OrderTime'],
             data['MainSeq'], data['SubSeq'], data['OrderNO'], data['OrderStatus'], time.time(), start_time))
        # 不直接加入
        # queue_info = self.order_queue_distribute_manager.get_distributed_queue(code)
        # if not queue_info:
        #     return
        # queue_info[1].put_nowait(
        #     (data['SecurityID'], data['Price'], data['Volume'], data['Side'], data['OrderType'], data['OrderTime'],
        #      data['MainSeq'], data['SubSeq'], data['OrderNO'], data['OrderStatus'], time.time(), start_time))
        q: collections.deque = self.temp_order_queue_dict.get(code)
        q.append((data['SecurityID'], data['Price'], data['Volume'], data['Side'], data['OrderType'], data['OrderTime'],
                  data['MainSeq'], data['SubSeq'], data['OrderNO'], data['OrderStatus'], time.time(), start_time))
    # 添加逐笔成交
    def add_transaction_detail(self, data):
        # Accepts one tick-by-tick transaction as a dict (keys read below: SecurityID,
        # TradePrice, TradeVolume, OrderTime, MainSeq, SubSeq, BuyNo, SellNo, ExecType)
        # and stores it as a tuple in that field order.
        # NOTE(review): this text comes from a rendered diff; the direct put_nowait path
        # and the deque append below are both shown, and the "do not enqueue directly"
        # comment suggests the first one was meant to be replaced - confirm against the
        # real file.
        code = data["SecurityID"]
        queue_info = self.transaction_queue_distribute_manager.get_distributed_queue(code)
        if not queue_info:
            return
        # Check whether this is a large-order trade
        queue_info[1].put_nowait((data['SecurityID'], data['TradePrice'], data['TradeVolume'],
                                  data['OrderTime'], data['MainSeq'], data['SubSeq'], data['BuyNo'],
                                  data['SellNo'], data['ExecType']))
        # Do not enqueue directly
        # queue_info = self.transaction_queue_distribute_manager.get_distributed_queue(code)
        # if not queue_info:
        #     return
        # # Check whether this is a large-order trade
        # queue_info[1].put_nowait((data['SecurityID'], data['TradePrice'], data['TradeVolume'],
        #                           data['OrderTime'], data['MainSeq'], data['SubSeq'], data['BuyNo'],
        #                           data['SellNo'], data['ExecType']))
        # Buffer the record in the per-code deque instead.
        # NOTE(review): .get(code) returns None when no deque exists for the code, and
        # q.append would then raise AttributeError - confirm a deque is always created first.
        q: collections.deque = self.temp_transaction_queue_dict.get(code)
        q.append((data['SecurityID'], data['TradePrice'], data['TradeVolume'],
                  data['OrderTime'], data['MainSeq'], data['SubSeq'], data['BuyNo'],
                  data['SellNo'], data['ExecType']))
    def add_market_data(self, data):
        # 加入上传队列
@@ -93,8 +118,8 @@
        self.transaction_queue_distribute_manager.distribute_queue(code)
        self.temp_order_queue_dict[code] = collections.deque()
        self.temp_transaction_queue_dict[code] = collections.deque()
        # threading.Thread(target=lambda: self.__run_upload_order_task(code)).start()
        # threading.Thread(target=lambda: self.__run_upload_transaction_task(code)).start()
        threading.Thread(target=lambda: self.__run_upload_order_task(code)).start()
        threading.Thread(target=lambda: self.__run_upload_transaction_task(code)).start()
    def __upload_l2_data(self, code, _queue, datas):
        _queue.put_nowait((code, datas, time.time()))
@@ -138,7 +163,9 @@
            try:
                if not q:
                    data = q.popleft()
                    temp_list.append(data)
                    data = self.__filter_transaction(data)
                    if data:
                        temp_list.append(data)
                else:
                    if temp_list:
                        # 上传数据
l2/l2_data_listen_manager.py
@@ -28,22 +28,28 @@
        __id = id(q)
        count = 0
        while True:
            datas_dict = {}
            # datas_dict = {}
            try:
                while not q.empty():
                if not q.empty():
                    item = q.get()
                    if item[0] not in datas_dict:
                        datas_dict[item[0]] = []
                    datas_dict[item[0]].append(item)
                if datas_dict:
                    for c in datas_dict:
                        self.my_l2_data_callback.OnL2Order(c, datas_dict[c], datas_dict[c][0][10])
                    self.my_l2_data_callback.OnL2Order(item[0], item[1], item[2])
                else:
                    time.sleep(0.002)
                    time.sleep(0.001)
                # while not q.empty():
                #     item = q.get()
                #     if item[0] not in datas_dict:
                #         datas_dict[item[0]] = []
                #     datas_dict[item[0]].append(item)
                # if datas_dict:
                #     for c in datas_dict:
                #         self.my_l2_data_callback.OnL2Order(c, datas_dict[c], datas_dict[c][0][10])
                # else:
                #     time.sleep(0.002)
            except Exception as e:
                async_log_util.exception(logger_debug, e)
            finally:
                datas_dict.clear()
                # datas_dict.clear()
                count += 1
                if count > 100:
                    count = 0
@@ -53,24 +59,29 @@
    # 接收L2逐笔成交数据
    def __recive_transaction_orders(self, q: multiprocessing.Queue):
        __id = id(q)
        datas_dict = {}
        # datas_dict = {}
        count = 0
        while True:
            try:
                while not q.empty():
                # while not q.empty():
                #     item = q.get()
                #     if item[0] not in datas_dict:
                #         datas_dict[item[0]] = []
                #     datas_dict[item[0]].append(item)
                # if datas_dict:
                #     for c in datas_dict:
                #         self.my_l2_data_callback.OnL2Transaction(c, datas_dict[c])
                # else:
                #     time.sleep(0.01)
                if not q.empty():
                    item = q.get()
                    if item[0] not in datas_dict:
                        datas_dict[item[0]] = []
                    datas_dict[item[0]].append(item)
                if datas_dict:
                    for c in datas_dict:
                        self.my_l2_data_callback.OnL2Transaction(c, datas_dict[c])
                    self.my_l2_data_callback.OnL2Transaction(item[0], item[1])
                else:
                    time.sleep(0.01)
                    time.sleep(0.005)
            except Exception as e:
                async_log_util.exception(logger_debug, e)
            finally:
                datas_dict.clear()
                # datas_dict.clear()
                count += 1
                if count > 50:
                    count = 0