Administrator
2023-02-08 3ec79004bd769828c8dc18ed35280f81cfb473ff
l2/l2_data_manager_new.py
@@ -8,23 +8,26 @@
import global_util
import gpcode_manager
import industry_codes_sort
import l2_data_log
import l2_data_util
import l2_trade_test
import limit_up_time_manager
from db import redis_manager
import ths_industry_util
import tool
from trade import trade_data_manager, trade_manager, trade_queue_manager, l2_trade_factor, l2_trade_util
from l2 import safe_count_manager, l2_data_manager, l2_data_util
from trade import trade_data_manager, trade_manager, trade_queue_manager, l2_trade_factor, l2_trade_util, \
    trade_result_manager
from l2 import safe_count_manager, l2_data_manager, l2_data_log
from l2.cancel_buy_strategy import SecondCancelBigNumComputer, HourCancelBigNumComputer, L2LimitUpMoneyStatisticUtil, \
    L2LimitUpSellStatisticUtil
from l2.l2_data_manager import L2DataException, TradePointManager
from l2.l2_data_util import local_today_datas, L2DataUtil, load_l2_data, local_today_num_operate_map, local_latest_datas
import l2.l2_data_util
from log import logger_l2_trade, logger_l2_trade_cancel, logger_l2_trade_buy, logger_l2_process, logger_l2_error
# TODO l2数据管理
from trade.trade_data_manager import CodeActualPriceProcessor
import dask
class L2DataManager:
@@ -185,21 +188,22 @@
                    raise L2DataException(L2DataException.CODE_PRICE_ERROR,
                                          "股价不匹配 code-{} price-{}".format(code, datas[0]["val"]["price"]))
                # 加载历史数据
                l2_data_util.load_l2_data(code)
                l2.l2_data_util.load_l2_data(code)
                # 纠正数据
                datas = l2_data_util.L2DataUtil.correct_data(code,local_latest_datas.get(code), datas)
                datas = l2.l2_data_util.L2DataUtil.correct_data(code, local_latest_datas.get(code), datas)
                _start_index = 0
                if local_today_datas.get(code) is not None and len(
                        local_today_datas[code]) > 0:
                    _start_index = local_today_datas[code][-1]["index"] + 1
                add_datas = l2_data_util.L2DataUtil.get_add_data(code,local_latest_datas.get(code), datas, _start_index)
                add_datas = l2.l2_data_util.L2DataUtil.get_add_data(code, local_latest_datas.get(code), datas,
                                                                    _start_index)
                # -------------数据增量处理------------
                try:
                    cls.process_add_datas(code, add_datas, capture_timestamp, __start_time)
                finally:
                    # 保存数据
                    __start_time = round(t.time() * 1000)
                    l2_data_util.save_l2_data(code, datas, add_datas, cls.random_key[code])
                    l2.l2_data_util.save_l2_data(code, datas, add_datas, cls.random_key[code])
                    __start_time = l2_data_log.l2_time(code, cls.random_key[code],
                                                       round(t.time() * 1000) - __start_time,
                                                       "保存数据时间({})".format(len(add_datas)))
@@ -211,9 +215,10 @@
    def process_add_datas(cls, code, add_datas, capture_timestamp, __start_time):
        now_time_str = tool.get_now_time_str()
        if len(add_datas) > 0:
            print(id(local_today_datas))
            # 拼接数据
            local_today_datas[code].extend(add_datas)
            l2_data_util.load_num_operate_map(l2_data_util.local_today_num_operate_map, code, add_datas)
            l2.l2_data_util.load_num_operate_map(l2.l2_data_util.local_today_num_operate_map, code, add_datas)
            # 第1条数据是否为09:30:00
            if add_datas[0]["val"]["time"] == "09:30:00":
@@ -232,7 +237,9 @@
        if len(add_datas) > 0:
            latest_time = add_datas[len(add_datas) - 1]["val"]["time"]
            # 时间差不能太大才能处理
            if l2_data_util.L2DataUtil.is_same_time(now_time_str, latest_time) and not l2_trade_util.is_in_forbidden_trade_codes(code):
            if l2.l2_data_util.L2DataUtil.is_same_time(now_time_str,
                                                       latest_time) and not l2_trade_util.is_in_forbidden_trade_codes(
                code):
                # 判断是否已经挂单
                state = trade_manager.get_trade_state(code)
                start_index = len(total_datas) - len(add_datas)
@@ -259,6 +266,7 @@
        if round(t.time() * 1000) - __start_time > 10:
            __start_time = l2_data_log.l2_time(code, cls.random_key.get(code), round(t.time() * 1000) - __start_time,
                                               "获取m值数据耗时")
        cls.__start_compute_buy(code, start_index, end_index, threshold_money, capture_time)
    # 测试专用
@@ -269,6 +277,115 @@
    # 处理已挂单
    @classmethod
    def __process_order(cls, code, start_index, end_index, capture_time, new_add=True):
        # 计算安全笔数
        @dask.delayed
        def compute_safe_count():
            _start_time = round(t.time() * 1000)
            # 处理安全笔数
            cls.__buyL2SafeCountManager.compute_left_rate(code, start_index, end_index, total_data,
                                                          local_today_num_operate_map.get(code))
            l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time,
                                "已下单-获取买入信息耗时")
            return None, ""
        @dask.delayed
        # m值大单计算
        def compute_m_big_num():
            _start_time = round(t.time() * 1000)
            # 计算m值大单
            cls.l2BigNumForMProcessor.process(code, max(buy_single_index, start_index), end_index,
                                              gpcode_manager.get_limit_up_price(code))
            l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time,
                                "已下单-m值大单计算")
            return None, ""
        # 买1撤计算
        @dask.delayed
        def buy_1_cancel():
            _start_time = round(t.time() * 1000)
            # 撤单计算,只看买1
            cancel_data, cancel_msg = L2LimitUpMoneyStatisticUtil.process_data(cls.random_key[code], code, start_index,
                                                                               end_index,
                                                                               buy_single_index, buy_exec_index)
            l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time,
                                "已下单-买1统计耗时")
            return cancel_data, cancel_msg
        # S撤
        @dask.delayed
        def s_cancel():
            _start_time = round(t.time() * 1000)
            # S撤单计算,看秒级大单撤单
            try:
                b_need_cancel, b_cancel_data = SecondCancelBigNumComputer.need_cancel(code, buy_single_index,
                                                                                      buy_exec_index, start_index,
                                                                                      end_index, total_data,
                                                                                      cls.random_key[code])
                if b_need_cancel:
                    return b_cancel_data, "S大单撤销比例触发阈值"
            except Exception as e:
                logging.exception(e)
            finally:
                l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time,
                                    "已下单-s级大单估算")
            return None, ""
        # H撤
        @dask.delayed
        def h_cancel():
            _start_time = round(t.time() * 1000)
            try:
                b_need_cancel, b_cancel_data = HourCancelBigNumComputer.need_cancel(code, buy_exec_index, start_index,
                                                                                    end_index, total_data,
                                                                                    cls.random_key[code])
                if b_need_cancel and not cancel_data:
                    return b_cancel_data, "H撤销比例触发阈值"
            except Exception as e:
                logging.exception(e)
            finally:
                l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time, "已下单-H撤大单计算")
            return None, ""
        # 板上卖撤
        @dask.delayed
        def sell_cancel():
            _start_time = round(t.time() * 1000)
            # 统计板上卖
            try:
                cancel_data, cancel_msg = L2LimitUpSellStatisticUtil.process(cls.random_key[code], code, start_index,
                                                                             end_index,
                                                                             buy_exec_index)
                return cancel_data, cancel_msg
            except Exception as e:
                logging.exception(e)
            finally:
                l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time, "已下单-板上卖耗时")
            return None, ""
        # 是否需要撤销
        @dask.delayed
        def is_need_cancel(*args):
            f_cancel_data, f_cancel_msg = None, ""
            try:
                for i in range(0, len(args)):
                    _cancel_data, _cancel_msg = args[i]
                    if _cancel_data:
                        if not f_cancel_data:
                            f_cancel_data, f_cancel_msg = _cancel_data, _cancel_msg
                        else:
                            if _cancel_data["index"] < f_cancel_data["index"]:
                                # 取较早的撤销数据
                                f_cancel_data, f_cancel_msg = _cancel_data, _cancel_msg
            except Exception as e:
                logging.exception(e)
            finally:
                pass
            return f_cancel_data, f_cancel_msg
        if start_index < 0:
            start_index = 0
@@ -276,69 +393,17 @@
            return
        total_data = local_today_datas.get(code)
        _start_time = round(t.time() * 1000)
        # 获取买入信号起始点
        buy_single_index, buy_exec_index, buy_compute_index, num, count, max_num_set = cls.__get_order_begin_pos(
            code)
        # 处理安全笔数
        cls.__buyL2SafeCountManager.compute_left_rate(code, start_index, end_index, total_data,
                                                      local_today_num_operate_map.get(code))
        buy_single_index, buy_exec_index, buy_compute_index, num, count, max_num_set = cls.__get_order_begin_pos(code)
        _start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time,
                                          "已下单-获取买入信息耗时")
        # 撤单计算,只看买1
        cancel_data, cancel_msg = L2LimitUpMoneyStatisticUtil.process_data(cls.random_key[code], code, start_index,
                                                                           end_index,
                                                                           buy_single_index, buy_exec_index)
        _start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time,
                                          "已下单-买1统计耗时")
        # S撤单计算,看秒级大单撤单
        try:
            b_need_cancel, b_cancel_data = SecondCancelBigNumComputer.need_cancel(code, buy_single_index,
                                                                                  buy_exec_index, start_index,
                                                                                  end_index, total_data,
                                                                                  cls.random_key[code])
            if b_need_cancel and not cancel_data:
                cancel_data = b_cancel_data
                cancel_msg = "S大单撤销比例触发阈值"
        except Exception as e:
            logging.exception(e)
        _start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time,
                                          "已下单-s级大单估算")
        # H撤
        try:
            b_need_cancel, b_cancel_data = HourCancelBigNumComputer.need_cancel(code, buy_exec_index, start_index,
                                                                                end_index, total_data,
                                                                                cls.random_key[code])
            if b_need_cancel and not cancel_data:
                cancel_data = b_cancel_data
                cancel_msg = "H撤销比例触发阈值"
        except Exception as e:
            logging.exception(e)
        _start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time,
                                          "已下单-H撤大单计算")
        if not cancel_data:
            # 统计板上卖
            try:
                cancel_data, cancel_msg = L2LimitUpSellStatisticUtil.process(cls.random_key[code], code, start_index,
                                                                             end_index,
                                                                             buy_exec_index)
            except Exception as e:
                logging.exception(e)
        _start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time,
                                          "已下单-板上卖耗时")
        # 计算m值大单
        cls.l2BigNumForMProcessor.process(code, max(buy_single_index, start_index), end_index,
                                          gpcode_manager.get_limit_up_price(code))
        _start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time,
                                          "已下单-m值大单计算")
        f1 = compute_safe_count()
        f2 = compute_m_big_num()
        f3 = buy_1_cancel()
        f4 = s_cancel()
        f5 = h_cancel()
        f6 = sell_cancel()
        dask_result = is_need_cancel(f1, f2, f3, f4, f5, f6)
        cancel_data, cancel_msg = dask_result.compute()
        if cancel_data:
            cls.debug(code, "触发撤单,撤单位置:{} ,撤单原因:{}", cancel_data["index"], cancel_msg)
@@ -383,24 +448,8 @@
                cls.debug(code, "开始执行买入")
                trade_manager.start_buy(code, capture_timestamp, last_data,
                                        last_data_index)
                trade_data_manager.placeordercountmanager.place_order(code)
                # 下单成功,需要删除最大买1
                cls.__thsBuy1VolumnManager.clear_max_buy1_volume(code)
                # 获取买入位置信息
                try:
                    buy_single_index, buy_exec_index, buy_compute_index, num, count, max_num_set = cls.__get_order_begin_pos(
                        code)
                    cls.__buyL2SafeCountManager.save_place_order_info(code, buy_single_index, buy_exec_index, None)
                    SecondCancelBigNumComputer.place_order_success(code, buy_single_index, buy_exec_index)
                    HourCancelBigNumComputer.place_order_success(code, buy_single_index, buy_exec_index,
                                                                 local_today_datas.get(code),
                                                                 local_today_num_operate_map.get(code))
                except Exception as e:
                    logging.exception(e)
                    logger_l2_error.exception(e)
                l2_data_manager.TradePointManager.delete_buy_cancel_point(code)
                ################下单成功处理################
                trade_result_manager.real_buy_success(code)
                cls.debug(code, "执行买入成功")
            except Exception as e:
                cls.debug(code, "执行买入异常:{}", str(e))
@@ -442,6 +491,8 @@
    # 是否可以买
    @classmethod
    def __can_buy(cls, code):
        __start_time = t.time()
        try:
        # 买1价格必须为涨停价才能买
        # buy1_price = cls.buy1PriceManager.get_price(code)
        # if buy1_price is None:
@@ -467,9 +518,9 @@
                    # 涨停买
                    if L2DataUtil.is_limit_up_price_buy(_val):
                        # 涨停买
                        buy_nums += int(_val["num"]) * int(total_datas[i]["re"])
                            buy_nums += _val["num"] * total_datas[i]["re"]
                    elif L2DataUtil.is_limit_up_price_buy_cancel(_val):
                        buy_nums -= int(_val["num"]) * int(total_datas[i]["re"])
                            buy_nums -= _val["num"] * total_datas[i]["re"]
                if buy_nums < sell1_volumn * 0.49:
                    return False, "纯买量({})小于卖1量的49%{} 卖1时间:{}".format(buy_nums, sell1_volumn, sell1_time)
        except Exception as e:
@@ -481,8 +532,8 @@
            return False, "最大量比超过1.3不能买"
        limit_up_time = limit_up_time_manager.get_limit_up_time(code)
        if limit_up_time is not None and l2_data_util.L2DataUtil.get_time_as_second(
                limit_up_time) >= l2_data_util.L2DataUtil.get_time_as_second(
            if limit_up_time is not None and l2.l2_data_util.L2DataUtil.get_time_as_second(
                    limit_up_time) >= l2.l2_data_util.L2DataUtil.get_time_as_second(
            "14:30:00"):
            return False, "14:30后涨停的不能买,涨停时间为{}".format(limit_up_time)
@@ -543,21 +594,20 @@
        #         return False, "老二,本板块中涨停票数<29不能买"
        # 可以下单
        return True, None
        finally:
            l2_data_log.l2_time(code, cls.random_key[code], round((t.time() - __start_time) * 1000), "是否可以下单计算")
    @classmethod
    def __cancel_buy(cls, code):
        """Submit a cancel for the buy order on ``code`` and clear its stored trade points.

        Args:
            code: Stock code whose buy order is cancelled.

        Returns:
            True when every step completed without raising. False when any step
            raised; the exception is logged and reported through ``cls.debug``.
        """
        try:
            cls.debug(code, "开始执行撤单")
            trade_manager.start_cancel_buy(code)
            # Clear the buy markers
            l2_data_manager.TradePointManager.delete_buy_point(code)
            l2_data_manager.TradePointManager.delete_buy_cancel_point(code)
            l2_data_manager.TradePointManager.delete_compute_info_for_cancel_buy(code)
            l2_data_manager.TradePointManager.delete_count_info_for_cancel_buy(code)
            cls.debug(code, "执行撤单成功")
            return True
        except Exception as e:
            logging.exception(e)
            cls.debug(code, "执行撤单异常:{}", str(e))
            return False
    @classmethod
    def cancel_buy(cls, code, msg=None, source="l2"):
@@ -575,10 +625,7 @@
        if code in cls.unreal_buy_dict:
            cls.unreal_buy_dict.pop(code)
            # 取消买入标识
            l2_data_manager.TradePointManager.delete_buy_point(code)
            l2_data_manager.TradePointManager.delete_buy_cancel_point(code)
            l2_data_manager.TradePointManager.delete_compute_info_for_cancel_buy(code)
            l2_data_manager.TradePointManager.delete_count_info_for_cancel_buy(code)
            trade_result_manager.virtual_cancel_success(code, buy_single_index, buy_exec_index, total_datas)
        else:
            can_cancel, reason = cls.__can_cancel(code)
            if not can_cancel:
@@ -586,21 +633,17 @@
                cls.cancel_debug(code, "撤单中断,原因:{}", reason)
                cls.debug(code, "撤单中断,原因:{}", reason)
                return False
            cls.__cancel_buy(code)
            # 撤单成功
            cls.__buyL2SafeCountManager.save_place_order_info(code, buy_single_index, buy_exec_index,
                                                              total_datas[-1]["index"])
        cls.debug(code, "执行撤单成功,原因:{}", msg)
            cancel_result = cls.__cancel_buy(code)
            if cancel_result:
                trade_result_manager.real_cancel_success(code, buy_single_index, buy_exec_index, total_datas)
        cls.debug(code, "执行撤单结束,原因:{}", msg)
        return True
    # Virtual order placement
    @classmethod
    def __virtual_buy(cls, code, buy_single_index, buy_exec_index, capture_time):
        """Record a virtual buy for ``code`` without sending a real order.

        Args:
            code: Stock code being virtually bought.
            buy_single_index: Index of the buy-signal data point.
            buy_exec_index: Index of the buy-execution data point.
            capture_time: Capture time stored together with the execution index.
        """
        # Remember the virtual order, keyed by code.
        cls.unreal_buy_dict[code] = (buy_exec_index, capture_time)
        SecondCancelBigNumComputer.place_order_success(code, buy_single_index, buy_exec_index)
        # Delete the previous limit-up-board sell information
        L2LimitUpSellStatisticUtil.delete(code)
        trade_result_manager.virtual_buy_success(code)
    @classmethod
    def __start_compute_buy(cls, code, compute_start_index, compute_end_index, threshold_money, capture_time,
@@ -609,6 +652,10 @@
            return
        _start_time = round(t.time() * 1000)
        total_datas = local_today_datas[code]
        # 处理安全笔数
        cls.__buyL2SafeCountManager.compute_left_rate(code, compute_start_index, compute_end_index, total_datas,
                                                      local_today_num_operate_map.get(code))
        # 获取买入信号计算起始位置
        buy_single_index, buy_exec_index, buy_compute_index, num, count, max_num_set = cls.__get_order_begin_pos(
            code)
@@ -710,7 +757,7 @@
                # 处理撤单步骤
                cls.__process_order(code, compute_index + 1, compute_end_index, capture_time, False)
                _start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time,
                                                  "处理撤单步骤耗时", force=True)
                                                  f"处理撤单步骤耗时,范围:{compute_index + 1}-{compute_end_index}", force=True)
        else:
            # 未达到下单条件,保存纯买额,设置纯买额
@@ -823,7 +870,7 @@
        if limit_up_price is None:
            raise Exception("涨停价无法获取")
        # 目标手数
        threshold_num = threshold_money / (limit_up_price * 100)
        threshold_num = round(threshold_money / (limit_up_price * 100))
        buy1_factor = 1
        # 获取买1是否为涨停价
@@ -835,7 +882,7 @@
            print("买1价不为涨停价,买1价-{} 涨停价-{}".format(buy1_price, limit_up_price))
            buy1_factor = 1.3
        # 目标订单数量
        threshold_count = safe_count_manager.BuyL2SafeCountManager.get_safe_count(code)
        threshold_count = cls.__buyL2SafeCountManager.get_safe_count(code)
        buy_single_time_seconds = L2DataUtil.get_time_as_second(total_datas[buy_single_index]["val"]["time"])
@@ -849,6 +896,13 @@
        # 最大买量
        max_buy_num = 0
        max_buy_num_set = set(max_num_set)
        # 需要的最小大单笔数
        big_num_count = 2
        if place_order_count > 1:
            # 第一次下单需要大单最少2笔,以后只需要1笔
            big_num_count = 1
        for i in range(compute_start_index, compute_end_index + 1):
            data = total_datas[i]
            _val = total_datas[i]["val"]
@@ -878,7 +932,7 @@
                        logger_l2_trade_buy.info("{}获取到买入执行点:{} 统计纯买手数:{} 目标纯买手数:{} 统计纯买单数:{} 目标纯买单数:{}, 大单数量:{}", code,
                                                 i,
                                                 buy_nums,
                                                 threshold_num, buy_count, get_threshold_count(), sub_threshold_count)
                                                 threshold_num, buy_count, get_threshold_count(), sub_threshold_count, )
            elif L2DataUtil.is_limit_up_price_buy_cancel(_val):
                if l2_data_util.is_big_money(_val):
                    sub_threshold_count -= int(total_datas[i]["re"])
@@ -910,20 +964,15 @@
            cls.buy_debug(code, "位置-{},总手数:{},目标手数:{}", i,
                          buy_nums, threshold_num)
            # 需要的最小大单笔数
            big_num_count = 2
            if place_order_count > 1:
                # 第一次下单需要大单最少2笔,以后只需要1笔
                big_num_count = 1
            # 有撤单信号,且小于阈值
            if buy_nums >= threshold_num and buy_count >= get_threshold_count() and trigger_buy and len(
                    max_buy_num_set) >= big_num_count:
                return i, buy_nums, buy_count, None, max_buy_num_set
        cls.buy_debug(code, "尚未获取到买入执行点,起始计算位置:{} 统计纯买手数:{} 目标纯买手数:{}  统计纯买单数:{} 目标纯买单数:{} 大单数量:{}",
        cls.buy_debug(code, "尚未获取到买入执行点,起始计算位置:{} 统计纯买手数:{} 目标纯买手数:{}  统计纯买单数:{} 目标纯买单数:{} 大单数量:{} 目标大单数量:{}",
                      compute_start_index,
                      buy_nums,
                      threshold_num, buy_count, get_threshold_count(), sub_threshold_count)
                      threshold_num, buy_count, get_threshold_count(), len(max_buy_num_set), big_num_count)
        return None, buy_nums, buy_count, None, max_buy_num_set
@@ -988,7 +1037,7 @@
        load_l2_data(code, True)
        _start = t.time()
        capture_timestamp = 1999999999
        cls.process(code, l2_data_util.local_today_datas[code][1552:1641], capture_timestamp)
        cls.process(code, local_today_datas[code][1552:1641], capture_timestamp)
        print("时间花费:", round((t.time() - _start) * 1000))
        pass
@@ -998,8 +1047,8 @@
        load_l2_data(code)
        limit_up_time_manager.load_limit_up_time()
        limit_up_time = limit_up_time_manager.get_limit_up_time(code)
        if limit_up_time is not None and l2_data_util.L2DataUtil.get_time_as_second(
                limit_up_time) >= l2_data_util.L2DataUtil.get_time_as_second(
        if limit_up_time is not None and l2.l2_data_util.L2DataUtil.get_time_as_second(
                limit_up_time) >= l2.l2_data_util.L2DataUtil.get_time_as_second(
            "14:30:00"):
            return False, "14:30后涨停的不能买,涨停时间为{}".format(limit_up_time)