| | |
| | | import math |
| | | import multiprocessing |
| | | import random |
| | | import threading |
| | | |
| | | import msgpack |
| | | import zmq |
| | | |
| | | from huaxin_client import l2_data_transform_protocol |
| | | from utils import shared_memery_util |
| | | |
| | | process_manager = None |
| | | |
| | | |
| | | class TargetCodeProcessManager: |
| | |
| | | # Maps each code to the ID of the queue it is assigned to |
| | | self.__code_queue_dict = {} |
| | | |
| | | def add_codes(self, codes: set): |
| | | def set_codes(self, codes: set): |
| | | """ |
| | | Set the subscribed codes. NOTE(review): two `def` headers (add_codes / set_codes) precede this docstring - confirm which one is current. |
| | | @param codes: the set of codes that should be subscribed; it is diffed against the codes already assigned |
| | | @return: the queues with their assigned codes: [(queue object, {"code1", "code2"}), ...] |
| | | """ |
| | | add_codes = codes - self.__code_queue_dict.keys() |
| | | del_codes = self.__code_queue_dict.keys() - codes |
| | | # Remove codes |
| | |
| | | for code in add_codes: |
| | | # Find a queue that is not yet full (NOTE(review): count_per_process is assigned on two consecutive lines below and only the second value is used - confirm the first one is stale) |
| | | for queue_id in self.__queue_codes: |
| | | count_per_process = min(self.__max_code_count_per_queue_dict.get(queue_id), math.ceil(len(codes) / len(self.__com_queues))) |
| | | count_per_process = self.__max_code_count_per_queue_dict.get(queue_id) |
| | | if len(self.__queue_codes[queue_id]) >= count_per_process: |
| | | # Queue is full |
| | | continue |
| | |
| | | self.__queue_codes[queue_id].add(code) |
| | | self.__code_queue_dict[code] = queue_id |
| | | break |
| | | return [(self.__com_queue_id_object_dict.get(queue_id), self.__queue_codes[queue_id]) for queue_id in |
| | | self.__queue_codes] |
| | | |
| | | def get_queues_with_codes(self): |
| | | """ |
| | |
| | | return results |
| | | |
| | | |
class L2DataListener:
    """
    L2 data listener.

    Receives notifications over ZeroMQ, reads the referenced payload from
    shared memory and forwards it to an ``L2DataCallBack``.
    """

    def __init__(self, channel_list):
        """
        Store the channels and index their shared-memory arrays by number.

        @param channel_list: [((shared-memory number, order shared-memory array, zmq address),
                               (shared-memory number, transaction shared-memory array, zmq address)), ...]
        """
        self.channel_list = channel_list
        # Map shared-memory number -> shared-memory array, for both halves of every channel
        self.shared_memery_num_object_dict = {}
        for channel in self.channel_list:
            channel_delegate, channel_deal = channel[0], channel[1]
            self.shared_memery_num_object_dict[channel_delegate[0]] = channel_delegate[1]
            self.shared_memery_num_object_dict[channel_deal[0]] = channel_deal[1]

    def create_data_listener(self, l2_data_callback: "l2_data_transform_protocol.L2DataCallBack"):
        """
        Create the data listeners.

        Starts two daemon threads per channel, one serving the order address
        and one serving the transaction address.

        @param l2_data_callback: callback object that receives the decoded data
        @return: None
        """
        for channel in self.channel_list:
            channel_delegate = channel[0]
            channel_deal = channel[1]
            for ipc_addr in (channel_delegate[2], channel_deal[2]):
                threading.Thread(target=self.__create_l2_zmq_server, args=(ipc_addr, l2_data_callback,),
                                 daemon=True).start()

    def __create_l2_zmq_server(self, ipc_addr, l2_data_callback: "l2_data_transform_protocol.L2DataCallBack"):
        """
        Run the zmq REP server for L2 tick-by-tick order / transaction notifications.

        Loops forever: receives a msgpack message, reads the data from the
        shared-memory array named by ``data.memery_number`` and dispatches on
        ``type`` (1 = order, 2 = transaction). A "SUCCESS" reply is always
        sent, even when handling fails, so the REP socket stays in a state
        where it can receive the next request.

        @param ipc_addr: zmq address to bind
        @param l2_data_callback: callback object that receives the decoded data
        @return: does not return
        """
        # Imported locally so this block does not depend on a file-level import.
        import logging

        logger = logging.getLogger(__name__)

        context = zmq.Context()
        socket = context.socket(zmq.REP)
        socket.bind(ipc_addr)
        while True:
            raw = socket.recv()
            try:
                # Decode the notification
                message = msgpack.unpackb(raw)
                shared_memery_id = message["data"]["memery_number"]
                datas = shared_memery_util.read_datas(self.shared_memery_num_object_dict.get(shared_memery_id))
                if message["type"] == 1:
                    # Order
                    code, data_list, timestamp = datas[0], datas[1], datas[2]
                    l2_data_callback.OnL2Order(code, data_list, timestamp)
                elif message["type"] == 2:
                    # Transaction
                    code, data_list = datas[0], datas[1]
                    l2_data_callback.OnL2Transaction(code, data_list)
            except Exception:
                # Handling stays best-effort (the loop keeps running), but the
                # failure is recorded instead of being silently discarded.
                logger.exception("Failed to handle L2 message received on %s", ipc_addr)
            finally:
                socket.send_string("SUCCESS")
| | | |
| | | |
if __name__ == "__main__":
    # Ad-hoc check when run as a script: build seven multiprocessing queues
    # and hand them to a TargetCodeProcessManager with a second argument of 10.
    queues = [multiprocessing.Queue() for _ in range(7)]
    manager = TargetCodeProcessManager(queues, 10)