Administrator
2023-11-14 5715545bec1d88fe9cc4ea79db0a5d1148694590
huaxin_client/l2_data_manager.py
@@ -38,19 +38,22 @@
        self.temp_transaction_queue_dict = {}
        self.filter_order_condition_dict = {}
    # TODO 需要调用
    # 设置订单过滤条件
    def set_order_fileter_condition(self, code, min_volume, special_volumes=None, special_volumes_expire_time=None):
    def set_order_fileter_condition(self, code, min_volume, limit_up_price, special_volumes=None,
                                    special_volumes_expire_time=None):
        if special_volumes is None:
            special_volumes = set()
        self.filter_order_condition_dict[code] = [min_volume, special_volumes, special_volumes_expire_time]
        if code in self.filter_order_condition_dict and not special_volumes and not special_volumes_expire_time:
            self.filter_order_condition_dict[code][0] = (min_volume, limit_up_price)
        else:
            self.filter_order_condition_dict[code] = [min_volume, special_volumes, special_volumes_expire_time]
    # 过滤订单
    def __filter_order(self, item):
        filter_condition = self.filter_order_condition_dict.get(item[0])
        if filter_condition:
            # item[2]为量
            if item[2] >= filter_condition[0]:
            if item[2] >= filter_condition[0][0]:
                return item
            if filter_condition[1] and item[2] in filter_condition[1]:
                if filter_condition[2] and time.time() > filter_condition[2]:
@@ -62,26 +65,48 @@
            return None
        return item
        # 过滤订单
    def __filter_transaction(self, item):
        filter_condition = self.filter_order_condition_dict.get(item[0])
        if filter_condition:
            # item[2]为量
            if abs(item[1] - filter_condition[0][1]) < 0.201:
                return item
            return None
        return item
    # 添加委托详情
    def add_l2_order_detail(self, data, start_time, istransaction=False):
    def add_l2_order_detail(self, data, start_time=0, istransaction=False):
        code = data["SecurityID"]
        queue_info = self.order_queue_distribute_manager.get_distributed_queue(code)
        if not queue_info:
            return
        queue_info[1].put_nowait(
            (data['SecurityID'], data['Price'], data['Volume'], data['Side'], data['OrderType'], data['OrderTime'],
             data['MainSeq'], data['SubSeq'], data['OrderNO'], data['OrderStatus'], time.time(), start_time))
        # 不直接加入
        # queue_info = self.order_queue_distribute_manager.get_distributed_queue(code)
        # if not queue_info:
        #     return
        # queue_info[1].put_nowait(
        #     (data['SecurityID'], data['Price'], data['Volume'], data['Side'], data['OrderType'], data['OrderTime'],
        #      data['MainSeq'], data['SubSeq'], data['OrderNO'], data['OrderStatus'], time.time(), start_time))
        q: collections.deque = self.temp_order_queue_dict.get(code)
        q.append((data['SecurityID'], data['Price'], data['Volume'], data['Side'], data['OrderType'], data['OrderTime'],
                  data['MainSeq'], data['SubSeq'], data['OrderNO'], data['OrderStatus'], time.time(), start_time))
    # 添加逐笔成交
    def add_transaction_detail(self, data):
        code = data["SecurityID"]
        queue_info = self.transaction_queue_distribute_manager.get_distributed_queue(code)
        if not queue_info:
            return
        # 判断是否为大单成交
        queue_info[1].put_nowait((data['SecurityID'], data['TradePrice'], data['TradeVolume'],
                                  data['OrderTime'], data['MainSeq'], data['SubSeq'], data['BuyNo'],
                                  data['SellNo'], data['ExecType']))
        # 不直接加入
        # queue_info = self.transaction_queue_distribute_manager.get_distributed_queue(code)
        # if not queue_info:
        #     return
        # # 判断是否为大单成交
        # queue_info[1].put_nowait((data['SecurityID'], data['TradePrice'], data['TradeVolume'],
        #                           data['OrderTime'], data['MainSeq'], data['SubSeq'], data['BuyNo'],
        #                           data['SellNo'], data['ExecType']))
        q: collections.deque = self.temp_transaction_queue_dict.get(code)
        q.append((data['SecurityID'], data['TradePrice'], data['TradeVolume'],
                  data['OrderTime'], data['MainSeq'], data['SubSeq'], data['BuyNo'],
                  data['SellNo'], data['ExecType']))
    def add_market_data(self, data):
        # 加入上传队列
@@ -93,8 +118,8 @@
        self.transaction_queue_distribute_manager.distribute_queue(code)
        self.temp_order_queue_dict[code] = collections.deque()
        self.temp_transaction_queue_dict[code] = collections.deque()
        # threading.Thread(target=lambda: self.__run_upload_order_task(code)).start()
        # threading.Thread(target=lambda: self.__run_upload_transaction_task(code)).start()
        threading.Thread(target=lambda: self.__run_upload_order_task(code)).start()
        threading.Thread(target=lambda: self.__run_upload_transaction_task(code)).start()
    def __upload_l2_data(self, code, _queue, datas):
        _queue.put_nowait((code, datas, time.time()))
@@ -138,7 +163,9 @@
            try:
                if not q:
                    data = q.popleft()
                    temp_list.append(data)
                    data = self.__filter_transaction(data)
                    if data:
                        temp_list.append(data)
                else:
                    if temp_list:
                        # 上传数据