| | |
| | | import json |
| | | import logging |
| | | import marshal |
| | | import multiprocessing |
| | | import queue |
| | | import threading |
| | | import time |
| | |
| | | |
| | | # L2上传数据管理器 |
| | | class L2DataUploadManager: |
| | | """ |
| | | L2逐笔委托/L2逐笔成交:通过共享内存+ZMQ上传数据 |
| | | L2市场行情: 通过普通数据上传队列写入 |
| | | L2订阅的代码: 通过普通数据上传队列写入 |
| | | """ |
| | | |
| | | TYPE_DELEGATE = 1 |
| | | TYPE_TRANSACTION = 2 |
| | | TYPE_MARKET = 3 |
| | | |
| | | def __init__(self, data_channel_distribute_manager: CodeDataChannelDistributeManager): |
| | | def __init__(self, data_channel_distribute_manager: CodeDataChannelDistributeManager, |
| | | common_data_upload_queue: multiprocessing.Queue): |
| | | self.data_channel_distribute_manager = data_channel_distribute_manager |
| | | self.common_data_upload_queue = common_data_upload_queue |
| | | # 代码分配的对象 |
| | | self.temp_order_queue_dict = {} |
| | | self.temp_transaction_queue_dict = {} |
| | |
| | | data['SellNo'], data['ExecType'], time.time())) |
| | | |
| | | def add_market_data(self, data): |
| | | # 加入上传队列 |
| | | # self.market_data_queue.put_nowait(data) |
| | | code = data['securityID'] |
| | | # TODO 改为zmq发送 |
| | | callback = self.data_channel_distribute_manager.get_distributed_channel(code) |
| | | if callback: |
| | | callback.OnMarketData(code, data) |
| | | # 改为队列回调发送 |
| | | self.common_data_upload_queue.put_nowait({"type": "l2_market", "data": (code, data)}) |
| | | |
| | | # 分配上传队列 |
| | | def distribute_upload_queue(self, code, _target_codes=None): |
| | |
| | | |
| | | if code in self.upload_l2_data_task_dict: |
| | | self.upload_l2_data_task_dict.pop(code) |
| | | |
| | | def __upload_l2_data(self, code, _queue, datas): |
| | | _queue.put_nowait(marshal.dumps([code, datas, time.time()])) |
| | | |
| | | # 处理订单数据并上传 |
| | | def __run_upload_order_task(self, code): |
| | |
| | | (code, temp_list)) |
| | | # 通知获取数据 |
| | | _socket = self.data_channel_distribute_manager.get_zmq_socket(zmq_host) |
| | | _socket.send(msgpack.packb({"type": self.TYPE_TRANSACTION, "data": {"memery_number": shared_memery_number}})) |
| | | _socket.send( |
| | | msgpack.packb({"type": self.TYPE_TRANSACTION, "data": {"memery_number": shared_memery_number}})) |
| | | _socket.recv_string() |
| | | temp_list = [] |
| | | else: |