Administrator
2025-03-13 984e59be6787f06b927d5ec612f443f54e145044
真实下单位修改/L2成交处理速度提升
12个文件已修改
424 ■■■■■ 已修改文件
huaxin_client/l1_api_client.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l2_client.py 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l2_client_v2.py 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/huaxin/huaxin_delegate_postion_manager.py 12 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_limitup_sell_data_manager.py 20 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_transaction_data_manager.py 175 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_transaction_data_processor.py 142 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/place_order_single_data_manager.py 47 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/subscript/l2_subscript_manager.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_test.py 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/hx_qc_value_util.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l1_api_client.py
@@ -306,7 +306,7 @@
def main():
    request_queue, response_queue = multiprocessing.Queue(), multiprocessing.Queue()
    request_queue, response_queue = multiprocessing.Queue(maxsize=1024), multiprocessing.Queue(maxsize=1024)
    run(request_queue, response_queue)
huaxin_client/l2_client.py
@@ -724,13 +724,13 @@
             'OrderTime': '13000015',
             'MainSeq': 2, 'SubSeq': 6739148, 'OrderNO': 5512467, 'OrderStatus': 'D'}, 0)
    queue_r = multiprocessing.Queue()
    queue_r = multiprocessing.Queue(maxsize=1024)
    order_queues = []
    transaction_queues = []
    market_queue = multiprocessing.Queue()
    market_queue = multiprocessing.Queue(maxsize=1024)
    for i in range(20):
        order_queues.append(multiprocessing.Queue())
        transaction_queues.append(multiprocessing.Queue())
        order_queues.append(multiprocessing.Queue(maxsize=1024))
        transaction_queues.append(multiprocessing.Queue(maxsize=1024))
    threading.Thread(target=test_add_codes).start()
    run(queue_r, order_queues, transaction_queues, market_queue)
huaxin_client/l2_client_v2.py
@@ -711,13 +711,13 @@
             'OrderTime': '13000015',
             'MainSeq': 2, 'SubSeq': 6739148, 'OrderNO': 5512467, 'OrderStatus': 'D'}, 0)
    queue_r = multiprocessing.Queue()
    queue_r = multiprocessing.Queue(maxsize=1024)
    order_queues = []
    transaction_queues = []
    market_queue = multiprocessing.Queue()
    market_queue = multiprocessing.Queue(maxsize=1024)
    for i in range(20):
        order_queues.append(multiprocessing.Queue())
        transaction_queues.append(multiprocessing.Queue())
        order_queues.append(multiprocessing.Queue(maxsize=1024))
        transaction_queues.append(multiprocessing.Queue(maxsize=1024))
    threading.Thread(target=test_add_codes).start()
    run(queue_r, order_queues, transaction_queues, market_queue)
l2/huaxin/huaxin_delegate_postion_manager.py
@@ -223,16 +223,28 @@
        if not match_list:
            # 没有找到真实下单位
            return None
        THRESHOLD_MIN_MS = 0 if tool.is_sz_code(code) else 90
        # 最合适的是时间相差为0,索引相差为1
        for m in match_list:
            if m[0] == 0 and m[1] == 1:
                # 与执行位L2的数据必须相差指定秒数
                if tool.trade_time_sub_with_ms(L2DataUtil.get_time_with_ms(m[2]["val"]),
                                               exec_time_with_ms) < THRESHOLD_MIN_MS:
                    continue
                return m[2]
        # 获取时间差最小的数据
        real_place_order_info = match_list[0]
        for x in match_list:
            # 与执行位L2的数据必须相差指定秒数
            if tool.trade_time_sub_with_ms(L2DataUtil.get_time_with_ms(x[2]["val"]),
                                           exec_time_with_ms) < THRESHOLD_MIN_MS:
                continue
            if x[0] < real_place_order_info[0]:
                real_place_order_info = x
        if tool.trade_time_sub_with_ms(L2DataUtil.get_time_with_ms(real_place_order_info[2]["val"]),
                                       exec_time_with_ms) < THRESHOLD_MIN_MS:
            return None
        return real_place_order_info[2]
    @classmethod
l2/l2_limitup_sell_data_manager.py
@@ -71,32 +71,32 @@
        return cls.__delegating_sell_num_dict.get(code)
    @classmethod
    def set_deal_datas(cls, code, datas):
    def set_deal_datas(cls, code, fdatas):
        """
        设置成交的卖单
        @param code:
        @param datas:  q.append((data['SecurityID'], data['TradePrice'], data['TradeVolume'],data['OrderTime'], data['MainSeq'], data['SubSeq'], data['BuyNo'],data['SellNo'], data['ExecType']))
        @param fdatas:  数据本身格式: (data['SecurityID'], data['TradePrice'], data['TradeVolume'],data['OrderTime'], data['MainSeq'], data['SubSeq'], data['BuyNo'],data['SellNo'], data['ExecType'])
                        [(数据本身, 是否主动买, 是否涨停, 总成交额, 不含ms时间,含ms时间)]
        @return: 是否触发计算
        """
        try:
            limit_up_price = gpcode_manager.get_limit_up_price_as_num(code)
            order_no_set = cls.__order_no_set_dict.get(code)
            if order_no_set is None:
                order_no_set = set()
            limit_up_active_buy_datas = []
            for d in datas:
            for d in fdatas:
                # 是否有涨停主动买成交
                if d[6] < d[7]:
                if not d[1]:
                    continue
                if abs(d[1] - limit_up_price) > 0.001:
                if not d[2]:
                    continue
                limit_up_active_buy_datas.append(d)
            total_deal_volume = 0
            if code in cls.__delegating_sell_num_dict:
                for d in datas:
                for d in fdatas:
                    # 减去
                    if d[7] in order_no_set:
                        total_deal_volume += d[2]
                    if d[0][7] in order_no_set:
                        total_deal_volume += d[0][2]
                cls.__delegating_sell_num_dict[code] -= total_deal_volume
            if len(limit_up_active_buy_datas):
@@ -104,7 +104,7 @@
                            f"涨停主动买成交:{limit_up_active_buy_datas}")
                # 打印日志
                l2_log.info(code, hx_logger_l2_sell_deal,
                            f"有涨停主动卖:{code}-{datas[-1][3]}-{cls.__delegating_sell_num_dict.get(code)}, 成交量-{total_deal_volume}")
                            f"有涨停主动卖:{code}-{fdatas[-1][0][3]}-{cls.__delegating_sell_num_dict.get(code)}, 成交量-{total_deal_volume}")
        except:
            pass
l2/l2_transaction_data_manager.py
@@ -199,37 +199,37 @@
        return cls.__dealing_active_buy_order_info_dict.get(code)
    @classmethod
    def statistic_big_buy_data(cls, code, datas, limit_up_price):
    def statistic_big_buy_data(cls, code, fdatas, limit_up_price):
        """
        统计大单买
        @param code:
        @param datas:
        @param fdatas: [(数据本身, 是否主动买, 是否涨停, 总成交额, 不含ms时间,含ms时间)]
        @return: 返回数据里面(成交的大单,50w以上的单)
        """
        big_buy_datas = []
        normal_buy_datas = []
        # 大单阈值
        threshold_big_money = l2_data_util.get_big_money_val(limit_up_price, tool.is_ge_code(code))
        for data in datas:
        for data in fdatas:
            # q.append((data['SecurityID'], data['TradePrice'], data['TradeVolume'],
            #                   data['OrderTime'], data['MainSeq'], data['SubSeq'], data['BuyNo'],
            #                   data['SellNo'], data['ExecType']))
            is_limit_up = abs(data[1] - limit_up_price) < 0.0001
            money = data[2] * data[1]
            is_limit_up = data[2]
            money = data[3]
            if code not in cls.__dealing_order_info_dict:
                # 数据格式[订单号,总股数,成交金额,成交开始时间,成交结束时间, 最近的成交价格, 最近的卖单号, 涨停价成交金额]
                cls.__dealing_order_info_dict[code] = [data[6], data[2], money, data[3], data[3], data[1],
                                                       data[7], 0]
                cls.__dealing_order_info_dict[code] = [data[0][6], data[0][2], money, data[0][3], data[0][3], data[0][1],
                                                       data[0][7], 0]
                if is_limit_up:
                    cls.__dealing_order_info_dict[code][7] += money
            else:
                if cls.__dealing_order_info_dict[code][0] == data[6]:
                if cls.__dealing_order_info_dict[code][0] == data[0][6]:
                    # 成交同一个订单号
                    cls.__dealing_order_info_dict[code][1] += data[2]
                    cls.__dealing_order_info_dict[code][1] += data[0][2]
                    cls.__dealing_order_info_dict[code][2] += money
                    cls.__dealing_order_info_dict[code][4] = data[3]
                    cls.__dealing_order_info_dict[code][5] = data[1]
                    cls.__dealing_order_info_dict[code][6] = data[7]
                    cls.__dealing_order_info_dict[code][4] = data[0][3]
                    cls.__dealing_order_info_dict[code][5] = data[0][1]
                    cls.__dealing_order_info_dict[code][6] = data[0][7]
                    if is_limit_up:
                        cls.__dealing_order_info_dict[code][7] += money
                else:
@@ -247,28 +247,28 @@
                        normal_buy_datas.append(deal_info)
                    # 初始化本条数据
                    cls.__dealing_order_info_dict[code] = [data[6], data[2], money, data[3], data[3],
                                                           data[1], data[7], 0]
                    cls.__dealing_order_info_dict[code] = [data[0][6], data[0][2], money, data[0][3], data[0][3],
                                                           data[0][1], data[0][7], 0]
                    if is_limit_up:
                        cls.__dealing_order_info_dict[code][7] += money
            # 统计主动买(买单号大于卖单号)
            try:
                if data[6] > data[7]:
                if data[1]:
                    if code not in cls.__dealing_active_buy_order_info_dict:
                        # 数据格式[订单号,总股数,成交金额,成交开始时间,成交结束时间]
                        cls.__dealing_active_buy_order_info_dict[code] = [data[6], data[2], data[2] * data[1], data[3],
                                                                          data[3]]
                        cls.__dealing_active_buy_order_info_dict[code] = [data[0][6], data[0][2], money, data[0][3],
                                                                          data[0][3]]
                    else:
                        if cls.__dealing_active_buy_order_info_dict[code][0] == data[6]:
                        if cls.__dealing_active_buy_order_info_dict[code][0] == data[0][6]:
                            # 成交同一个订单号
                            cls.__dealing_active_buy_order_info_dict[code][1] += data[2]
                            cls.__dealing_active_buy_order_info_dict[code][2] += data[2] * data[1]
                            cls.__dealing_active_buy_order_info_dict[code][4] = data[3]
                            cls.__dealing_active_buy_order_info_dict[code][1] += data[0][2]
                            cls.__dealing_active_buy_order_info_dict[code][2] += money
                            cls.__dealing_active_buy_order_info_dict[code][4] = data[0][3]
                        else:
                            # 初始化本条数据
                            cls.__dealing_active_buy_order_info_dict[code] = [data[6], data[2], data[2] * data[1],
                                                                              data[3], data[3]]
                            cls.__dealing_active_buy_order_info_dict[code] = [data[0][6], data[0][2], money,
                                                                              data[0][3], data[0][3]]
            except:
                pass
@@ -278,7 +278,7 @@
# 卖单统计数据
class HuaXinSellOrderStatisticManager:
    # 最近的大卖单成交,格式:{code:[卖单信息,...]}
    __latest_sell_order_info_list_dict = {}
    # __latest_sell_order_info_list_dict = {}
    # 大单卖单号的集合,格式:{code:{卖单号}}
    __big_sell_order_ids_dict = {}
@@ -303,25 +303,26 @@
    __dealing_order_info_dict = {}
    @classmethod
    def statistic_big_sell_data(cls, code, datas):
    def statistic_big_sell_data(cls, code, fdatas):
        """
        统计大卖单
        @param code:
        @param datas:
        @param fdatas: [(数据本身, 是否主动买, 是否涨停, 总成交额, 不含ms时间,含ms时间)]
        @return: 返回数据里面成交的大单
        """
        big_sell_datas = []
        for data in datas:
        for data in fdatas:
            money = data[3]
            # q.append((data['SecurityID'], data['TradePrice'], data['TradeVolume'],
            #                   data['OrderTime'], data['MainSeq'], data['SubSeq'], data['BuyNo'],
            #                   data['SellNo'], data['ExecType']))
            if code not in cls.__dealing_order_info_dict:
                # 数据格式[订单号,总股数,成交金额]
                cls.__dealing_order_info_dict[code] = [data[7], data[2], data[2] * data[1]]
            if cls.__dealing_order_info_dict[code][0] == data[7]:
                cls.__dealing_order_info_dict[code] = [data[0][7], data[0][2], money]
            if cls.__dealing_order_info_dict[code][0] == data[0][7]:
                # 成交同一个订单号
                cls.__dealing_order_info_dict[code][1] += data[2]
                cls.__dealing_order_info_dict[code][2] += data[2] * data[1]
                cls.__dealing_order_info_dict[code][1] += data[0][2]
                cls.__dealing_order_info_dict[code][2] += money
            else:
                # 保存上一条数据
                l2_log.info(code, hx_logger_l2_transaction_desc, f"{code}#{cls.__dealing_order_info_dict[code]}")
@@ -331,7 +332,7 @@
                if deal_info[2] >= 2990000:
                    big_sell_datas.append(deal_info)
                # 初始化本条数据
                cls.__dealing_order_info_dict[code] = [data[7], data[2], data[2] * data[1]]
                cls.__dealing_order_info_dict[code] = [data[0][7], data[0][2], money]
        return big_sell_datas
    # 统计所有的成交量
@@ -340,31 +341,32 @@
    __deal_active_buy_volume_list_dict = {}
    @classmethod
    def statistic_total_deal_volume(cls, code, datas, limit_up_price):
    def statistic_total_deal_volume(cls, code, fdatas, limit_up_price):
        """
        统计总共的成交量
        @param code:
        @param fdatas: [(数据本身, 是否主动买, 是否涨停, 总成交额, 不含ms时间,含ms时间)]
        @param limit_up_price:
        @return:
        """
        # 只统计被动买
        if code not in cls.__deal_volume_list_dict:
            cls.__deal_volume_list_dict[code] = []
        time_dict = {}
        for d in datas:
        for d in fdatas:
            # 只统计被动买
            if d[7] < d[6]:
            if d[1]:
                continue
            time_str = ''
            if d[3] in time_dict:
                time_str = time_dict[d[3]]
            else:
                time_dict[d[3]] = l2_huaxin_util.convert_time(d[3])
                time_str = time_dict[d[3]]
            time_str = d[4]
            if cls.__deal_volume_list_dict[code]:
                if cls.__deal_volume_list_dict[code][-1][0] == time_str:
                    # 如果是同一秒
                    cls.__deal_volume_list_dict[code][-1][1] += d[2]
                    cls.__deal_volume_list_dict[code][-1][1] += d[0][2]
                else:
                    # 不是同一秒
                    cls.__deal_volume_list_dict[code].append([time_str, d[2]])
                    cls.__deal_volume_list_dict[code].append([time_str, d[0][2]])
            else:
                cls.__deal_volume_list_dict[code].append([time_str, d[2]])
                cls.__deal_volume_list_dict[code].append([time_str, d[0][2]])
        # 删除超过5条数据
        if len(cls.__deal_volume_list_dict[code]) > 5:
            cls.__deal_volume_list_dict[code] = cls.__deal_volume_list_dict[code][-5:]
@@ -373,34 +375,28 @@
            # 统计主动买的成交量
            if code not in cls.__deal_active_buy_volume_list_dict:
                cls.__deal_active_buy_volume_list_dict[code] = []
            for d in datas:
            for d in fdatas:
                # 只统计主动买
                if d[7] > d[6]:
                if not d[1]:
                    continue
                # 只统计涨停买
                if d[1] != limit_up_price:
                if not d[2]:
                    continue
                if d[3] in time_dict:
                    time_str = time_dict[d[3]]
                else:
                    time_dict[d[3]] = l2_huaxin_util.convert_time(d[3])
                    time_str = time_dict[d[3]]
                time_str = d[4]
                if cls.__deal_active_buy_volume_list_dict[code]:
                    if cls.__deal_active_buy_volume_list_dict[code][-1][0] == time_str:
                        # 如果是同一秒
                        cls.__deal_active_buy_volume_list_dict[code][-1][1] += d[2]
                        cls.__deal_active_buy_volume_list_dict[code][-1][1] += d[0][2]
                    else:
                        # 不是同一秒
                        cls.__deal_active_buy_volume_list_dict[code].append([time_str, d[2]])
                        cls.__deal_active_buy_volume_list_dict[code].append([time_str, d[0][2]])
                else:
                    cls.__deal_active_buy_volume_list_dict[code].append([time_str, d[2]])
                    cls.__deal_active_buy_volume_list_dict[code].append([time_str, d[0][2]])
            # 删除超过10条数据
            if len(cls.__deal_active_buy_volume_list_dict[code]) > 10:
                cls.__deal_active_buy_volume_list_dict[code] = cls.__deal_active_buy_volume_list_dict[code][-10:]
        except:
            pass
        time_dict.clear()
    @classmethod
    def get_latest_3s_continue_deal_volumes(cls, code):
@@ -461,18 +457,21 @@
    # 返回最近1s的大单卖:(总卖金额,[(卖单号,总手数,价格,('开始时间',买单号),('结束时间',买单号)),...])
    @classmethod
    def add_transaction_datas(cls, code, datas, limit_up_price=None):
        # 是否为主动卖
        def is_active_sell(sell_no, buy_no):
            return sell_no > buy_no
    def add_transaction_datas(cls, code, fdatas, limit_up_price=None):
        """
        @param code:
        @param fdatas: [(数据本身, 是否主动买, 是否涨停, 总成交额, 不含ms时间,含ms时间)]
        @param limit_up_price:
        @return:
        """
        f_start_time = time.time()
        use_time_list = []
        # q.append((data['SecurityID'], data['TradePrice'], data['TradeVolume'],
        #           data['OrderTime'], data['MainSeq'], data['SubSeq'], data['BuyNo'],
        #           data['SellNo'], data['ExecType']))
        if code not in cls.__latest_sell_order_info_list_dict:
            cls.__latest_sell_order_info_list_dict[code] = []
        # if code not in cls.__latest_sell_order_info_list_dict:
        #     cls.__latest_sell_order_info_list_dict[code] = []
        if code not in cls.__big_sell_order_ids_dict:
            cls.__big_sell_order_ids_dict[code] = set()
        if code not in cls.__big_sell_order_info_dict:
@@ -485,41 +484,39 @@
            cls.__latest_all_sell_orders_dict[code] = []
        # 保存最近的成交价格:(价格,成交时间)
        cls.__latest_trade_price_dict[code] = (datas[-1][1], datas[-1][3])
        cls.__latest_trade_price_dict[code] = (fdatas[-1][0][1], fdatas[-1][0][3])
        __start_time = time.time()
        # 是否还有涨停卖剩下
        no_left_limit_up_sell = L2TradeSingleDataProcessor.process_passive_limit_up_sell_data(code, datas,
        no_left_limit_up_sell = L2TradeSingleDataProcessor.process_passive_limit_up_sell_data(code, fdatas,
                                                                                              limit_up_price)
        use_time = time.time() - __start_time
        __start_time = time.time()
        use_time_list.append(("处理涨停卖", use_time))
        for d in datas:
        for d in fdatas:
            # 获取当前是否为主动买
            try:
                _is_active_sell = is_active_sell(d[7], d[6])
                if not _is_active_sell:
                if d[1]:
                    # 主动买
                    continue
                if d[1] == limit_up_price:
                if d[2]:
                    # 涨停主动卖
                    L2TradeSingleDataProcessor.add_active_limit_up_sell_data(d)
                    L2TradeSingleDataProcessor.add_active_limit_up_sell_data(d[0])
                # 判断是否是涨停被动变主动
                last_trade_data = cls.__last_trade_data_dict.get(code)
                if last_trade_data and not is_active_sell(last_trade_data[7], last_trade_data[6]) and last_trade_data[
                    1] == limit_up_price:
                    if d[1] == limit_up_price:
                if last_trade_data and last_trade_data[1] and last_trade_data[2]:
                    if d[2]:
                        # 涨停被动变主动
                        L2TradeSingleDataManager.set_sell_passive_to_active_datas(code, last_trade_data, d)
                cls.__latest_sell_order_info_list_dict[code].append(d)
                        L2TradeSingleDataManager.set_sell_passive_to_active_datas(code, last_trade_data[0], d[0])
                # cls.__latest_sell_order_info_list_dict[code].append(d)
                if code not in cls.__latest_sell_order_dict:
                    cls.__latest_sell_order_dict[code] = [d[7], d[2], d[1], (d[3], d[6]), (d[3], d[6])]
                    cls.__latest_sell_order_dict[code] = [d[0][7], d[0][2], d[0][1], (d[0][3], d[0][6]), (d[0][3], d[0][6])]
                else:
                    if cls.__latest_sell_order_dict[code][0] == d[7]:
                        cls.__latest_sell_order_dict[code][1] += d[2]
                        cls.__latest_sell_order_dict[code][2] = d[1]
                        cls.__latest_sell_order_dict[code][4] = (d[3], d[6])
                    if cls.__latest_sell_order_dict[code][0] == d[0][7]:
                        cls.__latest_sell_order_dict[code][1] += d[0][2]
                        cls.__latest_sell_order_dict[code][2] = d[0][1]
                        cls.__latest_sell_order_dict[code][4] = (d[0][3], d[0][6])
                    else:
                        info = cls.__latest_sell_order_dict[code]
@@ -541,7 +538,7 @@
                            # 将涨停主动卖记入日志
                            l2_log.info(code, hx_logger_l2_active_sell, f"{info}")
                        cls.__latest_sell_order_dict[code] = [d[7], d[2], d[1], (d[3], d[6]), (d[3], d[6])]
                        cls.__latest_sell_order_dict[code] = [d[0][7], d[0][2], d[0][1], (d[0][3], d[0][6]), (d[0][3], d[0][6])]
            finally:
                cls.__last_trade_data_dict[code] = d
@@ -549,7 +546,7 @@
        __start_time = time.time()
        use_time_list.append(("大单统计", use_time))
        latest_time = l2_huaxin_util.convert_time(datas[-1][3], with_ms=True)
        latest_time = fdatas[-1][5]
        min_time = tool.trade_time_add_millionsecond(latest_time, -1000)
        min_time_int = int(min_time.replace(":", "").replace(".", ""))
        # 计算最近1s的大单成交
@@ -605,14 +602,14 @@
        # ----------------统计涨停主动买-----------------
        try:
            limit_up_active_buy_datas = []
            for d in datas:
                if is_active_sell(d[7], d[6]):
            for d in fdatas:
                if not d[1]:
                    # 被动买
                    continue
                # 是否是涨停
                if d[1] == limit_up_price:
                if d[2]:
                    # 有涨停主动买
                    limit_up_active_buy_datas.append(d)
                    limit_up_active_buy_datas.append(d[0])
            L2TradeSingleDataManager.set_limit_up_active_buy(code, limit_up_active_buy_datas, no_left_limit_up_sell)
            use_time = time.time() - __start_time
@@ -624,7 +621,7 @@
        use_time = time.time() - f_start_time
        if use_time > 0.01:
            l2_log.info(code, hx_logger_l2_upload,
                        f"{code}处理成交详细用时:{use_time} 数据数量:{len(datas)}  详情:{use_time_list}")
                        f"{code}处理成交详细用时:{use_time} 数据数量:{len(fdatas)}  详情:{use_time_list}")
        return total_sell_info
    # 获取最近成交数据
l2/l2_transaction_data_processor.py
@@ -1,6 +1,8 @@
import logging
import time
import dask
import constant
from cancel_strategy.s_l_h_cancel_strategy import HourCancelBigNumComputer
from cancel_strategy.s_l_h_cancel_strategy import LCancelBigNumComputer, LCancelRateManager
@@ -32,11 +34,11 @@
    # 计算成交进度
    @classmethod
    def __compute_latest_trade_progress(cls, code, buyno_map, datas):
    def __compute_latest_trade_progress(cls, code, buyno_map, fdatas):
        buy_progress_index = None
        for i in range(len(datas) - 1, -1, -1):
            d = datas[i]
            buy_no = f"{d[6]}"
        for i in range(len(fdatas) - 1, -1, -1):
            d = fdatas[i]
            buy_no = f"{d[0][6]}"
            if buyno_map and buy_no in buyno_map:
                # 成交进度位必须是涨停买
                if L2DataUtil.is_limit_up_price_buy(buyno_map[buy_no]["val"]):
@@ -45,61 +47,94 @@
        return buy_progress_index
    @classmethod
    def statistic_big_order_infos(cls, code, datas, order_begin_pos: OrderBeginPosInfo):
    def statistic_big_order_infos(cls, code, fdatas, order_begin_pos: OrderBeginPosInfo):
        """
        统计大单成交
        @param code:
        @param datas:
        @param fdatas: 格式:[(数据本身, 是否主动买, 是否涨停, 总成交额, 不含ms时间,含ms时间)]
        @return:
        """
        limit_up_price = gpcode_manager.get_limit_up_price_as_num(code)
        buy_datas, bigger_buy_datas = HuaXinBuyOrderManager.statistic_big_buy_data(code, datas, limit_up_price)
        if buy_datas:
            BigOrderDealManager().add_buy_datas(code, buy_datas)
            active_big_buy_orders = []
        @dask.delayed
        def statistic_big_buy_data():
            buy_datas, bigger_buy_datas = HuaXinBuyOrderManager.statistic_big_buy_data(code, fdatas, limit_up_price)
            if buy_datas:
                for x in buy_datas:
                    if x[0] > x[6]:
                        # (买单号, 成交金额, 最后成交时间)
                        active_big_buy_orders.append((x[0], x[2], x[4]))
            EveryLimitupBigDealOrderManager.add_big_buy_order_deal(code, active_big_buy_orders)
        try:
            is_placed_order = l2_data_manager.TradePointManager.is_placed_order(order_begin_pos)
            if is_placed_order:
                if order_begin_pos and order_begin_pos.mode == OrderBeginPosInfo.MODE_RADICAL:
                    RadicalBuyDataManager.big_order_deal(code)
                BigOrderDealManager().add_buy_datas(code, buy_datas)
                active_big_buy_orders = []
                if buy_datas:
                    for x in buy_datas:
                        if x[0] > x[6]:
                            # (买单号, 成交金额, 最后成交时间)
                            active_big_buy_orders.append((x[0], x[2], x[4]))
                EveryLimitupBigDealOrderManager.add_big_buy_order_deal(code, active_big_buy_orders)
            try:
                is_placed_order = l2_data_manager.TradePointManager.is_placed_order(order_begin_pos)
                if is_placed_order:
                    if order_begin_pos and order_begin_pos.mode == OrderBeginPosInfo.MODE_RADICAL:
                        RadicalBuyDataManager.big_order_deal(code)
                if bigger_buy_datas:
                    # 有大于50w的大单成交
                    buyno_map = l2_data_util.local_today_buyno_map.get(code)
                    if buyno_map:
                        for buy_data in bigger_buy_datas:
                            order_no = f"{buy_data[0]}"
                            if order_no in buyno_map:
                                LCancelBigNumComputer().add_deal_index(code, buyno_map[order_no]["index"],
                                                                       order_begin_pos.buy_single_index)
        except Exception as e:
            logger_debug.exception(e)
                    if bigger_buy_datas:
                        # 有大于50w的大单成交
                        buyno_map = l2_data_util.local_today_buyno_map.get(code)
                        if buyno_map:
                            for buy_data in bigger_buy_datas:
                                order_no = f"{buy_data[0]}"
                                if order_no in buyno_map:
                                    LCancelBigNumComputer().add_deal_index(code, buyno_map[order_no]["index"],
                                                                           order_begin_pos.buy_single_index)
            except Exception as e:
                logger_debug.exception(e)
            return buy_datas
        sell_datas = HuaXinSellOrderStatisticManager.statistic_big_sell_data(code, datas)
        if sell_datas:
            BigOrderDealManager().add_sell_datas(code, sell_datas)
        @dask.delayed
        def statistic_big_sell_data():
            sell_datas = HuaXinSellOrderStatisticManager.statistic_big_sell_data(code, fdatas)
            if sell_datas:
                BigOrderDealManager().add_sell_datas(code, sell_datas)
            return sell_datas
        @dask.delayed
        def statistic_big_data(f1_, f2_):
            temp_data = f1_, f2_
            return temp_data
        limit_up_price = gpcode_manager.get_limit_up_price_as_num(code)
        # 并行处理买单与卖单
        f1 = statistic_big_buy_data()
        f2 = statistic_big_sell_data()
        dask_result = statistic_big_data(f1, f2)
        buy_datas, sell_datas = dask_result.compute()
        if buy_datas or sell_datas:
            buy_money = BigOrderDealManager().get_total_buy_money(code)
            sell_money = BigOrderDealManager().get_total_sell_money(code)
            LCancelRateManager.set_big_num_deal_info(code, buy_money, sell_money)
    @classmethod
    def process_huaxin_transaction_datas(cls, code, datas):
    def process_huaxin_transaction_datas(cls, code, o_datas):
        # TODO 整形数据,格式:[(数据本身, 是否主动买, 是否涨停, 总成交额, 不含ms时间,含ms时间)]
        limit_up_price = gpcode_manager.get_limit_up_price_as_num(code)
        # q.append((data['SecurityID'], data['TradePrice'], data['TradeVolume'],
        #                   data['OrderTime'], data['MainSeq'], data['SubSeq'], data['BuyNo'],
        #                   data['SellNo'], data['ExecType']))
        fdatas = [
            [d, d[6] > d[7], limit_up_price == d[1], d[1] * d[2], '', '']
            for d in o_datas]
        temp_time_dict = {}
        for d in fdatas:
            if d[3] not in temp_time_dict:
                temp_time_dict[d[3]] = l2_huaxin_util.convert_time(d[3], with_ms=True)
            d[5] = temp_time_dict.get(d[3])
            d[4] = d[5][:8]
        temp_time_dict.clear()
        __start_time = time.time()
        limit_up_price = gpcode_manager.get_limit_up_price_as_num(code)
        # 设置成交价
        try:
            current_price_process_manager.set_trade_price(code, datas[-1][1])
            if limit_up_price > datas[-1][1]:
            current_price_process_manager.set_trade_price(code, fdatas[-1][0][1])
            if limit_up_price > fdatas[-1][0][1]:
                # 没有涨停
                EveryLimitupBigDealOrderManager.open_limit_up(code, f"最新成交价:{datas[-1][1]}")
                EveryLimitupBigDealOrderManager.open_limit_up(code, f"最新成交价:{fdatas[-1][0][1]}")
                radical_buy_strategy.clear_data(code)
        except:
            pass
@@ -116,13 +151,13 @@
            is_placed_order = l2_data_manager.TradePointManager.is_placed_order(order_begin_pos)
            _start_time = time.time()
            L2LimitUpSellDataManager.set_deal_datas(code, datas)
            L2LimitUpSellDataManager.set_deal_datas(code, fdatas)
            use_time_list.append(("统计涨停卖成交", time.time() - _start_time))
            _start_time = time.time()
            #  大单统计
            # cls.__statistic_thread_pool.submit(cls.statistic_big_order_infos, code, datas, order_begin_pos)
            try:
                cls.statistic_big_order_infos(code, datas, order_begin_pos)
                cls.statistic_big_order_infos(code, fdatas, order_begin_pos)
            except Exception as e:
                async_log_util.error(hx_logger_l2_debug, f"统计大单出错:{str(e)}")
            use_time_list.append(("统计大单数据", time.time() - _start_time))
@@ -132,24 +167,22 @@
            try:
                # 统计上板时间
                try:
                    for d in datas:
                        if d[6] > d[7]:
                    for d in fdatas:
                        if d[1]:
                            # 主动买
                            if d[1] == limit_up_price:
                            if d[2]:
                                # 涨停
                                current_price_process_manager.set_latest_not_limit_up_time(code,
                                                                                           l2_huaxin_util.convert_time(
                                                                                               d[3], with_ms=True))
                                current_price_process_manager.set_latest_not_limit_up_time(code, d[5])
                        else:
                            # 主动卖(板上)
                            if d[1] == limit_up_price:
                            if d[2]:
                                L2LimitUpSellDataManager.clear_data(code)
                                break
                except:
                    pass
                # 统计卖单
                big_sell_order_info = HuaXinSellOrderStatisticManager.add_transaction_datas(code, datas, limit_up_price)
                big_sell_order_info = HuaXinSellOrderStatisticManager.add_transaction_datas(code, fdatas, limit_up_price)
                use_time_list.append(("处理卖单成交数据", time.time() - _start_time))
                _start_time = time.time()
@@ -171,10 +204,10 @@
                                                                                            order_begin_pos)
                        cancel_type = trade_constant.CANCEL_TYPE_P
                    # 判断时间是否与本地时间相差5s以上
                    if tool.trade_time_sub(tool.get_now_time_str(), l2_huaxin_util.convert_time(datas[-1][3])) > 10:
                    if tool.trade_time_sub(tool.get_now_time_str(), fdatas[-1][4]) > 10:
                        now_seconds = int(tool.get_now_time_str().replace(":", ""))
                        if now_seconds < int("093100"):  # or int("130000") <= now_seconds < int("130200"):
                            need_cancel, cancel_msg = True, f"成交时间与本地时间相差10S以上,{l2_huaxin_util.convert_time(datas[-1][3])}"
                            need_cancel, cancel_msg = True, f"成交时间与本地时间相差10S以上,{fdatas[-1][4]}"
                            cancel_type = trade_constant.CANCEL_TYPE_L2_DELAY
                    if need_cancel:
                        L2TradeDataProcessor.cancel_buy(code, cancel_msg, cancel_type=cancel_type)
@@ -183,7 +216,7 @@
                    use_time_list.append(("处理卖单相关撤数据", time.time() - _start_time))
                    _start_time = time.time()
                # 统计涨停卖成交
                HuaXinSellOrderStatisticManager.statistic_total_deal_volume(code, datas, limit_up_price)
                HuaXinSellOrderStatisticManager.statistic_total_deal_volume(code, fdatas, limit_up_price)
                use_time_list.append(("统计成交量数据", time.time() - _start_time))
            except Exception as e:
                async_log_util.error(logger_debug, f"卖单统计异常:{big_sell_order_info}")
@@ -193,7 +226,7 @@
            # if big_money_count > 0:
            #     LCancelRateManager.compute_big_num_deal_rate(code)
            buy_progress_index = cls.__compute_latest_trade_progress(code, buyno_map, datas)
            buy_progress_index = cls.__compute_latest_trade_progress(code, buyno_map, fdatas)
            if buy_progress_index is not None:
                buy_progress_index_changed = cls.__TradeBuyQueue.set_traded_index(code, buy_progress_index,
@@ -246,4 +279,5 @@
            use_time = int((time.time() - __start_time) * 1000)
            if use_time > 5:
                l2_log.info(code, hx_logger_l2_upload,
                            f"{code}处理成交用时:{use_time} 数据数量:{len(datas)}  详情:{use_time_list}")
                            f"{code}处理成交用时:{use_time} 数据数量:{len(fdatas)}  详情:{use_time_list}")
l2/place_order_single_data_manager.py
@@ -95,12 +95,14 @@
        return len(sell_list)
    @classmethod
    def process_passive_limit_up_sell_data(cls, code, datas, limit_up_price):
    def process_passive_limit_up_sell_data(cls, code, fdatas, limit_up_price):
        """
        添加涨停被动卖成交数据
        @param data: 数据格式:(data['SecurityID'], data['TradePrice'], data['TradeVolume'],
        @param fdata: 数据格式:(data['SecurityID'], data['TradePrice'], data['TradeVolume'],
        #           data['OrderTime'], data['MainSeq'], data['SubSeq'], data['BuyNo'],
        #           data['SellNo'], data['ExecType'])
                    [(数据本身, 是否主动买, 是否涨停, 总成交额, 不含ms时间,含ms时间)]
        @return: 涨停卖是否已经吃完
        """
        try:
@@ -109,24 +111,25 @@
            if not sell_list:
                return False
            sell_info = sell_list[-1]
            for data in datas:
            for data in fdatas:
                # 过滤被动买
                if data[6] < data[7]:
                if not data[1]:
                    # 出现被动买需要将历史大单清空
                    if cls.__active_buy_order_datas_dict.get(code):
                        cls.__active_buy_order_datas_dict[code].clear()
                    continue
                money = data[3]
                # 统计买单数据
                if code not in cls.__latest_active_buy_order_data_dict:
                    # [买单号,当前成交股数, 当前成交金额, 开始时间, 结束时间]
                    cls.__latest_active_buy_order_data_dict[code] = [data[6], data[2], data[2] * data[1], data[3],
                                                                     data[3]]
                    cls.__latest_active_buy_order_data_dict[code] = [data[0][6], data[0][2], money, data[0][3],
                                                                     data[0][3]]
                else:
                    if data[6] == cls.__latest_active_buy_order_data_dict[code][0]:
                    if data[0][6] == cls.__latest_active_buy_order_data_dict[code][0]:
                        # 同一买单号
                        cls.__latest_active_buy_order_data_dict[code][1] += data[2]
                        cls.__latest_active_buy_order_data_dict[code][2] += data[2] * data[1]
                        cls.__latest_active_buy_order_data_dict[code][4] = data[3]
                        cls.__latest_active_buy_order_data_dict[code][1] += data[0][2]
                        cls.__latest_active_buy_order_data_dict[code][2] += money
                        cls.__latest_active_buy_order_data_dict[code][4] = data[0][3]
                    else:
                        # 不同买单号
                        if cls.__latest_active_buy_order_data_dict[code][2] >= 2990000:
@@ -136,31 +139,29 @@
                            cls.__active_buy_order_datas_dict[code].append(
                                cls.__latest_active_buy_order_data_dict[code])
                        cls.__latest_active_buy_order_data_dict[code] = [data[6], data[2], data[2] * data[1], data[3],
                                                                         data[3]]
                        cls.__latest_active_buy_order_data_dict[code] = [data[0][6], data[0][2], money, data[0][3],
                                                                         data[0][3]]
                if data[1] != limit_up_price:
                if not data[2]:
                    # 排除主动卖/非涨停卖
                    continue
                sell_no = data[7]
                sell_no = data[0][7]
                if sell_no != sell_info['val']['orderNo']:
                    continue
                # 需要判断当前单是否已经成交完成
                if code not in cls.__latest_sell_data:
                    cls.__latest_sell_data[code] = [sell_no, data[2]]
                    cls.__latest_sell_data[code] = [sell_no, data[0][2]]
                else:
                    if cls.__latest_sell_data[code][0] == sell_no:
                        cls.__latest_sell_data[code][1] += data[2]
                        cls.__latest_sell_data[code][1] += data[0][2]
                    else:
                        cls.__latest_sell_data[code] = [sell_no, data[2]]
                        cls.__latest_sell_data[code] = [sell_no, data[0][2]]
                sell_info_num = sell_info['val']['num']
                deal_num = cls.__latest_sell_data[code][1] // 100
                if sell_info_num == deal_num:
                    use_time = round((time.time() - start_time) * 1000, 3)
                    l2_log.info(code, logger_l2_trade_buy,
                                f"找到最近的被动涨停卖单数据:{sell_info['val']['orderNo']}, 成交数据:{data} 计算耗时:{use_time}ms, 可以触发下单")
                    # 将历史大单列表与最近的大单加入列表
                    big_buy_order_datas = []
@@ -243,7 +244,7 @@
        cls.__callback = callback
    @classmethod
    def set_latest_sell_data(cls, code, data, big_active_buy_order_datas):
    def set_latest_sell_data(cls, code, fdata, big_active_buy_order_datas):
        """
        设置最近成交的涨停卖被动成交数据
@@ -254,14 +255,14 @@
        @param big_active_buy_order_datas: 大主动买单数据:[[买单号,当前成交股数, 当前成交金额, 开始时间, 结束时间],....]
        @return:
        """
        deal_time = l2_huaxin_util.convert_time(data[3], True)
        deal_time = fdata[5]
        # 生效时间在1s以内
        cls.__latest_sell_data_dict[code] = (data, tool.trade_time_add_millionsecond(deal_time, 1000))
        cls.__latest_sell_data_dict[code] = (fdata[0], tool.trade_time_add_millionsecond(deal_time, 1000))
        if cls.__callback:
            big_buy_order_count = 0
            if big_active_buy_order_datas:
                for b in big_active_buy_order_datas:
                    if b[0] > data[7]:
                    if b[0] > fdata[0][7]:
                        # 买单在卖单之后
                        big_buy_order_count += 1
l2/subscript/l2_subscript_manager.py
@@ -151,7 +151,7 @@
if __name__ == "__main__":
    queues = [multiprocessing.Queue() for i in range(7)]
    queues = [multiprocessing.Queue(maxsize=1024) for i in range(7)]
    manager = TargetCodeProcessManager(queues, 10)
    counts = [70, 60, 50, 10]
    for i in range(4):
l2_test.py
@@ -114,9 +114,9 @@
    cpu_count = 16
    page_size = int(len(codes) / cpu_count) + 1
    big_order_queue = multiprocessing.Queue()
    big_order_queue = multiprocessing.Queue(maxsize=1024)
    # 大单上传队列
    big_order_upload_queue = queue.Queue()
    big_order_upload_queue = queue.Queue(maxsize=1024)
    for i in range(cpu_count):
        process = multiprocessing.Process(target=l2_client_test.run,
main.py
@@ -113,7 +113,7 @@
        channels = channel_list[index:index + channel_count]
        index += channel_count
        # 订阅信号队列, 数据回调队列(回调频次小的数据通过这种回调)
        sub_single_queue, data_callback_queue = multiprocessing.Queue(), multiprocessing.Queue()
        sub_single_queue, data_callback_queue = multiprocessing.Queue(maxsize=1024), multiprocessing.Queue(maxsize=1024)
        sub_single_queue_list.append(sub_single_queue)
        data_callback_queue_list.append(data_callback_queue)
        l2_process = multiprocessing.Process(target=l2_client_v2.run,
third_data/hx_qc_value_util.py
@@ -145,7 +145,7 @@
def run():
    global request_queue
    request_queue, response_queue = multiprocessing.Queue(), multiprocessing.Queue()
    request_queue, response_queue = multiprocessing.Queue(maxsize=1024), multiprocessing.Queue(maxsize=1024)
    # 启动增值服务进程
    process = multiprocessing.Process(target=l1_api_client.run, args=(request_queue, response_queue,), daemon=True)
    process.start()