Administrator
2023-02-16 92cb2dd75ea37b64b174f42ddd0b5b17d6a4634a
l2/l2_data_manager_new.py
@@ -16,7 +16,7 @@
import tool
from trade import trade_data_manager, trade_manager, trade_queue_manager, l2_trade_factor, l2_trade_util, \
    trade_result_manager
from l2 import safe_count_manager, l2_data_manager, l2_data_log, l2_log
from l2 import safe_count_manager, l2_data_manager, l2_data_log, l2_log, l2_data_source_util
from l2.cancel_buy_strategy import SecondCancelBigNumComputer, HourCancelBigNumComputer, L2LimitUpMoneyStatisticUtil, \
    L2LimitUpSellStatisticUtil
from l2.l2_data_manager import L2DataException, TradePointManager
@@ -121,8 +121,9 @@
            # 如果是涨停买撤信号需要看数据位置是否比开始处理时间早
            if L2DataUtil.is_limit_up_price_buy_cancel(data["val"]):
                # 获取买入信号
                buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(total_datas[i],
                                                                                 local_today_num_operate_map.get(code))
                buy_index = l2_data_source_util.L2DataSourceUtils.get_buy_index_with_cancel_data(code, total_datas[i],
                                                                                                 local_today_num_operate_map.get(
                                                                                                     code))
                if buy_index is not None and buy_index < begin_pos:
                    continue
@@ -152,7 +153,6 @@
class L2TradeDataProcessor:
    unreal_buy_dict = {}
    random_key = {}
    l2BigNumForMProcessor = L2BigNumForMProcessor()
    __codeActualPriceProcessor = CodeActualPriceProcessor()
    buy1PriceManager = trade_queue_manager.Buy1PriceManager()
@@ -164,8 +164,7 @@
    # 数据处理入口
    # datas: 本次截图数据
    # capture_timestamp:截图时间戳
    def process(cls, code, datas, capture_timestamp, do_id):
        cls.random_key[code] = do_id
    def process(cls, code, datas, capture_timestamp):
        __start_time = round(t.time() * 1000)
        try:
            if len(datas) > 0:
@@ -189,7 +188,7 @@
                finally:
                    # 保存数据
                    __start_time = round(t.time() * 1000)
                    l2.l2_data_util.save_l2_data(code, datas, add_datas, cls.random_key[code])
                    l2.l2_data_util.save_l2_data(code, datas, add_datas)
                    __start_time = l2_data_log.l2_time(code,
                                                       round(t.time() * 1000) - __start_time,
                                                       "保存数据时间({})".format(len(add_datas)))
@@ -204,7 +203,7 @@
            print(id(local_today_datas))
            # 拼接数据
            local_today_datas[code].extend(add_datas)
            l2.l2_data_util.load_num_operate_map(l2.l2_data_util.local_today_num_operate_map, code, add_datas)
            l2.l2_data_util.load_num_operate_map(local_today_num_operate_map, code, add_datas)
            # 第1条数据是否为09:30:00
            if add_datas[0]["val"]["time"] == "09:30:00":
@@ -291,12 +290,14 @@
        def buy_1_cancel():
            _start_time = round(t.time() * 1000)
            # 撤单计算,只看买1
            cancel_data, cancel_msg = L2LimitUpMoneyStatisticUtil.process_data(cls.random_key[code], code, start_index,
            cancel_data, cancel_msg = L2LimitUpMoneyStatisticUtil.process_data(code, start_index,
                                                                               end_index,
                                                                               buy_single_index, buy_exec_index)
            l2_data_log.l2_time(code, round(t.time() * 1000) - _start_time,
                                "已下单-买1统计耗时")
            if constant.TEST:
                return None, ""
            return cancel_data, cancel_msg
        # S撤
@@ -307,8 +308,7 @@
            try:
                b_need_cancel, b_cancel_data = SecondCancelBigNumComputer.need_cancel(code, buy_single_index,
                                                                                      buy_exec_index, start_index,
                                                                                      end_index, total_data,
                                                                                      cls.random_key[code])
                                                                                      end_index, total_data)
                if b_need_cancel:
                    return b_cancel_data, "S大单撤销比例触发阈值"
            except Exception as e:
@@ -324,9 +324,8 @@
            _start_time = round(t.time() * 1000)
            try:
                b_need_cancel, b_cancel_data = HourCancelBigNumComputer.need_cancel(code, buy_exec_index, start_index,
                                                                                    end_index, total_data,
                                                                                    cls.random_key[code])
                if b_need_cancel and not cancel_data:
                                                                                    end_index, total_data, local_today_num_operate_map.get(code))
                if b_need_cancel and b_cancel_data:
                    return b_cancel_data, "H撤销比例触发阈值"
            except Exception as e:
                logging.exception(e)
@@ -340,7 +339,7 @@
            _start_time = round(t.time() * 1000)
            # 统计板上卖
            try:
                cancel_data, cancel_msg = L2LimitUpSellStatisticUtil.process(cls.random_key[code], code, start_index,
                cancel_data, cancel_msg = L2LimitUpSellStatisticUtil.process(code, start_index,
                                                                             end_index,
                                                                             buy_exec_index)
                return cancel_data, cancel_msg
@@ -714,7 +713,7 @@
            f2 = dask.delayed(limit_up_time_manager.save_limit_up_time)(code, total_datas[compute_index]["val"]["time"])
            f3 = dask.delayed(cls.__virtual_buy)(code, buy_single_index, compute_index, capture_time)
            f4 = dask.delayed(l2_data_manager.TradePointManager.delete_buy_cancel_point)(code)
            f5 = dask.delayed(L2LimitUpMoneyStatisticUtil.process_data)(cls.random_key[code], code, buy_single_index,
            f5 = dask.delayed(L2LimitUpMoneyStatisticUtil.process_data)(code, buy_single_index,
                                                                        compute_index,
                                                                        buy_single_index,
                                                                        buy_exec_index, False)
@@ -744,7 +743,7 @@
                need_cancel, cancel_data = SecondCancelBigNumComputer.need_cancel(code, buy_single_index,
                                                                                  compute_index,
                                                                                  buy_single_index, compute_index,
                                                                                  total_datas, cls.random_key[code],
                                                                                  total_datas,
                                                                                  True)
                _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time,
                                                  "S级大单处理耗时", force=True)
@@ -758,7 +757,7 @@
                    cls.__buy(code, capture_time, total_datas[compute_index], compute_index)
            else:
                SecondCancelBigNumComputer.need_cancel(code, buy_single_index, compute_index, buy_single_index,
                                                       compute_index, total_datas, cls.random_key[code], False)
                                                       compute_index, total_datas, False)
                _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time,
                                                  "S级大单处理耗时", force=True)
@@ -927,9 +926,9 @@
                    # 只统计59万以上的金额
                    # 涨停买撤
                    # 判断买入位置是否在买入信号之前
                    buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(total_datas[i],
                                                                                     local_today_num_operate_map.get(
                                                                                         code))
                    buy_index = l2_data_source_util.L2DataSourceUtils.get_buy_index_with_cancel_data(code, total_datas[i],
                                                                                                     local_today_num_operate_map.get(
                                                                                                         code))
                    if buy_index is not None:
                        # 找到买撤数据的买入点
                        if buy_index >= buy_single_index:
@@ -940,7 +939,7 @@
                            l2_log.buy_debug(code, "{}数据在买入信号之后 撤买纯买手数:{} 目标手数:{}", i, buy_nums, threshold_num)
                        else:
                            l2_log.buy_debug(code, "{}数据在买入信号之前,买入位:{}", i, buy_index)
                            if total_datas[buy_single_index]["val"]["time"] == buy_data["val"]["time"]:
                            if total_datas[buy_single_index]["val"]["time"] == total_datas[buy_index]["val"]["time"]:
                                # 同一秒,当作买入信号之后处理
                                buy_nums -= int(_val["num"]) * int(data["re"])
                                buy_count -= int(data["re"])
@@ -1102,9 +1101,9 @@
        cls.random_key[code] = random.randint(0, 100000)
        buy_single_begin_index, buy_exec_index = 426, 479
        L2LimitUpMoneyStatisticUtil.process_data(cls.random_key[code], code, 480, 519,
        L2LimitUpMoneyStatisticUtil.process_data(code, 480, 519,
                                                 buy_single_begin_index, buy_exec_index, False)
        L2LimitUpMoneyStatisticUtil.process_data(cls.random_key[code], code, 480, 519,
        L2LimitUpMoneyStatisticUtil.process_data(code, 480, 519,
                                                 buy_single_begin_index, buy_exec_index, False)
    @classmethod