| | |
| | | |
| | | # 活动时间 |
| | | from huaxin_client.code_queue_distribute_manager import CodeQueueDistributeManager |
| | | from log_module import async_log_util |
| | | from log_module import async_log_util |
| | | from log_module.log import logger_local_huaxin_l2_error, logger_system |
| | | from utils import tool |
| | | import collections |
| | | |
# Module-level registries of upload activity times: one for order details and
# one for transactions.
# NOTE(review): presumably keyed by code with a timestamp value -- the code
# that reads and writes these dicts is not visible here; confirm before
# relying on that shape.
order_detail_upload_active_time_dict = {}
transaction_upload_active_time_dict = {}
| | |
| | | self.order_queue_distribute_manager = order_queue_distribute_manager |
| | | self.transaction_queue_distribute_manager = transaction_queue_distribute_manager |
| | | self.market_data_queue = market_data_queue |
| | | self.temp_order_queue_dict = {} |
| | | self.temp_transaction_queue_dict = {} |
| | | |
| | | # 添加委托详情 |
| | | def add_l2_order_detail(self, data, start_time, istransaction=False): |
| | |
def distribute_upload_queue(self, code):
    """Allocate upload queues and fresh temporary buffers for ``code``.

    Both the order and the transaction distribute managers are asked to
    distribute a queue for ``code``. Afterwards a new, empty deque is stored
    under ``code`` in each temporary buffer dict, replacing any deque that
    was already stored there.

    :param code: key under which the queues and buffers are registered.
    """
    managers = (
        self.order_queue_distribute_manager,
        self.transaction_queue_distribute_manager,
    )
    for manager in managers:
        manager.distribute_queue(code)

    buffers = (self.temp_order_queue_dict, self.temp_transaction_queue_dict)
    for buffer_by_code in buffers:
        buffer_by_code[code] = collections.deque()
    # NOTE: the per-code worker threads (__run_upload_order_task /
    # __run_upload_transaction_task) are not started from here; the thread
    # launches that used to follow are disabled.
| | | |
def __upload_l2_data(self, code, _queue, datas):
    """Hand one batch of data to an upload queue.

    The batch is wrapped in a ``(code, datas, timestamp)`` tuple, where the
    timestamp is ``time.time()`` taken at the moment of the call, and is
    passed to ``_queue.put_nowait``.

    :param code: identifier the batch belongs to.
    :param _queue: object exposing ``put_nowait``.
    :param datas: the batch to enqueue; it is passed through as-is (no copy).
    """
    payload = (code, datas, time.time())
    _queue.put_nowait(payload)
| | | |
| | | # 处理订单数据并上传 |
def __run_upload_order_task(self, code):
    """Drain the temporary order buffer of ``code`` and upload it in batches.

    Items are moved from the deque stored in ``self.temp_order_queue_dict``
    into a local batch. Whenever the deque runs empty and the batch is
    non-empty, the batch is handed to ``self.__upload_l2_data`` together with
    the upload queue (element ``1`` of what the order distribute manager
    returns for ``code``). The loop ends once the deque is empty, nothing is
    pending, and ``code`` has been removed from ``self.temp_order_queue_dict``.

    Fixes relative to the previous version:

    * The emptiness test was inverted (``if not q`` followed by
      ``popleft()``), so data was never drained and an empty deque caused a
      silent exception spin.
    * A missing buffer (``None``) now returns immediately instead of spinning.
    * The uploaded batch is no longer cleared after submission, because the
      queue still references that very list.
    * Errors are logged instead of silently swallowed, and only ``Exception``
      is caught so interpreter-exit signals propagate.

    :param code: key of the buffer and queue to service.
    """
    import logging  # local import: the file's top-level import block is left untouched

    q = self.temp_order_queue_dict.get(code)
    if q is None:
        # No buffer was ever allocated for this code; nothing to drain.
        return
    queue_info = self.order_queue_distribute_manager.get_distributed_queue(code)
    upload_queue = queue_info[1]
    temp_list = []
    while True:
        try:
            if q:
                temp_list.append(q.popleft())
            elif temp_list:
                self.__upload_l2_data(code, upload_queue, temp_list)
                # Rebind instead of clear(): the queue holds a reference to
                # the submitted list, so clearing it would empty the batch
                # before (or while) the consumer reads it.
                temp_list = []
            elif code not in self.temp_order_queue_dict:
                # Buffer was released and everything pending is uploaded.
                break
            else:
                time.sleep(0.001)
        except Exception:
            # Best-effort worker: report the failure and keep running, with a
            # short pause so a persistent error cannot become a busy spin.
            logging.getLogger(__name__).exception(
                "order upload task failed for code %s", code)
            time.sleep(0.001)
| | | |
| | | # 处理成交数据并上传 |
def __run_upload_transaction_task(self, code):
    """Drain the temporary transaction buffer of ``code`` and upload it in batches.

    Items are moved from the deque stored in
    ``self.temp_transaction_queue_dict`` into a local batch. Whenever the
    deque runs empty and the batch is non-empty, the batch is handed to
    ``self.__upload_l2_data`` together with the upload queue (element ``1``
    of what the transaction distribute manager returns for ``code``). The
    loop ends once the deque is empty, nothing is pending, and ``code`` has
    been removed from ``self.temp_transaction_queue_dict``.

    Fixes relative to the previous version:

    * The emptiness test was inverted (``if not q`` followed by
      ``popleft()``), so data was never drained and an empty deque caused a
      silent exception spin.
    * A missing buffer (``None``) now returns immediately instead of spinning.
    * The uploaded batch is no longer cleared after submission, because the
      queue still references that very list.
    * Errors are logged instead of silently swallowed, and only ``Exception``
      is caught so interpreter-exit signals propagate.

    :param code: key of the buffer and queue to service.
    """
    import logging  # local import: the file's top-level import block is left untouched

    q = self.temp_transaction_queue_dict.get(code)
    if q is None:
        # No buffer was ever allocated for this code; nothing to drain.
        return
    queue_info = self.transaction_queue_distribute_manager.get_distributed_queue(code)
    upload_queue = queue_info[1]
    temp_list = []
    while True:
        try:
            if q:
                temp_list.append(q.popleft())
            elif temp_list:
                self.__upload_l2_data(code, upload_queue, temp_list)
                # Rebind instead of clear(): the queue holds a reference to
                # the submitted list, so clearing it would empty the batch
                # before (or while) the consumer reads it.
                temp_list = []
            elif code not in self.temp_transaction_queue_dict:
                # Buffer was released and everything pending is uploaded.
                break
            else:
                time.sleep(0.002)
        except Exception:
            # Best-effort worker: report the failure and keep running, with a
            # short pause so a persistent error cannot become a busy spin.
            logging.getLogger(__name__).exception(
                "transaction upload task failed for code %s", code)
            time.sleep(0.002)
| | | |
| | | # 释放已经分配的队列 |
def release_distributed_upload_queue(self, code):
    """Release the upload queues and temporary buffers held for ``code``.

    Both distribute managers are told to release the queue assigned to
    ``code``. Then, for each temporary buffer dict that still contains
    ``code``, the stored deque is emptied and its entry removed.

    :param code: key whose queues and buffers are released.
    """
    managers = (
        self.order_queue_distribute_manager,
        self.transaction_queue_distribute_manager,
    )
    for manager in managers:
        manager.release_distribute_queue(code)

    buffers = (self.temp_order_queue_dict, self.temp_transaction_queue_dict)
    for buffer_by_code in buffers:
        if code not in buffer_by_code:
            continue
        # Empty the deque first so anything still holding a reference to it
        # sees no leftover items, then drop the entry.
        buffer_by_code[code].clear()
        buffer_by_code.pop(code)
| | | |
| | | |
| | | def add_target_code(code): |
| | |
| | | if code in target_codes_add_time: |
| | | target_codes_add_time.pop(code) |
| | | |
| | | |
| | | def add_subscript_codes(codes): |
| | | print("add_subscript_codes", codes) |
| | | # 加入上传队列 |