Administrator
2023-01-13 48dcb788e821d40be37fa05c9789b751a6e2a69b
H撤大单修改
11个文件已修改
549 ■■■■■ 已修改文件
constant.py 9 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
data_export_util.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_code_operate.py 4 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_data_manager.py 31 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_data_manager_new.py 265 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_trade_test.py 84 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
log.py 17 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade_data_manager.py 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade_gui.py 75 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade_manager.py 31 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade_queue_manager.py 26 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
constant.py
@@ -1,4 +1,11 @@
# 是否为测试
TEST = False
TEST = True
# 水下捞累计连续水下时间最小值
UNDER_WATER_PRICE_TIME_AS_SECONDS = 600
# 大单金额(单位为百)
BIG_MONEY_AMOUNT = 29900
# 大单笔数
BIG_MONEY_NUM = 7888
#h撤大单笔数
H_CANCEL_BUY_COUNT = 40
data_export_util.py
@@ -193,6 +193,6 @@
if __name__ == "__main__":
    codes = ["002441"]
    codes = ["002043"]
    for code in codes:
        export_l2_excel(code)
l2_code_operate.py
@@ -174,8 +174,6 @@
                        client = data["client"]
                        pos = data["pos"]
                        L2CodeOperate.setGPCode(client, pos, "")
                else:
                    time.sleep(1)
            except Exception as e:
@@ -303,7 +301,7 @@
# 批量设置代码
def betch_set_client_codes(client_id,codes_info):
def betch_set_client_codes(client_id, codes_info):
    # 获取涨幅前16位代码
    return L2CodeOperate.betchSetGPCode(client_id, codes_info)
l2_data_manager.py
@@ -25,7 +25,7 @@
import ths_industry_util
import tool
import trade_manager
from log import logger_l2_trade, logger_l2_trade_cancel, logger_l2_trade_buy, logger_l2_process,logger_l2_data
from log import logger_l2_trade, logger_l2_trade_cancel, logger_l2_trade_buy, logger_l2_process, logger_l2_data
import trade_data_manager
import limit_up_time_manager
@@ -78,9 +78,9 @@
        _key = "buy_compute_index_info-{}".format(code)
        _data_json = redis.get(_key)
        if _data_json is None:
            return None, None, None, 0, 0
            return None, None, None, 0, 0, -1
        _data = json.loads(_data_json)
        return _data[0], _data[1], _data[2], _data[3], _data[4]
        return _data[0], _data[1], _data[2], _data[3], _data[4], _data[5]
    # 设置买入点的值
    # buy_single_index 买入信号位
@@ -88,16 +88,18 @@
    # compute_index 计算位置
    # nums 累计纯买额
    @staticmethod
    def set_buy_compute_start_data(code, buy_single_index, buy_exec_index, compute_index, nums, count):
    def set_buy_compute_start_data(code, buy_single_index, buy_exec_index, compute_index, nums, count, max_num_index):
        redis = TradePointManager.__get_redis()
        expire = tool.get_expire()
        _key = "buy_compute_index_info-{}".format(code)
        if buy_single_index is not None:
            redis.setex(_key, expire, json.dumps((buy_single_index, buy_exec_index, compute_index, nums, count)))
            redis.setex(_key, expire,
                        json.dumps((buy_single_index, buy_exec_index, compute_index, nums, count, max_num_index)))
        else:
            _buy_single_index, _buy_exec_index, _compute_index, _nums, _count = TradePointManager.get_buy_compute_start_data(
            _buy_single_index, _buy_exec_index, _compute_index, _nums, _count, _max_num_index = TradePointManager.get_buy_compute_start_data(
                code)
            redis.setex(_key, expire, json.dumps((_buy_single_index, buy_exec_index, compute_index, nums, count)))
            redis.setex(_key, expire,
                        json.dumps((_buy_single_index, buy_exec_index, compute_index, nums, count, max_num_index)))
    # 获取撤买入开始计算的信息
    # 返回数据的内容为:撤销点索引 撤买纯买额 计算的数据索引
@@ -191,7 +193,7 @@
        datas = log.load_l2_from_log()
        datas = datas.get(code)
        if datas is None:
            datas= []
            datas = []
        local_today_datas[code] = datas
        # 从数据库加载
@@ -277,7 +279,7 @@
        local_latest_datas[code] = datas
        __set_l2_data_latest_count(code, len(datas))
        try:
            logger_l2_data.info("{}-{}",code,add_datas)
            logger_l2_data.info("{}-{}", code, add_datas)
        except Exception as e:
            logging.exception(e)
        saveL2Data(code, add_datas)
@@ -796,7 +798,7 @@
        industry, codes = ths_industry_util.get_same_industry_codes(code, gpcode_manager.get_gp_list())
        if industry is None:
            return True, "没有获取到行业"
        codes_index = industry_codes_sort.sort_codes(codes,code)
        codes_index = industry_codes_sort.sort_codes(codes, code)
        if codes_index is not None and codes_index.get(code) is not None and codes_index.get(code) > 1:
            return False, "同一板块中老三,老四,...不能买"
@@ -941,8 +943,9 @@
    # 获取下单起始信号
    @classmethod
    def __get_order_begin_pos(cls, code):
        buy_single_index, buy_exec_index, compute_index, num = TradePointManager.get_buy_compute_start_data(code)
        return buy_single_index, buy_exec_index, compute_index, num
        buy_single_index, buy_exec_index, compute_index, num, max_num_index = TradePointManager.get_buy_compute_start_data(
            code)
        return buy_single_index, buy_exec_index, compute_index, num, max_num_index
    @classmethod
    def __save_order_begin_data(self, code, buy_single_index, buy_exec_index, compute_index, num):
@@ -1637,7 +1640,8 @@
    def process_cancel_with_big_num(cls, code, start_index, end_index):
        total_data = local_today_datas[code]
        # 如果无下单信号就无需处理
        buy_single_index, buy_exec_index, compute_index, nums = TradePointManager.get_buy_compute_start_data(code)
        buy_single_index, buy_exec_index, compute_index, nums, max_num_index = TradePointManager.get_buy_compute_start_data(
            code)
        if buy_single_index is None or buy_exec_index is None or buy_exec_index < 0:
            return False, None
        # 判断是否有大单记录
@@ -1926,7 +1930,6 @@
        if process_index is None:
            process_index = end_index
        cls.__save_recod(code, process_index, count)
def __get_time_second(time_str):
l2_data_manager_new.py
@@ -301,7 +301,8 @@
        if end_index < start_index:
            return
        # 获取买入信号起始点
        buy_single_index, buy_exec_index, buy_compute_index, num, count = cls.__get_order_begin_pos(code)
        buy_single_index, buy_exec_index, buy_compute_index, num, count, max_num_index = cls.__get_order_begin_pos(
            code)
        # 撤单计算,只看买1
        cancel_data, cancel_msg = L2LimitUpMoneyStatisticUtil.process_data(code, start_index, end_index,
@@ -389,7 +390,8 @@
                trade_data_manager.placeordercountmanager.place_order(code)
                # 获取买入位置信息
                try:
                    buy_single_index, buy_exec_index, buy_compute_index, num, count = cls.__get_order_begin_pos(code)
                    buy_single_index, buy_exec_index, buy_compute_index, num, count, max_num_index = cls.__get_order_begin_pos(
                        code)
                    SecondAverageBigNumComputer.place_order_success(code, buy_single_index, buy_exec_index)
                    AverageBigNumComputer.place_order_success(code, buy_single_index, buy_exec_index)
                    LongAverageBigNumComputer.place_order_success(code, buy_single_index, buy_exec_index)
@@ -454,7 +456,8 @@
            if sell1_time is not None and sell1_volumn > 0:
                # 获取执行位信息
                total_datas = local_today_datas[code]
                buy_single_index, buy_exec_index, buy_compute_index, num, count = cls.__get_order_begin_pos(code)
                buy_single_index, buy_exec_index, buy_compute_index, num, count, max_num_index = cls.__get_order_begin_pos(
                    code)
                buy_nums = num
                for i in range(buy_exec_index + 1, total_datas[-1]["index"] + 1):
                    _val = total_datas[i]["val"]
@@ -558,7 +561,8 @@
        # 是否是交易队列触发
        if source == "trade_queue":
            # 交易队列触发的需要下单后5s
            buy_single_index, buy_exec_index, buy_compute_index, num, count = cls.__get_order_begin_pos(code)
            buy_single_index, buy_exec_index, buy_compute_index, num, count, max_num_index = cls.__get_order_begin_pos(
                code)
            total_datas = local_today_datas[code]
            if buy_exec_index is not None and buy_exec_index > 0:
                now_time_str = datetime.datetime.now().strftime("%H:%M:%S")
@@ -603,7 +607,8 @@
        _start_time = round(t.time() * 1000)
        total_datas = local_today_datas[code]
        # 获取买入信号计算起始位置
        buy_single_index, buy_exec_index, buy_compute_index, num, count = cls.__get_order_begin_pos(code)
        buy_single_index, buy_exec_index, buy_compute_index, num, count, max_num_index = cls.__get_order_begin_pos(
            code)
        # 是否为新获取到的位置
        if buy_single_index is None:
@@ -619,7 +624,8 @@
            if has_single:
                num = 0
                count = 0
                cls.debug(code, "获取到买入信号起始点:{}  数据:{}", buy_single_index, total_datas[buy_single_index])
                cls.debug(code, "获取到买入信号起始点:{} ,计算范围:{}-{} ,数据:{}", buy_single_index, compute_start_index,
                          compute_end_index, total_datas[buy_single_index])
                # 如果是今天第一次有下单开始信号,需要设置大单起始点
                cls.l2BigNumForMProcessor.set_begin_pos(code, buy_single_index)
@@ -637,12 +643,16 @@
        threshold_money, msg = cls.__get_threshmoney(code)
        # 买入纯买额统计
        compute_index, buy_nums, buy_count, rebegin_buy_pos = cls.__sum_buy_num_for_order_3(code, max(buy_single_index,
                                                                                                      compute_start_index),
                                                                                            compute_end_index, num,
                                                                                            count, threshold_money,
                                                                                            buy_single_index,
                                                                                            capture_time)
        compute_index, buy_nums, buy_count, rebegin_buy_pos, max_num_index_new = cls.__sum_buy_num_for_order_3(code,
                                                                                                               max(
                                                                                                                   buy_single_index,
                                                                                                                   compute_start_index),
                                                                                                               compute_end_index,
                                                                                                               num,
                                                                                                               count,
                                                                                                               threshold_money,
                                                                                                               buy_single_index,
                                                                                                               max_num_index)
        _start_time = l2_data_log.l2_time(code, round(t.time() * 1000) - _start_time, "纯买额统计时间")
        cls.debug(code, "m值-{} m值因子-{}", threshold_money, msg)
@@ -658,7 +668,8 @@
                      buy_count,
                      total_datas[compute_index])
            # 记录买入信号位置
            cls.__save_order_begin_data(code, buy_single_index, compute_index, compute_index, buy_nums, buy_count)
            cls.__save_order_begin_data(code, buy_single_index, compute_index, compute_index, buy_nums, buy_count,
                                        max_num_index_new)
            # 如果是今天第一次有下单执行信号,涨停时间(买入执行位时间)
            limit_up_time_manager.save_limit_up_time(code, total_datas[compute_index]["val"]["time"])
            # 虚拟下单
@@ -702,7 +713,8 @@
        else:
            # 未达到下单条件,保存纯买额,设置纯买额
            # 记录买入信号位置
            cls.__save_order_begin_data(code, buy_single_index, -1, compute_end_index, buy_nums, buy_count)
            cls.__save_order_begin_data(code, buy_single_index, -1, compute_end_index, buy_nums, buy_count,
                                        max_num_index_new)
            print("保存大单时间", round((t.time() - _start_time) * 1000))
            _start_time = t.time()
        pass
@@ -710,14 +722,15 @@
    # 获取下单起始信号
    @classmethod
    def __get_order_begin_pos(cls, code):
        buy_single_index, buy_exec_index, compute_index, num, count = l2_data_manager.TradePointManager.get_buy_compute_start_data(
        buy_single_index, buy_exec_index, compute_index, num, count, max_num_index = l2_data_manager.TradePointManager.get_buy_compute_start_data(
            code)
        return buy_single_index, buy_exec_index, compute_index, num, count
        return buy_single_index, buy_exec_index, compute_index, num, count, max_num_index
    # 保存下单起始信号
    @classmethod
    def __save_order_begin_data(self, code, buy_single_index, buy_exec_index, compute_index, num, count):
        TradePointManager.set_buy_compute_start_data(code, buy_single_index, buy_exec_index, compute_index, num, count)
    def __save_order_begin_data(self, code, buy_single_index, buy_exec_index, compute_index, num, count, max_num_index):
        TradePointManager.set_buy_compute_start_data(code, buy_single_index, buy_exec_index, compute_index, num, count,
                                                     max_num_index)
    # 计算下单起始信号
    # compute_data_count 用于计算的l2数据数量
@@ -770,9 +783,9 @@
    # 是否为万手哥
    @classmethod
    def __is_big_money(cls, limit_up_price, val):
        if int(val["num"]) >= 7888:
        if int(val["num"]) >= constant.BIG_MONEY_NUM:
            return True
        if int(val["num"]) * limit_up_price >= 29900:
        if int(val["num"]) * limit_up_price >= constant.BIG_MONEY_AMOUNT:
            return True
        return False
@@ -790,7 +803,7 @@
    # 统计买入净买量,不计算在买入信号之前的买撤单
    @classmethod
    def __sum_buy_num_for_order_3(cls, code, compute_start_index, compute_end_index, origin_num, origin_count,
                                  threshold_money, buy_single_index, capture_time):
                                  threshold_money, buy_single_index, max_num_index):
        def get_threshold_count():
            count = threshold_count - sub_threshold_count
            if count < 3:
@@ -837,10 +850,15 @@
        # 可以触发买,当有涨停买信号时才会触发买
        trigger_buy = True
        place_order_count = trade_data_manager.placeordercountmanager.get_place_order_count(code)
        if place_order_count>3:
        if place_order_count > 3:
            place_order_count = 3
        # 间隔最大时间依次为:3,9,27,81
        max_space_time = pow(3,place_order_count + 1) - 1
        max_space_time = pow(3, place_order_count + 1) - 1
        # 最大买量
        max_buy_num = 0
        max_buy_num_index = max_num_index
        if max_num_index > -1:
            max_buy_num = int(total_datas[max_num_index]["val"]["num"])
        for i in range(compute_start_index, compute_end_index + 1):
            data = total_datas[i]
            _val = total_datas[i]["val"]
@@ -850,17 +868,20 @@
                TradePointManager.delete_buy_point(code)
                if i == compute_end_index:
                    # 数据处理完毕
                    return None, buy_nums, buy_count, None
                    return None, buy_nums, buy_count, None, max_buy_num_index
                else:
                    # 计算买入信号,不能同一时间开始计算
                    for ii in range(buy_single_index + 1, compute_end_index + 1):
                        if total_datas[buy_single_index]["val"]["time"] != total_datas[ii]["val"]["time"]:
                            return None, buy_nums, buy_count, ii
                            return None, buy_nums, buy_count, ii, max_buy_num_index
            # 涨停买
            if L2DataUtil.is_limit_up_price_buy(_val):
                if cls.__is_big_money(limit_up_price, _val):
                    sub_threshold_count += int(total_datas[i]["re"])
                if round(int(_val["num"]) * float(_val["price"])) >= 5900:
                    if int(_val["num"]) > max_buy_num:
                        max_buy_num = int(_val["num"])
                        max_buy_num_index = i
                    trigger_buy = True
                    # 只统计59万以上的金额
                    buy_nums += int(_val["num"]) * int(total_datas[i]["re"])
@@ -901,15 +922,16 @@
            cls.buy_debug(code, "位置-{},总手数:{},目标手数:{}", i,
                          buy_nums, threshold_num)
            # 有撤单信号,且小于阈值
            if buy_nums >= threshold_num and buy_count >= get_threshold_count() and trigger_buy:
                return i, buy_nums, buy_count, None
            if buy_nums >= threshold_num and buy_count >= get_threshold_count() and trigger_buy and max_buy_num_index > -1 and cls.__is_big_money(
                    limit_up_price, total_datas[max_buy_num_index]["val"]):
                return i, buy_nums, buy_count, None, max_buy_num_index
        cls.buy_debug(code, "尚未获取到买入执行点,起始计算位置:{} 统计纯买手数:{} 目标纯买手数:{}  统计纯买单数:{} 目标纯买单数:{} 大单数量:{}",
                      compute_start_index,
                      buy_nums,
                      threshold_num, buy_count, get_threshold_count(), sub_threshold_count)
        return None, buy_nums, buy_count, None
        return None, buy_nums, buy_count, None, max_buy_num_index
    @classmethod
    def test(cls):
@@ -1063,6 +1085,7 @@
# 涨停封单额统计
class L2LimitUpMoneyStatisticUtil:
    _redisManager = redis_manager.RedisManager(1)
    _thsBuy1VolumnManager = trade_queue_manager.THSBuy1VolumnManager()
    @classmethod
    def __get_redis(cls):
@@ -1243,6 +1266,10 @@
        # 大单撤销笔数
        cancel_big_num_count = 0
        buy_exec_time = tool.get_time_as_second(total_datas[buy_exec_index]["val"]["time"])
        # 获取最大封单额
        max_buy1_volume = cls._thsBuy1VolumnManager.get_max_buy1_volume(code)
        # 从同花顺买1矫正过后的位置开始计算,到end_index结束
        for i in range(index + 1, end_index + 1):
@@ -1294,6 +1321,12 @@
            if start_index <= i <= end_index:
                # 如果是减小项
                if val < 0:
                    # 当前量小于最大量的24%则需要取消
                    if exec_time_offset >= 30:
                        if total_num <= min_volumn_big and max_buy1_volume * 0.24 > total_num:
                            cancel_index = i
                            cancel_msg = "封板额小于最高封板额的24% {}/{}".format(total_num, max_buy1_volume)
                            break
                    # 累计封单金额小于1000万
                    if total_num < min_volumn:
                        # 与执行位相隔>=5s时规则生效
@@ -1331,7 +1364,7 @@
                                    break
                # ------大单撤处理-------
                # if total_num < min_volumn_big:
                if exec_time_offset < 1800:
                if exec_time_offset < 31:
                    pass
                    # try:
                    #     b_need_cancel, b_cancel_index = AverageBigNumComputer.need_cancel(code, i, i)
@@ -1341,14 +1374,14 @@
                    #         break
                    # except Exception as e:
                    #     logging.exception(e)
                # 30分钟外才执行
                elif 1800 <= exec_time_offset <= 5400:
                # 30s外才执行
                elif 31 <= exec_time_offset:
                    try:
                        b_need_cancel, b_cancel_index = LongAverageBigNumComputer.need_cancel(code, buy_exec_index, i,
                                                                                              i)
                        if b_need_cancel:
                            cancel_index = b_cancel_index
                            cancel_msg = "60s-1h内大单撤销比例触发阈值"
                            cancel_msg = "30s后内大单撤销比例触发阈值"
                            break
                    except Exception as e:
                        logging.exception(e)
@@ -1520,41 +1553,65 @@
        val = cls.__getRedis().get(key)
        return val
    # 保存结束位置
    @classmethod
    def __save_end_index(cls, code, end_index):
        key = "s_average_big_num_end_index_set-{}".format(code)
        cls.__getRedis().sadd(key, end_index)
    @classmethod
    def __list_end_indexs(cls, code):
        key = "s_average_big_num_end_index_set-{}".format(code)
        vals = cls.__getRedis().smembers(key)
        if vals is None:
            return None
        results = []
        for val in vals:
            results.append(int(val))
        results.sort()
        return results
    @classmethod
    def __clear_data(cls, code):
        key = "s_average_big_num_comput_info-{}".format(code)
        cls.__getRedis().delete(key)
        key = "s_average_big_num-{}".format(code)
        cls.__getRedis().delete(key)
        ks = ["s_average_big_num_comput_info-{}".format(code), "s_average_big_num-{}".format(code),
              "s_average_big_num_end_index_set-{}".format(code)]
        for key in ks:
            cls.__getRedis().delete(key)
    @classmethod
    def clear_data(cls):
        key = "s_average_big_num_comput_info-*"
        keys = cls.__getRedis().keys(key)
        for k in keys:
            cls.__getRedis().delete(k)
        key = "s_average_big_num-*"
        keys = cls.__getRedis().keys(key)
        for k in keys:
            cls.__getRedis().delete(k)
        ks = ["s_average_big_num_comput_info-*", "s_average_big_num-*", "s_average_big_num_end_index_set-*"]
        for key in ks:
            keys = cls.__getRedis().keys(key)
            for k in keys:
                cls.__getRedis().delete(k)
    # 计算平均手数
    # 计算范围:买入信号起始点到买入执行位的下一张图结束点数据为止
    @classmethod
    def compute_average_big_num(cls, code, buy_single_index, start_index, end_index):
        cls.__save_end_index(code, end_index)
        # 保存结束位置
        end_indexs = cls.__list_end_indexs(code)
        print("compute_average_big_num", code, buy_single_index, start_index, end_index)
        L2TradeDataProcessor.cancel_debug(code, "开始计算短大单位置")
        total_data = local_today_datas[code]
        num = 0
        count = 0
        apply_time_second = int(cls.get_apply_time(code).replace(":", ""))
        apply_time = cls.get_apply_time(code)
        apply_time_second = int(apply_time.replace(":", ""))
        for ei in end_indexs:
            if int(total_data[ei]["val"]["time"].replace(":", "")) >= apply_time_second:
                end_index = ei
                break
        for i in range(start_index, end_index + 1):
            data = total_data[i]
            val = data["val"]
            if int(val["time"].replace(":", "")) > apply_time_second:
                # 重新设置计算结束位置
                end_index = i - 1
                break
            # if int(val["time"].replace(":", "")) > apply_time_second:
            #     # 重新设置计算结束位置
            #     end_index = i - 1
            #     break
            if L2DataUtil.is_limit_up_price_buy(val):  # and float(val["price"]) * int(val["num"]) > 7500:
                # 75万以上的才参与计算平均大单
@@ -1573,7 +1630,8 @@
                    num += int(val["num"])
        average_num = num // count
        average_num = round(5900 / gpcode_manager.get_limit_up_price(code))
        average_num = min(constant.BIG_MONEY_NUM,
                          round(constant.BIG_MONEY_AMOUNT / gpcode_manager.get_limit_up_price(code)))
        average_up_count = 0
        for i in range(start_index, end_index + 1):
            data = total_data[i]
@@ -1591,10 +1649,16 @@
    @classmethod
    def need_cancel(cls, code, buy_single_index, buy_exec_index, start_index, end_index, need_cancel=True):
        average_num, average_up_count, a_start_index, a_end_index = cls.__get_average_data(code)
        L2TradeDataProcessor.cancel_debug(code,"s级是否需要撤单,数据范围:{}-{}  平均大单信息-({},{},{},{})",start_index,end_index,average_num, average_up_count, a_start_index, a_end_index)
        L2TradeDataProcessor.cancel_debug(code, "s级是否需要撤单,数据范围:{}-{}  平均大单信息-({},{},{},{})", start_index, end_index,
                                          average_num, average_up_count, a_start_index, a_end_index)
        if average_num is None:
            return False, None
        total_data = local_today_datas[code]
        # 只守护30s
        if tool.trade_time_sub(total_data[start_index]["val"]["time"], total_data[buy_exec_index]["val"]["time"]) > 30:
            return False, None
        # 如果start_index与buy_single_index相同,即是下单后的第一次计算
        # 需要查询买入信号之前的同1s是否有涨停撤的数据
        if buy_single_index == start_index:
@@ -1625,6 +1689,17 @@
                                                                                     code))
                if buy_index is not None and a_start_index <= buy_index <= a_end_index:
                    cls.__save_cancel_data(code, i)
                else:
                    # 有部分撤销从而导致的无法溯源,这时就需要判断预估买入时间是否在a_start_index到a_end_index的时间区间
                    min_space, max_space = l2_data_util.compute_time_space_as_second(val["cancelTime"],
                                                                                     val["cancelTimeUnit"])
                    # 只判断S级撤销,只有s级撤销才有可能相等
                    if max_space - min_space <= 1:
                        buy_time = tool.trade_time_add_second(val["time"], 0 - min_space)
                        if int(total_data[a_start_index]["val"]["time"].replace(":", "")) <= int(
                                buy_time.replace(":", "")) <= int(
                            total_data[a_end_index]["val"]["time"].replace(":", "")):
                            cls.__save_cancel_data(code, i)
        if need_cancel:
            # 计算买撤大单暂比
            cancel_datas = cls.__get_cancel_datas(code)
@@ -1635,9 +1710,9 @@
                if place_order_count <= 1:
                    cancel_rate_threshold = 0.49
                elif place_order_count <= 2:
                    cancel_rate_threshold = 0.549
                else:
                    cancel_rate_threshold = 0.59
                else:
                    cancel_rate_threshold = 0.69
                cancel_indexs = []
                for index in cancel_datas:
                    cancel_indexs.append(int(index))
@@ -1717,9 +1792,16 @@
    @classmethod
    def test(cls):
        cls.__test(("000909", 607, 646, 646, 694))
        # cls.__test(("000909", 607, 646, 646, 694))
        # 代码 买入信号起始点  买入信息执行位置  计算末位 最远计算位置
        # cls.__test(("002793", 292, 308, 314, 410))
        cls.__save_end_index("000333", 200)
        cls.__save_end_index("000333", 101)
        cls.__save_end_index("000333", 99)
        cls.__save_end_index("000333", 120)
        cls.__save_end_index("000333", 126)
        cls.__save_end_index("000333", 126)
        print(cls.__list_end_indexs("000333"))
        # 执行是否需要撤销
@@ -1810,7 +1892,8 @@
        average_num = num // count
        # average_num = 0
        average_num = round(5900 / gpcode_manager.get_limit_up_price(code))
        average_num = min(constant.BIG_MONEY_NUM,
                          round(constant.BIG_MONEY_AMOUNT / gpcode_manager.get_limit_up_price(code)))
        average_up_count = 0
        for i in range(start_index, end_index + 1):
            data = total_data[i]
@@ -1825,6 +1908,8 @@
    # 是否需要撤单
    @classmethod
    def need_cancel(cls, code, buy_single_index, buy_exec_index, start_index, end_index, need_cancel=True):
        # 暂时取消此撤单条件
        return False, None
        average_num, average_up_count, a_start_index, a_end_index = cls.__get_average_data(code)
        if average_num is None:
            return False, None
@@ -1946,27 +2031,29 @@
        return cls.__redis_manager.getRedis()
    @classmethod
    def __save_average_data(cls, code, average_num, average_up_count, start_index, end_index):
        L2TradeDataProcessor.cancel_debug(code, "获取到长大单位置信息:平均手数-{} 大单数量-{} 计算开始范围-{}:{}".format(average_num,
                                                                                                 average_up_count,
                                                                                                 start_index,
                                                                                                 end_index))
    def __save_average_data(cls, code, average_num, average_up_count, total_count, start_index, end_index):
        L2TradeDataProcessor.cancel_debug(code, "获取到长大单位置信息:平均手数-{} 大单数量-{} 样本数量-{} 计算开始范围-{}:{}".format(average_num,
                                                                                                         average_up_count,
                                                                                                         total_count,
                                                                                                         start_index,
                                                                                                         end_index))
        key = "l_average_big_num-{}".format(code)
        cls.__getRedis().setex(key, 3600, json.dumps((average_num, average_up_count, start_index, end_index)))
        cls.__getRedis().setex(key, tool.get_expire(),
                               json.dumps((average_num, average_up_count, total_count, start_index, end_index)))
    @classmethod
    def __get_average_data(cls, code):
        key = "l_average_big_num-{}".format(code)
        val = cls.__getRedis().get(key)
        if val is None:
            return None, None, None, None
            return None, None, None, None, None
        val = json.loads(val)
        return val[0], val[1], val[2], val[3]
        return val[0], val[1], val[2], val[3], val[4]
    @classmethod
    def __save_compute_info(cls, code, cancel_count, process_index):
        key = "l_average_big_num_comput_info-{}".format(code)
        cls.__getRedis().setex(key, 3600, json.dumps((cancel_count, process_index)))
        cls.__getRedis().setex(key, tool.get_expire(), json.dumps((cancel_count, process_index)))
    @classmethod
    def __get_compute_info(cls, code):
@@ -1989,14 +2076,14 @@
    @classmethod
    def compute_average_big_num(cls, code, buy_single_index, buy_exec_index):
        total_data = local_today_datas[code]
        latest_index = total_data[-1]["index"]
        end_index = total_data[-1]["index"]
        start_index = buy_single_index
        start_index = buy_exec_index
        if tool.trade_time_sub(total_data[end_index]["val"]["time"], total_data[buy_exec_index]["val"]["time"]) < 3:
            return
        exec_time = total_data[buy_exec_index]["val"]["time"]
        o_average_num, o_average_up_count, o_start_index, o_start_index = cls.__get_average_data(code)
        if o_average_num is not None:
        o_average_num, o_average_up_count, o_count, o_start_index, o_start_index = cls.__get_average_data(code)
        if o_average_num is not None and o_count >= constant.H_CANCEL_BUY_COUNT:
            return
        # 获取买入执行位后2s的数据末位
        for i in range(end_index, buy_exec_index, - 1):
@@ -2009,32 +2096,46 @@
        for i in range(start_index, end_index + 1):
            data = total_data[i]
            val = data["val"]
            if L2DataUtil.is_limit_up_price_buy(val):
            if L2DataUtil.is_limit_up_price_buy(val) and int(val["num"]) * float(val["price"]) >= 5900:
                count += data["re"]
                num += int(val["num"])
        average_num = num / count
        average_num = round(average_num)
                num += int(val["num"]) * data["re"]
        # 如果小于30笔,需要再往后计算
        if count < constant.H_CANCEL_BUY_COUNT:
            for i in range(end_index + 1, latest_index + 1, 1):
                data = total_data[i]
                val = data["val"]
                if L2DataUtil.is_limit_up_price_buy(val) and int(val["num"]) * float(val["price"]) >= 5900:
                    count += data["re"]
                    num += int(val["num"]) * data["re"]
                    if count >= constant.H_CANCEL_BUY_COUNT:
                        end_index = i
                        break
        # 获取大单数量
        average_up_count = 0
        average_num = round(num / count)
        for i in range(start_index, end_index + 1):
            data = total_data[i]
            val = data["val"]
            if L2DataUtil.is_limit_up_price_buy(val):
                if int(val["num"]) >= average_num:
                    average_up_count += data["re"]
            if int(val["num"]) >= average_num:
                average_up_count += data["re"]
        # 保存数据
        cls.__save_average_data(code, average_num, average_up_count, start_index, end_index)
        cls.__save_average_data(code, average_num, average_up_count, count, start_index, end_index)
        cls.__save_compute_info(code, 0, buy_exec_index)
    # 是否需要撤单
    @classmethod
    def need_cancel(cls, code, buy_exec_index, start_index, end_index):
        average_num, average_up_count, a_start_index, a_end_index = cls.__get_average_data(code)
        average_num, average_up_count, total_count, a_start_index, a_end_index = cls.__get_average_data(code)
        if average_num is None:
            return False, None
        cancel_count, process_index = cls.__get_compute_info(code)
        total_data = local_today_datas[code]
        # 14:30过后不再守护
        if int(total_data[end_index]["val"]["time"].replace(":", "")) > int("143000"):
            return False, None
        try:
            for i in range(start_index, end_index + 1):
            for i in range(min(start_index, buy_exec_index + 1), end_index + 1):
                if i <= buy_exec_index:
                    continue
                if process_index >= i:
@@ -2050,11 +2151,9 @@
                        # 买入位置要在平均值计算范围内
                        cancel_count += data["re"]
                        process_index = i
                        if tool.trade_time_sub(val["time"], total_data[buy_exec_index]["val"]["time"]) > 3600:
                            continue
                        sj = 0  # 5 * tool.trade_time_sub(val["time"],total_data[buy_exec_index]["val"]["time"])
                        print("计算结果", cancel_count, average_up_count, sj)
                        if cancel_count / (average_up_count - sj) >= 0.79:
                        print("h平均大单计算结果:", "取消数量", cancel_count, "大单总数", average_up_count, sj)
                        if cancel_count / (average_up_count - sj) >= 0.75:
                            return True, i
        finally:
            cls.__save_compute_info(code, cancel_count, process_index)
@@ -2094,6 +2193,8 @@
if __name__ == "__main__":
    # trade_manager.start_cancel_buy("000637")
    # t.sleep(10)
    # AverageBigNumComputer.test()
    # LongAverageBigNumComputer.test()
    # L2TradeDataProcessor.test()
l2_trade_test.py
@@ -11,11 +11,13 @@
import log
import redis_manager
import tool
import trade_data_manager
import trade_manager
from l2_data_manager import TradePointManager
# from l2_data_manager_new import L2TradeDataProcessor, L2LimitUpMoneyStatisticUtil, AverageBigNumComputer
# from trade_queue_manager import THSBuy1VolumnManager
from l2_data_manager_new import L2TradeDataProcessor, L2LimitUpMoneyStatisticUtil, AverageBigNumComputer, \
    SecondAverageBigNumComputer
from trade_queue_manager import THSBuy1VolumnManager
def clear_trade_data(code):
@@ -27,7 +29,7 @@
    big_money_num_manager.reset(code)
    redis_trade = redis_manager.RedisManager(2).getRedis()
    redis_trade.delete("trade-state-{}".format(code))
    trade_data_manager.placeordercountmanager.clear_place_order_count(code)
    redis_info = redis_manager.RedisManager(0).getRedis()
    keys = redis_info.keys("*{}*".format(code))
    for k in keys:
@@ -39,42 +41,46 @@
        redis_info.delete(k)
# class VirtualTrade(unittest.TestCase):
#     code = "002093"
#     clear_trade_data(code)
#     l2_data_manager.load_l2_data(code)
#     total_datas = l2_data_manager.local_today_datas[code]
#     pos_list = log.get_l2_process_position(code)
#     del pos_list[-1]
#     if pos_list[-1][1] < total_datas[-1]["index"]:
#         # 剩下的数据根据秒来分
#         start_index = -1
#         for i in range(pos_list[-1][1] + 1, total_datas[-1]["index"] + 1):
#             if total_datas[i]["val"]["time"] != total_datas[i - 1]["val"]["time"]:
#                 if start_index < 0:
#                     start_index = i
#                 else:
#                     pos_list.append((start_index, i - 1))
#                     start_index = i
#     if pos_list[-1][1] < total_datas[-1]["index"]:
#         pos_list.append((pos_list[-1][1] + 1, total_datas[-1]["index"]))
#     l2_data_manager_new.local_today_datas = {code: []}
#     l2_trade_factor.L2TradeFactorUtil.get_safe_buy_count = mock.Mock(return_value=10)
#     for indexs in pos_list:
#         L2TradeDataProcessor.random_key[code] = mock.Mock(return_value=random.randint(0, 100000))
#         # 设置封单额,获取买1量
#         for i in range(0, 100):
#             time_ = total_datas[indexs[0]]["val"]["time"]
#             time_s = tool.get_time_as_second(time_) - i - 1
#             volumn = THSBuy1VolumnManager().get_buy_1_volumn(code, tool.time_seconds_format(time_s))
#             if volumn is not None:
#                 l2_data_manager_new.L2LimitUpMoneyStatisticUtil.verify_num(code, int(volumn),
#                                                                            tool.time_seconds_format(time_s))
#                 break
#
#         print("----------------处理位置", indexs)
#         L2TradeDataProcessor.process_add_datas(code, total_datas[indexs[0]:indexs[1] + 1], 0, 0)
#
class VirtualTrade(unittest.TestCase):
    """Replay recorded L2 data for one stock code through ``L2TradeDataProcessor``.

    Every statement below sits directly in the class body, so the replay runs
    once when the class is defined (i.e. when the module is imported); no
    ``test_*`` method is defined in this body.
    """
    # Stock code whose recorded data is replayed.
    code = "002043"
    # Clear stored state for the code, then load its records; the next line
    # reads them back from l2_data_manager.local_today_datas[code].
    clear_trade_data(code)
    l2_data_manager.load_l2_data(code)
    total_datas = l2_data_manager.local_today_datas[code]
    # (start, end) index ranges taken from the processing log.
    pos_list = log.get_l2_process_position(code)
    # Prepend a range for the indices before the first logged range.
    if pos_list[0][0] > 0:
        pos_list.insert(0, (0, pos_list[0][0] - 1))
    # NOTE(review): the last logged range is discarded; the reason is not
    # visible here - confirm.
    del pos_list[-1]
    if pos_list[-1][1] < total_datas[-1]["index"]:
        # Split the data after the last range into per-second ranges: a new
        # range starts wherever the "time" field differs from the previous record.
        # NOTE(review): records between the last range and the first time change
        # are not put into any range once this loop appends at least one range -
        # confirm this gap is intended.
        start_index = -1
        for i in range(pos_list[-1][1] + 1, total_datas[-1]["index"] + 1):
            if total_datas[i]["val"]["time"] != total_datas[i - 1]["val"]["time"]:
                if start_index < 0:
                    start_index = i
                else:
                    pos_list.append((start_index, i - 1))
                    start_index = i
    # Cover whatever is left up to the final record with one closing range.
    if pos_list[-1][1] < total_datas[-1]["index"]:
        pos_list.append((pos_list[-1][1] + 1, total_datas[-1]["index"]))
    # Start the processor module with an empty record list for this code.
    l2_data_manager_new.local_today_datas = {code: []}
    # Replace get_safe_buy_count with a mock that always returns 21.
    l2_trade_factor.L2TradeFactorUtil.get_safe_buy_count = mock.Mock(return_value=21)
    for indexs in pos_list:
        L2TradeDataProcessor.random_key[code] = mock.Mock(return_value=random.randint(0, 100000))
        # NOTE(review): prints an empty line for ranges starting at index >= 898;
        # looks like a leftover debugger breakpoint anchor.
        if indexs[0] >=898:
            print("")
        # Look back 1..100 seconds before the range's first record for a buy-1
        # volume; the first non-None value is passed to verify_num and the
        # search stops.
        for i in range(0, 100):
            time_ = total_datas[indexs[0]]["val"]["time"]
            time_s = tool.get_time_as_second(time_) - i - 1
            volumn = THSBuy1VolumnManager().get_buy_1_volumn(code, tool.time_seconds_format(time_s))
            if volumn is not None:
                l2_data_manager_new.L2LimitUpMoneyStatisticUtil.verify_num(code, int(volumn),
                                                                           tool.time_seconds_format(time_s))
                break
        print("----------------处理位置", indexs)
        # Feed the records of this range (end index inclusive) to the processor.
        L2TradeDataProcessor.process_add_datas(code, total_datas[indexs[0]:indexs[1] + 1], 0, 0)
# class TestTrade(unittest.TestCase):
#     processor = L2TradeDataProcessor()
log.py
@@ -193,7 +193,7 @@
def load_l2_from_log(date=None):
    today_data = {}
    if  date is None:
    if date is None:
        date = datetime.datetime.now().strftime("%Y-%m-%d")
    try:
        with open("D:/logs/gp/l2/l2_data.{}.log".format(date), mode='r') as f:
@@ -220,6 +220,12 @@
    return today_data
# 获取日志时间
def __get_log_time(line):
    """Extract the time-of-day token from a log line.

    Takes the text before the first ``|``, picks its second space-separated
    field and drops everything from the first ``.`` onward. Presumably the
    line starts with ``<date> <HH:MM:SS>.<millis> |`` - confirm against the
    logger format.

    Raises:
        IndexError: if the text before ``|`` contains no space.
    """
    prefix = line.partition("|")[0]
    clock = prefix.split(" ", 2)[1]
    return clock.partition(".")[0]
# 获取L2每次批量处理数据的位置范围
def get_l2_process_position(code, date=None):
    if not date:
@@ -232,9 +238,11 @@
                break
            if line.find("code:{}".format(code)) < 0:
                continue
            time_ = __get_log_time(line)
            line = line[line.find("处理数据范围") + len("处理数据范围") + 1:line.find("处理时间")].strip()
            if len(pos_list) == 0 or pos_list[-1][1] < int(line.split("-")[0]):
                pos_list.append((int(line.split("-")[0]), int(line.split("-")[1])))
                if int("093000") <= int(time_.replace(":", "")) <= int("150000"):
                    pos_list.append((int(line.split("-")[0]), int(line.split("-")[1])))
    return pos_list
@@ -251,6 +259,9 @@
            if line.find("code={}".format(code)) < 0:
                continue
            print(line)
            time_ = __get_log_time(line)
            if int("093000") > int(time_.replace(":", "")) or int(time_.replace(":", "")) > int("150000"):
                continue
            if line.find("获取到买入信号起始点") > 0:
                str_ = line.split("获取到买入信号起始点:")[1].strip()
@@ -290,7 +301,7 @@
if __name__ == '__main__':
    # logger_l2_process_time.info("test123")
    codes = ["000909"]
    codes = ["002766"]
    for code in codes:
        export_logs(code)
trade_data_manager.py
@@ -324,6 +324,11 @@
    def get_place_order_count(cls, code):
        return cls.__get_place_order_count(code)
    @classmethod
    def clear_place_order_count(cls, code):
        key = "place_order_count-{}".format(code)
        cls.__get_redis().delete(key)
if __name__ == "__main__":
    processor = CodeActualPriceProcessor()
trade_gui.py
@@ -326,38 +326,42 @@
                        pass
        return 0
    def __get_code_input(self):
        win = self.cancel_win
        if win <= 0 or not win32gui.IsWindowVisible(win):
            self.cancel_win = self.getCancelBuyWin()
            win = self.cancel_win
            if win <= 0:
                raise Exception("无法找到取消委托窗口")
        t = time.time()
        print(t)
        start = int(round(t * 1000))
        print(win)
        # 输入框控件ID 0x000003E9
        code_input = win32gui.GetDlgItem(win, 0x00000996)
        code_input = win32gui.FindWindowEx(code_input, 0, "Edit", None)
        # 刷新句柄
        if code_input <= 0:
            self.refresh_hwnds()
            code_input = win32gui.GetDlgItem(win, 0x00000996)
            code_input = win32gui.FindWindowEx(code_input, 0, "Edit", None)
        return code_input, win
    # 撤买
    def cancel_buy(self, code):
        if constant.TEST:
            return
        self.buy_cancel_lock.acquire()
        code_input = 0
        logger_trade_gui.info("开始获取撤单控件:code-{}".format(code))
        code_input, win = self.__get_code_input()
        try:
            logger_trade_gui.info("开始撤单:code-{}".format(code))
            win = self.cancel_win
            if win <= 0 or not win32gui.IsWindowVisible(win):
                self.cancel_win = self.getCancelBuyWin()
                win = self.cancel_win
                if win <= 0:
                    raise Exception("无法找到取消委托窗口")
            t = time.time()
            print(t)
            start = int(round(t * 1000))
            print(win)
            # 输入框控件ID 0x000003E9
            code_input = win32gui.GetDlgItem(win, 0x00000996)
            code_input = win32gui.FindWindowEx(code_input, 0, "Edit", None)
            # 刷新句柄
            if code_input <= 0:
                self.refresh_hwnds()
                code_input = win32gui.GetDlgItem(win, 0x00000996)
                code_input = win32gui.FindWindowEx(code_input, 0, "Edit", None)
            start = int(round(time.time() * 1000))
            code_result = "-"
            retry_count = 0
            while code != code_result and retry_count < 5:
                if retry_count > 0:
                code_result = self.getText(code_input)
                if retry_count > 0 or len(code_result) > 0:
                    self.input_number(code_input, "")
                    time.sleep(0.01)
@@ -369,6 +373,7 @@
                code_result = code_result.split(",")[0]
                retry_count += 1
            if code != code_result:
                self.input_number(code_input, "")
                raise Exception("输入代码出错")
            # 撤单快捷键X
@@ -382,13 +387,28 @@
            end = int(round(t * 1000))
            print("耗时", end - start)
            logger_trade_gui.info("撤单成功:code-{} 耗时:{}".format(code, end - start))
            time.sleep(0.05)
            time.sleep(0.03)
        finally:
            self.buy_cancel_lock.release()
            # 清空代码框
            self.input_number(code_input, "")
            # 再次清除代码框
            self.input_number(code_input, "")
    # 交易盘口中的撤买
    def cancel_buy_again(self, code):
        if constant.TEST:
            return
        win = THSBuyWinManagerNew.get_distributed_code_win(code)
        if win is None or win <= 0:
            raise Exception("没找到分配的交易窗口")
        cancel_btn_hwnd = win32gui.FindWindowEx(win, 0, "Button", "撤单")
        if cancel_btn_hwnd <= 0:
            raise Exception("没有找到撤单按钮")
        if not win32gui.IsWindowVisible(cancel_btn_hwnd):
            raise Exception("撤单按钮不可见")
        THSGuiUtil.click(cancel_btn_hwnd)
    # 刷新交易窗口数据
    @async_call
@@ -777,7 +797,7 @@
        code_name_win = win32gui.GetDlgItem(trade_win, 0x000005C2)
        name = THSGuiUtil.getText(code_name_win)
        if name is not None:
            name=name.replace(" ","")
            name = name.replace(" ", "")
        return tool.strQ2B(name)
    @classmethod
@@ -816,7 +836,7 @@
                    cls.cancel_distribute_win_for_code(code)
                else:
                    code_name = cls.__get_code_name(win)
                    #'深振业A'
                    # '深振业A'
                    if name_codes.get(code_name) != code:
                        cls.cancel_distribute_win_for_code(code)
                continue
@@ -851,7 +871,10 @@
if __name__ == '__main__':
    THSGuiTrade().cancel_buy("000419")
    try:
        THSGuiTrade().cancel_buy_again("000637")
    except Exception as e:
        print(e)
    # GUITest().test_distribute()
    # try:
    #     THSGuiUtil.set_buy_window_code(0x000112D0, "000333")
trade_manager.py
@@ -244,14 +244,40 @@
    try:
        logger_trade.info("{}开始撤单".format(code))
        set_trade_state(code, TRADE_STATE_BUY_CANCEL_ING)
        logger_trade.info("{}撤单方法开始".format(code))
        guiTrade.cancel_buy(code)
        logger_trade.info("{}撤单方法结束".format(code))
        __cancel_success(code)
        try:
            cancel_buy_again(code)
        except Exception as e1:
            pass
    except Exception as e:
        # 状态还原
        set_trade_state(code, trade_state)
        logger_trade.error("{}撤单异常:{}".format(code, str(e)))
        raise e
    logger_trade.info("{}撤单完毕".format(code))
# 再次撤单,防止没有撤掉
@tool.async_call
def cancel_buy_again(code):
    """Retry the cancel through the trade window, up to five attempts.

    Waits 0.1 s first. Before every attempt the trade state is re-read; the
    function stops as soon as the state is neither "cancelling" nor
    "cancel succeeded". A failed attempt is logged and followed by a pause
    that grows by 0.05 s per attempt. The log calls pass ``code`` as a
    positional argument for the ``{}`` placeholder, which presumably relies
    on a brace-style logger such as loguru - confirm.
    """
    retry_states = (TRADE_STATE_BUY_CANCEL_ING, TRADE_STATE_BUY_CANCEL_SUCCESS)
    time.sleep(0.1)
    attempt = 0
    while attempt < 5:
        if get_trade_state(code) not in retry_states:
            return
        try:
            logger_trade.info("{}:开始再次撤单", code)
            guiTrade.cancel_buy_again(code)
            logger_trade.info("{}:再次撤单成功", code)
            return
        except Exception as err:
            logger_trade.error("{}再次撤单异常:{}".format(code, str(err)))
            time.sleep(0.1 + 0.05 * attempt)
        attempt += 1
# 取消委托成功
def __cancel_success(code):
@@ -338,7 +364,4 @@
if __name__ == "__main__":
    # time_str = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    # print(time_str)
    # __clear_big_data()
    pass
    cancel_buy_again("000637")
trade_queue_manager.py
@@ -16,6 +16,18 @@
    def __get_redis(self):
        return self.__redisManager.getRedis()
    # 保存最大量
    def __save_max_buy1_volume(self, code, volume):
        key = "max_buy1_volumn-{}".format(code)
        self.__get_redis().setex(key, tool.get_expire(), volume)
    def __get_max_buy1_volume(self, code):
        key = "max_buy1_volumn-{}".format(code)
        val = self.__get_redis().get(key)
        if val is not None:
            return int(val)
        return None
    def __save_recod(self, code, time_str, volumn):
        # 保存每一次的
@@ -83,6 +95,14 @@
            return False, False, None
        self.__last_data[code] = volumn
        if int(time_str.replace(':', '')) >= int("093000"):
            # 保存最大量(9:30过后的量)
            max_volume = self.__get_max_buy1_volume(code)
            if max_volume is None:
                max_volume = 0
            if volumn > max_volume:
                self.__save_max_buy1_volume(code, volumn)
        if code not in self.__code_time_volumn_dict:
            self.__code_time_volumn_dict[code] = {}
        self.__code_time_volumn_dict[code][time_str] = volumn
@@ -129,6 +149,12 @@
        time_str, volumn = self.__get_latest_record(code)
        return time_str, volumn
    def get_max_buy1_volume(self, code):
        val = self.__get_max_buy1_volume(code)
        if val is None:
            return -1
        return val
class JueJinBuy1VolumnManager:
    __redisManager = redis_manager.RedisManager(1)