| | |
| | | self.upload_l2_data_task_dict = {} |
| | | self.l2_order_codes = set() |
| | | self.l2_transaction_codes = set() |
| | | self.__real_time_buy1_data = {} |
| | | |
| | | # 过滤订单 |
| | | def __filter_order(self, item): |
| | |
| | | # 添加委托详情 |
| | | def add_l2_order_detail(self, data, start_time=0, istransaction=False): |
| | | code = data["SecurityID"] |
| | | if code in self.__real_time_buy1_data: |
| | | if self.__real_time_buy1_data[code][1] == data["Price"]: |
| | | # 与买的价格一致 |
| | | if data["Side"] == '1': |
| | | if data["OrderStatus"] == 'D': |
| | | # 买撤 |
| | | self.__real_time_buy1_data[code][3] -= data["Volume"] |
| | | else: |
| | | # 买 |
| | | self.__real_time_buy1_data[code][3] += data["Volume"] |
| | | q: collections.deque = self.temp_order_queue_dict.get(code) |
| | | q.append((data['SecurityID'], data['Price'], data['Volume'], data['Side'], data['OrderType'], data['OrderTime'], |
| | | data['MainSeq'], data['SubSeq'], data['OrderNO'], data['OrderStatus'], time.time(), start_time)) |
| | |
| | | # 添加逐笔成交 |
| | | def add_transaction_detail(self, data): |
| | | code = data["SecurityID"] |
| | | if code in self.__real_time_buy1_data: |
| | | if self.__real_time_buy1_data[code][1] == data["TradePrice"]: |
| | | # 与买的价格一致 |
| | | self.__real_time_buy1_data[code][3] -= data["TradeVolume"] |
| | | q: collections.deque = self.temp_transaction_queue_dict.get(code) |
| | | q.append((data['SecurityID'], data['TradePrice'], data['TradeVolume'], |
| | | data['OrderTime'], data['MainSeq'], data['SubSeq'], data['BuyNo'], |
| | |
| | | |
| | | def add_market_data(self, data): |
| | | code = data["securityID"] |
| | | # [时间,买1价格,原始买1量,计算后的买1量]} |
| | | self.__real_time_buy1_data[code] = [data["dataTimeStamp"], data["buy"][0][0], data["buy"][0][1], |
| | | data["buy"][0][1]] |
| | | |
| | | self.data_callback_distribute_manager.get_distributed_callback(code).OnMarketData(code, [data]) |
| | | |
| | | # 分配上传队列 |
| | |
| | | t1.start() |
| | | t2 = threading.Thread(target=lambda: self.__run_upload_transaction_task(code), daemon=True) |
| | | t2.start() |
| | | self.upload_l2_data_task_dict[code] = (t1, t2) |
| | | t3 = threading.Thread(target=lambda: self.__run_upload_real_time_buy1_task(code), daemon=True) |
| | | t3.start() |
| | | self.upload_l2_data_task_dict[code] = (t1, t2, t3) |
| | | # 释放已经分配的队列 |
| | | |
| | | def release_distributed_upload_queue(self, code): |
| | |
| | | finally: |
| | | pass |
| | | |
| | | # 处理实时买1数据 |
| | | def __run_upload_real_time_buy1_task(self, code): |
| | | while True: |
| | | try: |
| | | if code in self.__real_time_buy1_data: |
| | | data = self.__real_time_buy1_data[code] |
| | | # 如果最新的买1是原来买1的1/2时开始上传 |
| | | if data[2] > 0 and data[3] / data[2] <= 0.5: |
| | | self.data_callback_distribute_manager.get_distributed_callback(code).OnRealTimeBuy1Info(code, |
| | | data) |
| | | except: |
| | | pass |
| | | finally: |
| | | time.sleep(0.1) |
| | | |
| | | |
| | | def add_target_code(code): |
| | | target_codes.add(code) |