Administrator
4 天以前 48fb7a00951f91bdc707e5dd2d196e5bccb752c3
huaxin_client/l2_data_manager.py
@@ -1,21 +1,25 @@
# -*- coding: utf-8 -*-
import json
import logging
import multiprocessing
import marshal
import queue
import threading
import time
import constant
from huaxin_client import socket_util
from huaxin_client.client_network import SendResponseSkManager
# 活动时间
from huaxin_client.code_queue_distribute_manager import CodeQueueDistributeManager
from huaxin_client.code_queue_distribute_manager import CodeDataCallbackDistributeManager
from log_module import async_log_util
from log_module.async_log_util import huaxin_l2_log
from log_module.log import logger_local_huaxin_l2_error, logger_system, logger_local_huaxin_l2_subscript
from log_module.log import logger_local_huaxin_l2_error, logger_system, logger_local_huaxin_l2_subscript, \
    logger_local_huaxin_l2_special_volume, logger_debug, logger_local_huaxin_l2_orderdetail
from utils import tool
import collections
import zmq
order_detail_upload_active_time_dict = {}
transaction_upload_active_time_dict = {}
@@ -24,56 +28,62 @@
# NOTE(review): "tmep" looks like a typo for "temp"; the name is left as-is
# because code outside this view may reference it.
tmep_transaction_queue_dict = {}
target_codes = set()
target_codes_add_time = {}
# Module-level queue; add_subscript_codes() puts
# ('', "l2_subscript_codes", [codes...]) tuples on it.
common_queue = queue.Queue()
# NOTE(review): this second assignment rebinds the name to a queue capped at
# 1000 items, so the unbounded queue above is discarded immediately. The two
# lines look like the before/after of one change - confirm which one is live.
common_queue = queue.Queue(maxsize=1000)
# L2上传数据管理器
class L2DataUploadManager:
    def __init__(self, order_queue_distribute_manager: CodeQueueDistributeManager,
                 transaction_queue_distribute_manager: CodeQueueDistributeManager,
                 market_data_queue: multiprocessing.Queue):
        """Store the order/transaction queue distributors and the market-data queue.

        NOTE(review): another ``__init__`` is defined right after this one. When
        both sit in the same class body the later definition replaces this one,
        so the three attributes below are never set. Presumably this is the
        pre-change constructor - confirm before relying on these attributes.
        """
        self.order_queue_distribute_manager = order_queue_distribute_manager
        self.transaction_queue_distribute_manager = transaction_queue_distribute_manager
        self.market_data_queue = market_data_queue
    def __init__(self, data_callback_distribute_manager: CodeDataCallbackDistributeManager):
        self.data_callback_distribute_manager = data_callback_distribute_manager
        # 代码分配的对象
        self.temp_order_queue_dict = {}
        self.temp_transaction_queue_dict = {}
        self.temp_log_queue_dict = {}
        self.filter_order_condition_dict = {}
        self.upload_l2_data_task_dict = {}
        self.l2_order_codes = set()
        self.l2_transaction_codes = set()
    # 设置订单过滤条件
    def set_order_fileter_condition(self, code, min_volume, limit_up_price, special_volumes=None,
                                    special_volumes_expire_time=None):
        if special_volumes is None:
    # special_price:过滤的1手的价格
    def set_order_fileter_condition(self, code, min_volume, limit_up_price, shadow_price, buy_volume, special_volumes):
        """Record the order-filter parameters of ``code`` in ``filter_order_condition_dict``.

        @param code: key under which the filter is stored
        @param min_volume: minimum volume; also passed through ``int()`` below
        @param limit_up_price: limit-up price
        @param shadow_price: shadow-order price
        @param buy_volume: buy volume
        @param special_volumes: iterable of special volumes; a falsy value is
            replaced by an empty set

        NOTE(review): the first ``if``/``else`` below reads
        ``special_volumes_expire_time``, which is not a parameter of this
        signature (it only appears in the other ``set_order_fileter_condition``
        signature above). Those lines look like leftovers of the previous
        revision - confirm which part of the body is live.
        """
        if not special_volumes:
            special_volumes = set()
        if code in self.filter_order_condition_dict and not special_volumes and not special_volumes_expire_time:
            self.filter_order_condition_dict[code][0] = (min_volume, limit_up_price)
            huaxin_l2_log.info(logger_local_huaxin_l2_subscript,
                               f"({code})常规过滤条件设置:{self.filter_order_condition_dict[code]}")
        else:
            self.filter_order_condition_dict[code] = [(min_volume, limit_up_price), special_volumes,
                                                      special_volumes_expire_time]
            huaxin_l2_log.info(logger_local_huaxin_l2_subscript,
                               f"({code})下单后过滤条件设置:{self.filter_order_condition_dict[code]}")
        # if code not in self.filter_order_condition_dict:
        try:
            # Stored as a one-element list holding the tuple
            # (min volume, limit-up price, shadow-order price, buy volume,
            #  no longer used, set of special volumes)
            self.filter_order_condition_dict[code] = [(min_volume, limit_up_price, shadow_price, buy_volume,
                                                       int(min_volume) // 50, set(special_volumes))]
            # huaxin_l2_log.info(logger_local_huaxin_l2_subscript,
            #                    f"({code})常规过滤条件设置:{self.filter_order_condition_dict[code]}")
        except Exception as e:
            # Any failure (e.g. int() rejecting min_volume) is logged and swallowed
            logger_debug.error(f"{str(e)} - min_volume-{min_volume}")
    # 过滤订单
    def __filter_order(self, item):
        """Return ``item`` when it passes the filter stored for its code, else ``None``.

        ``item`` is indexed positionally: ``item[0]`` is the key into
        ``filter_order_condition_dict``, ``item[1]`` is compared with a price,
        ``item[2]`` is the volume and ``item[3]`` is the side flag ('1' is
        treated as buy). Presumably this is the tuple built in the order-detail
        path (SecurityID, Price, Volume, Side, ...) - verify against the caller.
        Items whose code has no filter configured are returned unchanged.

        NOTE(review): the body indexes ``filter_condition[1]`` and
        ``filter_condition[2]`` (a three-slot layout) and also
        ``filter_condition[0][3]`` / ``filter_condition[0][5]`` (a single
        six-field tuple). These match two different layouts written by
        ``set_order_fileter_condition``; confirm which branches are live.
        """
        if item[2] == 100 and item[3] == '1':
            # One-lot (100) buy orders are never filtered out
            return item
        filter_condition = self.filter_order_condition_dict.get(item[0])
        if filter_condition:
            # item[2] is the volume
            if item[2] >= filter_condition[0][0]:
                return item
            if filter_condition[1] and item[2] in filter_condition[1]:
                if filter_condition[2] and time.time() > filter_condition[2]:
                    # Expired: clear the special-volume data
                    filter_condition[1] = set()
                    filter_condition[2] = None
                    return None
            # One-lot buy order whose price matches
            # if item[2] == 100 and abs(filter_condition[0][2] - item[1]) < 0.001:
            #     return item
            # Volume equal to the configured buy volume
            if item[2] == filter_condition[0][3]:
                return item
            # Every sell at the limit-up price
            if item[3] != '1':
                # Sell and sell-cancel
                if abs(item[1] - filter_condition[0][1]) < 0.001:
                    # Limit-up price
                    return item
            else:
                if item[2] in filter_condition[0][5]:
                    # Special volume
                    return item
            return None
        return item
        # 过滤订单
@@ -97,10 +107,16 @@
        # queue_info[1].put_nowait(
        #     (data['SecurityID'], data['Price'], data['Volume'], data['Side'], data['OrderType'], data['OrderTime'],
        #      data['MainSeq'], data['SubSeq'], data['OrderNO'], data['OrderStatus'], time.time(), start_time))
        # if data['Volume'] == 100:
        #     log_queue = self.temp_log_queue_dict.get(code)
        #     if log_queue:
        #         log_queue.put_nowait(data)
        q: collections.deque = self.temp_order_queue_dict.get(code)
        q.append((data['SecurityID'], data['Price'], data['Volume'], data['Side'], data['OrderType'], data['OrderTime'],
                  data['MainSeq'], data['SubSeq'], data['OrderNO'], data['OrderStatus'], time.time(), start_time))
        if q is not None:
            q.append(
                (data['SecurityID'], data['Price'], data['Volume'], data['Side'], data['OrderType'], data['OrderTime'],
                 data['MainSeq'], data['SubSeq'], data['OrderNO'], data['OrderStatus'], time.time(), start_time))
    # 添加逐笔成交
    def add_transaction_detail(self, data):
@@ -115,54 +131,68 @@
        #                           data['SellNo'], data['ExecType']))
        q: collections.deque = self.temp_transaction_queue_dict.get(code)
        q.append((data['SecurityID'], data['TradePrice'], data['TradeVolume'],
                  data['OrderTime'], data['MainSeq'], data['SubSeq'], data['BuyNo'],
                  data['SellNo'], data['ExecType']))
        if q is not None:
            q.append((data['SecurityID'], data['TradePrice'], data['TradeVolume'],
                      data['OrderTime'], data['MainSeq'], data['SubSeq'], data['BuyNo'],
                      data['SellNo'], data['ExecType'], time.time()))
    def add_market_data(self, data):
        """Forward one market-data record to the callback assigned to its code.

        @param data: mapping that contains at least the key 'securityID'

        NOTE(review): ``self.market_data_queue`` is only assigned by the
        three-argument ``__init__``; the one-argument ``__init__`` does not set
        it. The active ``put_nowait`` line and its commented-out twin look like
        the before/after of one change - confirm which one is live.
        NOTE(review): the key here is 'securityID' while the order and
        transaction tuples read 'SecurityID' - confirm the lower-case key.
        """
        # Add to the upload queue
        self.market_data_queue.put_nowait(data)
        # self.market_data_queue.put_nowait(data)
        code = data['securityID']
        callback = self.data_callback_distribute_manager.get_distributed_callback(code)
        if callback:
            # Only codes that currently have a callback assigned are delivered
            callback.OnMarketData(code, data)
    # 分配上传队列
    def distribute_upload_queue(self, code):
        """Ask both queue distributors to assign a queue to ``code`` if it has none.

        NOTE(review): a second ``distribute_upload_queue`` with an extra
        ``_target_codes`` parameter is defined right after this one and replaces
        it when both are in the same class body. Presumably this is the
        pre-change version - confirm.
        """
        if not self.order_queue_distribute_manager.get_distributed_queue(code):
            self.order_queue_distribute_manager.distribute_queue(code)
        if not self.transaction_queue_distribute_manager.get_distributed_queue(code):
            self.transaction_queue_distribute_manager.distribute_queue(code)
    def distribute_upload_queue(self, code, _target_codes=None):
        """
        分配上传队列
        @param code: 代码
        @param _target_codes: 所有的目标代码
        @return:
        """
        if not self.data_callback_distribute_manager.get_distributed_callback(code):
            self.data_callback_distribute_manager.distribute_callback(code, _target_codes)
        if code not in self.temp_order_queue_dict:
            self.temp_order_queue_dict[code] = collections.deque()
        if code not in self.temp_transaction_queue_dict:
            self.temp_transaction_queue_dict[code] = collections.deque()
        if code not in self.temp_log_queue_dict:
            self.temp_log_queue_dict[code] = queue.Queue(maxsize=1000)
        if code not in self.upload_l2_data_task_dict:
            t1 = threading.Thread(target=lambda: self.__run_upload_order_task(code), daemon=True)
            t1.start()
            t2 = threading.Thread(target=lambda: self.__run_upload_transaction_task(code), daemon=True)
            t2.start()
            # t3 = threading.Thread(target=lambda: self.__run_log_task(code), daemon=True)
            # t3.start()
            self.upload_l2_data_task_dict[code] = (t1, t2)
        # 释放已经分配的队列
    def release_distributed_upload_queue(self, code):
        """Release everything that was allocated to ``code``.

        Removing ``code`` from the ``temp_*_dict`` buffers is also what the
        per-code worker loops test for when deciding to stop.

        NOTE(review): the first two calls use ``order_queue_distribute_manager``
        and ``transaction_queue_distribute_manager``, which only the
        three-argument ``__init__`` assigns. They look like lines of the
        previous revision - confirm which calls are live.
        """
        self.order_queue_distribute_manager.release_distribute_queue(code)
        self.transaction_queue_distribute_manager.release_distribute_queue(code)
        self.data_callback_distribute_manager.release_distribute_callback(code)
        if code in self.temp_order_queue_dict:
            # Drop pending orders, then forget the buffer
            self.temp_order_queue_dict[code].clear()
            self.temp_order_queue_dict.pop(code)
        if code in self.temp_transaction_queue_dict:
            # Drop pending transactions, then forget the buffer
            self.temp_transaction_queue_dict[code].clear()
            self.temp_transaction_queue_dict.pop(code)
        if code in self.temp_log_queue_dict:
            self.temp_log_queue_dict.pop(code)
        if code in self.upload_l2_data_task_dict:
            # Only the bookkeeping entry is removed; the threads are not joined here
            self.upload_l2_data_task_dict.pop(code)
    def __upload_l2_data(self, code, _queue, datas):
        """Put ``datas`` of ``code`` on ``_queue`` together with the current time.

        NOTE(review): as written the payload is enqueued twice, once as a plain
        tuple and once as marshal-serialised bytes. The two lines look like the
        before/after of one change - confirm which one is live.
        """
        # Plain tuple form
        _queue.put_nowait((code, datas, time.time()))
        # marshal-serialised list form
        _queue.put_nowait(marshal.dumps([code, datas, time.time()]))
    # 处理订单数据并上传
    def __run_upload_order_task(self, code):
        q: collections.deque = self.temp_order_queue_dict.get(code)
        temp_list = []
        queue_info = self.order_queue_distribute_manager.get_distributed_queue(code)
        upload_queue = queue_info[1]
        filter_condition = self.filter_order_condition_dict.get(code)
        while True:
            try:
                while len(q) > 0:
@@ -174,8 +204,22 @@
                if temp_list:
                    # 上传数据
                    self.__upload_l2_data(code, upload_queue, temp_list)
                    temp_list = []
                    # self.__upload_l2_data(code, upload_queue, temp_list)
                    # self.__upload_l2_order_data(code, temp_list)
                    __start_time = time.time()
                    last_data = temp_list[-1]
                    self.data_callback_distribute_manager.get_distributed_callback(code).OnL2Order(code, temp_list,
                                                                                                   time.time())
                    use_time = time.time() - __start_time
                    if use_time > 0.01:
                        # 记录10ms以上的数据
                        huaxin_l2_log.info(logger_local_huaxin_l2_error, f"耗时:{use_time}s  结束数据:{last_data}")
                    # 记录所有的订单号
                    if filter_condition:
                        huaxin_l2_log.info(logger_local_huaxin_l2_orderdetail,
                                           f"{[(x[0], x[1], x[2], x[4], x[8]) for x in temp_list if x[2] >= filter_condition[0][0]]}")
                    temp_list.clear()
                else:
                    if code not in self.temp_order_queue_dict:
                        self.l2_order_codes.discard(code)
@@ -186,13 +230,11 @@
            except Exception as e:
                logging.exception(e)
            finally:
                pass
                temp_list.clear()
    # 处理成交数据并上传
    def __run_upload_transaction_task(self, code):
        q: collections.deque = self.temp_transaction_queue_dict.get(code)
        queue_info = self.transaction_queue_distribute_manager.get_distributed_queue(code)
        upload_queue = queue_info[1]
        temp_list = []
        while True:
            try:
@@ -203,18 +245,91 @@
                        temp_list.append(data)
                if temp_list:
                    # 上传数据
                    self.__upload_l2_data(code, upload_queue, temp_list)
                    # self.__upload_l2_data(code, upload_queue, temp_list)
                    self.data_callback_distribute_manager.get_distributed_callback(code).OnL2Transaction(code,
                                                                                                         temp_list)
                    temp_list = []
                else:
                    if code not in self.temp_transaction_queue_dict:
                        self.l2_transaction_codes.discard(code)
                        break
                    self.l2_transaction_codes.add(code)
                    time.sleep(0.002)
                    time.sleep(0.001)
            except:
                pass
            finally:
                pass
                temp_list.clear()
    def __run_log_task(self, code):
        q: queue.Queue = self.temp_log_queue_dict.get(code)
        while True:
            try:
                temp = q.get(timeout=10)
                huaxin_l2_log.info(logger_local_huaxin_l2_special_volume,
                                   f"{temp}")
            except:
                time.sleep(0.02)
            finally:
                if code not in self.temp_log_queue_dict:
                    break
class L2DataUploadProtocolManager:
    """Give each code its own ZeroMQ REQ socket and upload data over it.

    One socket is opened per IPC endpoint. A code is assigned an endpoint that
    no other code currently holds, and releasing the code frees the endpoint.
    """

    def __init__(self, ipchosts):
        """
        @param ipchosts: IPC endpoint addresses, e.g. "ipc://l2order0.ipc"
        """
        self.ipchosts = ipchosts
        # All clients, format: {host: socket}
        self.socket_client_dict = {}
        # Client assigned to each code, format: {code: (host, socket)}
        self.code_socket_client_dict = {}
        self.rlock = threading.RLock()
        if constant.is_windows():
            # No socket is opened on Windows, so the pool stays empty and
            # every later allocation raises "无可用host"
            return
        context = zmq.Context()
        for host in self.ipchosts:
            client = context.socket(zmq.REQ)
            client.connect(host)
            self.socket_client_dict[host] = client

    def __get_available_ipchost(self):
        """Return ``(host, socket)`` for the first endpoint no code is using.

        @raise Exception: when every endpoint is already assigned
        """
        if len(self.code_socket_client_dict) >= len(self.socket_client_dict):
            raise Exception("无可用host")
        used_hosts = {host for host, _ in self.code_socket_client_dict.values()}
        for host, client in self.socket_client_dict.items():
            if host not in used_hosts:
                return host, client
        raise Exception("无可用host")

    def distribute_upload_host(self, code):
        """Assign a free endpoint to ``code``; do nothing if it already has one.

        @raise Exception: when no endpoint is free
        """
        if code in self.code_socket_client_dict:
            return
        with self.rlock:
            # Re-check under the lock: another thread may have assigned this
            # code between the unlocked check above and acquiring the lock
            if code in self.code_socket_client_dict:
                return
            self.code_socket_client_dict[code] = self.__get_available_ipchost()

    def release_distributed_upload_host(self, code):
        """Free the endpoint held by ``code``; do nothing if it holds none."""
        with self.rlock:
            self.code_socket_client_dict.pop(code, None)

    def upload_data_as_json(self, code, data):
        """Send ``data`` over the socket assigned to ``code`` and wait for the reply.

        Despite the method name the payload is serialised with ``marshal``,
        not JSON. The reply is read and discarded; ``recv_string`` is called
        without a timeout.

        @raise Exception: when ``code`` has no endpoint assigned
        """
        # A single lookup, so a concurrent release cannot slip in between a
        # membership test and the read
        host_info = self.code_socket_client_dict.get(code)
        if host_info is None:
            raise Exception("尚未分配host")
        _, client = host_info
        client.send(marshal.dumps(data))
        client.recv_string()
def add_target_code(code):
@@ -230,7 +345,7 @@
def add_subscript_codes(codes):
    print("add_subscript_codes", codes)
    # print("add_subscript_codes", codes)
    # 加入上传队列
    common_queue.put(('', "l2_subscript_codes", list(codes)))
@@ -254,7 +369,7 @@
            return True
        else:
            # 再次发送
            print("再次发送")
            # print("再次发送")
            return __send_response(sk, msg)
    except ConnectionResetError as e:
        SendResponseSkManager.del_send_response_sk(type)
@@ -285,7 +400,7 @@
def __run_upload_common():
    print("__run_upload_common")
    # print("__run_upload_common")
    logger_system.info(f"l2_client __run_upload_common 线程ID:{tool.get_thread_id()}")
    while True:
        try:
@@ -301,7 +416,7 @@
def __run_log():
    print("__run_log")
    # print("__run_log")
    logger_system.info(f"l2_client __run_log 线程ID:{tool.get_thread_id()}")
    async_log_util.huaxin_l2_log.run_sync()
@@ -328,4 +443,10 @@
def test():
    """Manual smoke test: open 70 IPC endpoints and upload one sample payload."""
    ipc_endpoints = [f"ipc://l2order{index}.ipc" for index in range(70)]
    manager = L2DataUploadProtocolManager(ipc_endpoints)
    code = "000333"
    manager.distribute_upload_host(code)
    manager.upload_data_as_json(code, {"test": "test"})