Administrator
2023-02-05 1252c9489b631905fbce608109260760537b224f
S撤,安全笔数计算优化
1 文件已重命名
17个文件已修改
6个文件已添加
1627 ■■■■■ 已修改文件
alert_util.py 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
constant.py 17 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
data_export_util.py 11 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
gui.py 13 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
juejin.py 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/cancel_buy_strategy.py 357 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_data_util.py 37 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_log.py 15 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/safe_count_manager.py 136 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/transaction_progress.py 79 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_code_operate.py 3 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_data_log.py 7 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_data_manager.py 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_data_manager_new.py 603 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_data_util.py 22 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_trade_test.py 93 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_trade_util.py 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
log.py 27 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ocr/ocr_server.py 97 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ocr/ocr_util.py 8 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
server.py 74 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade_gui.py 6 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade_manager.py 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade_queue_manager.py 8 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
alert_util.py
@@ -4,11 +4,12 @@
# 报警
import constant
import tool
def alarm():
    if not tool.is_alert_time():
    if not tool.is_alert_time() or constant.TEST:
        return
    # TODO 暂时关闭报警
    AlertUtil().stop_audio()
constant.py
@@ -1,11 +1,22 @@
# 是否为测试
TEST = True
TEST = False
# 是否允许交易
TRADE_ENABLE = False
# 水下捞累计连续水下时间最小值
UNDER_WATER_PRICE_TIME_AS_SECONDS = 1200
# 大单金额(单位为百)
BIG_MONEY_AMOUNT = 29900
# 大单笔数
BIG_MONEY_NUM = 7888
#h撤大单笔数
H_CANCEL_BUY_COUNT = 40
# S撤比例
S_CANCEL_FIRST_RATE = 0.79
S_CANCEL_SECOND_RATE = 0.69
S_CANCEL_THIRD_RATE = 0.59
# h撤大单笔数
H_CANCEL_BUY_COUNT = 40
# H撤单比例
H_CANCEL_RATE = 0.79
# L2监控的最低金额
L2_MIN_MONEY = 500000
data_export_util.py
@@ -7,6 +7,7 @@
import xlwt
import gpcode_manager
import l2_data_util
import log
@@ -46,7 +47,7 @@
        num_dict[data["val"]["num"]].append(data)
    local_time = time.strftime("%Y%m%dT%H%M%S", time.localtime(time.time()))
    file_name = "{}/{}_{}.xls".format(dest_dir, code, local_time)
    file_name = "{}/{}_{}_{}.xls".format(dest_dir, code, gpcode_manager.get_code_name(code), local_time)
    file_name_txt = "{}/{}_{}.txt".format(dest_dir, code, local_time)
    openfile = open(file_name_txt, 'w')
    try:
@@ -175,11 +176,11 @@
        if cancel_time > 0:
            cancel_time = "{}".format(cancel_time)
            if data["cancelTimeUnit"] == 0:
                cancel_time += "s";
                cancel_time += "s"
            elif data["cancelTimeUnit"] == 1:
                cancel_time += "m";
                cancel_time += "m"
            elif data["cancelTimeUnit"] == 2:
                cancel_time += "h";
                cancel_time += "h"
        ws.write(index, 2, cancel_time)
        ws.write(index, 3, data["price"])
@@ -193,6 +194,6 @@
if __name__ == "__main__":
    codes = ["002363"]
    codes = ["002842"]
    for code in codes:
        export_l2_excel(code)
gui.py
@@ -19,6 +19,7 @@
from juejin import JueJinManager
from l2_code_operate import L2CodeOperate
from l2_trade_factor import L2TradeFactorUtil
from ocr import ocr_server
from server import *
@@ -61,6 +62,12 @@
    # tcpserver.handle_request()  # 只接受一个客户端连接
    tcpserver.serve_forever()  # 永久循环执行,可以接受多个客户端连接
def createOCRServer():
    """Create the OCR TCP server bound to host ``""`` / port 9002 and serve forever.

    This call blocks inside ``serve_forever()``, so it is meant to be the
    target of a dedicated process.
    """
    print("create OCRServer")
    listen_address = ("", 9002)
    ocr_tcp_server = ocr_server.MyThreadingTCPServer(listen_address, ocr_server.MyBaseRequestHandle)
    ocr_tcp_server.serve_forever()
def startJueJin(pipe):
    juejin.JueJinManager(pipe).start()
@@ -74,6 +81,11 @@
        self.serverProcess = multiprocessing.Process(target=createServer, args=(p1, gs_server_pipe,))
        self.jueJinProcess = multiprocessing.Process(target=startJueJin, args=(p2,))
        self.ocrServerProcess = multiprocessing.Process(target=createOCRServer)
        self.p1 = p1
        self.p2 = p2
        self.gs_gui_pipe = gs_gui_pipe
@@ -106,6 +118,7 @@
        # TODO
        self.jueJinProcess.start()
        self.serverProcess.start()
        self.ocrServerProcess.start()
        L2CodeOperate.get_instance()
        # 客户端队列操作
        process = multiprocessing.Process(target=L2CodeOperate.run())
juejin.py
@@ -59,7 +59,6 @@
def init_data():
    # 删除之前的分钟级大单撤单数据
    l2_data_manager_new.SecondAverageBigNumComputer.clear_data()
    l2_data_manager_new.AverageBigNumComputer.clear_data()
    # 删除所有的涨停卖数据
    l2_data_manager_new.L2LimitUpSellStatisticUtil.clear()
l2/cancel_buy_strategy.py
New file
@@ -0,0 +1,357 @@
"""
L2撤单策略
"""
# ---------------------------------S撤-------------------------------
# s级平均大单计算
# 计算范围到申报时间的那一秒
import json
import time
import constant
import l2_data_util
import redis_manager
import tool
import trade_data_manager
from l2 import l2_log
from l2_data_manager import L2DataUtil, local_today_num_operate_map, load_l2_data, local_today_datas
class SecondCancelBigNumComputer:
    """S-level cancel check driven by the volume of cancelled big orders.

    For every code the running state is kept in Redis under
    ``s_big_num_cancel_compute_data-<code>`` as the JSON triple
    ``(process_index, buy_num, cancel_num)``:

    * ``process_index`` - last data index that has already been accounted for,
    * ``buy_num``       - net big-order buy volume between the buy signal and
      the buy execution position,
    * ``cancel_num``    - accumulated big-order cancel volume seen afterwards.
    """

    __redis_manager = redis_manager.RedisManager(0)

    @classmethod
    def __getRedis(cls):
        return cls.__redis_manager.getRedis()

    @classmethod
    def __save_compute_data(cls, code, process_index, buy_num, cancel_num):
        """Persist the processing progress and the accumulated volumes."""
        key = "s_big_num_cancel_compute_data-{}".format(code)
        cls.__getRedis().setex(key, tool.get_expire(), json.dumps((process_index, buy_num, cancel_num)))

    @classmethod
    def __get_compute_data(cls, code):
        """Return ``(process_index, buy_num, cancel_num)``; ``(-1, 0, 0)`` if unset."""
        key = "s_big_num_cancel_compute_data-{}".format(code)
        val = cls.__getRedis().get(key)
        if val is None:
            return -1, 0, 0
        val = json.loads(val)
        return val[0], val[1], val[2]

    @classmethod
    def __clear_data(cls, code):
        """Delete the saved state of a single code."""
        cls.__getRedis().delete("s_big_num_cancel_compute_data-{}".format(code))

    @classmethod
    def clear_data(cls):
        """Delete the saved state of every code."""
        for k in cls.__getRedis().keys("s_big_num_cancel_compute_data-*"):
            cls.__getRedis().delete(k)

    @staticmethod
    def __time_as_int(time_str):
        """Turn ``"HH:MM:SS"`` into the comparable integer ``HHMMSS``."""
        return int(time_str.replace(":", ""))

    @classmethod
    def __estimate_buy_time(cls, val):
        """Estimate the buy time of a cancel whose buy record cannot be traced.

        Only cancels whose time span is at most one second wide are usable;
        ``None`` is returned for every other cancel.
        """
        min_space, max_space = l2_data_util.compute_time_space_as_second(val["cancelTime"],
                                                                         val["cancelTimeUnit"])
        if max_space - min_space > 1:
            return None
        return tool.trade_time_add_second(val["time"], 0 - min_space)

    @classmethod
    def __get_cancel_rate_threshold(cls, code):
        """Pick the cancel-rate threshold from the number of orders placed so far."""
        place_order_count = trade_data_manager.placeordercountmanager.get_place_order_count(code)
        if place_order_count <= 1:
            return constant.S_CANCEL_FIRST_RATE
        if place_order_count <= 2:
            return constant.S_CANCEL_SECOND_RATE
        return constant.S_CANCEL_THIRD_RATE

    @classmethod
    def __compute_left_big_num(cls, code, start_index, end_index, total_data):
        """Return the net big-order volume (buys minus cancels) in ``[start_index, end_index]``.

        A cancel is only subtracted when the order it cancels was itself placed
        inside the range, either because the buy record was found there or
        because its estimated buy time falls between the times of the two
        boundary records.
        """
        left_big_num = 0
        for i in range(start_index, end_index + 1):
            data = total_data[i]
            val = data["val"]
            # Skip everything that is not a big order
            if not l2_data_util.is_big_money(val):
                continue
            # int() keeps the arithmetic consistent with need_cancel()
            volume = int(val["num"]) * data["re"]
            if L2DataUtil.is_limit_up_price_buy(val):
                left_big_num += volume
            elif L2DataUtil.is_limit_up_price_buy_cancel(val):
                # Look up the buy record this cancel belongs to
                buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(
                    data, local_today_num_operate_map.get(code))
                if buy_index is not None:
                    if start_index <= buy_index <= end_index:
                        left_big_num -= volume
                    continue
                # Buy record untraceable (e.g. partial cancel): fall back to the estimated buy time
                buy_time = cls.__estimate_buy_time(val)
                if buy_time is None:
                    continue
                range_start = cls.__time_as_int(total_data[start_index]["val"]["time"])
                range_end = cls.__time_as_int(total_data[end_index]["val"]["time"])
                if range_start <= cls.__time_as_int(buy_time) <= range_end:
                    left_big_num -= volume
        return left_big_num

    @classmethod
    def need_cancel(cls, code, buy_single_index, buy_exec_index, start_index, end_index, total_data, need_cancel=True):
        """Process ``[start_index, end_index]`` and decide whether the order must be cancelled.

        :param code: security code
        :param buy_single_index: index of the buy signal
        :param buy_exec_index: index at which the buy was executed
        :param start_index: first index of the new data
        :param end_index: last index of the new data (inclusive)
        :param total_data: list of all L2 records of the day for ``code``
        :param need_cancel: when False only the statistics are updated
        :return: ``(True, record)`` with the record that triggered the cancel,
                 otherwise ``(False, None)``
        """
        exec_time = total_data[buy_exec_index]["val"]["time"]
        # Only guard the first 30s after the execution position
        if tool.trade_time_sub(total_data[start_index]["val"]["time"], exec_time) > 30:
            return False, None
        l2_log.cancel_debug(code, "S级是否需要撤单,数据范围:{}-{} ", start_index, end_index)
        end_time = total_data[end_index]["val"]["time"]
        if tool.trade_time_sub(end_time, exec_time) > 30:
            # The end lies beyond the 30s window: step back to the last record
            # whose time differs from the time of the current end record
            for i in range(end_index, start_index - 1, -1):
                if total_data[i]["val"]["time"] != end_time:
                    end_index = i
                    break
        # Load the saved progress
        process_index_old, buy_num, cancel_num = cls.__get_compute_data(code)
        # Start from the saved progress so that a call which touches no new big
        # order does not reset the stored progress to -1
        process_index = process_index_old
        if buy_single_index == start_index:
            # First computation after placing the order: take the net big-order
            # volume between the buy signal and the execution position
            buy_num += cls.__compute_left_big_num(code, buy_single_index, buy_exec_index, total_data)
            # The range handled above is not processed again below
            start_index = end_index + 1
            process_index = end_index

        try:
            for i in range(start_index, end_index + 1):
                data = total_data[i]
                val = data["val"]
                if process_index_old >= i:
                    # Already accounted for by an earlier call
                    continue
                if not l2_data_util.is_big_money(val):
                    continue
                process_index = i
                if not L2DataUtil.is_limit_up_price_buy_cancel(val):
                    continue
                # Look up the buy record this cancel belongs to
                buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(
                    data, local_today_num_operate_map.get(code))
                if buy_index is not None:
                    if buy_single_index <= buy_index:
                        cancel_num += buy_data["re"] * int(buy_data["val"]["num"])
                else:
                    # Buy record untraceable: count the cancel when its estimated
                    # buy time is not earlier than the buy signal. There is no
                    # buy record here, so the cancel record's own volume is used.
                    buy_time = cls.__estimate_buy_time(val)
                    if buy_time is not None and cls.__time_as_int(
                            total_data[buy_single_index]["val"]["time"]) <= cls.__time_as_int(buy_time):
                        cancel_num += data["re"] * int(val["num"])
                if need_cancel:
                    cancel_rate_threshold = cls.__get_cancel_rate_threshold(code)
                    # buy_num may be 0 when no big buy was recorded; never divide by it
                    if buy_num > 0 and cancel_num / buy_num > cancel_rate_threshold:
                        return True, total_data[i]
        finally:
            l2_log.cancel_debug(code, "S级大单 范围:{}-{} 取消计算结果:{}/{}", start_index, end_index, cancel_num, buy_num)
            # Persist the progress and the accumulated volumes
            cls.__save_compute_data(code, process_index, buy_num, cancel_num)
        return False, None

    @classmethod
    def place_order_success(cls, code, buy_single_index, buy_exec_index):
        """Reset the saved state of ``code`` after an order was placed."""
        cls.__clear_data(code)
# --------------------------------H撤-------------------------------
class HourCancelBigNumComputer:
    """H-level cancel check plus bookkeeping of the trade progress index.

    Computation state is kept in Redis under ``h_cancel_compute_data-<code>``
    as the JSON triple ``(process_index, buy_num, cancel_num)``; the trade
    progress index is kept under ``trade_progress_index-<code>``.

    NOTE(review): ``need_cancel`` is currently identical to the S-level
    implementation (30s window, ``S_CANCEL_*`` thresholds and "S级" log text);
    ``constant.H_CANCEL_RATE`` is not used here - confirm whether that is intended.
    """

    __redis_manager = redis_manager.RedisManager(0)

    @classmethod
    def __getRedis(cls):
        return cls.__redis_manager.getRedis()

    @classmethod
    def __save_trade_progress(cls, code, index):
        """Store the trade progress index of ``code``."""
        key = f"trade_progress_index-{code}"
        cls.__getRedis().setex(key, tool.get_expire(), index)

    @classmethod
    def __get_trade_progress(cls, code):
        """Return the stored trade progress index, or ``None`` when unset."""
        key = f"trade_progress_index-{code}"
        val = cls.__getRedis().get(key)
        if val is None:
            return None
        return int(val)

    @classmethod
    def __save_compute_data(cls, code, process_index, buy_num, cancel_num):
        """Persist the processing progress and the accumulated volumes."""
        key = "h_cancel_compute_data-{}".format(code)
        cls.__getRedis().setex(key, tool.get_expire(), json.dumps((process_index, buy_num, cancel_num)))

    @classmethod
    def __get_compute_data(cls, code):
        """Return ``(process_index, buy_num, cancel_num)``; ``(-1, 0, 0)`` if unset."""
        key = "h_cancel_compute_data-{}".format(code)
        val = cls.__getRedis().get(key)
        if val is None:
            return -1, 0, 0
        val = json.loads(val)
        return val[0], val[1], val[2]

    @classmethod
    def __clear_data(cls, code):
        """Delete the saved computation state of a single code."""
        cls.__getRedis().delete("h_cancel_compute_data-{}".format(code))

    @classmethod
    def clear_data(cls):
        """Delete the saved computation state of every code."""
        for k in cls.__getRedis().keys("h_cancel_compute_data-*"):
            cls.__getRedis().delete(k)

    @staticmethod
    def __time_as_int(time_str):
        """Turn ``"HH:MM:SS"`` into the comparable integer ``HHMMSS``."""
        return int(time_str.replace(":", ""))

    @classmethod
    def __estimate_buy_time(cls, val):
        """Estimate the buy time of a cancel whose buy record cannot be traced.

        Only cancels whose time span is at most one second wide are usable;
        ``None`` is returned for every other cancel.
        """
        min_space, max_space = l2_data_util.compute_time_space_as_second(val["cancelTime"],
                                                                         val["cancelTimeUnit"])
        if max_space - min_space > 1:
            return None
        return tool.trade_time_add_second(val["time"], 0 - min_space)

    @classmethod
    def __get_cancel_rate_threshold(cls, code):
        """Pick the cancel-rate threshold from the number of orders placed so far."""
        place_order_count = trade_data_manager.placeordercountmanager.get_place_order_count(code)
        if place_order_count <= 1:
            return constant.S_CANCEL_FIRST_RATE
        if place_order_count <= 2:
            return constant.S_CANCEL_SECOND_RATE
        return constant.S_CANCEL_THIRD_RATE

    @classmethod
    def __compute_left_big_num(cls, code, start_index, end_index, total_data):
        """Return the net big-order volume (buys minus cancels) in ``[start_index, end_index]``.

        A cancel is only subtracted when the order it cancels was itself placed
        inside the range, either because the buy record was found there or
        because its estimated buy time falls between the times of the two
        boundary records.
        """
        left_big_num = 0
        for i in range(start_index, end_index + 1):
            data = total_data[i]
            val = data["val"]
            # Skip everything that is not a big order
            if not l2_data_util.is_big_money(val):
                continue
            # int() keeps the arithmetic consistent with need_cancel()
            volume = int(val["num"]) * data["re"]
            if L2DataUtil.is_limit_up_price_buy(val):
                left_big_num += volume
            elif L2DataUtil.is_limit_up_price_buy_cancel(val):
                # Look up the buy record this cancel belongs to
                buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(
                    data, local_today_num_operate_map.get(code))
                if buy_index is not None:
                    if start_index <= buy_index <= end_index:
                        left_big_num -= volume
                    continue
                # Buy record untraceable (e.g. partial cancel): fall back to the estimated buy time
                buy_time = cls.__estimate_buy_time(val)
                if buy_time is None:
                    continue
                range_start = cls.__time_as_int(total_data[start_index]["val"]["time"])
                range_end = cls.__time_as_int(total_data[end_index]["val"]["time"])
                if range_start <= cls.__time_as_int(buy_time) <= range_end:
                    left_big_num -= volume
        return left_big_num

    @classmethod
    def need_cancel(cls, code, buy_single_index, buy_exec_index, start_index, end_index, total_data, need_cancel=True):
        """Process ``[start_index, end_index]`` and decide whether the order must be cancelled.

        :param code: security code
        :param buy_single_index: index of the buy signal
        :param buy_exec_index: index at which the buy was executed
        :param start_index: first index of the new data
        :param end_index: last index of the new data (inclusive)
        :param total_data: list of all L2 records of the day for ``code``
        :param need_cancel: when False only the statistics are updated
        :return: ``(True, record)`` with the record that triggered the cancel,
                 otherwise ``(False, None)``
        """
        exec_time = total_data[buy_exec_index]["val"]["time"]
        # Only guard the first 30s after the execution position
        if tool.trade_time_sub(total_data[start_index]["val"]["time"], exec_time) > 30:
            return False, None
        l2_log.cancel_debug(code, "S级是否需要撤单,数据范围:{}-{} ", start_index, end_index)
        end_time = total_data[end_index]["val"]["time"]
        if tool.trade_time_sub(end_time, exec_time) > 30:
            # The end lies beyond the 30s window: step back to the last record
            # whose time differs from the time of the current end record
            for i in range(end_index, start_index - 1, -1):
                if total_data[i]["val"]["time"] != end_time:
                    end_index = i
                    break
        # Load the saved progress
        process_index_old, buy_num, cancel_num = cls.__get_compute_data(code)
        # Start from the saved progress so that a call which touches no new big
        # order does not reset the stored progress to -1
        process_index = process_index_old
        if buy_single_index == start_index:
            # First computation after placing the order: take the net big-order
            # volume between the buy signal and the execution position
            buy_num += cls.__compute_left_big_num(code, buy_single_index, buy_exec_index, total_data)
            # The range handled above is not processed again below
            start_index = end_index + 1
            process_index = end_index

        try:
            for i in range(start_index, end_index + 1):
                data = total_data[i]
                val = data["val"]
                if process_index_old >= i:
                    # Already accounted for by an earlier call
                    continue
                if not l2_data_util.is_big_money(val):
                    continue
                process_index = i
                if not L2DataUtil.is_limit_up_price_buy_cancel(val):
                    continue
                # Look up the buy record this cancel belongs to
                buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(
                    data, local_today_num_operate_map.get(code))
                if buy_index is not None:
                    if buy_single_index <= buy_index:
                        cancel_num += buy_data["re"] * int(buy_data["val"]["num"])
                else:
                    # Buy record untraceable: count the cancel when its estimated
                    # buy time is not earlier than the buy signal. There is no
                    # buy record here, so the cancel record's own volume is used.
                    buy_time = cls.__estimate_buy_time(val)
                    if buy_time is not None and cls.__time_as_int(
                            total_data[buy_single_index]["val"]["time"]) <= cls.__time_as_int(buy_time):
                        cancel_num += data["re"] * int(val["num"])
                if need_cancel:
                    cancel_rate_threshold = cls.__get_cancel_rate_threshold(code)
                    # buy_num may be 0 when no big buy was recorded; never divide by it
                    if buy_num > 0 and cancel_num / buy_num > cancel_rate_threshold:
                        return True, total_data[i]
        finally:
            l2_log.cancel_debug(code, "S级大单 范围:{}-{} 取消计算结果:{}/{}", start_index, end_index, cancel_num, buy_num)
            # Persist the progress and the accumulated volumes
            cls.__save_compute_data(code, process_index, buy_num, cancel_num)
        return False, None

    @classmethod
    def place_order_success(cls, code, buy_single_index, buy_exec_index, total_data):
        """Reset the computation state and seed the trade progress with the execution index."""
        cls.__clear_data(code)
        cls.set_trade_progress(code, buy_exec_index)

    @classmethod
    def set_trade_progress(cls, code, index):
        """Log and store the trade progress index of ``code``."""
        l2_log.cancel_debug(code, "成交进度:{}", index)
        cls.__save_trade_progress(code, index)
l2/l2_data_util.py
New file
@@ -0,0 +1,37 @@
"""
L2相关数据处理
"""
# L2交易队列
class L2TradeQueueUtils(object):
    """Helpers for working with the L2 buy queue."""

    @classmethod
    def find_traded_progress_index(cls, buy_1_price, total_datas, local_today_num_operate_map, queueList):
        """Locate the data index at which the visible buy queue starts.

        Every number in ``queueList`` is looked up in
        ``local_today_num_operate_map`` under the key ``"<num>-0-<buy_1_price>"``
        to collect candidate data indexes. The candidates are expanded by their
        repeat count (``"re"``) into a flat sequence of numbers, and the queue is
        searched for as a contiguous run inside that sequence.

        :param buy_1_price: buy-1 price used to build the lookup key
        :param total_datas: list of L2 records; each has ``"re"`` and ``"val"["num"]``
        :param local_today_num_operate_map: mapping from lookup key to records carrying ``"index"``
        :param queueList: numbers currently shown in the buy queue
        :return: data index of the first queue entry, or ``None`` for an empty queue
        :raises Exception: when the queue cannot be found in the data
        """
        if not queueList:
            return None
        index_set = set()
        for num in queueList:
            buy_datas = local_today_num_operate_map.get(
                "{}-{}-{}".format(num, "0", buy_1_price))
            if buy_datas:
                for data in buy_datas:
                    index_set.add(data["index"])

        # Flatten the candidates in index order, one entry per repetition.
        # Numbers are compared by their string form, as the values may be
        # stored either as int or as str.
        num_list = []
        new_index_list = []
        for index in sorted(index_set):
            repeat = total_datas[index]["re"]
            num_list.extend([str(total_datas[index]["val"]["num"])] * repeat)
            new_index_list.extend([index] * repeat)

        # Element-wise sub-list search: unlike a substring search on the joined
        # text it cannot match inside a longer number, and the match offset is
        # directly the position of the first queue entry.
        target = [str(num) for num in queueList]
        width = len(target)
        for offset in range(len(num_list) - width + 1):
            if num_list[offset:offset + width] == target:
                return new_index_list[offset]
        raise Exception("尚未找到成交进度")
if __name__ == "__main__":
    pass
l2/l2_log.py
New file
@@ -0,0 +1,15 @@
from log import logger_l2_trade_cancel, logger_l2_trade_buy, logger_l2_trade
# Marks "content was not supplied" so that None stays a legal value.
_CONTENT_NOT_GIVEN = object()


def _build_message(owner, code, content, args):
    """Build the log line for both supported call shapes.

    * Long form  ``f(owner, code, content, *args)``: ``owner`` exposes a
      ``random_key`` mapping from which the thread id of ``code`` is read.
    * Short form ``f(code, content, *args)``: used by callers that have no
      owner object; every argument arrives shifted one position to the left
      and the thread id is logged as ``None``.
    """
    if hasattr(owner, "random_key"):
        if content is _CONTENT_NOT_GIVEN:
            raise TypeError("content is required when an owner object is passed")
        thread_id = owner.random_key[code]
        template = content
    else:
        # Short form: what arrived as `content` is really the first format argument
        if content is not _CONTENT_NOT_GIVEN:
            args = (content,) + tuple(args)
        code, template = owner, code
        thread_id = None
    return ("thread-id={} code={}  ".format(thread_id, code) + template).format(*args)


def debug(cls, code, content=_CONTENT_NOT_GIVEN, *args):
    """Write a formatted debug line to the general L2 trade log."""
    logger_l2_trade.debug(_build_message(cls, code, content, args))


def buy_debug(cls, code, content=_CONTENT_NOT_GIVEN, *args):
    """Write a formatted debug line to the L2 buy log."""
    logger_l2_trade_buy.debug(_build_message(cls, code, content, args))


def cancel_debug(cls, code, content=_CONTENT_NOT_GIVEN, *args):
    """Write a formatted debug line to the L2 cancel log."""
    logger_l2_trade_cancel.debug(_build_message(cls, code, content, args))
l2/safe_count_manager.py
New file
@@ -0,0 +1,136 @@
"""
安全笔数管理
"""
# 下单L2的安全笔数管理
import json
import l2_trade_factor
import redis_manager
import tool
from l2_data_manager import L2DataUtil
import l2_data_util
class BuyL2SafeCountManager(object):
    """Manage the safe order count used when placing an L2 buy.

    Redis keys used:

    * ``safe_count_l2-<code>-<last_buy_single_index>`` - JSON tuple
      ``(last_buy_single_index, process_index, buy_num, cancel_num)``
    * ``latest_place_order_info-<code>`` - JSON tuple
      ``(buy_single_index, buy_exec_index, cancel_index)``
    """

    __redis_manager = redis_manager.RedisManager(0)

    def __init__(self):
        self.last_buy_queue_data = {}

    def __getRedis(self):
        return self.__redis_manager.getRedis()

    def __save_compute_progress(self, code, last_buy_single_index, process_index, buy_num, cancel_num):
        """Record the progress of one computation round."""
        key = "safe_count_l2-{}-{}".format(code, last_buy_single_index)
        self.__getRedis().setex(key, tool.get_expire(),
                                json.dumps((last_buy_single_index, process_index, buy_num, cancel_num)))

    def __get_compute_progress(self, code, last_buy_single_index):
        """Return the saved progress tuple; ``(None, -1, 0, 0)`` when unset."""
        key = "safe_count_l2-{}-{}".format(code, last_buy_single_index)
        val = self.__getRedis().get(key)
        if val is None:
            return None, -1, 0, 0
        val = json.loads(val)
        return val[0], val[1], val[2], val[3]

    def __save_latest_place_order_info(self, code, buy_single_index, buy_exec_index, cancel_index):
        """Remember the indexes of the most recent order."""
        key = "latest_place_order_info-{}".format(code)
        self.__getRedis().setex(key, tool.get_expire(), json.dumps((buy_single_index, buy_exec_index, cancel_index)))

    def __get_latest_place_order_info(self, code):
        """Return ``(buy_single_index, buy_exec_index, cancel_index)`` or three ``None``."""
        # Must be the same key that __save_latest_place_order_info writes
        key = "latest_place_order_info-{}".format(code)
        val = self.__getRedis().get(key)
        if val is None:
            return None, None, None
        val = json.loads(val)
        return val[0], val[1], val[2]

    def __get_all_compute_progress(self, code):
        """Return every saved progress tuple of ``code``."""
        key_regex = f"safe_count_l2-{code}-*"
        vals = []
        for k in self.__getRedis().keys(key_regex):
            val = self.__getRedis().get(k)
            vals.append(json.loads(val))
        return vals

    def clear_data(self, code):
        """Placeholder; nothing is cleared at the moment."""
        pass

    def __get_base_save_count(self, code):
        """Return the base safe count supplied by ``L2TradeFactorUtil``."""
        return l2_trade_factor.L2TradeFactorUtil.get_safe_buy_count(code)

    def get_safe_count(self, code):
        """Return the final safe count: base count times rate, clamped to ``[8, 21]``."""
        count = round(self.__get_base_save_count(code) * self.__get_rate(code))
        return max(8, min(21, count))

    def compute_left_rate(self, code, start_index, end_index, total_datas,
                          local_today_num_operate_map):
        """Accumulate buy and cancel volume for the latest order over new data.

        The latest order info (signal index, execution index, cancel index) is
        read from Redis; nothing happens when no order was recorded.

        :param code: security code
        :param start_index: first index of the new data
        :param end_index: last index of the new data (inclusive)
        :param total_datas: list of all L2 records of ``code``
        :param local_today_num_operate_map: lookup map passed to
               ``get_buy_data_with_cancel_data``
        """
        last_buy_single_index, buy_exec_index, cancel_index = self.__get_latest_place_order_info(code)
        if last_buy_single_index is None:
            return
        cancel_time = None
        if cancel_index is not None:
            cancel_time = total_datas[cancel_index]["val"]["time"]
        # Load the saved progress
        _, process_index, buy_num, cancel_num = self.__get_compute_progress(code, last_buy_single_index)

        # Without a cancel position everything is processed; otherwise stop
        # before the first record that is later than the cancel time
        if cancel_time:
            cancel_time_as_int = int(cancel_time.replace(":", ""))
            for i in range(start_index, end_index + 1):
                if cancel_time_as_int < int(total_datas[i]["val"]["time"].replace(":", "")):
                    end_index = i - 1
                    break

        # end_index is inclusive: it is stored as the processed position below,
        # so the record at end_index has to be handled here
        for i in range(start_index, end_index + 1):
            if process_index >= i:
                continue
            data = total_datas[i]
            val = data["val"]
            if L2DataUtil.is_limit_up_price_buy(val):
                # Limit-up buy
                buy_num += int(val["num"]) * data["re"]
            elif L2DataUtil.is_limit_up_price_buy_cancel(val):
                # Find the buy this cancel belongs to
                buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(data, local_today_num_operate_map)
                if buy_index is not None and last_buy_single_index <= buy_index <= end_index:
                    cancel_num += int(val["num"]) * data["re"]
        # Never move the saved progress backwards
        process_index = max(process_index, end_index)
        self.__save_compute_progress(code, last_buy_single_index, process_index, buy_num, cancel_num)

    def __get_rate(self, code):
        """Multiply ``1 - round((buy_num - cancel_num) / buy_num, 4)`` over all saved rounds."""
        rate = (1 - 0)
        for val in self.__get_all_compute_progress(code):
            buy_num, cancel_num = val[2], val[3]
            if buy_num == 0:
                # No buy volume recorded for this round: nothing to derive a ratio from
                continue
            rate *= (1 - round((buy_num - cancel_num) / buy_num, 4))
        return rate

    def save_place_order_info(self, code, buy_single_index, buy_exec_index, cancel_index):
        """Store the indexes of an order that was placed successfully."""
        self.__save_latest_place_order_info(code, buy_single_index, buy_exec_index, cancel_index)
l2/transaction_progress.py
New file
@@ -0,0 +1,79 @@
'''
成交进度
'''
# 买入队列
import json
import constant
import redis_manager
import tool
import l2_data_manager
import l2.l2_data_util
class TradeBuyQueue:
    """Track the buy queue and the trade progress index of each code.

    Redis keys used:

    * ``trade_buy_queue_data-<code>`` - JSON pair ``(num_list, update_time)``
    * ``trade_buy_progress_index-<code>`` - the trade progress index
    """

    __redis_manager = redis_manager.RedisManager(0)

    def __init__(self):
        # Last raw queue seen per code, used to skip unchanged updates
        self.last_buy_queue_data = {}

    def __getRedis(self):
        return self.__redis_manager.getRedis()

    def __save_buy_queue_data(self, code, num_list):
        """Store the filtered queue together with the current time string."""
        key = "trade_buy_queue_data-{}".format(code)
        self.__getRedis().setex(key, tool.get_expire(), json.dumps((num_list, tool.get_now_time_str())))

    def __get_buy_queue_data(self, code):
        """Return ``(num_list, update_time)``; ``(None, None)`` when unset."""
        key = "trade_buy_queue_data-{}".format(code)
        val = self.__getRedis().get(key)
        if val is None:
            return None, None
        val = json.loads(val)
        return val[0], val[1]

    def __save_buy_progress_index(self, code, index):
        """Store the trade progress index."""
        key = "trade_buy_progress_index-{}".format(code)
        self.__getRedis().setex(key, tool.get_expire(), index)

    def __get_buy_progress_index(self, code):
        """Return the stored trade progress index, or ``None`` when unset."""
        key = "trade_buy_progress_index-{}".format(code)
        val = self.__getRedis().get(key)
        if val is None:
            return None
        return int(val)

    def save(self, code, limit_up_price, queues):
        """Filter and store a new buy queue.

        :param code: security code
        :param limit_up_price: limit-up price used to derive the minimum number
        :param queues: numbers currently shown in the buy queue
        :return: the stored list of numbers above the minimum, or ``None`` when
                 ``queues`` equals the previous queue of ``code``
        """
        if queues == self.last_buy_queue_data.get(code):
            return None
        self.last_buy_queue_data[code] = queues
        min_num = round(constant.L2_MIN_MONEY / (limit_up_price * 100))
        num_list = [num for num in queues if num > min_num]
        self.__save_buy_queue_data(code, num_list)
        return num_list

    def save_traded_index(self, code, buy1_price, buyQueueBig):
        """Compute and store the trade progress index for the given big-order queue.

        :return: the index that was stored, or ``None`` when the lookup yields none
        :raises Exception: propagated from ``find_traded_progress_index`` when the
                 queue cannot be located in the data
        """
        total_datas = l2_data_manager.local_today_datas.get(code)
        today_num_operate_map = l2_data_manager.local_today_num_operate_map.get(code)
        # Called through an instance so the first parameter of the helper is
        # bound automatically and the four real arguments land in the right slots
        index = l2.l2_data_util.L2TradeQueueUtils().find_traded_progress_index(buy1_price, total_datas,
                                                                               today_num_operate_map, buyQueueBig)
        if index is not None:
            # Store the trade progress
            self.__save_buy_progress_index(code, index)
            return index
        return None

    def get_traded_index(self, code):
        """Return the stored trade progress index of ``code``."""
        return self.__get_buy_progress_index(code)
l2_code_operate.py
@@ -181,6 +181,9 @@
                print("发送操作异常:", str(e))
    def add_operate(self, type, code, msg="", client=None, pos=None):
        # 09:25:10之后才能操作
        if int(tool.get_now_time_str().replace(":", "")) < int("092510"):
            return
        redis = self.redis_manager_.getRedis()
        redis.rpush("code_operate_queue",
                    json.dumps({"type": type, "msg": msg, "code": code, "client": client, "pos": pos,
l2_data_log.py
@@ -4,11 +4,12 @@
import log
def l2_time(code, time_, description, new_line=False):
def l2_time(code, do_id, time_, description, new_line=False,force=False):
    timestamp = int(time.time() * 1000)
    # 只记录耗时较长的信息
    if time_ > 50:
        log.logger_l2_process_time.info("{} {}: {}-{}{}", timestamp, description, code, time_, "\n" if new_line else "")
    if time_ > 50 or force:
        log.logger_l2_process_time.info("{}-{} {}: {}-{}{}", do_id, timestamp, description, code, time_,
                                        "\n" if new_line else "")
    return timestamp
l2_data_manager.py
@@ -267,14 +267,14 @@
# 保存l2数据
def save_l2_data(code, datas, add_datas):
def save_l2_data(code, datas, add_datas,randomKey=None):
    redis = _redisManager.getRedis()
    # 只有有新曾数据才需要保存
    if len(add_datas) > 0:
        # 保存最近的数据
        __start_time = round(t.time() * 1000)
        redis.setex("l2-data-latest-{}".format(code), tool.get_expire(), json.dumps(datas))
        l2_data_log.l2_time(code, round(t.time() * 1000) - __start_time, "保存最近l2数据用时")
        l2_data_log.l2_time(code,randomKey, round(t.time() * 1000) - __start_time, "保存最近l2数据用时")
        # 设置进内存
        local_latest_datas[code] = datas
        __set_l2_data_latest_count(code, len(datas))
l2_data_manager_new.py
@@ -23,6 +23,8 @@
import trade_manager
import trade_queue_manager
import trade_data_manager
from l2 import safe_count_manager
from l2.cancel_buy_strategy import SecondCancelBigNumComputer
from l2_data_manager import L2DataException, TradePointManager, local_today_datas, L2DataUtil, load_l2_data, \
    local_today_num_operate_map
from log import logger_l2_trade, logger_l2_trade_cancel, logger_l2_trade_buy, logger_l2_process, logger_buy_1_volumn, \
@@ -159,6 +161,8 @@
    __codeActualPriceProcessor = CodeActualPriceProcessor()
    buy1PriceManager = trade_queue_manager.Buy1PriceManager()
    __ths_l2_trade_queue_manager = trade_queue_manager.thsl2tradequeuemanager()
    __thsBuy1VolumnManager = trade_queue_manager.THSBuy1VolumnManager()
    __buyL2SafeCountManager = safe_count_manager.BuyL2SafeCountManager()
    @classmethod
    def debug(cls, code, content, *args):
@@ -178,8 +182,8 @@
    # 数据处理入口
    # datas: 本次截图数据
    # capture_timestamp:截图时间戳
    def process(cls, code, datas, capture_timestamp):
        cls.random_key[code] = random.randint(0, 100000)
    def process(cls, code, datas, capture_timestamp, do_id):
        cls.random_key[code] = do_id
        __start_time = round(t.time() * 1000)
        try:
            if len(datas) > 0:
@@ -201,8 +205,10 @@
                    cls.process_add_datas(code, add_datas, capture_timestamp, __start_time)
                finally:
                    # 保存数据
                    l2_data_manager.save_l2_data(code, datas, add_datas)
                    __start_time = l2_data_log.l2_time(code, round(t.time() * 1000) - __start_time,
                    __start_time = round(t.time() * 1000)
                    l2_data_manager.save_l2_data(code, datas, add_datas, cls.random_key[code])
                    __start_time = l2_data_log.l2_time(code, cls.random_key[code],
                                                       round(t.time() * 1000) - __start_time,
                                                       "保存数据时间({})".format(len(add_datas)))
        finally:
            if code in cls.unreal_buy_dict:
@@ -216,31 +222,31 @@
            local_today_datas[code].extend(add_datas)
            l2_data_util.load_num_operate_map(l2_data_manager.local_today_num_operate_map, code, add_datas)
            # ---------- 判断是否需要计算大单 -----------
            try:
                average_need, buy_single_index, buy_exec_index = AverageBigNumComputer.is_need_compute_average(
                    code, local_today_datas[code][-1])
                # 计算平均大单
                if average_need:
                    end_index = local_today_datas[code][-1]["index"]
                    if len(add_datas) > 0:
                        end_index = add_datas[-1]["index"]
                    AverageBigNumComputer.compute_average_big_num(code, buy_single_index, buy_single_index,
                                                                  end_index)
            except Exception as e:
                logging.exception(e)
            # try:
            #     average_need, buy_single_index, buy_exec_index = AverageBigNumComputer.is_need_compute_average(
            #         code, local_today_datas[code][-1])
            #     # 计算平均大单
            #     if average_need:
            #         end_index = local_today_datas[code][-1]["index"]
            #         if len(add_datas) > 0:
            #             end_index = add_datas[-1]["index"]
            #         AverageBigNumComputer.compute_average_big_num(code, buy_single_index, buy_single_index,
            #                                                       end_index)
            # except Exception as e:
            #     logging.exception(e)
            try:
                average_need, buy_single_index, buy_exec_index = SecondAverageBigNumComputer.is_need_compute_average(
                    code, local_today_datas[code][-1])
                # 计算平均大单
                if average_need:
                    end_index = local_today_datas[code][-1]["index"]
                    if len(add_datas) > 0:
                        end_index = add_datas[-1]["index"]
                    SecondAverageBigNumComputer.compute_average_big_num(code, buy_single_index, buy_single_index,
                                                                        end_index)
            except Exception as e:
                logging.exception(e)
            # try:
            #     average_need, buy_single_index, buy_exec_index = SecondCancelBigNumComputer.is_need_compute_average(
            #         code, local_today_datas[code][-1])
            #     # 计算平均大单
            #     if average_need:
            #         end_index = local_today_datas[code][-1]["index"]
            #         if len(add_datas) > 0:
            #             end_index = add_datas[-1]["index"]
            #         SecondCancelBigNumComputer.compute_average_big_num(code, buy_single_index, buy_single_index,
            #                                                             end_index)
            # except Exception as e:
            #     logging.exception(e)
            # 第1条数据是否为09:30:00
            if add_datas[0]["val"]["time"] == "09:30:00":
@@ -253,7 +259,8 @@
                        limit_up_time_manager.save_limit_up_time(code, "09:30:00")
        total_datas = local_today_datas[code]
        __start_time = l2_data_log.l2_time(code, round(t.time() * 1000) - __start_time, "l2数据预处理时间")
        __start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - __start_time,
                                           "l2数据预处理时间")
        if len(add_datas) > 0:
            latest_time = add_datas[len(add_datas) - 1]["val"]["time"]
@@ -275,7 +282,8 @@
            logger_l2_process.info("code:{} 处理数据范围: {}-{} 处理时间:{} 截图时间戳:{}", code, add_datas[0]["index"],
                                   add_datas[-1]["index"], round(t.time() * 1000) - __start_time,
                                   capture_timestamp)
            __start_time = l2_data_log.l2_time(code, round(t.time() * 1000) - __start_time, "l2数据处理时间")
            __start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - __start_time,
                                               "l2数据处理时间")
    # 处理未挂单
    @classmethod
@@ -284,7 +292,8 @@
        # 获取阈值
        threshold_money, msg = cls.__get_threshmoney(code)
        if round(t.time() * 1000) - __start_time > 10:
            __start_time = l2_data_log.l2_time(code, round(t.time() * 1000) - __start_time, "获取m值数据耗时")
            __start_time = l2_data_log.l2_time(code, cls.random_key.get(code), round(t.time() * 1000) - __start_time,
                                               "获取m值数据耗时")
        cls.__start_compute_buy(code, start_index, end_index, threshold_money, capture_time)
    # 测试专用
@@ -300,24 +309,37 @@
        if end_index < start_index:
            return
        total_data = local_today_datas.get(code)
        _start_time = round(t.time() * 1000)
        # 获取买入信号起始点
        buy_single_index, buy_exec_index, buy_compute_index, num, count, max_num_set = cls.__get_order_begin_pos(
            code)
        # 处理安全笔数
        cls.__buyL2SafeCountManager.compute_left_rate(code, start_index, end_index, total_data,
                                                      local_today_num_operate_map.get(code))
        _start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time,
                                          "已下单-获取买入信息耗时")
        # 撤单计算,只看买1
        cancel_data, cancel_msg = L2LimitUpMoneyStatisticUtil.process_data(code, start_index, end_index,
                                                                           buy_single_index, buy_exec_index)
        _start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time,
                                          "已下单-买1统计耗时")
        # 撤单计算,看秒级大单撤单
        try:
            b_need_cancel, b_cancel_data = SecondAverageBigNumComputer.need_cancel(code, buy_single_index,
                                                                                   buy_exec_index, start_index,
                                                                                   end_index)
            b_need_cancel, b_cancel_data = SecondCancelBigNumComputer.need_cancel(code, buy_single_index,
                                                                                  buy_exec_index, start_index,
                                                                                  end_index, total_data)
            if b_need_cancel and not cancel_data:
                cancel_data = b_cancel_data
                cancel_msg = "申报时间截至大单撤销比例触发阈值"
        except Exception as e:
            logging.exception(e)
        _start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time,
                                          "已下单-s级大单估算")
        # 撤单计算,看分钟级大单撤单
        try:
@@ -336,13 +358,17 @@
                                                                             buy_exec_index)
            except Exception as e:
                logging.exception(e)
        _start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time,
                                          "已下单-板上卖耗时")
        # 计算m值大单
        cls.l2BigNumForMProcessor.process(code, max(buy_single_index, start_index), end_index,
                                          gpcode_manager.get_limit_up_price(code))
        _start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time,
                                          "已下单-m值大单计算")
        if cancel_data:
            if cancel_data["index"] == 175:
                print("进入调试")
            cls.debug(code, "触发撤单,撤单位置:{} ,撤单原因:{}", cancel_data["index"], cancel_msg)
            # 撤单
            if cls.cancel_buy(code, cancel_msg):
@@ -351,6 +377,8 @@
            else:
                # 撤单尚未成功
                pass
            _start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time,
                                              "已下单-撤单+处理剩余数据")
        else:
            # 如果有虚拟下单需要真实下单
@@ -361,12 +389,15 @@
                # 真实下单
                cls.__buy(code, unreal_buy_info[1], local_today_datas[code][unreal_buy_info[0]],
                          unreal_buy_info[0])
                _start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time,
                                                  "已下单-真实下单")
        # 判断是否需要计算长大单的信息
        try:
            LongAverageBigNumComputer.compute_average_big_num(code, buy_single_index, buy_exec_index)
        except Exception as e:
            logging.exception(e)
        _start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time,
                                          "已下单-计算长大单")
    @classmethod
    def __buy(cls, code, capture_timestamp, last_data, last_data_index):
@@ -388,11 +419,16 @@
                trade_manager.start_buy(code, capture_timestamp, last_data,
                                        last_data_index)
                trade_data_manager.placeordercountmanager.place_order(code)
                # 下单成功,需要删除最大买1
                cls.__thsBuy1VolumnManager.clear_max_buy1_volume(code)
                # 获取买入位置信息
                try:
                    buy_single_index, buy_exec_index, buy_compute_index, num, count, max_num_set = cls.__get_order_begin_pos(
                        code)
                    SecondAverageBigNumComputer.place_order_success(code, buy_single_index, buy_exec_index)
                    cls.__buyL2SafeCountManager.save_place_order_info(code, buy_single_index, buy_exec_index,None)
                    SecondCancelBigNumComputer.place_order_success(code, buy_single_index, buy_exec_index)
                    AverageBigNumComputer.place_order_success(code, buy_single_index, buy_exec_index)
                    LongAverageBigNumComputer.place_order_success(code, buy_single_index, buy_exec_index)
                except Exception as e:
@@ -494,7 +530,7 @@
            # 当老大老二当前没涨停
            return False, "同一板块中老三,老四,...不能买"
        if cls.__codeActualPriceProcessor.is_under_water(code,total_datas[-1]["val"]["time"]):
        if cls.__codeActualPriceProcessor.is_under_water(code, total_datas[-1]["val"]["time"]):
            # 水下捞且板块中的票小于16不能买
            # if global_util.industry_hot_num.get(industry) is not None and global_util.industry_hot_num.get(
            #         industry) <= 16:
@@ -562,11 +598,11 @@
    @classmethod
    def cancel_buy(cls, code, msg=None, source="l2"):
        # 是否是交易队列触发
        buy_single_index, buy_exec_index, buy_compute_index, num, count, max_num_set = cls.__get_order_begin_pos(
            code)
        total_datas = local_today_datas[code]
        if source == "trade_queue":
            # 交易队列触发的需要下单后5s
            buy_single_index, buy_exec_index, buy_compute_index, num, count, max_num_set = cls.__get_order_begin_pos(
                code)
            total_datas = local_today_datas[code]
            if buy_exec_index is not None and buy_exec_index > 0:
                now_time_str = tool.get_now_time_str()
                if tool.trade_time_sub(now_time_str, total_datas[buy_exec_index]["val"]["time"]) < 5:
@@ -588,6 +624,8 @@
                cls.debug(code, "撤单中断,原因:{}", reason)
                return False
            cls.__cancel_buy(code)
            # 撤单成功
            cls.__buyL2SafeCountManager.save_place_order_info(code, buy_single_index, buy_exec_index, total_datas[-1]["index"])
        l2_data_manager.L2BigNumProcessor.del_big_num_pos(code)
        cls.debug(code, "执行撤单成功,原因:{}", msg)
@@ -597,7 +635,7 @@
    @classmethod
    def __virtual_buy(cls, code, buy_single_index, buy_exec_index, capture_time):
        cls.unreal_buy_dict[code] = (buy_exec_index, capture_time)
        SecondAverageBigNumComputer.place_order_success(code, buy_single_index, buy_exec_index)
        SecondCancelBigNumComputer.place_order_success(code, buy_single_index, buy_exec_index)
        AverageBigNumComputer.place_order_success(code, buy_single_index, buy_exec_index)
        # 删除之前的板上卖信息
        L2LimitUpSellStatisticUtil.delete(code)
@@ -622,7 +660,8 @@
                continue_count = 2
            # 有买入信号
            has_single, _index = cls.__compute_order_begin_pos(code, max(
                compute_start_index - 2 if new_add else compute_start_index, 0), continue_count, compute_end_index)
                (compute_start_index - continue_count - 1) if new_add else compute_start_index, 0), continue_count,
                                                               compute_end_index)
            buy_single_index = _index
            if has_single:
                num = 0
@@ -632,7 +671,7 @@
                # 如果是今天第一次有下单开始信号,需要设置大单起始点
                cls.l2BigNumForMProcessor.set_begin_pos(code, buy_single_index)
        _start_time = l2_data_log.l2_time(code, round(t.time() * 1000) - _start_time, "下单信号计算时间")
        _start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time, "下单信号计算时间")
        if buy_single_index is None:
            # 未获取到买入信号,终止程序
@@ -642,12 +681,14 @@
        cls.l2BigNumForMProcessor.process(code, max(buy_single_index, compute_start_index), compute_end_index,
                                          gpcode_manager.get_limit_up_price(code))
        _start_time = l2_data_log.l2_time(code, round(t.time() * 1000) - _start_time, "计算m值大单")
        _start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time, "计算m值大单")
        threshold_money, msg = cls.__get_threshmoney(code)
        # 买入纯买额统计
        compute_index, buy_nums, buy_count, rebegin_buy_pos, max_num_set_new = cls.__sum_buy_num_for_order_3(code, max(buy_single_index,compute_start_index),compute_end_index,num,count,threshold_money,buy_single_index,max_num_set)
        _start_time = l2_data_log.l2_time(code, round(t.time() * 1000) - _start_time, "纯买额统计时间")
        compute_index, buy_nums, buy_count, rebegin_buy_pos, max_num_set_new = cls.__sum_buy_num_for_order_3(code, max(
            buy_single_index, compute_start_index), compute_end_index, num, count, threshold_money, buy_single_index,
                                                                                                             max_num_set)
        _start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time, "纯买额统计时间")
        cls.debug(code, "m值-{} m值因子-{}", threshold_money, msg)
@@ -675,14 +716,18 @@
            L2LimitUpMoneyStatisticUtil.process_data(code, buy_single_index, compute_index, buy_single_index,
                                                     buy_exec_index, False)
            _start_time = l2_data_log.l2_time(code, round(t.time() * 1000) - _start_time, "记录执行买入数据")
            _start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time,
                                              "记录执行买入数据", force=True)
            # 数据是否处理完毕
            if compute_index >= compute_end_index:
                need_cancel, cancel_data = SecondAverageBigNumComputer.need_cancel(code, buy_single_index,
                                                                                   compute_index,
                                                                                   buy_single_index, compute_index,
                                                                                   True)
                need_cancel, cancel_data = SecondCancelBigNumComputer.need_cancel(code, buy_single_index,
                                                                                  compute_index,
                                                                                  buy_single_index, compute_index,
                                                                                  total_datas,
                                                                                  True)
                _start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time,
                                                  "S级大单处理耗时", force=True)
                # 分钟级大单计算
                # need_cancel, cancel_data = AverageBigNumComputer.need_cancel(code, buy_single_index, compute_index,
                #                                                              buy_single_index, compute_index, True)
@@ -697,13 +742,18 @@
            else:
                # AverageBigNumComputer.need_cancel(code, buy_single_index, compute_index,
                #                                   buy_single_index, compute_index, False)
                SecondAverageBigNumComputer.need_cancel(code, buy_single_index, compute_index,
                                                        buy_single_index, compute_index, False)
                SecondCancelBigNumComputer.need_cancel(code, buy_single_index, compute_index, buy_single_index,
                                                       compute_index, total_datas, False)
                _start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time,
                                                  "S级大单处理耗时", force=True)
                # 数据尚未处理完毕,进行下一步处理
                cls.debug(code, "数据尚未处理完毕,进行下一步处理,处理进度:{}", compute_index)
                # 处理撤单步骤
                cls.__process_order(code, compute_index + 1, compute_end_index, capture_time, False)
                _start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time,
                                                  "处理撤单步骤耗时", force=True)
        else:
            # 未达到下单条件,保存纯买额,设置纯买额
            # 记录买入信号位置
@@ -773,15 +823,6 @@
    @classmethod
    def __get_threshmoney(cls, code):
        return l2_trade_factor.L2TradeFactorUtil.compute_m_value(code)
    # 是否为万手哥
    @classmethod
    def __is_big_money(cls, limit_up_price, val):
        if int(val["num"]) >= constant.BIG_MONEY_NUM:
            return True
        if int(val["num"]) * limit_up_price >= constant.BIG_MONEY_AMOUNT:
            return True
        return False
    # 计算万手哥笔数
    @classmethod
@@ -868,7 +909,7 @@
                            return None, buy_nums, buy_count, ii, max_buy_num_set
            # 涨停买
            if L2DataUtil.is_limit_up_price_buy(_val):
                if cls.__is_big_money(limit_up_price, _val):
                if l2_data_util.is_big_money(_val):
                    sub_threshold_count += int(total_datas[i]["re"])
                    max_buy_num_set.add(i)
                if round(int(_val["num"]) * float(_val["price"])) >= 5900:
@@ -882,7 +923,7 @@
                                                 buy_nums,
                                                 threshold_num, buy_count, get_threshold_count(), sub_threshold_count)
            elif L2DataUtil.is_limit_up_price_buy_cancel(_val):
                if cls.__is_big_money(limit_up_price, _val):
                if l2_data_util.is_big_money(_val):
                    sub_threshold_count -= int(total_datas[i]["re"])
                if round(int(_val["num"]) * float(_val["price"])) >= 5900:
                    # 只统计59万以上的金额
@@ -911,8 +952,15 @@
                        buy_count -= int(total_datas[i]["re"])
            cls.buy_debug(code, "位置-{},总手数:{},目标手数:{}", i,
                          buy_nums, threshold_num)
            # 需要的最小大单笔数
            big_num_count = 2
            if place_order_count > 1:
                # 第一次下单需要大单最少2笔,以后只需要1笔
                big_num_count = 1
            # 有撤单信号,且小于阈值
            if buy_nums >= threshold_num and buy_count >= get_threshold_count() and trigger_buy and len(max_buy_num_set)>1:
            if buy_nums >= threshold_num and buy_count >= get_threshold_count() and trigger_buy and len(
                    max_buy_num_set) >= big_num_count:
                return i, buy_nums, buy_count, None, max_buy_num_set
        cls.buy_debug(code, "尚未获取到买入执行点,起始计算位置:{} 统计纯买手数:{} 目标纯买手数:{}  统计纯买单数:{} 目标纯买单数:{} 大单数量:{}",
@@ -1140,35 +1188,68 @@
        logger_buy_1_volumn.info("涨停封单量矫正:代码-{} 量-{} 时间-{}", code, num, time_str)
        time_ = time_str.replace(":", "")
        key = None
        for i in range(4, -2, -2):
            # 获取本(分钟/小时/天)内秒分布数据
            time_regex = "{}*".format(time_[:i])
            keys_ = cls.__get_l2_second_money_record_keys(code, time_regex)
            if keys_ and len(keys_) > 1:
                # 需要排序
                keys = []
                for k in keys_:
                    keys.append(k)
                keys.sort(key=lambda tup: int(tup.split("-")[-1]))
                # 有2个元素
                for index in range(0, len(keys) - 1):
                    time_1 = keys[index].split("-")[-1]
                    time_2 = keys[index + 1].split("-")[-1]
                    if int(time_1) <= int(time_) <= int(time_2):
                        # 在此时间范围内
                        if time_ == time_2:
                            key = keys[index + 1]
                        else:
                            key = keys[index]
                        break
            if key:
                val = cls.__get_redis().get(key)
                old_num, old_from, old_to = cls.__format_second_money_record_val(val)
                end_index = old_to
                # 保存最近的数据
                cls.__set_l2_latest_money_record(code, end_index, num)
                logger_buy_1_volumn.info("涨停封单量矫正结果:代码-{} 位置-{} 量-{}", code, end_index, num)
        # 获取矫正时间前1分钟的数据
        keys = []
        for i in range(0, 3600):
            temp_time = tool.trade_time_add_second(time_str, 0 - i)
            # 只处理9:30后的数据
            if int(temp_time.replace(":", "")) < int("093000"):
                break
            keys_ = cls.__get_l2_second_money_record_keys(code, temp_time.replace(":", ""))
            if len(keys_) > 0:
                keys.append(keys_[0])
            if len(keys) >= 1:
                break
        keys.sort(key=lambda tup: int(tup.split("-")[-1]))
        if len(keys) > 0:
            key = keys[0]
            val = cls.__get_redis().get(key)
            old_num, old_from, old_to = cls.__format_second_money_record_val(val)
            end_index = old_to
            # 保存最近的数据
            cls.__set_l2_latest_money_record(code, end_index, num)
            logger_buy_1_volumn.info("涨停封单量矫正成功:代码-{} 位置-{} 量-{}", code, end_index, num)
        else:
            logger_buy_1_volumn.info("涨停封单量矫正失败:代码-{} 时间-{} 量-{}", code, time_str, num)
        # 取消此种方法
        #
        # for i in range(4, -2, -2):
        #     # 获取本(分钟/小时/天)内秒分布数据
        #     time_regex = "{}*".format(time_[:i])
        #     keys_ = cls.__get_l2_second_money_record_keys(code, time_regex)
        #     if keys_ and len(keys_) > 1:
        #         # 需要排序
        #         keys = []
        #         for k in keys_:
        #             keys.append(k)
        #         keys.sort(key=lambda tup: int(tup.split("-")[-1]))
        #         # if i == 4:
        #         #    keys=keys[:5]
        #         # 有2个元素
        #         for index in range(0, len(keys) - 1):
        #             time_1 = keys[index].split("-")[-1]
        #             time_2 = keys[index + 1].split("-")[-1]
        #             if int(time_1) <= int(time_) <= int(time_2):
        #                 # 在此时间范围内
        #                 if time_ == time_2:
        #                     key = keys[index + 1]
        #                 else:
        #                     key = keys[index]
        #                 break
        #         if key:
        #             break
        # # 如果没有找到匹配的区间
        # if not key:
        #     # 最后一条数据的时间为相应的区间
        #     total_datas = local_today_datas[code]
        #
        # if key:
        #     val = cls.__get_redis().get(key)
        #     old_num, old_from, old_to = cls.__format_second_money_record_val(val)
        #     end_index = old_to
        #     # 保存最近的数据
        #     cls.__set_l2_latest_money_record(code, end_index, num)
        #     logger_buy_1_volumn.info("涨停封单量矫正结果:代码-{} 位置-{} 量-{}", code, end_index, num)
    # 计算量,用于涨停封单量的计算
    @classmethod
@@ -1385,7 +1466,8 @@
            process_end_index = cancel_index
        # 保存最新累计金额
        # cls.__set_l2_latest_money_record(code, process_end_index, total_num)
        l2_data_log.l2_time(code, round(t.time() * 1000) - start_time, "l2数据封单额计算时间",
        l2_data_log.l2_time(code, L2TradeDataProcessor.random_key[code], round(t.time() * 1000) - start_time,
                            "l2数据封单额计算时间",
                            False)
        if cancel_index:
            L2TradeDataProcessor.cancel_debug(code, "数据处理位置:{}-{},{},最终买1为:{}", start_index, end_index, record_msg,
@@ -1486,313 +1568,6 @@
        load_l2_data(code)
        L2TradeDataProcessor.random_key[code] = 123123
        cls.process(code, 126, 171, 126)
# S-level (second-granularity) big-order computation.
# The computation range extends up to the second of the declared (apply) time.
class SecondAverageBigNumComputer:
    """Counts big limit-up buy orders around an order and decides on S-level cancels.

    Per-code state is stored in Redis under the ``s_average_big_num*`` keys;
    recent order placements are additionally remembered in an in-process dict.
    """

    __redis_manager = redis_manager.RedisManager(0)
    # code -> (wall-clock time of placement, buy_single_index, buy_exec_index)
    __place_order_time_dict = {}

    @classmethod
    def __getRedis(cls):
        """Return the Redis client handed out by the class-level manager."""
        return cls.__redis_manager.getRedis()

    @classmethod
    def __save_average_data(cls, code, average_num, average_up_count, start_index, end_index):
        """Persist (threshold, big-order count, start, end) for *code* with a 2000 s TTL."""
        key = "s_average_big_num-{}".format(code)
        payload = json.dumps((average_num, average_up_count, start_index, end_index))
        cls.__getRedis().setex(key, 2000, payload)
        L2TradeDataProcessor.cancel_debug(code, "保存秒级大单位置信息:平均手数-{} 大单数量-{} 计算开始范围-{}:{}".format(average_num,
                                                                                                 average_up_count,
                                                                                                 start_index,
                                                                                                 end_index))

    @classmethod
    def __get_average_data(cls, code):
        """Return (average_num, average_up_count, start_index, end_index), all None if absent."""
        key = "s_average_big_num-{}".format(code)
        val = cls.__getRedis().get(key)
        if val is None:
            return None, None, None, None
        val = json.loads(val)
        return val[0], val[1], val[2], val[3]

    @classmethod
    def __save_cancel_data(cls, code, cancel_index):
        """Remember the index of a buy-cancel record that counts against the order."""
        key = "s_average_big_num_comput_info-{}".format(code)
        cls.__getRedis().sadd(key, cancel_index)

    @classmethod
    def __get_cancel_datas(cls, code):
        """Return the set of remembered buy-cancel indexes as stored in Redis."""
        key = "s_average_big_num_comput_info-{}".format(code)
        return cls.__getRedis().smembers(key)

    @classmethod
    def __save_apply_time(cls, code, time_str):
        """Store the declared (apply) time for *code*; expiry comes from tool.get_expire()."""
        key = "s_average_big_num_apply_time-{}".format(code)
        cls.__getRedis().setex(key, tool.get_expire(), time_str)

    @classmethod
    def __get_apply_time(cls, code):
        """Return the stored declared (apply) time for *code*, or None if absent."""
        key = "s_average_big_num_apply_time-{}".format(code)
        return cls.__getRedis().get(key)

    @classmethod
    def __save_end_index(cls, code, end_index):
        """Add *end_index* to the set of candidate computation end positions."""
        key = "s_average_big_num_end_index_set-{}".format(code)
        cls.__getRedis().sadd(key, end_index)

    @classmethod
    def __list_end_indexs(cls, code):
        """Return the stored end positions as ascending ints (None if Redis returns None)."""
        key = "s_average_big_num_end_index_set-{}".format(code)
        vals = cls.__getRedis().smembers(key)
        if vals is None:
            return None
        return sorted(int(val) for val in vals)

    @classmethod
    def __clear_data(cls, code):
        """Delete the cancel set, the average data and the end-index set of one code."""
        ks = ["s_average_big_num_comput_info-{}".format(code), "s_average_big_num-{}".format(code),
              "s_average_big_num_end_index_set-{}".format(code)]
        for key in ks:
            cls.__getRedis().delete(key)

    @classmethod
    def clear_data(cls):
        """Delete the cancel sets, average data and end-index sets of every code."""
        ks = ["s_average_big_num_comput_info-*", "s_average_big_num-*", "s_average_big_num_end_index_set-*"]
        for pattern in ks:
            for k in cls.__getRedis().keys(pattern):
                cls.__getRedis().delete(k)

    @classmethod
    def compute_average_big_num(cls, code, buy_single_index, start_index, end_index):
        """Count the big limit-up buy orders in the computation range and store the result.

        Range: from the buy-signal start up to the first recorded end position
        whose record time is not earlier than the declared (apply) time.

        The big-order threshold is
        ``min(BIG_MONEY_NUM, round(BIG_MONEY_AMOUNT / limit-up price))``.
        The former ``num // count`` average was overwritten by that threshold
        right away and raised ZeroDivisionError when the range held no limit-up
        buy, so that computation (and the loops feeding it) has been removed.
        """
        # Record this end position, then reload every end position seen so far.
        cls.__save_end_index(code, end_index)
        end_indexs = cls.__list_end_indexs(code)
        print("compute_average_big_num", code, buy_single_index, start_index, end_index)
        L2TradeDataProcessor.cancel_debug(code, "开始计算短大单位置")
        total_data = local_today_datas[code]
        apply_time = cls.get_apply_time(code)
        apply_time_second = int(apply_time.replace(":", ""))
        # Use the earliest recorded end position that reaches the apply time.
        for ei in end_indexs:
            if int(total_data[ei]["val"]["time"].replace(":", "")) >= apply_time_second:
                end_index = ei
                break
        average_num = min(constant.BIG_MONEY_NUM,
                          round(constant.BIG_MONEY_AMOUNT / gpcode_manager.get_limit_up_price(code)))
        average_up_count = 0
        for i in range(start_index, end_index + 1):
            data = total_data[i]
            val = data["val"]
            if L2DataUtil.is_limit_up_price_buy(val):
                # Stop at the first limit-up buy that is later than the apply time.
                if int(val["time"].replace(":", "")) > apply_time_second:
                    break
                if int(val["num"]) >= average_num:
                    average_up_count += data["re"]
        print("平均手数:", average_num, "大单总数:", average_up_count)
        # Persist the result
        cls.__save_average_data(code, average_num, average_up_count, start_index, end_index)

    @classmethod
    def need_cancel(cls, code, buy_single_index, buy_exec_index, start_index, end_index, need_cancel=True):
        """Collect buy-cancels in ``[start_index, end_index]`` and decide whether to cancel.

        Returns ``(True, record)`` with the cancel record that pushed the
        cancelled-big-order ratio above the threshold, otherwise ``(False, None)``.
        With ``need_cancel=False`` the cancels are only collected.
        """
        average_num, average_up_count, a_start_index, a_end_index = cls.__get_average_data(code)
        L2TradeDataProcessor.cancel_debug(code, "s级是否需要撤单,数据范围:{}-{}  平均大单信息-({},{},{},{})", start_index, end_index,
                                          average_num, average_up_count, a_start_index, a_end_index)
        if average_num is None:
            return False, None
        total_data = local_today_datas[code]
        # Only guard the order for 30 s after the execution position.
        if tool.trade_time_sub(total_data[start_index]["val"]["time"], total_data[buy_exec_index]["val"]["time"]) > 30:
            return False, None
        cancel_map = local_today_num_operate_map.get(code)
        # When start_index equals buy_single_index this is the first computation
        # after placing the order: also look backwards, within the same second as
        # the buy signal, for limit-up cancels that happened before it.
        if buy_single_index == start_index:
            signal_time = total_data[buy_single_index]["val"]["time"]
            # Stop value is -1 so that index 0 is inspected as well.
            for i in range(buy_single_index - 1, -1, -1):
                data = total_data[i]
                val = data["val"]
                if val["time"] != signal_time:
                    break
                if L2DataUtil.is_limit_up_price_buy_cancel(val) and int(val["cancelTime"]) == 0:
                    # Limit-up buy-cancel with a zero cancel interval: locate the
                    # matching buy; it only counts when it lies in the computed range.
                    buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(data, cancel_map)
                    if buy_index is not None and a_start_index <= buy_index <= a_end_index:
                        cls.__save_cancel_data(code, i)
        for i in range(start_index, end_index + 1):
            data = total_data[i]
            val = data["val"]
            if not L2DataUtil.is_limit_up_price_buy_cancel(val):
                continue
            # Locate the buy this cancel belongs to.
            buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(data, cancel_map)
            if buy_index is not None and a_start_index <= buy_index <= a_end_index:
                cls.__save_cancel_data(code, i)
                continue
            # A partial cancel may make the buy untraceable; fall back to checking
            # whether the estimated buy time lies inside the computed time range.
            min_space, max_space = l2_data_util.compute_time_space_as_second(val["cancelTime"],
                                                                             val["cancelTimeUnit"])
            # Only second-level cancels are judged this way.
            if max_space - min_space <= 1:
                buy_time = tool.trade_time_add_second(val["time"], 0 - min_space)
                range_start = int(total_data[a_start_index]["val"]["time"].replace(":", ""))
                range_end = int(total_data[a_end_index]["val"]["time"].replace(":", ""))
                if range_start <= int(buy_time.replace(":", "")) <= range_end:
                    cls.__save_cancel_data(code, i)
        if need_cancel:
            # Compute the share of big orders that has been cancelled.
            cancel_datas = cls.__get_cancel_datas(code)
            if cancel_datas is not None and len(cancel_datas) > 0:
                L2TradeDataProcessor.cancel_debug(code, "s级大单 取消数量:{}", len(cancel_datas))
                place_order_count = trade_data_manager.placeordercountmanager.get_place_order_count(code)
                # The threshold loosens with the number of orders already placed.
                if place_order_count <= 1:
                    cancel_rate_threshold = 0.49
                elif place_order_count <= 2:
                    cancel_rate_threshold = 0.59
                else:
                    cancel_rate_threshold = 0.69
                cancel_count = 0
                for index in sorted(int(index) for index in cancel_datas):
                    data = total_data[index]
                    if int(data["val"]["num"]) >= average_num:
                        cancel_count += data["re"]
                        # Without any counted big order there is no baseline for a
                        # ratio; skip instead of dividing by zero.
                        if average_up_count and cancel_count / average_up_count > cancel_rate_threshold:
                            return True, total_data[index]
        return False, None

    @classmethod
    def is_need_compute_average(cls, code, latest_data):
        """Tell whether the big-order data should be recomputed for *code*.

        Returns ``(True, buy_single_index, buy_exec_index)`` while the latest
        record is less than 5 s after the apply time; otherwise forgets the
        placement and returns ``(False, None, None)``.
        """
        data = cls.__place_order_time_dict.get(code)
        if data is None:
            return False, None, None
        if tool.trade_time_sub(latest_data["val"]["time"], cls.get_apply_time(code)) < 5:
            # The declared time may still be uploaded within 5 s.
            return True, data[1], data[2]
        cls.__place_order_time_dict.pop(code)
        return False, None, None

    @classmethod
    def set_apply_time(cls, code, time_str, force=False):
        """Store the declared (apply) time.

        Unless *force* is set, a new time is ignored when it is not later than
        the stored one or more than 4 s after it.
        """
        old_time_str = cls.get_apply_time(code)
        if not force and old_time_str is not None:
            sub_time = tool.trade_time_sub(time_str, old_time_str)
            if sub_time <= 0 or sub_time > 4:
                # The declared time must not be more than 4 s after the order time.
                return
        cls.__save_apply_time(code, time_str)

    @classmethod
    def get_apply_time(cls, code):
        """Return the stored declared (apply) time for *code*."""
        return cls.__get_apply_time(code)

    @classmethod
    def place_order_success(cls, code, buy_single_index, buy_exec_index):
        """Reset the per-code state after an order was placed and run a first computation."""
        cls.__clear_data(code)
        cls.__place_order_time_dict[code] = (t.time(), buy_single_index, buy_exec_index)
        # To be safe, store the execution record's time as the apply time first.
        total_data = local_today_datas[code]
        cls.set_apply_time(code, total_data[buy_exec_index]["val"]["time"], True)
        cls.compute_average_big_num(code, buy_single_index, buy_single_index, total_data[-1]["index"])

    @classmethod
    def __test(cls, datas):
        """Replay one scenario: (code, buy_single_index, buy_exec_index, compute end, last index)."""
        code = datas[0]
        load_l2_data(code)
        L2TradeDataProcessor.random_key[code] = 123123
        # Place the order first
        buy_single_index = datas[1]
        buy_exec_index = datas[2]
        local_today_datas[code] = local_today_datas[code][0:datas[4]]
        cls.place_order_success(code, buy_single_index, buy_exec_index)
        # Recompute the big-order data up to the given end position
        cls.compute_average_big_num(code, buy_single_index, buy_single_index, datas[3])
        cancel, cancel_data = cls.need_cancel(code, buy_single_index, buy_exec_index, buy_single_index, buy_exec_index,
                                              False)
        for i in range(buy_exec_index + 1, datas[4]):
            cancel, cancel_data = cls.need_cancel(code, buy_single_index, buy_exec_index, i, i)
            if cancel:
                print("需要撤单", cancel, cancel_data["index"])
                break

    @classmethod
    def test(cls):
        """Manual smoke test: stores a few end indexes and prints them back sorted."""
        # cls.__test(("000909", 607, 646, 646, 694))
        # code, buy-signal start, buy execution position, computation end, farthest position
        # cls.__test(("002793", 292, 308, 314, 410))
        for end_index in (200, 101, 99, 120, 126, 126):
            cls.__save_end_index("000333", end_index)
        print(cls.__list_end_indexs("000333"))
        # Run the cancel check
# 平均大单计算
@@ -2102,26 +1877,27 @@
                    if count >= constant.H_CANCEL_BUY_COUNT:
                        end_index = i
                        break
        # logging.info(f"H撤大单笔数,{count}")
        # 获取大单数量
        average_up_count = 0
        average_up_total_num = 0
        average_num = round(num / count)
        for i in range(start_index, end_index + 1):
            data = total_data[i]
            val = data["val"]
            if int(val["num"]) >= average_num:
                average_up_count += data["re"]
                average_up_total_num += data["re"] * int(val["num"])
        # 保存数据
        cls.__save_average_data(code, average_num, average_up_count, count, start_index, end_index)
        cls.__save_average_data(code, average_num, average_up_total_num, count, start_index, end_index)
        cls.__save_compute_info(code, 0, buy_exec_index)
    # 是否需要撤单
    @classmethod
    def need_cancel(cls, code, buy_exec_index, start_index, end_index):
        average_num, average_up_count, total_count, a_start_index, a_end_index = cls.__get_average_data(code)
        average_num, average_up_total_num, total_count, a_start_index, a_end_index = cls.__get_average_data(code)
        if average_num is None:
            return False, None
        cancel_count, process_index = cls.__get_compute_info(code)
        cancel_num, process_index = cls.__get_compute_info(code)
        total_data = local_today_datas[code]
        # 14:30过后不再守护
        if int(total_data[end_index]["val"]["time"].replace(":", "")) > int("143000"):
@@ -2141,14 +1917,13 @@
                                                                                         code))
                    if buy_index is not None and a_start_index <= buy_index <= a_end_index:
                        # 买入位置要在平均值计算范围内
                        cancel_count += data["re"]
                        cancel_num += data["re"] * int(val["num"])
                        process_index = i
                        sj = 0  # 5 * tool.trade_time_sub(val["time"],total_data[buy_exec_index]["val"]["time"])
                        print("h平均大单计算结果:", "取消数量", cancel_count, "大单总数", average_up_count, sj)
                        if cancel_count / (average_up_count - sj) >= 0.75:
                        print("h平均大单计算结果:", "取消手数", cancel_num, "大单手数", average_up_total_num)
                        if cancel_num / average_up_total_num >= constant.H_CANCEL_RATE:
                            return True, i
        finally:
            cls.__save_compute_info(code, cancel_count, process_index)
            cls.__save_compute_info(code, cancel_num, process_index)
        return False, None
    # 下单成功
@@ -2190,7 +1965,7 @@
    # AverageBigNumComputer.test()
    # LongAverageBigNumComputer.test()
    # L2TradeDataProcessor.test()
    SecondAverageBigNumComputer.test()
    L2LimitUpMoneyStatisticUtil.verify_num("601958", 89178, "13:22:45")
    # load_l2_data("600213")
    #
    # buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(local_today_datas["600213"][84],
l2_data_util.py
@@ -26,6 +26,28 @@
    return decorator
# Whether the order is a big order
def is_big_money(val):
    """Return True when the order described by *val* counts as a big order.

    ``val["price"]`` and ``val["num"]`` are converted with ``float`` and ``int``.
    For prices above 3.0 the product price * num must reach 30000; otherwise
    it must reach 95% of price * 10000.
    """
    price = float(val["price"])
    money = price * int(val["num"])
    if price > 3.0:
        threshold = 30000
    else:
        threshold = price * 10000 * 0.95
    return money >= threshold
    # if int(val["num"]) >= constant.BIG_MONEY_NUM:
    #     return True
    # if int(val["num"]) * limit_up_price >= constant.BIG_MONEY_AMOUNT:
    #     return True
    # return False_
def compare_time(time1, time2):
    """Return the difference of two "HH:MM:SS" strings read as HHMMSS integers.

    The result is positive when *time1* is later, zero when both are equal and
    negative when *time1* is earlier. It is not a number of seconds.
    """
    first, second = (int(text.replace(":", "", 2)) for text in (time1, time2))
    return first - second
l2_trade_test.py
@@ -15,8 +15,7 @@
import trade_manager
from l2_data_manager import TradePointManager
from l2_data_manager_new import L2TradeDataProcessor, L2LimitUpMoneyStatisticUtil, AverageBigNumComputer, \
    SecondAverageBigNumComputer
# from l2_data_manager_new import L2TradeDataProcessor, L2LimitUpMoneyStatisticUtil, AverageBigNumComputer
from trade_queue_manager import THSBuy1VolumnManager
@@ -40,51 +39,51 @@
        redis_info.delete(k)
class VirtualTrade(unittest.TestCase):
    """Replays the recorded L2 data of one stock code through L2TradeDataProcessor.

    The class defines no test methods: the whole replay runs in the class body,
    i.e. as soon as this module is imported.
    """
    code = "001236"
    clear_trade_data(code)
    l2_data_manager.load_l2_data(code)
    total_datas = l2_data_manager.local_today_datas[code]
    if total_datas[0]["index"] > 0:
        # Pad the front with copies of the first record so list position == record index
        for i in range(0, total_datas[0]["index"]):
            data = total_datas[0].copy()
            data["index"] = i
            total_datas.insert(i, data)
    # Logged processing positions, used below as (start_index, end_index) pairs
    pos_list = log.get_l2_process_position(code)
    if pos_list[0][0] > 0:
        # Make sure the records before the first logged batch are replayed too
        pos_list.insert(0, (0, pos_list[0][0] - 1))
    del pos_list[-1]
    if pos_list[-1][1] < total_datas[-1]["index"]:
        # Split the remaining data into batches by second
        # NOTE(review): batches are only opened at a time change, so records between
        # pos_list[-1][1] + 1 and the first time change are covered by no batch — confirm intended.
        start_index = -1
        for i in range(pos_list[-1][1] + 1, total_datas[-1]["index"] + 1):
            if total_datas[i]["val"]["time"] != total_datas[i - 1]["val"]["time"]:
                if start_index < 0:
                    start_index = i
                else:
                    pos_list.append((start_index, i - 1))
                    start_index = i
    if pos_list[-1][1] < total_datas[-1]["index"]:
        # Final batch: everything after the last covered index
        pos_list.append((pos_list[-1][1] + 1, total_datas[-1]["index"]))
    # Start the replay from an empty data set and a fixed safe-buy count
    l2_data_manager_new.local_today_datas = {code: []}
    l2_trade_factor.L2TradeFactorUtil.get_safe_buy_count = mock.Mock(return_value=21)
    for indexs in pos_list:
        # NOTE(review): this stores a Mock object (not a number) as the random key — confirm intended.
        L2TradeDataProcessor.random_key[code] = mock.Mock(return_value=random.randint(0, 100000))
        # Look back up to 100 s before the batch start for a recorded buy-1 volume
        # and use the first one found to verify the limit-up sealed amount
        for i in range(0, 100):
            time_ = total_datas[indexs[0]]["val"]["time"]
            time_s = tool.get_time_as_second(time_) - i - 1
            volumn = THSBuy1VolumnManager().get_buy_1_volumn(code, tool.time_seconds_format(time_s))
            if volumn is not None:
                l2_data_manager_new.L2LimitUpMoneyStatisticUtil.verify_num(code, int(volumn),
                                                                           tool.time_seconds_format(time_s))
                break
        print("----------------处理位置", indexs)
        # Feed the batch (inclusive index range) to the processor
        L2TradeDataProcessor.process_add_datas(code, total_datas[indexs[0]:indexs[1] + 1], 0, 0)
#
# class VirtualTrade(unittest.TestCase):
#     code = "000701"
#     clear_trade_data(code)
#     l2_data_manager.load_l2_data(code)
#     total_datas = l2_data_manager.local_today_datas[code]
#     if total_datas[0]["index"] > 0:
#         # 拼接数据
#         for i in range(0, total_datas[0]["index"]):
#             data = total_datas[0].copy()
#             data["index"] = i
#             total_datas.insert(i, data)
#
#     pos_list = log.get_l2_process_position(code)
#     if pos_list[0][0] > 0:
#         pos_list.insert(0, (0, pos_list[0][0] - 1))
#     del pos_list[-1]
#     if pos_list[-1][1] < total_datas[-1]["index"]:
#         # 剩下的数据根据秒来分
#         start_index = -1
#         for i in range(pos_list[-1][1] + 1, total_datas[-1]["index"] + 1):
#             if total_datas[i]["val"]["time"] != total_datas[i - 1]["val"]["time"]:
#                 if start_index < 0:
#                     start_index = i
#                 else:
#                     pos_list.append((start_index, i - 1))
#                     start_index = i
#     if pos_list[-1][1] < total_datas[-1]["index"]:
#         pos_list.append((pos_list[-1][1] + 1, total_datas[-1]["index"]))
#     l2_data_manager_new.local_today_datas = {code: []}
#     l2_trade_factor.L2TradeFactorUtil.get_safe_buy_count = mock.Mock(return_value=12)
#     for indexs in pos_list:
#         L2TradeDataProcessor.random_key[code] = mock.Mock(return_value=random.randint(0, 100000))
#         # 设置封单额,获取买1量
#         for i in range(0, 100):
#             time_ = total_datas[indexs[0]]["val"]["time"]
#             time_s = tool.get_time_as_second(time_) - i - 1
#             volumn = THSBuy1VolumnManager().get_buy_1_volumn(code, tool.time_seconds_format(time_s))
#             if volumn is not None:
#                 l2_data_manager_new.L2LimitUpMoneyStatisticUtil.verify_num(code, int(volumn),
#                                                                            tool.time_seconds_format(time_s))
#                 break
#
#         print("----------------处理位置", indexs)
#         L2TradeDataProcessor.process_add_datas(code, total_datas[indexs[0]:indexs[1] + 1], 0, 0)
# class TestTrade(unittest.TestCase):
l2_trade_util.py
@@ -42,3 +42,7 @@
    key = "forbidden-trade-codes"
    redis = __redis_manager.getRedis()
    return redis.sismember(key, code)
# Manual entry point: running this module directly calls
# add_to_forbidden_trade_codes for the hard-coded stock code 605133.
if __name__ == "__main__":
    add_to_forbidden_trade_codes("605133")
log.py
@@ -4,6 +4,7 @@
import datetime
import json
import os
import shutil
import sys
from loguru import logger
@@ -63,6 +64,10 @@
                   filter=lambda record: record["extra"].get("name") == "l2_trade_queue",
                   rotation="00:00", compression="zip", enqueue=True)
        logger.add(self.get_path("l2", "l2_trade_buy_queue"),
                   filter=lambda record: record["extra"].get("name") == "l2_trade_buy_queue",
                   rotation="00:00", compression="zip", enqueue=True)
        logger.add(self.get_path("juejin", "juejin_tick"),
                   filter=lambda record: record["extra"].get("name") == "juejin_tick",
                   rotation="00:00", compression="zip", enqueue=True)
@@ -114,6 +119,7 @@
logger_l2_trade_cancel = __mylogger.get_logger("l2_trade_cancel")
logger_l2_trade_buy = __mylogger.get_logger("l2_trade_buy")
logger_l2_trade_queue = __mylogger.get_logger("l2_trade_queue")
logger_l2_trade_buy_queue = __mylogger.get_logger("l2_trade_buy_queue")
logger_l2_big_data = __mylogger.get_logger("l2_big_data")
logger_juejin_tick = __mylogger.get_logger("juejin_tick")
@@ -288,20 +294,23 @@
def export_logs(code):
    """Extract today's log lines for *code* into per-code files.

    A fresh ``export/{code}_{name}_{date}`` directory is created (an existing
    one is removed first) and one extract per entry of ``log_names`` is written
    into it. The three legacy extracts next to the source logs are still written.
    """
    code_name = gpcode_manager.get_code_name(code)
    date = datetime.datetime.now().strftime("%Y-%m-%d")
    log_dir = "D:/logs/gp/l2"
    target_dir = f"{log_dir}/export/{code}_{code_name}_{date}"
    if os.path.exists(target_dir):
        shutil.rmtree(target_dir)
    os.makedirs(target_dir)
    log_names = ["l2_process", "l2_trade", "l2_trade_cancel", "l2_process_time", "l2_trade_buy"]
    # Export the trade log
    LogUtil.extract_log_from_key("code={}".format(code), f"{log_dir}/l2_trade.{date}.log",
                                 f"{log_dir}/l2_trade.{code}_{code_name}.{date}.log")
    # Export the cancel log
    LogUtil.extract_log_from_key("code={}".format(code), f"{log_dir}/l2_trade_cancel.{date}.log",
                                 f"{log_dir}/l2_trade_cancel.{code}_{code_name}.{date}.log")
    LogUtil.extract_log_from_key("{}".format(code), f"{log_dir}/l2_process.{date}.log",
                                 f"{log_dir}/l2_process.{code}_{code_name}.{date}.log")
    for log_name in log_names:
        # The process logs are matched on the bare code, the others on "code=<code>".
        key = code if log_name in ("l2_process", "l2_process_time") else f"code={code}"
        # The f-string is already fully rendered; the former trailing .format(date) was a no-op.
        LogUtil.extract_log_from_key(key, f"{log_dir}/{log_name}.{date}.log",
                                     f"{target_dir}/{log_name}.{code}_{code_name}.{date}.log")
if __name__ == '__main__':
    # logger_l2_process_time.info("test123")
    codes = ["002766"]
    codes = ["002842"]
    for code in codes:
        export_logs(code)
ocr/ocr_server.py
New file
@@ -0,0 +1,97 @@
import json
import logging
import socketserver
import socket
import cv2
from ocr import ocr_util
class MyTCPServer(socketserver.TCPServer):
    """TCPServer that additionally stores two pipe objects as attributes."""

    def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True, pipe_juejin=None, pipe_ui=None):
        # Extra parameters, stored before delegating to the base constructor.
        self.pipe_juejin, self.pipe_ui = pipe_juejin, pipe_ui
        super().__init__(server_address, RequestHandlerClass, bind_and_activate=bind_and_activate)
# For asynchronous handling, ThreadingTCPServer has to be re-derived from MyTCPServer as well.
class MyThreadingTCPServer(socketserver.ThreadingMixIn, MyTCPServer):
    """MyTCPServer combined with socketserver.ThreadingMixIn."""
class MyBaseRequestHandle(socketserver.BaseRequestHandler):
    """Handles one OCR request: a GBK-encoded JSON message, answered with a JSON reply.

    Messages with ``type == 100`` carry one chunk of a single-channel image
    (``matId``, ``index``, ``maxIndex``, ``width``, ``height``, ``key``, ``data``).
    Chunks are accumulated per ``matId``; when the last chunk arrives the image
    is rebuilt and passed to ``ocr_util.OcrUtil.ocr_with_key``.

    Reply codes: 0 result, 1 upload incomplete, 2 size mismatch, -1 error.
    Any other message type is answered with the plain string ``OK``.
    """

    # matId -> pixel values received so far; a class attribute, so it is shared
    # by every handler instance.
    ocr_temp_data = {}

    def setup(self):
        super().setup()

    def handle(self):
        super().handle()  # the base implementation does nothing
        sk: socket.socket = self.request
        # Non-blocking read: take whatever has already arrived.
        # NOTE(review): a message that has not fully arrived yet is truncated
        # here and then fails JSON parsing; fixing that needs protocol framing.
        sk.setblocking(False)
        chunks = []
        while True:
            try:
                temp_data = sk.recv(1024)
            except OSError:
                # Nothing more to read right now (or the connection failed).
                break
            if not temp_data:
                break
            chunks.append(temp_data)
        data = b"".join(chunks)

        try:
            _str = str(data, encoding="gbk")
            print("OCR SERVER 内容:", _str[0:20], "......", _str[-150:-1])
            payload = json.loads(_str)
        except ValueError:
            # UnicodeDecodeError and json.JSONDecodeError are both ValueError
            # subclasses; an undecodable payload is reported like invalid JSON
            # instead of crashing the handler without a reply.
            logging.error("OCR数据JSON解析解析失败")
            return_str = json.dumps({"code": -1, "msg": "json解析失败"})
        else:
            try:
                return_str = self._build_reply(payload)
            except Exception as e:
                # Previously swallowed silently and answered with "OK".
                logging.exception("OCR request handling failed")
                return_str = json.dumps({"code": -1, "msg": str(e)})

        # Back to blocking mode so that sendall() delivers the complete reply;
        # send() on a non-blocking socket may write only part of it.
        sk.setblocking(True)
        sk.sendall(return_str.encode())

    def _build_reply(self, payload):
        """Process one decoded message and return the reply string."""
        if payload["type"] != 100:
            return "OK"
        body = payload["data"]
        mat_id = body["matId"]
        index = body["index"]
        max_index = body["maxIndex"]
        cols = body["width"]
        rows = body["height"]
        key = body["key"]
        self.ocr_temp_data.setdefault(mat_id, []).extend(body["data"])
        if max_index != index:
            return json.dumps({"code": 1, "msg": "数据尚未上传完"})
        # Transfer finished: always drop the buffer, so a failed upload neither
        # leaks memory nor corrupts a retry that reuses the same matId.
        datas = self.ocr_temp_data.pop(mat_id)
        if rows * cols != len(datas):
            return json.dumps({"code": 2, "msg": "数据出错"})
        mat = cv2.numpy.zeros((rows, cols, 1), cv2.numpy.uint8)
        for r in range(0, rows):
            for c in range(0, cols):
                mat[r][c] = [datas[r * cols + c]]
        # cv2.imwrite("D:/test.png", mat)
        # Run the text recognition
        ocr_results = ocr_util.OcrUtil.ocr_with_key(mat, key)
        return json.dumps({"code": 0, "data": {"datas": ocr_results}})

    def finish(self):
        super().finish()
ocr/ocr_util.py
File was renamed from ocr_util.py
@@ -23,6 +23,8 @@
        for r in res:
            text = r["text"]
            if re.match(key, text):
                res_final.append((text, r["position"]))
        print("识别时间",time.time() - start)
        return res_final
                ps = r["position"]
                res_final.append((text, [(int(ps[0][0]), int(ps[0][1])), (int(ps[1][0]), int(ps[1][1])),
                                         (int(ps[2][0]), int(ps[2][1])), (int(ps[3][0]), int(ps[3][1]))]))
        print("识别时间", time.time() - start)
        return res_final
server.py
@@ -1,13 +1,16 @@
"""
接受客户端数据的服务器
"""
import datetime
import decimal
import json
import logging
import random
import socketserver
import socket
import threading
import time
import cv2
import alert_util
import client_manager
@@ -22,7 +25,9 @@
import l2_data_manager
import l2_data_manager_new
import l2_data_util
import limit_up_time_manager
from l2.cancel_buy_strategy import HourCancelBigNumComputer
from ocr import ocr_util
import ths_industry_util
import ths_util
import tool
@@ -31,9 +36,10 @@
import trade_manager
import l2_code_operate
from code_data_util import ZYLTGBUtil
import l2.transaction_progress
from log import logger_l2_error, logger_device, logger_trade_delegate, logger_buy_1_volumn_record, \
    logger_l2_trade_queue, logger_l2_latest_data
    logger_l2_trade_queue, logger_l2_latest_data, logger_l2_trade_buy_queue
from trade_queue_manager import THSBuy1VolumnManager, Buy1PriceManager, thsl2tradequeuemanager
@@ -58,6 +64,8 @@
    buy1_price_manager = Buy1PriceManager()
    l2_trade_queue_time_dict = {}
    l2_save_time_dict = {}
    l2_trade_buy_queue_dict = {}
    tradeBuyQueue = l2.transaction_progress.TradeBuyQueue()
    def setup(self):
        super().setup()  # 可以不调用父类的setup()方法,父类的setup方法什么都没做
@@ -78,7 +86,7 @@
        # print("- " * 30)
        sk: socket.socket = self.request
        while True:
            data = sk.recv(1024000)
            data = sk.recv(1024 * 1024 * 20)
            if len(data) == 0:
                # print("客户端断开连接")
                break
@@ -92,6 +100,7 @@
                    try:
                        origin_start_time = round(time.time() * 1000)
                        __start_time = round(time.time() * 1000)
                        do_id = random.randint(0, 100000)
                        # level2盘口数据
                        day, client, channel, code, capture_time, process_time, datas, origin_datas = l2_data_manager.parseL2Data(
@@ -105,12 +114,12 @@
                        # 10ms的网络传输延时
                        capture_timestamp = __start_time - process_time - 10
                        # print("截图时间:", process_time)
                        __start_time = l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time,
                        __start_time = l2_data_log.l2_time(code, do_id, round(time.time() * 1000) - __start_time,
                                                           "截图时间:{} 数据解析时间".format(process_time))
                        cid, pid = gpcode_manager.get_listen_code_pos(code)
                        __start_time = l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time,
                        __start_time = l2_data_log.l2_time(code, do_id, round(time.time() * 1000) - __start_time,
                                                           "l2获取代码位置耗时")
                        # 判断目标代码位置是否与上传数据位置一致
                        if cid is not None and pid is not None and client == int(cid) and channel == int(pid):
@@ -119,16 +128,19 @@
                                l2_code_operate.verify_with_l2_data_pos_info(code, client, channel)
                                __start_time = round(time.time() * 1000)
                                if gpcode_manager.is_listen(code):
                                    __start_time = l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time,
                                    __start_time = l2_data_log.l2_time(code, do_id,
                                                                       round(time.time() * 1000) - __start_time,
                                                                       "l2外部数据预处理耗时")
                                    l2_data_manager_new.L2TradeDataProcessor.process(code, datas, capture_timestamp)
                                    __start_time = l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time,
                                    l2_data_manager_new.L2TradeDataProcessor.process(code, datas, capture_timestamp,
                                                                                     do_id)
                                    __start_time = l2_data_log.l2_time(code, do_id,
                                                                       round(time.time() * 1000) - __start_time,
                                                                       "l2数据有效处理外部耗时",
                                                                       False)
                                    # 保存原始数据数量
                                    l2_data_util.save_l2_latest_data_number(code, len(origin_datas))
                                    if round(time.time() * 1000) - __start_time > 20:
                                        l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time,
                                        l2_data_log.l2_time(code, do_id, round(time.time() * 1000) - __start_time,
                                                            "异步保存原始数据条数耗时",
                                                            False)
@@ -155,7 +167,7 @@
                                __end_time = round(time.time() * 1000)
                                # Only log the total processing time when it exceeds 100 ms
                                if __end_time - origin_start_time > 100:
                                    l2_data_log.l2_time(code, round(time.time() * 1000) - origin_start_time,
                                    l2_data_log.l2_time(code, do_id, round(time.time() * 1000) - origin_start_time,
                                                        "l2数据处理总耗时",
                                                        True)
                    except Exception as e:
@@ -238,7 +250,6 @@
                                        apply_time = tool.trade_time_add_second(apply_time, 1)
                                    print(apply_time)
                                    l2_data_manager_new.SecondAverageBigNumComputer.set_apply_time(code, apply_time)
                    except Exception as e:
                        logging.exception(e)
@@ -271,6 +282,29 @@
                    buy_time = data["buyTime"]
                    buy_one_price = data["buyOnePrice"]
                    buy_one_volumn = data["buyOneVolumn"]
                    buy_queue = data["buyQueue"]
                    buy_queue_result_list = self.tradeBuyQueue.save(code, gpcode_manager.get_limit_up_price(code),
                                                                    buy_queue)
                    if buy_queue_result_list:
                        # 有数据
                        try:
                            buy_one_price_ = decimal.Decimal(round(float(buy_one_price), 2)).quantize(decimal.Decimal("0.00"))
                            buy_progress_index = self.tradeBuyQueue.save_traded_index(code,buy_one_price_,
                                                                                      buy_queue_result_list)
                            if buy_progress_index is not None:
                                HourCancelBigNumComputer.set_trade_progress(code,buy_progress_index)
                            logger_l2_trade_buy_queue.info("获取成交位置成功: code-{} index-{}  数据-{}", code,
                                                           buy_progress_index,
                                                           json.loads(buy_queue_result_list))
                        except Exception as e:
                            logger_l2_trade_buy_queue.warning("获取成交位置失败: code-{} 原因-{}  数据-{}", code, str(e),
                                                              json.loads(buy_queue_result_list))
                    # buy_queue是否有变化
                    if self.l2_trade_buy_queue_dict.get(code) is None or buy_queue != self.l2_trade_buy_queue_dict.get(
                            code):
                        self.l2_trade_buy_queue_dict[code] = buy_queue
                        logger_l2_trade_buy_queue.info("{}-{}", code, buy_queue)
                    # 保存最近的记录
                    if self.ths_l2_trade_queue_manager.save_recod(code, data):
                        if buy_time != "00:00:00":
@@ -352,14 +386,14 @@
                        if client_id in l2_clients:
                            alert_util.alarm()
                elif type == 60:
                    # 心跳信息
                    # L2自启动成功
                    data = data_process.parse(_str)["data"]
                    client_id = data["client"]
                    print("L2自启动成功", client_id)
                    now_str = tool.get_now_time_str()
                    ts = tool.get_time_as_second(now_str)
                    # 9点25到9点28之间的自启动就需要批量设置代码
                    if tool.get_time_as_second("09:24:50") <= ts <= tool.get_time_as_second("09:28:00"):
                    # 9点25到9点28之间的自启动就需要批量设置代码,目前永远不执行
                    if tool.get_time_as_second("09:24:50") <= ts <= tool.get_time_as_second("09:28:00") and False:
                        # 准备批量设置代码
                        return_json = {"code": 1, "msg": "等待批量设置代码"}
                        return_str = json.dumps(return_json)
@@ -382,17 +416,10 @@
                                break
                            else:
                                time.sleep(3)
                    else:
                        return_json = {"code": 0, "msg": "开启在线状态"}
                        return_str = json.dumps(return_json)
                    # print("心跳:", client_id)
                elif type == 100:
                    # 图像识别
                    return_str = data_process.toJson({"code": 0, "data": {"datas": []}})
                    pass
                sk.send(return_str.encode())
        # print("----------handler end ----------")
@@ -473,7 +500,8 @@
if __name__ == "__main__":
    try:
        thsl2tradequeuemanager().test()
        a=round(float("0002.90"),2)
        print(decimal.Decimal(a).quantize(decimal.Decimal("0.00")))
        # repair_ths_main_site(2)
    except Exception as e:
        print(str(e))
trade_gui.py
@@ -212,6 +212,8 @@
            return code_str
    def buy(self, code, limit_up_price, win=0):
        if not constant.TRADE_ENABLE:
            return
        try:
            logger_trade_gui.info("开始买入:code-{}".format(code))
            if win < 1:
@@ -349,7 +351,7 @@
    # 撤买
    def cancel_buy(self, code):
        if constant.TEST:
        if not constant.TRADE_ENABLE:
            return
        self.buy_cancel_lock.acquire()
        logger_trade_gui.info("开始获取撤单控件:code-{}".format(code))
@@ -397,7 +399,7 @@
    # 交易盘口中的撤买
    def cancel_buy_again(self, code):
        if constant.TEST:
        if not constant.TRADE_ENABLE:
            return
        win = THSBuyWinManagerNew.get_distributed_code_win(code)
        if win is None or win <= 0:
trade_manager.py
@@ -200,8 +200,6 @@
# 购买
@tool.async_call
def __buy(code, price, trade_state, capture_timestamp, last_data, last_data_index):
    if constant.TEST:
        return
    try:
        guiTrade.buy(code, price)
        __place_order_success(code, capture_timestamp, last_data, last_data_index)
trade_queue_manager.py
@@ -28,6 +28,10 @@
            return int(val)
        return None
    def __del_max_buy1_volume(self, code):
        key = "max_buy1_volumn-{}".format(code)
        val = self.__get_redis().delete(key)
    def __save_recod(self, code, time_str, volumn):
        # 保存每一次的
@@ -155,6 +159,10 @@
            return -1
        return val
    def clear_max_buy1_volume(self, code):
        """Public entry point that forwards ``code`` to ``__del_max_buy1_volume``.

        :param code: identifier passed through unchanged to the private helper.
        """
        # NOTE(review): relies on the name-mangled private helper being defined
        # in the same class — confirm against the full file.
        self.__del_max_buy1_volume(code)
class JueJinBuy1VolumnManager:
    __redisManager = redis_manager.RedisManager(1)