| | |
| | | d = self.codes_volume_and_price_dict.get(code) |
| | | if d: |
| | | min_volume, limit_up_price, special_price, buy_volume = d[0], d[1], d[2], d[3] |
| | | self.l2_data_upload_manager.set_order_fileter_condition(code, min_volume, limit_up_price, special_price,buy_volume, |
| | | self.l2_data_upload_manager.set_order_fileter_condition(code, min_volume, limit_up_price, special_price, |
| | | buy_volume, |
| | | {volume, constant.SHADOW_ORDER_VOLUME}, |
| | | time.time() + 3) |
| | | huaxin_l2_log.info(logger_local_huaxin_l2_subscript, f"设置下单量监听:{code}-{volume}") |
| | |
| | | pipe_strategy = None |
| | | |
| | | |
| | | def test_add_codes(queue_r): |
| | | time.sleep(5) |
| | | # if value: |
| | | # if type(value) == bytes: |
| | | # value = value.decode("utf-8") |
| | | # data = json.loads(value) |
| | | # _type = data["type"] |
| | | # if _type == "listen_volume": |
| | | # volume = data["data"]["volume"] |
| | | # code = data["data"]["code"] |
| | | # spi.set_code_special_watch_volume(code, volume) |
| | | # elif _type == "l2_cmd": |
| | | # l2CommandManager.process_command(command_manager.CLIENT_TYPE_CMD_L2, None, data) |
| | | |
| | | demo_datas = [("603002", int(50 * 10000 / 6.35), 6.35, 6.00, 200), |
| | | ("002654", int(50 * 10000 / 15.59), 15.59, 15.3, 200), |
| | | ("603701", int(50 * 10000 / 14.28), 14.28, 14.00, 200), |
| | | ("002908", int(50 * 10000 / 12.78), 12.78, 12.00, 200)] |
| | | |
| | | queue_r.put_nowait(json.dumps({"type": "l2_cmd", "data": [demo_datas[0]]})) |
| | | time.sleep(1) |
| | | |
| | | spi.l2_data_upload_manager.add_l2_order_detail( |
| | | {'SecurityID': '603002', 'Price': 6.35, 'Volume': 275000, 'Side': "1", 'OrderType': '0', |
| | | 'OrderTime': '13000015', |
| | | 'MainSeq': 2, 'SubSeq': 6739147, 'OrderNO': 5512466, 'OrderStatus': 'D'}, 0) |
| | | spi.l2_data_upload_manager.add_l2_order_detail( |
| | | {'SecurityID': '603002', 'Price': 6.35, 'Volume': 200, 'Side': "1", 'OrderType': '0', |
| | | 'OrderTime': '13000015', |
| | | 'MainSeq': 2, 'SubSeq': 6739148, 'OrderNO': 5512467, 'OrderStatus': 'D'}, 0) |
| | | queue_r.put_nowait(json.dumps({"type": "listen_volume", "data": {"code": "603002", "volume": 100}})) |
| | | time.sleep(0.1) |
| | | spi.l2_data_upload_manager.add_l2_order_detail( |
| | | {'SecurityID': '603002', 'Price': 6.35, 'Volume': 100, 'Side': "1", 'OrderType': '0', |
| | | 'OrderTime': '13000015', |
| | | 'MainSeq': 2, 'SubSeq': 6739148, 'OrderNO': 5512467, 'OrderStatus': 'D'}, 0) |
| | | |
| | | |
| | | def run(queue_r: multiprocessing.Queue, order_queues: List[multiprocessing.Queue], |
| | | transaction_queues: List[multiprocessing.Queue], market_queue: multiprocessing.Queue) -> None: |
| | | transaction_queues: List[multiprocessing.Queue], market_queue: multiprocessing.Queue, |
| | | order_ipc_hosts: list) -> None: |
| | | # def test_add_codes(): |
| | | # time.sleep(5) |
| | | # # if value: |
| | |
| | | order_queue_distribute_manager = CodeQueueDistributeManager(order_queues) |
| | | transaction_queue_distribute_manager = CodeQueueDistributeManager(transaction_queues) |
| | | l2_data_upload_manager = L2DataUploadManager(order_queue_distribute_manager, |
| | | transaction_queue_distribute_manager, market_queue) |
| | | transaction_queue_distribute_manager, market_queue, |
| | | order_ipc_hosts) |
| | | __init_l2(l2_data_upload_manager) |
| | | l2_data_manager.run_upload_common() |
| | | l2_data_manager.run_log() |
| | | # 测试 |
| | | # threading.Thread(target=lambda: test_add_codes(),daemon=True).start() |
| | | threading.Thread(target=lambda: test_add_codes(queue_r), daemon=True).start() |
| | | global l2CommandManager |
| | | l2CommandManager = command_manager.L2CommandManager() |
| | | l2CommandManager.init(MyL2ActionCallback()) |
| | |
| | | |
| | | # L2上传数据管理器 |
| | | class L2DataUploadManager: |
| | | # order_ipc_hosts:远程host |
| | | def __init__(self, order_queue_distribute_manager: CodeQueueDistributeManager, |
| | | transaction_queue_distribute_manager: CodeQueueDistributeManager, |
| | | market_data_queue: multiprocessing.Queue): |
| | | market_data_queue: multiprocessing.Queue, order_ipc_hosts): |
| | | self.order_queue_distribute_manager = order_queue_distribute_manager |
| | | self.transaction_queue_distribute_manager = transaction_queue_distribute_manager |
| | | self.market_data_queue = market_data_queue |
| | |
| | | self.upload_l2_data_task_dict = {} |
| | | self.l2_order_codes = set() |
| | | self.l2_transaction_codes = set() |
| | | # 订单 |
| | | self.l2_order_upload_protocol = L2DataUploadProtocolManager(order_ipc_hosts) |
| | | |
| | | # 设置订单过滤条件 |
| | | # special_price:过滤的1手的价格 |
| | |
| | | self.temp_order_queue_dict[code] = collections.deque() |
| | | if code not in self.temp_transaction_queue_dict: |
| | | self.temp_transaction_queue_dict[code] = collections.deque() |
| | | # 分配订单上传协议 |
| | | self.l2_order_upload_protocol.distribute_upload_host(code) |
| | | |
| | | if code not in self.upload_l2_data_task_dict: |
| | | t1 = threading.Thread(target=lambda: self.__run_upload_order_task(code), daemon=True) |
| | |
| | | self.temp_transaction_queue_dict.pop(code) |
| | | if code in self.upload_l2_data_task_dict: |
| | | self.upload_l2_data_task_dict.pop(code) |
| | | self.l2_order_upload_protocol.release_distributed_upload_host(code) |
| | | |
| | | def __upload_l2_data(self, code, _queue, datas): |
| | | _queue.put_nowait((code, datas, time.time())) |
| | | |
| | | def __upload_l2_order_data(self, code, datas): |
| | | self.l2_order_upload_protocol.upload_data_as_json(code, (code, datas, time.time())) |
| | | |
| | | # 处理订单数据并上传 |
| | | def __run_upload_order_task(self, code): |
| | |
| | | |
| | | if temp_list: |
| | | # 上传数据 |
| | | self.__upload_l2_data(code, upload_queue, temp_list) |
| | | # self.__upload_l2_data(code, upload_queue, temp_list) |
| | | self.__upload_l2_order_data(code, temp_list) |
| | | temp_list = [] |
| | | else: |
| | | if code not in self.temp_order_queue_dict: |
| | |
| | | pass |
| | | |
| | | |
| | | # L2上传数据协议管理器 |
| | | class L2DataUploadProtocolManager: |
| | | |
| | | # ipchosts IPC协议 |
| | |
| | | self.socket_client_dict = {} |
| | | # 保存代码分配的client 格式:{code:(host, socket)} |
| | | self.code_socket_client_dict = {} |
| | | self.lock = threading.Lock() |
| | | self.rlock = threading.RLock() |
| | | context = zmq.Context() |
| | | for host in self.ipchosts: |
| | | socket = context.socket(zmq.REQ) |
| | |
| | | |
| | | # 获取 |
| | | def __get_available_ipchost(self): |
| | | try: |
| | | if len(self.code_socket_client_dict) >= len(self.socket_client_dict): |
| | | raise Exception("无可用host") |
| | | used_hosts = set([self.code_socket_client_dict[k][0] for k in self.code_socket_client_dict]) |
| | | for host in self.socket_client_dict: |
| | | if host not in used_hosts: |
| | | return host, self.socket_client_dict[host] |
| | | if len(self.code_socket_client_dict) >= len(self.socket_client_dict): |
| | | raise Exception("无可用host") |
| | | finally: |
| | | self.lock.release() |
| | | used_hosts = set([self.code_socket_client_dict[k][0] for k in self.code_socket_client_dict]) |
| | | for host in self.socket_client_dict: |
| | | if host not in used_hosts: |
| | | return host, self.socket_client_dict[host] |
| | | raise Exception("无可用host") |
| | | |
| | | # 分配HOST |
| | | def distribute_upload_host(self, code): |
| | | self.lock.acquire() |
| | | self.rlock.acquire() |
| | | try: |
| | | host_info = self.__get_available_ipchost() |
| | | if host_info: |
| | | self.code_socket_client_dict[code] = host_info |
| | | finally: |
| | | self.lock.release() |
| | | self.rlock.release() |
| | | |
| | | def release_distributed_upload_host(self, code): |
| | | self.lock.acquire() |
| | | self.rlock.acquire() |
| | | try: |
| | | if code in self.code_socket_client_dict: |
| | | self.code_socket_client_dict.pop(code) |
| | | finally: |
| | | self.lock.release() |
| | | self.rlock.release() |
| | | |
| | | def upload_data_as_json(self, code, data): |
| | | if code not in self.code_socket_client_dict: |
| | |
| | | pass |
| | | |
| | | |
| | | |
| | | def run_test(): |
| | | t = threading.Thread(target=lambda: __test(), daemon=True) |
| | | t.start() |
| | |
| | | import threading |
| | | import time |
| | | |
| | | import zmq |
| | | |
| | | from log_module import async_log_util |
| | | from log_module.log import logger_debug |
| | | |
| | |
| | | self.__l2_order_active_time_dict = {} |
| | | self.__l2_transaction_active_time_dict = {} |
| | | self.__l2_market_active_time_dict = {} |
| | | self.zmq_context = zmq.Context() |
| | | |
| | | # 接收L2逐笔委托数据 |
| | | def __recive_l2_orders(self, q: multiprocessing.Queue): |
| | |
| | | finally: |
| | | self.__l2_market_active_time_dict[__id] = time.time() |
| | | |
| | | def __create_ipc_server(self, host): |
| | | socket = self.zmq_context.socket(zmq.REP) |
| | | socket.bind(host) |
| | | count = 0 |
| | | while True: |
| | | try: |
| | | data = socket.recv_json() |
| | | self.my_l2_data_callback.OnL2Order(data[0], data[1], data[2]) |
| | | socket.send_string("SUCCESS") |
| | | except Exception as e: |
| | | async_log_util.exception(logger_debug, e) |
| | | finally: |
| | | count += 1 |
| | | if count > 100: |
| | | count = 0 |
| | | # 记录活跃时间,每100次记录一次 |
| | | self.__l2_order_active_time_dict[host] = time.time() |
| | | |
| | | # 创建订单的IPC服务 |
| | | def __create_ipc_server_hosts(self, order_ipc_hosts): |
| | | for host in order_ipc_hosts: |
| | | threading.Thread(target=lambda: self.__create_ipc_server(host), daemon=True).start() |
| | | |
| | | # 接收L2数据 |
| | | def receive_l2_data(self, order_queues, transaction_queues, market_queue): |
| | | for q in order_queues: |
| | | t1 = threading.Thread(target=lambda: self.__recive_l2_orders(q), daemon=True) |
| | | t1.start() |
| | | def receive_l2_data(self, order_queues, transaction_queues, market_queue, order_ipc_hosts): |
| | | # TODO 暂时不通过队列接收数据 |
| | | # for q in order_queues: |
| | | # t1 = threading.Thread(target=lambda: self.__recive_l2_orders(q), daemon=True) |
| | | # t1.start() |
| | | for q in transaction_queues: |
| | | t2 = threading.Thread(target=lambda: self.__recive_transaction_orders(q), daemon=True) |
| | | t2.start() |
| | | t3 = threading.Thread(target=lambda: self.__recive_l2_markets(market_queue), daemon=True) |
| | | t3.start() |
| | | # 接收订单hosts |
| | | self.__create_ipc_server_hosts(order_ipc_hosts) |
| | | |
| | | def get_active_count(self, type_): |
| | | expire_time = time.time() - 5 |
| | |
| | | queue_l1_w_strategy_r_: multiprocessing.Queue, |
| | | queue_strategy_w_trade_r_: multiprocessing.Queue, |
| | | queue_strategy_w_trade_r_for_read_: multiprocessing.Queue, order_queues_, transaction_queues_, |
| | | market_queue_, queue_l1_trade_r_strategy_w_, queue_l1_trade_w_strategy_r_): |
| | | market_queue_, queue_l1_trade_r_strategy_w_, queue_l1_trade_w_strategy_r_, order_ipc_hosts_): |
| | | logger_system.info("策略进程ID:{}", os.getpid()) |
| | | log.close_print() |
| | | # 初始化参数 |
| | |
| | | huaxin_trade_server.run(queue_strategy_r_trade_w_, queue_l1_w_strategy_r_, queue_strategy_w_trade_r_, |
| | | queue_strategy_w_trade_r_for_read_, order_queues_, |
| | | transaction_queues_, market_queue_, |
| | | queue_l1_trade_w_strategy_r_) |
| | | queue_l1_trade_w_strategy_r_, order_ipc_hosts_) |
| | | |
| | | |
| | | # 主服务 |
| | |
| | | # 创建L2通信队列 |
| | | order_queues = [] |
| | | transaction_queues = [] |
| | | order_ipc_hosts = [] |
| | | market_queue = multiprocessing.Queue() |
| | | for i in range(constant.HUAXIN_L2_MAX_CODES_COUNT): |
| | | order_queues.append(multiprocessing.Queue()) |
| | | for i in range(constant.HUAXIN_L2_MAX_CODES_COUNT): |
| | | transaction_queues.append(multiprocessing.Queue()) |
| | | for i in range(constant.HUAXIN_L2_MAX_CODES_COUNT): |
| | | order_ipc_hosts.append(f"ipc://l2order{i}.ipc") |
| | | |
| | | # L2 |
| | | l2Process = multiprocessing.Process( |
| | | target=huaxin_client.l2_client.run, |
| | | args=(queue_other_w_l2_r, order_queues, transaction_queues, market_queue)) |
| | | args=(queue_other_w_l2_r, order_queues, transaction_queues, market_queue, order_ipc_hosts)) |
| | | l2Process.start() |
| | | |
| | | # 主进程 |
| | | createTradeServer(pss_strategy, queue_strategy_r_trade_w, queue_l1_w_strategy_r, queue_strategy_w_trade_r, |
| | | queue_strategy_w_trade_r_for_read, |
| | | order_queues, transaction_queues, market_queue, queue_l1_trade_r_strategy_w, |
| | | queue_l1_trade_w_strategy_r) |
| | | queue_l1_trade_w_strategy_r, order_ipc_hosts) |
| | | |
| | | # 将tradeServer作为主进程 |
| | | l1Process.join() |
| | |
| | | |
| | | def run(queue_strategy_r_trade_w, queue_l1_w_strategy_r, queue_strategy_w_trade_r, queue_strategy_w_trade_r_for_read, |
| | | order_queues, transaction_queues, |
| | | market_queue, queue_l1_trade_w_strategy_r): |
| | | market_queue, queue_l1_trade_w_strategy_r, order_ipc_hosts): |
| | | logger_system.info(f"trade_server 线程ID:{tool.get_thread_id()}") |
| | | try: |
| | | # 执行一些初始化数据 |
| | |
| | | # 监听L2数据 |
| | | global l2DataListenManager |
| | | l2DataListenManager = L2DataListenManager(my_l2_data_callback) |
| | | l2DataListenManager.receive_l2_data(order_queues, transaction_queues, market_queue) |
| | | l2DataListenManager.receive_l2_data(order_queues, transaction_queues, market_queue, order_ipc_hosts) |
| | | |
| | | # 启动交易服务 |
| | | huaxin_trade_api.run_pipe_trade(queue_strategy_r_trade_w, queue_strategy_w_trade_r, |