Administrator
2023-01-06 59fba698b03a51a8da5b56a919ebbf94d4784f74
l2_data_manager.py
@@ -1,18 +1,33 @@
"""
L2的数据处理
"""
import decimal
import json
import logging
import random
import time as t
from datetime import datetime
import data_process
import big_money_num_manager
import code_data_util
import constant
import global_data_loader
import global_util
import industry_codes_sort
import l2_data_log
import l2_data_util
import mysql
import gpcode_manager
import mongo_data
import l2_trade_factor
import log
import redis_manager
import ths_industry_util
import tool
import trade_manager
from log import logger_l2_trade
from log import logger_l2_trade, logger_l2_trade_cancel, logger_l2_trade_buy, logger_l2_process,logger_l2_data
import trade_data_manager
import limit_up_time_manager
_redisManager = redis_manager.RedisManager(1)
# l2数据管理
@@ -53,63 +68,110 @@
    @staticmethod
    def delete_buy_point(code):
        redis = TradePointManager.__get_redis()
        redis.delete("buy_compute_index-{}".format(code))
        redis.delete("buy_compute_num-{}".format(code))
    # 删除买撤点数据
    @staticmethod
    def delete_buy_cancel_point(code):
        redis = TradePointManager.__get_redis()
        redis.delete("buy_cancel_compute_index-{}".format(code))
        redis.delete("buy_cancel_compute_num-{}".format(code))
        redis.delete("buy_compute_index_info-{}".format(code))
    # 获取买入点信息
    # 返回数据为:买入点 累计纯买额 已经计算的数据索引
    @staticmethod
    def get_buy_compute_start_data(code):
        redis = TradePointManager.__get_redis()
        index = redis.get("buy_compute_index-{}".format(code))
        total_num = redis.get("buy_compute_num-{}".format(code))
        if index is None:
            return None, 0
        else:
            return int(index), int(total_num)
        _key = "buy_compute_index_info-{}".format(code)
        _data_json = redis.get(_key)
        if _data_json is None:
            return None, None, None, 0, 0
        _data = json.loads(_data_json)
        return _data[0], _data[1], _data[2], _data[3], _data[4]
    # 设置买入点的值
    # buy_single_index 买入信号位
    # buy_exec_index 买入执行位
    # compute_index 计算位置
    # nums 累计纯买额
    @staticmethod
    def set_buy_compute_start_data(code, num_add, index=None):
    def set_buy_compute_start_data(code, buy_single_index, buy_exec_index, compute_index, nums, count):
        redis = TradePointManager.__get_redis()
        expire = tool.get_expire()
        if index is not None:
            redis.setex("buy_compute_index-{}".format(code), expire, index)
        key = "buy_compute_num-{}".format(code)
        if redis.get(key) is None:
            redis.setex(key, expire, num_add)
        _key = "buy_compute_index_info-{}".format(code)
        if buy_single_index is not None:
            redis.setex(_key, expire, json.dumps((buy_single_index, buy_exec_index, compute_index, nums, count)))
        else:
            redis.incrby(key, num_add)
            _buy_single_index, _buy_exec_index, _compute_index, _nums, _count = TradePointManager.get_buy_compute_start_data(
                code)
            redis.setex(_key, expire, json.dumps((_buy_single_index, buy_exec_index, compute_index, nums, count)))
    # 获取撤买入开始计算的信息
    # 返回数据的内容为:撤销点索引 撤买纯买额 计算的数据索引
    @staticmethod
    def get_buy_cancel_compute_start_data(code):
    def get_buy_cancel_single_pos(code):
        redis = TradePointManager.__get_redis()
        index = redis.get("buy_cancel_compute_index-{}".format(code))
        total_num = redis.get("buy_cancel_compute_num-{}".format(code))
        if index is None:
            return None, 0
        info = redis.get("buy_cancel_single_pos-{}".format(code))
        if info is None:
            return None
        else:
            return int(index), int(total_num)
            return int(info)
    # 设置买撤点信息
    @staticmethod
    def set_buy_cancel_compute_start_data(code, num_add, index=None):
    # buy_num 纯买额  computed_index计算到的下标  index撤买信号起点
    @classmethod
    def set_buy_cancel_single_pos(cls, code, index):
        redis = TradePointManager.__get_redis()
        expire = tool.get_expire()
        if index is not None:
            redis.setex("buy_cancel_compute_index-{}".format(code), expire, index)
        key = "buy_cancel_compute_num-{}".format(code)
        if redis.get(key) is None:
            redis.setex(key, expire, num_add)
        redis.setex("buy_cancel_single_pos-{}".format(code), expire, index)
    # 删除买撤点数据
    @classmethod
    def delete_buy_cancel_point(cls, code):
        redis = TradePointManager.__get_redis()
        redis.delete("buy_cancel_single_pos-{}".format(code))
    # 设置买撤纯买额
    @classmethod
    def set_compute_info_for_cancel_buy(cls, code, index, nums):
        redis = TradePointManager.__get_redis()
        expire = tool.get_expire()
        redis.setex("compute_info_for_cancel_buy-{}".format(code), expire, json.dumps((index, nums)))
        logger_l2_trade_buy.info("{}保存撤单纯买额信息:{},{}", code, index, nums)
    # 获取买撤纯买额计算信息
    @classmethod
    def get_compute_info_for_cancel_buy(cls, code):
        redis = TradePointManager.__get_redis()
        info = redis.get("compute_info_for_cancel_buy-{}".format(code))
        if info is None:
            return None, 0
        else:
            redis.incrby(key, num_add)
            info = json.loads(info)
            return info[0], info[1]
    @classmethod
    def delete_compute_info_for_cancel_buy(cls, code):
        redis = TradePointManager.__get_redis()
        redis.delete("compute_info_for_cancel_buy-{}".format(code))
    # 从买入信号开始设置涨停买与涨停撤的单数
    @classmethod
    def set_count_info_for_cancel_buy(cls, code, index, buy_count, cancel_count):
        redis = TradePointManager.__get_redis()
        expire = tool.get_expire()
        redis.setex("count_info_for_cancel_buy-{}".format(code), expire, json.dumps((index, buy_count, cancel_count)))
        logger_l2_trade_buy.info("{}保存撤单纯买额信息:{},{}", code, index, buy_count, cancel_count)
    # 获取买撤纯买额计算信息
    @classmethod
    def get_count_info_for_cancel_buy(cls, code):
        redis = TradePointManager.__get_redis()
        info = redis.get("count_info_for_cancel_buy-{}".format(code))
        if info is None:
            return None, 0, 0
        else:
            info = json.loads(info)
            return info[0], info[1], info[2]
    @classmethod
    def delete_count_info_for_cancel_buy(cls, code):
        redis = TradePointManager.__get_redis()
        redis.delete("count_info_for_cancel_buy-{}".format(code))
def load_l2_data(code, force=False):
@@ -126,21 +188,30 @@
        # 获取今日的数据
    if local_today_datas.get(code) is None or force:
        datas = []
        keys = redis.keys("l2-{}-*".format(code))
        for k in keys:
            value = redis.get(k)
            _data = l2_data_util.l2_data_key_2_obj(k, value)
            datas.append(_data)
        # 排序
        new_datas = sorted(datas,
                           key=lambda e: (int(e.__getitem__('val')["time"].replace(":", "")), e.__getitem__('index')))
        local_today_datas[code] = new_datas
    # 根据今日数据加载
    l2_data_util.load_num_operate_map(local_today_num_operate_map, code, local_today_datas.get(code), force)
        datas = log.load_l2_from_log()
        datas = datas.get(code)
        if datas is None:
            datas= []
        local_today_datas[code] = datas
        # 从数据库加载
        # datas = []
        # keys = redis.keys("l2-{}-*".format(code))
        # for k in keys:
        #     value = redis.get(k)
        #     _data = l2_data_util.l2_data_key_2_obj(k, value)
        #     datas.append(_data)
        # # 排序
        # new_datas = sorted(datas,
        #                    key=lambda e: (int(e.__getitem__('val')["time"].replace(":", "")), e.__getitem__('index')))
        # local_today_datas[code] = new_datas
        # 根据今日数据加载
        l2_data_util.load_num_operate_map(local_today_num_operate_map, code, local_today_datas.get(code), force)
def saveL2Data(code, datas):
@tool.async_call
def saveL2Data(code, datas, msg=""):
    start_time = round(t.time() * 1000)
    # 查询票是否在待监听的票里面
    if not gpcode_manager.is_in_gp_pool(code):
        return None
@@ -152,304 +223,1710 @@
            # 计算保留的时间
            expire = tool.get_expire()
            index = 0
            start_index = redis_instance.get("l2-maxindex-{}".format(code))
            if start_index is None:
                start_index = 0
            else:
                start_index = int(start_index)
            max_index = start_index
            i = 0
            for _data in datas:
                index = index + 1
                i += 1
                key = "l2-" + _data["key"]
                value = redis_instance.get(key)
                if value is None:
                    # 新增
                    max_index = start_index + index
                    value = {"index": start_index + index, "re": _data['re']}
                    redis_instance.setex(key, expire, json.dumps(value))
                    try:
                        value = {"index": _data["index"], "re": _data["re"]}
                        redis_instance.setex(key, expire, json.dumps(value))
                    except:
                        logging.error("更正L2数据出错:{} key:{}".format(code, key))
                else:
                    json_value = json.loads(value)
                    if json_value["re"] != _data["re"]:
                        json_value["re"] = _data["re"]
                        redis_instance.setex(key, expire, json.dumps(json_value))
            redis_instance.setex("l2-maxindex-{}".format(code), expire, max_index)
    finally:
        redis_instance.delete("l2-save-{}".format(code))
    print("保存新数据用时:", msg, "耗时:{}".format(round(t.time() * 1000) - start_time))
    return datas
def parseL2Data(str):
    now = int(t.time())
    day = datetime.now().strftime("%Y%m%d")
    dict = json.loads(str)
    data = dict["data"]
    client = dict["client"]
    code = data["code"]
    channel = data["channel"]
    capture_time = data["captureTime"]
    process_time = data["processTime"]
    data = data["data"]
    datas = []
    dataIndexs = {}
    # 获取涨停价
    limit_up_price = gpcode_manager.get_limit_up_price(code)
    same_time_num = {}
    for item in data:
        # 解析数据
        time = item["time"]
        if time in same_time_num:
            same_time_num[time] = same_time_num[time] + 1
        else:
            same_time_num[time] = 1
        price = float(item["price"])
        num = item["num"]
        limitPrice = item["limitPrice"]
        # 涨停价
        if limit_up_price is not None and limit_up_price == tool.to_price(decimal.Decimal(price)):
            limitPrice = 1
            item["limitPrice"] = "{}".format(limitPrice)
        operateType = item["operateType"]
        cancelTime = item["cancelTime"]
        cancelTimeUnit = item["cancelTimeUnit"]
        key = "{}-{}-{}-{}-{}-{}-{}-{}".format(code, operateType, time, num, price, limitPrice, cancelTime,
                                               cancelTimeUnit)
        if key in dataIndexs:
            # 数据重复次数+1
            datas[dataIndexs[key]]['re'] = datas[dataIndexs[key]]['re'] + 1
        else:
            # 数据重复次数默认为1
            datas.append({"key": key, "val": item, "re": 1})
            dataIndexs.setdefault(key, len(datas) - 1)
    for key in same_time_num:
        if same_time_num[key] > 50:
            # 只能保存近3s的数据
            ts1 = l2_data_util.get_time_as_seconds(datas[-1]["val"]["time"])
            ts_now = l2_data_util.get_time_as_seconds(datetime.now().strftime("%H:%M:%S"))
            if abs(ts1 - ts_now) <= 3:
                # TODO 保存数据
                redis = _redisManager.getRedis()
                redis.set("big_data-{}-{}".format(code, int(round(t.time() * 1000))), str)
    return day, client, channel, code, datas
# 纠正数据,将re字段替换为较大值
def correct_data(code, _datas):
    latest_data = local_latest_datas.get(code)
    if latest_data is None:
        latest_data = []
    for data in _datas:
        for _ldata in latest_data:
            if _ldata["key"] == data["key"] and _ldata["re"] != data["re"]:
                max_re = max(_ldata["re"], data["re"])
                _ldata["re"] = max_re
                data["re"] = max_re
    return _datas
    datas = L2DataUtil.format_l2_data(data, code, limit_up_price)
    # 获取涨停价
    return day, client, channel, code, capture_time, process_time, datas, data
# 保存l2数据
def save_l2_data(code, datas, add_datas):
    redis = _redisManager.getRedis()
    # 保存最近的数据
    redis.setex("l2-data-latest-{}".format(code), tool.get_expire(), json.dumps(datas))
    # 设置进内存
    if code in local_latest_datas:
        local_latest_datas[code] = datas
    else:
        local_latest_datas.setdefault(code, datas)
    __set_l2_data_latest_count(code, len(datas))
    # 只有有新曾数据才需要保存
    if len(add_datas) > 0:
        # 保存最近的数据
        __start_time = round(t.time() * 1000)
        redis.setex("l2-data-latest-{}".format(code), tool.get_expire(), json.dumps(datas))
        l2_data_log.l2_time(code, round(t.time() * 1000) - __start_time, "保存最近l2数据用时")
        # 设置进内存
        local_latest_datas[code] = datas
        __set_l2_data_latest_count(code, len(datas))
        try:
            logger_l2_data.info("{}-{}",code,add_datas)
        except Exception as e:
            logging.exception(e)
        saveL2Data(code, add_datas)
# 获取增量数据
def get_add_data(code, datas):
    if datas is not None and len(datas) < 1:
        return []
    last_key = ""
    __latest_datas = local_latest_datas.get(code)
    if __latest_datas is not None and len(__latest_datas) > 0:
        last_key = __latest_datas[-1]["key"]
    count = 0
    start_index = -1
    # 如果原来没有数据
# 清除l2数据
def clear_l2_data(code):
    redis_l2 = redis_manager.RedisManager(1).getRedis()
    keys = redis_l2.keys("l2-{}-*".format(code))
    for k in keys:
        redis_l2.delete(k)
    for n in reversed(datas):
        count += 1
        if n["key"] == last_key:
            start_index = len(datas) - count
            break
    if len(last_key) > 0:
        if start_index < 0 or start_index + 1 >= len(datas):
            return []
    redis_l2.delete("l2-data-latest-{}".format(code))
class L2DataUtil:
    @classmethod
    def is_same_time(cls, time1, time2):
        if constant.TEST:
            return True
        time1_s = time1.split(":")
        time1_second = int(time1_s[0]) * 3600 + int(time1_s[1]) * 60 + int(time1_s[2])
        time2_s = time2.split(":")
        time2_second = int(time2_s[0]) * 3600 + int(time2_s[1]) * 60 + int(time2_s[2])
        if abs(time2_second - time1_second) < 3:
            return True
        else:
            return datas[start_index + 1:]
    else:
        return datas[start_index + 1:]
            return False
    # 获取增量数据
    @classmethod
    def get_add_data(cls, code, datas, _start_index):
        if datas is not None and len(datas) < 1:
            return []
        last_data = None
        latest_datas_ = local_latest_datas.get(code)
        if latest_datas_ is not None and len(latest_datas_) > 0:
            last_data = latest_datas_[-1]
def __is_same_time(time1, time2):
    # TODO 测试
    # if 1 > 0:
    #     return True
    time1_s = time1.split(":")
    time1_second = int(time1_s[0]) * 3600 + int(time1_s[1]) * 60 + int(time1_s[2])
    time2_s = time2.split(":")
    time2_second = int(time2_s[0]) * 3600 + int(time2_s[1]) * 60 + int(time2_s[2])
    if abs(time2_second - time1_second) < 3:
        count = 0
        start_index = -1
        # 如果原来没有数据
        # 设置add_data的序号
        for n in reversed(datas):
            count += 1
            if n["key"] == (last_data["key"] if last_data is not None else ""):
                start_index = len(datas) - count
                break
        _add_datas = []
        if last_data is not None:
            if start_index < 0:
                if L2DataUtil.get_time_as_second(datas[0]["val"]["time"]) >= L2DataUtil.get_time_as_second(
                        last_data["val"]["time"]):
                    _add_datas = datas
                else:
                    _add_datas = []
            elif start_index + 1 >= len(datas):
                _add_datas = []
            else:
                _add_datas = datas[start_index + 1:]
        else:
            _add_datas = datas[start_index + 1:]
        for i in range(0, len(_add_datas)):
            _add_datas[i]["index"] = _start_index + i
        return _add_datas
    # 纠正数据,将re字段替换为较大值
    @classmethod
    def correct_data(cls, code, _datas):
        latest_data = local_latest_datas.get(code)
        if latest_data is None:
            latest_data = []
        save_list = []
        for data in _datas:
            for _ldata in latest_data:
                # 新数据条数比旧数据多才保存
                if _ldata["key"] == data["key"] and _ldata["re"] < data["re"]:
                    max_re = max(_ldata["re"], data["re"])
                    _ldata["re"] = max_re
                    data["re"] = max_re
                    # 保存到数据库,更新re的数据
                    save_list.append(_ldata)
        if len(save_list) > 0:
            saveL2Data(code, save_list, "保存纠正数据")
            local_latest_datas[code] = latest_data
        return _datas
    # 处理l2数据
    @classmethod
    def format_l2_data(cls, data, code, limit_up_price):
        datas = []
        dataIndexs = {}
        same_time_num = {}
        for item in data:
            # 解析数据
            time = item["time"]
            if time in same_time_num:
                same_time_num[time] = same_time_num[time] + 1
            else:
                same_time_num[time] = 1
            price = float(item["price"])
            num = item["num"]
            limitPrice = item["limitPrice"]
            # 涨停价
            if limit_up_price is not None:
                if limit_up_price == tool.to_price(decimal.Decimal(price)):
                    limitPrice = 1
                else:
                    limitPrice = 0
                item["limitPrice"] = "{}".format(limitPrice)
            operateType = item["operateType"]
            # 不需要非涨停买与买撤
            if int(item["limitPrice"]) != 1 and (int(operateType) == 0 or int(operateType) == 1):
                continue
            cancelTime = item["cancelTime"]
            cancelTimeUnit = item["cancelTimeUnit"]
            key = "{}-{}-{}-{}-{}-{}-{}-{}".format(code, operateType, time, num, price, limitPrice, cancelTime,
                                                   cancelTimeUnit)
            if key in dataIndexs:
                # 数据重复次数+1
                datas[dataIndexs[key]]['re'] = datas[dataIndexs[key]]['re'] + 1
            else:
                # 数据重复次数默认为1
                datas.append({"key": key, "val": item, "re": 1})
                dataIndexs.setdefault(key, len(datas) - 1)
        # TODO 测试的时候开启,方便记录大单数据
        # l2_data_util.save_big_data(code, same_time_num, data)
        return datas
    @classmethod
    def get_time_as_second(cls, time_str):
        ts = time_str.split(":")
        return int(ts[0]) * 3600 + int(ts[1]) * 60 + int(ts[2])
    # @classmethod
    # def get_time_as_str(cls, time_seconds):
    #     ts = time_str.split(":")
    #     return int(ts[0]) * 3600 + int(ts[1]) * 60 + int(ts[2])
    # 是否是涨停价买
    @classmethod
    def is_limit_up_price_buy(cls, val):
        if int(val["limitPrice"]) != 1:
            return False
        if int(val["operateType"]) != 0:
            return False
        price = float(val["price"])
        num = int(val["num"])
        if price * num * 100 < 50 * 10000:
            return False
        return True
    else:
    # 是否为涨停卖
    @classmethod
    def is_limit_up_price_sell(cls, val):
        if int(val["limitPrice"]) != 1:
            return False
        if int(val["operateType"]) != 2:
            return False
        price = float(val["price"])
        num = int(val["num"])
        if price * num * 100 < 50 * 10000:
            return False
        return True
    # 是否涨停买撤
    @classmethod
    def is_limit_up_price_buy_cancel(cls, val):
        if int(val["limitPrice"]) != 1:
            return False
        if int(val["operateType"]) != 1:
            return False
        price = float(val["price"])
        num = int(val["num"])
        if price * num * 100 < 50 * 10000:
            return False
        return True
    # 是否卖撤
    @classmethod
    def is_sell_cancel(cls, val):
        if int(val["operateType"]) == 3:
            return True
        return False
    # 是否为卖
    @classmethod
    def is_sell(cls, val):
        if int(val["operateType"]) == 2:
            return True
        return False
def process_data(code, datas):
    now_time_str = datetime.now().strftime("%H:%M:%S")
    __start_time = round(t.time() * 1000)
    try:
        if len(datas) > 0:
            # 判断价格区间是否正确
            if not data_process.is_same_code_with_price(code, float(datas[0]["val"]["price"])):
# L2交易数据处理器
# 一些常见的概念:
# 买入信号位置(出现下单信号的第一条数据的位置):buy_single_index
# 买入执行位置(符合下单信号的最后一条数据):buy_exec_index
# 计算位置(当前计算的整个计算的位置):compute_index
#
                raise L2DataException(L2DataException.CODE_PRICE_ERROR, "股价不匹配 code-{} price-{}".format(code,datas[0]["val"]["price"]))
            # 加载历史数据
            load_l2_data(code)
            # 纠正数据
            datas = correct_data(code, datas)
            add_datas = get_add_data(code, datas)
            if len(add_datas) > 0:
                # 拼接数据
                local_today_datas[code].extend(add_datas)
                l2_data_util.load_num_operate_map(local_today_num_operate_map, code, add_datas)
class L2TradeDataProcessor:
    unreal_buy_dict = {}
    random_key = {}
                latest_time = add_datas[len(add_datas) - 1]["val"]["time"]
                # 时间差不能太大才能处理
                if __is_same_time(now_time_str, latest_time):
                    # logger.info("及时的数据,新增数据数量{}".format(len(add_datas)))
    @classmethod
    def debug(cls, code, content, *args):
        logger_l2_trade.debug(("thread-id={} code={}  ".format(cls.random_key[code], code) + content).format(*args))
                    # 是否已经有买入开始计算点
                    c_index, c_num = TradePointManager.get_buy_compute_start_data(code)
                    if c_index is None:
                        # 判断是否出现禁止交易信号
                        forbidden = __is_have_forbidden_feature(code, len(add_datas) + 6, 6)
                        if forbidden:
                            trade_manager.forbidden_trade(code)
    @classmethod
    def cancel_debug(cls, code, content, *args):
        logger_l2_trade_cancel.debug(
            ("thread-id={} code={}  ".format(cls.random_key[code], code) + content).format(*args))
                        # 没有计算开始点
                        c_index = __get_limit_up_buy_start(code, len(add_datas) + 3, 3)
                        if c_index is not None:
                            total_datas = local_today_datas[code]
                            logger_l2_trade.info("找到买点:{} - {}".format(code, json.dumps(total_datas[c_index])))
    @classmethod
    def buy_debug(cls, code, content, *args):
        logger_l2_trade_buy.debug(
            ("thread-id={} code={}  ".format(cls.random_key[code], code) + content).format(*args))
                            # 触发数据分析 ,获取连续涨停标记数据
                            buy_nums = 0
                            for i in range(c_index, len(total_datas)):
                                _val = total_datas[i]["val"]
                                # 有连续4个涨停买就标记计算起始点
                                if int(_val["limitPrice"]) == 1 and int(_val["operateType"]) == 0:
                                    # 涨停买
                                    buy_nums += int(_val["num"]) * int(total_datas[i]["re"])
                                elif int(_val["limitPrice"]) == 1 and int(_val["operateType"]) == 1:
                                    # 涨停买撤
                                    buy_nums -= int(_val["num"]) * int(total_datas[i]["re"])
    @classmethod
    # 数据处理入口
    # datas: 本次截图数据
    # capture_timestamp:截图时间戳
    def process(cls, code, datas, capture_timestamp):
        cls.random_key[code] = random.randint(0, 100000)
        now_time_str = datetime.now().strftime("%H:%M:%S")
        __start_time = round(t.time() * 1000)
        try:
            if len(datas) > 0:
                            TradePointManager.set_buy_compute_start_data(code, buy_nums, c_index)
                            # 获取涨停价
                            limit_up_price = gpcode_manager.get_limit_up_price(code)
                            if limit_up_price is not None:
                                if buy_nums * limit_up_price * 100 > 1000 * 10000:
                                    # 大于1000w就买
                                    logger_l2_trade.info(
                                        "执行买入:{} - 计算结束点: {}".format(code, json.dumps(total_datas[-1])))
                                    try:
                                        trade_manager.start_buy(code)
                                        TradePointManager.delete_buy_cancel_point(code)
                                    except Exception as e:
                                        pass
                    else:
                        # 有计算开始点,计算新增的数据
                        buy_nums = 0
                        for data in add_datas:
                            _val = data["val"]
                            if int(_val["limitPrice"]) == 1 and int(_val["operateType"]) == 0:
                                # 涨停买
                                buy_nums += int(_val["num"]) * int(data["re"])
                            elif int(_val["limitPrice"]) == 1 and int(_val["operateType"]) == 1:
                                # 涨停买撤
                                buy_nums -= int(_val["num"]) * int(data["re"])
                        TradePointManager.set_buy_compute_start_data(code, buy_nums)
                        latest_num = c_num + buy_nums
                        # 获取涨停价
                        limit_up_price = gpcode_manager.get_limit_up_price(code)
                        if limit_up_price is not None:
                            if latest_num * limit_up_price * 100 > 1000 * 10000:
                                # 大于1000w就买
                                logger_l2_trade.info("执行买入:{} - 计算结束点: {}".format(code, json.dumps(add_datas[-1])))
                                try:
                                    trade_manager.start_buy(code)
                                    TradePointManager.delete_buy_cancel_point(code)
                                except Exception as e:
                                    pass
                # 判断价格区间是否正确
                if not code_data_util.is_same_code_with_price(code, float(datas[0]["val"]["price"])):
                    raise L2DataException(L2DataException.CODE_PRICE_ERROR,
                                          "股价不匹配 code-{} price-{}".format(code, datas[0]["val"]["price"]))
                # 加载历史数据
                load_l2_data(code)
                # 纠正数据
                datas = L2DataUtil.correct_data(code, datas)
                _start_index = 0
                if local_today_datas.get(code) is not None and len(local_today_datas[code]) > 0:
                    _start_index = local_today_datas[code][-1]["index"] + 1
                add_datas = L2DataUtil.get_add_data(code, datas, _start_index)
                if len(add_datas) > 0:
                    # 拼接数据
                    local_today_datas[code].extend(add_datas)
                    l2_data_util.load_num_operate_map(local_today_num_operate_map, code, add_datas)
                total_datas = local_today_datas[code]
                # 过时 买入确认点处理
                # TradeBuyDataManager.process_buy_sure_position_info(code, capture_timestamp, total_datas,
                #                                                    total_datas[-1],
                #                                                    add_datas)
                if len(add_datas) > 0:
                    _start_time = round(t.time() * 1000)
                    latest_time = add_datas[len(add_datas) - 1]["val"]["time"]
                    # 时间差不能太大才能处理
                    # TODO 暂时关闭处理
                    # if L2DataUtil.is_same_time(now_time_str, latest_time):
                    #     # 判断是否已经挂单
                    #     state = trade_manager.get_trade_state(code)
                    #     start_index = len(total_datas) - len(add_datas)
                    #     end_index = len(total_datas) - 1
                    #     if state == trade_manager.TRADE_STATE_BUY_DELEGATED or state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER:
                    #         # 已挂单
                    #         cls.__process_order(code, start_index, end_index, capture_timestamp)
                    #     else:
                    #         # 未挂单
                    #         cls.__process_not_order(code, start_index, end_index, capture_timestamp)
                    logger_l2_process.info("code:{} 处理数据范围: {}-{} 处理时间:{}", code, add_datas[0]["index"],
                                           add_datas[-1]["index"], round(t.time() * 1000) - __start_time)
                # 保存数据
                save_l2_data(code, datas, add_datas)
        finally:
            if code in cls.unreal_buy_dict:
                cls.unreal_buy_dict.pop(code)
                    if c_index is not None:
                        # 是否处于委托待成交
                        state = trade_manager.get_trade_state(code)
                        if state == trade_manager.TRADE_STATE_BUY_DELEGATED or state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER:
                            # 已经委托,检测取消接口
                            cancel_index, cancel_num = TradePointManager.get_buy_cancel_compute_start_data(code)
                            if cancel_index is None:
                                # 之前尚未监测到买撤起点
                                cancel_index = __get_limit_up_buy_cancel_start(code, len(add_datas) + 3, 3)
                                if cancel_index is not None:
                                    total_datas = local_today_datas[code]
                                    # print("找到买撤点", cancel_index, total_datas[cancel_index])
                                    logger_l2_trade.info(
                                        "找到买撤点:{} - {}".format(code, json.dumps(total_datas[cancel_index])))
    @classmethod
    def __compute_big_money_data(cls, code, start_index, end_index):
        # 计算大单
        total_datas = local_today_datas[code]
        num = 0
        for index in range(start_index, end_index + 1):
            data = total_datas[index]
            if l2_trade_factor.L2TradeFactorSourceDataUtil.is_big_money(data):
                if int(data["val"]["operateType"]) == 0:
                    num += data["re"]
                elif int(data["val"]["operateType"]) == 1:
                    num -= data["re"]
        big_money_num_manager.add_num(code, num)
                                    # 触发数据分析 ,获取连续涨停标记数据
                                    nums = 0
                                    for i in range(c_index, len(total_datas)):
                                        _val = total_datas[i]["val"]
                                        if int(_val["limitPrice"]) == 1 and int(_val["operateType"]) == 1:
                                            # 涨停买撤
                                            nums += int(_val["num"]) * int(total_datas[i]["re"])
    # 处理未挂单
    @classmethod
    def __process_not_order(cls, code, start_index, end_index, capture_time):
        # 获取阈值
        threshold_money = cls.__get_threshmoney(code)
        cls.__start_compute_buy(code, start_index, end_index, threshold_money, capture_time)
                                    TradePointManager.set_buy_cancel_compute_start_data(code, nums, cancel_index)
                            else:
                                # 之前监测到了买撤销起点
                                cancel_nums_add = 0
                                for data in add_datas:
                                    _val = data["val"]
                                    if int(_val["limitPrice"]) == 1 and int(_val["operateType"]) == 1:
                                        # 涨停买撤
                                        cancel_nums_add += int(_val["num"]) * int(data["re"])
                                TradePointManager.set_buy_cancel_compute_start_data(code, cancel_nums_add)
                                latest_num = cancel_num + cancel_nums_add
                                # 获取涨停价
                                limit_up_price = gpcode_manager.get_limit_up_price(code)
                                if limit_up_price is not None:
                                    if latest_num * limit_up_price * 100 > 1000 * 10000:
                                        # 大于1000w就买
                                        # print("执行撤销")
                                        logger_l2_trade.info(
                                            "执行撤销:{} - {}".format(code, json.dumps(add_datas[-1])))
                                        try:
                                            trade_manager.start_cancel_buy(code)
                                            # 取消买入标识
                                            TradePointManager.delete_buy_point(code)
                                            TradePointManager.delete_buy_cancel_point(code)
                                        except Exception as e:
                                            pass
    @classmethod
    def __statistic_count_l2_data_for_cancel(cls, code, start_index, end_index, has_cancel_single=False):
        index, old_buy_count, old_cancel_count = TradePointManager.get_count_info_for_cancel_buy(code)
        for i in range(start_index, end_index + 1):
            buy_count, buy_cancel_count = cls.__count_l2_data_for_cancel(code, i, i)
            old_buy_count += buy_count
                            pass
            # 保存数据
            save_l2_data(code, datas, add_datas)
    finally:
            old_cancel_count += buy_cancel_count
            if old_buy_count > 0 and (old_buy_count - old_cancel_count) / old_buy_count < 0.3 and has_cancel_single:
                return i, True
        TradePointManager.set_count_info_for_cancel_buy(code, end_index, old_buy_count,
                                                        old_cancel_count)
        return end_index, False
    # 处理已挂单
    @classmethod
    def __process_order(cls, code, start_index, end_index, capture_time, new_add=True):
        if start_index < 0:
            start_index = 0
        if end_index < start_index:
            return
        # 获取之前是否有记录的撤买信号
        #  cancel_index = TradePointManager.get_buy_cancel_single_pos(code)
        # cancel_computed_index, cancel_buy_num = TradePointManager.get_compute_info_for_cancel_buy(code)
        # if cancel_computed_index is None:
        #     logger_l2_trade.error("{} 未获取到买撤纯买额,起始计算位:{}", code, start_index)
        # 统计群撤大单
        L2BetchCancelBigNumProcessor.process_new(code, start_index, end_index)
        # 统计最大连续买单
        L2ContinueLimitUpCountManager.process(code, start_index, end_index)
        # 计算大单撤销
        need_cancel, cancel_data = L2BigNumProcessor.process_cancel_with_big_num(code, start_index, end_index)
        if need_cancel:
            # 已经撤单了
            threshold_money = cls.__get_threshmoney(code)
            # 重新处理下单
            cls.__start_compute_buy(code, cancel_data["index"] + 1, end_index, threshold_money, capture_time)
            return
        # buy_single_index, buy_exec_index, buy_compute_index, buy_num = cls.__get_order_begin_pos(code)
        # if cancel_index is None:
        # 无撤单信号起始点记录
        continue_cancel = L2ContinueLimitUpCountManager.get_continue_count(code)
        order_cancel_begin_start = max(start_index - (continue_cancel - 1),
                                       0) if new_add else start_index
        order_cancel_begin_end = end_index
        total_datas = local_today_datas[code]
        little_cancel = False
        # 大单撤单的数据不为空
        if cancel_data is not None:
            # 小群撤事件
            continue_cancel = 5
            cancel_time_seconds = L2DataUtil.get_time_as_second(cancel_data["val"]["time"])
            # 查找上一秒与下一秒
            for i in range(int(cancel_data["index"]), 0, -1):
                # 查找上一秒和下一秒
                if total_datas[i]["val"]["time"] != cancel_data["val"][
                    "time"] and cancel_time_seconds - L2DataUtil.get_time_as_second(total_datas[i]["val"]["time"]) > 1:
                    order_cancel_begin_start = i + 1
                    break
            for i in range(int(cancel_data["index"]), len(local_today_datas[code])):
                # 查找上一秒和下一秒
                if total_datas[i]["val"]["time"] != cancel_data["val"]["time"] and L2DataUtil.get_time_as_second(
                        total_datas[i]["val"]["time"]) - cancel_time_seconds > 1:
                    order_cancel_begin_end = i - 1
                    break
            cls.cancel_debug(code, "小群撤事件计算范围:{},{}", order_cancel_begin_start, order_cancel_begin_end)
            little_cancel = True
        cancel_start_index = None
        cancel_end_index = None
        need_cancel = False
        if little_cancel:
            # 小群撤事件
            cancel_start_index, cancel_end_index = cls.__compute_order_cancel_little_begin_single(code,
                                                                                                  order_cancel_begin_start
                                                                                                  , continue_cancel,
                                                                                                  order_cancel_begin_end)
            if cancel_start_index is not None:
                cls.debug(code, "找到小群撤信号,撤单信号范围:{}-{}", cancel_start_index, cancel_end_index)
                # 有小群撤信号
                need_cancel = True
            else:
                # 不满足小群撤,从小群撤后面一条数据继续处理
                cls.__process_order(code, cancel_data["index"] + 1, end_index, capture_time, False)
                return
        else:
            # 大群撤事件
            cancel_start_index, cancel_end_index = cls.__compute_order_cancel_begin_single(
                code, order_cancel_begin_start
                , continue_cancel, order_cancel_begin_end)
            if cancel_start_index is not None:
                cls.debug(code, "找到大群撤信号,连续笔数阈值:{}, 撤单信号范围:{}-{}", continue_cancel, cancel_start_index,
                          cancel_end_index)
                # 判断是否有大群撤大单撤
                need_cancel = L2BetchCancelBigNumProcessor.need_cancel(code, cancel_start_index, cancel_end_index)
                if need_cancel:
                    cls.debug(code, "大群撤信号有大单撤销")
                else:
                    cls.debug(code, "大群撤信号无大单撤销")
        if need_cancel:
            # 需要撤买
            cls.cancel_buy(code)
            if cancel_end_index >= end_index:
                return
            # 继续处理下单信号
            threshold_money = cls.__get_threshmoney(code)
            cls.__start_compute_buy(code, cancel_end_index + 1, end_index, threshold_money, capture_time, False)
        else:
            # 是否有虚拟下单
            unreal_buy_info = cls.unreal_buy_dict.get(code)
            if unreal_buy_info is not None:
                cls.debug(code, "有虚拟下单,无买撤信号,开始执行买入,执行位置:{},截图时间:{}", unreal_buy_info[0], capture_time)
                # unreal_buy_info 的内容格式为:(触法买操作下标,截图时间)
                # 真实下单
                cls.__buy(code, unreal_buy_info[1], local_today_datas[code][unreal_buy_info[0]],
                          unreal_buy_info[0])
    # 过时 开始计算撤的信号
    @classmethod
    def __start_compute_cancel(cls, code, cancel_index, compute_start_index, origin_num, threshold_money, capture_time):
        # sure_type 0-虚拟挂买位  1-真实挂买位
        cancel_single = cancel_index is not None
        computed_index, buy_num_for_cancel, sure_type = cls.__sum_buy_num_for_cancel_order(code, compute_start_index,
                                                                                           origin_num, threshold_money,
                                                                                           cancel_single)
        total_datas = local_today_datas[code]
        if computed_index is not None:
            cls.debug(code, "获取到撤单执行信号,信号位置:{},m2:{} 数据:{}", computed_index, threshold_money,
                      total_datas[computed_index])
            # 发出撤买信号,需要撤买
            if cls.unreal_buy_dict.get(code) is not None:
                # 有虚拟下单
                cls.debug(code, "之前有虚拟下单,执行虚拟撤买")
                # 删除虚拟下单标记
                cls.unreal_buy_dict.pop(code)
                # 删除下单标记位置
                TradePointManager.delete_buy_point(code)
            else:
                # 无虚拟下单,需要执行撤单
                cls.debug(code, "之前无虚拟下单,执行真实撤单")
                cls.__cancel_buy(code)
            if computed_index < len(local_today_datas[code]) - 1:
                # 数据尚未处理完,重新进入下单计算流程
                cls.__start_compute_buy(code, computed_index + 1, threshold_money, capture_time, False)
                pass
        else:
            cls.debug(code, "撤买纯买额计算,计算位置:{}-{},目前为止纯买手数:{}", compute_start_index, total_datas[-1]["index"],
                      buy_num_for_cancel)
            # 无需撤买,设置计算信息
            TradePointManager.set_compute_info_for_cancel_buy(code, int(total_datas[-1]["index"]), buy_num_for_cancel)
            # 判断是否有虚拟下单
            unreal_buy_info = cls.unreal_buy_dict.get(code)
            if unreal_buy_info is not None:
                # unreal_buy_info 的内容格式为:(触法买操作下标,截图时间)
                # 真实下单
                cls.debug(code, "无撤单执行信号,有虚拟下单,执行真实下单")
                cls.__buy(code, unreal_buy_info[1], total_datas[unreal_buy_info[0]],
                          unreal_buy_info[0])
                pass
            else:
                # 终止执行
                pass
    @classmethod
    def __buy(cls, code, capture_timestamp, last_data, last_data_index):
        can, reason = cls.__can_buy(code)
        # 不能购买
        if not can:
            cls.debug(code, "不可以下单,原因:{}", reason)
            return
        else:
            cls.debug(code, "可以下单,原因:{}", reason)
        # 删除虚拟下单
        if code in cls.unreal_buy_dict:
            cls.unreal_buy_dict.pop(code)
        cls.debug(code, "开始执行买入")
        try:
            trade_manager.start_buy(code, capture_timestamp, last_data,
                                    last_data_index)
            TradePointManager.delete_buy_cancel_point(code)
            cls.debug(code, "执行买入成功")
        except Exception as e:
            cls.debug(code, "执行买入异常:{}", str(e))
            pass
        finally:
            cls.debug(code, "m值影响因子:", l2_trade_factor.L2TradeFactorUtil.factors_to_string(code))
    # 是否可以买
    @classmethod
    def __can_buy(cls, code):
        limit_up_time = limit_up_time_manager.get_limit_up_time(code)
        if limit_up_time is not None and L2DataUtil.get_time_as_second(limit_up_time) >= L2DataUtil.get_time_as_second(
                "14:30:00"):
            return False, "14:30后涨停的不能买,涨停时间为{}".format(limit_up_time)
        # 同一板块中老二后面的不能买
        industry, codes = ths_industry_util.get_same_industry_codes(code, gpcode_manager.get_gp_list())
        if industry is None:
            return True, "没有获取到行业"
        codes_index = industry_codes_sort.sort_codes(codes,code)
        if codes_index is not None and codes_index.get(code) is not None and codes_index.get(code) > 1:
            return False, "同一板块中老三,老四,...不能买"
        # 13:00后涨停,本板块中涨停票数<29不能买
        limit_up_time = limit_up_time_manager.get_limit_up_time(code)
        if limit_up_time is not None:
            if int(limit_up_time.replace(":", "")) >= 130000 and global_util.industry_hot_num.get(industry) is not None:
                if global_util.industry_hot_num.get(industry) < 29:
                    return False, "13:00后涨停,本板块中涨停票数<29不能买"
        # 老二,本板块中涨停票数<29 不能买
        if codes_index.get(code) is not None and codes_index.get(code) == 1 and global_util.industry_hot_num.get(
                industry) is not None:
            if global_util.industry_hot_num.get(industry) < 29:
                return False, "老二,本板块中涨停票数<29不能买"
        # 可以下单
        return True, None
    @classmethod
    def __cancel_buy(cls, code):
        try:
            cls.debug(code, "开始执行撤单")
            trade_manager.start_cancel_buy(code)
            # 取消买入标识
            TradePointManager.delete_buy_point(code)
            TradePointManager.delete_buy_cancel_point(code)
            TradePointManager.delete_compute_info_for_cancel_buy(code)
            TradePointManager.delete_count_info_for_cancel_buy(code)
            # 删除大群撤事件的大单
            L2BetchCancelBigNumProcessor.del_recod(code)
            cls.debug(code, "执行撤单成功")
        except Exception as e:
            logging.exception(e)
            cls.debug(code, "执行撤单异常:{}", str(e))
    @classmethod
    def cancel_buy(cls, code):
        # 删除大群撤事件的大单
        L2BetchCancelBigNumProcessor.del_recod(code)
        L2ContinueLimitUpCountManager.del_data(code)
        if code in cls.unreal_buy_dict:
            cls.unreal_buy_dict.pop(code)
            # 取消买入标识
            TradePointManager.delete_buy_point(code)
            TradePointManager.delete_buy_cancel_point(code)
            TradePointManager.delete_compute_info_for_cancel_buy(code)
            TradePointManager.delete_count_info_for_cancel_buy(code)
            # 删除大群撤事件的大单
            L2BetchCancelBigNumProcessor.del_recod(code)
        else:
            cls.__cancel_buy(code)
        L2BigNumProcessor.del_big_num_pos(code)
    @classmethod
    def __start_compute_buy(cls, code, compute_start_index, compute_end_index, threshold_money, capture_time,
                            new_add=True):
        if compute_end_index < compute_start_index:
            return
        total_datas = local_today_datas[code]
        # 获取买入信号计算起始位置
        buy_single_index, buy_exec_index, buy_compute_index, num = cls.__get_order_begin_pos(code)
        # 是否为新获取到的位置
        new_get_pos = False
        if buy_single_index is None:
            # 有买入信号
            has_single, _index = cls.__compute_order_begin_pos(code, max(
                compute_start_index - 2 if new_add else compute_start_index, 0), 3, compute_end_index)
            buy_single_index = _index
            if has_single:
                num = 0
                new_get_pos = True
                cls.debug(code, "获取到买入信号起始点:{}  数据:{}", buy_single_index, total_datas[buy_single_index])
                limit_up_time_manager.save_limit_up_time(code, total_datas[buy_single_index]["val"]["time"])
                # 重置大单计算
                big_money_num_manager.reset(code)
        if buy_single_index is None:
            # 未获取到买入信号,终止程序
            return None
        # TODO 可能存在问题 计算大单数量
        cls.__compute_big_money_data(code, max(compute_start_index, buy_single_index), compute_end_index)
        # 买入纯买额统计
        compute_index, buy_nums, rebegin_buy_pos = cls.__sum_buy_num_for_order_3(code, max(buy_single_index,
                                                                                           compute_start_index),
                                                                                 compute_end_index, num,
                                                                                 threshold_money, buy_single_index,
                                                                                 capture_time)
        if rebegin_buy_pos is not None:
            # 需要重新计算纯买额
            cls.__start_compute_buy(code, rebegin_buy_pos, compute_end_index, threshold_money, capture_time, False)
            return
        if compute_index is not None:
            cls.debug(code, "获取到买入执行位置:{} m值:{} 纯买手数:{} 数据:{}", compute_index, threshold_money, buy_nums,
                      total_datas[compute_index])
            # 记录买入信号位置
            cls.__save_order_begin_data(code, buy_single_index, compute_index, compute_index, buy_nums)
            # 虚拟下单
            cls.unreal_buy_dict[code] = (compute_index, capture_time)
            # 删除之前的所有撤单信号
            TradePointManager.delete_buy_cancel_point(code)
            TradePointManager.delete_compute_info_for_cancel_buy(code)
            TradePointManager.delete_count_info_for_cancel_buy(code)
            trade_data_manager.TradeBuyDataManager.remove_buy_position_info(code)
            # 已过时 为买撤保存基础纯买额
            # TradePointManager.set_compute_info_for_cancel_buy(code, compute_index, buy_nums)
            b_buy_count, b_buy_cancel_count = cls.__count_l2_data_before_for_cancel(code, buy_single_index)
            buy_count, buy_cancel_count = cls.__count_l2_data_for_cancel(code, buy_single_index, compute_index)
            TradePointManager.set_count_info_for_cancel_buy(code, compute_index, b_buy_count + buy_count,
                                                            b_buy_cancel_count + buy_cancel_count)
            # 计算大单(从买入信号起始点到挂单执行点),返回是否取消
            cancel_result, cancel_data = L2BigNumProcessor.process_cancel_with_big_num(code, buy_single_index,
                                                                                       compute_index)
            # 计算大群撤的大单
            L2BetchCancelBigNumProcessor.process_new(code, buy_single_index, compute_index)
            # 连续涨停数计算
            L2ContinueLimitUpCountManager.process(code, buy_single_index, compute_index)
            # 数据是否处理完毕
            if compute_index >= compute_end_index:
                cls.debug(code, "数据处理完毕,下单, 数据截图时间-{}", capture_time)
                # 数据已经处理完毕,如果还没撤单就实际下单
                if not cancel_result:
                    cls.__buy(code, capture_time, total_datas[compute_index], compute_index)
            else:
                # 数据尚未处理完毕,进行下一步处理
                cls.debug(code, "数据尚未处理完毕,进行下一步处理,处理进度:{}", compute_index)
                # 如果还没撤单,就继续处理已下单的步骤
                if not cancel_result:
                    cls.__process_order(code, compute_index + 1, compute_end_index, capture_time, False)
                else:
                    cls.__start_compute_buy(code, compute_index + 1, compute_end_index, threshold_money, capture_time,
                                            False)
        else:
            # 未达到下单条件,保存纯买额,设置纯买额
            # 记录买入信号位置
            cls.__save_order_begin_data(code, buy_single_index, -1, compute_end_index, buy_nums)
        pass
    # 获取下单起始信号
    @classmethod
    def __get_order_begin_pos(cls, code):
        buy_single_index, buy_exec_index, compute_index, num = TradePointManager.get_buy_compute_start_data(code)
        return buy_single_index, buy_exec_index, compute_index, num
    @classmethod
    def __save_order_begin_data(self, code, buy_single_index, buy_exec_index, compute_index, num):
        TradePointManager.set_buy_compute_start_data(code, buy_single_index, buy_exec_index, compute_index, num)
    # 计算下单起始信号
    # compute_data_count 用于计算的l2数据数量
    @classmethod
    def __compute_order_begin_pos(cls, code, start_index, continue_count, end_index):
        # 倒数100条数据查询
        datas = local_today_datas[code]
        if end_index - start_index + 1 < continue_count:
            return False, None
        __time = None
        last_index = None
        count = 0
        start = None
        for i in range(start_index, end_index + 1):
            _val = datas[i]["val"]
            # 时间要>=09:30:00
            if L2DataUtil.get_time_as_second(_val["time"]) < second_930:
                continue
            if L2DataUtil.is_limit_up_price_buy(_val) and (last_index is None or (
                    i - last_index == 1 and datas[last_index]["val"]["time"] == datas[i]["val"]["time"])):
                if start is None:
                    start = i
                last_index = i
                count += datas[i]["re"]
                if count >= continue_count:
                    return True, start
            elif not L2DataUtil.is_limit_up_price_sell(_val):
                last_index = None
                count = 0
                start = None
        return False, None
    # 大群撤事件,最多相隔1s
    @classmethod
    def __compute_order_cancel_begin_single(cls, code, start_index, continue_count, end_index):
        datas = local_today_datas[code]
        if end_index - start_index + 1 < continue_count:
            return None, None
        count = 0
        start = -1
        start_time = None
        for i in range(start_index, end_index + 1):
            _val = datas[i]["val"]
            _timestamp = L2DataUtil.get_time_as_second(_val["time"])
            if L2DataUtil.get_time_as_second(_val["time"]) < second_930:
                continue
            if L2DataUtil.is_limit_up_price_buy_cancel(_val) and (start_time is None or _timestamp - start_time < 2):
                if start == -1:
                    start = i
                    start_time = L2DataUtil.get_time_as_second(_val["time"])
                count += datas[i]["re"]
            elif not L2DataUtil.is_limit_up_price_sell(_val):
                if count >= continue_count:
                    return start, i - 1
                start = -1
                count = 0
                start_time = None
        if count >= continue_count:
            return start, end_index
        else:
            return None, None
    # 小群撤事件
    @classmethod
    def __compute_order_cancel_little_begin_single(cls, code, start_index, continue_count, end_index=None):
        # 必须为同一秒的数据
        same_second = True
        datas = local_today_datas[code]
        __len = len(datas)
        if len(datas) - start_index < continue_count:
            return None, None
        count = 0
        start = -1
        start_time = None
        if end_index is None:
            end_index = __len - continue_count
        for i in range(start_index, end_index + 1):
            _val = datas[i]["val"]
            _timestamp = L2DataUtil.get_time_as_second(_val["time"])
            if _timestamp < second_930:
                continue
            # 间隔时间不能多于1s
            if L2DataUtil.is_limit_up_price_buy_cancel(_val) and (start_time is None or _timestamp - start_time < 2):
                if start == -1:
                    start = i
                    start_time = L2DataUtil.get_time_as_second(_val["time"])
                count += int(datas[i]["re"])
            elif not L2DataUtil.is_limit_up_price_sell(_val):
                if count >= continue_count:
                    return start, i - 1
                start = -1
                count = 0
                start_time = None
        if count >= continue_count:
            return start, end_index
        else:
            return None, None
    # 虚拟下单
    def __unreal_order(self):
        pass
    @classmethod
    def __get_threshmoney(cls, code):
        money, msg = l2_trade_factor.L2TradeFactorUtil.compute_m_value(code)
        return money
    # 获取预估挂买位
    @classmethod
    def __get_sure_order_pos(cls, code):
        index, data = trade_data_manager.TradeBuyDataManager.get_buy_sure_position(code)
        if index is None:
            return 0, len(local_today_datas[code]) - 1, local_today_datas[code][-1]
        else:
            return 1, index, data
    # 过时 统计买入净买量
    @classmethod
    def __sum_buy_num_for_order(cls, code, compute_start_index, origin_num, threshold_money):
        total_datas = local_today_datas[code]
        buy_nums = origin_num
        limit_up_price = gpcode_manager.get_limit_up_price(code)
        if limit_up_price is None:
            raise Exception("涨停价无法获取")
        threshold_num = threshold_money / (limit_up_price * 100)
        for i in range(compute_start_index, len(total_datas)):
            _val = total_datas[i]["val"]
            # 有连续4个涨停买就标记计算起始点
            if L2DataUtil.is_limit_up_price_buy(_val):
                # 涨停买
                buy_nums += int(_val["num"]) * int(total_datas[i]["re"])
                if buy_nums >= threshold_num:
                    cls.debug(code, "获取到买入执行点:{} 统计纯买手数:{} 目标纯买手数:{}", i, buy_nums, threshold_num)
                    return i, buy_nums
            elif L2DataUtil.is_limit_up_price_buy_cancel(_val):
                # 涨停买撤
                buy_nums -= int(_val["num"]) * int(total_datas[i]["re"])
        cls.debug(code, "尚未获取到买入执行点,起始计算位置:{} 统计纯买手数:{} 目标纯买手数:{}", compute_start_index, buy_nums,
                  threshold_num)
        return None, buy_nums
    # 过时 统计买入净买量,不计算在买入信号之前的买撤单
    @classmethod
    def __sum_buy_num_for_order_2(cls, code, compute_start_index, origin_num, threshold_money, buy_single_index):
        total_datas = local_today_datas[code]
        buy_nums = origin_num
        limit_up_price = gpcode_manager.get_limit_up_price(code)
        if limit_up_price is None:
            raise Exception("涨停价无法获取")
        threshold_num = threshold_money / (limit_up_price * 100)
        property_buy_num_count = 0
        same_time_property = cls.__get_same_time_property(code)
        for i in range(compute_start_index, len(total_datas)):
            data = total_datas[i]
            _val = total_datas[i]["val"]
            # 有连续4个涨停买就标记计算起始点
            if L2DataUtil.is_limit_up_price_buy(_val):
                # 涨停买
                buy_nums += int(_val["num"]) * int(total_datas[i]["re"])
                if buy_nums >= threshold_num:
                    logger_l2_trade_buy.info("{}获取到买入执行点:{} 统计纯买手数:{} 目标纯买手数:{}", code, i, buy_nums, threshold_num)
            elif L2DataUtil.is_limit_up_price_buy_cancel(_val):
                # 涨停买撤
                # 判断买入位置是否在买入信号之前
                buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(total_datas[i],
                                                                                 local_today_num_operate_map.get(code))
                if buy_index is not None:
                    # 找到买撤数据的买入点
                    if buy_index >= buy_single_index:
                        buy_nums -= int(_val["num"]) * int(data["re"])
                        cls.buy_debug(code, "{}数据在买入信号之后 撤买纯买手数:{} 目标手数:{}", i, buy_nums, threshold_num)
                    else:
                        cls.buy_debug(code, "{}数据在买入信号之前,买入位:{}", i, buy_index)
                        if total_datas[buy_single_index]["val"]["time"] == buy_data["val"]["time"]:
                            # 同一秒,而且还在预估买入位之后按概率计算
                            property_buy_num_count -= int(_val["num"]) * int(data["re"])
                            cls.buy_debug(code, "{}数据买入位与预估买入位在同一秒", i)
                else:
                    # 未找到买撤数据的买入点
                    cls.cancel_debug(code, "未找到买撤数据的买入点: 位置-{} 数据-{}", i, data)
                    buy_nums -= int(_val["num"]) * int(total_datas[i]["re"])
            property_buy_num = round(property_buy_num_count * same_time_property)
            cls.buy_debug(code, "买入信号点之前同一秒买入手数-{},位置-{},总手数:{},目标手数:{}", property_buy_num, i,
                          buy_nums + property_buy_num, threshold_num)
            # 有撤单信号,且小于阈值
            if buy_nums + property_buy_num >= threshold_num:
                return i, buy_nums + property_buy_num
        cls.buy_debug(code, "尚未获取到买入执行点,起始计算位置:{} 统计纯买手数:{} 目标纯买手数:{}", compute_start_index,
                      buy_nums + property_buy_num,
                      threshold_num)
        return None, buy_nums + property_buy_num
    # 统计买入净买量,不计算在买入信号之前的买撤单
    @classmethod
    def __sum_buy_num_for_order_3(cls, code, compute_start_index, compute_end_index, origin_num, threshold_money,
                                  buy_single_index,
                                  capture_time):
        total_datas = local_today_datas[code]
        buy_nums = origin_num
        limit_up_price = gpcode_manager.get_limit_up_price(code)
        if limit_up_price is None:
            raise Exception("涨停价无法获取")
        threshold_num = threshold_money / (limit_up_price * 100)
        buy_single_time_seconds = L2DataUtil.get_time_as_second(total_datas[buy_single_index]["val"]["time"])
        for i in range(compute_start_index, compute_end_index + 1):
            data = total_datas[i]
            _val = total_datas[i]["val"]
            if L2DataUtil.get_time_as_second(_val["time"]) - buy_single_time_seconds > 1:
                TradePointManager.delete_buy_point(code)
                if i == compute_end_index:
                    # 数据处理完毕
                    return None, buy_nums, None
                else:
                    # 计算买入信号,不能同一时间开始计算
                    for ii in range(buy_single_index + 1, compute_end_index + 1):
                        if total_datas[buy_single_index]["val"]["time"] != total_datas[ii]["val"]["time"]:
                            return None, buy_nums, ii
            # 涨停买
            if L2DataUtil.is_limit_up_price_buy(_val):
                # 涨停买
                buy_nums += int(_val["num"]) * int(total_datas[i]["re"])
                if buy_nums >= threshold_num:
                    logger_l2_trade_buy.info("{}获取到买入执行点:{} 统计纯买手数:{} 目标纯买手数:{}", code, i, buy_nums, threshold_num)
            elif L2DataUtil.is_limit_up_price_buy_cancel(_val):
                # 涨停买撤
                # 判断买入位置是否在买入信号之前
                buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(total_datas[i],
                                                                                 local_today_num_operate_map.get(code))
                if buy_index is not None:
                    # 找到买撤数据的买入点
                    if buy_index >= buy_single_index:
                        buy_nums -= int(_val["num"]) * int(data["re"])
                        cls.buy_debug(code, "{}数据在买入信号之后 撤买纯买手数:{} 目标手数:{}", i, buy_nums, threshold_num)
                    else:
                        cls.buy_debug(code, "{}数据在买入信号之前,买入位:{}", i, buy_index)
                        if total_datas[buy_single_index]["val"]["time"] == buy_data["val"]["time"]:
                            # 同一秒,当作买入信号之后处理
                            buy_nums -= int(_val["num"]) * int(data["re"])
                            cls.buy_debug(code, "{}数据买入位与预估买入位在同一秒", i)
                else:
                    # 未找到买撤数据的买入点
                    cls.buy_debug(code, "未找到买撤数据的买入点: 位置-{} 数据-{}", i, data)
                    buy_nums -= int(_val["num"]) * int(total_datas[i]["re"])
            cls.buy_debug(code, "位置-{},总手数:{},目标手数:{}", i,
                          buy_nums, threshold_num)
            # 有撤单信号,且小于阈值
            if buy_nums >= threshold_num:
                return i, buy_nums, None
        cls.buy_debug(code, "尚未获取到买入执行点,起始计算位置:{} 统计纯买手数:{} 目标纯买手数:{}", compute_start_index,
                      buy_nums,
                      threshold_num)
        return None, buy_nums, None
    # 计算买入信号之前的且和买入信号数据在同一时间的数量
    @classmethod
    def __count_l2_data_before_for_cancel(cls, code, buy_single_index):
        total_data = local_today_datas[code]
        single_time = total_data[buy_single_index]["val"]["time"]
        buy_count = 0
        cancel_count = 0
        for i in range(buy_single_index, -1, -1):
            if single_time == total_data[i]["val"]["time"]:
                if L2DataUtil.is_limit_up_price_buy(total_data[i]["val"]):
                    buy_count += int(total_data[i]["re"])
                elif L2DataUtil.is_limit_up_price_buy(total_data[i]["val"]):
                    cancel_count += int(total_data[i]["re"])
            else:
                break
        return buy_count, cancel_count
    @classmethod
    def __count_l2_data_for_cancel(cls, code, start_index, end_index):
        total_data = local_today_datas[code]
        buy_count = 0
        cancel_count = 0
        for i in range(start_index, end_index + 1):
            if L2DataUtil.is_limit_up_price_buy(total_data[i]["val"]):
                buy_count += int(total_data[i]["re"])
            elif L2DataUtil.is_limit_up_price_buy_cancel(total_data[i]["val"]):
                cancel_count += int(total_data[i]["re"])
        return buy_count, cancel_count
    # 同一时间买入的概率计算
    @classmethod
    def __get_same_time_property(cls, code):
        # 计算板块热度
        industry = global_util.code_industry_map.get(code)
        if industry is not None:
            hot_num = global_util.industry_hot_num.get(industry)
            if hot_num is not None:
                return 1 - l2_trade_factor.L2TradeFactorUtil.get_industry_rate(hot_num)
        return 0.5
    # 过时 统计买撤净买量
    @classmethod
    def __sum_buy_num_for_cancel_order(cls, code, start_index, origin_num, threshold_money, cancel_single=True):
        buy_nums = origin_num
        total_datas = local_today_datas[code]
        limit_up_price = gpcode_manager.get_limit_up_price(code)
        if limit_up_price is None:
            raise Exception("涨停价无法获取")
        threshold_num = threshold_money / (limit_up_price * 100)
        # 获取预估挂买位 sure_type:0 虚拟挂买 1 实际挂买
        sure_type, sure_pos, sure_data = cls.__get_sure_order_pos(code)
        same_time_property = cls.__get_same_time_property(code)
        # 同一秒,在预估买入位之后的数据之和
        property_buy_num_count = 0
        cls.cancel_debug(code, "撤单纯买额计算位置:{}-{} 预估挂买位:{} 是否有撤单信号:{}", start_index, len(total_datas) - 1, sure_pos,
                         cancel_single)
        for i in range(start_index, len(total_datas)):
            data = total_datas[i]
            _val = data["val"]
            if L2DataUtil.is_limit_up_price_buy(_val):
                # 涨停买
                if i < sure_pos:
                    buy_nums += int(_val["num"]) * int(data["re"])
                elif sure_data["val"]["time"] == _val["time"]:
                    # 同一秒买入,而且还在预估买入位之后
                    property_buy_num_count += int(_val["num"]) * int(data["re"])
            elif L2DataUtil.is_limit_up_price_buy_cancel(_val):
                # 涨停撤买
                # 判断买入位置是否在买入信号之前
                buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(data,
                                                                                 local_today_num_operate_map.get(code))
                if buy_index is not None:
                    # 找到买撤数据的买入点
                    if buy_index < sure_pos:
                        buy_nums -= int(_val["num"]) * int(data["re"])
                        cls.cancel_debug(code, "{}数据在预估买入位之前 撤买纯买手数:{} 目标手数:{}", i, buy_nums, threshold_num)
                    else:
                        cls.cancel_debug(code, "{}数据在预估买入位之后,买入位:{}", i, buy_index)
                        if sure_data["val"]["time"] == buy_data["val"]["time"]:
                            # 同一秒,而且还在预估买入位之后按概率计算
                            property_buy_num_count -= int(_val["num"]) * int(data["re"])
                            cls.debug(code, "{}数据买入位与预估买入位在同一秒", i)
                else:
                    # 未找到买撤数据的买入点
                    cls.cancel_debug(code, "未找到买撤数据的买入点: 位置-{} 数据-{}", i, data)
            property_buy_num = round(property_buy_num_count * same_time_property)
            cls.cancel_debug(code, "预估买入点之后同一秒买入手数-{},位置-{},总手数:{},目标手数:{}", property_buy_num, i,
                             buy_nums + property_buy_num, threshold_num)
            # 有撤单信号,且小于阈值
            if buy_nums + property_buy_num <= threshold_num and cancel_single:
                return i, buy_nums + property_buy_num, sure_type
        buy_num_news = buy_nums + round(property_buy_num_count * same_time_property)
        cls.cancel_debug(code, "处理起始位置:{} 最终纯买额:{}", start_index, buy_num_news)
        return None, buy_num_news, sure_type
        # 统计买撤净买量
    @classmethod
    def __count_num_for_cancel_order(cls, code, start_index, origin_buy_num, origin_cancel_num, min_rate,
                                     betch_cancel_single=True):
        buy_nums = origin_buy_num
        buy_cancel_num = origin_cancel_num
        total_datas = local_today_datas[code]
        limit_up_price = gpcode_manager.get_limit_up_price(code)
        if limit_up_price is None:
            raise Exception("涨停价无法获取")
        # 获取预估挂买位 sure_type:0 虚拟挂买 1 实际挂买
        for i in range(start_index, len(total_datas)):
            data = total_datas[i]
            _val = data["val"]
            if L2DataUtil.is_limit_up_price_buy(_val):
                # 涨停买
                buy_nums += int(data["re"])
            elif L2DataUtil.is_limit_up_price_buy_cancel(_val):
                buy_cancel_num += int(data["re"])
            # 有撤单信号,且小于阈值
            if (buy_nums - buy_cancel_num) / buy_cancel_num <= min_rate and betch_cancel_single:
                return i, buy_nums, buy_cancel_num
        return None, buy_nums, buy_cancel_num
    @classmethod
    def test(cls):
        code = "000593"
        load_l2_data(code, True)
        if False:
            state = trade_manager.get_trade_state(code)
            cls.random_key[code] = random.randint(0, 100000)
            capture_timestamp = 1999988888
            try:
                if state == trade_manager.TRADE_STATE_BUY_DELEGATED or state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER:
                    # 已挂单
                    cls.__process_order(code, 201, 237, capture_timestamp)
                else:
                    # 未挂单
                    cls.__process_not_order(code, 201, 237, capture_timestamp)
            except Exception as e:
                logging.exception(e)
            return
        _start = t.time()
        # 按s批量化数据
        total_datas = local_today_datas[code]
        start_time = total_datas[0]["val"]["time"]
        start_index = 0
        for i in range(0, len(total_datas)):
            if total_datas[i]["val"]["time"] != start_time:
                cls.random_key[code] = random.randint(0, 100000)
                # 处理数据
                start = start_index
                # if start != 201:
                #     continue
                end = i - 1
                print("处理进度:{},{}".format(start, end))
                capture_timestamp = 1999999999
                state = trade_manager.get_trade_state(code)
                try:
                    if state == trade_manager.TRADE_STATE_BUY_DELEGATED or state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER:
                        # 已挂单
                        cls.__process_order(code, start, end, capture_timestamp)
                    else:
                        # 未挂单
                        cls.__process_not_order(code, start, end, capture_timestamp)
                except Exception as e:
                    logging.exception(e)
                # t.sleep(1)
                start_index = i
                start_time = total_datas[i]["val"]["time"]
        print("时间花费:", round((t.time() - _start) * 1000))
    @classmethod
    def test1(cls):
        code = "000593"
        load_l2_data(code, True)
        print(cls.__compute_order_begin_pos(code, 232, 3, 239))
    @classmethod
    def test2(cls):
        code = "600082"
        load_l2_data(code, True)
        cls.random_key[code] = random.randint(0, 100000)
        need_cancel, cancel_data = L2BigNumProcessor.process_cancel_with_big_num(code, 121, 123)
    @classmethod
    def test_can_order(cls):
        code = "000948"
        global_data_loader.load_industry()
        limit_up_time_manager.load_limit_up_time()
        print(cls.__can_buy(code))
# 连续涨停买单数最大值管理器
class L2ContinueLimitUpCountManager:
    @classmethod
    def del_data(cls, code):
        cls.__del_last_record(code)
        cls.__del_max(code)
    # 获取最大值
    @classmethod
    def __get_max(cls, code):
        key = "max_same_time_buy_count-{}".format(code)
        redis = _redisManager.getRedis()
        val = redis.get(key)
        if val is not None:
            return int(val)
        else:
            return None
    # 保存最大值
    @classmethod
    def __save_max(cls, code, max_num):
        key = "max_same_time_buy_count-{}".format(code)
        redis = _redisManager.getRedis()
        redis.setex(key, tool.get_expire(), max_num)
    @classmethod
    def __del_max(cls, code):
        key = "max_same_time_buy_count-{}".format(code)
        redis = _redisManager.getRedis()
        redis.delete(key)
    # 保存上一条数据最大值
    @classmethod
    def __save_last_record(cls, code, _time, count, index):
        key = "same_time_buy_last_count-{}".format(code)
        redis = _redisManager.getRedis()
        redis.setex(key, tool.get_expire(), json.dumps((_time, count, index)))
    @classmethod
    def __del_last_record(cls, code):
        key = "same_time_buy_last_count-{}".format(code)
        redis = _redisManager.getRedis()
        redis.delete(key)
    @classmethod
    def __get_last_record(cls, code):
        key = "same_time_buy_last_count-{}".format(code)
        redis = _redisManager.getRedis()
        val = redis.get(key)
        if val is None:
            return None, None, None
        else:
            val = json.loads(val)
            return val[0], val[1], val[2]
    @classmethod
    def process(cls, code, start_index, end_index):
        last_time, last_count, last_index = cls.__get_last_record(code)
        total_datas = local_today_datas[code]
        time_count_dict = {}
        for index in range(start_index, end_index + 1):
            if last_index is not None and last_index >= index:
                continue
            if L2DataUtil.is_limit_up_price_buy(total_datas[index]["val"]):
                if last_count is None:
                    last_count = 0
                    last_time = total_datas[index]["val"]["time"]
                    last_index = index
                if last_time == total_datas[index]["val"]["time"]:
                    last_count += total_datas[index]["re"]
                    last_index = index
                else:
                    if last_count is not None and last_count > 0:
                        time_count_dict[last_time] = last_count
                    last_count = total_datas[index]["re"]
                    last_time = total_datas[index]["val"]["time"]
                    last_index = index
            else:
                if last_count is not None and last_count > 0:
                    time_count_dict[last_time] = last_count
                last_count = 0
                last_time = None
                last_index = None
        if last_count is not None and last_count > 0:
            time_count_dict[last_time] = last_count
            # 保存latest
            cls.__save_last_record(code, last_time, last_count, last_index)
        else:
            # 移除
            cls.__del_last_record(code)
        # 查找这批数据中的最大数量
        max_time = None
        max_num = None
        for key in time_count_dict:
            if max_time is None:
                max_time = key
                max_num = time_count_dict[key]
            if time_count_dict[key] > max_num:
                max_num = time_count_dict[key]
                max_time = key
        if max_num is not None:
            old_max = cls.__get_max(code)
            if old_max is None or max_num > old_max:
                cls.__save_max(code, max_num)
    @classmethod
    def get_continue_count(cls, code):
        count = cls.__get_max(code)
        if count is None:
            count = 0
        count = count // 3
        if count < 15:
            count = 15
        return count
# 大单处理器
class L2BigNumProcessor:
    # 是否需要根据大单撤单,返回是否需要撤单与撤单信号的数据
    @classmethod
    def __need_cancel_with_max_num(cls, code, max_num_info, start_index, end_index):
        if max_num_info is None:
            return False, None
        # 如果是买入单,需要看他前面同一秒是否有撤单
        if int(max_num_info["val"]["operateType"]) == 0:
            # 只有买撤信号在买入信号之前的同一秒的单才会撤单情况
            _map = local_today_num_operate_map.get(code)
            if _map is not None:
                cancel_datas = _map.get(
                    "{}-{}-{}".format(max_num_info["val"]["num"], "1", max_num_info["val"]["price"]))
                if cancel_datas is not None:
                    for cancel_data in cancel_datas:
                        # 只能在当前规定的数据范围查找,以防出现重复查找
                        if cancel_data["index"] < start_index or cancel_data["index"] > end_index:
                            continue
                        if cancel_data["index"] > max_num_info["index"]:
                            buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(cancel_data,
                                                                                             local_today_num_operate_map[
                                                                                                 code])
                            if buy_index is None:
                                continue
                            if buy_data["val"]["time"] != max_num_info["val"]["time"]:
                                continue
                            min_space, max_space = l2_data_util.compute_time_space_as_second(
                                cancel_data["val"]["cancelTime"],
                                cancel_data["val"][
                                    "cancelTimeUnit"])
                            if min_space < 60:
                                L2TradeDataProcessor.cancel_debug(code, "找到大单撤单,但撤单间隔时间小于60s,撤单数据-{}",
                                                                  json.dumps(cancel_data))
                                return True, cancel_data
                            else:
                                # 如果间隔时间大于等于60s,这判断小群撤事件
                                L2TradeDataProcessor.cancel_debug(code, "找到大单撤单,但撤单间隔时间大于60s,撤单数据-{}",
                                                                  json.dumps(cancel_data))
                                return False, cancel_data
            return False, None
        else:
            return True, None
    # 计算数量最大的涨停买/涨停撤
    @classmethod
    def __compute_max_num(cls, code, start_index, end_index, max_num_info, buy_exec_time):
        new_max_info = max_num_info
        max_num = 0
        if max_num_info is not None:
            max_num = int(max_num_info["val"]["num"])
        # 计算大单
        total_data = local_today_datas[code]
        for i in range(start_index, end_index + 1):
            data = total_data[i]
            val = data["val"]
            if not L2DataUtil.is_limit_up_price_buy(val) and not L2DataUtil.is_limit_up_price_buy_cancel(
                    val):
                continue
            # 下单时间与买入执行时间之差大于60s的不做处理
            if l2_data_util.get_time_as_seconds(val["time"]) - l2_data_util.get_time_as_seconds(buy_exec_time) > 1:
                continue
            if L2DataUtil.is_limit_up_price_buy(val):
                pass
            elif L2DataUtil.is_limit_up_price_buy_cancel(val):
                min_space, max_space = l2_data_util.compute_time_space_as_second(val["cancelTime"],
                                                                                 val["cancelTimeUnit"])
                # 只能处理1s内的撤单
                if min_space > 1:
                    continue
                # buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(data,
                #                                                                  local_today_num_operate_map.get(code))
                # if buy_index is None:
                #     continue
                # if l2_data_util.get_time_as_seconds(buy_data["val"]["time"]) - l2_data_util.get_time_as_seconds(
                #         buy_exec_time) > 1:
                #     continue
            num = int(total_data[i]["val"]["num"])
            if num > max_num:
                max_num = num
                new_max_info = data
        return new_max_info
    @classmethod
    def __save_big_num_pos(cls, code, index):
        redis = _redisManager.getRedis()
        redis.setex("big_num_pos-{}".format(code), tool.get_expire(), index)
    @classmethod
    def __get_big_num_pos(cls, code):
        redis = _redisManager.getRedis()
        index = redis.get("big_num_pos-{}".format(code))
        if index is not None:
            return int(index)
        return index
    @classmethod
    def del_big_num_pos(cls, code):
        redis = _redisManager.getRedis()
        redis.delete("big_num_pos-{}".format(code))
    @classmethod
    def __cancel_buy(cls, code, index):
        L2TradeDataProcessor.debug(code, "撤买,触发位置-{},触发条件:大单,数据:{}", index, local_today_datas[code][index])
        L2TradeDataProcessor.cancel_buy(code)
    # 处理数据中的大单,返回是否已经撤单和撤单数据的时间
    @classmethod
    def process_cancel_with_big_num(cls, code, start_index, end_index):
        total_data = local_today_datas[code]
        # 如果无下单信号就无需处理
        buy_single_index, buy_exec_index, compute_index, nums = TradePointManager.get_buy_compute_start_data(code)
        if buy_single_index is None or buy_exec_index is None or buy_exec_index < 0:
            return False, None
        # 判断是否有大单记录
        index = cls.__get_big_num_pos(code)
        # 无大单记录
        if index is None:
            # 计算大单
            new_max_info = cls.__compute_max_num(code, start_index, end_index, None,
                                                 total_data[buy_exec_index]["val"]["time"])
            if new_max_info is None:
                return False, None
            L2TradeDataProcessor.debug(code, "获取到大单位置信息:{}", json.dumps(new_max_info))
            index = new_max_info["index"]
            # 大单是否有撤单信号
            need_cancel, cancel_data = cls.__need_cancel_with_max_num(code, new_max_info, start_index, end_index)
            if need_cancel:
                # 需要撤单
                # 撤单
                L2TradeDataProcessor.cancel_debug(code, "新找到大单-{},需要撤买", new_max_info["index"])
                cls.__cancel_buy(code, new_max_info["index"])
                return True, cancel_data,
            else:
                # 无需撤单
                # 保存大单记录
                cls.__save_big_num_pos(code, index)
                return False, None
        else:
            # 有大单记录
            need_cancel = False
            cancel_index = -1
            need_cancel, cancel_data = cls.__need_cancel_with_max_num(code, total_data[index], start_index, end_index)
            # 需要撤单
            if need_cancel:
                # 撤单
                cls.__cancel_buy(code, cancel_index)
                return True, cancel_data
            # 无需撤单
            else:
                # 计算新的大单
                max_num_data = cls.__compute_max_num(code, start_index, end_index, total_data[index],
                                                     total_data[buy_exec_index]["val"]["time"])
                if index == int(max_num_data["index"]):
                    return False, cancel_data
                L2TradeDataProcessor.debug(code, "找到大单位置信息:{}", json.dumps(max_num_data))
                # 大单是否有撤单信号
                need_cancel, cancel_data = cls.__need_cancel_with_max_num(code, max_num_data, max_num_data["index"],
                                                                          end_index)
                if need_cancel:
                    # 需要撤单
                    # 撤单
                    cls.__cancel_buy(code, max_num_data["index"] if cancel_data is None else cancel_data)
                    L2TradeDataProcessor.cancel_debug(code, "原来跟踪到大单无撤买信号-{},新跟踪的大单需要撤买-{}", index,
                                                      max_num_data["index"])
                    return True, cancel_data
                else:
                    # 无需撤单
                    # 保存大单记录
                    cls.__save_big_num_pos(code, max_num_data["index"])
                    return False, cancel_data
    @classmethod
    def test(cls):
        code = "000036"
        load_l2_data(code, True)
        new_max_info = cls.__compute_max_num(code, 470, 476, None, "09:32:59")
        print(new_max_info)
# 大群撤大单跟踪
class L2BetchCancelBigNumProcessor:
    @classmethod
    def __get_recod(cls, code):
        redis = _redisManager.getRedis()
        _val = redis.get("betch_cancel_big_num-{}".format(code))
        if _val is None:
            return None, None
        else:
            datas = json.loads(_val)
            return datas[0], datas[1]
    @classmethod
    def del_recod(cls, code):
        redis = _redisManager.getRedis()
        key = "betch_cancel_big_num-{}".format(code)
        redis.delete(key)
    @classmethod
    def __save_recod(cls, code, max_big_num_info, big_nums_info):
        redis = _redisManager.getRedis()
        key = "betch_cancel_big_num-{}".format(code)
        redis.setex(key, tool.get_expire(), json.dumps((max_big_num_info, big_nums_info)))
    # 暂时弃用
    @classmethod
    def need_cancel(cls, code, start_index, end_index):
        # 是否需要撤单
        max_big_num_info, big_nums_info = cls.__get_recod(code)
        if big_nums_info is None:
            # 无大单信息
            return True
        nums_set = set()
        index_set = set()
        for d in big_nums_info:
            nums_set.add(d[0])
            index_set.add(d[1])
        total_datas = local_today_datas[code]
        count = 0
        latest_buy_index = end_index
        for index in range(start_index, end_index + 1):
            if not nums_set.__contains__(total_datas[index]["val"]["num"]):
                continue
            buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(total_datas[index],
                                                                             local_today_num_operate_map[code])
            if buy_index is None:
                continue
            if index_set.__contains__(buy_index):
                count += buy_data["re"]
                latest_buy_index = buy_index
        # 获取大单数量
        total_count = 0
        for i in index_set:
            if i <= latest_buy_index:
                total_count += total_datas[i]["re"]
        L2TradeDataProcessor.debug(code, "大群撤大单数量:{}/{}", count, total_count)
        # 大单小于5笔无脑撤,后修改为无大单无脑撤
        if total_count <= 0:
            return True
        # 大单撤单笔数大于总大单笔数的1/5就撤单
        if count / total_count >= 0.2:
            return True
        else:
            return False
        pass
    # def need_cancel(cls, code, start_index, end_index):
    #     total_datas = local_today_datas[code]
    #     for index in range(start_index,end_index+1):
    #         price = total_datas[index]["val"]["price"]
    #         num = total_datas[index]["val"]["num"]
    #         if total_datas[index]
    # 过时
    @classmethod
    def process(cls, code, start_index, end_index):
        # 处理大单
        # 获取大单列表,大单格式为:((num,index,re),[(num,index,re),(num,index,re)])
        total_datas = local_today_datas[code]
        max_big_num_info, big_nums_info = cls.__get_recod(code)
        # 寻找最大值
        for index in range(start_index, end_index + 1):
            # 只处理涨停买与涨停买撤
            if not L2DataUtil.is_limit_up_price_buy(
                    total_datas[index]["val"]):
                continue
            if max_big_num_info is None:
                max_big_num_info = (
                    int(total_datas[start_index]["val"]["num"]), total_datas[start_index]["index"])
            if int(total_datas[index]["val"]["num"]) > max_big_num_info[0]:
                max_big_num_info = (
                    int(total_datas[index]["val"]["num"]), total_datas[index]["index"])
        # 将大于最大值90%的数据加入
        if max_big_num_info is not None:
            min_num = round(max_big_num_info[0] * 0.9)
            for index in range(start_index, end_index + 1):
                # 只统计涨停买
                if not L2DataUtil.is_limit_up_price_buy(
                        total_datas[index]["val"]):
                    continue
                if int(total_datas[index]["val"]["num"]) >= min_num:
                    if big_nums_info is None:
                        big_nums_info = []
                    big_nums_info.append((int(total_datas[index]["val"]["num"]), total_datas[index]["index"]))
            # 移除小于90%的数据
            big_nums_info_new = []
            index_set = set()
            for d in big_nums_info:
                if d[0] >= min_num:
                    if not index_set.__contains__(d[1]):
                        index_set.add(d[1])
                        big_nums_info_new.append(d)
            cls.__save_recod(code, max_big_num_info, big_nums_info_new)
    # 最新方法
    @classmethod
    def process_new(cls, code, start_index, end_index):
        # 处理大单
        # 获取大单列表,大单格式为:((num,index,re),[(num,index,re),(num,index,re)])
        total_datas = local_today_datas[code]
        max_big_num_info, big_nums_info = cls.__get_recod(code)
        # 大于等于8000手或者金额>=300万就是大单
        for index in range(start_index, end_index + 1):
            # 只统计涨停买
            if not L2DataUtil.is_limit_up_price_buy(
                    total_datas[index]["val"]):
                continue
            # 大于等于8000手或者金额 >= 300
            # 万就是大单
            if int(total_datas[index]["val"]["num"]) >= 8000 or int(total_datas[index]["val"]["num"]) * float(
                    total_datas[index]["val"]["price"]) >= 30000:
                if big_nums_info is None:
                    big_nums_info = []
                big_nums_info.append((int(total_datas[index]["val"]["num"]), total_datas[index]["index"]))
        # 移除小于90%的数据
        big_nums_info_new = []
        index_set = set()
        if big_nums_info is not None:
            for d in big_nums_info:
                if not index_set.__contains__(d[1]):
                    index_set.add(d[1])
                    big_nums_info_new.append(d)
            cls.__save_recod(code, max_big_num_info, big_nums_info_new)
# 卖跟踪
class L2SellProcessor:
    @classmethod
    def __get_recod(cls, code):
        redis = _redisManager.getRedis()
        _val = redis.get("sell_num-{}".format(code))
        if _val is None:
            return None, None
        else:
            datas = json.loads(_val)
            return datas[0], datas[1]
    @classmethod
    def del_recod(cls, code):
        redis = _redisManager.getRedis()
        key = "sell_num-{}".format(code)
        redis.delete(key)
    @classmethod
    def __save_recod(cls, code, process_index, count):
        redis = _redisManager.getRedis()
        key = "sell_num-{}".format(code)
        redis.setex(key, tool.get_expire(), json.dumps((process_index, count)))
    # 暂时弃用
    @classmethod
    def need_cancel(cls, code, start_index, end_index):
        # 是否需要撤单
        process_index, count = cls.__get_recod(code)
        if process_index is None:
            # 无卖的信息
            return False
        if count is None:
            count = 0
        limit_up_price = gpcode_manager.get_limit_up_price(code)
        if limit_up_price is None:
            return False
        if float(limit_up_price) * count * 100 >= l2_trade_factor.L2TradeFactorUtil.get_base_safe_val(
                global_util.zyltgb_map[code]):
            return True
        return False
    @classmethod
    def process(cls, code, start_index, end_index):
        # 处理大单
        # 获取大单列表,大单格式为:((num,index,re),[(num,index,re),(num,index,re)])
        total_datas = local_today_datas[code]
        process_index, count = cls.__get_recod(code)
        # 寻找最大值
        for index in range(start_index, end_index + 1):
            # 只处理涨停卖
            if not L2DataUtil.is_limit_up_price_sell(
                    total_datas[index]["val"]):
                continue
            # 不处理历史数据
            if process_index is not None and process_index >= index:
                continue
            if count is None:
                count = 0
            count += int(total_datas[index]["val"]["num"])
        if process_index is None:
            process_index = end_index
        cls.__save_recod(code, process_index, count)
def __get_time_second(time_str):
@@ -601,7 +2078,7 @@
                        index_2 = j
                        break
            if index_1 - index_0 == 1 and index_2 - index_1 == 1:
                logger_l2_trade.info("连续3个涨停买撤 {},{},{}".format(code, i, json.dumps(datas[i])))
                # logger_l2_trade.info("连续3个涨停买撤 {},{},{}".format(code, i, json.dumps(datas[i])))
                return i
    return None
@@ -697,25 +2174,4 @@
if __name__ == "__main__":
    # 删除大数据
    redis = redis_manager.RedisManager(1).getRedis()
    keys = redis.keys("big_data*")
    for key in keys:
        redis.delete(key)
    # print("big_data-{}-{}".format("123", int(round(t.time() * 1000))))
    # load_l2_data("002868")
    # keys= local_today_num_operate_map["002868"]
    # for k in keys:
    #     print(len( local_today_num_operate_map["002868"][k]))
    # pass
    # __set_buy_compute_start_data("000000", 100, 1)
    # __set_buy_compute_start_data("000000", 100)
    # __set_l2_data_latest_count("000333", 20)
    # print(type(get_l2_data_latest_count("000333")))
    # datas = ["2", "3", "4", "5"]
    # print(datas[4:])
    # print(decimal.Decimal("19.294").quantize(decimal.Decimal("0.00"), decimal.ROUND_HALF_UP))
    # 获取增量数据
    # 保存数据
    # 拼接数据
    clear_l2_data("603912")