Administrator
2023-02-09 b74016d3ba3750cd27fee83675449da8f1da3926
建立取消单元测试机制/修改H撤(看成交位置相邻大单与总整体数据TOPN大单)
11个文件已修改
451 ■■■■■ 已修改文件
constant.py 19 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
data_export_util.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/cancel_buy_strategy.py 154 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_data_manager_new.py 35 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_data_util.py 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/transaction_progress.py 49 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_trade_test.py 151 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
log.py 21 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ocr/ocr_server.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
server.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/trade_gui.py 12 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
constant.py
@@ -10,21 +10,16 @@
BIG_MONEY_NUM = 7888
# S撤比例
S_CANCEL_FIRST_RATE = 0.79
S_CANCEL_SECOND_RATE = 0.69
S_CANCEL_THIRD_RATE = 0.59
S_CANCEL_FIRST_RATE = 0.69
S_CANCEL_SECOND_RATE = 0.59
S_CANCEL_THIRD_RATE = 0.49
# H撤比例
H_CANCEL_FIRST_RATE = 0.79
H_CANCEL_SECOND_RATE = 0.69
H_CANCEL_THIRD_RATE = 0.59
H_CANCEL_FIRST_RATE = 0.69
H_CANCEL_SECOND_RATE = 0.59
H_CANCEL_THIRD_RATE = 0.49
H_CANCEL_MIN_MONEY = 10000000
H_CANCEL_MIN_COUNT=8
H_CANCEL_MIN_COUNT = 8
# h撤大单笔数
H_CANCEL_BUY_COUNT = 40
# H撤单比例
H_CANCEL_RATE = 0.79
# L2监控的最低金额
L2_MIN_MONEY = 500000
data_export_util.py
@@ -193,6 +193,6 @@
if __name__ == "__main__":
    codes = ["002792"]
    codes = ["601890"]
    for code in codes:
        export_l2_excel(code)
l2/cancel_buy_strategy.py
@@ -15,11 +15,12 @@
import l2_data_util
from db import redis_manager
import tool
from l2.safe_count_manager import BuyL2SafeCountManager
from l2.transaction_progress import TradeBuyQueue
from trade import trade_data_manager, trade_queue_manager, l2_trade_factor
from l2 import l2_log, l2_data_log
from l2.l2_data_util import L2DataUtil, local_today_num_operate_map, local_today_datas
from log import logger_buy_1_volumn
from log import logger_buy_1_volumn, logger_l2_h_cancel, logger_l2_s_cancel
class SecondCancelBigNumComputer:
@@ -108,10 +109,13 @@
    @classmethod
    def need_cancel(cls, code, buy_single_index, buy_exec_index, start_index, end_index, total_data, threadId,
                    need_cancel=True):
        if start_index == 375:
            print("进入调试")
        # 只守护30s
        if tool.trade_time_sub(total_data[start_index]["val"]["time"], total_data[buy_exec_index]["val"]["time"]) > 30:
            return False, None
        l2_log.cancel_debug(threadId, code, "S级是否需要撤单,数据范围:{}-{} ", start_index, end_index)
        logger_l2_s_cancel.debug(f"code-{code} S级是否需要撤单,数据范围:{start_index}-{end_index}")
        if tool.trade_time_sub(total_data[end_index]["val"]["time"], total_data[buy_exec_index]["val"]["time"]) > 30:
            # 结束位置超过了执行位置30s,需要重新确认结束位置
@@ -197,8 +201,10 @@
                        if cancel_num / max(buy_num, 1) > cancel_rate_threshold:
                            return True, total_data[i]
        finally:
            l2_log.cancel_debug(threadId, code, "S级大单 范围:{}-{} 取消计算结果:{}/{},比例:{}", start_index, end_index, cancel_num,
                                buy_num, round(cancel_num / buy_num, 2))
                                buy_num, round(cancel_num / max(buy_num,1), 2))
            # 保存处理进度与数据
            cls.__save_compute_data(code, process_index, buy_num, cancel_num)
        return False, None
@@ -213,15 +219,16 @@
class HourCancelBigNumComputer:
    __redis_manager = redis_manager.RedisManager(0)
    __tradeBuyQueue = TradeBuyQueue()
    __buyL2SafeCountManager = BuyL2SafeCountManager()
    @classmethod
    def __getRedis(cls):
        return cls.__redis_manager.getRedis()
    @classmethod
    def __save_watch_index_set(cls, code, datas):
    def __save_watch_index_set(cls, code, datas, process_index, finish):
        key = f"h_cancel_watch_indexs-{code}"
        cls.__getRedis().setex(key, tool.get_expire(), json.dumps(list(datas)))
        cls.__getRedis().setex(key, tool.get_expire(), json.dumps((list(datas), process_index, finish)))
    # 保存成交进度
    @classmethod
@@ -229,9 +236,9 @@
        key = f"h_cancel_watch_indexs-{code}"
        val = cls.__getRedis().get(key)
        if val is None:
            return None
            return None, -1, False
        val = json.loads(val)
        return val
        return val[0], val[1], val[2]
    # 保存结束位置
    @classmethod
@@ -272,13 +279,13 @@
        # 守护30s以外的数据
        if tool.trade_time_sub(total_data[start_index]["val"]["time"], total_data[buy_exec_index]["val"]["time"]) <= 30:
            return False, None
        watch_indexs = cls.__get_watch_index_set(code)
        watch_indexs = cls.__get_watch_index_set(code)[0]
        watch_indexs_dict = {}
        # 监听的总数
        total_nums = 0
        for indexs in watch_indexs:
            watch_indexs_dict[indexs[0]] = indexs
            total_nums += total_data[indexs[0]]["val"]["num"] * indexs[1]
            total_nums += total_data[indexs[0]]["val"]["num"] * indexs[2]
        if watch_indexs is None:
            l2_log.cancel_debug(threadId, code, "H撤没获取到监听范围数据")
@@ -289,6 +296,13 @@
        l2_log.cancel_debug(threadId, code, "H级是否需要撤单,数据范围:{}-{} ", start_index, end_index)
        # 获取下单次数
        place_order_count = trade_data_manager.placeordercountmanager.get_place_order_count(code)
        cancel_rate_threshold = constant.H_CANCEL_FIRST_RATE
        if place_order_count <= 1:
            cancel_rate_threshold = constant.H_CANCEL_FIRST_RATE
        elif place_order_count <= 2:
            cancel_rate_threshold = constant.H_CANCEL_SECOND_RATE
        else:
            cancel_rate_threshold = constant.H_CANCEL_THIRD_RATE
        process_index = start_index
        try:
            for i in range(start_index, end_index + 1):
@@ -305,19 +319,14 @@
                                                                                         code))
                    if buy_index is not None and buy_index in watch_indexs_dict:
                        cancel_num += buy_data["re"] * int(buy_data["val"]["num"])
                        cancel_rate_threshold = constant.H_CANCEL_FIRST_RATE
                        if place_order_count <= 1:
                            cancel_rate_threshold = constant.H_CANCEL_FIRST_RATE
                        elif place_order_count <= 2:
                            cancel_rate_threshold = constant.H_CANCEL_SECOND_RATE
                        else:
                            cancel_rate_threshold = constant.H_CANCEL_THIRD_RATE
                        if cancel_num / total_nums > cancel_rate_threshold:
                            return True, total_data[i]
        finally:
            l2_log.cancel_debug(threadId, code, "H级撤单计算结果 范围:{}-{} 处理进度:{} 取消计算结果:{}/{}", start_index, end_index,
                                process_index, cancel_num,
                                total_nums)
            logger_l2_h_cancel.info(f"code-{code} H级撤单计算结果 范围:{start_index}-{end_index} 处理进度:{process_index} 目标比例:{cancel_rate_threshold} 取消计算结果:{cancel_num}/{total_nums}")
            # 保存处理进度与数据
            cls.__save_compute_data(code, process_index, cancel_num)
        return False, None
@@ -331,16 +340,66 @@
    # 设置成交进度
    @classmethod
    def set_trade_progress(cls, code, index, total_data, local_today_num_operate_map, is_default=False):
        l2_log.cancel_debug(0, code, "成交进度:{}", index)
        last_index, is_default = cls.__tradeBuyQueue.get_traded_index(code)
        logger_l2_h_cancel.info(f"code-{code} 成交进度:{index} 数据结束位置:"+str(total_data[-1]["index"]))
        last_index, last_is_default = cls.__tradeBuyQueue.get_traded_index(code)
        # 成交进度
        if is_default:
            cls.__tradeBuyQueue.set_default_traded_index(code, index)
        if last_index is None or last_index != index:
            cls.compute_watch_end_index(code, total_data, local_today_num_operate_map)
            cls.__compute_watch_indexs(code, total_data, local_today_num_operate_map)
        else:
            if last_index is None or last_index != index:
                cls.__tradeBuyQueue.set_traded_index(code, index)
                cls.__compute_watch_indexs(code, total_data, local_today_num_operate_map)
    # 涨停买是否撤单
    @classmethod
    def __get_limit_up_buy_no_canceled_count(cls, index, total_data, local_today_num_operate_map):
        data =None
        try:
            data = total_data[index]
        except:
            print("")
        val = data["val"]
        if L2DataUtil.is_limit_up_price_buy(val):
            # 判断当前买是否已经买撤
            cancel_datas = local_today_num_operate_map.get(
                "{}-{}-{}".format(val["num"], "1", val["price"]))
            canceled = False
            if cancel_datas:
                for cancel_data in cancel_datas:
                    buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(cancel_data,
                                                                                     local_today_num_operate_map)
                    if buy_index == index:
                        canceled = True
                        count = data["re"] - cancel_data["re"]
                        if count > 0:
                            return count
                        break
            if not canceled:
                count = data["re"]
                return count
        return 0
        # 计算排名前N的大单
    @classmethod
    def compute_watch_end_index(cls, code, total_data, local_today_num_operate_map):
    def __compute_top_n_num(cls, start_index, total_data, local_today_num_operate_map, count):
        # 找到还未撤的TOPN大单
        watch_set = set()
        for i in range(start_index, total_data[-1]["index"] + 1):
            not_cancel_count = cls.__get_limit_up_buy_no_canceled_count(i, total_data, local_today_num_operate_map)
            if not_cancel_count > 0:
                watch_set.add((i, total_data[i]["val"]["num"], not_cancel_count))
        # 针按照手数排序
        watch_list = list(watch_set)
        watch_list.sort(key=lambda tup: tup[1])
        watch_list.reverse()
        watch_list = watch_list[:count]
        watch_set = set(watch_list)
        return watch_set
    @classmethod
    def __compute_watch_indexs(cls, code, total_data, local_today_num_operate_map):
        trade_progress_index, is_default = cls.__tradeBuyQueue.get_traded_index(code)
        threshold_money, msg = l2_trade_factor.L2TradeFactorUtil.compute_m_value(code)
        if threshold_money < constant.H_CANCEL_MIN_MONEY:
@@ -352,46 +411,43 @@
        total_num = 0
        watch_set = set()
        total_count = 0
        # 暂时不需要使用
        process_index = -1
        finished = False
        safe_count = cls.__buyL2SafeCountManager.get_safe_count(code)
        for i in range(trade_progress_index, total_data[-1]["index"] + 1):
            data = total_data[i]
            val = data["val"]
            if L2DataUtil.is_limit_up_price_buy(val):
            process_index = i
            left_count = cls.__get_limit_up_buy_no_canceled_count(i, total_data, local_today_num_operate_map)
            if left_count > 0:
                data = total_data[i]
                val = data["val"]
                total_num += val["num"] * data["re"]
                # 判断当前买是否已经买撤
                cancel_datas = local_today_num_operate_map.get(
                    "{}-{}-{}".format(val["num"], "1", val["price"]))
                canceled = False
                if cancel_datas:
                    for cancel_data in cancel_datas:
                        buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(cancel_data,
                                                                                         local_today_num_operate_map)
                        if buy_index == i:
                            # 已经买撤
                            total_num -= buy_data["val"]["num"] * cancel_data["re"]
                            canceled = True
                            count = data["re"] - cancel_data["re"]
                            if count > 0:
                                total_count += count
                                watch_set.add((i, count))
                            break
                if not canceled:
                    count = data["re"]
                    total_count += count
                    watch_set.add((i, count))
                total_count += left_count
                watch_set.add((i, val["num"], left_count))
                # 判断是否达到阈值
                if total_num >= threshold_num and total_count >= constant.H_CANCEL_MIN_COUNT:
                if total_count >= safe_count:  # and total_num >= threshold_num
                    finished = True
                    # 最小8笔
                    l2_log.cancel_debug(0, code, "获取到H撤监听数据:{}", json.dumps(list(watch_set)))
                    l2_log.cancel_debug(0, code, "获取到H撤监听数据:{},计算截至位置:{}", json.dumps(list(watch_set)),
                                        total_data[-1]["index"])
                    break
        # 计算TOP N大单
        top_n_watch_set = cls.__compute_top_n_num(trade_progress_index, total_data, local_today_num_operate_map,
                                                  safe_count)
        logger_l2_h_cancel.info(f"code-{code}  H撤监控临单:{watch_set}")
        logger_l2_h_cancel.info(f"code-{code}  H撤监控较大单:{top_n_watch_set}")
        final_watch_set = set.union(watch_set, top_n_watch_set)
        final_watch_list = list(final_watch_set)
        final_watch_list.sort(key=lambda x: x[0])
        logger_l2_h_cancel.info(f"code-{code}  H撤最终监控大单:{final_watch_list}")
        # 保存计算范围
        cls.__save_watch_index_set(code, watch_set)
        cls.__save_watch_index_set(code, final_watch_set, process_index, finished)
        # 删除原来的计算数据
        cls.__del_compute_data(code)
    @classmethod
    def get_watch_indexs(cls, code):
        return cls.__get_watch_index_set(code)
        return cls.__get_watch_index_set(code)[0]
# --------------------------------封单额变化撤------------------------
l2/l2_data_manager_new.py
@@ -367,24 +367,16 @@
        # 是否需要撤销
        @dask.delayed
        def is_need_cancel(*args):
            f_cancel_data, f_cancel_msg = None, ""
            try:
                for i in range(0, len(args)):
                    _cancel_data, _cancel_msg = args[i]
                    if _cancel_data:
                        if not f_cancel_data:
                            f_cancel_data, f_cancel_msg = _cancel_data, _cancel_msg
                        else:
                            if _cancel_data["index"] < f_cancel_data["index"]:
                                # 取较早的撤销数据
                                f_cancel_data, f_cancel_msg = _cancel_data, _cancel_msg
                        return _cancel_data, _cancel_msg
            except Exception as e:
                logging.exception(e)
            finally:
                pass
            return f_cancel_data, f_cancel_msg
            return None, ""
        if start_index < 0:
            start_index = 0
@@ -398,25 +390,29 @@
        f1 = compute_safe_count()
        f2 = compute_m_big_num()
        f3 = buy_1_cancel()
        f4 = s_cancel()
        f5 = h_cancel()
        f3 = s_cancel()
        f4 = h_cancel()
        f5 = buy_1_cancel()
        f6 = sell_cancel()
        dask_result = is_need_cancel(f1, f2, f3, f4, f5, f6)
        cancel_data, cancel_msg = dask_result.compute()
        _start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time,
                                          "已下单-撤单 判断是否需要撤单")
        if cancel_data:
            cls.debug(code, "触发撤单,撤单位置:{} ,撤单原因:{}", cancel_data["index"], cancel_msg)
            # 撤单
            if cls.cancel_buy(code, cancel_msg):
                _start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time,
                                                  "已下单-撤单 耗时")
                # 撤单成功,继续计算下单
                cls.__process_not_order(code, cancel_data["index"] + 1, end_index, capture_time)
                _start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time,
                                                  "处理剩余数据 耗时")
            else:
                # 撤单尚未成功
                pass
            _start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time,
                                              "已下单-撤单+处理剩余数据")
        else:
            # 如果有虚拟下单需要真实下单
            unreal_buy_info = cls.unreal_buy_dict.get(code)
@@ -427,7 +423,7 @@
                cls.__buy(code, unreal_buy_info[1], local_today_datas[code][unreal_buy_info[0]],
                          unreal_buy_info[0])
                _start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time,
                                                  "已下单-真实下单")
                                                  "已下单-真实下单 耗时")
    @classmethod
    def __buy(cls, code, capture_timestamp, last_data, last_data_index):
@@ -1129,10 +1125,13 @@
    # trade_manager.start_cancel_buy("000637")
    # t.sleep(10)
    # L2TradeDataProcessor.test()
    L2LimitUpMoneyStatisticUtil.verify_num("601958", 89178, "13:22:45")
    # L2LimitUpMoneyStatisticUtil.verify_num("601958", 89178, "13:22:45")
    # load_l2_data("600213")
    #
    # buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(local_today_datas["600213"][84],
    #                                                                  local_today_num_operate_map.get(
    #                                                                      "600213"))
    # print(buy_index, buy_data)
    dict_={"code":0}
    dict_.clear()
    print(dict_)
l2/l2_data_util.py
@@ -380,7 +380,7 @@
class L2TradeQueueUtils(object):
    # 获取成交进度索引
    @classmethod
    def find_traded_progress_index(cls, buy_1_price, total_datas, local_today_num_operate_map, queueList,last_index,
    def find_traded_progress_index(cls, buy_1_price, total_datas, local_today_num_operate_map, queueList, last_index,
                                   latest_not_limit_up_time=None):
        if len(queueList) == 0:
            return None
@@ -397,7 +397,7 @@
                    # 在最近一次非涨停买1更新的时间之后才有效
                    if latest_not_limit_up_time is None or tool.trade_time_sub(data["val"]["time"],
                                                                               latest_not_limit_up_time) >= 0:
                        if data["index"]>=last_index:
                        if data["index"] >= last_index:
                            index_set.add(data["index"])
        index_list = list(index_set)
        index_list.sort()
l2/transaction_progress.py
@@ -3,6 +3,7 @@
'''
# 买入队列
import itertools
import json
import constant
@@ -79,9 +80,10 @@
        return num_list
    # 保存成交索引
    def save_traded_index(self, code, buy1_price, buyQueueBig):
    def compute_traded_index(self, code, buy1_price, buyQueueBig):
        total_datas = l2.l2_data_util.local_today_datas.get(code)
        today_num_operate_map = l2.l2_data_util.local_today_num_operate_map.get(code)
        index = None
        for i in range(0, len(buyQueueBig)):
            buyQueueBigTemp = buyQueueBig[i:]
            if i > 0 and len(buyQueueBigTemp) < 2:
@@ -90,18 +92,38 @@
            last_index, is_default = self.get_traded_index(code)
            index = l2.l2_data_util.L2TradeQueueUtils.find_traded_progress_index(buy1_price, total_datas,
                                                                                 today_num_operate_map, buyQueueBigTemp,
                                                                                 (
                                                                                     last_index if last_index is not is_default else 0),
                                                                                 self.__get_latest_not_limit_up_time(
                                                                                     code))
            c_last_index = 0
            if not is_default and last_index is not None:
                c_last_index = last_index
            # 如果是3个/4个数据找不到就调整顺序
            fbuyQueueBigTempList = []
            if 3 <= len(buyQueueBigTemp) <= 4:
                buyQueueBigTempList = itertools.permutations(buyQueueBigTemp, len(buyQueueBigTemp))
                for tempQueue in buyQueueBigTempList:
                    if list(tempQueue) != buyQueueBigTemp:
                        fbuyQueueBigTempList.append(tempQueue)
            fbuyQueueBigTempList.insert(0, buyQueueBigTemp)
            for temp in fbuyQueueBigTempList:
                try:
                    index = l2.l2_data_util.L2TradeQueueUtils.find_traded_progress_index(buy1_price, total_datas,
                                                                                         today_num_operate_map,
                                                                                         temp,
                                                                                         c_last_index,
                                                                                         self.__get_latest_not_limit_up_time(
                                                                                             code))
                    if index is not None:
                        break
                except:
                    pass
            if index is not None:
                logger_l2_trade_buy_queue.info(f"确定交易进度:code-{code} index-{index}")
                # 保存成交进度
                self.__save_buy_progress_index(code, index, False)
                return index
        return None
                break
        if index is not None:
            logger_l2_trade_buy_queue.info(f"确定交易进度:code-{code} index-{index}")
            # 保存成交进度
            # self.__save_buy_progress_index(code, index, False)
            return index
        return index
    # 获取成交进度索引
    def get_traded_index(self, code):
@@ -111,6 +133,9 @@
    def set_default_traded_index(self, code, index):
        self.__save_buy_progress_index(code, index, True)
    def set_traded_index(self, code, index):
        self.__save_buy_progress_index(code, index, False)
if __name__ == '__main':
    pass
l2_trade_test.py
@@ -1,6 +1,7 @@
# 交易测试
# 清除交易数据
import decimal
import itertools
import json
import logging
import random
@@ -41,9 +42,10 @@
    BuyL2SafeCountManager().clear_data(code)
class VirtualTrade(unittest.TestCase):
    def __process_buy_queue(code, buy_queue, time_):
    def __process_buy_queue(self,code, buy_queue, time_):
        if time_ == "09:32:37":
            print("进入调试")
        limit_up_price = gpcode_manager.get_limit_up_price(code)
@@ -56,7 +58,7 @@
                try:
                    buy_one_price_ = decimal.Decimal(round(float(buy_one_price), 2)).quantize(
                        decimal.Decimal("0.00"))
                    buy_progress_index = TradeBuyQueue().save_traded_index(code, buy_one_price_, buy_queue_result_list)
                    buy_progress_index = TradeBuyQueue().compute_traded_index(code, buy_one_price_, buy_queue_result_list)
                    if buy_progress_index is not None:
                        l2.cancel_buy_strategy.HourCancelBigNumComputer.set_trade_progress(code, buy_progress_index,
                                                                                           l2.l2_data_util.local_today_datas.get(
@@ -69,70 +71,75 @@
                except Exception as e:
                    pass
    code = "002792"
    clear_trade_data(code)
    l2.l2_data_util.load_l2_data(code)
    total_datas = deepcopy(l2.l2_data_util.local_today_datas[code])
    if total_datas[0]["index"] > 0:
        # 拼接数据
        for i in range(0, total_datas[0]["index"]):
            data = total_datas[0].copy()
            data["index"] = i
            total_datas.insert(i, data)
    @unittest.skip("跳过此单元测试")
    def test_trade(self):
        code = "002328"
        clear_trade_data(code)
        l2.l2_data_util.load_l2_data(code)
        total_datas = deepcopy(l2.l2_data_util.local_today_datas[code])
        if total_datas[0]["index"] > 0:
            # 拼接数据
            for i in range(0, total_datas[0]["index"]):
                data = total_datas[0].copy()
                data["index"] = i
                total_datas.insert(i, data)
    pos_list = log.get_l2_process_position(code)
    if pos_list[0][0] > 0:
        pos_list.insert(0, (0, pos_list[0][0] - 1))
    del pos_list[-1]
    if pos_list[-1][1] < total_datas[-1]["index"]:
        # 剩下的数据根据秒来分
        start_index = -1
        for i in range(pos_list[-1][1] + 1, total_datas[-1]["index"] + 1):
            if total_datas[i]["val"]["time"] != total_datas[i - 1]["val"]["time"]:
                if start_index < 0:
                    start_index = i
                else:
                    pos_list.append((start_index, i - 1))
                    start_index = i
    if pos_list[-1][1] < total_datas[-1]["index"]:
        pos_list.append((pos_list[-1][1] + 1, total_datas[-1]["index"]))
    l2.l2_data_util.local_today_datas[code].clear()
    print("id:", id(l2.l2_data_util.local_today_datas))
    # l2_trade_factor.L2TradeFactorUtil.get_safe_buy_count = mock.Mock(return_value=12)
    # pos_list.insert(41,(225,306))
    # pos_list.insert(63, (345, 423))
    # pos_list.insert(66, (440, 447))
    # pos_list.insert(75, (472, 488))
    # pos_list.insert(84, (516, 532))
        pos_list = log.get_l2_process_position(code)
        pos_list.insert(108,(375,448))
        if pos_list[0][0] > 0:
            pos_list.insert(0, (0, pos_list[0][0] - 1))
        del pos_list[-1]
        if pos_list[-1][1] < total_datas[-1]["index"]:
            # 剩下的数据根据秒来分
            start_index = -1
            for i in range(pos_list[-1][1] + 1, total_datas[-1]["index"] + 1):
                if total_datas[i]["val"]["time"] != total_datas[i - 1]["val"]["time"]:
                    if start_index < 0:
                        start_index = i
                    else:
                        pos_list.append((start_index, i - 1))
                        start_index = i
        if pos_list[-1][1] < total_datas[-1]["index"]:
            pos_list.append((pos_list[-1][1] + 1, total_datas[-1]["index"]))
        l2.l2_data_util.local_today_datas[code].clear()
        l2.l2_data_util.local_today_num_operate_map[code].clear()
    # 获取交易进度
    trade_progress_list, buy_queues = log.get_trade_progress(code)
        print("id:", id(l2.l2_data_util.local_today_datas))
        # l2_trade_factor.L2TradeFactorUtil.get_safe_buy_count = mock.Mock(return_value=12)
        # pos_list.insert(41,(225,306))
        # pos_list.insert(63, (345, 423))
        # pos_list.insert(66, (440, 447))
        # pos_list.insert(75, (472, 488))
        # pos_list.insert(84, (516, 532))
    for indexs in pos_list:
        l2.l2_data_manager_new.L2TradeDataProcessor.random_key[code] = mock.Mock(return_value=random.randint(0, 100000))
        # 设置封单额,获取买1量
        for i in range(0, 100):
            time_ = total_datas[indexs[0]]["val"]["time"]
            time_s = tool.get_time_as_second(time_) - i - 1
            volumn = THSBuy1VolumnManager().get_buy_1_volumn(code, tool.time_seconds_format(time_s))
            if volumn is not None:
                l2.cancel_buy_strategy.L2LimitUpMoneyStatisticUtil.verify_num(code, int(volumn),
                                                                              tool.time_seconds_format(time_s))
                break
        # 设置委买队列
        for i in range(0, len(buy_queues)):
            if tool.trade_time_sub(buy_queues[i][1], total_datas[indexs[0]]["val"]["time"]) > 0:
                print("委买队列", buy_queues[i])
                try:
                    __process_buy_queue(code, buy_queues[i - 1][0], buy_queues[i - 1][1])
                except:
                    pass
                break
        # 获取交易进度
        trade_progress_list, buy_queues = log.get_trade_progress(code)
        print("----------------处理位置", indexs)
        if indexs[0] >= 224:
            print("进入调试")
        l2.l2_data_manager_new.L2TradeDataProcessor.process_add_datas(code, total_datas[indexs[0]:indexs[1] + 1], 0, 0)
        for indexs in pos_list:
            l2.l2_data_manager_new.L2TradeDataProcessor.random_key[code] = mock.Mock(return_value=random.randint(0, 100000))
            # 设置封单额,获取买1量
            for i in range(0, 100):
                time_ = total_datas[indexs[0]]["val"]["time"]
                time_s = tool.get_time_as_second(time_) - i - 1
                volumn = THSBuy1VolumnManager().get_buy_1_volumn(code, tool.time_seconds_format(time_s))
                if volumn is not None:
                    l2.cancel_buy_strategy.L2LimitUpMoneyStatisticUtil.verify_num(code, int(volumn),
                                                                                  tool.time_seconds_format(time_s))
                    break
            # 设置委买队列
            for i in range(0, len(buy_queues)):
                if tool.trade_time_sub(buy_queues[i][1], total_datas[indexs[0]]["val"]["time"]) > 0:
                    print("委买队列", buy_queues[i])
                    try:
                        self.__process_buy_queue(code, buy_queues[i - 1][0], buy_queues[i - 1][1])
                    except:
                        pass
                    break
            print("----------------处理位置", indexs)
            if indexs[0] >= 224:
                print("进入调试")
            l2.l2_data_manager_new.L2TradeDataProcessor.process_add_datas(code, total_datas[indexs[0]:indexs[1] + 1], 0, 0)
# class TestTrade(unittest.TestCase):
@@ -167,5 +174,25 @@
#     # TradeBuyQueue().save_traded_index(code, "6.94", [1511, 888, 796])
#
class TestTradedProgress(unittest.TestCase):
    @unittest.skip("跳过此单元测试")
    def test_get_progress(self):
        code = "002328"
        l2.l2_data_util.load_l2_data(code)
        TradeBuyQueue.get_traded_index = mock.Mock(return_value=(10, False))
        buy_progress_index = TradeBuyQueue().compute_traded_index(code, "6.94", [1270, 9999, 1973])
    @unittest.skip("跳过此单元测试")
    def test_sort(self):
        list = [1, 2, 3]
        result_list = itertools.permutations(list, 3)
        print(result_list)
        for r in result_list:
            print(r)
if __name__ == "__main__":
    unittest.main()
log.py
@@ -52,6 +52,15 @@
        logger.add(self.get_path("l2", "l2_trade_cancel"),
                   filter=lambda record: record["extra"].get("name") == "l2_trade_cancel",
                   rotation="00:00", compression="zip", enqueue=True)
        logger.add(self.get_path("l2", "cancel/s_cancel"),
                   filter=lambda record: record["extra"].get("name") == "s_cancel",
                   rotation="00:00", compression="zip", enqueue=True)
        logger.add(self.get_path("l2", "cancel/h_cancel"),
                   filter=lambda record: record["extra"].get("name") == "h_cancel",
                   rotation="00:00", compression="zip", enqueue=True)
        logger.add(self.get_path("l2", "l2_trade_buy"),
                   filter=lambda record: record["extra"].get("name") == "l2_trade_buy",
                   rotation="00:00", compression="zip", enqueue=True)
@@ -117,6 +126,8 @@
logger_l2_trade = __mylogger.get_logger("l2_trade")
logger_l2_trade_cancel = __mylogger.get_logger("l2_trade_cancel")
logger_l2_s_cancel = __mylogger.get_logger("s_cancel")
logger_l2_h_cancel = __mylogger.get_logger("h_cancel")
logger_l2_trade_buy = __mylogger.get_logger("l2_trade_buy")
logger_l2_trade_queue = __mylogger.get_logger("l2_trade_queue")
logger_l2_trade_buy_queue = __mylogger.get_logger("l2_trade_buy_queue")
@@ -311,8 +322,11 @@
            if line.find("获取成交位置成功: code-{}".format(code)) < 0:
                continue
            index = int(line.split("index-")[1].split(" ")[0])
            index_list.append((index, time_))
            try:
               index = int(line.split("index-")[1].split(" ")[0])
               index_list.append((index, time_))
            except:
               pass
    return index_list, buy_queues
@@ -334,8 +348,9 @@
if __name__ == '__main__':
    # logger_l2_h_cancel.info("test")
    # logger_l2_process_time.info("test123")
    codes = ["002963"]
    codes = ["002328"]
    for code in codes:
        export_logs(code)
ocr/ocr_server.py
@@ -49,7 +49,7 @@
            except Exception as e:
                break
        _str = str(data, encoding="gbk")
        print("OCR SERVER 内容:", _str[0:20], "......", _str[-150:-1])
        # print("OCR SERVER 内容:", _str[0:20], "......", _str[-150:-1])
        return_str = "OK"
        try:
            data = ""
server.py
@@ -306,7 +306,7 @@
                            try:
                                buy_one_price_ = decimal.Decimal(round(float(buy_one_price), 2)).quantize(
                                    decimal.Decimal("0.00"))
                                buy_progress_index = self.tradeBuyQueue.save_traded_index(code, buy_one_price_,
                                buy_progress_index = self.tradeBuyQueue.compute_traded_index(code, buy_one_price_,
                                                                                          buy_queue_result_list)
                                if buy_progress_index is not None:
                                    HourCancelBigNumComputer.set_trade_progress(code, buy_progress_index,
trade/trade_gui.py
@@ -747,7 +747,7 @@
    # 为代码分配窗口
    @classmethod
    def distribute_win_for_code(cls, code):
    def distribute_win_for_code(cls, code, code_name):
        # 获取是否已经分配
        win = cls.__get_code_win(code)
        if win is not None:
@@ -763,7 +763,13 @@
            raise Exception("窗口已经分配完毕,无可用窗口")
        # 保存窗口分配信息
        cls.__save_code_win(code, win)
        THSGuiUtil.set_buy_window_code(cls.get_trade_win(win), code)
        # 设置代码多试几次
        for i in range(0, 3):
            THSGuiUtil.set_buy_window_code(cls.get_trade_win(win), code)
            time.sleep(0.5)
            code_name_win = cls.__get_code_name(win)
            if code_name == code_name_win:
                break
        return win
    # 删除代码窗口分配
@@ -840,7 +846,7 @@
                    if name_codes.get(code_name) != code:
                        cls.cancel_distribute_win_for_code(code)
                continue
            win = cls.distribute_win_for_code(code)
            win = cls.distribute_win_for_code(code, gpcode_manager.get_code_name(code))
            print("分配的窗口:", win, THSGuiUtil.is_win_exist(win))