| | |
| | | self.temp_transaction_queue_dict = {} |
| | | self.filter_order_condition_dict = {} |
| | | |
| | | # TODO 需要调用 |
| | | # 设置订单过滤条件 |
def set_order_fileter_condition(self, code, min_volume, limit_up_price, special_volumes=None,
                                special_volumes_expire_time=None):
    """Register or refresh the filter condition for one security code.

    The condition is stored in ``self.filter_order_condition_dict[code]`` as
    ``[(min_volume, limit_up_price), special_volumes, special_volumes_expire_time]``.
    The first element is always a 2-tuple, because the filter methods read it
    as ``condition[0][0]`` (volume threshold) and ``condition[0][1]`` (price).

    Args:
        code: Security code used as the dictionary key.
        min_volume: Volume threshold stored at ``condition[0][0]``.
        limit_up_price: Price stored at ``condition[0][1]``.
        special_volumes: Optional collection of volumes stored at
            ``condition[1]``; defaults to an empty set.
        special_volumes_expire_time: Optional value stored at ``condition[2]``;
            the order filter compares it with ``time.time()``.
    """
    if special_volumes is None:
        special_volumes = set()
    threshold = (min_volume, limit_up_price)
    existing = self.filter_order_condition_dict.get(code)
    if existing is not None and not special_volumes and not special_volumes_expire_time:
        # Only the threshold is being refreshed: keep the special volumes and
        # expiry that were registered earlier for this code.
        existing[0] = threshold
    else:
        # First registration, or an explicit replacement of the special volumes.
        # Store the tuple (not the bare min_volume) so readers can index it.
        self.filter_order_condition_dict[code] = [threshold, special_volumes, special_volumes_expire_time]
| | | |
| | | # 过滤订单 |
| | | def __filter_order(self, item): |
| | | filter_condition = self.filter_order_condition_dict.get(item[0]) |
| | | if filter_condition: |
| | | # item[2]为量 |
| | | if item[2] >= filter_condition[0]: |
| | | if item[2] >= filter_condition[0][0]: |
| | | return item |
| | | if filter_condition[1] and item[2] in filter_condition[1]: |
| | | if filter_condition[2] and time.time() > filter_condition[2]: |
| | |
| | | return None |
| | | return item |
| | | |
| | | # 过滤成交 (filter transactions) |
| | | |
def __filter_transaction(self, item):
    """Return ``item`` if it passes the price filter for its code, else ``None``.

    ``item[0]`` is the key looked up in ``self.filter_order_condition_dict``
    and ``item[1]`` is the price that gets compared.
    NOTE(review): presumably ``item`` is a transaction tuple whose index 1 is
    the trade price; confirm against the caller.

    Behaviour:
      * no condition registered for the code -> ``item`` is passed through;
      * the condition has no ``(min_volume, limit_up_price)`` pair in slot 0
        (only a bare threshold was stored) -> ``item`` is passed through
        instead of raising ``TypeError``;
      * otherwise ``item`` is kept only when its price is within 0.201 of the
        stored price, and ``None`` is returned for anything further away.
    """
    filter_condition = self.filter_order_condition_dict.get(item[0])
    if not filter_condition:
        return item
    threshold = filter_condition[0]
    if not isinstance(threshold, (tuple, list)) or len(threshold) < 2:
        # No reference price available, so there is nothing to compare against.
        return item
    reference_price = threshold[1]
    if abs(item[1] - reference_price) < 0.201:
        return item
    return None
| | | |
| | | # 添加委托详情 |
def add_l2_order_detail(self, data, start_time=0, istransaction=False):
    """Buffer one L2 order record in the per-code temporary deque.

    The record is not pushed to a distributed queue directly; it is appended
    to ``self.temp_order_queue_dict[code]`` as a tuple:
    ``(SecurityID, Price, Volume, Side, OrderType, OrderTime, MainSeq,
    SubSeq, OrderNO, OrderStatus, receive_timestamp, start_time)``.

    Args:
        data: Mapping holding the order fields listed above.
        start_time: Caller-supplied value stored as the last tuple element.
        istransaction: Unused here; kept for interface compatibility.

    Returns:
        None. Records for codes without a registered deque are dropped
        silently instead of raising ``AttributeError``.
    """
    code = data["SecurityID"]
    pending = self.temp_order_queue_dict.get(code)
    if pending is None:
        # No buffer was created for this code, so there is nowhere to put it.
        return
    pending.append((
        data['SecurityID'], data['Price'], data['Volume'], data['Side'], data['OrderType'],
        data['OrderTime'], data['MainSeq'], data['SubSeq'], data['OrderNO'], data['OrderStatus'],
        time.time(), start_time,
    ))
| | | |
| | | # 添加逐笔成交 |
def add_transaction_detail(self, data):
    """Buffer one tick-by-tick transaction record in the per-code temporary deque.

    The record is not pushed to a distributed queue directly; it is appended
    to ``self.temp_transaction_queue_dict[code]`` as a tuple:
    ``(SecurityID, TradePrice, TradeVolume, OrderTime, MainSeq, SubSeq,
    BuyNo, SellNo, ExecType)``.

    Args:
        data: Mapping holding the transaction fields listed above.

    Returns:
        None. Records for codes without a registered deque are dropped
        silently instead of raising ``AttributeError``.
    """
    code = data["SecurityID"]
    pending = self.temp_transaction_queue_dict.get(code)
    if pending is None:
        # No buffer was created for this code, so there is nowhere to put it.
        return
    pending.append((
        data['SecurityID'], data['TradePrice'], data['TradeVolume'],
        data['OrderTime'], data['MainSeq'], data['SubSeq'], data['BuyNo'],
        data['SellNo'], data['ExecType'],
    ))
| | | |
| | | def add_market_data(self, data): |
| | | # 加入上传队列 |
| | |
| | | self.transaction_queue_distribute_manager.distribute_queue(code) |
| | | self.temp_order_queue_dict[code] = collections.deque() |
| | | self.temp_transaction_queue_dict[code] = collections.deque() |
| | | # threading.Thread(target=lambda: self.__run_upload_order_task(code)).start() |
| | | # threading.Thread(target=lambda: self.__run_upload_transaction_task(code)).start() |
| | | threading.Thread(target=lambda: self.__run_upload_order_task(code)).start() |
| | | threading.Thread(target=lambda: self.__run_upload_transaction_task(code)).start() |
| | | |
| | | def __upload_l2_data(self, code, _queue, datas): |
| | | _queue.put_nowait((code, datas, time.time())) |
| | |
| | | try: |
| | | if not q: |
| | | data = q.popleft() |
| | | temp_list.append(data) |
| | | data = self.__filter_transaction(data) |
| | | if data: |
| | | temp_list.append(data) |
| | | else: |
| | | if temp_list: |
| | | # 上传数据 |