# -*- coding: utf-8 -*-
"""L2 data upload helpers for the huaxin client.

Order-by-order and trade-by-trade records are buffered per security code and
pushed through a shared-memory array plus a ZMQ notification; market snapshots
and subscription lists go through ordinary queues.
"""
import collections
import json
import logging
import marshal
import multiprocessing
import queue
import threading
import time

import msgpack
import zmq

import constant
from huaxin_client import socket_util
from huaxin_client.client_network import SendResponseSkManager
from huaxin_client.code_queue_distribute_manager import CodeDataChannelDistributeManager
from log_module import async_log_util
from log_module.async_log_util import huaxin_l2_log
from log_module.log import logger_local_huaxin_l2_error, logger_system, logger_local_huaxin_l2_subscript, \
    logger_local_huaxin_l2_special_volume, logger_debug, logger_local_huaxin_l2_orderdetail
from utils import tool, shared_memery_util

# Activity times
order_detail_upload_active_time_dict = {}
transaction_upload_active_time_dict = {}
# Temporary data
tmep_order_detail_queue_dict = {}
tmep_transaction_queue_dict = {}
target_codes = set()
target_codes_add_time = {}
# Queue drained by __run_upload_common(); entries are (code, type, datas) tuples.
common_queue = queue.Queue(maxsize=1000)


# L2 upload data manager
class L2DataUploadManager:
    """
    L2 order-by-order / L2 trade-by-trade: uploaded through shared memory + ZMQ.
    L2 market snapshots: written to the common data upload queue.
    L2 subscribed codes: written to the common data upload queue.
    """
    TYPE_DELEGATE = 1
    TYPE_TRANSACTION = 2
    TYPE_MARKET = 3

    def __init__(self, data_channel_distribute_manager: CodeDataChannelDistributeManager,
                 common_data_upload_queue: multiprocessing.Queue):
        """
        @param data_channel_distribute_manager: allocates the per-code upload channels
        @param common_data_upload_queue: queue that receives L2 market data
        """
        self.data_channel_distribute_manager = data_channel_distribute_manager
        self.common_data_upload_queue = common_data_upload_queue
        # Per-code buffers: {code: deque} for orders / transactions, {code: Queue} for logs
        self.temp_order_queue_dict = {}
        self.temp_transaction_queue_dict = {}
        self.temp_log_queue_dict = {}
        # {code: [(min_volume, limit_up_price, shadow_price, buy_volume, deprecated, special_volumes)]}
        self.filter_order_condition_dict = {}
        # {code: (order_thread, transaction_thread)}
        self.upload_l2_data_task_dict = {}
        # Codes whose order / transaction worker has been seen idle-polling
        self.l2_order_codes = set()
        self.l2_transaction_codes = set()

    # Set the order filter condition
    def set_order_fileter_condition(self, code, min_volume, limit_up_price, shadow_price, buy_volume,
                                    special_volumes):
        """
        Store the filter condition used by the order / transaction filters for ``code``.

        @param code: security code
        @param min_volume: orders with at least this volume pass the filter
        @param limit_up_price: limit-up price of the code
        @param shadow_price: shadow-order price (stored, not read by the active filters)
        @param buy_volume: orders with exactly this volume pass the filter
        @param special_volumes: iterable of extra volumes that pass; falsy means none
        """
        if not special_volumes:
            special_volumes = set()
        try:
            # (min volume, limit-up price, shadow-order price, buy volume, deprecated, special volume set)
            self.filter_order_condition_dict[code] = [
                (min_volume, limit_up_price, shadow_price, buy_volume, int(min_volume) // 50,
                 set(special_volumes))]
        except Exception as e:
            # e.g. min_volume is not convertible to int; the previous condition (if any) is kept
            logger_debug.error(f"{str(e)} - min_volume-{min_volume}")

    # Filter orders
    def __filter_order(self, item):
        """
        Return ``item`` if it should be uploaded, otherwise None.

        ``item`` is the tuple built by add_l2_order_detail(): item[0] is the code,
        item[1] the price, item[2] the volume and item[3] the side.
        Codes without a filter condition pass unfiltered.
        """
        filter_condition = self.filter_order_condition_dict.get(item[0])
        if not filter_condition:
            return item
        min_volume, limit_up_price, _shadow_price, buy_volume, _deprecated, special_volumes = filter_condition[0]
        # Large orders always pass
        if item[2] >= min_volume:
            return item
        # (A former rule passing 1-lot buy orders at the shadow price is disabled.)
        # Orders matching the buy volume
        if item[2] == buy_volume:
            return item
        # NOTE(review): the source arrived with its indentation lost; the else-branch below is
        # assumed to pair with the side check rather than the price check - confirm.
        if item[3] != '1':
            # Sell / sell-cancel: keep everything placed at the limit-up price
            if abs(item[1] - limit_up_price) < 0.001:
                return item
        else:
            # Special lot sizes
            if item[2] in special_volumes:
                return item
        return None

    # Filter transactions
    def __filter_transaction(self, item):
        """
        Return ``item`` if its price (item[1]) is within 0.201 of the limit-up price, else None.

        Codes without a filter condition pass unfiltered.
        """
        filter_condition = self.filter_order_condition_dict.get(item[0])
        if not filter_condition:
            return item
        if abs(item[1] - filter_condition[0][1]) < 0.201:
            return item
        return None

    # Add order detail
    def add_l2_order_detail(self, data, start_time=0, istransaction=False):
        """
        Buffer one order record for its code; dropped if no upload queue is allocated.

        @param data: mapping with the order fields read below
        @param start_time: caller-supplied timestamp carried along with the record
        @param istransaction: unused, kept for interface compatibility
        """
        code = data["SecurityID"]
        q: collections.deque = self.temp_order_queue_dict.get(code)
        if q is not None:
            q.append(
                (data['SecurityID'], data['Price'], data['Volume'], data['Side'], data['OrderType'],
                 data['OrderTime'], data['MainSeq'], data['SubSeq'], data['OrderNO'], data['OrderStatus'],
                 time.time(), start_time))

    # Add trade-by-trade record
    def add_transaction_detail(self, data):
        """Buffer one transaction record for its code; dropped if no upload queue is allocated."""
        code = data["SecurityID"]
        q: collections.deque = self.temp_transaction_queue_dict.get(code)
        if q is not None:
            q.append((data['SecurityID'], data['TradePrice'], data['TradeVolume'],
                      data['OrderTime'], data['MainSeq'], data['SubSeq'], data['BuyNo'],
                      data['SellNo'], data['ExecType'], time.time()))

    def add_market_data(self, data):
        """Forward one market-data record to the common upload queue (raises queue.Full if full)."""
        code = data['securityID']
        # Sent through the queue callback instead of directly
        self.common_data_upload_queue.put_nowait({"type": "l2_market", "data": (code, data)})

    # Allocate upload queues
    def distribute_upload_queue(self, code, _target_codes=None):
        """
        Allocate the upload channel, buffers and worker threads for a code.
        @param code: security code
        @param _target_codes: all target codes
        @return:
        """
        if not self.data_channel_distribute_manager.get_distributed_channel(code):
            self.data_channel_distribute_manager.distribute_channel(code, _target_codes)
        if code not in self.temp_order_queue_dict:
            self.temp_order_queue_dict[code] = collections.deque()
        if code not in self.temp_transaction_queue_dict:
            self.temp_transaction_queue_dict[code] = collections.deque()
        if code not in self.temp_log_queue_dict:
            self.temp_log_queue_dict[code] = queue.Queue(maxsize=1000)
        if code not in self.upload_l2_data_task_dict:
            t1 = threading.Thread(target=self.__run_upload_order_task, args=(code,), daemon=True)
            t1.start()
            t2 = threading.Thread(target=self.__run_upload_transaction_task, args=(code,), daemon=True)
            t2.start()
            # The log task (__run_log_task) is intentionally not started.
            self.upload_l2_data_task_dict[code] = (t1, t2)

    # Release the allocated queues
    def release_distributed_upload_queue(self, code):
        """Release the channel and drop all buffers of a code; its workers exit on their next idle poll."""
        self.data_channel_distribute_manager.release_distribute_channel(code)
        if code in self.temp_order_queue_dict:
            self.temp_order_queue_dict[code].clear()
            self.temp_order_queue_dict.pop(code)
        if code in self.temp_transaction_queue_dict:
            self.temp_transaction_queue_dict[code].clear()
            self.temp_transaction_queue_dict.pop(code)
        if code in self.temp_log_queue_dict:
            self.temp_log_queue_dict.pop(code)
        if code in self.upload_l2_data_task_dict:
            self.upload_l2_data_task_dict.pop(code)

    # Process order data and upload it
    def __run_upload_order_task(self, code):
        """
        Worker loop: drain the order deque of ``code``, filter, and upload in batches.

        Exits when the deque it was started with is no longer the one registered for
        ``code`` (released, or released and re-distributed).
        """
        q: collections.deque = self.temp_order_queue_dict.get(code)
        if q is None:
            # Released before this thread got to run; without this guard the loop
            # below would raise on len(None) forever and never reach its exit check.
            self.l2_order_codes.discard(code)
            return
        temp_list = []
        while True:
            try:
                while len(q) > 0:
                    # Pre-processing: drop records that are of no use
                    data = self.__filter_order(q.popleft())
                    if data:
                        temp_list.append(data)
                if temp_list:
                    # Upload the batch
                    start_time = time.time()
                    last_data = temp_list[-1]
                    shared_memery_number, m_array, zmq_host = \
                        self.data_channel_distribute_manager.get_distributed_channel(code)[0]
                    # Fill the data into shared memory
                    shared_memery_util.set_datas(m_array, (code, temp_list, time.time()))
                    # Notify the receiver to fetch the data, then wait for its reply
                    _socket = self.data_channel_distribute_manager.get_zmq_socket(zmq_host)
                    _socket.send(
                        msgpack.packb({"type": self.TYPE_DELEGATE, "data": {"memery_number": shared_memery_number}}))
                    _socket.recv_string()
                    use_time = time.time() - start_time
                    if use_time > 0.01:
                        # Record uploads that took more than 10ms
                        huaxin_l2_log.info(logger_local_huaxin_l2_error,
                                           f"耗时:{use_time}s 结束数据:{last_data} 数据数量:{len(temp_list)}")
                    temp_list = []
                else:
                    current = self.temp_order_queue_dict.get(code)
                    if current is not q:
                        # Our deque was released. If a new one is already registered a new
                        # worker owns the code, so leave l2_order_codes to that worker.
                        if current is None:
                            self.l2_order_codes.discard(code)
                        break
                    self.l2_order_codes.add(code)
                    time.sleep(0.001)
            except Exception as e:
                logging.exception(e)
                logger_local_huaxin_l2_error.exception(e)
            finally:
                # A batch that failed to upload is dropped rather than retried
                temp_list.clear()

    # Process transaction data and upload it
    def __run_upload_transaction_task(self, code):
        """
        Worker loop: drain the transaction deque of ``code``, filter, and upload in batches.

        Exits when the deque it was started with is no longer the one registered for
        ``code`` (released, or released and re-distributed).
        """
        q: collections.deque = self.temp_transaction_queue_dict.get(code)
        if q is None:
            # Released before this thread got to run; see __run_upload_order_task
            self.l2_transaction_codes.discard(code)
            return
        temp_list = []
        while True:
            try:
                while len(q) > 0:
                    data = self.__filter_transaction(q.popleft())
                    if data:
                        temp_list.append(data)
                if temp_list:
                    # Upload the batch
                    shared_memery_number, m_array, zmq_host = \
                        self.data_channel_distribute_manager.get_distributed_channel(code)[1]
                    # Fill the data into shared memory
                    shared_memery_util.set_datas(m_array, (code, temp_list))
                    # Notify the receiver to fetch the data, then wait for its reply
                    _socket = self.data_channel_distribute_manager.get_zmq_socket(zmq_host)
                    _socket.send(
                        msgpack.packb(
                            {"type": self.TYPE_TRANSACTION, "data": {"memery_number": shared_memery_number}}))
                    _socket.recv_string()
                    temp_list = []
                else:
                    current = self.temp_transaction_queue_dict.get(code)
                    if current is not q:
                        if current is None:
                            self.l2_transaction_codes.discard(code)
                        break
                    self.l2_transaction_codes.add(code)
                    time.sleep(0.001)
            except Exception as e:
                logger_local_huaxin_l2_error.exception(e)
            finally:
                # A batch that failed to upload is dropped rather than retried
                temp_list.clear()

    def __run_log_task(self, code):
        """Worker loop: write queued records of ``code`` to the special-volume log until released."""
        q: queue.Queue = self.temp_log_queue_dict.get(code)
        if q is None:
            return
        while True:
            try:
                temp = q.get(timeout=10)
                huaxin_l2_log.info(logger_local_huaxin_l2_special_volume, f"{temp}")
            except Exception:
                # queue.Empty after the timeout, or a logging failure: back off briefly
                time.sleep(0.02)
            # Checked after (not inside a ``finally``) so a pending exception is never swallowed
            if code not in self.temp_log_queue_dict:
                break


class L2DataUploadProtocolManager:
    """Assigns one ZMQ REQ client (one IPC host) to each code and uploads data through it."""

    # ipchosts: IPC protocol endpoints
    def __init__(self, ipchosts):
        """
        @param ipchosts: iterable of ZMQ endpoints; no sockets are created on Windows
        """
        self.ipchosts = ipchosts
        # All clients: {host: socket}
        self.socket_client_dict = {}
        # Client assigned to each code, format: {code: (host, socket)}
        self.code_socket_client_dict = {}
        self.rlock = threading.RLock()
        context = zmq.Context()
        if constant.is_windows():
            return
        for host in self.ipchosts:
            socket = context.socket(zmq.REQ)
            socket.connect(host)
            self.socket_client_dict[host] = socket

    # Get an unused host
    def __get_available_ipchost(self):
        """Return (host, socket) of a host not assigned to any code; raise Exception if none is left."""
        if len(self.code_socket_client_dict) >= len(self.socket_client_dict):
            raise Exception("无可用host")
        used_hosts = {host_info[0] for host_info in self.code_socket_client_dict.values()}
        for host, socket in self.socket_client_dict.items():
            if host not in used_hosts:
                return host, socket
        raise Exception("无可用host")

    # Assign a HOST
    def distribute_upload_host(self, code):
        """Assign a free host to ``code`` (no-op if it already has one); raise Exception if none is free."""
        if code in self.code_socket_client_dict:
            return
        with self.rlock:
            # Re-check under the lock so two concurrent callers cannot both assign a host
            if code in self.code_socket_client_dict:
                return
            host_info = self.__get_available_ipchost()
            if host_info:
                self.code_socket_client_dict[code] = host_info

    def release_distributed_upload_host(self, code):
        """Give back the host assigned to ``code``, if any."""
        if code not in self.code_socket_client_dict:
            return
        with self.rlock:
            self.code_socket_client_dict.pop(code, None)

    def upload_data_as_json(self, code, data):
        """
        Send ``data`` over the socket assigned to ``code`` and wait for the reply.

        Despite the name, the payload is serialized with ``marshal``.
        Raises Exception if no host has been assigned to ``code``.
        """
        if code not in self.code_socket_client_dict:
            raise Exception("尚未分配host")
        host, socket = self.code_socket_client_dict[code]
        socket.send(marshal.dumps(data))
        socket.recv_string()


def add_target_code(code):
    """Register ``code`` as a target and record when it was added."""
    target_codes.add(code)
    # Record the time the code was added
    target_codes_add_time[code] = time.time()


def del_target_code(code):
    """Remove ``code`` from the targets together with its add time."""
    target_codes.discard(code)
    target_codes_add_time.pop(code, None)


def add_subscript_codes(codes):
    """Queue the subscribed codes (tagged with this process's pid) for upload."""
    # Put onto the upload queue
    pid = multiprocessing.current_process().pid
    common_queue.put(('', "l2_subscript_codes_v2", (pid, list(codes))))


def __send_response(sk, msg):
    """Send ``msg`` with its header over ``sk``; return True only if the JSON reply has code == 0."""
    msg = socket_util.load_header(msg)
    sk.sendall(msg)
    result, header_str = socket_util.recv_data(sk)
    if result:
        result_json = json.loads(result)
        if result_json.get("code") == 0:
            return True
    return False


# Send a message
def send_response(type, msg):
    """
    Send ``msg`` over the socket registered under ``type`` and return whether it was accepted.

    A rejected message is sent once more on the same socket; on a reset / broken
    connection the socket is dropped, re-obtained and the message sent once more.
    """
    try:
        sk = SendResponseSkManager.get_send_response_sk(type)
        if __send_response(sk, msg):
            return True
        # Send again
        return __send_response(sk, msg)
    except (ConnectionResetError, BrokenPipeError):
        SendResponseSkManager.del_send_response_sk(type)
        sk = SendResponseSkManager.get_send_response_sk(type)
        return __send_response(sk, msg)


# Upload data
def upload_data(code, _type, datas, new_sk=False):
    """
    Serialize one record as JSON and send it; failures are logged, never raised.

    @param code: security code ('' for code-independent data)
    @param _type: message type
    @param datas: JSON-serializable payload
    @param new_sk: send over a newly created socket instead of the one cached per type/code
    """
    key = f"{_type}_{code}"
    fdata = json.dumps(
        {"type": _type, "data": {"code": code, "data": datas, "time": round(time.time() * 1000)}})
    try:
        if new_sk:
            # NOTE(review): the socket created here is never closed - confirm whether
            # create_send_response_sk() expects the caller to close it.
            sk = SendResponseSkManager.create_send_response_sk()
            __send_response(sk, fdata.encode('utf-8'))
        else:
            send_response(key, fdata.encode('utf-8'))
    except Exception as e:
        logging.exception(e)


def __run_upload_common():
    """Loop forever: drain common_queue through upload_data(), polling every 10ms."""
    logger_system.info(f"l2_client __run_upload_common 线程ID:{tool.get_thread_id()}")
    while True:
        try:
            while not common_queue.empty():
                temp = common_queue.get()
                upload_data(temp[0], temp[1], temp[2])
        except Exception as e:
            logger_local_huaxin_l2_error.exception(e)
            logger_local_huaxin_l2_error.error(f"上传普通数据出错:{str(e)}")
        finally:
            time.sleep(0.01)


def __run_log():
    """Run the async L2 logger's run_sync() in the calling thread."""
    logger_system.info(f"l2_client __run_log 线程ID:{tool.get_thread_id()}")
    async_log_util.huaxin_l2_log.run_sync()


# Data is transferred over sockets
def run_upload_common():
    """Start the common-data upload loop in a daemon thread."""
    t = threading.Thread(target=__run_upload_common, daemon=True)
    t.start()


def run_log():
    """Start the L2 log loop in a daemon thread."""
    t = threading.Thread(target=__run_log, daemon=True)
    t.start()


def __test():
    # Placeholder: allocate data
    pass


def run_test():
    """Start the (empty) test routine in a daemon thread."""
    t = threading.Thread(target=__test, daemon=True)
    t.start()


def test():
    """Manual smoke test: assign a host to one code and upload a tiny payload."""
    ipclist = [f"ipc://l2order{i}.ipc" for i in range(0, 70)]
    manager = L2DataUploadProtocolManager(ipclist)
    code = "000333"
    manager.distribute_upload_host(code)
    manager.upload_data_as_json(code, {"test": "test"})