Administrator
3 小时以前 0e642d0c27e40df326733401be6f8c8115e54911
l2/place_order_single_data_manager.py
@@ -1,8 +1,11 @@
"""
下单信号管理
"""
import time
from l2 import l2_log
from l2.huaxin import l2_huaxin_util
from log_module import async_log_util
from log_module.log import logger_l2_trade_buy, logger_debug
from utils import tool
@@ -13,9 +16,34 @@
    """
    __latest_sell_data = {}
    # 最近的涨停卖委托列表
    __latest_limit_up_sell_list_dict = {}
    __latest_limit_up_sell_order_no_index_dict = {}
    __latest_limit_up_sell_order_no_set_dict = {}
    # 主动卖订单号集合
    __active_sell_order_no_set_dict = {}
    # 主动买单数据,格式:{"code":[买单号,当前成交手数, 当前成交金额, 开始时间, 结束时间]}
    __latest_active_buy_order_data_dict = {}
    # 所有的买单数据,格式:{code:[[买单数据],...]}
    __active_buy_order_datas_dict = {}
    @classmethod
    def clear_passive_sell_data(cls, code):
        """
        清除被动卖相关的数据
        @param code:
        @return:
        """
        if code in cls.__latest_active_buy_order_data_dict:
            cls.__latest_active_buy_order_data_dict.pop(code)
        if code in cls.__active_buy_order_datas_dict:
            cls.__active_buy_order_datas_dict.pop(code)
        if code in cls.__latest_limit_up_sell_list_dict:
            cls.__latest_limit_up_sell_list_dict.pop(code)
        if code in cls.__latest_limit_up_sell_order_no_set_dict:
            cls.__latest_limit_up_sell_order_no_set_dict.pop(code)
    @classmethod
    def add_l2_delegate_limit_up_sell(cls, code, data):
@@ -28,10 +56,17 @@
        if code not in cls.__latest_limit_up_sell_list_dict:
            cls.__latest_limit_up_sell_list_dict[code] = []
        cls.__latest_limit_up_sell_list_dict[code].append(data)
        if code not in cls.__latest_limit_up_sell_order_no_index_dict:
            cls.__latest_limit_up_sell_order_no_index_dict[code] = {}
        cls.__latest_limit_up_sell_order_no_index_dict[code][data['val']['orderNo']] = len(
            cls.__latest_limit_up_sell_list_dict[code]) - 1
        # async_log_util.info(logger_debug, f"添加涨停卖数据:{code}-{data}")
        if code not in cls.__latest_limit_up_sell_order_no_set_dict:
            cls.__latest_limit_up_sell_order_no_set_dict[code] = set()
        cls.__latest_limit_up_sell_order_no_set_dict[code].add(data['val']['orderNo'])
        # 只保留前20的数据
        if len(cls.__latest_limit_up_sell_list_dict[code]) > 20:
            delete_datas = cls.__latest_limit_up_sell_list_dict[code][:-20]
            cls.__latest_limit_up_sell_list_dict[code] = cls.__latest_limit_up_sell_list_dict[code][-20:]
            # 删除之前的map
            for d in delete_datas:
                cls.__latest_limit_up_sell_order_no_set_dict[code].discard(d["val"]["orderNo"])
    @classmethod
    def add_l2_delegate_limit_up_sell_cancel(cls, code, order_no):
@@ -41,54 +76,217 @@
        @param order_no:
        @return:
        """
        if code not in cls.__latest_limit_up_sell_order_no_index_dict:
        if code not in cls.__latest_limit_up_sell_order_no_set_dict:
            return
        index = cls.__latest_limit_up_sell_order_no_index_dict[code].get(order_no)
        if index is None:
        order_no_set = cls.__latest_limit_up_sell_order_no_set_dict[code]
        if order_no not in order_no_set:
            return
        cls.__latest_limit_up_sell_order_no_index_dict[code].pop(order_no)
        del cls.__latest_limit_up_sell_list_dict[code][index]
        cls.__latest_limit_up_sell_order_no_set_dict[code].discard(order_no)
        for i in range(0, len(cls.__latest_limit_up_sell_list_dict[code])):
            if cls.__latest_limit_up_sell_list_dict[code][i]['val']['orderNo'] == order_no:
                cls.__latest_limit_up_sell_list_dict[code].pop(i)
                break
    @classmethod
    def process_passive_limit_up_sell_data(cls, data):
    def get_latest_limit_up_sell_order_count(cls, code):
        sell_list = cls.__latest_limit_up_sell_list_dict.get(code)
        if not sell_list:
            return 0
        return len(sell_list)
    @classmethod
    def process_passive_limit_up_sell_data(cls, code, fdatas):
        """
        添加涨停被动卖成交数据
        @param data: 数据格式:(data['SecurityID'], data['TradePrice'], data['TradeVolume'],
        @param fdata: 数据格式:(data['SecurityID'], data['TradePrice'], data['TradeVolume'],
        #           data['OrderTime'], data['MainSeq'], data['SubSeq'], data['BuyNo'],
        #           data['SellNo'], data['ExecType'])
                    [(数据本身, 是否主动买, 是否涨停, 总成交额, 不含ms时间,含ms时间)]
        @return: 涨停卖是否已经吃完
        """
        try:
            start_time = time.time()
            sell_list = cls.__latest_limit_up_sell_list_dict.get(code)
            if not sell_list:
                return False
            last_sell_info = sell_list[-1]
            for data in fdatas:
                # 过滤被动买
                if not data[1]:
                    # 出现被动买需要将历史大单清空
                    if cls.__active_buy_order_datas_dict.get(code):
                        cls.__active_buy_order_datas_dict[code].clear()
                    continue
                money = data[3]
                # 统计买单数据
                if code not in cls.__latest_active_buy_order_data_dict:
                    # [买单号,当前成交股数, 当前成交金额, 开始时间, 结束时间]
                    cls.__latest_active_buy_order_data_dict[code] = [data[0][6], data[0][2], money, data[0][3],
                                                                     data[0][3]]
                else:
                    if data[0][6] == cls.__latest_active_buy_order_data_dict[code][0]:
                        # 同一买单号
                        cls.__latest_active_buy_order_data_dict[code][1] += data[0][2]
                        cls.__latest_active_buy_order_data_dict[code][2] += money
                        cls.__latest_active_buy_order_data_dict[code][4] = data[0][3]
                    else:
                        # 不同买单号
                        if cls.__latest_active_buy_order_data_dict[code][2] >= 2990000:
                            # 加入大单列表
                            if code not in cls.__active_buy_order_datas_dict:
                                cls.__active_buy_order_datas_dict[code] = []
                            cls.__active_buy_order_datas_dict[code].append(
                                cls.__latest_active_buy_order_data_dict[code])
                        cls.__latest_active_buy_order_data_dict[code] = [data[0][6], data[0][2], money, data[0][3],
                                                                         data[0][3]]
                if not data[2]:
                    # 排除主动卖/非涨停卖
                    continue
                sell_no = data[0][7]
                if sell_no != last_sell_info['val']['orderNo']:
                    continue
                # 需要判断当前单是否已经成交完成
                if code not in cls.__latest_sell_data:
                    cls.__latest_sell_data[code] = [sell_no, data[0][2]]
                else:
                    if cls.__latest_sell_data[code][0] == sell_no:
                        cls.__latest_sell_data[code][1] += data[0][2]
                    else:
                        cls.__latest_sell_data[code] = [sell_no, data[0][2]]
                sell_info_num = last_sell_info['val']['num']
                deal_num = cls.__latest_sell_data[code][1] // 100
                if sell_info_num == deal_num:
                    use_time = round((time.time() - start_time) * 1000, 3)
                    l2_log.info(code, logger_l2_trade_buy,
                                f"找到最近的被动涨停卖单数据:{last_sell_info['val']['orderNo']}, 成交数据:{data} 计算耗时:{use_time}ms, 可以触发下单")
                    # 将历史大单列表与最近的大单加入列表
                    big_buy_order_datas = []
                    if code in cls.__active_buy_order_datas_dict:
                        big_buy_order_datas.extend(cls.__active_buy_order_datas_dict[code])
                    if cls.__latest_active_buy_order_data_dict[code][2] >= 2990000:
                        big_buy_order_datas.append(cls.__latest_active_buy_order_data_dict[code])
                    # 成交完成
                    L2TradeSingleDataManager.set_latest_sell_data(code, data, big_buy_order_datas)
                    l2_log.info(code, logger_l2_trade_buy, "被动卖数据处理完毕")
                    if tool.is_sz_code(code):
                        # 涨停主动卖已经被吃完,可以清除
                        return True
                    break
        except Exception as e:
            logger_debug.exception(e)
        return False
    @classmethod
    def filter_last_limit_up_sell_data(cls, code, fdatas):
        """
        筛选出最后一条涨停卖成交数据
        @param code:
        @param fdatas:
        @return: (成交数据, 卖单数据)
        """
        def compute_last_sell():
            # 最大买单号
            max_buy_order_no = fdatas[-1][0][6]
            for i in range(len(sell_list) - 1, -1, -1):
                if sell_list[i]['val']['orderNo'] > max_buy_order_no:
                    continue
                return sell_list[i]
            return None
        if not fdatas[-1][2]:
            # 最后一条数据不是涨停成交数据
            return None
        sell_list = cls.__latest_limit_up_sell_list_dict.get(code)
        if not sell_list:
            return None
        last_sell_info = compute_last_sell()
        if not last_sell_info:
            return None
        for data in fdatas:
            if not data[2]:
                # 排除主动卖/非涨停卖
                continue
            sell_no = data[0][7]
            if sell_no != last_sell_info['val']['orderNo']:
                continue
            # 需要判断当前单是否已经成交完成
            if code not in cls.__latest_sell_data:
                cls.__latest_sell_data[code] = [sell_no, data[0][2]]
            else:
                if cls.__latest_sell_data[code][0] == sell_no:
                    cls.__latest_sell_data[code][1] += data[0][2]
                else:
                    cls.__latest_sell_data[code] = [sell_no, data[0][2]]
            sell_info_num = last_sell_info['val']['num']
            deal_num = cls.__latest_sell_data[code][1] // 100
            if sell_info_num == deal_num:
                # 最后一笔涨停卖已经成交完成
                l2_log.info(code, logger_l2_trade_buy,
                            f"找到最近的被动涨停卖单数据:{last_sell_info}, 成交数据:{data}  可以触发下单")
                return data, last_sell_info
        return None
    @classmethod
    def add_active_limit_up_sell_data(cls, data):
        """
        添加主动卖数据
        @param data:
        @return:
        """
        try:
            # 需要判断当前单是否已经成交完成
            code = data[0]
            sell_no = data[7]
            if code not in cls.__latest_sell_data:
                cls.__latest_sell_data[code] = [sell_no, data[2]]
            else:
                if cls.__latest_sell_data[code][0] == sell_no:
                    cls.__latest_sell_data[code][1] += data[2]
                else:
                    cls.__latest_sell_data[code] = [sell_no, data[2]]
            # 判断是否是最后一笔卖单
            l2_log.info(code, logger_l2_trade_buy, f"被动卖数据:{data}")
            if code not in cls.__active_sell_order_no_set_dict:
                cls.__active_sell_order_no_set_dict[code] = set()
            cls.__active_sell_order_no_set_dict[code].add(sell_no)
            # 判断这个订单号是否成交完
            sell_list = cls.__latest_limit_up_sell_list_dict.get(code)
            if not sell_list:
                return
            sell_info = sell_list[-1]
            l2_log.info(code, logger_l2_trade_buy, f"最近涨停卖:{sell_info['val']['orderNo']}")
            if sell_no == sell_info['val']['orderNo'] and sell_info["val"]["num"] == cls.__latest_sell_data[code][
                1] // 100:
                # 成交完成
                L2TradeSingleDataManager.set_latest_sell_data(code, data)
                logger_l2_trade_buy.info(f"{code}#找到最近的被动涨停卖单数据:{data['val']['orderNo']}, 可以触发下单")
                # l2_log.info(code, logger_l2_trade_buy, f"找到最近的被动涨停卖单数据:{data['val']['orderNo']}, 可以触发下单")
        except Exception as e:
            logger_debug.exception(e)
class L2TradeSingleCallback:
    """
    交易信号回调
    """
    def OnTradeSingle(self, code, big_buy_order_count, _type, data):
        """
         交易数据信号回调
        @param code:
        @param big_buy_order_count: 大买单数量
        @param _type: 类型:0-TYPE_PASSIVE  1-TYPE_ACTIVE
        @param data: (逐笔成交数据,生效时间)
        @return:
        """
    def OnLimitUpActiveBuy(self, code, transaction_data, no_left_limit_up_sell):
        """
         涨停主动买触发
        @param code:
        @param transaction_data: 成交数据
        @param no_left_limit_up_sell: 是否还剩余涨停卖
        @return:
        """
    def OnLastLimitUpSellDeal(self, code, data):
        """
        最后一笔涨停卖成交
        @param code: 代码
        @param data: 成交的数据
        @return:
        """
class L2TradeSingleDataManager:
    __callback = None
    TYPE_PASSIVE = 0
    TYPE_ACTIVE = 1
@@ -102,18 +300,33 @@
    __latest_sell_active_deal_data_dict = {}  # 格式为:{code:(逐笔成交数据,生效时间(带ms))}
    @classmethod
    def set_latest_sell_data(cls, code, data):
    def set_callback(cls, callback: L2TradeSingleCallback):
        cls.__callback = callback
    @classmethod
    def set_latest_sell_data(cls, code, fdata, big_active_buy_order_datas):
        """
        设置最近成交的涨停卖被动成交数据
        @param code: 代码
        @param data: L2逐笔成交数据  数据格式:(data['SecurityID'], data['TradePrice'], data['TradeVolume'],
        #           data['OrderTime'], data['MainSeq'], data['SubSeq'], data['BuyNo'],
        #           data['SellNo'], data['ExecType'])
        @param big_active_buy_order_datas: 大主动买单数据:[[买单号,当前成交股数, 当前成交金额, 开始时间, 结束时间],....]
        @return:
        """
        deal_time = l2_huaxin_util.convert_time(data[3], True)
        deal_time = fdata[5]
        # 生效时间在1s以内
        cls.__latest_sell_data_dict[code] = (data, tool.trade_time_add_millionsecond(deal_time, 1000))
        cls.__latest_sell_data_dict[code] = (fdata[0], tool.trade_time_add_millionsecond(deal_time, 1000))
        if cls.__callback:
            big_buy_order_count = 0
            if big_active_buy_order_datas:
                for b in big_active_buy_order_datas:
                    if b[0] > fdata[0][7]:
                        # 买单在卖单之后
                        big_buy_order_count += 1
            cls.__callback.OnTradeSingle(code, big_buy_order_count, cls.TYPE_PASSIVE, cls.__latest_sell_data_dict[code])
    @classmethod
    def set_sell_passive_to_active_datas(cls, code, passive_data, active_data):
@@ -124,10 +337,19 @@
        @param active_data: 主动卖成交逐笔
        @return:
        """
        deal_time = l2_huaxin_util.convert_time(passive_data[3], True)
        # 生效时间在1s以内
        cls.__latest_sell_active_deal_data_dict[code] = (
            active_data, tool.trade_time_add_millionsecond(deal_time, 1000))
        # 暂时不需要生效
        l2_log.info(code, logger_l2_trade_buy, f"被动卖变主动卖:{passive_data} => {active_data}")
        # deal_time = l2_huaxin_util.convert_time(passive_data[3], True)
        # # 生效时间在1s以内
        # cls.__latest_sell_active_deal_data_dict[code] = (
        #     active_data, tool.trade_time_add_millionsecond(deal_time, 1000))
        # if cls.__callback:
        #     cls.__callback.OnTradeSingle(code, 0, cls.TYPE_ACTIVE, cls.__latest_sell_active_deal_data_dict[code])
    @classmethod
    def set_limit_up_active_buy(cls, code, transaction_datas, no_left_limit_up_sell):
        if transaction_datas:
            cls.__callback.OnLimitUpActiveBuy(code, transaction_datas, no_left_limit_up_sell)
    @classmethod
    def get_valid_trade_single(cls, code, latest_time_with_ms):
@@ -158,7 +380,7 @@
        single = cls.get_valid_trade_single(code, buy_data_time_with_ms)
        if not single:
            return False, "没有找到可用信号"
        if single[0][6] < buy_data['val']['orderNo']:
        if single[0][0][6] < buy_data['val']['orderNo']:
            # 信号位置的成交买单号要小于买入数据的买单号才行
            return True, f"找到信号:{single}"
        else:
@@ -171,3 +393,5 @@
        if code in cls.__latest_sell_active_deal_data_dict:
            cls.__latest_sell_active_deal_data_dict.pop(code)
        L2TradeSingleDataProcessor.clear_passive_sell_data(code)