Administrator
2022-11-03 f03eb72394a3fac097bb3ab1f956a83f99f7bd0e
优化撤单与动态m值计算
14个文件已修改
401 ■■■■■ 已修改文件
data_process.py 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
gpcode_manager.py 65 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
gui.py 14 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
juejin.py 5 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_code_operate.py 31 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_data_log.py 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_data_manager.py 14 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_data_manager_new.py 75 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_data_util.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_trade_factor.py 58 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
log.py 11 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
server.py 72 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
tool.py 15 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade_queue_manager.py 33 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
data_process.py
@@ -23,6 +23,7 @@
        return dict["type"]
    except Exception as e:
        logging.exception(e)
        print(str)
        return -1
gpcode_manager.py
@@ -1,6 +1,7 @@
"""
股票代码管理器
"""
import code
import json
import random
import time
@@ -132,6 +133,7 @@
        return None
    return tool.to_price(decimal.Decimal(str(price)) * decimal.Decimal("1.1"))
# 获取跌停价
def get_limit_down_price(code):
    price = get_price_pre(code)
@@ -180,6 +182,38 @@
def set_listen_code_by_pos(client_id, pos, code):
    """Record which code a client slot listens to, then refresh the code index.

    Writes ``code`` under the key ``listen_code-<client_id>-<pos>`` with the
    expiry returned by ``tool.get_expire()`` and afterwards calls
    ``__sync_listen_codes_pos()`` so the per-code position entries follow the
    new assignment.
    """
    redis_client = __redisManager.getRedis()
    slot_key = "listen_code-{}-{}".format(client_id, pos)
    redis_client.setex(slot_key, tool.get_expire(), code)
    # Keep the set of listened codes in step with the slot just written.
    __sync_listen_codes_pos()
# Synchronise the per-code position index with the slot assignments
def __sync_listen_codes_pos():
    """Rebuild the ``code_listen_pos-<code>`` index from the ``listen_code-*-*`` slots.

    For every slot key ``listen_code-<client>-<pos>`` holding a non-empty code,
    the entry ``code_listen_pos-<code>`` is set to the JSON pair
    ``[client, pos]`` (written only when missing or different). Index entries
    whose code is no longer held by any slot are deleted.

    Key handling uses ``str`` operations, so the Redis client is assumed to
    return decoded strings -- TODO confirm ``decode_responses`` is enabled.
    """
    # Bind the connection locally, the same way the sibling functions do;
    # the name was previously used here without being defined in this scope.
    redis_instance = __redisManager.getRedis()

    # Codes that currently own an index entry. Every code still found in a
    # slot is discarded below, so whatever remains is stale.
    stale_codes = set()
    for key in redis_instance.keys("code_listen_pos-*"):
        stale_codes.add(key.replace("code_listen_pos-", ""))

    for key in redis_instance.keys("listen_code-*-*"):
        code_ = redis_instance.get(key)
        if not code_:
            # Slot expired between KEYS and GET, or holds an empty code.
            continue
        stale_codes.discard(code_)

        parts = key.split("-")
        client_id_, pos_ = int(parts[1]), int(parts[2])
        key_ = "code_listen_pos-{}".format(code_)
        val = redis_instance.get(key_)
        if val is not None:
            stored = json.loads(val)
            if stored[0] == client_id_ and stored[1] == pos_:
                # Index entry already points at this slot; leave it alone.
                continue
        redis_instance.setex(key_, tool.get_expire(), json.dumps((client_id_, pos_)))

    # Remove index entries for codes that are no longer listened to. The full
    # key name must be used: deleting the bare code never matched any key.
    for code_ in stale_codes:
        redis_instance.delete("code_listen_pos-{}".format(code_))
# 初始化位置
@@ -227,18 +261,23 @@
# Get the position (client id, slot) at which a code is being listened to.
# Returns (None, None) when no slot holds the code.
def get_listen_code_pos(code):
    redis_instance = __redisManager.getRedis()
    # Scan every "listen_code-<client>-<pos>" key and return the client/pos
    # parts of the first key whose stored value equals `code`. The parts come
    # from str.split, so they are returned as strings.
    keys = redis_instance.keys("listen_code-*-*")
    for key in keys:
        result = redis_instance.get(key)
        if result is not None and code == result:
            return key.split("-")[1], key.split("-")[2]
    return None, None
    # NOTE(review): everything below is unreachable as written, because the
    # function has already returned above. It reads like a replacement body
    # (a direct lookup of "code_listen_pos-<code>" holding a JSON pair) left
    # next to the key scan -- confirm which variant is intended and delete
    # the other.
    val = redis_instance.get("code_listen_pos-{}".format(code))
    if val is None:
        return None, None
    val = json.loads(val)
    return val[0], val[1]
# Whether the given code is currently being listened to
def is_listen(code):
    # Membership test against the collection returned by get_listen_codes().
    codes = get_listen_codes()
    return codes.__contains__(code)
    # NOTE(review): the statements below are unreachable as written, because
    # the function has already returned above. They look like a replacement
    # body that checks for the "code_listen_pos-<code>" key instead -- confirm
    # which variant is intended and delete the other.
    redis_instance = __redisManager.getRedis()
    val = redis_instance.get("code_listen_pos-{}".format(code))
    if val is None:
        return False
    else:
        return True
    # codes = get_listen_codes()
    # return codes.__contains__(code)
# 监听是否满了
@@ -267,10 +306,6 @@
if __name__ == '__main__':
    # print(get_can_listen_pos(0))
    # print(get_listen_codes())
    # print(is_listen_full())
    # print(is_listen("002271"))
    # print(get_listen_code_pos("002272"))
    code = get_listen_code_by_pos(2, 7)
    print(code)
    _start = time.time()
    is_listen("002703")
    print( (time.time() - _start) * 1000)
gui.py
@@ -21,7 +21,6 @@
from server import *
# 读取server进程的消息
from trade_data_manager import CodeActualPriceProcessor
@@ -305,9 +304,10 @@
            except:
                pass
            try:
                cl_price_count.configure(text="{}".format(self.codeActualPriceProcessor.get_current_price_codes_count()), foreground="#008000")
                cl_price_count.configure(
                    text="{}".format(self.codeActualPriceProcessor.get_current_price_codes_count()),
                    foreground="#008000")
            except:
                pass
@@ -460,6 +460,8 @@
                except Exception as e:
                    showerror("修复出错", str(e))
            # 创建界面
            win = Tk()
            win.title("检测结果")
@@ -484,12 +486,18 @@
            win.geometry("300x300")
            win.mainloop()
        def init():
            juejin.everyday_init()
        width = 800
        height = 290
        frame = Frame(root, {"height": height, "width": width, "bg": "#DDDDDD"})
        cl = Label(frame, text="L2采集状态", bg="#DDDDDD")
        cl.place(x=5, y=5)
        btn = Button(frame, text="每日初始化", command=init)
        btn.place(x=width - 250, y=5)
        btn = Button(frame, text="刷新数据", command=refresh_data)
        btn.place(x=width - 150, y=5)
        auo_refresh = IntVar()
juejin.py
@@ -94,7 +94,10 @@
    # 初始化同花顺主站
    l2_clients = client_manager.getValidL2Clients()
    for client in l2_clients:
        server.repair_ths_main_site(client)
        try:
            server.repair_ths_main_site(client)
        except Exception as e:
            pass
def __run_schedule():
l2_code_operate.py
@@ -18,6 +18,9 @@
import redis_manager
from log import logger_code_operate
__reset_code_dict = {}
__set_operate_code_state_dict = {}
class L2CodeOperate(object):
    __instance = None
@@ -117,7 +120,7 @@
                        L2CodeOperate.setGPCode(client_id, pos, code)
                    # 修复l2的数据错误
                    elif type == 3:
                        if tool.is_trade_time():
                        if tool.is_set_code_time():
                            client = data["client"]
                            data = data["data"]
                            result = server.send_msg(client, data)
@@ -195,6 +198,30 @@
        return redis.get("operate_queue_read_state") is not None
# Check a client's code slot against the code carried by incoming L2 data
@tool.async_call
def verify_with_l2_data_pos_info(code, client, channel):
    """Compare the code registered for (client, channel) with ``code``.

    On a mismatch the slot's operate state is set to 0 and, inside the
    code-setting time window, a repair of the slot is requested; this is done
    at most once per 2000 ms for each (client, channel, code). On a match the
    operate state is set to 1, at most once per 1000 ms for each
    (client, channel).
    """

    def _now_ms():
        return round(time.time() * 1000)

    registered = gpcode_manager.get_listen_code_by_pos(client, channel)

    if registered != code:
        mismatch_key = "{}-{}-{}".format(client, channel, code)
        handled_recently = (mismatch_key in __reset_code_dict
                            and _now_ms() - __reset_code_dict[mismatch_key] <= 2000)
        if handled_recently:
            return
        L2CodeOperate.set_operate_code_state(client, channel, 0)
        __reset_code_dict[mismatch_key] = _now_ms()
        # An unassigned slot is repaired with an empty code.
        target_code = "" if registered is None else registered
        if tool.is_set_code_time():
            L2CodeOperate.repaire_operate(int(client), int(channel), target_code)
        return

    match_key = "{}-{}".format(client, channel)
    marked_recently = (match_key in __set_operate_code_state_dict
                       and _now_ms() - __set_operate_code_state_dict[match_key] <= 1000)
    if marked_recently:
        return
    __set_operate_code_state_dict[match_key] = _now_ms()
    L2CodeOperate.set_operate_code_state(client, channel, 1)
# 获取客户端正在监听的代码
def get_listen_codes_from_client(client_id):
@@ -223,7 +250,7 @@
                code = gpcode_manager.get_listen_code_by_pos(client_id, index)
                if code is not None and len(code) > 0 and index_codes.get(index) != code:
                    # 交易时间才修复代码
                    if tool.is_trade_time():
                    if tool.is_set_code_time():
                        L2CodeOperate().repaire_operate(client_id, index, code)
                elif code is None or len(code) == 0 and index_codes.get(index) is not None:
                    # 删除前端代码位
l2_data_log.py
@@ -5,8 +5,9 @@
def l2_time(code, time_, description, new_line=False):
    # Log one timing record ("<description>: <code>-<time_>", with a trailing
    # newline when new_line is true) to the l2 process-time logger and return
    # the current time in milliseconds so callers can chain measurements.
    # The "{}" placeholders are filled by the logger from the positional
    # arguments (presumably loguru-style formatting -- confirm).
    log.logger_l2_process_time.info("{}: {}-{}{}", description, code, time_, "\n" if new_line else "")
    return int(time.time() * 1000)
    # NOTE(review): the statements below are unreachable as written, because
    # the function has already returned above. They look like a replacement
    # body that also prefixes the record with the timestamp it returns --
    # confirm which variant is intended and delete the other.
    timestamp = int(time.time() * 1000)
    log.logger_l2_process_time.info("{} {}: {}-{}{}",timestamp, description, code, time_, "\n" if new_line else "")
    return timestamp
class TradeLog:
l2_data_manager.py
@@ -14,6 +14,7 @@
import data_process
import global_data_loader
import global_util
import l2_data_log
import l2_data_util
import gpcode_manager
@@ -200,6 +201,7 @@
    l2_data_util.load_num_operate_map(local_today_num_operate_map, code, local_today_datas.get(code), force)
@tool.async_call
def saveL2Data(code, datas, msg=""):
    start_time = round(t.time() * 1000)
    # 查询票是否在待监听的票里面
@@ -259,12 +261,11 @@
def save_l2_data(code, datas, add_datas):
    redis = _redisManager.getRedis()
    # 保存最近的数据
    __start_time = round(t.time()* 1000)
    redis.setex("l2-data-latest-{}".format(code), tool.get_expire(), json.dumps(datas))
    l2_data_log.l2_time(code, round(t.time() * 1000) - __start_time, "保存最近l2数据用时")
    # 设置进内存
    if code in local_latest_datas:
        local_latest_datas[code] = datas
    else:
        local_latest_datas.setdefault(code, datas)
    local_latest_datas[code] = datas
    __set_l2_data_latest_count(code, len(datas))
    if len(add_datas) > 0:
        saveL2Data(code, add_datas)
@@ -394,7 +395,8 @@
                # 数据重复次数默认为1
                datas.append({"key": key, "val": item, "re": 1})
                dataIndexs.setdefault(key, len(datas) - 1)
        l2_data_util.save_big_data(code, same_time_num, data)
        # TODO 测试的时候开启,方便记录大单数据
        #l2_data_util.save_big_data(code, same_time_num, data)
        return datas
@@ -2125,6 +2127,8 @@
            process_end_index = cancel_index
        # 保存最新累计金额
        # cls.__set_l2_latest_money_record(code, process_end_index, total_num)
        l2_data_log.l2_time(code, round(t.time() * 1000) - start_time, "l2数据封单额计算时间",
                            False)
        if cancel_index:
            return total_datas[cancel_index], cancel_msg
        return None, None
l2_data_manager_new.py
@@ -226,7 +226,8 @@
                    __start_time = l2_data_log.l2_time(code, round(t.time() * 1000) - __start_time, "l2数据处理时间")
                # 保存数据
                l2_data_manager.save_l2_data(code, datas, add_datas)
                __start_time = l2_data_log.l2_time(code, round(t.time() * 1000) - __start_time, "保存数据时间")
                __start_time = l2_data_log.l2_time(code, round(t.time() * 1000) - __start_time,
                                                   "保存数据时间({})".format(len(add_datas)))
        finally:
            if code in cls.unreal_buy_dict:
@@ -235,23 +236,10 @@
    # 处理未挂单
    @classmethod
    def __process_not_order(cls, code, start_index, end_index, capture_time):
        _start_time = t.time()
        # 获取阈值
        threshold_money, msg = cls.__get_threshmoney(code)
        cls.__start_compute_buy(code, start_index, end_index, threshold_money, capture_time)
    @classmethod
    def __statistic_count_l2_data_for_cancel(cls, code, start_index, end_index, has_cancel_single=False):
        index, old_buy_count, old_cancel_count = l2_data_manager.TradePointManager.get_count_info_for_cancel_buy(code)
        for i in range(start_index, end_index + 1):
            buy_count, buy_cancel_count = cls.__count_l2_data_for_cancel(code, i, i)
            old_buy_count += buy_count
            old_cancel_count += buy_cancel_count
            if old_buy_count > 0 and (old_buy_count - old_cancel_count) / old_buy_count < 0.3 and has_cancel_single:
                return i, True
        l2_data_manager.TradePointManager.set_count_info_for_cancel_buy(code, end_index, old_buy_count,
                                                                        old_cancel_count)
        return end_index, False
    # 处理已挂单
    @classmethod
@@ -274,7 +262,7 @@
        if cancel_data:
            cls.debug(code, "触发撤单,撤单位置:{} ,撤单原因:{}", cancel_data["index"], cancel_msg)
            # 撤单
            cls.cancel_buy(code)
            cls.cancel_buy(code,cancel_msg)
            # 继续计算下单
            cls.__process_not_order(code, cancel_data["index"] + 1, end_index, capture_time)
        else:
@@ -338,12 +326,12 @@
            if codes_index.get(code) != 0:
                return False, "水下捞,不是老大,是老{}".format(codes_index.get(code))
        # 13:00后涨停,本板块中涨停票数<29不能买
        # 13:30后涨停,本板块中涨停票数<29不能买
        limit_up_time = limit_up_time_manager.get_limit_up_time(code)
        if limit_up_time is not None:
            if int(limit_up_time.replace(":", "")) >= 130000 and global_util.industry_hot_num.get(industry) is not None:
                if global_util.industry_hot_num.get(industry) < 29:
                    return False, "13:00后涨停,本板块中涨停票数<29不能买"
            if int(limit_up_time.replace(":", "")) >= 133000 and global_util.industry_hot_num.get(industry) is not None:
                if global_util.industry_hot_num.get(industry) < 16:
                    return False, "13:30后涨停,本板块中涨停票数<16不能买"
        if codes_index.get(code) is not None and codes_index.get(code) == 1:
            # 如果老大已经买成功了,老二就不需要买了
@@ -392,11 +380,8 @@
            cls.debug(code, "执行撤单异常:{}", str(e))
    @classmethod
    def cancel_buy(cls, code):
        # 删除大群撤事件的大单
        l2_data_manager.L2BetchCancelBigNumProcessor.del_recod(code)
    def cancel_buy(cls, code, msg=None):
        l2_data_manager.L2ContinueLimitUpCountManager.del_data(code)
        if code in cls.unreal_buy_dict:
            cls.unreal_buy_dict.pop(code)
            # 取消买入标识
@@ -404,22 +389,22 @@
            l2_data_manager.TradePointManager.delete_buy_cancel_point(code)
            l2_data_manager.TradePointManager.delete_compute_info_for_cancel_buy(code)
            l2_data_manager.TradePointManager.delete_count_info_for_cancel_buy(code)
            # 删除大群撤事件的大单
            l2_data_manager.L2BetchCancelBigNumProcessor.del_recod(code)
        else:
            cls.__cancel_buy(code)
        l2_data_manager.L2BigNumProcessor.del_big_num_pos(code)
        cls.debug(code, "执行撤单成功,原因:{}", msg)
    @classmethod
    def __start_compute_buy(cls, code, compute_start_index, compute_end_index, threshold_money, capture_time,
                            new_add=True):
        if compute_end_index < compute_start_index:
            return
        _start_time = t.time()
        total_datas = local_today_datas[code]
        # 获取买入信号计算起始位置
        buy_single_index, buy_exec_index, buy_compute_index, num, count = cls.__get_order_begin_pos(code)
        # 是否为新获取到的位置
        if buy_single_index is None:
            # 有买入信号
@@ -436,10 +421,16 @@
        if buy_single_index is None:
            # 未获取到买入信号,终止程序
            return None
        _start_time = t.time()
        # 计算m值大单
        cls.l2BigNumForMProcessor.process(code, max(buy_single_index, compute_start_index), compute_end_index,
                                          gpcode_manager.get_limit_up_price(code))
        _start_time = t.time()
        threshold_money, msg = cls.__get_threshmoney(code)
        _start_time = t.time()
        # 买入纯买额统计
        compute_index, buy_nums, buy_count, rebegin_buy_pos = cls.__sum_buy_num_for_order_3(code, max(buy_single_index,
                                                                                                      compute_start_index),
@@ -447,8 +438,10 @@
                                                                                            count, threshold_money,
                                                                                            buy_single_index,
                                                                                            capture_time)
        cls.debug(code, "m值-{} m值因子-{}", threshold_money, msg)
        _start_time = t.time()
        # 买入信号位与计算位置间隔2s及以上了
        if rebegin_buy_pos is not None:
            # 需要重新计算纯买额
@@ -485,6 +478,8 @@
            # 未达到下单条件,保存纯买额,设置纯买额
            # 记录买入信号位置
            cls.__save_order_begin_data(code, buy_single_index, -1, compute_end_index, buy_nums, buy_count)
            print("保存大单时间", round((t.time() - _start_time) * 1000))
            _start_time = t.time()
        pass
    # 获取下单起始信号
@@ -551,6 +546,7 @@
    @classmethod
    def __sum_buy_num_for_order_3(cls, code, compute_start_index, compute_end_index, origin_num, origin_count,
                                  threshold_money, buy_single_index, capture_time):
        _start_time = t.time()
        total_datas = local_today_datas[code]
        buy_nums = origin_num
        buy_count = origin_count
@@ -563,6 +559,7 @@
        threshold_count = l2_trade_factor.L2TradeFactorUtil.get_safe_buy_count(code)
        buy_single_time_seconds = L2DataUtil.get_time_as_second(total_datas[buy_single_index]["val"]["time"])
        for i in range(compute_start_index, compute_end_index + 1):
            data = total_datas[i]
            _val = total_datas[i]["val"]
@@ -617,14 +614,16 @@
        cls.buy_debug(code, "尚未获取到买入执行点,起始计算位置:{} 统计纯买手数:{} 目标纯买手数:{}  统计纯买单数:{} 目标纯买单数:{}", compute_start_index,
                      buy_nums,
                      threshold_num, buy_count, threshold_count)
        return None, buy_nums, buy_count, None
    @classmethod
    def test(cls):
        code = "002898"
        code = "002556"
        l2_trade_test.clear_trade_data(code)
        load_l2_data(code, True)
        _start = t.time()
        if True:
            state = trade_manager.get_trade_state(code)
            cls.random_key[code] = random.randint(0, 100000)
@@ -632,15 +631,15 @@
            try:
                if state == trade_manager.TRADE_STATE_BUY_DELEGATED or state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER:
                    # 已挂单
                    cls.__process_order(code, 0, 140, capture_timestamp)
                    cls.__process_order(code, 1552, 1641, capture_timestamp)
                else:
                    # 未挂单
                    cls.__process_not_order(code, 0, 140, capture_timestamp)
                    cls.__process_not_order(code, 1552, 1641, capture_timestamp)
            except Exception as e:
                logging.exception(e)
            print("处理时间", round((t.time() - _start) * 1000))
            return
        _start = t.time()
        # 按s批量化数据
        total_datas = local_today_datas[code]
        start_time = total_datas[0]["val"]["time"]
@@ -671,6 +670,20 @@
        print("时间花费:", round((t.time() - _start) * 1000))
    @classmethod
    def test1(cls):
        code = "002556"
        l2_trade_test.clear_trade_data(code)
        l2_data_manager.local_latest_datas[code] = []
        load_l2_data(code, True)
        _start = t.time()
        capture_timestamp = 1999999999
        cls.process(code, l2_data_manager.local_today_datas[code][1552:1641], capture_timestamp)
        print("时间花费:", round((t.time() - _start) * 1000))
        pass
if __name__ == "__main__":
    L2TradeDataProcessor.test()
    print("----------------------")
    L2TradeDataProcessor.test()
l2_data_util.py
@@ -197,7 +197,7 @@
    return None
# l2数据拼接工具
# l2数据拼接工具 TODO 暂时还未启用
class L2DataConcatUtil:
    # 初始化
l2_trade_factor.py
@@ -16,12 +16,15 @@
        yi = round(zyltgb / 100000000)
        if yi < 1:
            yi = 1
        return 5000000 + (yi - 1) * 500000
        m = 5000000 + (yi - 1) * 500000
        return round(m * 0.7)
    # 获取行业影响比例
    # total_limit_percent为统计的比例之和乘以100
    @classmethod
    def get_industry_rate(cls, total_limit_percent):
        if total_limit_percent is None:
            return 0
        t = total_limit_percent / 10
        if t < 0.9:
            return 0
@@ -29,26 +32,15 @@
            return 0.2
        elif t <= 1.6:
            return 0
        elif t <= 2.1:
            return 0.03
        elif t <= 2.6:
            return 0.06
        elif t <= 3.1:
            return 0.09
        elif t <= 3.6:
            return 0.12
        elif t <= 4.1:
            return 0.15
        elif t <= 4.6:
            return 0.18
        elif t <= 5.1:
            return 0.21
        elif t <= 5.6:
            return 0.24
        elif t <= 6.1:
            return 0.27
        else:
            return 0.30
            rate = 0
            for i in range(0, 30):
                if t <= 2.1 + 0.5 * i:
                    rate = 0.03 * (i + 1)
                    break
            if rate > 0.9:
                rate = 0.9
            return rate
    # 获取量影响比例
    @classmethod
@@ -138,9 +130,9 @@
        factors = cls.__get_rate_factors(code)
        return cls.compute_rate(factors[0], factors[1], factors[2], factors[3], factors[4], factors[5], factors[6])
    # 获取代码当前所在的行业热度
    @classmethod
    def __get_rate_factors(cls, code):
        zyltgb = global_util.zyltgb_map.get(code)
    def __get_industry_limit_percent(cls, code):
        # 获取行业热度
        industry = global_util.code_industry_map.get(code)
        if industry is None:
@@ -149,10 +141,16 @@
        total_industry_limit_percent = global_util.industry_hot_num.get(industry) if industry is not None else None
        # 当前票是否涨停
        if total_industry_limit_percent is not None:
            if code in global_util.limit_up_codes_percent:
                total_industry_limit_percent -= global_util.limit_up_codes_percent[code]
        # if total_industry_limit_percent is not None:
        #     if code in global_util.limit_up_codes_percent:
        #         # 减去当前票的涨幅
        #         total_industry_limit_percent -= global_util.limit_up_codes_percent[code]
        return total_industry_limit_percent
    @classmethod
    def __get_rate_factors(cls, code):
        zyltgb = global_util.zyltgb_map.get(code)
        total_industry_limit_percent = cls.__get_industry_limit_percent(code)
        # 获取量
        volumn_day60_max, volumn_yest, volumn_today = global_util.max60_volumn.get(
            code), global_util.yesterday_volumn.get(code), global_util.today_volumn.get(code)
@@ -208,9 +206,9 @@
            return 8
        count = gb // 100000000
        if count > 30:
            return 30
            count = 30
        if count < 5:
            return 5
            count = 5
        big_money_num = global_util.big_money_num.get(code)
        if big_money_num is None:
@@ -219,7 +217,11 @@
        if big_money_num is not None:
            rate = cls.get_big_money_rate(big_money_num)
        return round(count*(1-rate/2))
        # 获取行业热度对应的比例
        total_industry_limit_percent = cls.__get_industry_limit_percent(code)
        industry_rate = cls.get_industry_rate(total_industry_limit_percent)
        # 取大单影响值与行业影响值的较大值
        return round(count * (1 - max(rate, industry_rate)))
# l2因子归因数据
log.py
@@ -54,6 +54,9 @@
        logger.add(self.get_path("ths", "code_operate"),
                   filter=lambda record: record["extra"].get("name") == "code_operate",
                   rotation="00:00", compression="zip", enqueue=True)
        # 显示在控制台
        logger.add(sys.stdout,
                   filter=lambda record: record["extra"].get("name") == "code_operate")
        logger.add(self.get_path("device", "device"), filter=lambda record: record["extra"].get("name") == "device",
                   rotation="00:00", compression="zip", enqueue=True)
@@ -133,8 +136,8 @@
if __name__ == '__main__':
    logger_l2_process_time.info("test123")
    # logger_l2_process_time.info("test123")
    # date = datetime.datetime.now().strftime("%Y-%m-%d")
    # LogUtil.extract_log_from_key("003005", "D:/logs/gp/l2/l2_process_time.{}.log".format(date),
    #                              "D:/logs/gp/l2/l2_process_time{}.{}.log".format("003005", date))
    date = datetime.datetime.now().strftime("%Y-%m-%d")
    LogUtil.extract_log_from_key("002846", "D:/logs/gp/l2/l2_process_time.{}.log".format(date),
                                 "D:/logs/gp/l2/l2_process_time{}.{}.log".format("002846", date))
server.py
@@ -45,8 +45,6 @@
class MyBaseRequestHandle(socketserver.BaseRequestHandler):
    reset_code_dict = {}
    set_operate_code_state_dict = {}
    l2_data_error_dict = {}
    last_trade_delegate_data = None
    buy1_volumn_manager = THSBuy1VolumnManager()
@@ -73,7 +71,7 @@
            data = sk.recv(102400)
            if len(data) == 0:
                # print("客户端断开连接")
                break;
                break
            _str = str(data, encoding="gbk")
            if len(_str) > 0:
                # print("结果:",_str)
@@ -82,57 +80,41 @@
                if type == 0:
                    try:
                        origin_start_time = round(time.time() * 1000)
                        __start_time = round(time.time() * 1000)
                        _start_time = round(time.time() * 1000)
                        # level2盘口数据
                        day, client, channel, code, capture_time, process_time, datas, origin_datas = l2_data_manager.parseL2Data(
                            _str)
                        # 10ms的网络传输延时
                        capture_timestamp = __start_time - process_time - 10
                        __start_time = l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time,
                                                           "截图时间:{} 数据解析时间".format(process_time))
                        __start_time = l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time, "数据解析时间")
                        # try:
                        #     self.pipe_ui.send(
                        #         json.dumps({"type": "l2_data_notify", "data": {"count": len(datas), "code": code}}))
                        # except:
                        #     pass
                        # 过时 保存l2截图时间
                        # TradeCancelDataManager.save_l2_capture_time(client, channel, code, capture_time)
                        cid, pid = gpcode_manager.get_listen_code_pos(code)
                        __start_time = l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time,
                                                           "l2获取代码位置耗时")
                        # 判断目标代码位置是否与上传数据位置一致
                        if cid is not None and pid is not None and client == int(cid) and channel == int(pid):
                            try:
                                # print("L2数据接受",day,code,len(datas))
                                # 查询
                                code_ = gpcode_manager.get_listen_code_by_pos(client, channel)
                                if code_ != code:
                                    key = "{}-{}-{}".format(client, channel, code)
                                    # 间隔2s
                                    if key not in self.reset_code_dict or round(
                                            time.time() * 1000) - self.reset_code_dict[key] > 2000:
                                        self.l2CodeOperate.set_operate_code_state(client, channel, 0)
                                        self.reset_code_dict[key] = round(time.time() * 1000)
                                        if code_ is None:
                                            code_ = ""
                                        if tool.is_trade_time():
                                            self.l2CodeOperate.repaire_operate(int(client), int(channel), code_)
                                else:
                                    key = "{}-{}".format(client, channel)
                                    if key not in self.set_operate_code_state_dict or round(
                                            time.time() * 1000) - self.set_operate_code_state_dict[key] > 1000:
                                        self.set_operate_code_state_dict[key] = round(time.time() * 1000)
                                        self.l2CodeOperate.set_operate_code_state(client, channel, 1)
                                __start_time = l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time,
                                                                   "l2数据正确性判断时间")
                                # 校验客户端代码
                                l2_code_operate.verify_with_l2_data_pos_info(code, client, channel)
                                __start_time = round(time.time() * 1000)
                                if gpcode_manager.is_listen(code):
                                    __start_time = l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time,
                                                                       "l2外部数据预处理耗时")
                                    l2_data_manager_new.L2TradeDataProcessor.process(code, datas, capture_timestamp)
                                    __start_time = l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time,
                                                                       "l2数据有效处理外部耗时",
                                                                       False)
                                    # 保存原始数据数量
                                    l2_data_util.save_l2_latest_data_number(code, len(origin_datas))
                                    if round(time.time() * 1000) - __start_time > 20:
                                        l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time,
                                                            "异步保存原始数据条数耗时",
                                                            False)
                            except l2_data_manager.L2DataException as l:
                                # 单价不符
                                if l.get_code() == l2_data_manager.L2DataException.CODE_PRICE_ERROR:
@@ -155,8 +137,9 @@
                                __end_time = round(time.time() * 1000)
                                # 只记录大于40ms的数据
                                if __end_time - __start_time > 40:
                                    l2_data_log.l2_time(code, round(time.time() * 1000) - _start_time, "l2数据处理总耗时",
                                if __end_time - origin_start_time > 100:
                                    l2_data_log.l2_time(code, round(time.time() * 1000) - origin_start_time,
                                                        "l2数据处理总耗时",
                                                        True)
                    except Exception as e:
                        logging.exception(e)
@@ -171,8 +154,7 @@
                    # 获取基本信息
                    code_datas = juejin.JueJinManager.get_gp_latest_info(code_list)
                    gpcode_manager.set_gp_list(code_datas)
                    # 重新订阅
                    self.server.pipe_juejin.send(json.dumps({"type": "resub"}))
                    # 同步同花顺目标代码
                    t1 = threading.Thread(target=lambda: sync_target_codes_to_ths())
                    t1.setDaemon(True)
@@ -257,11 +239,13 @@
                                seconds = seconds - seconds % 3
                            time_ = tool.time_seconds_format(seconds)
                            # 保存数据
                            need_sync = self.buy1_volumn_manager.save(code, time_, volumn,price)
                            need_sync, need_cancel, cancel_msg = self.buy1_volumn_manager.save(code, time_, volumn,
                                                                                               price)
                            if need_cancel:
                                l2_data_manager_new.L2TradeDataProcessor.cancel_buy(code, cancel_msg)
                            if need_sync:
                                # 同步数据
                                l2_data_manager.L2LimitUpMoneyStatisticUtil.verify_num(code, volumn, time_)
                elif type == 30:
                    # 心跳信息
tool.py
@@ -56,6 +56,21 @@
        return False
def is_set_code_time():
    """Return True while codes may be set on the clients.

    Always True when ``constant.TEST`` is set. Otherwise the current
    second-of-day is taken from the epoch clock and shifted by +8 hours
    (presumably to express it in UTC+8 -- confirm), and the result must lie
    strictly inside 09:14-11:35 or 12:50-15:05.
    """
    # Test mode bypasses the time windows.
    if constant.TEST:
        return True

    seconds_of_day = t.time() % (24 * 60 * 60) + 8 * 60 * 60
    windows = (
        (60 * 60 * 9 + 14 * 60, 60 * 60 * 11 + 35 * 60),
        (60 * 60 * 12 + 50 * 60, 60 * 60 * 15 + 5 * 60),
    )
    # Both bounds are exclusive.
    return any(opens < seconds_of_day < closes for opens, closes in windows)
def run_time():
    def decorator(func):
        def infunc(*args, **kwargs):
trade_queue_manager.py
@@ -5,6 +5,7 @@
import gpcode_manager
import redis_manager
import tool
import trade_manager
class THSBuy1VolumnManager:
@@ -57,14 +58,14 @@
        key = "buy1_volumn_codes"
        return self.__get_redis().smembers(key)
    # 返回是否需要更新数据
    # 返回是否需要更新数据,是否需要撤单,撤单原因
    def save(self, code, time_str, volumn, price):
        # 客户端数据未加载出来过滤
        if volumn < 1:
            return False
            return False, False, None
        # 14:55:00之后不在处理
        if int(time_str.replace(':', '')) >= int("145500"):
            return False
            return False, False, None
        self.__add_recod(code)
        # 判断是否为涨停价
@@ -72,10 +73,10 @@
        if limit_up_price != tool.to_price(decimal.Decimal(price)):
            # 非涨停价
            volumn = 0
        last_volumn = self.__last_data[code]
        # 不保存和上一次相同的数据
        if code in self.__last_data and self.__last_data[code] == volumn:
            return False
        if code in self.__last_data and last_volumn == volumn:
            return False, False, None
        self.__last_data[code] = volumn
        if code not in self.__code_time_volumn_dict:
@@ -94,7 +95,25 @@
            self.__save_last_recod(code, keys[0], self.__code_time_volumn_dict[code][keys[0]])
        self.__save_recod(code, time_str, volumn)
        return True
        # 如果当前已挂单
        state = trade_manager.get_trade_state(code)
        if state == trade_manager.TRADE_STATE_BUY_DELEGATED or state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER:
            # 判断本次与上一次的封单额是否小于5000w
            limit_up_price = gpcode_manager.get_limit_up_price(code)
            threshold_num = 50000000 // (limit_up_price * 100)
            if volumn < threshold_num and last_volumn < threshold_num:
                # 下降趋势
                if volumn < last_volumn:
                    if (last_volumn - volumn) / last_volumn > 0.5:
                        return True, True, "连续两次封单量降幅达50%以上,时间:{} 封单量:{}-{}".format(time_str, last_volumn, volumn)
                    # 当封单额小于1000万需要撤单
                    min_num = 10000000 // (limit_up_price * 100)
                    if volumn < min_num:
                        return True, True, "最新封单额小于1000万,时间:{} 封单量:{}".format(time_str, volumn)
        return True, False, None
    # 获取校验数据
    # 返回上一次的数据,如果没有上一次的就返回本次的