| | |
| | | # -*- coding: utf-8 -*- |
| | | import json |
| | | import logging |
| | | import multiprocessing |
| | | import marshal |
| | | import queue |
| | | import threading |
| | | import time |
| | | |
| | | import constant |
| | | from huaxin_client import socket_util |
| | | |
| | | from huaxin_client.client_network import SendResponseSkManager |
| | | |
| | | # 活动时间 |
| | | from huaxin_client.code_queue_distribute_manager import CodeQueueDistributeManager |
| | | from huaxin_client.code_queue_distribute_manager import CodeDataCallbackDistributeManager |
| | | from log_module import async_log_util |
| | | from log_module.async_log_util import huaxin_l2_log |
| | | from log_module.log import logger_local_huaxin_l2_error, logger_system, logger_local_huaxin_l2_subscript, \ |
| | | logger_local_huaxin_l2_special_volume |
| | | logger_local_huaxin_l2_special_volume, logger_debug, logger_local_huaxin_l2_orderdetail |
| | | from utils import tool |
| | | import collections |
| | | import zmq |
| | |
| | | tmep_transaction_queue_dict = {} |
| | | target_codes = set() |
| | | target_codes_add_time = {} |
| | | common_queue = queue.Queue() |
| | | common_queue = queue.Queue(maxsize=1000) |
| | | |
| | | |
| | | # L2上传数据管理器 |
| | | class L2DataUploadManager: |
| | | def __init__(self, order_queue_distribute_manager: CodeQueueDistributeManager, |
| | | transaction_queue_distribute_manager: CodeQueueDistributeManager, |
| | | market_data_queue: multiprocessing.Queue): |
| | | self.order_queue_distribute_manager = order_queue_distribute_manager |
| | | self.transaction_queue_distribute_manager = transaction_queue_distribute_manager |
| | | self.market_data_queue = market_data_queue |
| | | def __init__(self, data_callback_distribute_manager: CodeDataCallbackDistributeManager): |
| | | self.data_callback_distribute_manager = data_callback_distribute_manager |
| | | # 代码分配的对象 |
| | | self.temp_order_queue_dict = {} |
| | | self.temp_transaction_queue_dict = {} |
| | | self.temp_log_queue_dict = {} |
| | | |
| | | self.filter_order_condition_dict = {} |
| | | self.upload_l2_data_task_dict = {} |
| | | self.l2_order_codes = set() |
| | |
| | | |
| | | # 设置订单过滤条件 |
| | | # special_price:过滤的1手的价格 |
| | | def set_order_fileter_condition(self, code, min_volume, limit_up_price, shadow_price, buy_volume): |
| | | if code not in self.filter_order_condition_dict: |
| | | self.filter_order_condition_dict[code] = [(min_volume, limit_up_price, shadow_price, buy_volume)] |
| | | huaxin_l2_log.info(logger_local_huaxin_l2_subscript, |
| | | f"({code})常规过滤条件设置:{self.filter_order_condition_dict[code]}") |
| | | def set_order_fileter_condition(self, code, min_volume, limit_up_price, shadow_price, buy_volume, special_volumes): |
| | | if not special_volumes: |
| | | special_volumes = set() |
| | | # if code not in self.filter_order_condition_dict: |
| | | try: |
| | | # (最小的量, 涨停价格, 影子单价格, 买的量, 废弃使用, 特殊的量集合) |
| | | self.filter_order_condition_dict[code] = [(min_volume, limit_up_price, shadow_price, buy_volume, |
| | | int(min_volume) // 50, set(special_volumes))] |
| | | # huaxin_l2_log.info(logger_local_huaxin_l2_subscript, |
| | | # f"({code})常规过滤条件设置:{self.filter_order_condition_dict[code]}") |
| | | except Exception as e: |
| | | logger_debug.error(f"{str(e)} - min_volume-{min_volume}") |
| | | |
| | | # 过滤订单 |
| | | def __filter_order(self, item): |
| | |
| | | if item[2] >= filter_condition[0][0]: |
| | | return item |
| | | # 1手的买单满足价格 |
| | | if item[2] == 100 and abs(filter_condition[0][2] - item[1]) < 0.001: |
| | | return item |
| | | # if item[2] == 100 and abs(filter_condition[0][2] - item[1]) < 0.001: |
| | | # return item |
| | | # 买量 |
| | | if item[2] == filter_condition[0][3]: |
| | | return item |
| | | |
| | | # 所有的涨停卖 |
| | | if item[3] != '1': |
| | | # 卖与卖撤 |
| | | if abs(item[1] - filter_condition[0][1]) < 0.001: |
| | | # 涨停价 |
| | | return item |
| | | else: |
| | | if item[2] in filter_condition[0][5]: |
| | | # 特殊手数 |
| | | return item |
| | | return None |
| | | return item |
| | | # 过滤订单 |
| | |
| | | # log_queue.put_nowait(data) |
| | | |
| | | q: collections.deque = self.temp_order_queue_dict.get(code) |
| | | q.append((data['SecurityID'], data['Price'], data['Volume'], data['Side'], data['OrderType'], data['OrderTime'], |
| | | data['MainSeq'], data['SubSeq'], data['OrderNO'], data['OrderStatus'], time.time(), start_time)) |
| | | if q is not None: |
| | | q.append( |
| | | (data['SecurityID'], data['Price'], data['Volume'], data['Side'], data['OrderType'], data['OrderTime'], |
| | | data['MainSeq'], data['SubSeq'], data['OrderNO'], data['OrderStatus'], time.time(), start_time)) |
| | | |
| | | # 添加逐笔成交 |
| | | def add_transaction_detail(self, data): |
| | |
| | | # data['SellNo'], data['ExecType'])) |
| | | |
| | | q: collections.deque = self.temp_transaction_queue_dict.get(code) |
| | | q.append((data['SecurityID'], data['TradePrice'], data['TradeVolume'], |
| | | data['OrderTime'], data['MainSeq'], data['SubSeq'], data['BuyNo'], |
| | | data['SellNo'], data['ExecType'])) |
| | | if q is not None: |
| | | q.append((data['SecurityID'], data['TradePrice'], data['TradeVolume'], |
| | | data['OrderTime'], data['MainSeq'], data['SubSeq'], data['BuyNo'], |
| | | data['SellNo'], data['ExecType'], time.time())) |
| | | |
| | | def add_market_data(self, data): |
| | | # 加入上传队列 |
| | | self.market_data_queue.put_nowait(data) |
| | | # self.market_data_queue.put_nowait(data) |
| | | code = data['securityID'] |
| | | callback = self.data_callback_distribute_manager.get_distributed_callback(code) |
| | | if callback: |
| | | callback.OnMarketData(code, data) |
| | | |
| | | # 分配上传队列 |
| | | def distribute_upload_queue(self, code): |
| | | if not self.order_queue_distribute_manager.get_distributed_queue(code): |
| | | self.order_queue_distribute_manager.distribute_queue(code) |
| | | if not self.transaction_queue_distribute_manager.get_distributed_queue(code): |
| | | self.transaction_queue_distribute_manager.distribute_queue(code) |
| | | def distribute_upload_queue(self, code, _target_codes=None): |
| | | """ |
| | | 分配上传队列 |
| | | @param code: 代码 |
| | | @param _target_codes: 所有的目标代码 |
| | | @return: |
| | | """ |
| | | if not self.data_callback_distribute_manager.get_distributed_callback(code): |
| | | self.data_callback_distribute_manager.distribute_callback(code, _target_codes) |
| | | |
| | | if code not in self.temp_order_queue_dict: |
| | | self.temp_order_queue_dict[code] = collections.deque() |
| | | if code not in self.temp_transaction_queue_dict: |
| | | self.temp_transaction_queue_dict[code] = collections.deque() |
| | | if code not in self.temp_log_queue_dict: |
| | | self.temp_log_queue_dict[code] = queue.Queue() |
| | | |
| | | |
| | | self.temp_log_queue_dict[code] = queue.Queue(maxsize=1000) |
| | | if code not in self.upload_l2_data_task_dict: |
| | | t1 = threading.Thread(target=lambda: self.__run_upload_order_task(code), daemon=True) |
| | | t1.start() |
| | |
| | | # 释放已经分配的队列 |
| | | |
| | | def release_distributed_upload_queue(self, code): |
| | | self.order_queue_distribute_manager.release_distribute_queue(code) |
| | | self.transaction_queue_distribute_manager.release_distribute_queue(code) |
| | | self.data_callback_distribute_manager.release_distribute_callback(code) |
| | | if code in self.temp_order_queue_dict: |
| | | self.temp_order_queue_dict[code].clear() |
| | | self.temp_order_queue_dict.pop(code) |
| | |
| | | self.upload_l2_data_task_dict.pop(code) |
| | | |
| | | def __upload_l2_data(self, code, _queue, datas): |
| | | _queue.put_nowait((code, datas, time.time())) |
| | | _queue.put_nowait(marshal.dumps([code, datas, time.time()])) |
| | | |
| | | # 处理订单数据并上传 |
| | | def __run_upload_order_task(self, code): |
| | | q: collections.deque = self.temp_order_queue_dict.get(code) |
| | | temp_list = [] |
| | | queue_info = self.order_queue_distribute_manager.get_distributed_queue(code) |
| | | upload_queue = queue_info[1] |
| | | filter_condition = self.filter_order_condition_dict.get(code) |
| | | while True: |
| | | try: |
| | | while len(q) > 0: |
| | |
| | | |
| | | if temp_list: |
| | | # 上传数据 |
| | | self.__upload_l2_data(code, upload_queue, temp_list) |
| | | # self.__upload_l2_data(code, upload_queue, temp_list) |
| | | # self.__upload_l2_order_data(code, temp_list) |
| | | temp_list = [] |
| | | __start_time = time.time() |
| | | last_data = temp_list[-1] |
| | | self.data_callback_distribute_manager.get_distributed_callback(code).OnL2Order(code, temp_list, |
| | | time.time()) |
| | | use_time = time.time() - __start_time |
| | | if use_time > 0.01: |
| | | # 记录10ms以上的数据 |
| | | huaxin_l2_log.info(logger_local_huaxin_l2_error, f"耗时:{use_time}s 结束数据:{last_data}") |
| | | |
| | | # 记录所有的订单号 |
| | | if filter_condition: |
| | | huaxin_l2_log.info(logger_local_huaxin_l2_orderdetail, |
| | | f"{[(x[0], x[1], x[2], x[4], x[8]) for x in temp_list if x[2] >= filter_condition[0][0]]}") |
| | | temp_list.clear() |
| | | else: |
| | | if code not in self.temp_order_queue_dict: |
| | | self.l2_order_codes.discard(code) |
| | |
| | | except Exception as e: |
| | | logging.exception(e) |
| | | finally: |
| | | pass |
| | | temp_list.clear() |
| | | |
| | | # 处理成交数据并上传 |
| | | def __run_upload_transaction_task(self, code): |
| | | q: collections.deque = self.temp_transaction_queue_dict.get(code) |
| | | queue_info = self.transaction_queue_distribute_manager.get_distributed_queue(code) |
| | | upload_queue = queue_info[1] |
| | | temp_list = [] |
| | | while True: |
| | | try: |
| | |
| | | temp_list.append(data) |
| | | if temp_list: |
| | | # 上传数据 |
| | | self.__upload_l2_data(code, upload_queue, temp_list) |
| | | # self.__upload_l2_data(code, upload_queue, temp_list) |
| | | self.data_callback_distribute_manager.get_distributed_callback(code).OnL2Transaction(code, |
| | | temp_list) |
| | | temp_list = [] |
| | | else: |
| | | if code not in self.temp_transaction_queue_dict: |
| | | self.l2_transaction_codes.discard(code) |
| | | break |
| | | self.l2_transaction_codes.add(code) |
| | | time.sleep(0.002) |
| | | time.sleep(0.001) |
| | | except: |
| | | pass |
| | | finally: |
| | | pass |
| | | temp_list.clear() |
| | | |
| | | def __run_log_task(self, code): |
| | | q: queue.Queue = self.temp_log_queue_dict.get(code) |
| | |
| | | self.code_socket_client_dict = {} |
| | | self.rlock = threading.RLock() |
| | | context = zmq.Context() |
| | | if constant.is_windows(): |
| | | return |
| | | for host in self.ipchosts: |
| | | socket = context.socket(zmq.REQ) |
| | | socket.connect(host) |
| | |
| | | if code not in self.code_socket_client_dict: |
| | | raise Exception("尚未分配host") |
| | | host, socket = self.code_socket_client_dict[code] |
| | | socket.send_json(data) |
| | | socket.send(marshal.dumps(data)) |
| | | socket.recv_string() |
| | | |
| | | |
| | |
| | | |
| | | |
| | | def add_subscript_codes(codes): |
| | | print("add_subscript_codes", codes) |
| | | # print("add_subscript_codes", codes) |
| | | # 加入上传队列 |
| | | common_queue.put(('', "l2_subscript_codes", list(codes))) |
| | | |
| | |
| | | return True |
| | | else: |
| | | # 再次发送 |
| | | print("再次发送") |
| | | # print("再次发送") |
| | | return __send_response(sk, msg) |
| | | except ConnectionResetError as e: |
| | | SendResponseSkManager.del_send_response_sk(type) |
| | |
| | | |
| | | |
| | | def __run_upload_common(): |
| | | print("__run_upload_common") |
| | | # print("__run_upload_common") |
| | | logger_system.info(f"l2_client __run_upload_common 线程ID:{tool.get_thread_id()}") |
| | | while True: |
| | | try: |
| | |
| | | |
| | | |
| | | def __run_log(): |
| | | print("__run_log") |
| | | # print("__run_log") |
| | | logger_system.info(f"l2_client __run_log 线程ID:{tool.get_thread_id()}") |
| | | async_log_util.huaxin_l2_log.run_sync() |
| | | |