Administrator
2023-02-06 736e61b89e87f7e3c224feca25e94cda459b9ae6
H撤完善,修改代码文件目录
3个文件已删除
6 文件已重命名
15个文件已修改
2个文件已添加
5802 ■■■■■ 已修改文件
constant.py 6 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
data_export_util.py 4 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
gui.py 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
industry_codes_sort.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
juejin.py 13 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/cancel_buy_strategy.py 644 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_data_manager.py 199 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_data_manager_new.py 1089 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_data_util.py 376 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_log.py 12 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/safe_count_manager.py 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/transaction_progress.py 28 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_code_operate.py 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_data_manager.py 1257 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_data_manager_new.py 1974 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_data_test.py 73 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_data_util.py 15 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_trade_test.py 20 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
server.py 50 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ths_util.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/l2_trade_factor.py 8 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/l2_trade_util.py 补丁 | 查看 | 原始文档 | blame | 历史
trade/trade_data_manager.py 补丁 | 查看 | 原始文档 | blame | 历史
trade/trade_gui.py 4 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/trade_manager.py 9 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/trade_queue_manager.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
constant.py
@@ -14,6 +14,12 @@
S_CANCEL_SECOND_RATE = 0.69
S_CANCEL_THIRD_RATE = 0.59
# H撤比例
H_CANCEL_FIRST_RATE = 0.79
H_CANCEL_SECOND_RATE = 0.69
H_CANCEL_THIRD_RATE = 0.59
# h撤大单笔数
H_CANCEL_BUY_COUNT = 40
# H撤单比例
data_export_util.py
@@ -11,8 +11,6 @@
import l2_data_util
import log
import l2_data_manager
def export_l2_excel(code, date=None):
    # 获取L2的数据
@@ -194,6 +192,6 @@
if __name__ == "__main__":
    codes = ["002842"]
    codes = ["603660"]
    for code in codes:
        export_l2_excel(code)
gui.py
@@ -15,16 +15,15 @@
import redis_manager
import server
import settings
import trade_gui
from juejin import JueJinManager
from l2_code_operate import L2CodeOperate
from l2_trade_factor import L2TradeFactorUtil
from trade.l2_trade_factor import L2TradeFactorUtil
from ocr import ocr_server
from server import *
# 读取server进程的消息
from trade_data_manager import CodeActualPriceProcessor
from trade.trade_data_manager import CodeActualPriceProcessor
def __read_server_pipe(pipe):
industry_codes_sort.py
@@ -6,7 +6,7 @@
import global_util
import limit_up_time_manager
import trade_data_manager
from trade import trade_data_manager
__codeActualPriceProcessor = trade_data_manager.CodeActualPriceProcessor()
juejin.py
@@ -21,7 +21,6 @@
import gpcode_manager
import threading
import l2_trade_util
import server
import tool
@@ -29,12 +28,12 @@
import authority
import decimal
import trade_gui
from trade import trade_gui, l2_trade_util
from l2.cancel_buy_strategy import L2LimitUpSellStatisticUtil
from l2_code_operate import L2CodeOperate
import l2_data_manager_new
from log import logger_juejin_tick, logger_system
from trade_data_manager import CodeActualPriceProcessor
from trade_queue_manager import JueJinBuy1VolumnManager
from trade.trade_data_manager import CodeActualPriceProcessor
from trade.trade_queue_manager import JueJinBuy1VolumnManager
redisManager = redis_manager.RedisManager(0)
__jueJinBuy1VolumnManager = JueJinBuy1VolumnManager()
@@ -58,10 +57,8 @@
def init_data():
    # 删除之前的分钟级大单撤单数据
    l2_data_manager_new.AverageBigNumComputer.clear_data()
    # 删除所有的涨停卖数据
    l2_data_manager_new.L2LimitUpSellStatisticUtil.clear()
    L2LimitUpSellStatisticUtil.clear()
    # 重置所有的大单数据
    big_money_num_manager.reset_all()
    # 清除水下捞数据
l2/cancel_buy_strategy.py
@@ -6,15 +6,20 @@
# s级平均大单计算
# 计算范围到申报时间的那一秒
import json
import logging
import time
import big_money_num_manager
import constant
import gpcode_manager
import l2_data_log
import l2_data_util
import redis_manager
import tool
import trade_data_manager
from trade import trade_data_manager, trade_queue_manager, l2_trade_factor
from l2 import l2_log
from l2_data_manager import L2DataUtil, local_today_num_operate_map, load_l2_data, local_today_datas
from l2.l2_data_manager import L2DataUtil, local_today_num_operate_map, local_today_datas
from log import logger_buy_1_volumn
class SecondCancelBigNumComputer:
@@ -88,11 +93,12 @@
        return left_big_num
    @classmethod
    def need_cancel(cls, code, buy_single_index, buy_exec_index, start_index, end_index, total_data, need_cancel=True):
    def need_cancel(cls, code, buy_single_index, buy_exec_index, start_index, end_index, total_data, threadId,
                    need_cancel=True):
        # 只守护30s
        if tool.trade_time_sub(total_data[start_index]["val"]["time"], total_data[buy_exec_index]["val"]["time"]) > 30:
            return False, None
        l2_log.cancel_debug(code, "S级是否需要撤单,数据范围:{}-{} ", start_index, end_index)
        l2_log.cancel_debug(threadId, code, "S级是否需要撤单,数据范围:{}-{} ", start_index, end_index)
        if tool.trade_time_sub(total_data[end_index]["val"]["time"], total_data[buy_exec_index]["val"]["time"]) > 30:
            # 结束位置超过了执行位置30s,需要重新确认结束位置
@@ -171,7 +177,8 @@
                        if cancel_num / buy_num > cancel_rate_threshold:
                            return True, total_data[i]
        finally:
            l2_log.cancel_debug(code, "S级大单 范围:{}-{} 取消计算结果:{}/{}", start_index, end_index, cancel_num, buy_num)
            l2_log.cancel_debug(threadId, code, "S级大单 范围:{}-{} 取消计算结果:{}/{}", start_index, end_index, cancel_num,
                                buy_num)
            # 保存处理进度与数据
            cls.__save_compute_data(code, process_index, buy_num, cancel_num)
        return False, None
@@ -205,20 +212,40 @@
            return None
        return int(val)
    @classmethod
    def __save_watch_index_set(cls, code, datas):
        key = f"h_cancel_watch_indexs-{code}"
        cls.__getRedis().setex(key, tool.get_expire(), json.dumps(list(datas)))
    # 保存成交进度
    @classmethod
    def __get_watch_index_set(cls, code):
        key = f"h_cancel_watch_indexs-{code}"
        val = cls.__getRedis().get(key)
        if val is None:
            return None
        val = json.loads(val)
        return val
    # 保存结束位置
    @classmethod
    def __save_compute_data(cls, code, process_index, buy_num, cancel_num):
    def __save_compute_data(cls, code, process_index, cancel_num):
        key = "h_cancel_compute_data-{}".format(code)
        cls.__getRedis().setex(key, tool.get_expire(), json.dumps((process_index, buy_num, cancel_num)))
        cls.__getRedis().setex(key, tool.get_expire(), json.dumps((process_index, cancel_num)))
    @classmethod
    def __get_compute_data(cls, code):
        key = "h_cancel_compute_data-{}".format(code)
        val = cls.__getRedis().get(key)
        if val is None:
            return -1, 0, 0
            return -1, 0
        val = json.loads(val)
        return val[0], val[1], val[2]
        return val[0], val[1]
    @classmethod
    def __del_compute_data(cls, code):
        key = "h_cancel_compute_data-{}".format(code)
        cls.__getRedis().delete(key)
    @classmethod
    def __clear_data(cls, code):
@@ -234,124 +261,543 @@
            for k in keys:
                cls.__getRedis().delete(k)
    # 计算净大单
    @classmethod
    def __compute_left_big_num(cls, code, start_index, end_index, total_data):
        # 获取大单的最小手数
        left_big_num = 0
        for i in range(start_index, end_index + 1):
            data = total_data[i]
            val = data["val"]
            # 去除非大单
            if not l2_data_util.is_big_money(val):
                continue
            if L2DataUtil.is_limit_up_price_buy(val):
                left_big_num += val["num"] * data["re"]
            elif L2DataUtil.is_limit_up_price_buy_cancel(val):
                # 查询买入位置
                buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(data,
                                                                                 local_today_num_operate_map.get(
                                                                                     code))
                if buy_index is not None and start_index <= buy_index <= end_index:
                    left_big_num -= val["num"] * data["re"]
                elif buy_index is None:
                    # 有部分撤销从而导致的无法溯源,这时就需要判断预估买入时间是否在a_start_index到a_end_index的时间区间
                    min_space, max_space = l2_data_util.compute_time_space_as_second(val["cancelTime"],
                                                                                     val["cancelTimeUnit"])
                    # 只判断S级撤销,只有s级撤销才有可能相等
                    if max_space - min_space <= 1:
                        buy_time = tool.trade_time_add_second(val["time"], 0 - min_space)
                        if int(total_data[start_index]["val"]["time"].replace(":", "")) <= int(
                                buy_time.replace(":", "")) <= int(
                            total_data[end_index]["val"]["time"].replace(":", "")):
                            left_big_num -= val["num"] * data["re"]
        return left_big_num
    @classmethod
    def need_cancel(cls, code, buy_single_index, buy_exec_index, start_index, end_index, total_data, need_cancel=True):
        # 只守护30s
        if tool.trade_time_sub(total_data[start_index]["val"]["time"], total_data[buy_exec_index]["val"]["time"]) > 30:
    def need_cancel(cls, code, buy_exec_index, start_index, end_index, total_data, threadId):
        # 守护30s以外的数据
        if tool.trade_time_sub(total_data[start_index]["val"]["time"], total_data[buy_exec_index]["val"]["time"]) <= 30:
            return False, None
        l2_log.cancel_debug(code, "S级是否需要撤单,数据范围:{}-{} ", start_index, end_index)
        watch_indexs = cls.__get_watch_index_set(code)
        watch_indexs_dict = {}
        # 监听的总数
        total_nums = 0
        for indexs in watch_indexs:
            watch_indexs_dict[indexs[0]] = indexs
            total_nums += total_data[indexs[0]]["val"]["num"] * indexs[1]
        if tool.trade_time_sub(total_data[end_index]["val"]["time"], total_data[buy_exec_index]["val"]["time"]) > 30:
            # 结束位置超过了执行位置30s,需要重新确认结束位置
            for i in range(end_index, start_index - 1, -1):
                if total_data[end_index]["val"]["time"] != total_data[i]["val"]["time"]:
                    end_index = i
                    break
        if watch_indexs is None:
            l2_log.cancel_debug(threadId, code, "H撤没获取到监听范围数据")
            return False, None
        # 获取处理进度
        process_index_old, buy_num, cancel_num = cls.__get_compute_data(code)
        processed_index, cancel_num = cls.__get_compute_data(code)
        # 如果start_index与buy_single_index相同,即是下单后的第一次计算
        # 需要查询买入信号之前的同1s是否有涨停撤的数据
        process_index = -1
        if buy_single_index == start_index:
            # 第1次计算需要计算买入信号-执行位的净值
            left_big_num = cls.__compute_left_big_num(code, buy_single_index, buy_exec_index, total_data)
            buy_num += left_big_num
            # 设置买入信号-买入执行位的数据不需要处理
            start_index = end_index + 1
            process_index = end_index
        l2_log.cancel_debug(threadId, code, "H级是否需要撤单,数据范围:{}-{} ", start_index, end_index)
        # 获取下单次数
        place_order_count = trade_data_manager.placeordercountmanager.get_place_order_count(code)
        process_index = start_index
        try:
            for i in range(start_index, end_index + 1):
                data = total_data[i]
                val = data["val"]
                if process_index_old >= i:
                    # 已经处理过的数据不需要处理
                    continue
                if not l2_data_util.is_big_money(val):
                if i <= processed_index:
                    # 已经处理过了
                    continue
                process_index = i
                data = total_data[i]
                val = data["val"]
                if L2DataUtil.is_limit_up_price_buy_cancel(val):
                    # 查询买入位置
                    buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(data,
                                                                                     local_today_num_operate_map.get(
                                                                                         code))
                    if buy_index is not None and buy_single_index <= buy_index:
                    if buy_index is not None and buy_index in watch_indexs_dict:
                        cancel_num += buy_data["re"] * int(buy_data["val"]["num"])
                    elif buy_index is None:
                        # 有部分撤销从而导致的无法溯源,这时就需要判断预估买入时间是否在a_start_index到a_end_index的时间区间
                        min_space, max_space = l2_data_util.compute_time_space_as_second(val["cancelTime"],
                                                                                         val["cancelTimeUnit"])
                        # 只判断S级撤销,只有s级撤销才有可能相等
                        if max_space - min_space <= 1:
                            buy_time = tool.trade_time_add_second(val["time"], 0 - min_space)
                            if int(total_data[buy_single_index]["val"]["time"].replace(":", "")) <= int(
                                    buy_time.replace(":", "")):
                                cancel_num += buy_data["re"] * int(buy_data["val"]["num"])
                    # 保存数据
                    if need_cancel:
                        cancel_rate_threshold = constant.S_CANCEL_FIRST_RATE
                        place_order_count = trade_data_manager.placeordercountmanager.get_place_order_count(code)
                        cancel_rate_threshold = constant.H_CANCEL_FIRST_RATE
                        if place_order_count <= 1:
                            cancel_rate_threshold = constant.S_CANCEL_FIRST_RATE
                            cancel_rate_threshold = constant.H_CANCEL_FIRST_RATE
                        elif place_order_count <= 2:
                            cancel_rate_threshold = constant.S_CANCEL_SECOND_RATE
                            cancel_rate_threshold = constant.H_CANCEL_SECOND_RATE
                        else:
                            cancel_rate_threshold = constant.S_CANCEL_THIRD_RATE
                        if cancel_num / buy_num > cancel_rate_threshold:
                            cancel_rate_threshold = constant.H_CANCEL_THIRD_RATE
                        if cancel_num / total_nums > cancel_rate_threshold:
                            return True, total_data[i]
        finally:
            l2_log.cancel_debug(code, "S级大单 范围:{}-{} 取消计算结果:{}/{}", start_index, end_index, cancel_num, buy_num)
            l2_log.cancel_debug(threadId, code, "H级撤单计算结果 范围:{}-{} 处理进度:{} 取消计算结果:{}/{}", start_index, end_index,
                                process_index, cancel_num,
                                total_nums)
            # 保存处理进度与数据
            cls.__save_compute_data(code, process_index, buy_num, cancel_num)
            cls.__save_compute_data(code, process_index, cancel_num)
        return False, None
    # 下单成功
    @classmethod
    def place_order_success(cls, code, buy_single_index, buy_exec_index, total_data):
    def place_order_success(cls, code, buy_single_index, buy_exec_index, total_data, local_today_num_operate_map):
        cls.__clear_data(code)
        cls.set_trade_progress(code, buy_exec_index)
        cls.set_trade_progress(code, buy_exec_index, total_data, local_today_num_operate_map)
    # 设置成交进度
    @classmethod
    def set_trade_progress(cls, code, index):
        l2_log.cancel_debug(code, "成交进度:{}", index)
    def set_trade_progress(cls, code, index, total_data, local_today_num_operate_map):
        l2_log.cancel_debug(0, code, "成交进度:{}", index)
        # 成交进度
        cls.__save_trade_progress(code, index)
        cls.compute_watch_end_index(code, total_data, local_today_num_operate_map)
    @classmethod
    def compute_watch_end_index(cls, code, total_data, local_today_num_operate_map):
        trade_progress_index = cls.__get_trade_progress(code)
        threshold_money = l2_trade_factor.L2TradeFactorUtil.compute_m_value(code) * 2
        # 最小值1500万
        if threshold_money < 15000000:
            threshold_money = 15000000
        limit_up_price = gpcode_manager.get_limit_up_price(code)
        threshold_num = round(threshold_money / (limit_up_price * 100))
        if trade_progress_index is None:
            raise Exception("尚未获取到成交进度")
        total_num = 0
        watch_set = set()
        for i in range(trade_progress_index + 1, total_data[-1]["index"] + 1):
            data = total_data[i]
            val = data["val"]
            if L2DataUtil.is_limit_up_price_buy(val):
                total_num += val["num"] * data["re"]
                # 判断当前买是否已经买撤
                cancel_datas = local_today_num_operate_map.get(
                    "{}-{}-{}".format(val["num"], "1", val["price"]))
                canceled = False
                if cancel_datas:
                    for cancel_data in cancel_datas:
                        buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(cancel_data,
                                                                                         local_today_num_operate_map)
                        if buy_index == i:
                            # 已经买撤
                            total_num -= buy_data["val"]["num"] * cancel_data["re"]
                            canceled = True
                            if data["re"] - cancel_data["re"] > 0:
                                watch_set.add((i, data["re"] - cancel_data["re"]))
                            break
                if not canceled:
                    watch_set.add((i, data["re"]))
                # 判断是否达到阈值
                if total_num >= threshold_num:
                    l2_log.cancel_debug(0, code, "获取到H撤监听数据:{}", json.dumps(watch_set))
                    break
        # 保存计算范围
        cls.__save_watch_index_set(code, watch_set)
        # 删除原来的计算数据
        cls.__del_compute_data(code)
    @classmethod
    def get_watch_indexs(cls, code):
        return cls.__get_watch_index_set(code)
# --------------------------------封单额变化撤------------------------
# 涨停封单额统计
class L2LimitUpMoneyStatisticUtil:
    _redisManager = redis_manager.RedisManager(1)
    _thsBuy1VolumnManager = trade_queue_manager.THSBuy1VolumnManager()
    @classmethod
    def __get_redis(cls):
        return cls._redisManager.getRedis()
    # 设置l2的每一秒涨停封单额数据
    @classmethod
    def __set_l2_second_money_record(cls, code, time, num, from_index, to_index):
        old_num, old_from, old_to = cls.__get_l2_second_money_record(code, time)
        if old_num is None:
            old_num = num
            old_from = from_index
            old_to = to_index
        else:
            old_num += num
            old_to = to_index
        key = "l2_limit_up_second_money-{}-{}".format(code, time.replace(":", ""))
        cls.__get_redis().setex(key, tool.get_expire(), json.dumps((old_num, old_from, old_to)))
    @classmethod
    def __get_l2_second_money_record(cls, code, time):
        key = "l2_limit_up_second_money-{}-{}".format(code, time.replace(":", ""))
        val = cls.__get_redis().get(key)
        return cls.__format_second_money_record_val(val)
    @classmethod
    def __format_second_money_record_val(cls, val):
        if val is None:
            return None, None, None
        val = json.loads(val)
        return val[0], val[1], val[2]
    @classmethod
    def __get_l2_second_money_record_keys(cls, code, time_regex):
        key = "l2_limit_up_second_money-{}-{}".format(code, time_regex)
        keys = cls.__get_redis().keys(key)
        return keys
    # 设置l2最新的封单额数据
    @classmethod
    def __set_l2_latest_money_record(cls, code, index, num):
        key = "l2_limit_up_money-{}".format(code)
        cls.__get_redis().setex(key, tool.get_expire(), json.dumps((num, index)))
    # 返回数量,索引
    @classmethod
    def __get_l2_latest_money_record(cls, code):
        key = "l2_limit_up_money-{}".format(code)
        result = cls.__get_redis().get(key)
        if result:
            result = json.loads(result)
            return result[0], result[1]
        else:
            return 0, -1
    # 矫正数据
    # 矫正方法为取矫正时间两侧的秒分布数据,用于确定计算结束坐标
    @classmethod
    def verify_num(cls, code, num, time_str):
        # 记录买1矫正日志
        logger_buy_1_volumn.info("涨停封单量矫正:代码-{} 量-{} 时间-{}", code, num, time_str)
        time_ = time_str.replace(":", "")
        key = None
        # 获取矫正时间前1分钟的数据
        keys = []
        for i in range(0, 3600):
            temp_time = tool.trade_time_add_second(time_str, 0 - i)
            # 只处理9:30后的数据
            if int(temp_time.replace(":", "")) < int("093000"):
                break
            keys_ = cls.__get_l2_second_money_record_keys(code, temp_time.replace(":", ""))
            if len(keys_) > 0:
                keys.append(keys_[0])
            if len(keys) >= 1:
                break
        keys.sort(key=lambda tup: int(tup.split("-")[-1]))
        if len(keys) > 0:
            key = keys[0]
            val = cls.__get_redis().get(key)
            old_num, old_from, old_to = cls.__format_second_money_record_val(val)
            end_index = old_to
            # 保存最近的数据
            cls.__set_l2_latest_money_record(code, end_index, num)
            logger_buy_1_volumn.info("涨停封单量矫正成功:代码-{} 位置-{} 量-{}", code, end_index, num)
        else:
            logger_buy_1_volumn.info("涨停封单量矫正失败:代码-{} 时间-{} 量-{}", code, time_str, num)
        # 取消此种方法
        #
        # for i in range(4, -2, -2):
        #     # 获取本(分钟/小时/天)内秒分布数据
        #     time_regex = "{}*".format(time_[:i])
        #     keys_ = cls.__get_l2_second_money_record_keys(code, time_regex)
        #     if keys_ and len(keys_) > 1:
        #         # 需要排序
        #         keys = []
        #         for k in keys_:
        #             keys.append(k)
        #         keys.sort(key=lambda tup: int(tup.split("-")[-1]))
        #         # if i == 4:
        #         #    keys=keys[:5]
        #         # 有2个元素
        #         for index in range(0, len(keys) - 1):
        #             time_1 = keys[index].split("-")[-1]
        #             time_2 = keys[index + 1].split("-")[-1]
        #             if int(time_1) <= int(time_) <= int(time_2):
        #                 # 在此时间范围内
        #                 if time_ == time_2:
        #                     key = keys[index + 1]
        #                 else:
        #                     key = keys[index]
        #                 break
        #         if key:
        #             break
        # # 如果没有找到匹配的区间
        # if not key:
        #     # 最后一条数据的时间为相应的区间
        #     total_datas = local_today_datas[code]
        #
        # if key:
        #     val = cls.__get_redis().get(key)
        #     old_num, old_from, old_to = cls.__format_second_money_record_val(val)
        #     end_index = old_to
        #     # 保存最近的数据
        #     cls.__set_l2_latest_money_record(code, end_index, num)
        #     logger_buy_1_volumn.info("涨停封单量矫正结果:代码-{} 位置-{} 量-{}", code, end_index, num)
    # 计算量,用于涨停封单量的计算
    @classmethod
    def __compute_num(cls, code, data, buy_single_data):
        if L2DataUtil.is_limit_up_price_buy_cancel(data["val"]) or L2DataUtil.is_sell(data["val"]):
            # 涨停买撤与卖
            return 0 - int(data["val"]["num"]) * data["re"]
        else:
            # 卖撤
            if L2DataUtil.is_sell_cancel(data["val"]):
                # 卖撤的买数据是否在买入信号之前,如果在之前就不计算,不在之前就计算
                if l2_data_util.is_sell_index_before_target(data, buy_single_data,
                                                            local_today_num_operate_map.get(code)):
                    return 0
            return int(data["val"]["num"]) * data["re"]
    @classmethod
    def clear(cls, code):
        key = "l2_limit_up_money-{}".format(code)
        cls.__get_redis().delete(key)
    # 返回取消的标志数据
    # with_cancel 是否需要判断是否撤销
    @classmethod
    def process_data(cls, random_key, code, start_index, end_index, buy_single_begin_index, buy_exec_index,
                     with_cancel=True):
        if buy_single_begin_index is None or buy_exec_index is None:
            return None, None
        start_time = round(time.time() * 1000)
        total_datas = local_today_datas[code]
        time_dict_num = {}
        # 记录计算的坐标
        time_dict_num_index = {}
        # 坐标-量的map
        num_dict = {}
        # 统计时间分布
        time_dict = {}
        for i in range(start_index, end_index + 1):
            data = total_datas[i]
            val = data["val"]
            time_ = val["time"]
            if time_ not in time_dict:
                time_dict[time_] = i
        for i in range(start_index, end_index + 1):
            data = total_datas[i]
            val = data["val"]
            time_ = val["time"]
            if time_ not in time_dict_num:
                time_dict_num[time_] = 0
                time_dict_num_index[time_] = {"s": i, "e": i}
            time_dict_num_index[time_]["e"] = i
            num = cls.__compute_num(code, data, total_datas[buy_single_begin_index])
            num_dict[i] = num
            time_dict_num[time_] = time_dict_num[time_] + num
        for t_ in time_dict_num:
            cls.__set_l2_second_money_record(code, t_, time_dict_num[t_], time_dict_num_index[t_]["s"],
                                             time_dict_num_index[t_]["e"])
        print("保存涨停封单额时间:", round(time.time() * 1000) - start_time)
        # 累计最新的金额
        total_num, index = cls.__get_l2_latest_money_record(code)
        record_msg = f"同花顺买1信息 {total_num},{index}"
        if index == -1:
            # 没有获取到最新的矫正封单额,需要从买入信号开始点计算
            index = buy_single_begin_index - 1
            total_num = 0
        cancel_index = None
        cancel_msg = None
        # 待计算量
        limit_up_price = gpcode_manager.get_limit_up_price(code)
        min_volumn = round(10000000 / (limit_up_price * 100))
        min_volumn_big = min_volumn * 5
        # 不同时间的数据开始坐标
        time_start_index_dict = {}
        # 数据时间分布
        time_list = []
        # 到当前时间累积的买1量
        time_total_num_dict = {}
        # 大单撤销笔数
        cancel_big_num_count = 0
        buy_exec_time = tool.get_time_as_second(total_datas[buy_exec_index]["val"]["time"])
        # 获取最大封单额
        max_buy1_volume = cls._thsBuy1VolumnManager.get_max_buy1_volume(code)
        # 从同花顺买1矫正过后的位置开始计算,到end_index结束
        for i in range(index + 1, end_index + 1):
            data = total_datas[i]
            # 统计撤销数量
            try:
                if big_money_num_manager.is_big_num(data["val"]):
                    if L2DataUtil.is_limit_up_price_buy_cancel(data["val"]):
                        cancel_big_num_count += int(data["re"])
                        # TODO 大量重复的工作需要处理,可以暂存在内存中,从而减少计算
                        # 获取是否在买入执行信号周围2s
                        buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(data,
                                                                                         local_today_num_operate_map.get(
                                                                                             code))
                        if buy_index is not None and buy_data is not None:
                            # 相差1s
                            buy_time = buy_data["val"]["time"]
                            if abs(buy_exec_time - tool.get_time_as_second(buy_time)) < 2:
                                cancel_big_num_count += int(data["re"])
                    elif L2DataUtil.is_limit_up_price_buy(data["val"]):
                        cancel_big_num_count -= int(data["re"])
            except Exception as e:
                logging.exception(e)
            threshold_rate = 0.5
            if cancel_big_num_count >= 0:
                if cancel_big_num_count < 10:
                    threshold_rate = threshold_rate - cancel_big_num_count * 0.01
                else:
                    threshold_rate = threshold_rate - 10 * 0.01
            time_ = data["val"]["time"]
            if time_ not in time_start_index_dict:
                # 记录每一秒的开始位置
                time_start_index_dict[time_] = i
                # 记录时间分布
                time_list.append(time_)
                # 上一段时间的总数
                time_total_num_dict[time_] = total_num
            exec_time_offset = tool.trade_time_sub(data["val"]["time"], total_datas[buy_exec_index]["val"]["time"])
            val = num_dict.get(i)
            if val is None:
                val = cls.__compute_num(code, data, total_datas[buy_single_begin_index])
            total_num += val
            # 在处理数据的范围内,就需要判断是否要撤单了
            if start_index <= i <= end_index:
                # 如果是减小项
                if val < 0:
                    # 当前量小于最大量的24%则需要取消
                    if exec_time_offset >= 30:
                        if total_num <= min_volumn_big and max_buy1_volume * 0.24 > total_num:
                            cancel_index = i
                            cancel_msg = "封板额小于最高封板额的24% {}/{}".format(total_num, max_buy1_volume)
                            break
                    # 累计封单金额小于1000万
                    if total_num < min_volumn:
                        # 与执行位相隔>=5s时规则生效
                        if exec_time_offset >= 5:
                            cancel_index = i
                            cancel_msg = "封单金额小于1000万,为{}".format(total_num)
                            break
                    # 相邻2s内的数据减小50%
                    # 上1s的总数
                    last_second_total_volumn = time_total_num_dict.get(time_list[-1])
                    if last_second_total_volumn > 0 and (
                            last_second_total_volumn - total_num) / last_second_total_volumn >= threshold_rate:
                        # 与执行位相隔>=5s时规则生效
                        if exec_time_offset >= 5:
                            # 相邻2s内的数据减小50%
                            cancel_index = i
                            cancel_msg = "相邻2s({})内的封单量减小50%({}->{})".format(time_, last_second_total_volumn,
                                                                             total_num)
                        break
                    # 记录中有上2个数据
                    if len(time_list) >= 2:
                        # 倒数第2个数据
                        last_2_second_total_volumn = time_total_num_dict.get(time_list[-2])
                        if last_2_second_total_volumn > 0:
                            if last_2_second_total_volumn > last_second_total_volumn > total_num:
                                dif = last_2_second_total_volumn - total_num
                                if dif / last_2_second_total_volumn >= threshold_rate:
                                    # 与执行位相隔>=5s时规则生效
                                    if exec_time_offset >= 5:
                                        cancel_index = i
                                        cancel_msg = "相邻3s({})内的封单量(第3秒 与 第1的 减小比例)减小50%({}->{}->{})".format(time_,
                                                                                                             last_2_second_total_volumn,
                                                                                                             last_second_total_volumn,
                                                                                                             total_num)
                                    break
        if not with_cancel:
            cancel_index = None
        print("封单额计算时间:", round(time.time() * 1000) - start_time)
        process_end_index = end_index
        if cancel_index:
            process_end_index = cancel_index
        # 保存最新累计金额
        # cls.__set_l2_latest_money_record(code, process_end_index, total_num)
        l2_data_log.l2_time(code, random_key, round(time.time() * 1000) - start_time,
                            "l2数据封单额计算时间",
                            False)
        if cancel_index:
            l2_log.cancel_debug(random_key, code, "数据处理位置:{}-{},{},最终买1为:{}", start_index, end_index, record_msg,
                                total_num)
            return total_datas[cancel_index], cancel_msg
        return None, None
# ---------------------------------板上卖-----------------------------
# 涨停卖统计
class L2LimitUpSellStatisticUtil:
    _redisManager = redis_manager.RedisManager(0)
    @classmethod
    def __get_redis(cls):
        return cls._redisManager.getRedis()
    # 新增卖数据
    @classmethod
    def __incre_sell_data(cls, code, num):
        key = "limit_up_sell_num-{}".format(code)
        cls.__get_redis().incrby(key, num)
    @classmethod
    def __get_sell_data(cls, code):
        key = "limit_up_sell_num-{}".format(code)
        val = cls.__get_redis().get(key)
        if val is None:
            return 0
        return int(val)
    @classmethod
    def __save_process_index(cls, code, index):
        key = "limit_up_sell_index-{}".format(code)
        cls.__get_redis().setex(key, tool.get_expire(), index)
    @classmethod
    def __get_process_index(cls, code):
        key = "limit_up_sell_index-{}".format(code)
        val = cls.__get_redis().get(key)
        if val is None:
            return -1
        return int(val)
    # 清除数据,当取消成功与买入之前需要清除数据
    @classmethod
    def delete(cls, code):
        key = "limit_up_sell_num-{}".format(code)
        cls.__get_redis().delete(key)
        key = "limit_up_sell_index-{}".format(code)
        cls.__get_redis().delete(key)
    @classmethod
    def clear(cls):
        keys = cls.__get_redis().keys("limit_up_sell_num-*")
        for k in keys:
            cls.__get_redis().delete(k)
    # 处理数据,返回是否需要撤单
    # 处理范围:买入执行位-当前最新位置
    @classmethod
    def process(cls, random_key, code, start_index, end_index, buy_exec_index):
        # 获取涨停卖的阈值
        limit_up_price = gpcode_manager.get_limit_up_price(code)
        zyltgb = l2_trade_factor.L2TradeFactorUtil.get_zyltgb(code)
        # 大于自由流通市值的4.8%
        threshold_num = int(zyltgb * 0.048) // (limit_up_price * 100)
        total_num = cls.__get_sell_data(code)
        cancel_index = None
        process_index = cls.__get_process_index(code)
        total_datas = local_today_datas.get(code)
        for i in range(start_index, end_index + 1):
            if i < buy_exec_index:
                continue
            if i <= process_index:
                continue
            if L2DataUtil.is_limit_up_price_sell(total_datas[i]["val"]) or L2DataUtil.is_sell(total_datas[i]["val"]):
                num = int(total_datas[i]["val"]["num"])
                cls.__incre_sell_data(code, num)
                total_num += num
                if total_num > threshold_num:
                    cancel_index = i
                    break
        if cancel_index is not None:
            process_index = cancel_index
        else:
            process_index = end_index
        l2_log.cancel_debug(random_key, code, "板上卖信息:计算位置:{}-{} 板上卖数据{}/{}", start_index, end_index, total_num,
                            threshold_num)
        cls.__save_process_index(code, process_index)
        if cancel_index is not None:
            return total_datas[cancel_index], "板上卖的手数{} 超过{}".format(total_num, threshold_num)
        return None, ""
l2/l2_data_manager.py
New file
@@ -0,0 +1,199 @@
"""
L2的数据处理
"""
import json
import redis_manager
import tool
from log import logger_l2_trade_buy
_redisManager = redis_manager.RedisManager(1)
class L2DataException(Exception):
    # 价格不匹配
    CODE_PRICE_ERROR = 1
    # 无收盘价
    CODE_NO_CLOSE_PRICE = 2
    def __init__(self, code, msg):
        super().__init__(self)
        self.code = code
        self.msg = msg
    def __str__(self):
        return self.msg
    def get_code(self):
        return self.code
# 交易点管理器,用于管理买入点;买撤点;距离买入点的净买入数据;距离买撤点的买撤数据
class TradePointManager:
    @staticmethod
    def __get_redis():
        return _redisManager.getRedis()
    # 删除买入点数据
    @staticmethod
    def delete_buy_point(code):
        redis = TradePointManager.__get_redis()
        redis.delete("buy_compute_index_info-{}".format(code))
    # 获取买入点信息
    # 返回数据为:买入点 累计纯买额 已经计算的数据索引
    @staticmethod
    def get_buy_compute_start_data(code):
        redis = TradePointManager.__get_redis()
        _key = "buy_compute_index_info-{}".format(code)
        _data_json = redis.get(_key)
        if _data_json is None:
            return None, None, None, 0, 0, []
        _data = json.loads(_data_json)
        return _data[0], _data[1], _data[2], _data[3], _data[4], _data[5]
    # 设置买入点的值
    # buy_single_index 买入信号位
    # buy_exec_index 买入执行位
    # compute_index 计算位置
    # nums 累计纯买额
    @staticmethod
    def set_buy_compute_start_data(code, buy_single_index, buy_exec_index, compute_index, nums, count, max_num_sets):
        redis = TradePointManager.__get_redis()
        expire = tool.get_expire()
        _key = "buy_compute_index_info-{}".format(code)
        if buy_single_index is not None:
            redis.setex(_key, expire,
                        json.dumps((buy_single_index, buy_exec_index, compute_index, nums, count, list(max_num_sets))))
        else:
            _buy_single_index, _buy_exec_index, _compute_index, _nums, _count, _max_num_index = TradePointManager.get_buy_compute_start_data(
                code)
            redis.setex(_key, expire,
                        json.dumps((_buy_single_index, buy_exec_index, compute_index, nums, count, list(max_num_sets))))
    # 获取撤买入开始计算的信息
    # 返回数据的内容为:撤销点索引 撤买纯买额 计算的数据索引
    @staticmethod
    def get_buy_cancel_single_pos(code):
        redis = TradePointManager.__get_redis()
        info = redis.get("buy_cancel_single_pos-{}".format(code))
        if info is None:
            return None
        else:
            return int(info)
    # 设置买撤点信息
    # buy_num 纯买额  computed_index计算到的下标  index撤买信号起点
    @classmethod
    def set_buy_cancel_single_pos(cls, code, index):
        redis = TradePointManager.__get_redis()
        expire = tool.get_expire()
        redis.setex("buy_cancel_single_pos-{}".format(code), expire, index)
    # 删除买撤点数据
    @classmethod
    def delete_buy_cancel_point(cls, code):
        redis = TradePointManager.__get_redis()
        redis.delete("buy_cancel_single_pos-{}".format(code))
    # 设置买撤纯买额
    @classmethod
    def set_compute_info_for_cancel_buy(cls, code, index, nums):
        redis = TradePointManager.__get_redis()
        expire = tool.get_expire()
        redis.setex("compute_info_for_cancel_buy-{}".format(code), expire, json.dumps((index, nums)))
        logger_l2_trade_buy.info("{}保存撤单纯买额信息:{},{}", code, index, nums)
    # 获取买撤纯买额计算信息
    @classmethod
    def get_compute_info_for_cancel_buy(cls, code):
        redis = TradePointManager.__get_redis()
        info = redis.get("compute_info_for_cancel_buy-{}".format(code))
        if info is None:
            return None, 0
        else:
            info = json.loads(info)
            return info[0], info[1]
    @classmethod
    def delete_compute_info_for_cancel_buy(cls, code):
        redis = TradePointManager.__get_redis()
        redis.delete("compute_info_for_cancel_buy-{}".format(code))
    # 从买入信号开始设置涨停买与涨停撤的单数
    @classmethod
    def set_count_info_for_cancel_buy(cls, code, index, buy_count, cancel_count):
        redis = TradePointManager.__get_redis()
        expire = tool.get_expire()
        redis.setex("count_info_for_cancel_buy-{}".format(code), expire, json.dumps((index, buy_count, cancel_count)))
        logger_l2_trade_buy.info("{}保存撤单纯买额信息:{},{}", code, index, buy_count, cancel_count)
    # 获取买撤纯买额计算信息
    @classmethod
    def get_count_info_for_cancel_buy(cls, code):
        redis = TradePointManager.__get_redis()
        info = redis.get("count_info_for_cancel_buy-{}".format(code))
        if info is None:
            return None, 0, 0
        else:
            info = json.loads(info)
            return info[0], info[1], info[2]
    @classmethod
    def delete_count_info_for_cancel_buy(cls, code):
        redis = TradePointManager.__get_redis()
        redis.delete("count_info_for_cancel_buy-{}".format(code))
# 清除l2数据
def clear_l2_data(code):
    redis_l2 = redis_manager.RedisManager(1).getRedis()
    keys = redis_l2.keys("l2-{}-*".format(code))
    for k in keys:
        redis_l2.delete(k)
    redis_l2.delete("l2-data-latest-{}".format(code))
second_930 = 9 * 3600 + 30 * 60 + 0
#  初始化l2固定代码库
def init_l2_fixed_codes():
    key = "l2-fixed-codes"
    redis = _redisManager.getRedis()
    count = redis.scard(key)
    if count > 0:
        redis.delete(key)
    redis.sadd(key, "000000")
    redis.expire(key, tool.get_expire())
# 移除l2固定监控代码
def remove_from_l2_fixed_codes(code):
    key = "l2-fixed-codes"
    redis = _redisManager.getRedis()
    redis.srem(key, code)
# 添加代码到L2固定监控
def add_to_l2_fixed_codes(code):
    key = "l2-fixed-codes"
    redis = _redisManager.getRedis()
    redis.sadd(key, code)
    redis.expire(key, tool.get_expire())
# 是否在l2固定监控代码中
def is_in_l2_fixed_codes(code):
    key = "l2-fixed-codes"
    redis = _redisManager.getRedis()
    return redis.sismember(key, code)
if __name__ == "__main__":
    clear_l2_data("603912")
l2/l2_data_manager_new.py
New file
@@ -0,0 +1,1089 @@
import logging
import random
import time as t
import big_money_num_manager
import code_data_util
import constant
import global_util
import gpcode_manager
import industry_codes_sort
import l2_data_log
import l2_data_util
import l2_trade_test
import limit_up_time_manager
import redis_manager
import ths_industry_util
import tool
from trade import trade_data_manager, trade_manager, trade_queue_manager, l2_trade_factor, l2_trade_util
from l2 import safe_count_manager, l2_data_manager, l2_data_util
from l2.cancel_buy_strategy import SecondCancelBigNumComputer, HourCancelBigNumComputer, L2LimitUpMoneyStatisticUtil, \
    L2LimitUpSellStatisticUtil
from l2.l2_data_manager import L2DataException, TradePointManager
from l2.l2_data_util import local_today_datas, L2DataUtil, load_l2_data, local_today_num_operate_map, local_latest_datas
from log import logger_l2_trade, logger_l2_trade_cancel, logger_l2_trade_buy, logger_l2_process, logger_l2_error
# TODO l2数据管理
from trade.trade_data_manager import CodeActualPriceProcessor
class L2DataManager:
    # 格式化数据
    def format_data(self, datas):
        format_datas = []
        for data in datas:
            format_datas.append({"val": data, "re": 1})
        return format_datas
    # 获取新增数据
    def get_add_datas(self, format_datas):
        pass
    # 从数据库加载数据
    def load_data(self, code=None, force=False):
        pass
    # 保存数据
    def save_datas(self, add_datas, datas):
        pass
# m值大单处理
class L2BigNumForMProcessor:
    def __init__(self):
        self._redis_manager = redis_manager.RedisManager(1)
    def __get_redis(self):
        return self._redis_manager.getRedis()
    # 保存计算开始位置
    def set_begin_pos(self, code, index):
        if self.__get_begin_pos(code) is None:
            # 保存位置
            key = "m_big_money_begin-{}".format(code)
            self.__get_redis().setex(key, tool.get_expire(), index)
    # 获取计算开始位置
    def __get_begin_pos(self, code):
        key = "m_big_money_begin-{}".format(code)
        val = self.__get_redis().get(key)
        if val is None:
            return None
        return int(val)
    # 清除已经处理的数据
    def clear_processed_end_index(self, code):
        key = "m_big_money_process_index-{}".format(code)
        self.__get_redis().delete(key)
    # 添加已经处理过的单
    def __set_processed_end_index(self, code, index):
        key = "m_big_money_process_index-{}".format(code)
        self.__get_redis().setex(key, tool.get_expire(), index)
    # 是否已经处理过
    def __get_processed_end_index(self, code):
        key = "m_big_money_process_index-{}".format(code)
        val = self.__get_redis().get(key)
        if val is None:
            return None
        return int(val)
    # 处理大单
    def process(self, code, start_index, end_index, limit_up_price):
        begin_pos = self.__get_begin_pos(code)
        if begin_pos is None:
            # 没有获取到开始买入信号
            return
        # 上次处理到的坐标
        processed_index = self.__get_processed_end_index(code)
        if processed_index is None:
            processed_index = 0
        if processed_index >= end_index:
            return
        start_time = round(t.time() * 1000)
        total_datas = local_today_datas[code]
        num_splites = [round(5000 / limit_up_price), round(10000 / limit_up_price), round(20000 / limit_up_price),
                       round(30000 / limit_up_price)]
        total_num = 0
        for i in range(max(start_index, processed_index), end_index + 1):
            data = total_datas[i]
            if not L2DataUtil.is_limit_up_price_buy_cancel(data["val"]) and not L2DataUtil.is_limit_up_price_buy(
                    data["val"]):
                continue
            # 如果是涨停买撤信号需要看数据位置是否比开始处理时间早
            if L2DataUtil.is_limit_up_price_buy_cancel(data["val"]):
                # 获取买入信号
                buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(total_datas[i],
                                                                                 local_today_num_operate_map.get(code))
                if buy_index is not None and buy_index < begin_pos:
                    continue
            # 计算成交金额
            num = int(data["val"]["num"])
            temp = 0
            if num < num_splites[0]:
                pass
            elif num < num_splites[1]:
                temp = 1
            elif num < num_splites[2]:
                temp = round(4 / 3, 3)
            elif num < num_splites[3]:
                temp = 2
            else:
                temp = 4
            count = int(temp * data["re"] * 1000)
            if L2DataUtil.is_limit_up_price_buy_cancel(data["val"]):
                count = 0 - count
            total_num += count
        self.__set_processed_end_index(code, end_index)
        big_money_num_manager.add_num(code, total_num)
        print("m值大单计算范围:{}-{}  时间:{}".format(max(start_index, processed_index), end_index,
                                             round(t.time() * 1000) - start_time))
class L2TradeDataProcessor:
    unreal_buy_dict = {}
    random_key = {}
    l2BigNumForMProcessor = L2BigNumForMProcessor()
    __codeActualPriceProcessor = CodeActualPriceProcessor()
    buy1PriceManager = trade_queue_manager.Buy1PriceManager()
    __ths_l2_trade_queue_manager = trade_queue_manager.thsl2tradequeuemanager()
    __thsBuy1VolumnManager = trade_queue_manager.THSBuy1VolumnManager()
    __buyL2SafeCountManager = safe_count_manager.BuyL2SafeCountManager()
    @classmethod
    def debug(cls, code, content, *args):
        logger_l2_trade.debug(("thread-id={} code={}  ".format(cls.random_key[code], code) + content).format(*args))
    @classmethod
    def cancel_debug(cls, code, content, *args):
        logger_l2_trade_cancel.debug(
            ("thread-id={} code={}  ".format(cls.random_key[code], code) + content).format(*args))
    @classmethod
    def buy_debug(cls, code, content, *args):
        logger_l2_trade_buy.debug(
            ("thread-id={} code={}  ".format(cls.random_key[code], code) + content).format(*args))
    @classmethod
    # 数据处理入口
    # datas: 本次截图数据
    # capture_timestamp:截图时间戳
    def process(cls, code, datas, capture_timestamp, do_id):
        cls.random_key[code] = do_id
        __start_time = round(t.time() * 1000)
        try:
            if len(datas) > 0:
                # 判断价格区间是否正确
                if not code_data_util.is_same_code_with_price(code, float(datas[0]["val"]["price"])):
                    raise L2DataException(L2DataException.CODE_PRICE_ERROR,
                                          "股价不匹配 code-{} price-{}".format(code, datas[0]["val"]["price"]))
                # 加载历史数据
                l2_data_util.load_l2_data(code)
                # 纠正数据
                datas = l2_data_util.L2DataUtil.correct_data(code,local_latest_datas.get(code), datas)
                _start_index = 0
                if local_today_datas.get(code) is not None and len(
                        local_today_datas[code]) > 0:
                    _start_index = local_today_datas[code][-1]["index"] + 1
                add_datas = l2_data_util.L2DataUtil.get_add_data(code,local_latest_datas.get(code), datas, _start_index)
                # -------------数据增量处理------------
                try:
                    cls.process_add_datas(code, add_datas, capture_timestamp, __start_time)
                finally:
                    # 保存数据
                    __start_time = round(t.time() * 1000)
                    l2_data_util.save_l2_data(code, datas, add_datas, cls.random_key[code])
                    __start_time = l2_data_log.l2_time(code, cls.random_key[code],
                                                       round(t.time() * 1000) - __start_time,
                                                       "保存数据时间({})".format(len(add_datas)))
        finally:
            if code in cls.unreal_buy_dict:
                cls.unreal_buy_dict.pop(code)
    @classmethod
    def process_add_datas(cls, code, add_datas, capture_timestamp, __start_time):
        now_time_str = tool.get_now_time_str()
        if len(add_datas) > 0:
            # 拼接数据
            local_today_datas[code].extend(add_datas)
            l2_data_util.load_num_operate_map(l2_data_util.local_today_num_operate_map, code, add_datas)
            # 第1条数据是否为09:30:00
            if add_datas[0]["val"]["time"] == "09:30:00":
                if global_util.cuurent_prices.get(code):
                    price_data = global_util.cuurent_prices.get(code)
                    if price_data[1]:
                        # 当前涨停价,设置涨停时间
                        logger_l2_process.info("开盘涨停:{}", code)
                        # 保存涨停时间
                        limit_up_time_manager.save_limit_up_time(code, "09:30:00")
        total_datas = local_today_datas[code]
        __start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - __start_time,
                                           "l2数据预处理时间")
        if len(add_datas) > 0:
            latest_time = add_datas[len(add_datas) - 1]["val"]["time"]
            # 时间差不能太大才能处理
            if l2_data_util.L2DataUtil.is_same_time(now_time_str, latest_time) and not l2_trade_util.is_in_forbidden_trade_codes(code):
                # 判断是否已经挂单
                state = trade_manager.get_trade_state(code)
                start_index = len(total_datas) - len(add_datas)
                end_index = len(total_datas) - 1
                if state == trade_manager.TRADE_STATE_BUY_DELEGATED or state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER or state == trade_manager.TRADE_STATE_BUY_SUCCESS:
                    # 已挂单
                    cls.__process_order(code, start_index, end_index, capture_timestamp)
                else:
                    # 未挂单
                    cls.__process_not_order(code, start_index, end_index, capture_timestamp)
            logger_l2_process.info("code:{} 处理数据范围: {}-{} 处理时间:{} 截图时间戳:{}", code, add_datas[0]["index"],
                                   add_datas[-1]["index"], round(t.time() * 1000) - __start_time,
                                   capture_timestamp)
            __start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - __start_time,
                                               "l2数据处理时间")
    # 处理未挂单
    @classmethod
    def __process_not_order(cls, code, start_index, end_index, capture_time):
        __start_time = round(t.time() * 1000)
        # 获取阈值
        threshold_money, msg = cls.__get_threshmoney(code)
        if round(t.time() * 1000) - __start_time > 10:
            __start_time = l2_data_log.l2_time(code, cls.random_key.get(code), round(t.time() * 1000) - __start_time,
                                               "获取m值数据耗时")
        cls.__start_compute_buy(code, start_index, end_index, threshold_money, capture_time)
    # 测试专用
    @classmethod
    def process_order(cls, code, start_index, end_index, capture_time, new_add=True):
        cls.__process_order(code, start_index, end_index, capture_time, new_add)
    # 处理已挂单
    @classmethod
    def __process_order(cls, code, start_index, end_index, capture_time, new_add=True):
        if start_index < 0:
            start_index = 0
        if end_index < start_index:
            return
        total_data = local_today_datas.get(code)
        _start_time = round(t.time() * 1000)
        # 获取买入信号起始点
        buy_single_index, buy_exec_index, buy_compute_index, num, count, max_num_set = cls.__get_order_begin_pos(
            code)
        # 处理安全笔数
        cls.__buyL2SafeCountManager.compute_left_rate(code, start_index, end_index, total_data,
                                                      local_today_num_operate_map.get(code))
        _start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time,
                                          "已下单-获取买入信息耗时")
        # 撤单计算,只看买1
        cancel_data, cancel_msg = L2LimitUpMoneyStatisticUtil.process_data(cls.random_key[code], code, start_index,
                                                                           end_index,
                                                                           buy_single_index, buy_exec_index)
        _start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time,
                                          "已下单-买1统计耗时")
        # S撤单计算,看秒级大单撤单
        try:
            b_need_cancel, b_cancel_data = SecondCancelBigNumComputer.need_cancel(code, buy_single_index,
                                                                                  buy_exec_index, start_index,
                                                                                  end_index, total_data,
                                                                                  cls.random_key[code])
            if b_need_cancel and not cancel_data:
                cancel_data = b_cancel_data
                cancel_msg = "S大单撤销比例触发阈值"
        except Exception as e:
            logging.exception(e)
        _start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time,
                                          "已下单-s级大单估算")
        # H撤
        try:
            b_need_cancel, b_cancel_data = HourCancelBigNumComputer.need_cancel(code, buy_exec_index, start_index,
                                                                                end_index, total_data,
                                                                                cls.random_key[code])
            if b_need_cancel and not cancel_data:
                cancel_data = b_cancel_data
                cancel_msg = "H撤销比例触发阈值"
        except Exception as e:
            logging.exception(e)
        _start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time,
                                          "已下单-H撤大单计算")
        if not cancel_data:
            # 统计板上卖
            try:
                cancel_data, cancel_msg = L2LimitUpSellStatisticUtil.process(cls.random_key[code], code, start_index,
                                                                             end_index,
                                                                             buy_exec_index)
            except Exception as e:
                logging.exception(e)
        _start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time,
                                          "已下单-板上卖耗时")
        # 计算m值大单
        cls.l2BigNumForMProcessor.process(code, max(buy_single_index, start_index), end_index,
                                          gpcode_manager.get_limit_up_price(code))
        _start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time,
                                          "已下单-m值大单计算")
        if cancel_data:
            cls.debug(code, "触发撤单,撤单位置:{} ,撤单原因:{}", cancel_data["index"], cancel_msg)
            # 撤单
            if cls.cancel_buy(code, cancel_msg):
                # 撤单成功,继续计算下单
                cls.__process_not_order(code, cancel_data["index"] + 1, end_index, capture_time)
            else:
                # 撤单尚未成功
                pass
            _start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time,
                                              "已下单-撤单+处理剩余数据")
        else:
            # 如果有虚拟下单需要真实下单
            unreal_buy_info = cls.unreal_buy_dict.get(code)
            if unreal_buy_info is not None:
                cls.debug(code, "有虚拟下单,无买撤信号,开始执行买入,执行位置:{},截图时间:{}", unreal_buy_info[0], capture_time)
                # unreal_buy_info 的内容格式为:(触法买操作下标,截图时间)
                # 真实下单
                cls.__buy(code, unreal_buy_info[1], local_today_datas[code][unreal_buy_info[0]],
                          unreal_buy_info[0])
                _start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time,
                                                  "已下单-真实下单")
    @classmethod
    def __buy(cls, code, capture_timestamp, last_data, last_data_index):
        can, reason = cls.__can_buy(code)
        # 删除虚拟下单
        if code in cls.unreal_buy_dict:
            cls.unreal_buy_dict.pop(code)
        if not can:
            cls.debug(code, "不可以下单,原因:{}", reason)
            if not reason.startswith("买1价不为涨停价"):
                # 中断买入
                trade_manager.break_buy(code, reason)
            return
        else:
            cls.debug(code, "可以下单,原因:{}", reason)
            try:
                cls.debug(code, "开始执行买入")
                trade_manager.start_buy(code, capture_timestamp, last_data,
                                        last_data_index)
                trade_data_manager.placeordercountmanager.place_order(code)
                # 下单成功,需要删除最大买1
                cls.__thsBuy1VolumnManager.clear_max_buy1_volume(code)
                # 获取买入位置信息
                try:
                    buy_single_index, buy_exec_index, buy_compute_index, num, count, max_num_set = cls.__get_order_begin_pos(
                        code)
                    cls.__buyL2SafeCountManager.save_place_order_info(code, buy_single_index, buy_exec_index, None)
                    SecondCancelBigNumComputer.place_order_success(code, buy_single_index, buy_exec_index)
                    HourCancelBigNumComputer.place_order_success(code, buy_single_index, buy_exec_index,
                                                                 local_today_datas.get(code),
                                                                 local_today_num_operate_map.get(code))
                except Exception as e:
                    logging.exception(e)
                    logger_l2_error.exception(e)
                l2_data_manager.TradePointManager.delete_buy_cancel_point(code)
                cls.debug(code, "执行买入成功")
            except Exception as e:
                cls.debug(code, "执行买入异常:{}", str(e))
                pass
            finally:
                cls.debug(code, "m值影响因子:{}", l2_trade_factor.L2TradeFactorUtil.factors_to_string(code))
    # 是否可以取消
    @classmethod
    def __can_cancel(cls, code):
        if constant.TEST:
            return True, ""
        # 暂时注释掉
        # 14点后如果是板块老大就不需要取消了
        # now_time_str = tool.get_now_time_str()
        # if int(now_time_str.replace(":", "")) >= 140000:
        #     industry, codes = ths_industry_util.get_same_industry_codes(code, gpcode_manager.get_gp_list())
        #     if industry is None:
        #         return True, "没有获取到行业"
        #     codes_index = industry_codes_sort.sort_codes(codes, code)
        #     if codes_index is not None and codes_index.get(code) is not None:
        #         # 同一板块中老二后面的不能买
        #         if codes_index.get(code) == 0:
        #             return False, "14:00后老大不能撤单"
        #         elif codes_index.get(code) == 1:
        #             # 判断老大是否都是09:30:00涨停的
        #             # 同1板块老大是09:30:00涨停,老二14:00砸开的不撤
        #             first_count = 0
        #             for key in codes_index:
        #                 if codes_index[key] == 0:
        #                     first_count += 1
        #                     if limit_up_time_manager.get_limit_up_time(key) == "09:30:00":
        #                         first_count -= 1
        #             if first_count == 0:
        #                 return False, "14:00后老大都开盘涨停,老二不能撤单"
        return True, ""
    # 是否可以买
    @classmethod
    def __can_buy(cls, code):
        # 买1价格必须为涨停价才能买
        # buy1_price = cls.buy1PriceManager.get_price(code)
        # if buy1_price is None:
        #     return False, "买1价尚未获取到"
        # limit_up_price = gpcode_manager.get_limit_up_price(code)
        # if limit_up_price is None:
        #     return False, "尚未获取到涨停价"
        # if abs(float(buy1_price) - float(limit_up_price)) >= 0.01:
        #     return False, "买1价不为涨停价,买1价-{} 涨停价-{}".format(buy1_price, limit_up_price)
        # 从买入信号起始点到当前数据末尾的纯买手数与当前的卖1做比较,如果比卖1小则不能买入
        total_datas = local_today_datas[code]
        try:
            sell1_time, sell1_price, sell1_volumn = cls.__ths_l2_trade_queue_manager.get_sell1_info(code)
            cls.buy_debug(code, "卖1信息为:({},{},{})", sell1_time, sell1_price, sell1_volumn)
            if sell1_time is not None and sell1_volumn > 0:
                # 获取执行位信息
                buy_single_index, buy_exec_index, buy_compute_index, num, count, max_num_set = cls.__get_order_begin_pos(
                    code)
                buy_nums = num
                for i in range(buy_exec_index + 1, total_datas[-1]["index"] + 1):
                    _val = total_datas[i]["val"]
                    # 涨停买
                    if L2DataUtil.is_limit_up_price_buy(_val):
                        # 涨停买
                        buy_nums += int(_val["num"]) * int(total_datas[i]["re"])
                    elif L2DataUtil.is_limit_up_price_buy_cancel(_val):
                        buy_nums -= int(_val["num"]) * int(total_datas[i]["re"])
                if buy_nums < sell1_volumn * 0.49:
                    return False, "纯买量({})小于卖1量的49%{} 卖1时间:{}".format(buy_nums, sell1_volumn, sell1_time)
        except Exception as e:
            logging.exception(e)
        # 量比超过1.3的不能买
        volumn_rate = l2_trade_factor.L2TradeFactorUtil.get_volumn_rate_by_code(code)
        if volumn_rate >= 1.3:
            return False, "最大量比超过1.3不能买"
        limit_up_time = limit_up_time_manager.get_limit_up_time(code)
        if limit_up_time is not None and l2_data_util.L2DataUtil.get_time_as_second(
                limit_up_time) >= l2_data_util.L2DataUtil.get_time_as_second(
            "14:30:00"):
            return False, "14:30后涨停的不能买,涨停时间为{}".format(limit_up_time)
        # 同一板块中老二后面的不能买
        industry, codes = ths_industry_util.get_same_industry_codes(code, gpcode_manager.get_gp_list())
        if industry is None:
            return True, "没有获取到行业"
        codes_index = industry_codes_sort.sort_codes(codes, code)
        if codes_index is not None and codes_index.get(code) is not None and codes_index.get(code) > 1:
            # 当老大老二当前没涨停
            return False, "同一板块中老三,老四,...不能买"
        if cls.__codeActualPriceProcessor.is_under_water(code, total_datas[-1]["val"]["time"]):
            # 水下捞且板块中的票小于16不能买
            # if global_util.industry_hot_num.get(industry) is not None and global_util.industry_hot_num.get(
            #         industry) <= 16:
            #     return False, "水下捞,板块中的票小于2只,为{}".format(global_util.industry_hot_num.get(industry))
            # 水下捞自由流通市值大于老大的不要买
            if codes_index.get(code) != 0:
                # 获取老大的市值
                for c in codes_index:
                    if codes_index.get(c) == 0 and global_util.zyltgb_map.get(code) > global_util.zyltgb_map.get(c):
                        return False, "水下捞,不是老大,且自由流通市值大于老大"
        # 13:30后涨停,本板块中涨停票数<29不能买
        # if limit_up_time is not None:
        #     if int(limit_up_time.replace(":", "")) >= 133000 and global_util.industry_hot_num.get(industry) is not None:
        #         if global_util.industry_hot_num.get(industry) < 16:
        #             return False, "13:30后涨停,本板块中涨停票数<16不能买"
        if codes_index.get(code) is not None and codes_index.get(code) == 1:
            # 如果老大已经买成功了, 老二就不需要买了
            first_codes = []
            for key in codes_index:
                if codes_index.get(key) == 0:
                    first_codes.append(key)
            # 暂时注释掉
            # for key in first_codes:
            #     state = trade_manager.get_trade_state(key)
            #     if state == trade_manager.TRADE_STATE_BUY_SUCCESS:
            #         # 老大已经买成功了
            #         return False, "老大{}已经买成功,老二无需购买".format(key)
            #
            # # 有9点半涨停的老大才能买老二,不然不能买
            # # 获取老大的涨停时间
            # for key in first_codes:
            #     # 找到了老大
            #     time_ = limit_up_time_manager.get_limit_up_time(key)
            #     if time_ == "09:30:00":
            #         return True, "9:30涨停的老大,老二可以下单"
            # return False, "老大非9:30涨停,老二不能下单"
        # 过时  老二,本板块中涨停票数<29 不能买
        # if codes_index.get(code) is not None and codes_index.get(code) == 1 and global_util.industry_hot_num.get(
        #         industry) is not None:
        #     if global_util.industry_hot_num.get(industry) < 29:
        #         return False, "老二,本板块中涨停票数<29不能买"
        # 可以下单
        return True, None
    @classmethod
    def __cancel_buy(cls, code):
        try:
            cls.debug(code, "开始执行撤单")
            trade_manager.start_cancel_buy(code)
            # 取消买入标识
            l2_data_manager.TradePointManager.delete_buy_point(code)
            l2_data_manager.TradePointManager.delete_buy_cancel_point(code)
            l2_data_manager.TradePointManager.delete_compute_info_for_cancel_buy(code)
            l2_data_manager.TradePointManager.delete_count_info_for_cancel_buy(code)
            cls.debug(code, "执行撤单成功")
        except Exception as e:
            logging.exception(e)
            cls.debug(code, "执行撤单异常:{}", str(e))
    @classmethod
    def cancel_buy(cls, code, msg=None, source="l2"):
        # 是否是交易队列触发
        buy_single_index, buy_exec_index, buy_compute_index, num, count, max_num_set = cls.__get_order_begin_pos(
            code)
        total_datas = local_today_datas[code]
        if source == "trade_queue":
            # 交易队列触发的需要下单后5s
            if buy_exec_index is not None and buy_exec_index > 0:
                now_time_str = tool.get_now_time_str()
                if tool.trade_time_sub(now_time_str, total_datas[buy_exec_index]["val"]["time"]) < 5:
                    return False
        if code in cls.unreal_buy_dict:
            cls.unreal_buy_dict.pop(code)
            # 取消买入标识
            l2_data_manager.TradePointManager.delete_buy_point(code)
            l2_data_manager.TradePointManager.delete_buy_cancel_point(code)
            l2_data_manager.TradePointManager.delete_compute_info_for_cancel_buy(code)
            l2_data_manager.TradePointManager.delete_count_info_for_cancel_buy(code)
        else:
            can_cancel, reason = cls.__can_cancel(code)
            if not can_cancel:
                # 不能取消
                cls.cancel_debug(code, "撤单中断,原因:{}", reason)
                cls.debug(code, "撤单中断,原因:{}", reason)
                return False
            cls.__cancel_buy(code)
            # 撤单成功
            cls.__buyL2SafeCountManager.save_place_order_info(code, buy_single_index, buy_exec_index,
                                                              total_datas[-1]["index"])
        cls.debug(code, "执行撤单成功,原因:{}", msg)
        return True
    # 虚拟下单
    @classmethod
    def __virtual_buy(cls, code, buy_single_index, buy_exec_index, capture_time):
        cls.unreal_buy_dict[code] = (buy_exec_index, capture_time)
        SecondCancelBigNumComputer.place_order_success(code, buy_single_index, buy_exec_index)
        # 删除之前的板上卖信息
        L2LimitUpSellStatisticUtil.delete(code)
    @classmethod
    def __start_compute_buy(cls, code, compute_start_index, compute_end_index, threshold_money, capture_time,
                            new_add=True):
        if compute_end_index < compute_start_index:
            return
        _start_time = round(t.time() * 1000)
        total_datas = local_today_datas[code]
        # 获取买入信号计算起始位置
        buy_single_index, buy_exec_index, buy_compute_index, num, count, max_num_set = cls.__get_order_begin_pos(
            code)
        # 是否为新获取到的位置
        if buy_single_index is None:
            place_order_count = trade_data_manager.placeordercountmanager.get_place_order_count(code)
            continue_count = 3
            # 前2次的信号连续笔数为3,后面为2
            if place_order_count > 2:
                continue_count = 2
            # 有买入信号
            has_single, _index = cls.__compute_order_begin_pos(code, max(
                (compute_start_index - continue_count - 1) if new_add else compute_start_index, 0), continue_count,
                                                               compute_end_index)
            buy_single_index = _index
            if has_single:
                num = 0
                count = 0
                cls.debug(code, "获取到买入信号起始点:{} ,计算范围:{}-{} ,数据:{}", buy_single_index, compute_start_index,
                          compute_end_index, total_datas[buy_single_index])
                # 如果是今天第一次有下单开始信号,需要设置大单起始点
                cls.l2BigNumForMProcessor.set_begin_pos(code, buy_single_index)
        _start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time, "下单信号计算时间")
        if buy_single_index is None:
            # 未获取到买入信号,终止程序
            return None
        # 计算m值大单
        cls.l2BigNumForMProcessor.process(code, max(buy_single_index, compute_start_index), compute_end_index,
                                          gpcode_manager.get_limit_up_price(code))
        _start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time, "计算m值大单")
        threshold_money, msg = cls.__get_threshmoney(code)
        # 买入纯买额统计
        compute_index, buy_nums, buy_count, rebegin_buy_pos, max_num_set_new = cls.__sum_buy_num_for_order_3(code, max(
            buy_single_index, compute_start_index), compute_end_index, num, count, threshold_money, buy_single_index,
                                                                                                             max_num_set)
        _start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time, "纯买额统计时间")
        cls.debug(code, "m值-{} m值因子-{}", threshold_money, msg)
        # 买入信号位与计算位置间隔2s及以上了
        if rebegin_buy_pos is not None:
            # 需要重新计算纯买额
            cls.__start_compute_buy(code, rebegin_buy_pos, compute_end_index, threshold_money, capture_time, False)
            return
        if compute_index is not None:
            cls.debug(code, "获取到买入执行位置:{} m值:{} 纯买手数:{} 纯买单数:{} 数据:{}", compute_index, threshold_money, buy_nums,
                      buy_count,
                      total_datas[compute_index])
            # 记录买入信号位置
            cls.__save_order_begin_data(code, buy_single_index, compute_index, compute_index, buy_nums, buy_count,
                                        max_num_set_new)
            # 如果是今天第一次有下单执行信号,涨停时间(买入执行位时间)
            limit_up_time_manager.save_limit_up_time(code, total_datas[compute_index]["val"]["time"])
            # 虚拟下单
            cls.__virtual_buy(code, buy_single_index, compute_index, capture_time)
            # 删除之前的所有撤单信号
            l2_data_manager.TradePointManager.delete_buy_cancel_point(code)
            # 涨停封单额计算
            L2LimitUpMoneyStatisticUtil.process_data(cls.random_key[code], code, buy_single_index, compute_index,
                                                     buy_single_index,
                                                     buy_exec_index, False)
            _start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time,
                                              "记录执行买入数据", force=True)
            # 数据是否处理完毕
            if compute_index >= compute_end_index:
                need_cancel, cancel_data = SecondCancelBigNumComputer.need_cancel(code, buy_single_index,
                                                                                  compute_index,
                                                                                  buy_single_index, compute_index,
                                                                                  total_datas, cls.random_key[code],
                                                                                  True)
                _start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time,
                                                  "S级大单处理耗时", force=True)
                cls.debug(code, "数据处理完毕,下单, 数据截图时间-{}", capture_time)
                # 数据已经处理完毕,如果还没撤单就实际下单
                if need_cancel:
                    if cls.cancel_buy(code, "S级大单撤销"):
                        # 执行撤单成功
                        pass
                else:
                    cls.__buy(code, capture_time, total_datas[compute_index], compute_index)
            else:
                SecondCancelBigNumComputer.need_cancel(code, buy_single_index, compute_index, buy_single_index,
                                                       compute_index, total_datas, cls.random_key[code], False)
                _start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time,
                                                  "S级大单处理耗时", force=True)
                # 数据尚未处理完毕,进行下一步处理
                cls.debug(code, "数据尚未处理完毕,进行下一步处理,处理进度:{}", compute_index)
                # 处理撤单步骤
                cls.__process_order(code, compute_index + 1, compute_end_index, capture_time, False)
                _start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time,
                                                  "处理撤单步骤耗时", force=True)
        else:
            # 未达到下单条件,保存纯买额,设置纯买额
            # 记录买入信号位置
            cls.__save_order_begin_data(code, buy_single_index, -1, compute_end_index, buy_nums, buy_count,
                                        max_num_set_new)
            print("保存大单时间", round((t.time() - _start_time) * 1000))
            _start_time = t.time()
        pass
    # 获取下单起始信号
    @classmethod
    def __get_order_begin_pos(cls, code):
        buy_single_index, buy_exec_index, compute_index, num, count, max_num_set = l2_data_manager.TradePointManager.get_buy_compute_start_data(
            code)
        return buy_single_index, buy_exec_index, compute_index, num, count, max_num_set
    # 保存下单起始信号
    @classmethod
    def __save_order_begin_data(self, code, buy_single_index, buy_exec_index, compute_index, num, count, max_num_set):
        TradePointManager.set_buy_compute_start_data(code, buy_single_index, buy_exec_index, compute_index, num, count,
                                                     max_num_set)
    # 计算下单起始信号
    # compute_data_count 用于计算的l2数据数量
    @classmethod
    def __compute_order_begin_pos(cls, code, start_index, continue_count, end_index):
        second_930 = 9 * 3600 + 30 * 60 + 0
        # 倒数100条数据查询
        datas = local_today_datas[code]
        if end_index - start_index + 1 < continue_count:
            return False, None
        __time = None
        last_index = None
        count = 0
        start = None
        for i in range(start_index, end_index + 1):
            _val = datas[i]["val"]
            # 时间要>=09:30:00
            if L2DataUtil.get_time_as_second(_val["time"]) < second_930:
                continue
            if L2DataUtil.is_limit_up_price_buy(_val):
                if last_index is None or (datas[last_index]["val"]["time"] == datas[i]["val"]["time"]):
                    if start is None:
                        start = i
                    last_index = i
                    count += datas[i]["re"]
                    if count >= continue_count:
                        return True, start
                else:
                    # 本条数据作为起点
                    last_index = i
                    count = datas[i]["re"]
                    start = i
            elif not L2DataUtil.is_sell(_val) and not L2DataUtil.is_sell_cancel(_val):
                # 剔除卖与卖撤
                last_index = None
                count = 0
                start = None
        return False, None
    @classmethod
    def __get_threshmoney(cls, code):
        return l2_trade_factor.L2TradeFactorUtil.compute_m_value(code)
    # 计算万手哥笔数
    @classmethod
    def __compute_big_money_count(cls, total_datas, start_index, end_index):
        count = 0
        for i in range(start_index, end_index + 1):
            if L2DataUtil.is_limit_up_price_buy(total_datas[i]["val"]):
                count += total_datas[i]["re"]
            elif L2DataUtil.is_limit_up_price_buy_cancel(total_datas[i]["val"]):
                count -= total_datas[i]["re"]
        return count
    # 统计买入净买量,不计算在买入信号之前的买撤单
    @classmethod
    def __sum_buy_num_for_order_3(cls, code, compute_start_index, compute_end_index, origin_num, origin_count,
                                  threshold_money, buy_single_index, max_num_set):
        def get_threshold_count():
            count = threshold_count  # - sub_threshold_count
            # if count < 3:
            #     count = 3
            # count = round(count * buy1_factor)
            # # 最高30笔,最低8笔
            # if count > 21:
            #     count = 21
            # if count < 8:
            #     count = 8
            return count
        _start_time = t.time()
        total_datas = local_today_datas[code]
        # 计算从买入信号开始到计算开始位置的大单数量
        sub_threshold_count = cls.__compute_big_money_count(total_datas, buy_single_index, compute_start_index - 1)
        if sub_threshold_count < 0:
            sub_threshold_count = 0
        buy_nums = origin_num
        buy_count = origin_count
        limit_up_price = gpcode_manager.get_limit_up_price(code)
        buy1_price = cls.buy1PriceManager.get_price(code)
        if limit_up_price is None:
            raise Exception("涨停价无法获取")
        # 目标手数
        threshold_num = threshold_money / (limit_up_price * 100)
        buy1_factor = 1
        # 获取买1是否为涨停价
        if buy1_price is None:
            buy1_factor = 1.3
        elif limit_up_price is None:
            buy1_factor = 1.3
        elif abs(float(buy1_price) - float(limit_up_price)) >= 0.01:
            print("买1价不为涨停价,买1价-{} 涨停价-{}".format(buy1_price, limit_up_price))
            buy1_factor = 1.3
        # 目标订单数量
        threshold_count = safe_count_manager.BuyL2SafeCountManager.get_safe_count(code)
        buy_single_time_seconds = L2DataUtil.get_time_as_second(total_datas[buy_single_index]["val"]["time"])
        # 可以触发买,当有涨停买信号时才会触发买
        trigger_buy = True
        place_order_count = trade_data_manager.placeordercountmanager.get_place_order_count(code)
        if place_order_count > 3:
            place_order_count = 3
        # 间隔最大时间依次为:3,9,27,81
        max_space_time = pow(3, place_order_count + 1) - 1
        # 最大买量
        max_buy_num = 0
        max_buy_num_set = set(max_num_set)
        for i in range(compute_start_index, compute_end_index + 1):
            data = total_datas[i]
            _val = total_datas[i]["val"]
            trigger_buy = False
            # 必须为连续3秒内的数据
            if L2DataUtil.get_time_as_second(_val["time"]) - buy_single_time_seconds > max_space_time:
                TradePointManager.delete_buy_point(code)
                if i == compute_end_index:
                    # 数据处理完毕
                    return None, buy_nums, buy_count, None, max_buy_num_set
                else:
                    # 计算买入信号,不能同一时间开始计算
                    for ii in range(buy_single_index + 1, compute_end_index + 1):
                        if total_datas[buy_single_index]["val"]["time"] != total_datas[ii]["val"]["time"]:
                            return None, buy_nums, buy_count, ii, max_buy_num_set
            # 涨停买
            if L2DataUtil.is_limit_up_price_buy(_val):
                if l2_data_util.is_big_money(_val):
                    # sub_threshold_count += int(total_datas[i]["re"])
                    max_buy_num_set.add(i)
                if round(int(_val["num"]) * float(_val["price"])) >= 5900:
                    trigger_buy = True
                    # 只统计59万以上的金额
                    buy_nums += int(_val["num"]) * int(total_datas[i]["re"])
                    buy_count += int(total_datas[i]["re"])
                    if buy_nums >= threshold_num and buy_count >= get_threshold_count():
                        logger_l2_trade_buy.info("{}获取到买入执行点:{} 统计纯买手数:{} 目标纯买手数:{} 统计纯买单数:{} 目标纯买单数:{}, 大单数量:{}", code,
                                                 i,
                                                 buy_nums,
                                                 threshold_num, buy_count, get_threshold_count(), sub_threshold_count)
            elif L2DataUtil.is_limit_up_price_buy_cancel(_val):
                if l2_data_util.is_big_money(_val):
                    sub_threshold_count -= int(total_datas[i]["re"])
                if round(int(_val["num"]) * float(_val["price"])) >= 5900:
                    # 只统计59万以上的金额
                    # 涨停买撤
                    # 判断买入位置是否在买入信号之前
                    buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(total_datas[i],
                                                                                     local_today_num_operate_map.get(
                                                                                         code))
                    if buy_index is not None:
                        # 找到买撤数据的买入点
                        if buy_index >= buy_single_index:
                            buy_nums -= int(_val["num"]) * int(data["re"])
                            buy_count -= int(data["re"])
                            cls.buy_debug(code, "{}数据在买入信号之后 撤买纯买手数:{} 目标手数:{}", i, buy_nums, threshold_num)
                        else:
                            cls.buy_debug(code, "{}数据在买入信号之前,买入位:{}", i, buy_index)
                            if total_datas[buy_single_index]["val"]["time"] == buy_data["val"]["time"]:
                                # 同一秒,当作买入信号之后处理
                                buy_nums -= int(_val["num"]) * int(data["re"])
                                buy_count -= int(data["re"])
                                cls.buy_debug(code, "{}数据买入位与预估买入位在同一秒", i)
                    else:
                        # 未找到买撤数据的买入点
                        cls.buy_debug(code, "未找到买撤数据的买入点: 位置-{} 数据-{}", i, data)
                        buy_nums -= int(_val["num"]) * int(total_datas[i]["re"])
                        buy_count -= int(total_datas[i]["re"])
            cls.buy_debug(code, "位置-{},总手数:{},目标手数:{}", i,
                          buy_nums, threshold_num)
            # 需要的最小大单笔数
            big_num_count = 2
            if place_order_count > 1:
                # 第一次下单需要大单最少2笔,以后只需要1笔
                big_num_count = 1
            # 有撤单信号,且小于阈值
            if buy_nums >= threshold_num and buy_count >= get_threshold_count() and trigger_buy and len(
                    max_buy_num_set) >= big_num_count:
                return i, buy_nums, buy_count, None, max_buy_num_set
        cls.buy_debug(code, "尚未获取到买入执行点,起始计算位置:{} 统计纯买手数:{} 目标纯买手数:{}  统计纯买单数:{} 目标纯买单数:{} 大单数量:{}",
                      compute_start_index,
                      buy_nums,
                      threshold_num, buy_count, get_threshold_count(), sub_threshold_count)
        return None, buy_nums, buy_count, None, max_buy_num_set
    @classmethod
    def test(cls):
        code = "002556"
        l2_trade_test.clear_trade_data(code)
        load_l2_data(code, True)
        _start = t.time()
        if True:
            state = trade_manager.get_trade_state(code)
            cls.random_key[code] = random.randint(0, 100000)
            capture_timestamp = 1999988888
            try:
                if state == trade_manager.TRADE_STATE_BUY_DELEGATED or state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER:
                    # 已挂单
                    cls.__process_order(code, 1552, 1641, capture_timestamp)
                else:
                    # 未挂单
                    cls.__process_not_order(code, 1552, 1641, capture_timestamp)
            except Exception as e:
                logging.exception(e)
            print("处理时间", round((t.time() - _start) * 1000))
            return
        # 按s批量化数据
        total_datas = local_today_datas[code]
        start_time = total_datas[0]["val"]["time"]
        start_index = 0
        for i in range(0, len(total_datas)):
            if total_datas[i]["val"]["time"] != start_time:
                cls.random_key[code] = random.randint(0, 100000)
                # 处理数据
                start = start_index
                # if start != 201:
                #     continue
                end = i - 1
                print("处理进度:{},{}".format(start, end))
                capture_timestamp = 1999999999
                state = trade_manager.get_trade_state(code)
                try:
                    if state == trade_manager.TRADE_STATE_BUY_DELEGATED or state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER:
                        # 已挂单
                        cls.__process_order(code, start, end, capture_timestamp)
                    else:
                        # 未挂单
                        cls.__process_not_order(code, start, end, capture_timestamp)
                except Exception as e:
                    logging.exception(e)
                # t.sleep(1)
                start_index = i
                start_time = total_datas[i]["val"]["time"]
        print("时间花费:", round((t.time() - _start) * 1000))
    @classmethod
    def test1(cls):
        code = "002556"
        l2_trade_test.clear_trade_data(code)
        local_latest_datas[code] = []
        load_l2_data(code, True)
        _start = t.time()
        capture_timestamp = 1999999999
        cls.process(code, l2_data_util.local_today_datas[code][1552:1641], capture_timestamp)
        print("时间花费:", round((t.time() - _start) * 1000))
        pass
    @classmethod
    def test2(cls):
        code = "002864"
        load_l2_data(code)
        limit_up_time_manager.load_limit_up_time()
        limit_up_time = limit_up_time_manager.get_limit_up_time(code)
        if limit_up_time is not None and l2_data_util.L2DataUtil.get_time_as_second(
                limit_up_time) >= l2_data_util.L2DataUtil.get_time_as_second(
            "14:30:00"):
            return False, "14:30后涨停的不能买,涨停时间为{}".format(limit_up_time)
        # 同一板块中老二后面的不能买
        industry, codes = ths_industry_util.get_same_industry_codes(code, gpcode_manager.get_gp_list())
        if industry is None:
            return True, "没有获取到行业"
        codes_index = industry_codes_sort.sort_codes(codes, code)
        if codes_index is not None and codes_index.get(code) is not None and codes_index.get(code) > 1:
            return False, "同一板块中老三,老四,...不能买"
        if cls.__codeActualPriceProcessor.is_under_water(code):
            # 水下捞且板块中的票小于21不能买
            if global_util.industry_hot_num.get(industry) is not None and global_util.industry_hot_num.get(
                    industry) <= 16:
                return False, "水下捞,板块中的票小于2只,为{}".format(global_util.industry_hot_num.get(industry))
            if codes_index.get(code) != 0:
                return False, "水下捞,不是老大,是老{}".format(codes_index.get(code))
        # 13:30后涨停,本板块中涨停票数<29不能买
        limit_up_time = limit_up_time_manager.get_limit_up_time(code)
        if limit_up_time is not None:
            if int(limit_up_time.replace(":", "")) >= 133000 and global_util.industry_hot_num.get(industry) is not None:
                if global_util.industry_hot_num.get(industry) < 16:
                    return False, "13:30后涨停,本板块中涨停票数<16不能买"
        if codes_index.get(code) is not None and codes_index.get(code) == 1:
            # ----此条注释-----
            # 如果老大已经买成功了,老二就不需要买了
            # first_codes = []
            # for key in codes_index:
            #     if codes_index.get(key) == 0:
            #         first_codes.append(key)
            #
            # for key in first_codes:
            #     state = trade_manager.get_trade_state(key)
            #     if state == trade_manager.TRADE_STATE_BUY_SUCCESS:
            #         # 老大已经买成功了
            #         return False, "老大{}已经买成功,老二无需购买".format(key)
            # ----此条注释-----
            # ----此条注释-----
            # 有9点半涨停的老大才能买老二,不然不能买
            # 获取老大的涨停时间
            # for key in first_codes:
            #     # 找到了老大
            #     time_ = limit_up_time_manager.get_limit_up_time(key)
            #     if time_ == "09:30:00":
            #         return True, "9:30涨停的老大,老二可以下单"
            # return False, "老大非9:30涨停,老二不能下单"
            # ----此条注释-----
            return True, "老二可以下单"
    @classmethod
    def test3(cls):
        code = "002094"
        load_l2_data(code, True)
        cls.random_key[code] = random.randint(0, 100000)
        buy_single_begin_index, buy_exec_index = 426, 479
        L2LimitUpMoneyStatisticUtil.process_data(cls.random_key[code], code, 480, 519,
                                                 buy_single_begin_index, buy_exec_index, False)
        L2LimitUpMoneyStatisticUtil.process_data(cls.random_key[code], code, 480, 519,
                                                 buy_single_begin_index, buy_exec_index, False)
    @classmethod
    def test_can_buy(cls):
        code = "002923"
        load_l2_data(code, True)
        limit_up_time_manager.load_limit_up_time()
        can, msg = cls.__can_buy(code)
        print(can, msg)
if __name__ == "__main__":
    # trade_manager.start_cancel_buy("000637")
    # t.sleep(10)
    # L2TradeDataProcessor.test()
    L2LimitUpMoneyStatisticUtil.verify_num("601958", 89178, "13:22:45")
    # load_l2_data("600213")
    #
    # buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(local_today_datas["600213"][84],
    #                                                                  local_today_num_operate_map.get(
    #                                                                      "600213"))
    # print(buy_index, buy_data)
l2/l2_data_util.py
@@ -2,20 +2,385 @@
L2相关数据处理
"""
# L2交易队列
import datetime
import decimal
import json
import logging
import time
import constant
import gpcode_manager
import l2_data_log
import log
import redis_manager
import tool
_redisManager = redis_manager.RedisManager(1)
# l2数据管理
# 本地最新一次上传的数据
local_latest_datas = {}
# 本地今日数据
local_today_datas = {}
# 本地手数+操作那类型组成的临时变量
# 用于加快数据处理,用空换时间
local_today_num_operate_map = {}
def load_l2_data(code, force=False):
    redis = _redisManager.getRedis()
    # 加载最近的l2数据
    if local_latest_datas.get(code) is None or force:
        # 获取最近的数据
        _data = redis.get("l2-data-latest-{}".format(code))
        if _data is not None:
            if code in local_latest_datas:
                local_latest_datas[code] = json.loads(_data)
            else:
                local_latest_datas.setdefault(code, json.loads(_data))
        # 获取今日的数据
    if local_today_datas.get(code) is None or force:
        datas = log.load_l2_from_log()
        datas = datas.get(code)
        if datas is None:
            datas = []
        local_today_datas[code] = datas
        # 从数据库加载
        # datas = []
        # keys = redis.keys("l2-{}-*".format(code))
        # for k in keys:
        #     value = redis.get(k)
        #     _data = l2_data_util.l2_data_key_2_obj(k, value)
        #     datas.append(_data)
        # # 排序
        # new_datas = sorted(datas,
        #                    key=lambda e: (int(e.__getitem__('val')["time"].replace(":", "")), e.__getitem__('index')))
        # local_today_datas[code] = new_datas
        # 根据今日数据加载
        load_num_operate_map(local_today_num_operate_map, code, local_today_datas.get(code), force)
# 将数据根据num-operate分类
def load_num_operate_map(local_today_num_operate_map, code, source_datas, clear=False):
    if local_today_num_operate_map.get(code) is None:
        local_today_num_operate_map[code] = {}
    if clear:
        local_today_num_operate_map[code] = {}
    for data in source_datas:
        key = "{}-{}-{}".format(data["val"]["num"], data["val"]["operateType"], data["val"]["price"])
        if local_today_num_operate_map[code].get(key) is None:
            local_today_num_operate_map[code].setdefault(key, [])
        local_today_num_operate_map[code].get(key).append(data)
@tool.async_call
def saveL2Data(code, datas, msg=""):
    start_time = round(time.time() * 1000)
    # 查询票是否在待监听的票里面
    if not gpcode_manager.is_in_gp_pool(code):
        return None
    # 验证股价的正确性
    redis_instance = _redisManager.getRedis()
    try:
        if redis_instance.setnx("l2-save-{}".format(code), "1") > 0:
            # 计算保留的时间
            expire = tool.get_expire()
            i = 0
            for _data in datas:
                i += 1
                key = "l2-" + _data["key"]
                value = redis_instance.get(key)
                if value is None:
                    # 新增
                    try:
                        value = {"index": _data["index"], "re": _data["re"]}
                        redis_instance.setex(key, expire, json.dumps(value))
                    except:
                        logging.error("更正L2数据出错:{} key:{}".format(code, key))
                else:
                    json_value = json.loads(value)
                    if json_value["re"] != _data["re"]:
                        json_value["re"] = _data["re"]
                        redis_instance.setex(key, expire, json.dumps(json_value))
    finally:
        redis_instance.delete("l2-save-{}".format(code))
    print("保存新数据用时:", msg, "耗时:{}".format(round(time.time() * 1000) - start_time))
    return datas
# 保存l2数据
def save_l2_data(code, datas, add_datas, randomKey=None):
    redis = _redisManager.getRedis()
    # 只有有新曾数据才需要保存
    if len(add_datas) > 0:
        # 保存最近的数据
        __start_time = round(time.time() * 1000)
        redis.setex("l2-data-latest-{}".format(code), tool.get_expire(), json.dumps(datas))
        l2_data_log.l2_time(code, randomKey, round(time.time() * 1000) - __start_time, "保存最近l2数据用时")
        # 设置进内存
        local_latest_datas[code] = datas
        __set_l2_data_latest_count(code, len(datas))
        try:
            log.logger_l2_data.info("{}-{}", code, add_datas)
        except Exception as e:
            logging.exception(e)
        saveL2Data(code, add_datas)
# 设置最新的l2数据采集的数量
def __set_l2_data_latest_count(code, count):
    redis = _redisManager.getRedis()
    key = "latest-l2-count-{}".format(code)
    redis.setex(key, 2, count)
    pass
# 获取代码最近的l2数据数量
def get_l2_data_latest_count(code):
    if code is None or len(code) < 1:
        return 0
    redis = _redisManager.getRedis()
    key = "latest-l2-count-{}".format(code)
    result = redis.get(key)
    if result is None:
        return 0
    else:
        return int(result)
def parseL2Data(str):
    day = datetime.datetime.now().strftime("%Y%m%d")
    dict = json.loads(str)
    data = dict["data"]
    client = dict["client"]
    code = data["code"]
    channel = data["channel"]
    capture_time = data["captureTime"]
    process_time = data["processTime"]
    data = data["data"]
    limit_up_price = gpcode_manager.get_limit_up_price(code)
    datas = L2DataUtil.format_l2_data(data, code, limit_up_price)
    # 获取涨停价
    return day, client, channel, code, capture_time, process_time, datas, data
class L2DataUtil:
    @classmethod
    def is_same_time(cls, time1, time2):
        if constant.TEST:
            return True
        time1_s = time1.split(":")
        time1_second = int(time1_s[0]) * 3600 + int(time1_s[1]) * 60 + int(time1_s[2])
        time2_s = time2.split(":")
        time2_second = int(time2_s[0]) * 3600 + int(time2_s[1]) * 60 + int(time2_s[2])
        if abs(time2_second - time1_second) < 3:
            return True
        else:
            return False
    # 获取增量数据
    @classmethod
    def get_add_data(cls, code, latest_datas, datas, _start_index):
        if datas is not None and len(datas) < 1:
            return []
        last_data = None
        latest_datas_ = latest_datas
        if latest_datas_ is not None and len(latest_datas_) > 0:
            last_data = latest_datas_[-1]
        count = 0
        start_index = -1
        # 如果原来没有数据
        # 设置add_data的序号
        for n in reversed(datas):
            count += 1
            if n["key"] == (last_data["key"] if last_data is not None else ""):
                start_index = len(datas) - count
                break
        _add_datas = []
        if last_data is not None:
            if start_index < 0:
                if L2DataUtil.get_time_as_second(datas[0]["val"]["time"]) >= L2DataUtil.get_time_as_second(
                        last_data["val"]["time"]):
                    _add_datas = datas
                else:
                    _add_datas = []
            elif start_index + 1 >= len(datas):
                _add_datas = []
            else:
                _add_datas = datas[start_index + 1:]
        else:
            _add_datas = datas[start_index + 1:]
        for i in range(0, len(_add_datas)):
            _add_datas[i]["index"] = _start_index + i
        return _add_datas
    # 纠正数据,将re字段替换为较大值
    @classmethod
    def correct_data(cls, code, latest_datas, _datas):
        latest_data = latest_datas
        if latest_data is None:
            latest_data = []
        save_list = []
        for data in _datas:
            for _ldata in latest_data:
                # 新数据条数比旧数据多才保存
                if _ldata["key"] == data["key"] and _ldata["re"] < data["re"]:
                    max_re = max(_ldata["re"], data["re"])
                    _ldata["re"] = max_re
                    data["re"] = max_re
                    # 保存到数据库,更新re的数据
                    save_list.append(_ldata)
        if len(save_list) > 0:
            saveL2Data(code, save_list, "保存纠正数据")
            local_latest_datas[code] = latest_data
        return _datas
    # 处理l2数据
    @classmethod
    def format_l2_data(cls, data, code, limit_up_price):
        datas = []
        dataIndexs = {}
        same_time_num = {}
        for item in data:
            # 解析数据
            time = item["time"]
            if time in same_time_num:
                same_time_num[time] = same_time_num[time] + 1
            else:
                same_time_num[time] = 1
            price = float(item["price"])
            num = item["num"]
            limitPrice = item["limitPrice"]
            # 涨停价
            if limit_up_price is not None:
                if limit_up_price == tool.to_price(decimal.Decimal(price)):
                    limitPrice = 1
                else:
                    limitPrice = 0
                item["limitPrice"] = "{}".format(limitPrice)
            operateType = item["operateType"]
            # 不需要非涨停买与买撤
            if int(item["limitPrice"]) != 1 and (int(operateType) == 0 or int(operateType) == 1):
                continue
            cancelTime = item["cancelTime"]
            cancelTimeUnit = item["cancelTimeUnit"]
            key = "{}-{}-{}-{}-{}-{}-{}-{}".format(code, operateType, time, num, price, limitPrice, cancelTime,
                                                   cancelTimeUnit)
            if key in dataIndexs:
                # 数据重复次数+1
                datas[dataIndexs[key]]['re'] = datas[dataIndexs[key]]['re'] + 1
            else:
                # 数据重复次数默认为1
                datas.append({"key": key, "val": item, "re": 1})
                dataIndexs.setdefault(key, len(datas) - 1)
        # TODO 测试的时候开启,方便记录大单数据
        # l2_data_util.save_big_data(code, same_time_num, data)
        return datas
    @classmethod
    def get_time_as_second(cls, time_str):
        ts = time_str.split(":")
        return int(ts[0]) * 3600 + int(ts[1]) * 60 + int(ts[2])
    # @classmethod
    # def get_time_as_str(cls, time_seconds):
    #     ts = time_str.split(":")
    #     return int(ts[0]) * 3600 + int(ts[1]) * 60 + int(ts[2])
    # 是否是涨停价买
    @classmethod
    def is_limit_up_price_buy(cls, val):
        if int(val["limitPrice"]) != 1:
            return False
        if int(val["operateType"]) != 0:
            return False
        price = float(val["price"])
        num = int(val["num"])
        # if price * num * 100 < 50 * 10000:
        #     return False
        return True
    # 是否为涨停卖
    @classmethod
    def is_limit_up_price_sell(cls, val):
        if int(val["limitPrice"]) != 1:
            return False
        if int(val["operateType"]) != 2:
            return False
        price = float(val["price"])
        num = int(val["num"])
        # if price * num * 100 < 50 * 10000:
        #     return False
        return True
    # 是否涨停买撤
    @classmethod
    def is_limit_up_price_buy_cancel(cls, val):
        if int(val["limitPrice"]) != 1:
            return False
        if int(val["operateType"]) != 1:
            return False
        price = float(val["price"])
        num = int(val["num"])
        # if price * num * 100 < 50 * 10000:
        #     return False
        return True
    # 是否卖撤
    @classmethod
    def is_sell_cancel(cls, val):
        if int(val["operateType"]) == 3:
            return True
        return False
    # 是否为卖
    @classmethod
    def is_sell(cls, val):
        if int(val["operateType"]) == 2:
            return True
        return False
class L2TradeQueueUtils(object):
    # 获取成交进度索引
    def find_traded_progress_index(cls, buy_1_price, total_datas, local_today_num_operate_map, queueList):
    @classmethod
    def find_traded_progress_index(cls, buy_1_price, total_datas, local_today_num_operate_map, queueList,
                                   latest_not_limit_up_time=None):
        if len(queueList) == 0:
            return None
        # 补齐整数位5位
        buy_1_price_format = f"{buy_1_price}"
        while buy_1_price_format.find(".") < 4:
            buy_1_price_format = "0" + buy_1_price_format
        index_set = set()
        for num in queueList:
            buy_datas = local_today_num_operate_map.get(
                "{}-{}-{}".format(num, "0", buy_1_price))
                "{}-{}-{}".format(num, "0", buy_1_price_format))
            if buy_datas is not None and len(buy_datas) > 0:
                for data in buy_datas:
                    index_set.add(data["index"])
                    # 在最近一次非涨停买1更新的时间之后才有效
                    if latest_not_limit_up_time is None or tool.trade_time_sub(data["val"]["time"],
                                                                               latest_not_limit_up_time) >= 0:
                        index_set.add(data["index"])
        index_list = list(index_set)
        index_list.sort()
        num_list = []
@@ -29,6 +394,9 @@
        find_index = index_list_str.find(queue_list_str)
        if find_index >= 0:
            temp_str = index_list_str[0:find_index]
            if temp_str.endswith(","):
                temp_str = temp_str[:-1]
            return new_index_list[len(temp_str.split(","))]
        raise Exception("尚未找到成交进度")
l2/l2_log.py
@@ -1,15 +1,15 @@
from log import logger_l2_trade_cancel, logger_l2_trade_buy, logger_l2_trade
def debug(cls, code, content, *args):
    logger_l2_trade.debug(("thread-id={} code={}  ".format(cls.random_key[code], code) + content).format(*args))
def debug(random_key, code, content, *args):
    logger_l2_trade.debug(("thread-id={} code={}  ".format(random_key, code) + content).format(*args))
def buy_debug(cls, code, content, *args):
def buy_debug(random_key, code, content, *args):
    logger_l2_trade_buy.debug(
        ("thread-id={} code={}  ".format(cls.random_key[code], code) + content).format(*args))
        ("thread-id={} code={}  ".format(random_key, code) + content).format(*args))
def cancel_debug(cls, code, content, *args):
def cancel_debug(random_key, code, content, *args):
    logger_l2_trade_cancel.debug(
        ("thread-id={} code={}  ".format(cls.random_key[code], code) + content).format(*args))
        ("thread-id={} code={}  ".format(random_key, code) + content).format(*args))
l2/safe_count_manager.py
@@ -4,10 +4,10 @@
# 下单L2的安全笔数管理
import json
import l2_trade_factor
from trade import l2_trade_factor
import redis_manager
import tool
from l2_data_manager import L2DataUtil
from l2.l2_data_util import L2DataUtil
import l2_data_util
@@ -41,7 +41,7 @@
        self.__getRedis().setex(key, tool.get_expire(), json.dumps((buy_single_index, buy_exec_index, cancel_index)))
    def __get_latest_place_order_info(self, code):
        key = "latest_place_order_info-{}-{}".format(code)
        key = "latest_place_order_info-{}".format(code)
        val = self.__getRedis().get(key)
        if val is None:
            return None, None, None
l2/transaction_progress.py
@@ -8,7 +8,7 @@
import constant
import redis_manager
import tool
import l2_data_manager
from l2 import l2_data_manager
import l2.l2_data_util
@@ -46,10 +46,25 @@
            return None
        return int(val)
    # 最近的非涨停买1的时间
    def __save_latest_not_limit_up_time(self, code, time_str):
        key = "latest_not_limit_up_time-{}".format(code)
        self.__getRedis().setex(key, tool.get_expire(), time_str)
    def __get_latest_not_limit_up_time(self, code):
        key = "latest_not_limit_up_time-{}".format(code)
        self.__getRedis().get(key)
    # 保存数据,返回保存数据的条数
    def save(self, code, limit_up_price, queues):
    def save(self, code, limit_up_price, buy_1_price, buy_1_time, queues):
        # 如果买1不为涨停价就不需要保存
        if queues == self.last_buy_queue_data.get(code):
            return None
        if abs(float(buy_1_price) - float(limit_up_price)) >= 0.01:
            # 保存最近的涨停起始时间
            self.__save_latest_not_limit_up_time(code, buy_1_time)
            return None
        self.last_buy_queue_data[code] = queues
        min_num = round(constant.L2_MIN_MONEY / (limit_up_price * 100))
        num_list = []
@@ -64,8 +79,8 @@
    def save_traded_index(self, code, buy1_price, buyQueueBig):
        total_datas = l2_data_manager.local_today_datas.get(code)
        today_num_operate_map = l2_data_manager.local_today_num_operate_map.get(code)
        index = l2.l2_data_util.L2TradeQueueUtils.find_traded_progress_index(buy1_price, total_datas, total_datas,
                                                                             today_num_operate_map, buyQueueBig)
        index = l2.l2_data_util.L2TradeQueueUtils.find_traded_progress_index(buy1_price, total_datas,
                                                                             today_num_operate_map, buyQueueBig,self.__get_latest_not_limit_up_time(code))
        if index is not None:
            # 保存成交进度
            self.__save_buy_progress_index(code, index)
@@ -77,3 +92,8 @@
        index = self.__get_buy_progress_index(code)
        return index
if __name__ == '__main':
    pass
l2_code_operate.py
@@ -9,8 +9,8 @@
import client_manager
import gpcode_manager
import l2_data_manager
import l2_trade_util
from l2 import l2_data_manager
from trade import l2_trade_util
import server
import tool
l2_data_manager.py
File was deleted
l2_data_manager_new.py
File was deleted
l2_data_test.py
File was deleted
l2_data_util.py
@@ -4,12 +4,11 @@
"""
# 比较时间的大小
import datetime
import json
import time
from tool import async_call
import l2_data_manager
from l2 import l2_data_manager
import tool
@@ -72,18 +71,6 @@
    return _data
# 将数据根据num-operate分类
def load_num_operate_map(local_today_num_operate_map, code, source_datas, clear=False):
    if local_today_num_operate_map.get(code) is None:
        local_today_num_operate_map[code] = {}
    if clear:
        local_today_num_operate_map[code] = {}
    for data in source_datas:
        key = "{}-{}-{}".format(data["val"]["num"], data["val"]["operateType"], data["val"]["price"])
        if local_today_num_operate_map[code].get(key) is None:
            local_today_num_operate_map[code].setdefault(key, [])
        local_today_num_operate_map[code].get(key).append(data)
# 减去时间
l2_trade_test.py
@@ -1,22 +1,13 @@
# 交易测试
# 清除交易数据
import random
import unittest
from unittest import mock
import big_money_num_manager
import l2_data_manager
import l2_data_manager_new
import l2_trade_factor
import log
import redis_manager
import tool
import trade_data_manager
import trade_manager
from l2_data_manager import TradePointManager
from trade import trade_data_manager
from l2.l2_data_manager import TradePointManager
# from l2_data_manager_new import L2TradeDataProcessor, L2LimitUpMoneyStatisticUtil, AverageBigNumComputer
from trade_queue_manager import THSBuy1VolumnManager
def clear_trade_data(code):
@@ -112,5 +103,12 @@
#     print(buy_single_index, buy_exec_index, compute_index, num, count)
class TestData(unittest.TestCase):
    code = "002103"
    # l2_data_manager.load_l2_data(code)
    # TradeBuyQueue().save_traded_index(code, "6.94", [1511, 888, 796])
if __name__ == "__main__":
    unittest.main()
server.py
@@ -10,8 +10,6 @@
import threading
import time
import cv2
import alert_util
import client_manager
import code_volumn_manager
@@ -22,25 +20,21 @@
import authority
import juejin
import l2_data_log
import l2_data_manager
import l2_data_manager_new
from l2 import l2_data_manager_new, l2_data_manager
import l2_data_util
from l2.cancel_buy_strategy import HourCancelBigNumComputer
from l2.cancel_buy_strategy import HourCancelBigNumComputer, L2LimitUpMoneyStatisticUtil
from ocr import ocr_util
import ths_industry_util
import ths_util
import tool
import trade_data_manager
import trade_gui
import trade_manager
from trade import trade_gui, trade_data_manager, trade_manager
import l2_code_operate
from code_data_util import ZYLTGBUtil
import l2.transaction_progress
from log import logger_l2_error, logger_device, logger_trade_delegate, logger_buy_1_volumn_record, \
    logger_l2_trade_queue, logger_l2_latest_data, logger_l2_trade_buy_queue
from trade_queue_manager import THSBuy1VolumnManager, Buy1PriceManager, thsl2tradequeuemanager
from trade.trade_queue_manager import THSBuy1VolumnManager, Buy1PriceManager, thsl2tradequeuemanager
class MyTCPServer(socketserver.TCPServer):
@@ -204,12 +198,13 @@
                    gp_list = gpcode_manager.get_gp_list()
                    gp_code_set = set(gp_list)
                    now_str = tool.get_now_time_str()
                    for d in dataList:
                        if d["time"] == "00:00:00" or tool.get_time_as_second(now_str) < tool.get_time_as_second(
                                d["time"]):
                            continue
                        if d["code"] not in gp_code_set:
                            continue
                    if dataList:
                        for d in dataList:
                            if d["time"] == "00:00:00" or tool.get_time_as_second(now_str) < tool.get_time_as_second(
                                    d["time"]):
                                continue
                            if d["code"] not in gp_code_set:
                                continue
                        # 获取是否有涨停时间
                        # if limit_up_time_manager.get_limit_up_time(d["code"]) is None:
@@ -284,21 +279,27 @@
                    buy_one_volumn = data["buyOneVolumn"]
                    buy_queue = data["buyQueue"]
                    buy_queue_result_list = self.tradeBuyQueue.save(code, gpcode_manager.get_limit_up_price(code),
                                                                    buy_one_price, buy_time,
                                                                    buy_queue)
                    if buy_queue_result_list:
                        # 有数据
                        try:
                            buy_one_price_ = decimal.Decimal(round(float(buy_one_price), 2)).quantize(decimal.Decimal("0.00"))
                            buy_progress_index = self.tradeBuyQueue.save_traded_index(code,buy_one_price_,
                            buy_one_price_ = decimal.Decimal(round(float(buy_one_price), 2)).quantize(
                                decimal.Decimal("0.00"))
                            buy_progress_index = self.tradeBuyQueue.save_traded_index(code, buy_one_price_,
                                                                                      buy_queue_result_list)
                            if buy_progress_index is not None:
                                HourCancelBigNumComputer.set_trade_progress(code,buy_progress_index)
                                HourCancelBigNumComputer.set_trade_progress(code, buy_progress_index,
                                                                            l2_data_manager.local_today_datas.get(code),
                                                                            l2_data_manager.local_today_num_operate_map.get(
                                                                                code))
                            logger_l2_trade_buy_queue.info("获取成交位置成功: code-{} index-{}  数据-{}", code,
                                                           buy_progress_index,
                                                           json.loads(buy_queue_result_list))
                                                           json.dumps(buy_queue_result_list))
                        except Exception as e:
                            print("买入队列", code, buy_queue_result_list)
                            logger_l2_trade_buy_queue.warning("获取成交位置失败: code-{} 原因-{}  数据-{}", code, str(e),
                                                              json.loads(buy_queue_result_list))
                                                              json.dumps(buy_queue_result_list))
                    # buy_queue是否有变化
                    if self.l2_trade_buy_queue_dict.get(code) is None or buy_queue != self.l2_trade_buy_queue_dict.get(
@@ -317,8 +318,7 @@
                                l2_data_manager_new.L2TradeDataProcessor.cancel_buy(code, cancel_msg, "trade_queue")
                            if need_sync:
                                # 同步数据
                                l2_data_manager_new.L2LimitUpMoneyStatisticUtil.verify_num(code, int(buy_one_volumn),
                                                                                           buy_time)
                                L2LimitUpMoneyStatisticUtil.verify_num(0, code, int(buy_one_volumn), buy_time)
                    # print(buy_time, buy_one_price, buy_one_volumn)
                    # print("L2买卖队列",datas)
@@ -370,7 +370,7 @@
                                l2_data_manager_new.L2TradeDataProcessor.cancel_buy(code, cancel_msg, "trade_queue")
                            if need_sync:
                                # 同步数据
                                l2_data_manager_new.L2LimitUpMoneyStatisticUtil.verify_num(code, volumn, time_)
                                L2LimitUpMoneyStatisticUtil.verify_num(code, volumn, time_)
                elif type == 30:
                    # 心跳信息
@@ -500,7 +500,7 @@
if __name__ == "__main__":
    try:
        a=round(float("0002.90"),2)
        a = round(float("0002.90"), 2)
        print(decimal.Decimal(a).quantize(decimal.Decimal("0.00")))
        # repair_ths_main_site(2)
    except Exception as e:
ths_util.py
@@ -10,7 +10,7 @@
import redis_manager
import tool
import trade_gui
from trade import trade_gui
__redisManager = redis_manager.RedisManager(2)
trade/l2_trade_factor.py
File was renamed from l2_trade_factor.py
@@ -3,13 +3,11 @@
"""
# l2交易因子
import functools
import big_money_num_manager
import global_data_loader
import global_util
import limit_up_time_manager
import trade_data_manager
class L2TradeFactorUtil:
@@ -233,10 +231,10 @@
        else:
            count = 21
        volumn_day60_max, volumn_yest, volumn_today = cls.__get_volumns(code)
        rate = cls.get_volumn_rate(volumn_day60_max, volumn_yest, volumn_today)
        # volumn_day60_max, volumn_yest, volumn_today = cls.__get_volumns(code)
        # rate = cls.get_volumn_rate(volumn_day60_max, volumn_yest, volumn_today)
        # 取大单影响值与行业影响值的较大值
        count = round(count * (1 - rate))
        # count = round(count * (1 - rate))
        if count < 8:
            count = 8
        elif count > 21:
trade/l2_trade_util.py
trade/trade_data_manager.py
trade/trade_gui.py
File was renamed from trade_gui.py
@@ -11,10 +11,8 @@
import win32con
import constant
import gpcode_manager
import l2_trade_util
from trade import l2_trade_util
import redis_manager
import tool
from log import *
from tool import async_call
trade/trade_manager.py
File was renamed from trade_manager.py
@@ -5,14 +5,11 @@
# 交易管理器
import time
import constant
import gpcode_manager
import l2_trade_util
import mysql_data
import trade_data_manager
from trade_gui import THSBuyWinManagerNew, THSGuiTrade
from trade import trade_data_manager, l2_trade_util
from trade.trade_gui import THSBuyWinManagerNew, THSGuiTrade
import time as t
import l2_data_manager
from l2 import l2_data_manager
from log import *
trade/trade_queue_manager.py
File was renamed from trade_queue_manager.py
@@ -5,7 +5,7 @@
import gpcode_manager
import redis_manager
import tool
import trade_manager
from trade import trade_manager
class THSBuy1VolumnManager: