Administrator
2023-02-16 92cb2dd75ea37b64b174f42ddd0b5b17d6a4634a
H撤策略优化,新增热门板块爬取,新增windows截图工具
15个文件已修改
3个文件已添加
1039 ■■■■ 已修改文件
constant.py 9 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
data_export_util.py 7 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
gui.py 58 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
juejin.py 49 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/cancel_buy_strategy.py 290 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_data_manager_new.py 47 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_data_source_util.py 122 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_data_util.py 135 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/safe_count_manager.py 4 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/transaction_progress.py 54 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_trade_test.py 73 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
log.py 15 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
server.py 30 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/hot_block.py 58 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/trade_data_manager.py 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/trade_gui.py 27 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/trade_manager.py 14 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utils/capture_util.py 43 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
constant.py
@@ -1,7 +1,7 @@
# 是否为测试
TEST = True
TEST = False
# 是否允许交易
TRADE_ENABLE = False
TRADE_ENABLE = True
# 水下捞累计连续水下时间最小值
UNDER_WATER_PRICE_TIME_AS_SECONDS = 1200
# 大单金额(单位为百)
@@ -13,13 +13,16 @@
S_CANCEL_FIRST_RATE = 0.69
S_CANCEL_SECOND_RATE = 0.59
S_CANCEL_THIRD_RATE = 0.49
# s撤守护时间
S_CANCEL_EXPIRE_TIME = 30
# H撤比例
H_CANCEL_FIRST_RATE = 0.69
H_CANCEL_SECOND_RATE = 0.59
H_CANCEL_THIRD_RATE = 0.49
H_CANCEL_MIN_MONEY = 10000000
H_CANCEL_MIN_COUNT = 8
H_CANCEL_MIN_COUNT = 30
H_CANCEL_MIN_BIG_NUM_COUNT = 3
# L2监控的最低金额
L2_MIN_MONEY = 500000
data_export_util.py
@@ -11,6 +11,7 @@
import l2_data_util
import l2.l2_data_util
import log
from l2 import l2_data_source_util
def export_l2_excel(code, date=None):
@@ -142,7 +143,9 @@
            # 买
            for d in num_dict[data["val"]["num"]]:
                if int(d["val"]["operateType"]) == 1:
                    buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(d, num_operate_map[code])
                    buy_index = l2_data_source_util.L2DataSourceUtils.get_buy_index_with_cancel_data(code, d,
                                                                                                     num_operate_map[
                                                                                                         code])
                    if buy_index == data["index"]:
                        ws.write(index, 8, "{}-{}".format(d["index"], d["val"]["time"]), cancel_style)
                        break
@@ -193,6 +196,6 @@
if __name__ == "__main__":
    codes = ["603083"]
    codes = ["002970"]
    for code in codes:
        export_l2_excel(code)
gui.py
@@ -61,6 +61,7 @@
    # tcpserver.handle_request()  # 只接受一个客户端连接
    tcpserver.serve_forever()  # 永久循环执行,可以接受多个客户端连接
def createOCRServer():
    print("create OCRServer")
    laddr = "", 9002
@@ -81,9 +82,6 @@
        self.serverProcess = multiprocessing.Process(target=createServer, args=(p1, gs_server_pipe,))
        self.jueJinProcess = multiprocessing.Process(target=startJueJin, args=(p2,))
        self.ocrServerProcess = multiprocessing.Process(target=createOCRServer)
        self.p1 = p1
        self.p2 = p2
@@ -272,8 +270,31 @@
            win.geometry("{}x{}".format(width, height))
            win.mainloop()
        # 绘制代码交易窗口分配
        def refresh_trade_buy_win_data():
            code_wins = trade_gui.THSBuyWinManagerNew.get_distributed_code_wins()
            # 获取代码涨幅
            codeActualPriceProcessor = trade_data_manager.CodeActualPriceProcessor()
            datas = []
            for data in code_wins:
                rate = codeActualPriceProcessor.get_current_rate(data[0])
                datas.append((data[0], rate, data[1]))
            datas.sort(key=lambda tup: tup[1] if tup[1] is not None else 1)
            datas.reverse()
            index = 0
            table.model.deleteRows()
            for data in datas:
                table.model.addRow()
                table.model.setValueAt(data[0], index, 0)
                table.model.setValueAt(f"{data[1]}%", index, 1)
                table.model.setValueAt(data[2], index, 2)
                # table.model.setValueAt(data["apply_time"], index, 2)
                index += 1
            table.redraw()
        btn = Button(frame, text="刷新收盘价", command=refresh_close_price_data)
        btn.place(x=5, y=150)
        sv_num = StringVar(value="获取到收盘价数量:未知")
        cl = Label(frame, textvariable=sv_num, bg="#DDDDDD", fg="#666666")
@@ -284,6 +305,34 @@
        btn = Button(frame, text="今日涨停", command=get_limit_up_codes_win)
        btn.place(x=300, y=150)
        trade_win_datas = []
        # draw_trade_buy_win(360, 140)
        table_width = 300
        table_height = 90
        _frame = Frame(frame, {"height": table_height, "width": table_width, "bg": "#DDDDDD"})
        table = tkintertable.TableCanvas(_frame, data={"row0": {'代码': '', '涨幅': '', '窗口句柄': ''}}, read_only=True,
                                         width=table_width, height=table_height, thefont=('微软雅黑', 9), cellwidth=100,
                                         rowheaderwidth=20)
        table.show()
        _frame.place(x=380-12, y=130)
        refresh_trade_buy_win_data()
        refresh_close_price_data()
        btn = Button(frame, text="刷新", command=refresh_trade_buy_win_data,height=1)
        btn.place(x=730-12, y=130)
        def re_distribute_buy_win():
            try:
                juejin.distribute_buy_win()
                refresh_trade_buy_win_data()
                showinfo("提示", "分配完成")
            except Exception as e:
                showerror("分配出错", str(e))
        btn = Button(frame, text="重新分配窗口", command=re_distribute_buy_win, height=1)
        btn.place(x=730-12, y=165)
    # 绘制交易状态
    def __draw_trade_state(self, frame):
@@ -681,7 +730,8 @@
        cl = Label(frame, text="今日委托:", bg="#DDDDDD", fg="#666666")
        cl.place(x=5, y=30)
        delegate_datas = {}
        delegate_datas["row{}".format(0)] = {'委托时间': '', '代码': '', '申报时间': '', '委托数量': '', '委托价格': '', '成交均价': '', '成交数量': '',
        delegate_datas["row{}".format(0)] = {'委托时间': '', '代码': '', '申报时间': '', '委托数量': '', '委托价格': '', '成交均价': '',
                                             '成交数量': '',
                                             '操作': ''}
        cl = Label(frame, text="更新时间:", bg="#DDDDDD", fg="#666666")
juejin.py
@@ -8,6 +8,8 @@
import json
import logging
import time as t
import numpy
import schedule
import gm.api as gmapi
@@ -252,7 +254,7 @@
# 获取到现价
def accpt_prices(prices):
def accept_prices(prices):
    print("价格代码数量:", len(prices))
    __actualPriceProcessor.save_current_price_codes_count(len(prices))
    # 采集的代码数量不对
@@ -401,6 +403,38 @@
        return data
    @staticmethod
    def get_now_price(codes):
        data = JueJinManager.get_gp_current_info(codes)
        prices = []
        for item in data:
            code = item["symbol"].split(".")[1]
            price = item["price"]
            prices.append((code, price))
        return prices
    # 获取代码的涨幅
    @staticmethod
    def get_codes_limit_rate(codes):
        datas = JueJinManager.get_gp_latest_info(codes)
        pre_price_dict = {}
        for data in datas:
            code = data["sec_id"]
            pre_close = tool.to_price(decimal.Decimal(str(data['pre_close'])))
            pre_price_dict[code] = pre_close
        now_prices = JueJinManager.get_now_price(codes)
        f_results = []
        for data in now_prices:
            code = data[0]
            price = data[1]
            pre_price = float(pre_price_dict.get(code))
            rate = round((price - pre_price) * 100 / pre_price, 2)
            f_results.append((code, rate))
        f_results.sort(key=lambda tup: tup[1])
        f_results.reverse()
        return f_results
    @staticmethod
    def get_gp_current_info(codes):
        account_id, s_id, token = getAccountInfo()
        symbols = gpcode_manager.get_gp_list_with_prefix(codes)
@@ -473,5 +507,16 @@
    return _fresult
# 根据涨幅高低分配交易窗口
def distribute_buy_win():
    if tool.trade_time_sub(tool.get_now_time_str(),"09:30:00") > 0:
        raise Exception("只能9:30之前重新分配窗口")
    datas = JueJinManager.get_codes_limit_rate(gpcode_manager.get_gp_list())
    matrix = numpy.array(datas)
    codes = matrix[:, 0].tolist()
    trade_gui.THSBuyWinManagerNew.fill_codes(codes)
if __name__ == '__main__':
    everyday_init()
    distribute_buy_win()
l2/cancel_buy_strategy.py
@@ -18,7 +18,7 @@
from l2.safe_count_manager import BuyL2SafeCountManager
from l2.transaction_progress import TradeBuyQueue
from trade import trade_data_manager, trade_queue_manager, l2_trade_factor
from l2 import l2_log, l2_data_log
from l2 import l2_log, l2_data_log, l2_data_source_util
from l2.l2_data_util import L2DataUtil, local_today_num_operate_map, local_today_datas
from log import logger_buy_1_volumn, logger_l2_h_cancel, logger_l2_s_cancel
@@ -85,9 +85,9 @@
                    left_big_num += val["num"] * data["re"]
            elif L2DataUtil.is_limit_up_price_buy_cancel(val):
                # 查询买入位置
                buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(data,
                                                                                 local_today_num_operate_map.get(
                                                                                     code))
                buy_index = l2_data_source_util.L2DataSourceUtils.get_buy_index_with_cancel_data(code, data,
                                                                                                 local_today_num_operate_map.get(
                                                                                                     code))
                if buy_index is not None and start_index <= buy_index <= end_index:
                    if buy_index - buy_single_index < fire_count:
                        left_big_num -= 0
@@ -107,17 +107,17 @@
        return left_big_num
    @classmethod
    def need_cancel(cls, code, buy_single_index, buy_exec_index, start_index, end_index, total_data, threadId,
    def need_cancel(cls, code, buy_single_index, buy_exec_index, start_index, end_index, total_data,
                    need_cancel=True):
        if start_index == 375:
            print("进入调试")
        # 只守护30s
        if tool.trade_time_sub(total_data[start_index]["val"]["time"], total_data[buy_exec_index]["val"]["time"]) > 30:
        if tool.trade_time_sub(total_data[start_index]["val"]["time"], total_data[buy_exec_index]["val"]["time"]) > constant.S_CANCEL_EXPIRE_TIME:
            return False, None
        l2_log.cancel_debug(code, "S级是否需要撤单,数据范围:{}-{} ", start_index, end_index)
        logger_l2_s_cancel.debug(f"code-{code} S级是否需要撤单,数据范围:{start_index}-{end_index}")
        if tool.trade_time_sub(total_data[end_index]["val"]["time"], total_data[buy_exec_index]["val"]["time"]) > 30:
        if tool.trade_time_sub(total_data[end_index]["val"]["time"], total_data[buy_exec_index]["val"]["time"]) >  constant.S_CANCEL_EXPIRE_TIME:
            # 结束位置超过了执行位置30s,需要重新确认结束位置
            for i in range(end_index, start_index - 1, -1):
                if total_data[end_index]["val"]["time"] != total_data[i]["val"]["time"]:
@@ -171,11 +171,11 @@
                    buy_num += data["re"] * int(val["num"])
                elif L2DataUtil.is_limit_up_price_buy_cancel(val):
                    # 查询买入位置
                    buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(data,
                                                                                     local_today_num_operate_map.get(
                                                                                         code))
                    buy_index = l2_data_source_util.L2DataSourceUtils.get_buy_index_with_cancel_data(code, data,
                                                                                                     local_today_num_operate_map.get(
                                                                                                         code))
                    if buy_index is not None and buy_single_index <= buy_index:
                        cancel_num += buy_data["re"] * int(buy_data["val"]["num"])
                        cancel_num += total_data[buy_index]["re"] * int(total_data[buy_index]["val"]["num"])
                    elif buy_index is None:
                        # 有部分撤销从而导致的无法溯源,这时就需要判断预估买入时间是否在a_start_index到a_end_index的时间区间
                        min_space, max_space = l2_data_util.compute_time_space_as_second(val["cancelTime"],
@@ -202,8 +202,8 @@
                            return True, total_data[i]
        finally:
            l2_log.cancel_debug( code, "S级大单 范围:{}-{} 取消计算结果:{}/{},比例:{}", start_index, end_index, cancel_num,
                                buy_num, round(cancel_num / max(buy_num,1), 2))
            l2_log.cancel_debug(code, "S级大单 范围:{}-{} 取消计算结果:{}/{},比例:{}", start_index, end_index, cancel_num,
                                buy_num, round(cancel_num / max(buy_num, 1), 2))
            # 保存处理进度与数据
            cls.__save_compute_data(code, process_index, buy_num, cancel_num)
@@ -225,6 +225,7 @@
    def __getRedis(cls):
        return cls.__redis_manager.getRedis()
    # 保存成交位置到执行位置的揽括范围数据
    @classmethod
    def __save_watch_index_set(cls, code, datas, process_index, finish):
        key = f"h_cancel_watch_indexs-{code}"
@@ -240,7 +241,41 @@
        val = json.loads(val)
        return val[0], val[1], val[2]
    # 保存结束位置
    # 保存执行位置后面的守护数据
    @classmethod
    def __save_watch_index_set_after_exec(cls, code, datas, process_index, total_count, big_num_count, finished):
        key = f"h_cancel_watch_indexs_exec-{code}"
        cls.__getRedis().setex(key, tool.get_expire(),
                               json.dumps((list(datas), process_index, total_count, big_num_count, finished)))
    # 保存成交进度
    @classmethod
    def __get_watch_index_set_after_exec(cls, code):
        key = f"h_cancel_watch_indexs_exec-{code}"
        val = cls.__getRedis().get(key)
        if val is None:
            return [], -1, 0, 0, False
        val = json.loads(val)
        return val[0], val[1], val[2], val[3], val[4]
    # 保存成交进度
    @classmethod
    def __save_traded_progress(cls, code, origin_process_index, latest_process_index):
        key = "h_cancel_traded_progress-{}".format(code)
        cls.__getRedis().setex(key, tool.get_expire(), json.dumps((origin_process_index, latest_process_index)))
    @classmethod
    def __get_traded_progress(cls, code):
        key = "h_cancel_traded_progress-{}".format(code)
        val = cls.__getRedis().get(key)
        if val is None:
            return None, None
        val = json.loads(val)
        return val[0], val[1]
    # 保存结算位置
    @classmethod
    def __save_compute_data(cls, code, process_index, cancel_num):
        key = "h_cancel_compute_data-{}".format(code)
@@ -262,38 +297,49 @@
    @classmethod
    def __clear_data(cls, code):
        ks = ["h_cancel_compute_data-{}".format(code)]
        ks = ["h_cancel_compute_data-{}".format(code), f"h_cancel_watch_indexs_exec-{code}",
              f"h_cancel_watch_indexs-{code}",f"h_cancel_traded_progress-{code}"]
        for key in ks:
            cls.__getRedis().delete(key)
    @classmethod
    def clear_data(cls):
        ks = ["h_cancel_compute_data-*"]
        for key in ks:
            keys = cls.__getRedis().keys(key)
            for k in keys:
                cls.__getRedis().delete(k)
    def need_cancel(cls, code, buy_exec_index, start_index, end_index, total_data, local_today_num_operate_map):
        time_space = tool.trade_time_sub(total_data[start_index]["val"]["time"],
                                         total_data[buy_exec_index]["val"]["time"])
        if time_space >=  constant.S_CANCEL_EXPIRE_TIME - 1:
            # 开始计算需要监控的单
            cls.__compute_watch_indexs_after_exec(code, buy_exec_index, total_data, local_today_num_operate_map)
    @classmethod
    def need_cancel(cls, code, buy_exec_index, start_index, end_index, total_data, threadId):
        # 守护30s以外的数据
        if tool.trade_time_sub(total_data[start_index]["val"]["time"], total_data[buy_exec_index]["val"]["time"]) <= 30:
        if time_space <=  constant.S_CANCEL_EXPIRE_TIME:
            return False, None
        watch_indexs = cls.__get_watch_index_set(code)[0]
            # 获取成交进度
        origin_progress_index, latest_progress_index = cls.__get_traded_progress(code)
        # 监听的数据
        watch_indexs_dict = {}
        # 监听的总数
        total_nums = 0
        for indexs in watch_indexs:
            watch_indexs_dict[indexs[0]] = indexs
            total_nums += total_data[indexs[0]]["val"]["num"] * indexs[2]
        if watch_indexs is None:
            l2_log.cancel_debug(code, "H撤没获取到监听范围数据")
            return False, None
        if origin_progress_index is not None:
            # 获取成交位置到执行位置的监控数据
            watch_indexs = cls.__get_watch_index_set(code)[0]
            # 监听的总数
            for indexs in watch_indexs:
                index = indexs[0]
                if index < latest_progress_index:
                    continue
                # 只计算最近的执行位之后的数据
                watch_indexs_dict[index] = indexs
                total_nums += total_data[index]["val"]["num"] * indexs[2]
        # 获取到执行位后的监听数据
        datas, process_index, total_count, big_num_count, finished = cls.__get_watch_index_set_after_exec(code)
        if datas:
            for indexs in datas:
                index = indexs[0]
                watch_indexs_dict[index] = indexs
                total_nums += total_data[index]["val"]["num"] * indexs[2]
        processed_index, cancel_num = cls.__get_compute_data(code)
        l2_log.cancel_debug( code, "H级是否需要撤单,数据范围:{}-{} ", start_index, end_index)
        l2_log.cancel_debug(code, "H级是否需要撤单,数据范围:{}-{} ", start_index, end_index)
        # 获取下单次数
        place_order_count = trade_data_manager.placeordercountmanager.get_place_order_count(code)
        cancel_rate_threshold = constant.H_CANCEL_FIRST_RATE
@@ -314,19 +360,18 @@
                val = data["val"]
                if L2DataUtil.is_limit_up_price_buy_cancel(val):
                    # 查询买入位置
                    buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(data,
                                                                                     local_today_num_operate_map.get(
                                                                                         code))
                    buy_index = l2_data_source_util.L2DataSourceUtils.get_buy_index_with_cancel_data(code, data,
                                                                                                     local_today_num_operate_map)
                    if buy_index is not None and buy_index in watch_indexs_dict:
                        cancel_num += buy_data["re"] * int(buy_data["val"]["num"])
                        cancel_num += data["re"] * val["num"]
                        if cancel_num / total_nums > cancel_rate_threshold:
                            return True, total_data[i]
                            return True, data
        finally:
            l2_log.cancel_debug(code, "H级撤单计算结果 范围:{}-{} 处理进度:{} 取消计算结果:{}/{}", start_index, end_index,
                                process_index, cancel_num,
                                total_nums)
            logger_l2_h_cancel.info(f"code-{code} H级撤单计算结果 范围:{start_index}-{end_index} 处理进度:{process_index} 目标比例:{cancel_rate_threshold} 取消计算结果:{cancel_num}/{total_nums}")
            logger_l2_h_cancel.info(
                f"code-{code} H级撤单计算结果 范围:{start_index}-{end_index} 处理进度:{process_index} 目标比例:{cancel_rate_threshold} 取消计算结果:{cancel_num}/{total_nums}")
            # 保存处理进度与数据
            cls.__save_compute_data(code, process_index, cancel_num)
        return False, None
@@ -335,26 +380,29 @@
    @classmethod
    def place_order_success(cls, code, buy_single_index, buy_exec_index, total_data, local_today_num_operate_map):
        cls.__clear_data(code)
        cls.set_trade_progress(code, buy_exec_index, total_data, local_today_num_operate_map, True)
    # 设置成交进度
    @classmethod
    def set_trade_progress(cls, code, index, total_data, local_today_num_operate_map, is_default=False):
        logger_l2_h_cancel.info(f"code-{code} 成交进度:{index} 数据结束位置:"+str(total_data[-1]["index"]))
        last_index, last_is_default = cls.__tradeBuyQueue.get_traded_index(code)
        # 成交进度
        if is_default:
            cls.__tradeBuyQueue.set_default_traded_index(code, index)
            cls.__compute_watch_indexs(code, total_data, local_today_num_operate_map)
    def set_trade_progress(cls, code, data_time, buy_exec_index, index, total_data, local_today_num_operate_map):
        cls.__tradeBuyQueue.set_traded_index(code, index)
        # 如果获取时间与执行时间小于29则不需要处理
        if buy_exec_index is None or buy_exec_index < 0 or tool.trade_time_sub(data_time, total_data[buy_exec_index]["val"]["time"]) < constant.S_CANCEL_EXPIRE_TIME - 1:
            return
        # 保存成交进度
        origin_index, latest_index = cls.__get_traded_progress(code)
        if origin_index is None:
            cls.__save_traded_progress(code, index, index)
            # 计算揽括范围
            cls.__compute_watch_indexs_between_traded_exec(code, index, buy_exec_index, total_data,
                                                           local_today_num_operate_map)
        else:
            if last_index is None or last_index != index:
                cls.__tradeBuyQueue.set_traded_index(code, index)
                cls.__compute_watch_indexs(code, total_data, local_today_num_operate_map)
            cls.__save_traded_progress(code, origin_index, index)
        logger_l2_h_cancel.info(f"code-{code} 成交进度:{index} 数据结束位置:" + str(total_data[-1]["index"]))
    # 涨停买是否撤单
    @classmethod
    def __get_limit_up_buy_no_canceled_count(cls, index, total_data, local_today_num_operate_map):
        data =None
    def __get_limit_up_buy_no_canceled_count(cls, code, index, total_data, local_today_num_operate_map):
        data = None
        try:
            data = total_data[index]
        except:
@@ -367,8 +415,9 @@
            canceled = False
            if cancel_datas:
                for cancel_data in cancel_datas:
                    buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(cancel_data,
                                                                                     local_today_num_operate_map)
                    buy_index = l2_data_source_util.L2DataSourceUtils.get_buy_index_with_cancel_data(code, cancel_data,
                                                                                                     local_today_num_operate_map)
                    if buy_index == index:
                        canceled = True
                        count = data["re"] - cancel_data["re"]
@@ -382,12 +431,14 @@
        # 计算排名前N的大单
    # 过时数据
    @classmethod
    def __compute_top_n_num(cls, start_index, total_data, local_today_num_operate_map, count):
    def __compute_top_n_num(cls, code, start_index, total_data, local_today_num_operate_map, count):
        # 找到还未撤的TOPN大单
        watch_set = set()
        for i in range(start_index, total_data[-1]["index"] + 1):
            not_cancel_count = cls.__get_limit_up_buy_no_canceled_count(i, total_data, local_today_num_operate_map)
            not_cancel_count = cls.__get_limit_up_buy_no_canceled_count(code, i, total_data,
                                                                        local_today_num_operate_map)
            if not_cancel_count > 0:
                watch_set.add((i, total_data[i]["val"]["num"], not_cancel_count))
        # 针按照手数排序
@@ -398,56 +449,82 @@
        watch_set = set(watch_list)
        return watch_set
    # 从成交位置到执行位置
    @classmethod
    def __compute_watch_indexs(cls, code, total_data, local_today_num_operate_map):
        trade_progress_index, is_default = cls.__tradeBuyQueue.get_traded_index(code)
        threshold_money, msg = l2_trade_factor.L2TradeFactorUtil.compute_m_value(code)
        if threshold_money < constant.H_CANCEL_MIN_MONEY:
            threshold_money = constant.H_CANCEL_MIN_MONEY
        limit_up_price = gpcode_manager.get_limit_up_price(code)
        threshold_num = round(threshold_money / (limit_up_price * 100))
        if trade_progress_index is None:
            raise Exception("尚未获取到成交进度")
        total_num = 0
        watch_set = set()
    def __compute_watch_indexs_between_traded_exec(cls, code, progress_index, buy_exec_index, total_data,
                                                   local_today_num_operate_map):
        total_count = 0
        # 暂时不需要使用
        process_index = -1
        finished = False
        safe_count = cls.__buyL2SafeCountManager.get_safe_count(code)
        for i in range(trade_progress_index, total_data[-1]["index"] + 1):
            process_index = i
            left_count = cls.__get_limit_up_buy_no_canceled_count(i, total_data, local_today_num_operate_map)
        watch_set = set()
        big_num_count = 0
        for i in range(progress_index, buy_exec_index):
            left_count = cls.__get_limit_up_buy_no_canceled_count(code, i, total_data, local_today_num_operate_map)
            if left_count > 0:
                data = total_data[i]
                val = data["val"]
                total_num += val["num"] * data["re"]
                if val["num"] * float(val["price"]) <= 9900:
                    continue
                total_count += left_count
                watch_set.add((i, val["num"], left_count))
                if l2_data_util.is_big_money(val):
                    big_num_count += data["re"]
        final_watch_list = list(watch_set)
        final_watch_list.sort(key=lambda x: x[0])
        logger_l2_h_cancel.info(f"code-{code}  H撤监控成交位到执行位:{final_watch_list}")
        cls.__save_watch_index_set(code, final_watch_list, buy_exec_index, True)
        # 删除原来的计算数据
        # cls.__del_compute_data(code)
    # 计算执行位置之后的需要监听的数据
    @classmethod
    def __compute_watch_indexs_after_exec(cls, code, buy_exec_index, total_data, local_today_num_operate_map):
        watch_list, process_index_old, total_count_old, big_num_count_old, finish = cls.__get_watch_index_set_after_exec(
            code)
        if watch_list and finish:
            # 已经计算完了不需要再进行计算
            return
        watch_set = set()
        if watch_list:
            for data in watch_list:
                watch_set.add((data[0], data[1], data[2]))
        # 暂时不需要使用
        process_index = process_index_old
        finished = False
        big_num_count = big_num_count_old
        total_count = total_count_old
        for i in range(buy_exec_index, total_data[-1]["index"] + 1):
            if i <= process_index_old:
                continue
            process_index = i
            left_count = cls.__get_limit_up_buy_no_canceled_count(code, i, total_data, local_today_num_operate_map)
            if left_count > 0:
                data = total_data[i]
                val = data["val"]
                if val["num"] * float(val["price"]) <= 9900:
                    continue
                total_count += left_count
                watch_set.add((i, val["num"], left_count))
                if l2_data_util.is_big_money(val):
                    big_num_count += data["re"]
                # 判断是否达到阈值
                if total_count >= safe_count:  # and total_num >= threshold_num
                if total_count >= constant.H_CANCEL_MIN_COUNT and big_num_count >= constant.H_CANCEL_MIN_BIG_NUM_COUNT:  # and total_num >= threshold_num
                    finished = True
                    # 最小8笔
                    l2_log.cancel_debug(code, "获取到H撤监听数据:{},计算截至位置:{}", json.dumps(list(watch_set)),
                                        total_data[-1]["index"])
                    break
        # 计算TOP N大单
        top_n_watch_set = cls.__compute_top_n_num(trade_progress_index, total_data, local_today_num_operate_map,
                                                  safe_count)
        logger_l2_h_cancel.info(f"code-{code}  H撤监控临单:{watch_set}")
        logger_l2_h_cancel.info(f"code-{code}  H撤监控较大单:{top_n_watch_set}")
        final_watch_set = set.union(watch_set, top_n_watch_set)
        final_watch_list = list(final_watch_set)
        final_watch_list.sort(key=lambda x: x[0])
        logger_l2_h_cancel.info(f"code-{code} 安全笔数:{safe_count}  H撤最终监控大单:{final_watch_list}")
        # 保存计算范围
        cls.__save_watch_index_set(code, final_watch_set, process_index, finished)
        # 删除原来的计算数据
        cls.__del_compute_data(code)
    @classmethod
    def get_watch_indexs(cls, code):
        return cls.__get_watch_index_set(code)[0]
        final_watch_list = list(watch_set)
        final_watch_list.sort(key=lambda x: x[0])
        logger_l2_h_cancel.info(f"code-{code}  H撤监控执行位相邻单:{final_watch_list}")
        # 保存计算范围
        cls.__save_watch_index_set_after_exec(code, final_watch_list, process_index, total_count, big_num_count,
                                              finished)
        # 删除原来的计算数据
        # cls.__del_compute_data(code)
# --------------------------------封单额变化撤------------------------
@@ -535,17 +612,15 @@
                    break
        else:
            keys_ = cls.__get_l2_second_money_record_keys(code, "*")
            key_list=[]
            key_list = []
            for k in keys_:
                time__ = k.split("-")[-1]
                key_list.append((int(time__),k))
                key_list.append((int(time__), k))
            key_list.sort(key=lambda tup: tup[0])
            for t in key_list:
                if t[0] <= int(time_):
                    keys.append(t[1])
                    break
        keys.sort(key=lambda tup: int(tup.split("-")[-1]))
        if len(keys) > 0:
@@ -622,7 +697,7 @@
    # 返回取消的标志数据
    # with_cancel 是否需要判断是否撤销
    @classmethod
    def process_data(cls, random_key, code, start_index, end_index, buy_single_begin_index, buy_exec_index,
    def process_data(cls, code, start_index, end_index, buy_single_begin_index, buy_exec_index,
                     with_cancel=True):
        if buy_single_begin_index is None or buy_exec_index is None:
            return None, None
@@ -697,14 +772,13 @@
                if big_money_num_manager.is_big_num(data["val"]):
                    if L2DataUtil.is_limit_up_price_buy_cancel(data["val"]):
                        cancel_big_num_count += int(data["re"])
                        # TODO 大量重复的工作需要处理,可以暂存在内存中,从而减少计算
                        # 获取是否在买入执行信号周围2s
                        buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(data,
                                                                                         local_today_num_operate_map.get(
                                                                                             code))
                        if buy_index is not None and buy_data is not None:
                        buy_index = l2_data_source_util.L2DataSourceUtils.get_buy_index_with_cancel_data(code, data,
                                                                                                         local_today_num_operate_map.get(
                                                                                                             code))
                        if buy_index is not None:
                            # 相差1s
                            buy_time = buy_data["val"]["time"]
                            buy_time = total_datas[buy_index]["val"]["time"]
                            if abs(buy_exec_time - tool.get_time_as_second(buy_time)) < 2:
                                cancel_big_num_count += int(data["re"])
@@ -740,7 +814,7 @@
                # 如果是减小项
                if val < 0:
                    # 当前量小于最大量的24%则需要取消
                    if exec_time_offset >= 30:
                    if exec_time_offset >=  constant.S_CANCEL_EXPIRE_TIME:
                        if total_num <= min_volumn_big and max_buy1_volume * 0.24 > total_num:
                            cancel_index = i
                            cancel_msg = "封板额小于最高封板额的24% {}/{}".format(total_num, max_buy1_volume)
@@ -853,7 +927,7 @@
    # 处理数据,返回是否需要撤单
    # 处理范围:买入执行位-当前最新位置
    @classmethod
    def process(cls, random_key, code, start_index, end_index, buy_exec_index):
    def process(cls, code, start_index, end_index, buy_exec_index):
        # 获取涨停卖的阈值
        limit_up_price = gpcode_manager.get_limit_up_price(code)
        zyltgb = l2_trade_factor.L2TradeFactorUtil.get_zyltgb(code)
l2/l2_data_manager_new.py
@@ -16,7 +16,7 @@
import tool
from trade import trade_data_manager, trade_manager, trade_queue_manager, l2_trade_factor, l2_trade_util, \
    trade_result_manager
from l2 import safe_count_manager, l2_data_manager, l2_data_log, l2_log
from l2 import safe_count_manager, l2_data_manager, l2_data_log, l2_log, l2_data_source_util
from l2.cancel_buy_strategy import SecondCancelBigNumComputer, HourCancelBigNumComputer, L2LimitUpMoneyStatisticUtil, \
    L2LimitUpSellStatisticUtil
from l2.l2_data_manager import L2DataException, TradePointManager
@@ -121,8 +121,9 @@
            # 如果是涨停买撤信号需要看数据位置是否比开始处理时间早
            if L2DataUtil.is_limit_up_price_buy_cancel(data["val"]):
                # 获取买入信号
                buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(total_datas[i],
                                                                                 local_today_num_operate_map.get(code))
                buy_index = l2_data_source_util.L2DataSourceUtils.get_buy_index_with_cancel_data(code, total_datas[i],
                                                                                                 local_today_num_operate_map.get(
                                                                                                     code))
                if buy_index is not None and buy_index < begin_pos:
                    continue
@@ -152,7 +153,6 @@
class L2TradeDataProcessor:
    unreal_buy_dict = {}
    random_key = {}
    l2BigNumForMProcessor = L2BigNumForMProcessor()
    __codeActualPriceProcessor = CodeActualPriceProcessor()
    buy1PriceManager = trade_queue_manager.Buy1PriceManager()
@@ -164,8 +164,7 @@
    # 数据处理入口
    # datas: 本次截图数据
    # capture_timestamp:截图时间戳
    def process(cls, code, datas, capture_timestamp, do_id):
        cls.random_key[code] = do_id
    def process(cls, code, datas, capture_timestamp):
        __start_time = round(t.time() * 1000)
        try:
            if len(datas) > 0:
@@ -189,7 +188,7 @@
                finally:
                    # 保存数据
                    __start_time = round(t.time() * 1000)
                    l2.l2_data_util.save_l2_data(code, datas, add_datas, cls.random_key[code])
                    l2.l2_data_util.save_l2_data(code, datas, add_datas)
                    __start_time = l2_data_log.l2_time(code,
                                                       round(t.time() * 1000) - __start_time,
                                                       "保存数据时间({})".format(len(add_datas)))
@@ -204,7 +203,7 @@
            print(id(local_today_datas))
            # 拼接数据
            local_today_datas[code].extend(add_datas)
            l2.l2_data_util.load_num_operate_map(l2.l2_data_util.local_today_num_operate_map, code, add_datas)
            l2.l2_data_util.load_num_operate_map(local_today_num_operate_map, code, add_datas)
            # 第1条数据是否为09:30:00
            if add_datas[0]["val"]["time"] == "09:30:00":
@@ -291,12 +290,14 @@
        def buy_1_cancel():
            _start_time = round(t.time() * 1000)
            # 撤单计算,只看买1
            cancel_data, cancel_msg = L2LimitUpMoneyStatisticUtil.process_data(cls.random_key[code], code, start_index,
            cancel_data, cancel_msg = L2LimitUpMoneyStatisticUtil.process_data(code, start_index,
                                                                               end_index,
                                                                               buy_single_index, buy_exec_index)
            l2_data_log.l2_time(code, round(t.time() * 1000) - _start_time,
                                "已下单-买1统计耗时")
            if constant.TEST:
                return None, ""
            return cancel_data, cancel_msg
        # S撤
@@ -307,8 +308,7 @@
            try:
                b_need_cancel, b_cancel_data = SecondCancelBigNumComputer.need_cancel(code, buy_single_index,
                                                                                      buy_exec_index, start_index,
                                                                                      end_index, total_data,
                                                                                      cls.random_key[code])
                                                                                      end_index, total_data)
                if b_need_cancel:
                    return b_cancel_data, "S大单撤销比例触发阈值"
            except Exception as e:
@@ -324,9 +324,8 @@
            _start_time = round(t.time() * 1000)
            try:
                b_need_cancel, b_cancel_data = HourCancelBigNumComputer.need_cancel(code, buy_exec_index, start_index,
                                                                                    end_index, total_data,
                                                                                    cls.random_key[code])
                if b_need_cancel and not cancel_data:
                                                                                    end_index, total_data, local_today_num_operate_map.get(code))
                if b_need_cancel and b_cancel_data:
                    return b_cancel_data, "H撤销比例触发阈值"
            except Exception as e:
                logging.exception(e)
@@ -340,7 +339,7 @@
            _start_time = round(t.time() * 1000)
            # 统计板上卖
            try:
                cancel_data, cancel_msg = L2LimitUpSellStatisticUtil.process(cls.random_key[code], code, start_index,
                cancel_data, cancel_msg = L2LimitUpSellStatisticUtil.process(code, start_index,
                                                                             end_index,
                                                                             buy_exec_index)
                return cancel_data, cancel_msg
@@ -714,7 +713,7 @@
            f2 = dask.delayed(limit_up_time_manager.save_limit_up_time)(code, total_datas[compute_index]["val"]["time"])
            f3 = dask.delayed(cls.__virtual_buy)(code, buy_single_index, compute_index, capture_time)
            f4 = dask.delayed(l2_data_manager.TradePointManager.delete_buy_cancel_point)(code)
            f5 = dask.delayed(L2LimitUpMoneyStatisticUtil.process_data)(cls.random_key[code], code, buy_single_index,
            f5 = dask.delayed(L2LimitUpMoneyStatisticUtil.process_data)(code, buy_single_index,
                                                                        compute_index,
                                                                        buy_single_index,
                                                                        buy_exec_index, False)
@@ -744,7 +743,7 @@
                need_cancel, cancel_data = SecondCancelBigNumComputer.need_cancel(code, buy_single_index,
                                                                                  compute_index,
                                                                                  buy_single_index, compute_index,
                                                                                  total_datas, cls.random_key[code],
                                                                                  total_datas,
                                                                                  True)
                _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time,
                                                  "S级大单处理耗时", force=True)
@@ -758,7 +757,7 @@
                    cls.__buy(code, capture_time, total_datas[compute_index], compute_index)
            else:
                SecondCancelBigNumComputer.need_cancel(code, buy_single_index, compute_index, buy_single_index,
                                                       compute_index, total_datas, cls.random_key[code], False)
                                                       compute_index, total_datas, False)
                _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time,
                                                  "S级大单处理耗时", force=True)
@@ -927,9 +926,9 @@
                    # 只统计59万以上的金额
                    # 涨停买撤
                    # 判断买入位置是否在买入信号之前
                    buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(total_datas[i],
                                                                                     local_today_num_operate_map.get(
                                                                                         code))
                    buy_index = l2_data_source_util.L2DataSourceUtils.get_buy_index_with_cancel_data(code, total_datas[i],
                                                                                                     local_today_num_operate_map.get(
                                                                                                         code))
                    if buy_index is not None:
                        # 找到买撤数据的买入点
                        if buy_index >= buy_single_index:
@@ -940,7 +939,7 @@
                            l2_log.buy_debug(code, "{}数据在买入信号之后 撤买纯买手数:{} 目标手数:{}", i, buy_nums, threshold_num)
                        else:
                            l2_log.buy_debug(code, "{}数据在买入信号之前,买入位:{}", i, buy_index)
                            if total_datas[buy_single_index]["val"]["time"] == buy_data["val"]["time"]:
                            if total_datas[buy_single_index]["val"]["time"] == total_datas[buy_index]["val"]["time"]:
                                # 同一秒,当作买入信号之后处理
                                buy_nums -= int(_val["num"]) * int(data["re"])
                                buy_count -= int(data["re"])
@@ -1102,9 +1101,9 @@
        cls.random_key[code] = random.randint(0, 100000)
        buy_single_begin_index, buy_exec_index = 426, 479
        L2LimitUpMoneyStatisticUtil.process_data(cls.random_key[code], code, 480, 519,
        L2LimitUpMoneyStatisticUtil.process_data(code, 480, 519,
                                                 buy_single_begin_index, buy_exec_index, False)
        L2LimitUpMoneyStatisticUtil.process_data(cls.random_key[code], code, 480, 519,
        L2LimitUpMoneyStatisticUtil.process_data(code, 480, 519,
                                                 buy_single_begin_index, buy_exec_index, False)
    @classmethod
l2/l2_data_source_util.py
New file
@@ -0,0 +1,122 @@
"""
L2数据溯源
"""
import tool
class L2DataSourceUtils(object):
    __cancel_and_buy_map = {}
    __buy_and_cancel_map = {}
    @classmethod
    def __save_map(cls, code, buy_index, cancel_index):
        if cls.__cancel_and_buy_map.get(code) is None:
            cls.__cancel_and_buy_map[code] = {}
        cls.__cancel_and_buy_map[code][cancel_index] = buy_index
        if cls.__buy_and_cancel_map.get(code) is None:
            cls.__buy_and_cancel_map[code] = {}
        cls.__buy_and_cancel_map[code][buy_index] = cancel_index
    @classmethod
    def __get_cancel_index(cls, code, buy_index):
        if code not in cls.__buy_and_cancel_map:
            return None
        return cls.__buy_and_cancel_map[code].get(buy_index)
    @classmethod
    def __get_buy_index(cls, code, cancel_index):
        if code not in cls.__cancel_and_buy_map:
            return None
        return cls.__cancel_and_buy_map[code].get(cancel_index)
    @classmethod
    def __compare_time(cls, time1, time2):
        result = int(time1.replace(":", "", 2)) - int(time2.replace(":", "", 2))
        return result
    # 计算时间的区间
    @classmethod
    def __compute_time_space_as_second(cls, cancel_time, cancel_time_unit):
        __time = int(cancel_time)
        if int(cancel_time) == 0:
            return 0, 0
        unit = int(cancel_time_unit)
        if unit == 0:
            # 秒
            return __time, (__time + 1)
        elif unit == 1:
            # 分钟
            return __time * 60, (__time + 1) * 60
        elif unit == 2:
            # 小时
            return __time * 3600, (__time + 1) * 3600
    # 根据买撤数据(与今日总的数据)计算买入数据
    @classmethod
    def get_buy_index_with_cancel_data(cls, code, cancel_data, local_today_num_operate_map):
        key = "{}-{}-{}".format(cancel_data["val"]["num"], "1", cancel_data["val"]["price"])
        cancel_datas = local_today_num_operate_map.get(key)
        try:
            cancel_datas.sort(key=lambda t: t["index"])
        except Exception as e:
            print("测试")
        for item in cancel_datas:
            cls.__get_buy_index_with_cancel_data(code, item, local_today_num_operate_map)
        return cls.__get_buy_index_with_cancel_data(code, cancel_data, local_today_num_operate_map)
    @classmethod
    def __get_buy_index_with_cancel_data(cls, code, cancel_data, local_today_num_operate_map):
        buy_index = cls.__get_buy_index(code, cancel_data["index"])
        if buy_index is not None:
            return buy_index
        min_space, max_space = cls.__compute_time_space_as_second(cancel_data["val"]["cancelTime"],
                                                                  cancel_data["val"]["cancelTimeUnit"])
        max_time = tool.trade_time_add_second(cancel_data["val"]["time"], 0 - min_space)
        min_time = tool.trade_time_add_second(cancel_data["val"]["time"], 0 - max_space)
        buy_datas = local_today_num_operate_map.get(
            "{}-{}-{}".format(cancel_data["val"]["num"], "0", cancel_data["val"]["price"]))
        if buy_datas is None:
            # 无数据
            return None
        # 匹配到的index
        suit_indexes = []
        for i in range(0, len(buy_datas)):
            data = buy_datas[i]
            if int(data["val"]["operateType"]) != 0:
                continue
            if int(data["val"]["num"]) != int(cancel_data["val"]["num"]):
                continue
                # 如果能找到对应的买撤就需要返回
            cancel_index = cls.__get_cancel_index(code, data["index"])
            if cancel_index is not None and cancel_index != cancel_data["index"]:
                continue
            if min_space == 0 and max_space == 0:
                if cls.__compare_time(data["val"]["time"], min_time) == 0:
                    suit_indexes.append(data["index"])
            elif cls.__compare_time(data["val"]["time"], min_time) > 0 and cls.__compare_time(data["val"]["time"],
                                                                                              max_time) <= 0:
                suit_indexes.append(data["index"])
        if len(suit_indexes) >= 2:
            # 多个匹配项,优先溯源离取消位置最近的数据
            suit_indexes.sort(key=lambda t: cancel_data["index"] - t)
        if len(suit_indexes) >= 1:
            cls.__save_map(code, suit_indexes[0], cancel_data["index"])
            return suit_indexes[0]
        return None
# if __name__ == "__main__":
#     code = "000925"
#     l2_data_util.load_l2_data(code)
#     total_datas = l2_data_util.local_today_datas.get(code)
#     local_today_num_operate_map = l2_data_util.local_today_num_operate_map.get(code)
#     cancel_index = 900
#     index = L2DataSourceUtils.get_buy_index_with_cancel_data(code, total_datas[cancel_index],
#                                                              l2_data_util.local_today_num_operate_map.get(code))
#     print("溯源位置:", index)
l2/l2_data_util.py
@@ -9,9 +9,12 @@
import logging
import time
import numpy
import constant
import gpcode_manager
from l2 import l2_data_log
import l2_data_util
from l2 import l2_data_log, l2_data_source_util
import log
from db import redis_manager
import tool
@@ -115,7 +118,7 @@
# 保存l2数据
def save_l2_data(code, datas, add_datas, randomKey=None):
def save_l2_data(code, datas, add_datas):
    redis = _redisManager.getRedis()
    # 只有有新曾数据才需要保存
    if len(add_datas) > 0:
@@ -378,47 +381,113 @@
class L2TradeQueueUtils(object):
    # 买入数据是否已撤
    @classmethod
    def __is_cancel(cls, code, data, total_datas, local_today_num_operate_map):
        val = data["val"]
        cancel_datas = local_today_num_operate_map.get(
            "{}-{}-{}".format(val["num"], "1", val["price"]))
        # 是否有买撤数据
        if cancel_datas:
            for cancel_data in cancel_datas:
                buy_index = l2_data_source_util.L2DataSourceUtils.get_buy_index_with_cancel_data(code, cancel_data,
                                                                                                 local_today_num_operate_map)
                if buy_index == data["index"]:
                    return True
        return False
    # 获取成交进度索引
    @classmethod
    def find_traded_progress_index(cls, buy_1_price, total_datas, local_today_num_operate_map, queueList, last_index,
    def find_traded_progress_index(cls, code, buy_1_price, total_datas, local_today_num_operate_map, queueList,
                                   last_index,
                                   latest_not_limit_up_time=None):
        def find_traded_progress_index_simple(queues):
            index_set = set()
            for num in queues:
                buy_datas = local_today_num_operate_map.get(
                    "{}-{}-{}".format(num, "0", buy_1_price_format))
                if buy_datas is not None and len(buy_datas) > 0:
                    for data in buy_datas:
                        # 在最近一次非涨停买1更新的时间之后才有效
                        if latest_not_limit_up_time is None or tool.trade_time_sub(data["val"]["time"],
                                                                                   latest_not_limit_up_time) >= 0:
                            if data["index"] >= last_index:
                                index_set.add(data["index"])
            index_list = list(index_set)
            index_list.sort()
            num_list = []
            new_index_list = []
            for index in index_list:
                for i in range(0, total_datas[index]["re"]):
                    num_list.append(total_datas[index]["val"]["num"])
                    new_index_list.append(index)
            index_list_str = ",".join(list(map(str, num_list)))
            queue_list_str = ",".join(list(map(str, queues)))
            find_index = index_list_str.find(queue_list_str)
            if find_index >= 0:
                temp_str = index_list_str[0:find_index]
                if temp_str.endswith(","):
                    temp_str = temp_str[:-1]
                if temp_str == "":
                    return new_index_list[0], new_index_list[0:len(queues)]
                start_index = len(temp_str.split(","))
                return new_index_list[start_index], new_index_list[start_index:start_index + len(queues)]
            return None, None
        # 判断匹配的位置是否可信
        def is_trust(indexes):
            cha = []
            for i in range(1, len(indexes)):
                cha.append(indexes[i] - indexes[i - 1] - 1)
            if len(cha) <= 1:
                return True
            # 标准差小于1
            std_result = numpy.std(cha)
            if std_result < 10:
                # 绝对可信
                return True
            for i in range(0, len(cha)):
                if abs(cha[i]) > 10:
                    # 有超过10 的需要判断两个相临数据间的未撤的买入数量
                    buy_count = 0
                    for index in range(indexes[i] + 1, indexes[i + 1] - 1):
                        if L2DataUtil.is_limit_up_price_buy(total_datas[index]["val"]):
                            if not cls.__is_cancel(code, total_datas[index], total_datas, local_today_num_operate_map):
                                buy_count += total_datas[index]["re"]
                    # 暂定3个误差范围
                    if buy_count >= 3:
                        return False
            return True
        if len(queueList) == 0:
            return None
        # last_index不能撤,如果已撤就清零
        if cls.__is_cancel(code, total_datas[last_index], total_datas, local_today_num_operate_map):
            last_index = 0
        # 补齐整数位5位
        buy_1_price_format = f"{buy_1_price}"
        while buy_1_price_format.find(".") < 4:
            buy_1_price_format = "0" + buy_1_price_format
        index_set = set()
        for num in queueList:
            buy_datas = local_today_num_operate_map.get(
                "{}-{}-{}".format(num, "0", buy_1_price_format))
            if buy_datas is not None and len(buy_datas) > 0:
                for data in buy_datas:
                    # 在最近一次非涨停买1更新的时间之后才有效
                    if latest_not_limit_up_time is None or tool.trade_time_sub(data["val"]["time"],
                                                                               latest_not_limit_up_time) >= 0:
                        if data["index"] >= last_index:
                            index_set.add(data["index"])
        index_list = list(index_set)
        index_list.sort()
        num_list = []
        new_index_list = []
        for index in index_list:
            for i in range(0, total_datas[index]["re"]):
                num_list.append(total_datas[index]["val"]["num"])
                new_index_list.append(index)
        index_list_str = ",".join(list(map(str, num_list)))
        queue_list_str = ",".join(list(map(str, queueList)))
        find_index = index_list_str.find(queue_list_str)
        if find_index >= 0:
            temp_str = index_list_str[0:find_index]
            if temp_str.endswith(","):
                temp_str = temp_str[:-1]
            if temp_str == "":
                return new_index_list[0]
            return new_index_list[len(temp_str.split(","))]
        # --------因子查找法(因子的窗口最大为:len(queueList) ,最小为:len(queueList)/2)---------
        max_win_len = len(queueList)
        min_win_len = len(queueList) // 2
        if max_win_len == min_win_len:
            min_win_len = max_win_len - 1
        for win_len in range(max_win_len, min_win_len, -1):
            # 窗口移动
            for i in range(0, max_win_len - win_len + 1):
                queues = queueList[i:i + win_len]
                f_start_index, f_indexs = find_traded_progress_index_simple(queues)
                if f_start_index and is_trust(f_indexs):
                    return f_start_index
        raise Exception("尚未找到成交进度")
if __name__ == "__main__":
    pass
    cha = [0, 2, 4]
    std_result = numpy.std(cha)
    print(std_result)
l2/safe_count_manager.py
@@ -4,6 +4,7 @@
# 下单L2的安全笔数管理
import json
from l2 import l2_data_source_util
from trade import l2_trade_factor
from db import redis_manager
import tool
@@ -127,7 +128,8 @@
                buy_num += int(val["num"]) * data["re"]
            elif L2DataUtil.is_limit_up_price_buy_cancel(val):
                # 获取买入信息
                buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(data, local_today_num_operate_map)
                buy_index = l2_data_source_util.L2DataSourceUtils.get_buy_index_with_cancel_data(code, data,
                                                                                                 local_today_num_operate_map)
                if buy_index is not None:
                    if last_buy_single_index <= buy_index <= end_index:
                        cancel_num += int(val["num"]) * data["re"]
l2/transaction_progress.py
@@ -10,7 +10,7 @@
from db import redis_manager
import tool
import l2.l2_data_util
from log import logger_l2_trade_buy_queue
from log import logger_l2_trade_buy_queue, logger_l2_trade_buy_progress
class TradeBuyQueue:
@@ -55,7 +55,9 @@
    def __get_latest_not_limit_up_time(self, code):
        key = "latest_not_limit_up_time-{}".format(code)
        self.__getRedis().get(key)
        if not constant.TEST:
            return self.__getRedis().get(key)
        return None
    # 保存数据,返回保存数据的条数
    def save(self, code, limit_up_price, buy_1_price, buy_1_time, queues):
@@ -80,21 +82,17 @@
        return num_list
    # 保存成交索引
    def compute_traded_index(self, code, buy1_price, buyQueueBig):
    def compute_traded_index(self, code, buy1_price, buyQueueBig, exec_time=None):
        total_datas = l2.l2_data_util.local_today_datas.get(code)
        today_num_operate_map = l2.l2_data_util.local_today_num_operate_map.get(code)
        index = None
        for i in range(0, len(buyQueueBig)):
            buyQueueBigTemp = buyQueueBig[i:]
            if i > 0 and len(buyQueueBigTemp) < 2:
                # 已经执行过一次,且数据量小于2条就终止计算
                break
        if True:
            buyQueueBigTemp = buyQueueBig
            last_index, is_default = self.get_traded_index(code)
            c_last_index = 0
            if not is_default and last_index is not None:
                c_last_index = last_index
            latest_not_limit_up_time = self.__get_latest_not_limit_up_time(code)
            # 如果是3个/4个数据找不到就调整顺序
            fbuyQueueBigTempList = []
            if 3 <= len(buyQueueBigTemp) <= 4:
@@ -105,33 +103,41 @@
            fbuyQueueBigTempList.insert(0, buyQueueBigTemp)
            for temp in fbuyQueueBigTempList:
                try:
                    index = l2.l2_data_util.L2TradeQueueUtils.find_traded_progress_index(buy1_price, total_datas,
                    index = l2.l2_data_util.L2TradeQueueUtils.find_traded_progress_index(code, buy1_price, total_datas,
                                                                                         today_num_operate_map,
                                                                                         temp,
                                                                                         c_last_index,
                                                                                         self.__get_latest_not_limit_up_time(
                                                                                             code))
                                                                                         latest_not_limit_up_time
                                                                                         )
                    if index is not None:
                        break
                        # 判断位置是否大于执行位2s
                        if exec_time and tool.trade_time_sub(total_datas[index]["val"]["time"], exec_time) > 5:
                            # 位置是否大于执行位2s表示无效
                            index = None
                            continue
                        # 只能削减一半以下才能终止
                        if len(temp) * 2 < len(buyQueueBig):
                            index = None
                            break
                except:
                    pass
            if index is not None:
                break
        if index is not None:
            logger_l2_trade_buy_queue.info(f"确定交易进度:code-{code} index-{index}")
            # 保存成交进度
            # self.__save_buy_progress_index(code, index, False)
            return index
            if index is not None:
                logger_l2_trade_buy_queue.info(f"确定交易进度:code-{code} index-{index}")
                logger_l2_trade_buy_progress.info(
                    f"确定交易进度成功:code-{code}  index-{index} queues:{buyQueueBig}  last_index-{c_last_index} latest_not_limit_up_time-{latest_not_limit_up_time}  exec_time-{exec_time}")
                # 保存成交进度
                # self.__save_buy_progress_index(code, index, False)
                return index
            else:
                logger_l2_trade_buy_progress.warning(
                    f"确定交易进度失败:code-{code} queues:{buyQueueBig}  last_index-{c_last_index} latest_not_limit_up_time-{latest_not_limit_up_time} exec_time-{exec_time}")
        return index
    # 获取成交进度索引
    def get_traded_index(self, code):
        index, is_default = self.__get_buy_progress_index(code)
        return index, is_default
    def set_default_traded_index(self, code, index):
        self.__save_buy_progress_index(code, index, True)
    def set_traded_index(self, code, index):
        self.__save_buy_progress_index(code, index, False)
l2_trade_test.py
@@ -5,6 +5,7 @@
import json
import logging
import random
import time
import unittest
from copy import deepcopy
from unittest import mock
@@ -14,6 +15,7 @@
import log
import tool
from db import redis_manager
from l2 import l2_log, l2_data_manager, transaction_progress
from l2.safe_count_manager import BuyL2SafeCountManager
from l2.transaction_progress import TradeBuyQueue
from trade import trade_data_manager
@@ -41,12 +43,13 @@
        redis_info.delete(k)
    BuyL2SafeCountManager().clear_data(code)
    transaction_progress.TradeBuyQueue().set_traded_index(code, 0)
class VirtualTrade(unittest.TestCase):
    def __process_buy_queue(self,code, buy_queue, time_):
        if time_ == "09:32:37":
    def __process_buy_queue(self, code, buy_queue, time_):
        if time_ == "10:25:14":
            print("进入调试")
        limit_up_price = gpcode_manager.get_limit_up_price(code)
        buy_one_price = limit_up_price
@@ -58,22 +61,32 @@
                try:
                    buy_one_price_ = decimal.Decimal(round(float(buy_one_price), 2)).quantize(
                        decimal.Decimal("0.00"))
                    buy_progress_index = TradeBuyQueue().compute_traded_index(code, buy_one_price_, buy_queue_result_list)
                    # 获取执行位时间
                    exec_time = None
                    buy_single_index, buy_exec_index, compute_index, num, count, max_num_set = l2_data_manager.TradePointManager.get_buy_compute_start_data(
                        code)
                    if buy_exec_index:
                        try:
                            exec_time = l2.l2_data_util.local_today_datas.get(code)[buy_exec_index]["val"]["time"]
                        except:
                            pass
                    buy_progress_index = TradeBuyQueue().compute_traded_index(code, buy_one_price_,
                                                                              buy_queue_result_list, exec_time)
                    if buy_progress_index is not None:
                        l2.cancel_buy_strategy.HourCancelBigNumComputer.set_trade_progress(code, buy_progress_index,
                        l2.cancel_buy_strategy.HourCancelBigNumComputer.set_trade_progress(code, time_, buy_exec_index, buy_progress_index,
                                                                                           l2.l2_data_util.local_today_datas.get(
                                                                                               code),
                                                                                           l2.l2_data_util.local_today_num_operate_map.get(
                                                                                               code))
                    log.logger_l2_trade_buy_queue.info("获取成交位置成功: code-{} index-{}  数据-{}", code,
                                                       buy_progress_index,
                                                       json.dumps(buy_queue_result_list))
                        log.logger_l2_trade_buy_queue.info("获取成交位置成功: code-{} index-{}  数据-{}", code,
                                                           buy_progress_index,
                                                           json.dumps(buy_queue_result_list))
                except Exception as e:
                    pass
    # @unittest.skip("跳过此单元测试")
    @unittest.skip("跳过此单元测试")
    def test_trade(self):
        code = "002131"
        code = "002117"
        clear_trade_data(code)
        l2.l2_data_util.load_l2_data(code)
        total_datas = deepcopy(l2.l2_data_util.local_today_datas[code])
@@ -116,7 +129,8 @@
        trade_progress_list, buy_queues = log.get_trade_progress(code)
        for indexs in pos_list:
            l2.l2_data_manager_new.L2TradeDataProcessor.random_key[code] = mock.Mock(return_value=random.randint(0, 100000))
            l2_log.threadIds[code] = mock.Mock(
                return_value=random.randint(0, 100000))
            # 设置封单额,获取买1量
            for i in range(0, 100):
                time_ = total_datas[indexs[0]]["val"]["time"]
@@ -137,9 +151,26 @@
                    break
            print("----------------处理位置", indexs)
            if indexs[0] >= 224:
            if indexs[0] >= 661:
                print("进入调试")
            l2.l2_data_manager_new.L2TradeDataProcessor.process_add_datas(code, total_datas[indexs[0]:indexs[1] + 1], 0, 0)
            l2.l2_data_manager_new.L2TradeDataProcessor.process_add_datas(code, total_datas[indexs[0]:indexs[1] + 1], 0,
                                                                          0)
    @unittest.skip("跳过此单元测试")
    def test_h_cancel(self):
        code = "002870"
        l2.l2_data_util.load_l2_data(code)
        total_datas = l2.l2_data_util.local_today_datas.get(code)
        total_datas = total_datas[:899]
        l2.l2_data_util.local_today_datas[code] = total_datas
        l2.l2_data_util.load_num_operate_map(l2.l2_data_util.local_today_num_operate_map, code, total_datas, True)
        buy_progress_index = 523
        l2.cancel_buy_strategy.HourCancelBigNumComputer.set_trade_progress(code, buy_progress_index,
                                                                           l2.l2_data_util.local_today_datas.get(
                                                                               code),
                                                                           l2.l2_data_util.local_today_num_operate_map.get(
                                                                               code))
# class TestTrade(unittest.TestCase):
@@ -179,19 +210,23 @@
class TestTradedProgress(unittest.TestCase):
    @unittest.skip("跳过此单元测试")
    def test_get_progress(self):
        code = "002328"
        code = "000925"
        l2.l2_data_util.load_l2_data(code)
        # l2.l2_data_util.local_today_datas[code] = l2.l2_data_util.local_today_datas[code][:898]
        TradeBuyQueue.get_traded_index = mock.Mock(return_value=(10, False))
        buy_progress_index = TradeBuyQueue().compute_traded_index(code, "6.94", [1270, 9999, 1973])
        buy_progress_index = TradeBuyQueue().compute_traded_index(code, "9.76",
                                                                  [9999, 1506], "09:32:45")
        print("获取到交易进度:", buy_progress_index)
    @unittest.skip("跳过此单元测试")
    def test_sort(self):
        list = [1, 2, 3]
        result_list = itertools.permutations(list, 3)
        print(result_list)
        for r in result_list:
            print(r)
        _start = round(time.time() * 1000)
        count = 0
        for i in range(0, 100000):
            # if i > 30:
            count += 1
        print("耗时", round(time.time() * 1000) - _start)
if __name__ == "__main__":
log.py
@@ -77,6 +77,10 @@
                   filter=lambda record: record["extra"].get("name") == "l2_trade_buy_queue",
                   rotation="00:00", compression="zip", enqueue=True)
        logger.add(self.get_path("l2", "l2_trade_buy_progress"),
                   filter=lambda record: record["extra"].get("name") == "l2_trade_buy_progress",
                   rotation="00:00", compression="zip", enqueue=True)
        logger.add(self.get_path("juejin", "juejin_tick"),
                   filter=lambda record: record["extra"].get("name") == "juejin_tick",
                   rotation="00:00", compression="zip", enqueue=True)
@@ -106,6 +110,10 @@
                   filter=lambda record: record["extra"].get("name") == "day_volumn",
                   rotation="00:00", compression="zip", enqueue=True)
        logger.add(self.get_path("ths", "buy_win_distibute"),
                   filter=lambda record: record["extra"].get("name") == "buy_win_distibute",
                   rotation="00:00", compression="zip", enqueue=True)
    def get_path(self, dir_name, log_name):
        return "D:/logs/gp/{}/{}".format(dir_name, log_name) + ".{time:YYYY-MM-DD}.log"
@@ -131,6 +139,7 @@
logger_l2_trade_buy = __mylogger.get_logger("l2_trade_buy")
logger_l2_trade_queue = __mylogger.get_logger("l2_trade_queue")
logger_l2_trade_buy_queue = __mylogger.get_logger("l2_trade_buy_queue")
logger_l2_trade_buy_progress =  __mylogger.get_logger("l2_trade_buy_progress")
logger_l2_big_data = __mylogger.get_logger("l2_big_data")
logger_juejin_tick = __mylogger.get_logger("juejin_tick")
@@ -143,6 +152,10 @@
logger_buy_1_volumn_record = __mylogger.get_logger("buy_1_volumn_record")
logger_day_volumn = __mylogger.get_logger("day_volumn")
logger_buy_win_distibute = __mylogger.get_logger("buy_win_distibute")
class LogUtil:
@@ -350,7 +363,7 @@
if __name__ == '__main__':
    # logger_l2_h_cancel.info("test")
    # logger_l2_process_time.info("test123")
    codes = ["002131", "003035", "002131"]
    codes = ["002946"]
    for code in codes:
        export_logs(code)
server.py
@@ -99,7 +99,7 @@
                        origin_start_time = round(time.time() * 1000)
                        __start_time = round(time.time() * 1000)
                        do_id = random.randint(0, 100000)
                        # level2盘口数据
                        day, client, channel, code, capture_time, process_time, datas, origin_datas = l2.l2_data_util.parseL2Data(
                            _str)
@@ -139,8 +139,7 @@
                                        __start_time = l2_data_log.l2_time(code,
                                                                           round(time.time() * 1000) - __start_time,
                                                                           "l2外部数据预处理耗时")
                                        l2_data_manager_new.L2TradeDataProcessor.process(code, datas, capture_timestamp,
                                                                                         do_id)
                                        l2_data_manager_new.L2TradeDataProcessor.process(code, datas, capture_timestamp)
                                        __start_time = l2_data_log.l2_time(code,
                                                                           round(time.time() * 1000) - __start_time,
                                                                           "l2数据有效处理外部耗时",
@@ -308,17 +307,28 @@
                            try:
                                buy_one_price_ = decimal.Decimal(round(float(buy_one_price), 2)).quantize(
                                    decimal.Decimal("0.00"))
                                buy_progress_index = self.tradeBuyQueue.compute_traded_index(code, buy_one_price_,
                                                                                          buy_queue_result_list)
                                # 获取执行位时间
                                exec_time = None
                                buy_single_index, buy_exec_index, compute_index, num, count, max_num_set = l2_data_manager.TradePointManager.get_buy_compute_start_data(
                                    code)
                                if buy_exec_index:
                                    try:
                                        exec_time = l2.l2_data_util.local_today_datas.get(code)[buy_exec_index]["val"]["time"]
                                    except:
                                        pass
                                buy_progress_index = self.tradeBuyQueue.compute_traded_index(code, buy_one_price_, buy_queue_result_list,exec_time)
                                if buy_progress_index is not None:
                                    HourCancelBigNumComputer.set_trade_progress(code, buy_progress_index,
                                    HourCancelBigNumComputer.set_trade_progress(code,buy_time,buy_exec_index, buy_progress_index,
                                                                                l2.l2_data_util.local_today_datas.get(
                                                                                    code),
                                                                                l2.l2_data_util.local_today_num_operate_map.get(
                                                                                    code))
                                logger_l2_trade_buy_queue.info("获取成交位置成功: code-{} index-{}  数据-{}", code,
                                                               buy_progress_index,
                                                               json.dumps(buy_queue_result_list))
                                    logger_l2_trade_buy_queue.info("获取成交位置成功: code-{} index-{}  数据-{}", code,
                                                                   buy_progress_index,
                                                                   json.dumps(buy_queue_result_list))
                                else:
                                    raise Exception("暂未获取到交易进度")
                            except Exception as e:
                                logging.exception(e)
                                print("买入队列", code, buy_queue_result_list)
@@ -364,7 +374,7 @@
                            volumn = item["volumn"]
                            volumnUnit = item["volumnUnit"]
                            code_volumn_manager.save_today_volumn(item["code"], volumn, volumnUnit)
                        juejin.accpt_prices(data)
                        juejin.accept_prices(data)
                elif type == 50:
                    data = data_process.parse(_str)["data"]
                    if data is not None:
third_data/hot_block.py
New file
@@ -0,0 +1,58 @@
"""
热门板块监听
"""
import logging
import time
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options
def __parseData(driver):
    items = driver.find_element(by=By.ID, value="nuxt-layout-container").find_element(by=By.CLASS_NAME,
                                                                                      value="topgainer-content-left").find_elements(
        by=By.CLASS_NAME,
        value="topgainer-tag")
    data_list = []
    for item in items:
        print("----------------------")
        header = item.find_element(by=By.TAG_NAME, value="section").find_element(by=By.TAG_NAME, value="header")
        title = header.find_element(by=By.TAG_NAME, value="h3").text
        total_rate = header.find_element(by=By.TAG_NAME, value="span").text
        print(title, total_rate)
        contents = item.find_element(by=By.TAG_NAME, value="div").find_element(by=By.TAG_NAME,
                                                                               value="tbody").find_elements(
            by=By.TAG_NAME, value="tr")
        codes_list = []
        for content in contents:
            tds = content.find_elements(by=By.TAG_NAME, value="td")
            code = tds[0].find_elements(by=By.TAG_NAME, value="span")[1].text
            limit_up_info = tds[1].text
            price = tds[2].text
            rate = tds[3].text
            limit_up_time = tds[4].text
            huanshou = tds[5].text
            ltsz = tds[6].text
            codes_list.append((code, limit_up_info, price, rate, limit_up_time, huanshou, ltsz))
        data_list.append((title, total_rate, codes_list))
        print("----------------------")
    return data_list
# 获取热门板块
def get_hot_block(callback):
    # 先启动浏览器
    options = Options()
    options.add_argument("--disable-blink-features")
    options.add_argument("--disable-blink-features=AutomationControlled")
    driver = webdriver.Chrome(options=options)
    driver.get("https://xuangubao.cn/top-gainer")
    time.sleep(5)
    while True:
        time.sleep(3)
        try:
            result = __parseData(driver)
            callback(result)
        except Exception as e:
            logging.exception(e)
trade/trade_data_manager.py
@@ -222,6 +222,10 @@
            return float(rate)
        return None
    def get_current_rate(self, code):
        return self.__get_current_rate(code)
    def process_rate(self, code, rate, time_str):
        # 保存目前的代码涨幅
        self.__save_current_rate(code, rate)
trade/trade_gui.py
@@ -3,6 +3,7 @@
"""
import array
import logging
import threading
import time
import random
@@ -760,16 +761,23 @@
        # 获取可用的窗口
        win = cls.__get_available_win()
        if win is None:
            logger_buy_win_distibute.error(f"无可用窗口:{code}")
            raise Exception("窗口已经分配完毕,无可用窗口")
        # 保存窗口分配信息
        cls.__save_code_win(code, win)
        # 设置代码多试几次
        is_success = False
        for i in range(0, 3):
            THSGuiUtil.set_buy_window_code(cls.get_trade_win(win), code)
            time.sleep(0.5)
            code_name_win = cls.__get_code_name(win)
            if code_name == code_name_win:
                is_success = True
                break
        if is_success:
            logger_buy_win_distibute.info(f"新分配窗口成功:{code}-{win}")
        else:
            logger_buy_win_distibute.info(f"新分配窗口失败:{code}-{win}")
        return win
    # 删除代码窗口分配
@@ -793,6 +801,18 @@
            cls.__del_code_win(code)
            return None
        return win
    # 获取已分配的交易框信息
    @classmethod
    def get_distributed_code_wins(cls):
        key = "buywin_distribute-*"
        keys = cls.__get_redis().keys(key)
        results = []
        for k in keys:
            code = k.split("-")[-1]
            win = cls.__get_redis().get(k)
            results.append((code, win))
        return results
    # 获取代码名称
    @classmethod
@@ -846,8 +866,11 @@
                    if name_codes.get(code_name) != code:
                        cls.cancel_distribute_win_for_code(code)
                continue
            win = cls.distribute_win_for_code(code, gpcode_manager.get_code_name(code))
            print("分配的窗口:", win, THSGuiUtil.is_win_exist(win))
            try:
                win = cls.distribute_win_for_code(code, gpcode_manager.get_code_name(code))
                print("分配的窗口:", win, THSGuiUtil.is_win_exist(win))
            except Exception as e:
                logging.exception(e)
class GUITest:
trade/trade_manager.py
@@ -32,6 +32,8 @@
guiTrade = THSGuiTrade()
latest_trade_delegate_data = []
# 获取交易状态
def get_trade_state(code):
@@ -338,6 +340,7 @@
        _time = data["time"]
        if _time == "00:00:00":
            continue
        # 买入成功
        if code is not None and int(data["type"]) == 0:
            l2_trade_util.forbidden_trade(code)
            state = get_trade_state(code)
@@ -349,15 +352,24 @@
                l2_data_manager.TradePointManager.delete_buy_point(code)
                # 移除交易窗口分配
                THSBuyWinManagerNew.cancel_distribute_win_for_code(code)
                # TODO 完全成交后移除L2
            # 交易成功时间过去3s之后,且当前委托列表里面还有该代码数据就再次执行撤单
            if tool.trade_time_sub(tool.get_now_time_str(), _time) > 3:
                # 获取到当前是否委托
                for dd in latest_trade_delegate_data:
                    if dd["code"] == code:
                        logger_trade.info("{}交易成功触发,重复下单撤单".format(code))
                        start_cancel_buy(code, True)
# 处理委托成功数据
def process_trade_delegate_data(datas):
    if datas is None:
        return None
    latest_trade_delegate_data.clear()
    latest_trade_delegate_data.extend(datas)
    codes = []
    for data in datas:
        code = data["code"]
        if code is not None:
            codes.append(code)
utils/capture_util.py
New file
@@ -0,0 +1,43 @@
"""
截图工具
"""
import cv2
import numpy
import win32con
import win32gui
import win32ui
def window_capture(hwnd):
    rect = win32gui.GetWindowRect(hwnd)
    w = rect[2] - rect[0]
    h = rect[3] - rect[1]
    if w == 0 or h == 0:
        return None
    # 返回句柄窗口的设备环境,覆盖整个窗口,包括非客户区,标题栏,菜单,边框
    hWndDC = win32gui.GetWindowDC(hwnd)
    # 创建设备描述表
    mfcDC = win32ui.CreateDCFromHandle(hWndDC)
    # 创建内存设备描述表
    saveDC = mfcDC.CreateCompatibleDC()
    # 创建位图对象准备保存图片
    saveBitMap = win32ui.CreateBitmap()
    # 为bitmap开辟存储空间
    saveBitMap.CreateCompatibleBitmap(mfcDC, w, h)
    # 将截图保存到saveBitMap中
    saveDC.SelectObject(saveBitMap)
    # 保存bitmap到内存设备描述表
    saveDC.BitBlt((0, 0), (w, h), mfcDC, (0, 0), win32con.SRCCOPY)
    signedIntsArray = saveBitMap.GetBitmapBits(True)
    # 内存释放
    win32gui.DeleteObject(saveBitMap.GetHandle())
    saveDC.DeleteDC()
    mfcDC.DeleteDC()
    win32gui.ReleaseDC(hwnd, hWndDC)
    im_opencv = numpy.frombuffer(signedIntsArray, dtype='uint8')
    im_opencv.shape = (h, w, 4)
    cv2.cvtColor(im_opencv, cv2.COLOR_BGRA2RGB)
    return im_opencv