Administrator
2023-03-17 8b848e8a9fa242b39f92f3a28faf89be10a6e456
l2/l2_data_manager_new.py
@@ -4,6 +4,7 @@
import big_money_num_manager
import code_data_util
import code_volumn_manager
import constant
import global_util
import gpcode_manager
@@ -16,7 +17,7 @@
import tool
from trade import trade_data_manager, trade_manager, trade_queue_manager, l2_trade_factor, l2_trade_util, \
    trade_result_manager
from l2 import safe_count_manager, l2_data_manager, l2_data_log, l2_log, l2_data_source_util
from l2 import safe_count_manager, l2_data_manager, l2_data_log, l2_log, l2_data_source_util, code_price_manager
from l2.cancel_buy_strategy import SecondCancelBigNumComputer, HourCancelBigNumComputer, L2LimitUpMoneyStatisticUtil, \
    L2LimitUpSellStatisticUtil
from l2.l2_data_manager import L2DataException, TradePointManager
@@ -153,12 +154,13 @@
class L2TradeDataProcessor:
    unreal_buy_dict = {}
    volume_rate_info = {}
    l2BigNumForMProcessor = L2BigNumForMProcessor()
    __codeActualPriceProcessor = CodeActualPriceProcessor()
    buy1PriceManager = trade_queue_manager.Buy1PriceManager()
    __ths_l2_trade_queue_manager = trade_queue_manager.thsl2tradequeuemanager()
    __thsBuy1VolumnManager = trade_queue_manager.THSBuy1VolumnManager()
    __buyL2SafeCountManager = safe_count_manager.BuyL2SafeCountManager()
    __l2PlaceOrderParamsManager = l2_trade_factor.L2PlaceOrderParamsManager()
    @classmethod
    # 数据处理入口
@@ -224,6 +226,10 @@
                                           "l2数据预处理时间")
        if len(add_datas) > 0:
            # 计算量
            volume_rate = code_volumn_manager.get_volume_rate(code)
            volume_rate_index = code_volumn_manager.get_volume_rate_index(volume_rate)
            cls.volume_rate_info[code] = (volume_rate, volume_rate_index)
            # 是否为首板代码
            is_first_code = gpcode_manager.FirstCodeManager.is_in_first_record(code)
            latest_time = add_datas[len(add_datas) - 1]["val"]["time"]
@@ -315,6 +321,9 @@
                b_need_cancel, b_cancel_data = SecondCancelBigNumComputer.need_cancel(code, buy_single_index,
                                                                                      buy_exec_index, start_index,
                                                                                      end_index, total_data,
                                                                                      code_volumn_manager.get_volume_rate_index(
                                                                                          buy_volume_rate),
                                                                                      cls.volume_rate_info[code][1],
                                                                                      is_first_code)
                if b_need_cancel:
                    return b_cancel_data, "S大单撤销比例触发阈值"
@@ -333,7 +342,11 @@
                b_need_cancel, b_cancel_data = HourCancelBigNumComputer.need_cancel(code, buy_exec_index, start_index,
                                                                                    end_index, total_data,
                                                                                    local_today_num_operate_map.get(
                                                                                        code), is_first_code)
                                                                                        code),
                                                                                    code_volumn_manager.get_volume_rate_index(
                                                                                        buy_volume_rate),
                                                                                    cls.volume_rate_info[code][1],
                                                                                    is_first_code)
                if b_need_cancel and b_cancel_data:
                    return b_cancel_data, "H撤销比例触发阈值"
            except Exception as e:
@@ -380,7 +393,8 @@
        total_data = local_today_datas.get(code)
        _start_time = tool.get_now_timestamp()
        # 获取买入信号起始点
        buy_single_index, buy_exec_index, buy_compute_index, num, count, max_num_set = cls.__get_order_begin_pos(code)
        buy_single_index, buy_exec_index, buy_compute_index, num, count, max_num_set, buy_volume_rate = cls.__get_order_begin_pos(
            code)
        f1 = compute_safe_count()
        f2 = compute_m_big_num()
@@ -389,6 +403,9 @@
        f5 = buy_1_cancel()
        f6 = sell_cancel()
        dask_result = is_need_cancel(f1, f2, f3, f4, f5, f6)
        if is_first_code:
            dask_result = is_need_cancel(f3, f4)
        cancel_data, cancel_msg = dask_result.compute()
        _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time,
@@ -431,7 +448,7 @@
        if not can:
            l2_log.debug(code, "不可以下单,原因:{}", reason)
            if need_clear_data:
                buy_single_index, buy_exec_index, buy_compute_index, num, count, max_num_set = cls.__get_order_begin_pos(
                buy_single_index, buy_exec_index, buy_compute_index, num, count, max_num_set, buy_volume_rate = cls.__get_order_begin_pos(
                    code)
                trade_result_manager.real_cancel_success(code, buy_single_index, buy_exec_index,
                                                         local_today_datas.get(code))
@@ -492,15 +509,24 @@
        __start_time = t.time()
        # 判断是否为首板代码
        if is_first_code:
            # 首板代码且尚未涨停过的不能下单
            if not gpcode_manager.WantBuyCodesManager.is_in(code):
            is_limited_up = gpcode_manager.FirstCodeManager.is_limited_up(code)
            if not is_limited_up:
                gpcode_manager.FirstCodeManager.add_limited_up_record([code])
                place_order_count = trade_data_manager.placeordercountmanager.get_place_order_count(
                    code)
                if place_order_count == 0:
                    trade_data_manager.placeordercountmanager.place_order(code)
                return False, True, "首板代码,且尚未涨停过"
                if not code_price_manager.Buy1PriceManager.is_can_buy(code):
                    return False, True, "首板代码,没在想要买名单中且未打开涨停板"
                if not is_limited_up:
                    return False, True, "首板代码,没在想要买名单中且未涨停过"
            # 之前的代码
            # 首板代码且尚未涨停过的不能下单
            # is_limited_up = gpcode_manager.FirstCodeManager.is_limited_up(code)
            # if not is_limited_up:
            #     gpcode_manager.FirstCodeManager.add_limited_up_record([code])
            #     place_order_count = trade_data_manager.placeordercountmanager.get_place_order_count(
            #         code)
            #     if place_order_count == 0:
            #         trade_data_manager.placeordercountmanager.place_order(code)
            #     return False, True, "首板代码,且尚未涨停过"
        try:
            # 买1价格必须为涨停价才能买
@@ -523,7 +549,7 @@
                if sell1_time is not None and sell1_volumn > 0:
                    # 获取执行位信息
                    buy_single_index, buy_exec_index, buy_compute_index, num, count, max_num_set = cls.__get_order_begin_pos(
                    buy_single_index, buy_exec_index, buy_compute_index, num, count, max_num_set, buy_volume_rate = cls.__get_order_begin_pos(
                        code)
                    buy_nums = num
                    for i in range(buy_exec_index + 1, total_datas[-1]["index"] + 1):
@@ -629,7 +655,7 @@
    @classmethod
    def cancel_buy(cls, code, msg=None, source="l2"):
        # 是否是交易队列触发
        buy_single_index, buy_exec_index, buy_compute_index, num, count, max_num_set = cls.__get_order_begin_pos(
        buy_single_index, buy_exec_index, buy_compute_index, num, count, max_num_set, buy_volume_rate = cls.__get_order_begin_pos(
            code)
        total_datas = local_today_datas[code]
        if source == "trade_queue":
@@ -675,17 +701,13 @@
                                                      local_today_num_operate_map.get(code))
        # 获取买入信号计算起始位置
        buy_single_index, buy_exec_index, buy_compute_index, num, count, max_num_set = cls.__get_order_begin_pos(
        buy_single_index, buy_exec_index, buy_compute_index, num, count, max_num_set, buy_volume_rate = cls.__get_order_begin_pos(
            code)
        # 是否为新获取到的位置
        new_get_single = False
        if buy_single_index is None:
            place_order_count = trade_data_manager.placeordercountmanager.get_place_order_count(code)
            continue_count = 3
            # 前2次的信号连续笔数为3,后面为2
            if place_order_count > 2:
                continue_count = 2
            continue_count = cls.__l2PlaceOrderParamsManager.get_begin_continue_buy_count(cls.volume_rate_info[code][1])
            # 有买入信号
            has_single, _index = cls.__compute_order_begin_pos(code, max(
                (compute_start_index - continue_count - 1) if new_add else compute_start_index, 0), continue_count,
@@ -732,7 +754,7 @@
                                                                                                             max_num_set)
        _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time, "纯买额统计时间")
        l2_log.debug(code, "m值-{} m值因子-{}", threshold_money, msg)
        l2_log.debug(code, "m值-{} 量比:{}", threshold_money, cls.volume_rate_info[code][0])
        # 买入信号位与计算位置间隔2s及以上了
        if rebegin_buy_pos is not None:
@@ -746,7 +768,8 @@
                         buy_count, total_datas[compute_index])
            f1 = dask.delayed(cls.__save_order_begin_data)(code, buy_single_index, compute_index, compute_index,
                                                           buy_nums, buy_count, max_num_set_new)
                                                           buy_nums, buy_count, max_num_set_new,
                                                           cls.volume_rate_info[code][0])
            f2 = dask.delayed(limit_up_time_manager.save_limit_up_time)(code, total_datas[compute_index]["val"]["time"])
            f3 = dask.delayed(cls.__virtual_buy)(code, buy_single_index, compute_index, capture_time)
            f4 = dask.delayed(l2_data_manager.TradePointManager.delete_buy_cancel_point)(code)
@@ -781,6 +804,8 @@
                                                                                  compute_index,
                                                                                  buy_single_index, compute_index,
                                                                                  total_datas, is_first_code,
                                                                                  cls.volume_rate_info[code][1],
                                                                                  cls.volume_rate_info[code][1],
                                                                                  True)
                _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time,
                                                  "S级大单处理耗时", force=True)
@@ -794,7 +819,9 @@
                    cls.__buy(code, capture_time, total_datas[compute_index], compute_index, is_first_code)
            else:
                SecondCancelBigNumComputer.need_cancel(code, buy_single_index, compute_index, buy_single_index,
                                                       compute_index, total_datas, is_first_code, False)
                                                       compute_index, total_datas, is_first_code,
                                                       cls.volume_rate_info[code][1],
                                                       cls.volume_rate_info[code][1], False)
                _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time,
                                                  "S级大单处理耗时", force=True)
@@ -809,7 +836,7 @@
            # 未达到下单条件,保存纯买额,设置纯买额
            # 记录买入信号位置
            cls.__save_order_begin_data(code, buy_single_index, -1, compute_end_index, buy_nums, buy_count,
                                        max_num_set_new)
                                        max_num_set_new, None)
            print("保存大单时间", round((t.time() - _start_time) * 1000))
            _start_time = t.time()
        pass
@@ -817,15 +844,16 @@
    # 获取下单起始信号
    @classmethod
    def __get_order_begin_pos(cls, code):
        buy_single_index, buy_exec_index, compute_index, num, count, max_num_set = l2_data_manager.TradePointManager.get_buy_compute_start_data(
        buy_single_index, buy_exec_index, compute_index, num, count, max_num_set, volume_rate = l2_data_manager.TradePointManager.get_buy_compute_start_data(
            code)
        return buy_single_index, buy_exec_index, compute_index, num, count, max_num_set
        return buy_single_index, buy_exec_index, compute_index, num, count, max_num_set, volume_rate
    # 保存下单起始信号
    @classmethod
    def __save_order_begin_data(self, code, buy_single_index, buy_exec_index, compute_index, num, count, max_num_set):
    def __save_order_begin_data(self, code, buy_single_index, buy_exec_index, compute_index, num, count, max_num_set,
                                volume_rate):
        TradePointManager.set_buy_compute_start_data(code, buy_single_index, buy_exec_index, compute_index, num, count,
                                                     max_num_set)
                                                     max_num_set, volume_rate)
    # 计算下单起始信号
    # compute_data_count 用于计算的l2数据数量
@@ -873,7 +901,7 @@
    @classmethod
    def __get_threshmoney(cls, code):
        return l2_trade_factor.L2TradeFactorUtil.compute_m_value(code)
        return l2_trade_factor.L2TradeFactorUtil.compute_m_value(code, cls.volume_rate_info[code][1])
    # 计算万手哥笔数
    @classmethod
@@ -901,34 +929,30 @@
        buy_nums = origin_num
        buy_count = origin_count
        limit_up_price = gpcode_manager.get_limit_up_price(code)
        buy1_price = cls.buy1PriceManager.get_price(code)
        if limit_up_price is None:
            raise Exception("涨停价无法获取")
        # 目标手数
        threshold_num = round(threshold_money / (limit_up_price * 100))
        place_order_count = trade_data_manager.placeordercountmanager.get_place_order_count(code)
        # place_order_count = trade_data_manager.placeordercountmanager.get_place_order_count(code)
        # 目标订单数量
        threshold_count = cls.__buyL2SafeCountManager.get_safe_count(code, is_first_code, place_order_count)
        threshold_count = cls.__buyL2SafeCountManager.get_safe_count(code, is_first_code,
                                                                     cls.__l2PlaceOrderParamsManager.get_safe_count_rate(
                                                                         cls.volume_rate_info[code][1]))
        buy_single_time_seconds = L2DataUtil.get_time_as_second(total_datas[buy_single_index]["val"]["time"])
        # 可以触发买,当有涨停买信号时才会触发买
        trigger_buy = True
        if place_order_count > 3:
            place_order_count = 3
        # 间隔最大时间依次为:3,9,27,81
        max_space_time = pow(3, place_order_count + 1) - 1
        max_space_time = cls.__l2PlaceOrderParamsManager.get_time_range(cls.volume_rate_info[code][1])
        # 最大买量
        max_buy_num = 0
        max_buy_num_set = set(max_num_set)
        # 需要的最小大单笔数
        big_num_count = 2
        if place_order_count > 1:
            # 第一次下单需要大单最少2笔,以后只需要1笔
            big_num_count = 1
        big_num_count = cls.__l2PlaceOrderParamsManager.get_big_num_count(cls.volume_rate_info[code][1])
        # 较大单的手数
        bigger_num = round(5900 / limit_up_price)