| | |
| | | from huaxin_client.code_queue_distribute_manager import CodeQueueDistributeManager |
| | | from log_module import async_log_util |
| | | from log_module.async_log_util import huaxin_l2_log |
| | | from log_module.log import logger_local_huaxin_l2_error, logger_system, logger_local_huaxin_l2_subscript |
| | | from log_module.log import logger_local_huaxin_l2_error, logger_system, logger_local_huaxin_l2_subscript, \ |
| | | logger_local_huaxin_l2_special_volume |
| | | from utils import tool |
| | | import collections |
| | | import zmq |
| | |
| | | if item[2] >= filter_condition[0][0]: |
| | | return item |
| | | # 1手的买单满足价格 |
| | | if item[2] == 100: #and abs(filter_condition[0][2] - item[1]) < 0.001: |
| | | if item[2] == 100: # and abs(filter_condition[0][2] - item[1]) < 0.001: |
| | | return item |
| | | # 买量 |
| | | if item[2] == filter_condition[0][3]: |
| | |
| | | # queue_info[1].put_nowait( |
| | | # (data['SecurityID'], data['Price'], data['Volume'], data['Side'], data['OrderType'], data['OrderTime'], |
| | | # data['MainSeq'], data['SubSeq'], data['OrderNO'], data['OrderStatus'], time.time(), start_time)) |
| | | if data['Volume'] == 100: |
| | | log_queue = self.temp_log_queue_dict.get(code) |
| | | if log_queue: |
| | | log_queue.put_nowait(data) |
| | | |
| | | q: collections.deque = self.temp_order_queue_dict.get(code) |
| | | q.append((data['SecurityID'], data['Price'], data['Volume'], data['Side'], data['OrderType'], data['OrderTime'], |
| | |
| | | self.order_queue_distribute_manager.distribute_queue(code) |
| | | if not self.transaction_queue_distribute_manager.get_distributed_queue(code): |
| | | self.transaction_queue_distribute_manager.distribute_queue(code) |
| | | |
| | | if code not in self.temp_order_queue_dict: |
| | | self.temp_order_queue_dict[code] = collections.deque() |
| | | if code not in self.temp_transaction_queue_dict: |
| | | self.temp_transaction_queue_dict[code] = collections.deque() |
| | | if code not in self.temp_log_queue_dict: |
| | | self.temp_log_queue_dict[code] = queue.Queue() |
| | | # 分配订单上传协议 |
| | | self.l2_order_upload_protocol.distribute_upload_host(code) |
| | | |
| | |
| | | t1.start() |
| | | t2 = threading.Thread(target=lambda: self.__run_upload_transaction_task(code), daemon=True) |
| | | t2.start() |
| | | self.upload_l2_data_task_dict[code] = (t1, t2) |
| | | t3 = threading.Thread(target=lambda: self.__run_log_task(code), daemon=True) |
| | | t3.start() |
| | | self.upload_l2_data_task_dict[code] = (t1, t2, t3) |
| | | # 释放已经分配的队列 |
| | | |
| | | def release_distributed_upload_queue(self, code): |
| | |
| | | if code in self.temp_transaction_queue_dict: |
| | | self.temp_transaction_queue_dict[code].clear() |
| | | self.temp_transaction_queue_dict.pop(code) |
| | | if code in self.temp_log_queue_dict: |
| | | self.temp_log_queue_dict.pop(code) |
| | | |
| | | if code in self.upload_l2_data_task_dict: |
| | | self.upload_l2_data_task_dict.pop(code) |
| | | |
| | | |
| | | def __upload_l2_data(self, code, _queue, datas): |
| | | _queue.put_nowait((code, datas, time.time())) |
| | |
| | | pass |
| | | finally: |
| | | pass |
| | | |
def __run_log_task(self, code):
    """Drain the per-code log queue and write each entry to the special-volume log.

    Intended to run on its own thread. The worker captures the queue that is
    registered for ``code`` in ``self.temp_log_queue_dict`` when it starts and
    keeps consuming from it until that registration goes away.

    :param code: key into ``self.temp_log_queue_dict`` identifying the queue
        this worker should consume.
    """
    q: queue.Queue = self.temp_log_queue_dict.get(code)
    if q is None:
        # Nothing is registered for this code, so there is nothing to drain.
        return
    while True:
        try:
            temp = q.get(timeout=10)
            huaxin_l2_log.info(logger_local_huaxin_l2_special_volume,
                               f"{temp}")
        except queue.Empty:
            # No entry arrived within the timeout; fall through to the
            # liveness check below and then wait again.
            pass
        except Exception:
            # Logging is best-effort: one bad entry must not kill the worker.
            # Back off briefly so a persistent failure cannot spin the CPU.
            time.sleep(0.02)
        # Stop when the code was released, or when it was released and then
        # registered again with a fresh queue. In the second case another
        # worker is expected to own the new queue, and this one would
        # otherwise loop forever on an orphaned queue.
        if self.temp_log_queue_dict.get(code) is not q:
            break
| | | |
| | | |
| | | class L2DataUploadProtocolManager: |
| | |
def __test():
    """Placeholder test hook (original note: "distribute data"); does nothing."""
    return None
| | | |
| | | |
| | | |
| | | def run_test(): |