Administrator
2022-09-30 394e24aaba9b28c6427444d47aab6505c735fb7e
L2数据处理完善
9个文件已修改
1个文件已添加
1234 ■■■■ 已修改文件
big_money_num_manager.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
code_data_util.py 22 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
data_process.py 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
global_util.py 13 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
juejin.py 13 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_data_manager.py 1098 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_data_util.py 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_trade_factor.py 57 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
server.py 11 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade_manager.py 7 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
big_money_num_manager.py
@@ -25,5 +25,5 @@
if __name__ == "__main__":
    add_num("000332",0)
    add_num("000332", 0)
    expire("000332")
code_data_util.py
New file
@@ -0,0 +1,22 @@
# 股票代码相关的参数
import redis_manager
_redisManager = redis_manager.RedisManager(0)
# 自由流通股本工具类
class ZYLTGBUtil:
    """Redis-backed cache of a stock's "zyltgb" value, keyed by stock code.

    ``save`` and ``get`` must address the same Redis key; both build it
    through ``_key`` so that a value written by ``save`` can be read back.
    """

    # Time-to-live of a saved entry: ten days, in seconds.
    _EXPIRE_SECONDS = 60 * 60 * 24 * 10

    @classmethod
    def _key(cls, code):
        """Return the Redis key used for ``code``."""
        return "zyltgb-{}".format(code)

    @classmethod
    def save(cls, code, val, unit):
        """Scale ``val`` to an integer and store it for ten days.

        :param code: stock code used to build the Redis key.
        :param val: numeric value (or numeric string) to store.
        :param unit: ``0`` multiplies ``val`` by 100,000,000; any other value
            multiplies it by 10,000 (presumably "hundred million" vs
            "ten thousand" display units -- confirm with the data source).
        """
        redis = _redisManager.getRedis()
        multiplier = 100000000 if int(unit) == 0 else 10000
        redis.setex(cls._key(code), cls._EXPIRE_SECONDS, round(float(val) * multiplier))

    @classmethod
    def get(cls, code):
        """Return the stored integer for ``code``, or ``None`` when absent."""
        redis = _redisManager.getRedis()
        # Read from the same key that save() writes; the bare code was used
        # before, which never matched a saved entry.
        val = redis.get(cls._key(code))
        if val is not None:
            return int(val)
        return None
data_process.py
@@ -15,6 +15,7 @@
# 统计今日卖出
# 统计今日买入
import tool
from code_data_util import ZYLTGBUtil
__redisManager = redis_manager.RedisManager(0)
@@ -154,6 +155,7 @@
# 保存自由流通市值
def saveZYLTSZ(datasList):
    redis = __redisManager.getRedis()
    _list = []
    for data in datasList:
        # 保存
@@ -161,6 +163,8 @@
                 "update_time": int(round(t.time() * 1000))}
        if float(data["zyltgb"]) > 0:
            _list.append(_dict)
            # 保存10天
            ZYLTGBUtil.save(data["code"],data["zyltgb"],data["zyltgb_unit"])
    mongo_data.save("ths-zylt", _list)
@@ -189,7 +193,7 @@
    val = redis.get("client-active-{}".format(client_id))
    if val is None:
        return None
    val=json.loads(val)
    val = json.loads(val)
    return val[0]
global_util.py
@@ -5,6 +5,7 @@
import code_volumn_manager
import gpcode_manager
import ths_industry_util
from code_data_util import ZYLTGBUtil
TEST = False
@@ -48,15 +49,9 @@
def load_zyltgb():
    codes = gpcode_manager.get_gp_list()
    for code in codes:
        results = mongo_data.find("ths-zylt", {"_id": code})
        if results is not None:
            results = [doc for doc in results]
            if len(results) > 0:
                result = results[0]
                if result["zyltgb_unit"] == 0:
                    zyltgb_map[code] = round(float(result["zyltgb"]) * 100000000)
                else:
                    zyltgb_map[code] = round(float(result["zyltgb"]) * 10000)
        result = ZYLTGBUtil.get(code)
        if result is not None:
            zyltgb_map[code]=result
# 加载量
juejin.py
@@ -76,6 +76,13 @@
    # 清除涨停时间
    global_util.limit_up_time.clear()
    init_data()
    # 初始化同花顺主站
    l2_clients = data_process.getValidL2Clients()
    for client in l2_clients:
        server.repair_ths_main_site(client)
def __run_schedule():
@@ -91,8 +98,7 @@
    # 获取需要监听的股票
    init_data()
    logger_system.info("掘金初始化")
    schedule.every().day.at("09:00:00").do(everyday_init)
    schedule.every().day.at("14:37:00").do(everyday_init)
    schedule.every().day.at("09:15:00").do(everyday_init)
    t1 = threading.Thread(target=lambda: __run_schedule())
    # 后台运行
    t1.setDaemon(True)
@@ -376,5 +382,4 @@
if __name__ == '__main__':
    accpt_prices_before_open([{"code": "001332", "price": "82.37"}, {"code": "002246", "price": "10.10"},
                              {"code": "600537", "price": "6.65"}])
    pass
l2_data_manager.py
@@ -18,7 +18,7 @@
import redis_manager
import tool
import trade_manager
from log import logger_l2_trade, logger_l2_trade_cancel, logger_l2_trade_buy
from log import logger_l2_trade, logger_l2_trade_cancel, logger_l2_trade_buy, logger_l2_process
from trade_data_manager import TradeBuyDataManager
import limit_up_time_manager
@@ -142,6 +142,30 @@
        redis = TradePointManager.__get_redis()
        redis.delete("compute_info_for_cancel_buy-{}".format(code))
    # 从买入信号开始设置涨停买与涨停撤的单数
    @classmethod
    def set_count_info_for_cancel_buy(cls, code, index, buy_count, cancel_count):
        redis = TradePointManager.__get_redis()
        expire = tool.get_expire()
        redis.setex("count_info_for_cancel_buy-{}".format(code), expire, json.dumps((index, buy_count, cancel_count)))
        logger_l2_trade_buy.info("{}保存撤单纯买额信息:{},{}", code, index, buy_count, cancel_count)
    # 获取买撤纯买额计算信息
    @classmethod
    def get_count_info_for_cancel_buy(cls, code):
        redis = TradePointManager.__get_redis()
        info = redis.get("count_info_for_cancel_buy-{}".format(code))
        if info is None:
            return None, 0, 0
        else:
            info = json.loads(info)
            return info[0], info[1], info[2]
    @classmethod
    def delete_count_info_for_cancel_buy(cls, code):
        redis = TradePointManager.__get_redis()
        redis.delete("count_info_for_cancel_buy-{}".format(code))
def load_l2_data(code, force=False):
    redis = _redisManager.getRedis()
@@ -208,11 +232,6 @@
    return datas
# TODO 获取l2的数据
def get_l2_data_index(code, key):
    pass
def parseL2Data(str):
    day = datetime.now().strftime("%Y%m%d")
    dict = json.loads(str)
@@ -247,7 +266,6 @@
class L2DataUtil:
    @classmethod
    def is_same_time(cls, time1, time2):
        # TODO 测试
        if global_util.TEST:
            return True
        time1_s = time1.split(":")
@@ -331,8 +349,11 @@
            num = item["num"]
            limitPrice = item["limitPrice"]
            # 涨停价
            if limit_up_price is not None and limit_up_price == tool.to_price(decimal.Decimal(price)):
                limitPrice = 1
            if limit_up_price is not None:
                if limit_up_price == tool.to_price(decimal.Decimal(price)):
                    limitPrice = 1
                else:
                    limitPrice = 0
                item["limitPrice"] = "{}".format(limitPrice)
            operateType = item["operateType"]
            cancelTime = item["cancelTime"]
@@ -353,6 +374,11 @@
    def get_time_as_second(cls, time_str):
        ts = time_str.split(":")
        return int(ts[0]) * 3600 + int(ts[1]) * 60 + int(ts[2])
    # @classmethod
    # def get_time_as_str(cls, time_seconds):
    #     ts = time_str.split(":")
    #     return int(ts[0]) * 3600 + int(ts[1]) * 60 + int(ts[2])
    # 是否是涨停价买
    @classmethod
@@ -383,13 +409,6 @@
        if price * num * 100 < 50 * 10000:
            return False
        return True
    @staticmethod
    def is_index_end(code, index):
        if index >= len(local_today_datas[code]) - 1:
            return True
        else:
            return False
# L2交易数据处理器
@@ -427,6 +446,7 @@
        __start_time = round(t.time() * 1000)
        try:
            if len(datas) > 0:
                # 判断价格区间是否正确
                if not data_process.is_same_code_with_price(code, float(datas[0]["val"]["price"])):
                    raise L2DataException(L2DataException.CODE_PRICE_ERROR,
@@ -444,25 +464,31 @@
                    local_today_datas[code].extend(add_datas)
                    l2_data_util.load_num_operate_map(local_today_num_operate_map, code, add_datas)
                total_datas = local_today_datas[code]
                # 买入确认点处理
                TradeBuyDataManager.process_buy_sure_position_info(code, capture_timestamp, total_datas,
                                                                   total_datas[-1],
                                                                   add_datas)
                # 过时 买入确认点处理
                # TradeBuyDataManager.process_buy_sure_position_info(code, capture_timestamp, total_datas,
                #                                                    total_datas[-1],
                #                                                    add_datas)
                if len(add_datas) > 0:
                    _start_time = round(t.time() * 1000)
                    # 计算大单数量
                    cls.__compute_big_money_data(code, add_datas)
                    latest_time = add_datas[len(add_datas) - 1]["val"]["time"]
                    # 时间差不能太大才能处理
                    # TODO 暂时关闭处理
                    if L2DataUtil.is_same_time(now_time_str, latest_time):
                        # 判断是否已经挂单
                        state = trade_manager.get_trade_state(code)
                        start_index = len(total_datas) - len(add_datas)
                        end_index = len(total_datas)-1
                        if state == trade_manager.TRADE_STATE_BUY_DELEGATED or state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER:
                            # 已挂单
                            cls.__process_order(code, len(total_datas) - len(add_datas) - 3, capture_timestamp)
                            cls.__process_order(code, start_index,end_index, capture_timestamp)
                        else:
                            # 未挂单
                            cls.__process_not_order(code, add_datas, capture_timestamp)
                            cls.__process_not_order(code,start_index,end_index,capture_timestamp)
                    logger_l2_process.info("code:{} 处理数据范围: {}-{} 处理时间:{}", code, add_datas[0]["index"],
                                           add_datas[-1]["index"], round(t.time() * 1000) - __start_time)
                # 保存数据
                save_l2_data(code, datas, add_datas)
        finally:
@@ -483,48 +509,140 @@
    # 处理未挂单
    @classmethod
    def __process_not_order(cls, code, add_datas, capture_time):
    def __process_not_order(cls, code, start_index, end_index, capture_time):
        # 获取阈值
        threshold_money = cls.__get_threshmoney(code)
        cls.__start_compute_buy(code, len(local_today_datas[code]) - len(add_datas), threshold_money, capture_time)
        cls.__start_compute_buy(code, start_index, end_index, threshold_money, capture_time)
    @classmethod
    def __statistic_count_l2_data_for_cancel(cls, code, start_index, end_index, has_cancel_single=False):
        index, old_buy_count, old_cancel_count = TradePointManager.get_count_info_for_cancel_buy(code)
        for i in range(start_index, end_index + 1):
            buy_count, buy_cancel_count = cls.__count_l2_data_for_cancel(code, i, i)
            old_buy_count += buy_count
            old_cancel_count += buy_cancel_count
            if old_buy_count > 0 and (old_buy_count - old_cancel_count) / old_buy_count < 0.3 and has_cancel_single:
                return i, True
        TradePointManager.set_count_info_for_cancel_buy(code, end_index, old_buy_count,
                                                        old_cancel_count)
        return end_index, False
    # 处理已挂单
    @classmethod
    def __process_order(cls, code, start_index, capture_time):
    def __process_order(cls, code, start_index, end_index, capture_time, new_add=True):
        if start_index < 0:
            start_index = 0
        if end_index < start_index:
            return
        # 获取之前是否有记录的撤买信号
        cancel_index = TradePointManager.get_buy_cancel_single_pos(code)
        #  cancel_index = TradePointManager.get_buy_cancel_single_pos(code)
        cancel_computed_index, cancel_buy_num = TradePointManager.get_compute_info_for_cancel_buy(code)
        if cancel_computed_index is None:
            logger_l2_trade.error("{} 未获取到买撤纯买额,起始计算位:{}", code, start_index)
        # cancel_computed_index, cancel_buy_num = TradePointManager.get_compute_info_for_cancel_buy(code)
        # if cancel_computed_index is None:
        #     logger_l2_trade.error("{} 未获取到买撤纯买额,起始计算位:{}", code, start_index)
        buy_single_index, buy_exec_index, buy_compute_index, buy_num = cls.__get_order_begin_pos(code)
        if cancel_index is None:
            # 无撤单信号起始点记录
            cancel_index = cls.__compute_order_cancel_begin_single(code, max(start_index - 3, 0), 3)
            if cancel_index is not None:
                cls.debug(code, "找到撤单信号-{},买入信号为 ,数据处理起始点:{}", cancel_index, start_index)
                # 保存撤单信号
                TradePointManager.set_buy_cancel_single_pos(code, cancel_index)
            # 获取阈值 有买撤信号,统计撤买纯买额
        threshold_money = cls.__get_threshmoney(code)
        cls.__start_compute_cancel(code, cancel_index, cancel_computed_index + 1,
                                   cancel_buy_num,
                                   threshold_money,
                                   capture_time)
        # 统计群撤大单
        L2BetchCancelBigNumProcessor.process_new(code, start_index, end_index)
        # 是否有虚拟下单
        unreal_buy_info = cls.unreal_buy_dict.get(code)
        if unreal_buy_info is not None:
            cls.debug(code, "有虚拟下单,无买撤信号,开始执行买入,截图时间:{}", capture_time)
            # unreal_buy_info 的内容格式为:(触法买操作下标,截图时间)
            # 真实下单
            cls.__buy(code, unreal_buy_info[1], local_today_datas[code][unreal_buy_info[0]],
                      unreal_buy_info[0])
        # 统计最大连续买单
        L2ContinueLimitUpCountManager.process(code, start_index, end_index)
    # 开始计算撤的信号
        # 计算大单撤销
        need_cancel, cancel_data = L2BigNumProcessor.process_cancel_with_big_num(code, start_index, end_index)
        if need_cancel:
            # 已经撤单了
            threshold_money = cls.__get_threshmoney(code)
            # 重新处理下单
            cls.__start_compute_buy(code, cancel_data["index"] + 1, end_index, threshold_money, capture_time)
            return
        # buy_single_index, buy_exec_index, buy_compute_index, buy_num = cls.__get_order_begin_pos(code)
        # if cancel_index is None:
        # 无撤单信号起始点记录
        continue_cancel = L2ContinueLimitUpCountManager.get_continue_count(code)
        order_cancel_begin_start = max(start_index - (continue_cancel - 1),
                                       0) if new_add else start_index
        order_cancel_begin_end = end_index
        total_datas = local_today_datas[code]
        little_cancel = False
        # 大单撤单的数据不为空
        if cancel_data is not None:
            # 小群撤事件
            continue_cancel = 5
            cancel_time_seconds = L2DataUtil.get_time_as_second(cancel_data["val"]["time"])
            # 查找上一秒与下一秒
            for i in range(int(cancel_data["index"]), 0, -1):
                # 查找上一秒和下一秒
                if total_datas[i]["val"]["time"] != cancel_data["val"][
                    "time"] and cancel_time_seconds - L2DataUtil.get_time_as_second(total_datas[i]["val"]["time"]) > 1:
                    order_cancel_begin_start = i + 1
                    break
            for i in range(int(cancel_data["index"]), len(local_today_datas[code])):
                # 查找上一秒和下一秒
                if total_datas[i]["val"]["time"] != cancel_data["val"]["time"] and L2DataUtil.get_time_as_second(
                        total_datas[i]["val"]["time"]) - cancel_time_seconds > 1:
                    order_cancel_begin_end = i - 1
                    break
            cls.cancel_debug(code, "小群撤事件计算范围:{},{}", order_cancel_begin_start, order_cancel_begin_end)
            little_cancel = True
        cancel_start_index = None
        cancel_end_index = None
        need_cancel = False
        if little_cancel:
            # 小群撤事件
            cancel_start_index, cancel_end_index = cls.__compute_order_cancel_little_begin_single(code,
                                                                                                  order_cancel_begin_start
                                                                                                  , continue_cancel,
                                                                                                  order_cancel_begin_end)
            if cancel_start_index is not None:
                cls.debug(code, "找到小群撤信号,撤单信号范围:{}-{}", cancel_start_index, cancel_end_index)
                # 有小群撤信号
                need_cancel = True
            else:
                # 不满足小群撤,从小群撤后面一条数据继续处理
                cls.__process_order(code, cancel_data["index"] + 1, end_index, capture_time, False)
                return
        else:
            # 大群撤事件
            cancel_start_index, cancel_end_index = cls.__compute_order_cancel_begin_single(
                code, order_cancel_begin_start
                , continue_cancel, order_cancel_begin_end)
            if cancel_start_index is not None:
                cls.debug(code, "找到大群撤信号,连续笔数阈值:{}, 撤单信号范围:{}-{}", continue_cancel, cancel_start_index,
                          cancel_end_index)
                # 判断是否有大群撤大单撤
                need_cancel = L2BetchCancelBigNumProcessor.need_cancel(code, cancel_start_index, cancel_end_index)
                if need_cancel:
                    cls.debug(code, "大群撤信号有大单撤销")
                else:
                    cls.debug(code, "大群撤信号无大单撤销")
        if need_cancel:
            # 需要撤买
            cls.cancel_buy(code)
            if cancel_end_index >= end_index:
                return
            # 继续处理下单信号
            threshold_money = cls.__get_threshmoney(code)
            cls.__start_compute_buy(code, cancel_end_index + 1, end_index, threshold_money, capture_time, False)
        else:
            # 是否有虚拟下单
            unreal_buy_info = cls.unreal_buy_dict.get(code)
            if unreal_buy_info is not None:
                cls.debug(code, "有虚拟下单,无买撤信号,开始执行买入,执行位置:{},截图时间:{}", unreal_buy_info[0], capture_time)
                # unreal_buy_info 的内容格式为:(触法买操作下标,截图时间)
                # 真实下单
                cls.__buy(code, unreal_buy_info[1], local_today_datas[code][unreal_buy_info[0]],
                          unreal_buy_info[0])
    # 过时 开始计算撤的信号
    @classmethod
    def __start_compute_cancel(cls, code, cancel_index, compute_start_index, origin_num, threshold_money, capture_time):
        # sure_type 0-虚拟挂买位  1-真实挂买位
@@ -552,7 +670,7 @@
            if computed_index < len(local_today_datas[code]) - 1:
                # 数据尚未处理完,重新进入下单计算流程
                cls.__start_compute_buy(code, computed_index + 1, threshold_money, capture_time)
                cls.__start_compute_buy(code, computed_index + 1, threshold_money, capture_time, False)
                pass
        else:
            cls.debug(code, "撤买纯买额计算,计算位置:{}-{},目前为止纯买手数:{}", compute_start_index, total_datas[-1]["index"],
@@ -596,21 +714,38 @@
            TradePointManager.delete_buy_point(code)
            TradePointManager.delete_buy_cancel_point(code)
            TradePointManager.delete_compute_info_for_cancel_buy(code)
            TradePointManager.delete_count_info_for_cancel_buy(code)
            # 删除大群撤事件的大单
            L2BetchCancelBigNumProcessor.del_recod(code)
            cls.debug(code, "执行撤单成功")
        except Exception as e:
            cls.debug(code, "执行撤单异常:{}", str(e))
    @classmethod
    def __start_compute_buy(cls, code, compute_start_index, threshold_money, capture_time):
    def cancel_buy(cls, code):
        # 删除大群撤事件的大单
        L2BetchCancelBigNumProcessor.del_recod(code)
        L2ContinueLimitUpCountManager.del_data(code)
        if code in cls.unreal_buy_dict:
            cls.unreal_buy_dict.pop(code)
        else:
            cls.__cancel_buy(code)
    @classmethod
    def __start_compute_buy(cls, code, compute_start_index, compute_end_index, threshold_money, capture_time,
                            new_add=True):
        if compute_end_index < compute_start_index:
            return
        total_datas = local_today_datas[code]
        # 获取买入信号计算起始位置
        # index, num, finish_index = cls.__get_order_begin_pos(code)
        buy_single_index, buy_exec_index, buy_compute_index, num = cls.__get_order_begin_pos(code)
        # 是否为新获取到的位置
        new_get_pos = False
        if buy_single_index is None:
            # 有买入信号
            has_single, _index = cls.__compute_order_begin_pos(code, max(compute_start_index - 3, 0), 3)
            has_single, _index = cls.__compute_order_begin_pos(code, max(
                compute_start_index - 2 if new_add else compute_start_index, 0), 3, compute_end_index)
            buy_single_index = _index
            if has_single:
                num = 0
@@ -623,9 +758,16 @@
            return None
        # 买入纯买额统计
        compute_index, buy_nums = cls.__sum_buy_num_for_order_2(code, max(buy_single_index, compute_start_index), num,threshold_money,buy_single_index)
        compute_index, buy_nums, rebegin_buy_pos = cls.__sum_buy_num_for_order_3(code, max(buy_single_index,
                                                                                           compute_start_index),
                                                                                 compute_end_index, num,
                                                                                 threshold_money, buy_single_index,
                                                                                 capture_time)
        if rebegin_buy_pos is not None:
            # 需要重新计算纯买额
            cls.__start_compute_buy(code, rebegin_buy_pos, compute_end_index, threshold_money, capture_time, False)
            return
        #cls.__sum_buy_num_for_order(code, max(buy_single_index, compute_start_index), num,threshold_money)
        if compute_index is not None:
            cls.debug(code, "获取到买入执行位置:{} m值:{} 纯买手数:{} 数据:{}", compute_index, threshold_money, buy_nums,
                      total_datas[compute_index])
@@ -636,23 +778,41 @@
            # 删除之前的所有撤单信号
            TradePointManager.delete_buy_cancel_point(code)
            TradePointManager.delete_compute_info_for_cancel_buy(code)
            # 为买撤保存基础纯买额
            TradePointManager.set_compute_info_for_cancel_buy(code, compute_index, buy_nums)
            TradePointManager.delete_count_info_for_cancel_buy(code)
            TradeBuyDataManager.remove_buy_position_info(code)
            # 已过时 为买撤保存基础纯买额
            # TradePointManager.set_compute_info_for_cancel_buy(code, compute_index, buy_nums)
            b_buy_count, b_buy_cancel_count = cls.__count_l2_data_before_for_cancel(code, buy_single_index)
            buy_count, buy_cancel_count = cls.__count_l2_data_for_cancel(code, buy_single_index, compute_index)
            TradePointManager.set_count_info_for_cancel_buy(code, compute_index, b_buy_count + buy_count,
                                                            b_buy_cancel_count + buy_cancel_count)
            # 计算大单(从买入信号起始点到挂单执行点),返回是否取消
            cancel_result, cancel_data = L2BigNumProcessor.process_cancel_with_big_num(code, buy_single_index,
                                                                                       compute_index)
            # 计算大群撤的大单
            L2BetchCancelBigNumProcessor.process_new(code, buy_single_index, compute_index)
            L2ContinueLimitUpCountManager.process(code, buy_single_index, compute_index)
            # 数据是否处理完毕
            if L2DataUtil.is_index_end(code, compute_index):
            if compute_index >= compute_end_index:
                cls.debug(code, "数据处理完毕,下单, 数据截图时间-{}", capture_time)
                # 数据已经处理完毕,下单
                cls.__buy(code, capture_time, total_datas[compute_index], compute_index)
                # 数据已经处理完毕,如果还没撤单就实际下单
                if not cancel_result:
                    cls.__buy(code, capture_time, total_datas[compute_index], compute_index)
            else:
                # 数据尚未处理完毕,进行下一步处理
                cls.debug(code, "数据尚未处理完毕,进行下一步处理,处理进度:{}", compute_index)
                cls.__process_order(code, compute_index + 1, capture_time)
                # 如果还没撤单,就继续处理已下单的步骤
                if not cancel_result:
                    cls.__process_order(code, compute_index + 1, compute_end_index, capture_time, False)
                else:
                    cls.__start_compute_buy(code, compute_index + 1, compute_end_index, threshold_money, capture_time,
                                            False)
        else:
            # 未达到下单条件,保存纯买额,设置纯买额
            # 记录买入信号位置
            cls.__save_order_begin_data(code, buy_single_index, -1, len(total_datas) - 1, buy_nums)
            cls.__save_order_begin_data(code, buy_single_index, -1, compute_end_index, buy_nums)
        pass
    # 获取下单起始信号
@@ -668,105 +828,102 @@
    # 计算下单起始信号
    # compute_data_count 用于计算的l2数据数量
    @classmethod
    def __compute_order_begin_pos(cls, code, start_index, continue_count):
    def __compute_order_begin_pos(cls, code, start_index, continue_count, end_index):
        # 倒数100条数据查询
        datas = local_today_datas[code]
        __len = len(datas)
        if len(datas) - start_index < continue_count:
        if end_index - start_index + 1 < continue_count:
            return False, None
        __time = None
        _limit_up_count_1s = 0
        _limit_up_count_1s_start_index = -1
        for i in range(start_index, __len - (continue_count - 1)):
        last_index = None
        count = 0
        start = None
        for i in range(start_index, end_index + 1):
            _val = datas[i]["val"]
            # 时间要>=09:30:00
            if L2DataUtil.get_time_as_second(_val["time"]) < second_930:
                continue
            # 有连续4个涨停买就标记计算起始点
            if L2DataUtil.is_limit_up_price_buy(_val):
                index_0 = i
                index_1 = -1
                index_2 = -1
                # index_3 = -1
                for j in range(index_0 + 1, __len):
                    # 涨停买
                    if L2DataUtil.is_limit_up_price_buy(datas[j]["val"]):
                        index_1 = j
                        break
                if index_1 > 0:
                    for j in range(index_1 + 1, __len):
                        # 涨停买
                        if L2DataUtil.is_limit_up_price_buy(datas[j]["val"]):
                            index_2 = j
                            break
                # if index_2 > 0:
                #     for j in range(index_2 + 1, __len):
                #         # 涨停买
                #         if datas[j]["val"]["limitPrice"] == 1 and datas[j]["val"]["operateType"] == 0:
                #             index_3 = j
                if index_1 - index_0 == 1 and index_2 - index_1 == 1:  # and index_3 - index_2 == 1
                    logger_l2_trade.info("找到物理连续涨停买 {},{},{}".format(code, i, datas[i]))
                    return True, i
            # 同1s内有不连续的4个涨停买(如果遇买撤就重新计算,中间可间隔不涨停买)标记计算起始点
            if L2DataUtil.is_limit_up_price_buy(_val):
                # 涨停买
                if __time is None:
                    _time = datas[i]["val"]["time"]
                    _limit_up_count_1s = 1
                    _limit_up_count_1s_start_index = i
                elif _time == _val["time"]:
                    _limit_up_count_1s += 1
                else:
                    _time = datas[i]["val"]["time"]
                    _limit_up_count_1s = 1
                    _limit_up_count_1s_start_index = i
            elif _val["operateType"] == 1:
                # 买撤
                _time = None
                _limit_up_count_1s = 0
                _limit_up_count_1s_start_index = -1
            if _limit_up_count_1s >= 4 and _limit_up_count_1s_start_index > -1:
                logger_l2_trade.info("找到同一秒连续涨停买 {},{},{}".format(code, _limit_up_count_1s_start_index, datas[i]))
                return True, _limit_up_count_1s_start_index
            if L2DataUtil.is_limit_up_price_buy(_val) and (last_index is None or (i - last_index == 1 and datas[last_index]["val"]["time"] == datas[i]["val"]["time"])):
                if start is None:
                    start = i
                last_index = i
                count += datas[i]["re"]
                if count >= continue_count:
                    return True, start
            else:
                last_index = None
                count = 0
                start = None
        return False, None
    # 是否有撤销信号
    # 大群撤事件,最多相隔1s
    @classmethod
    def __compute_order_cancel_begin_single(cls, code, start_index, continue_count):
    def __compute_order_cancel_begin_single(cls, code, start_index, continue_count, end_index):
        datas = local_today_datas[code]
        if end_index - start_index + 1 < continue_count:
            return None, None
        count = 0
        start = -1
        start_time = None
        for i in range(start_index, end_index + 1):
            _val = datas[i]["val"]
            _timestamp = L2DataUtil.get_time_as_second(_val["time"])
            if L2DataUtil.get_time_as_second(_val["time"]) < second_930:
                continue
            if L2DataUtil.is_limit_up_price_buy_cancel(_val) and (start_time is None or _timestamp - start_time < 2):
                if start == -1:
                    start = i
                    start_time = L2DataUtil.get_time_as_second(_val["time"])
                count += datas[i]["re"]
            else:
                if count >= continue_count:
                    return start, i - 1
                start = -1
                count = 0
                start_time = None
        if count >= continue_count:
            return start, end_index
        else:
            return None, None
    # 小群撤事件
    @classmethod
    def __compute_order_cancel_little_begin_single(cls, code, start_index, continue_count, end_index=None):
        # 必须为同一秒的数据
        same_second = True
        datas = local_today_datas[code]
        __len = len(datas)
        if len(datas) - start_index < continue_count:
            return None
        for i in range(start_index, __len - (continue_count - 1)):
            return None, None
        count = 0
        start = -1
        start_time = None
        if end_index is None:
            end_index = __len - continue_count
        for i in range(start_index, end_index + 1):
            _val = datas[i]["val"]
            if L2DataUtil.get_time_as_second(_val["time"]) < second_930:
            _timestamp = L2DataUtil.get_time_as_second(_val["time"])
            if _timestamp < second_930:
                continue
            # 有连续3个买撤
            if L2DataUtil.is_limit_up_price_buy_cancel(_val):
                index_0 = i
                index_1 = -1
                index_2 = -1
                for j in range(index_0 + 1, __len):
                    # 涨停买
                    if L2DataUtil.is_limit_up_price_buy_cancel(datas[j]["val"]):
                        index_1 = j
                        break
                if index_1 > 0:
                    for j in range(index_1 + 1, __len):
                        # 涨停买
                        if L2DataUtil.is_limit_up_price_buy_cancel(datas[j]["val"]):
                            index_2 = j
                            break
                if index_1 - index_0 == 1 and index_2 - index_1 == 1:
                    logger_l2_trade.info("连续3个涨停买撤 {},{},{}".format(code, i, json.dumps(datas[i])))
                    return i
        return None
            # 间隔时间不能多于1s
            if L2DataUtil.is_limit_up_price_buy_cancel(_val) and (start_time is None or _timestamp - start_time < 2):
                if start == -1:
                    start = i
                    start_time = L2DataUtil.get_time_as_second(_val["time"])
                count += int(datas[i]["re"])
            else:
                if count >= continue_count:
                    return start, i - 1
                start = -1
                count = 0
                start_time = None
        if count >= continue_count:
            return start, end_index
        else:
            return None, None
    # 是否可以下单
    def __is_can_order(self):
@@ -789,7 +946,7 @@
        else:
            return 1, index, data
    # 统计买入净买量
    # 过时 统计买入净买量
    @classmethod
    def __sum_buy_num_for_order(cls, code, compute_start_index, origin_num, threshold_money):
        total_datas = local_today_datas[code]
@@ -805,16 +962,16 @@
                # 涨停买
                buy_nums += int(_val["num"]) * int(total_datas[i]["re"])
                if buy_nums >= threshold_num:
                    logger_l2_trade_buy.info("{}获取到买入执行点:{} 统计纯买手数:{} 目标纯买手数:{}", code, i, buy_nums, threshold_num)
                    cls.debug(code, "获取到买入执行点:{} 统计纯买手数:{} 目标纯买手数:{}", i, buy_nums, threshold_num)
                    return i, buy_nums
            elif L2DataUtil.is_limit_up_price_buy_cancel(_val):
                # 涨停买撤
                buy_nums -= int(_val["num"]) * int(total_datas[i]["re"])
        logger_l2_trade_buy.info("{}尚未获取到买入执行点,起始计算位置:{} 统计纯买手数:{} 目标纯买手数:{}", code, compute_start_index, buy_nums,
                                 threshold_num)
        cls.debug(code, "尚未获取到买入执行点,起始计算位置:{} 统计纯买手数:{} 目标纯买手数:{}", compute_start_index, buy_nums,
                  threshold_num)
        return None, buy_nums
    # 统计买入净买量,不计算在买入信号之前的买撤单
    # 过时 统计买入净买量,不计算在买入信号之前的买撤单
    @classmethod
    def __sum_buy_num_for_order_2(cls, code, compute_start_index, origin_num, threshold_money, buy_single_index):
        total_datas = local_today_datas[code]
@@ -866,6 +1023,98 @@
                      threshold_num)
        return None, buy_nums + property_buy_num
    # 统计买入净买量,不计算在买入信号之前的买撤单
    @classmethod
    def __sum_buy_num_for_order_3(cls, code, compute_start_index, compute_end_index, origin_num, threshold_money,
                                  buy_single_index,
                                  capture_time):
        """Accumulate the net limit-up buy volume and look for the buy execution point.

        Walks ``local_today_datas[code]`` from ``compute_start_index`` to
        ``compute_end_index`` (inclusive), starting from ``origin_num``. Limit-up
        buys add ``num * re``; limit-up buy cancels subtract ``num * re`` unless
        the cancelled order was placed before the buy signal in a different
        second.

        :param code: stock code used to index the module-level data maps
        :param compute_start_index: first index to process
        :param compute_end_index: last index to process (inclusive)
        :param origin_num: volume already accumulated before this call
        :param threshold_money: target amount; converted to a volume threshold
            as ``threshold_money / (limit_up_price * 100)``
        :param buy_single_index: index of the buy-signal record
        :param capture_time: not used by this method
        :return: a 3-tuple ``(exec_index, buy_nums, next_signal_start)``:
            ``(i, buy_nums, None)`` when the threshold is reached at index ``i``;
            ``(None, buy_nums, ii)`` when the signal timed out and ``ii`` is the
            first index after the signal with a different timestamp;
            ``(None, buy_nums, None)`` otherwise
        :raises Exception: when the limit-up price cannot be obtained
        """
        total_datas = local_today_datas[code]
        buy_nums = origin_num
        limit_up_price = gpcode_manager.get_limit_up_price(code)
        if limit_up_price is None:
            raise Exception("涨停价无法获取")
        threshold_num = threshold_money / (limit_up_price * 100)
        buy_single_time_seconds = L2DataUtil.get_time_as_second(total_datas[buy_single_index]["val"]["time"])
        for i in range(compute_start_index, compute_end_index + 1):
            data = total_datas[i]
            _val = total_datas[i]["val"]
            # The record is more than 1 second later than the buy signal: the signal is dropped
            if L2DataUtil.get_time_as_second(_val["time"]) - buy_single_time_seconds > 1:
                TradePointManager.delete_buy_point(code)
                if i == compute_end_index:
                    # All data has been processed
                    return None, buy_nums, None
                else:
                    # Recompute the buy signal; it must not start in the same second as the old signal
                    for ii in range(buy_single_index + 1, compute_end_index + 1):
                        if total_datas[buy_single_index]["val"]["time"] != total_datas[ii]["val"]["time"]:
                            return None, buy_nums, ii
                    # NOTE(review): when no such index exists, execution falls through and
                    # keeps accumulating although the buy point was already deleted - confirm intent
            # Limit-up buy
            if L2DataUtil.is_limit_up_price_buy(_val):
                # Limit-up buy: add its volume
                buy_nums += int(_val["num"]) * int(total_datas[i]["re"])
                if buy_nums >= threshold_num:
                    logger_l2_trade_buy.info("{}获取到买入执行点:{} 统计纯买手数:{} 目标纯买手数:{}", code, i, buy_nums, threshold_num)
            elif L2DataUtil.is_limit_up_price_buy_cancel(_val):
                # Limit-up buy cancel
                # Determine whether the cancelled order was placed before the buy signal
                buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(total_datas[i],
                                                                                 local_today_num_operate_map.get(code))
                if buy_index is not None:
                    # Found the buy record that this cancel refers to
                    if buy_index >= buy_single_index:
                        buy_nums -= int(_val["num"]) * int(data["re"])
                        cls.buy_debug(code, "{}数据在买入信号之后 撤买纯买手数:{} 目标手数:{}", i, buy_nums, threshold_num)
                    else:
                        cls.buy_debug(code, "{}数据在买入信号之前,买入位:{}", i, buy_index)
                        if total_datas[buy_single_index]["val"]["time"] == buy_data["val"]["time"]:
                            # Same second as the signal: treat it as if it came after the signal
                            buy_nums -= int(_val["num"]) * int(data["re"])
                            cls.buy_debug(code, "{}数据买入位与预估买入位在同一秒", i)
                else:
                    # The buy record for this cancel was not found: subtract anyway
                    cls.buy_debug(code, "未找到买撤数据的买入点: 位置-{} 数据-{}", i, data)
                    buy_nums -= int(_val["num"]) * int(total_datas[i]["re"])
            cls.buy_debug(code, "位置-{},总手数:{},目标手数:{}", i,
                          buy_nums, threshold_num)
            # Threshold reached: this index is the buy execution point
            if buy_nums >= threshold_num:
                return i, buy_nums, None
        cls.buy_debug(code, "尚未获取到买入执行点,起始计算位置:{} 统计纯买手数:{} 目标纯买手数:{}", compute_start_index,
                      buy_nums,
                      threshold_num)
        return None, buy_nums, None
    # 计算买入信号之前的且和买入信号数据在同一时间的数量
    @classmethod
    def __count_l2_data_before_for_cancel(cls, code, buy_single_index):
        total_data = local_today_datas[code]
        single_time = total_data[buy_single_index]["val"]["time"]
        buy_count = 0
        cancel_count = 0
        for i in range(buy_single_index, -1, -1):
            if single_time == total_data[i]["val"]["time"]:
                if L2DataUtil.is_limit_up_price_buy(total_data[i]["val"]):
                    buy_count += int(total_data[i]["re"])
                elif L2DataUtil.is_limit_up_price_buy(total_data[i]["val"]):
                    cancel_count += int(total_data[i]["re"])
            else:
                break
        return buy_count, cancel_count
    @classmethod
    def __count_l2_data_for_cancel(cls, code, start_index, end_index):
        total_data = local_today_datas[code]
        buy_count = 0
        cancel_count = 0
        for i in range(start_index, end_index + 1):
            if L2DataUtil.is_limit_up_price_buy(total_data[i]["val"]):
                buy_count += int(total_data[i]["re"])
            elif L2DataUtil.is_limit_up_price_buy_cancel(total_data[i]["val"]):
                cancel_count += int(total_data[i]["re"])
        return buy_count, cancel_count
    # 同一时间买入的概率计算
    @classmethod
    def __get_same_time_property(cls, code):
@@ -877,7 +1126,7 @@
                return 1 - l2_trade_factor.L2TradeFactorUtil.get_industry_rate(hot_num)
        return 0.5
    # 统计买撤净买量
    # 过时 统计买撤净买量
    @classmethod
    def __sum_buy_num_for_cancel_order(cls, code, start_index, origin_num, threshold_money, cancel_single=True):
        buy_nums = origin_num
@@ -934,17 +1183,542 @@
        cls.cancel_debug(code, "处理起始位置:{} 最终纯买额:{}", start_index, buy_num_news)
        return None, buy_num_news, sure_type
        # 统计买撤净买量
    @classmethod
    def __count_num_for_cancel_order(cls, code, start_index, origin_buy_num, origin_cancel_num, min_rate,
                                     betch_cancel_single=True):
        buy_nums = origin_buy_num
        buy_cancel_num = origin_cancel_num
        total_datas = local_today_datas[code]
        limit_up_price = gpcode_manager.get_limit_up_price(code)
        if limit_up_price is None:
            raise Exception("涨停价无法获取")
        # 获取预估挂买位 sure_type:0 虚拟挂买 1 实际挂买
        for i in range(start_index, len(total_datas)):
            data = total_datas[i]
            _val = data["val"]
            if L2DataUtil.is_limit_up_price_buy(_val):
                # 涨停买
                buy_nums += int(data["re"])
            elif L2DataUtil.is_limit_up_price_buy_cancel(_val):
                buy_cancel_num += int(data["re"])
            # 有撤单信号,且小于阈值
            if (buy_nums - buy_cancel_num) / buy_cancel_num <= min_rate and betch_cancel_single:
                return i, buy_nums, buy_cancel_num
        return None, buy_nums, buy_cancel_num
    @classmethod
    def test(cls):
        """Debug harness: replay the loaded L2 data of one code in per-timestamp batches.

        Loads the data for a hard-coded code, splits it into runs of records
        sharing the same ``val.time`` and feeds each run to ``__process_order``
        or ``__process_not_order`` depending on the current trade state.
        Exceptions are logged and swallowed. The elapsed time in milliseconds is
        printed at the end.
        """
        # The first assignment is dead; only the second code is used
        code = "002336"
        code = "000593"
        load_l2_data(code, True)
        # Disabled single-range debug path, kept for manual debugging
        if False:
            state = trade_manager.get_trade_state(code)
            cls.random_key[code] = random.randint(0, 100000)
            capture_timestamp = 1999988888
            try:
                if state == trade_manager.TRADE_STATE_BUY_DELEGATED or state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER:
                    # An order has already been placed
                    cls.__process_order(code, 201, 237, capture_timestamp)
                else:
                    # No order has been placed
                    cls.__process_not_order(code, 201, 237, capture_timestamp)
            except Exception as e:
                logging.exception(e)
            return
        _start = t.time()
        # Batch the data by timestamp
        total_datas = local_today_datas[code]
        start_time = total_datas[0]["val"]["time"]
        start_index = 0
        for i in range(0, len(total_datas)):
            # A timestamp change closes the batch [start_index, i - 1]
            if total_datas[i]["val"]["time"] != start_time:
                cls.random_key[code] = random.randint(0, 100000)
                # Process the batch
                start = start_index
                # if start != 201:
                #     continue
                end = i - 1
                print("处理进度:{},{}".format(start, end))
                capture_timestamp = 1999999999
                state = trade_manager.get_trade_state(code)
                try:
                    if state == trade_manager.TRADE_STATE_BUY_DELEGATED or state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER:
                        # An order has already been placed
                        cls.__process_order(code, start, end, capture_timestamp)
                    else:
                        # No order has been placed
                        cls.__process_not_order(code, start, end, capture_timestamp)
                except Exception as e:
                    logging.exception(e)
                # t.sleep(1)
                start_index = i
                start_time = total_datas[i]["val"]["time"]
        # NOTE(review): the final batch (records sharing the last timestamp) is never processed
        print("时间花费:", round((t.time() - _start) * 1000))
    @classmethod
    def test1(cls):
        code = "000593"
        load_l2_data(code, True)
        print( cls.__compute_order_begin_pos(code,232,3,239))
    @classmethod
    def test2(cls):
        """Debug helper: compute the order begin position and run big-order tracking for one code.

        Exceptions raised while computing the begin position are logged and
        swallowed; ``L2BetchCancelBigNumProcessor.process_new`` runs afterwards
        regardless.
        """
        code = "000677"
        load_l2_data(code, True)
        cls.random_key[code] = random.randint(0, 100000)
        # The data is loaded a second time, now without the second argument
        load_l2_data(code)
        try:
            # cls.__sum_buy_num_for_cancel_order(code, 112, 100000, 10000000)
            # NOTE(review): test1 calls __compute_order_begin_pos with four arguments,
            # this call passes three - confirm against the current signature
            has_single, _index = cls.__compute_order_begin_pos(code, max(9, 0), 3)
            print(has_single, _index)
        except Exception as e:
            logging.exception(e)
        L2BetchCancelBigNumProcessor.process_new(code, 57, 150)
# 连续涨停买单数最大值管理器
class L2ContinueLimitUpCountManager:
    """Tracks the largest count of consecutive limit-up buys sharing one timestamp.

    Two values are kept in redis per stock code: the maximum count seen so far
    and the still-open run at the end of the last processed batch (time, count,
    index), so that a run spanning two batches is counted as one.
    """

    # Redis key templates; the stock code is substituted in
    __MAX_KEY = "max_same_time_buy_count-{}"
    __LAST_RECORD_KEY = "same_time_buy_last_count-{}"

    @classmethod
    def del_data(cls, code):
        """Remove both the open-run record and the stored maximum for ``code``."""
        cls.__del_last_record(code)
        cls.__del_max(code)

    @classmethod
    def __get_max(cls, code):
        """Return the stored maximum as an int, or ``None`` when nothing is stored."""
        stored = _redisManager.getRedis().get(cls.__MAX_KEY.format(code))
        return None if stored is None else int(stored)

    @classmethod
    def __save_max(cls, code, max_num):
        """Store the maximum with the expiry supplied by ``tool.get_expire()``."""
        client = _redisManager.getRedis()
        client.setex(cls.__MAX_KEY.format(code), tool.get_expire(), max_num)

    @classmethod
    def __del_max(cls, code):
        """Delete the stored maximum."""
        _redisManager.getRedis().delete(cls.__MAX_KEY.format(code))

    @classmethod
    def __save_last_record(cls, code, _time, count, index):
        """Store the open run as a JSON array ``[time, count, index]``."""
        client = _redisManager.getRedis()
        payload = json.dumps((_time, count, index))
        client.setex(cls.__LAST_RECORD_KEY.format(code), tool.get_expire(), payload)

    @classmethod
    def __del_last_record(cls, code):
        """Delete the stored open run."""
        _redisManager.getRedis().delete(cls.__LAST_RECORD_KEY.format(code))

    @classmethod
    def __get_last_record(cls, code):
        """Return the open run as ``(time, count, index)``, or three ``None`` values."""
        raw = _redisManager.getRedis().get(cls.__LAST_RECORD_KEY.format(code))
        if raw is None:
            return None, None, None
        record = json.loads(raw)
        return record[0], record[1], record[2]

    @classmethod
    def process(cls, code, start_index, end_index):
        """Process records ``start_index..end_index`` (inclusive) and update the maximum.

        Consecutive limit-up buys with the same ``val.time`` form a run whose
        size is the sum of their ``re`` fields. A record that is not a limit-up
        buy, or a limit-up buy with another timestamp, closes the current run.
        Indexes not greater than the stored run index are skipped because they
        were already counted.
        """
        run_time, run_count, run_index = cls.__get_last_record(code)
        records = local_today_datas[code]
        counts_by_time = {}

        for pos in range(start_index, end_index + 1):
            if run_index is not None and pos <= run_index:
                continue
            record = records[pos]

            if not L2DataUtil.is_limit_up_price_buy(record["val"]):
                # Any other record closes the run and clears the state
                if run_count is not None and run_count > 0:
                    counts_by_time[run_time] = run_count
                run_count, run_time, run_index = 0, None, None
                continue

            stamp = record["val"]["time"]
            if run_count is None:
                # No run known yet: open one at this record
                run_count, run_time = 0, stamp
            if run_time != stamp:
                # The timestamp changed: close the old run and start a new one
                if run_count > 0:
                    counts_by_time[run_time] = run_count
                run_count, run_time = 0, stamp
            run_count += record["re"]
            run_index = pos

        if run_count is not None and run_count > 0:
            counts_by_time[run_time] = run_count
            # Keep the open run so the next batch can continue it
            cls.__save_last_record(code, run_time, run_count, run_index)
        else:
            cls.__del_last_record(code)

        # Raise the stored maximum when this batch contains a larger run
        if counts_by_time:
            batch_max = max(counts_by_time.values())
            previous_max = cls.__get_max(code)
            if previous_max is None or batch_max > previous_max:
                cls.__save_max(code, batch_max)

    @classmethod
    def get_continue_count(cls, code):
        """Return one third of the stored maximum (floor division), but at least 15."""
        stored = cls.__get_max(code)
        third = (0 if stored is None else stored) // 3
        return third if third >= 15 else 15
# 大单处理器
class L2BigNumProcessor:
    """Tracks the largest limit-up order around the buy execution point and cancels on it.

    The index of the tracked big order is kept in redis under
    ``big_num_pos-<code>``.
    """

    @classmethod
    def __need_cancel_with_max_num(cls, code, max_num_info):
        """Decide whether the tracked big order requires cancelling our buy.

        :param code: stock code
        :param max_num_info: the big-order record, or ``None``
        :return: ``(need_cancel, cancel_data)``. A record whose ``operateType``
            is not 0 always yields ``(True, None)``. For an ``operateType`` 0
            record, the later cancel records with the same num and price are
            searched for one whose buy record shares the big order's timestamp;
            the result is ``(True, cancel)`` when its minimum cancel interval
            is below 60s and ``(False, cancel)`` otherwise. ``(False, None)``
            when nothing matches.
        """
        if max_num_info is None:
            return False, None
        if int(max_num_info["val"]["operateType"]) != 0:
            # The big order itself is not a buy record
            return True, None

        # Buy record: look for a cancel of it placed in the same second
        _map = local_today_num_operate_map.get(code)
        if _map is None:
            return False, None
        cancel_datas = _map.get(
            "{}-{}-{}".format(max_num_info["val"]["num"], "1", max_num_info["val"]["price"]))
        if cancel_datas is None:
            return False, None

        for cancel_data in cancel_datas:
            # Only cancels that appear after the big order are relevant
            if cancel_data["index"] <= max_num_info["index"]:
                continue
            buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(cancel_data,
                                                                             local_today_num_operate_map[code])
            if buy_index is None:
                continue
            if buy_data["val"]["time"] != max_num_info["val"]["time"]:
                continue
            min_space, max_space = l2_data_util.compute_time_space_as_second(
                cancel_data["val"]["cancelTime"],
                cancel_data["val"]["cancelTimeUnit"])
            if min_space < 60:
                L2TradeDataProcessor.cancel_debug(code, "找到大单撤单,但撤单间隔时间小于60s,撤单数据-{}",
                                                  json.dumps(cancel_data))
                return True, cancel_data
            # Interval of 60s or more: left to the small batch-cancel handling
            L2TradeDataProcessor.cancel_debug(code, "找到大单撤单,但撤单间隔时间大于60s,撤单数据-{}",
                                              json.dumps(cancel_data))
            return False, cancel_data
        return False, None

    @classmethod
    def __compute_max_num(cls, code, start_index, end_index, max_num_info, buy_exec_time):
        """Return the limit-up buy/cancel record with the largest ``num``.

        :param start_index: first index to inspect
        :param end_index: last index to inspect (inclusive)
        :param max_num_info: current big-order record used as the baseline, or ``None``
        :param buy_exec_time: time string of the buy execution record
        :return: the new big-order record, or ``max_num_info`` when nothing larger is found
        """
        new_max_info = max_num_info
        max_num = 0
        if max_num_info is not None:
            max_num = int(max_num_info["val"]["num"])
        total_data = local_today_datas[code]
        exec_seconds = l2_data_util.get_time_as_seconds(buy_exec_time)
        for i in range(start_index, end_index + 1):
            data = total_data[i]
            val = data["val"]
            is_buy = L2DataUtil.is_limit_up_price_buy(val)
            if not is_buy and not L2DataUtil.is_limit_up_price_buy_cancel(val):
                continue
            # Records more than 1 second after the buy execution time are ignored
            if l2_data_util.get_time_as_seconds(val["time"]) - exec_seconds > 1:
                continue
            if not is_buy:
                min_space, max_space = l2_data_util.compute_time_space_as_second(val["cancelTime"],
                                                                                 val["cancelTimeUnit"])
                # Only cancels made within 1 second are considered
                if min_space > 1:
                    continue
            num = int(val["num"])
            if num > max_num:
                max_num = num
                new_max_info = data
        return new_max_info

    @classmethod
    def __save_big_num_pos(cls, code, index):
        """Store the index of the tracked big order."""
        redis = _redisManager.getRedis()
        redis.setex("big_num_pos-{}".format(code), tool.get_expire(), index)

    @classmethod
    def __get_big_num_pos(cls, code):
        """Return the stored big-order index as an int, or ``None``."""
        redis = _redisManager.getRedis()
        index = redis.get("big_num_pos-{}".format(code))
        if index is not None:
            return int(index)
        return index

    @classmethod
    def __del_big_num_pos(cls, code):
        """Delete the stored big-order index."""
        redis = _redisManager.getRedis()
        redis.delete("big_num_pos-{}".format(code))

    @classmethod
    def __cancel_buy(cls, code, index):
        """Log the triggering record, cancel the buy and forget the tracked big order."""
        L2TradeDataProcessor.debug(code, "撤买,触发位置-{},触发条件:大单,数据:{}", index, local_today_datas[code][index])
        L2TradeDataProcessor.cancel_buy(code)
        cls.__del_big_num_pos(code)

    @classmethod
    def process_cancel_with_big_num(cls, code, start_index, end_index):
        """Process the big orders in ``start_index..end_index`` and cancel the buy if needed.

        :return: ``(cancelled, cancel_data)`` - whether the buy was cancelled and
            the cancel record that was found, if any
        """
        total_data = local_today_datas[code]
        # Nothing to do without a buy signal and a buy execution point
        buy_single_index, buy_exec_index, compute_index, nums = TradePointManager.get_buy_compute_start_data(code)
        if buy_single_index is None or buy_exec_index is None or buy_exec_index < 0:
            return False, None
        buy_exec_time = total_data[buy_exec_index]["val"]["time"]

        index = cls.__get_big_num_pos(code)
        if index is None:
            # No big order tracked yet: look for one in this batch
            new_max_info = cls.__compute_max_num(code, start_index, end_index, None, buy_exec_time)
            if new_max_info is None:
                return False, None
            L2TradeDataProcessor.debug(code, "获取到大单位置信息:{}", json.dumps(new_max_info))
            index = new_max_info["index"]
            need_cancel, cancel_data = cls.__need_cancel_with_max_num(code, new_max_info)
            if need_cancel:
                L2TradeDataProcessor.cancel_debug(code, "新找到大单-{},需要撤买", new_max_info["index"])
                cls.__cancel_buy(code, index)
                return True, cancel_data
            # No cancel needed: remember the big order for later batches
            cls.__save_big_num_pos(code, index)
            return False, None

        # A big order is already tracked: re-check it first
        need_cancel, cancel_data = cls.__need_cancel_with_max_num(code, total_data[index])
        if need_cancel:
            # Fixed: a placeholder index of -1 used to be passed here, so the log
            # showed the last record of the day instead of the tracked big order.
            cls.__cancel_buy(code, index)
            return True, cancel_data

        # Still no cancel: see whether this batch contains a larger order
        max_num_data = cls.__compute_max_num(code, start_index, end_index, total_data[index], buy_exec_time)
        if index == int(max_num_data["index"]):
            return False, cancel_data
        L2TradeDataProcessor.debug(code, "找到大单位置信息:{}", json.dumps(max_num_data))
        need_cancel, cancel_data = cls.__need_cancel_with_max_num(code, max_num_data)
        if need_cancel:
            cls.__cancel_buy(code, max_num_data["index"])
            L2TradeDataProcessor.cancel_debug(code, "跟踪到大单无撤买信号-{},新跟踪的大单需要撤买-{}", index, max_num_data["index"])
            return True, cancel_data
        # No cancel needed: track the new big order
        cls.__save_big_num_pos(code, max_num_data["index"])
        return False, cancel_data
# 大群撤大单跟踪
class L2BetchCancelBigNumProcessor:
    """Tracks big limit-up buy orders for the batch-cancel decision.

    The record is kept in redis under ``betch_cancel_big_num-<code>`` as a JSON
    pair ``(max_big_num_info, big_nums_info)`` where ``big_nums_info`` is a
    list of ``(num, index)`` entries.
    """

    @classmethod
    def __get_recod(cls, code):
        """Return ``(max_big_num_info, big_nums_info)``, or ``(None, None)`` when nothing is stored."""
        redis = _redisManager.getRedis()
        _val = redis.get("betch_cancel_big_num-{}".format(code))
        if _val is None:
            return None, None
        datas = json.loads(_val)
        return datas[0], datas[1]

    @classmethod
    def del_recod(cls, code):
        """Delete the stored record."""
        redis = _redisManager.getRedis()
        redis.delete("betch_cancel_big_num-{}".format(code))

    @classmethod
    def __save_recod(cls, code, max_big_num_info, big_nums_info):
        """Store the record with the expiry supplied by ``tool.get_expire()``."""
        redis = _redisManager.getRedis()
        key = "betch_cancel_big_num-{}".format(code)
        redis.setex(key, tool.get_expire(), json.dumps((max_big_num_info, big_nums_info)))

    # Temporarily unused
    @classmethod
    def need_cancel(cls, code, start_index, end_index):
        """Decide whether the buy should be cancelled, based on cancelled big orders.

        :return: ``True`` when no big-order record exists, when at most 5 big
            orders are counted, or when the cancelled big orders make up at
            least 1/5 of the counted big orders; ``False`` otherwise
        """
        max_big_num_info, big_nums_info = cls.__get_recod(code)
        if big_nums_info is None:
            # No big-order information
            return True
        nums_set = {d[0] for d in big_nums_info}
        index_set = {d[1] for d in big_nums_info}
        total_datas = local_today_datas[code]
        count = 0
        latest_buy_index = end_index
        for index in range(start_index, end_index + 1):
            # Fixed: stored nums are ints (see process/process_new), so the raw
            # field is converted before the membership test.
            if int(total_datas[index]["val"]["num"]) not in nums_set:
                continue
            buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(total_datas[index],
                                                                             local_today_num_operate_map[code])
            if buy_index is None:
                continue
            if buy_index in index_set:
                count += buy_data["re"]
                latest_buy_index = buy_index
        # Number of big orders placed up to the latest cancelled one
        total_count = 0
        for i in index_set:
            if i <= latest_buy_index:
                total_count += total_datas[i]["re"]
        # With 5 or fewer big orders always cancel
        if total_count <= 5:
            return True
        # Cancel when at least 1/5 of the big orders were cancelled
        return count / total_count >= 0.2

    # Obsolete
    @classmethod
    def process(cls, code, start_index, end_index):
        """Update the record, keeping limit-up buys of at least 90% of the largest one.

        Entries are stored as ``(num, index)``.
        """
        total_datas = local_today_datas[code]
        max_big_num_info, big_nums_info = cls.__get_recod(code)
        # Find the largest limit-up buy
        for index in range(start_index, end_index + 1):
            record = total_datas[index]
            if not L2DataUtil.is_limit_up_price_buy(record["val"]):
                continue
            num = int(record["val"]["num"])
            # Fixed: the maximum used to be seeded from total_datas[start_index],
            # which is not necessarily a limit-up buy; seed from the current record.
            if max_big_num_info is None or num > max_big_num_info[0]:
                max_big_num_info = (num, record["index"])
        if max_big_num_info is None:
            return
        # Add the limit-up buys that reach 90% of the maximum
        min_num = round(max_big_num_info[0] * 0.9)
        # Fixed: tolerate a missing list instead of failing when iterating None
        candidates = [] if big_nums_info is None else big_nums_info
        for index in range(start_index, end_index + 1):
            record = total_datas[index]
            if not L2DataUtil.is_limit_up_price_buy(record["val"]):
                continue
            num = int(record["val"]["num"])
            if num >= min_num:
                candidates.append((num, record["index"]))
        # Drop entries below 90% of the maximum and duplicate indexes
        big_nums_info_new = []
        index_set = set()
        for d in candidates:
            if d[0] >= min_num and d[1] not in index_set:
                index_set.add(d[1])
                big_nums_info_new.append(d)
        cls.__save_recod(code, max_big_num_info, big_nums_info_new)

    # Current method
    @classmethod
    def process_new(cls, code, start_index, end_index):
        """Append the big limit-up buys of ``start_index..end_index`` to the record.

        A limit-up buy is a big order when ``num >= 8000`` or
        ``num * price >= 30000``. Entries are stored as ``(num, index)``.
        Nothing is saved when no record exists and no big order is found.
        """
        total_datas = local_today_datas[code]
        max_big_num_info, big_nums_info = cls.__get_recod(code)
        for index in range(start_index, end_index + 1):
            record = total_datas[index]
            # Only limit-up buys are counted
            if not L2DataUtil.is_limit_up_price_buy(record["val"]):
                continue
            num = int(record["val"]["num"])
            # The original comment states the thresholds as 8000 lots or an amount of
            # 3,000,000 - presumably num is in lots of 100 shares; verify against the data feed
            if num >= 8000 or num * float(record["val"]["price"]) >= 30000:
                if big_nums_info is None:
                    big_nums_info = []
                big_nums_info.append((num, record["index"]))
        if big_nums_info is None:
            return
        # Keep only the first entry for each index
        big_nums_info_new = []
        index_set = set()
        for d in big_nums_info:
            if d[1] not in index_set:
                index_set.add(d[1])
                big_nums_info_new.append(d)
        cls.__save_recod(code, max_big_num_info, big_nums_info_new)
def __get_time_second(time_str):
@@ -1096,7 +1870,7 @@
                        index_2 = j
                        break
            if index_1 - index_0 == 1 and index_2 - index_1 == 1:
                logger_l2_trade.info("连续3个涨停买撤 {},{},{}".format(code, i, json.dumps(datas[i])))
                # logger_l2_trade.info("连续3个涨停买撤 {},{},{}".format(code, i, json.dumps(datas[i])))
                return i
    return None
@@ -1192,4 +1966,4 @@
if __name__ == "__main__":
    L2TradeDataProcessor.test()
    L2TradeDataProcessor.test1()
l2_data_util.py
@@ -64,7 +64,7 @@
# 计算时间的区间
def __compute_time_space_as_second(cancel_time, cancel_time_unit):
def compute_time_space_as_second(cancel_time, cancel_time_unit):
    __time = int(cancel_time)
    if int(cancel_time) == 0:
        return 0, 0
@@ -83,7 +83,7 @@
# 根据买撤数据(与今日总的数据)计算买入数据
def get_buy_data_with_cancel_data(cancel_data, local_today_num_operate_map):
    # 计算时间区间
    min_space, max_space = __compute_time_space_as_second(cancel_data["val"]["cancelTime"],
    min_space, max_space = compute_time_space_as_second(cancel_data["val"]["cancelTime"],
                                                          cancel_data["val"]["cancelTimeUnit"])
    max_time = __sub_time(cancel_data["val"]["time"], min_space)
    min_time = __sub_time(cancel_data["val"]["time"], max_space)
@@ -111,7 +111,6 @@
@async_call
def save_big_data(code, same_time_nums, datas):
    return None
    latest_datas = __last_big_data.get(code)
    d1 = json.dumps(datas)
    d2 = json.dumps(latest_datas)
l2_trade_factor.py
@@ -9,7 +9,7 @@
        yi = round(zyltgb / 100000000)
        if yi < 1:
            yi = 1
        return 6000000 + (yi - 1) * 500000
        return 5000000 + (yi - 1) * 500000
    # 自由流通市值影响比例
    @classmethod
@@ -32,7 +32,7 @@
    @classmethod
    def get_industry_rate(cls, total_limit_percent):
        t = total_limit_percent / 10
        rate = t / 0.5 * 0.04
        rate = t / 0.5 * 0.02 + 0.26
        if rate > 0.52:
            rate = 0.52
        return round(rate, 2)
@@ -41,23 +41,23 @@
    @classmethod
    def get_volumn_rate(cls, day60_max, yest, today):
        old_volumn = yest
        base_rate = 0.25
        base_rate = 0.49
        if day60_max > yest:
            old_volumn = day60_max
            base_rate = 0.26
            base_rate = 0.50
        r = round(today / old_volumn, 2)
        print("比例:", r)
        rate = 0
        if r < 0.11:
            rate = base_rate - (r - 0.01)
        elif r < 0.45:
            rate = base_rate - r
        if r <= 0.25:
            rate = base_rate - (r - 0.01) * 2
        elif r <= 0.5:
            rate = 0.25 - r + (0.01 if day60_max > yest else 0)
        elif r < 0.75:
            rate = (base_rate - 0.2049) + (r - 0.74) * 0.4
        elif r < 1.38:
            rate = base_rate - (r - 0.75) * 0.8
            rate = r - 0.75 + (0.01 if day60_max > yest else 0)
        elif r < 1.74:
            rate = base_rate - (r - 0.75)
        else:
            rate = base_rate - 0.5
            rate = base_rate - 0.99
        return round(rate, 4)
    # 当前股票首次涨停时间的影响比例
@@ -67,28 +67,28 @@
        start_m = 9 * 60 + 30
        m = int(times[0]) * 60 + int(times[1])
        dif = m - start_m
        base_rate = 0.15
        base_rate = 0.5
        rate = 0
        if dif < 1:
            rate = base_rate
        elif dif <= 5:
            rate = base_rate - dif * 0.01
            rate = base_rate - dif * 0.02
        elif dif <= 120:
            # 11:30之前
            rate = 0.0985 - (dif - 6) * 0.0015
            rate = 0.39 - (dif - 6) * 0.004
        else:
            rate = 0.0985 - (dif - 89 - 6) * 0.0015
            if rate < -0.15:
                rate = -0.15
            rate = 0.39 - (120 - 6) * 0.004 - (dif - 210 + 1) * 0.004
            if rate < -0.5:
                rate = -0.5
        return round(rate, 4)
    # 纯万手哥影响值(手数》=9000 OR 金额》=500w)
    @classmethod
    def get_big_money_rate(cls, num):
        if num >= 8:
            return 0.08
        if num >= 10:
            return 0.5
        else:
            return num * 0.01
            return round(num * 0.05, 2)
    @classmethod
    def compute_rate(cls, zyltgb, total_industry_limit_percent, volumn_day60_max, volumn_yest, volumn_today,
@@ -135,11 +135,14 @@
    def compute_m_value(cls, code):
        zyltgb = global_util.zyltgb_map.get(code)
        if zyltgb is None:
            print("没有获取到自由流通市值")
            return 10000000
            global_util.load_zyltgb()
            zyltgb = global_util.zyltgb_map.get(code)
            if zyltgb is None:
                print("没有获取到自由流通市值")
                return 10000000
        zyltgb = cls.get_base_safe_val(zyltgb)
        rate = cls.compute_rate_by_code(code)
        print("m值获取:", code, round(zyltgb * rate))
        # print("m值获取:", code, round(zyltgb * rate))
        return round(zyltgb * rate)
@@ -160,6 +163,6 @@
if __name__ == "__main__":
    # Ad-hoc manual run: print the big-money rate for a few sample inputs,
    # in the same order as before.
    for sample_num in (32, 8, 0, 1, 2, 3):
        print(L2TradeFactorUtil.get_big_money_rate(sample_num))
server.py
@@ -291,9 +291,16 @@
    return result
# 修复同花顺主站
def repair_ths_main_site(client):
    """Send the ``updateTHSSite`` action to a client and check the reply.

    Args:
        client: client identifier forwarded unchanged to ``send_msg``.

    Raises:
        Exception: carrying the reply's ``msg`` field when the reply's
            ``code`` field is not 0.
    """
    raw_reply = send_msg(client, {"action": "updateTHSSite"})
    reply = json.loads(raw_reply)
    if reply["code"] == 0:
        return
    raise Exception(reply["msg"])
if __name__ == "__main__":
    # Manual check: print the environment state of client 3, then ask
    # client 2 to repair the THS main site. Any failure is printed rather
    # than propagated.
    try:
        print(get_client_env_state(3))
        repair_ths_main_site(2)
    except Exception as exc:
        print(exc)
trade_manager.py
@@ -175,7 +175,7 @@
    if is_in_forbidden_trade_codes(code):
        raise Exception("禁止交易")
    trade_state = get_trade_state(code)
    if trade_state != TRADE_STATE_NOT_TRADE and trade_state != TRADE_STATE_BUY_CANCEL_SUCCESS:
    if trade_state != TRADE_STATE_NOT_TRADE and trade_state != TRADE_STATE_BUY_CANCEL_SUCCESS and trade_state != TRADE_STATE_BUY_CANCEL_ING:
        raise Exception("代码处于不可交易状态")
    money = get_available_money()
    if money is None:
@@ -303,7 +303,8 @@
    keys = redis_l2.keys("*{}*".format(code))
    for k in keys:
        redis_l2.delete(k)
        if (k.find("l2-") is None or k.find("l2-") < 0) and (k.find("big_data-") is None or k.find("big_data-") < 0):
            redis_l2.delete(k)
    redis_trade = redis_manager.RedisManager(2).getRedis()
    redis_trade.delete("trade-state-{}".format(code))
@@ -318,4 +319,4 @@
if __name__ == "__main__":
    # Manual maintenance run: clear the stored data of the listed codes,
    # one after another in this order.
    for stock_code in ("000068", "000593"):
        __clear_data(stock_code)