Administrator
2022-12-09 954e42723fab626b33f6dbff9246bd235981fe7a
撤单策略初步修改
13个文件已修改
275 ■■■■ 已修改文件
big_money_num_manager.py 17 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
gpcode_manager.py 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
juejin.py 16 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_data_log.py 6 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_data_manager.py 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_data_manager_new.py 161 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_data_util.py 11 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_trade_factor.py 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
redis_manager.py 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
server.py 18 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
tool.py 14 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade_data_manager.py 17 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade_gui.py 5 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
big_money_num_manager.py
@@ -9,6 +9,14 @@
__redisManager = redis_manager.RedisManager(0)
# 是否为大单
def is_big_num(val):
    """Return True when the L2 record ``val`` counts as a big order.

    A record qualifies when its ``num`` is at least 8000, or when
    ``num * price`` is at least 30000. ``price`` is only read when the
    quantity test alone does not decide the result.
    """
    quantity = int(val["num"])
    return quantity >= 8000 or quantity * float(val["price"]) >= 30000
def add_num(code, num):
    redis = __redisManager.getRedis()
    redis.incrby("big_money-{}".format(code), num)
@@ -30,7 +38,14 @@
    num = redis.get("big_money-{}".format(code))
    if num is None:
        return 0
    return round(int(num)/1000/4)
    return round(int(num) / 1000 / 4)
def reset_all():
    """Reset every stored big-order counter to zero.

    Each Redis key matching ``big_money-*`` is rewritten with the value 0
    and a TTL taken from ``tool.get_expire()``.
    """
    client = __redisManager.getRedis()
    for counter_key in client.keys("big_money-*"):
        client.setex(counter_key, tool.get_expire(), 0)
if __name__ == "__main__":
gpcode_manager.py
@@ -133,6 +133,7 @@
        return None
    limit_up_price = tool.to_price(decimal.Decimal(str(price)) * decimal.Decimal("1.1"))
    __limit_up_price_dict[code] = limit_up_price
    return limit_up_price
def get_limit_up_price_by_preprice(price):
juejin.py
@@ -58,6 +58,10 @@
def init_data():
    # 重置所有的大单数据
    big_money_num_manager.reset_all()
    # 清除水下捞数据
    __actualPriceProcessor.clear_under_water_data()
    # 载入行业股票代码
    global_data_loader.load_industry()
    # 载入代码自由流通市值
@@ -68,6 +72,10 @@
# 每日初始化
def everyday_init():
    # 交易時間不能做初始化
    if not tool.is_init_time():
        raise Exception("交易时间不能初始化")
    codes = gpcode_manager.get_gp_list()
    logger_system.info("每日初始化")
@@ -246,6 +254,9 @@
def accpt_prices(prices):
    print("价格代码数量:", len(prices))
    __actualPriceProcessor.save_current_price_codes_count(len(prices))
    # 采集的代码数量不对
    if len(gpcode_manager.get_gp_list()) - len(prices) > 2:
        return
    now_str = datetime.datetime.now().strftime("%H:%M:%S")
    now_strs = now_str.split(":")
    now_second = int(now_strs[0]) * 60 * 60 + int(now_strs[1]) * 60 + int(now_strs[2])
@@ -276,7 +287,10 @@
                    logging.exception(e)
                try:
                    __actualPriceProcessor.save_current_price(code, price, gpcode_manager.get_limit_up_price_by_preprice(pricePre) == tool.to_price(decimal.Decimal(d["price"])))
                    __actualPriceProcessor.save_current_price(code, price,
                                                              gpcode_manager.get_limit_up_price_by_preprice(
                                                                  pricePre) == tool.to_price(
                                                                  decimal.Decimal(d["price"])))
                except Exception as e:
                    logging.exception(e)
l2_data_log.py
@@ -6,7 +6,9 @@
def l2_time(code, time_, description, new_line=False):
    """Log a slow L2 processing step and return the current timestamp.

    Args:
        code: Stock code the measurement belongs to.
        time_: Elapsed time of the step that just finished.
        description: Label written into the log line.
        new_line: When True, a newline is appended to the logged message.

    Returns:
        The current wall-clock time as integer milliseconds.
    """
    timestamp = int(time.time() * 1000)
    # Only steps slower than the threshold are recorded. The message used to
    # be written unconditionally as well, which logged slow steps twice and
    # made the threshold pointless.
    if time_ > 50:
        log.logger_l2_process_time.info("{} {}: {}-{}{}", timestamp, description, code, time_, "\n" if new_line else "")
    return timestamp
@@ -22,4 +24,4 @@
        log.logger_l2_trade_buy.debug(("thread-id={} code={}  ".format(self.key, code) + content).format(*args))
    def trade_cancel(self, code, content, *args):
        log.logger_l2_trade_cancel.debug(("thread-id={} code={}  ".format(self.key, code) + content).format(*args))
        log.logger_l2_trade_cancel.debug(("thread-id={} code={}  ".format(self.key, code) + content).format(*args))
l2_data_manager.py
@@ -424,6 +424,7 @@
            return False
        return True
    # 是否为涨停卖
    @classmethod
    def is_limit_up_price_sell(cls, val):
        if int(val["limitPrice"]) != 1:
l2_data_manager_new.py
@@ -217,14 +217,15 @@
                        state = trade_manager.get_trade_state(code)
                        start_index = len(total_datas) - len(add_datas)
                        end_index = len(total_datas) - 1
                        if state == trade_manager.TRADE_STATE_BUY_DELEGATED or state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER:
                        if state == trade_manager.TRADE_STATE_BUY_DELEGATED or state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER or state == trade_manager.TRADE_STATE_BUY_SUCCESS:
                            # 已挂单
                            cls.__process_order(code, start_index, end_index, capture_timestamp)
                        else:
                            # 未挂单
                            cls.__process_not_order(code, start_index, end_index, capture_timestamp)
                    logger_l2_process.info("code:{} 处理数据范围: {}-{} 处理时间:{}", code, add_datas[0]["index"],
                                           add_datas[-1]["index"], round(t.time() * 1000) - __start_time)
                    logger_l2_process.info("code:{} 处理数据范围: {}-{} 处理时间:{} 截图时间戳:{}", code, add_datas[0]["index"],
                                           add_datas[-1]["index"], round(t.time() * 1000) - __start_time,
                                           capture_timestamp)
                    __start_time = l2_data_log.l2_time(code, round(t.time() * 1000) - __start_time, "l2数据处理时间")
                # 保存数据
                l2_data_manager.save_l2_data(code, datas, add_datas)
@@ -258,7 +259,7 @@
        buy_single_index, buy_exec_index, buy_compute_index, num, count = cls.__get_order_begin_pos(code)
        # 撤单计算,只看买1
        cancel_data, cancel_msg = L2LimitUpMoneyStatisticUtil.process_data(code, start_index, end_index,
                                                                           buy_single_index)
                                                                           buy_single_index, buy_exec_index)
        # 计算m值大单
        cls.l2BigNumForMProcessor.process(code, max(buy_single_index, start_index), end_index,
@@ -368,7 +369,7 @@
            return False, "同一板块中老三,老四,...不能买"
        if cls.__codeActualPriceProcessor.is_under_water(code):
            # 水下捞且板块中的票小于21不能买
            # 水下捞且板块中的票小于16不能买
            if global_util.industry_hot_num.get(industry) is not None and global_util.industry_hot_num.get(
                    industry) <= 16:
                return False, "水下捞,板块中的票小于2只,为{}".format(global_util.industry_hot_num.get(industry))
@@ -518,7 +519,8 @@
            l2_data_manager.TradePointManager.delete_buy_cancel_point(code)
            # 涨停封单额计算
            L2LimitUpMoneyStatisticUtil.process_data(code, buy_single_index, compute_index, buy_single_index, False)
            L2LimitUpMoneyStatisticUtil.process_data(code, buy_single_index, compute_index, buy_single_index,
                                                     buy_exec_index, False)
            _start_time = l2_data_log.l2_time(code, round(t.time() * 1000) - _start_time, "记录执行买入数据")
@@ -628,7 +630,13 @@
            count = threshold_count - sub_threshold_count
            if count < 3:
                count = 3
            return round(count*buy1_factor)
            count = round(count * buy1_factor)
            # 最高30笔,最低8笔
            if count > 30:
                count = 30
            if count < 8:
                count = 8
            return count
        _start_time = t.time()
        total_datas = local_today_datas[code]
@@ -846,6 +854,17 @@
                    return True, "9:30涨停的老大,老二可以下单"
            return False, "老大非9:30涨停,老二不能下单"
    @classmethod
    def test3(cls):
        code = "002693"
        load_l2_data(code, True)
        start_index = 334
        end_index = 341
        buy_single_index = 152
        cls.random_key[code] = random.randint(0, 100000)
        L2LimitUpMoneyStatisticUtil.process_data(code, start_index, end_index,
                                                 buy_single_index)
# 涨停封单额统计
class L2LimitUpMoneyStatisticUtil:
@@ -969,12 +988,13 @@
    # 返回取消的标志数据
    # with_cancel 是否需要判断是否撤销
    @classmethod
    def process_data(cls, code, start_index, end_index, buy_single_begin_index, with_cancel=True):
    def process_data(cls, code, start_index, end_index, buy_single_begin_index, buy_exec_index, with_cancel=True):
        start_time = round(t.time() * 1000)
        total_datas = local_today_datas[code]
        time_dict_num = {}
        # 记录计算的坐标
        time_dict_num_index = {}
        # 坐标-量的map
        num_dict = {}
        # 统计时间分布
        time_dict = {}
@@ -1022,8 +1042,39 @@
        time_list = []
        # 到当前时间累积的买1量
        time_total_num_dict = {}
        # 大单撤销笔数
        cancel_big_num_count = 0
        buy_exec_time = tool.get_time_as_second(total_datas[buy_exec_index])
        # 从同花顺买1矫正过后的位置开始计算,到end_index结束
        for i in range(index + 1, end_index + 1):
            data = total_datas[i]
            # 统计撤销数量
            if big_money_num_manager.is_big_num(data["val"]):
                if L2DataUtil.is_limit_up_price_buy_cancel(data["val"]):
                    cancel_big_num_count += int(data["re"])
                    # TODO 大量重复的工作需要处理,可以暂存在内存中,从而减少计算
                    # 获取是否在买入执行信号周围2s
                    buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(data["val"],
                                                                                     local_today_num_operate_map.get(
                                                                                         code))
                    if buy_index is not None and buy_data is not None:
                        # 相差1s
                        buy_time = buy_data["val"]["time"]
                        if abs(buy_exec_time - tool.get_time_as_second(buy_time)) < 2:
                            cancel_big_num_count += int(data["re"])
                elif L2DataUtil.is_limit_up_price_buy(data["val"]):
                    cancel_big_num_count -= int(data["re"])
            threshold_rate = 0.5
            if cancel_big_num_count >= 0:
                if cancel_big_num_count < 10:
                    threshold_rate = threshold_rate - cancel_big_num_count * 0.01
                else:
                    threshold_rate = threshold_rate - 10 * 0.01
            time_ = data["val"]["time"]
            if time_ not in time_start_index_dict:
                # 记录每一秒的开始位置
@@ -1048,12 +1099,27 @@
                # 上1s的总数
                last_second_total_volumn = time_total_num_dict.get(time_list[-1])
                if last_second_total_volumn > 0 and (
                        last_second_total_volumn - total_num) / last_second_total_volumn >= 0.5:
                        last_second_total_volumn - total_num) / last_second_total_volumn >= threshold_rate:
                    # 相邻2s内的数据减小50%
                    cancel_index = i
                    cancel_msg = "相邻2s({})内的封单量减小50%({}->{})".format(time_, last_second_total_volumn,
                                                                     total_num)
                    break
                # 记录中有上2个数据
                if len(time_list) >= 2:
                    # 倒数第2个数据
                    last_2_second_total_volumn = time_total_num_dict.get(time_list[-2])
                    if last_2_second_total_volumn > 0:
                        if last_2_second_total_volumn > last_second_total_volumn > total_num:
                            dif = last_2_second_total_volumn - total_num
                            if dif / last_2_second_total_volumn >= threshold_rate:
                                cancel_index = i
                                cancel_msg = "相邻3s({})内的封单量(第3秒 与 第1的 减小比例)减小50%({}->{}->{})".format(time_,
                                                                                                     last_2_second_total_volumn,
                                                                                                     last_second_total_volumn,
                                                                                                     total_num)
                                break
        if not with_cancel:
            cancel_index = None
@@ -1072,7 +1138,82 @@
        return None, None
# 涨停卖统计
class L2LimitUpSellStatisticUtil:
    """Accumulates limit-up-price sell volume per stock code in Redis.

    Two keys are kept per code:

    * ``limit_up_sell_num-<code>``   - running sum of counted sell volume
    * ``limit_up_sell_index-<code>`` - index of the last L2 row already handled

    Both keys carry the TTL returned by ``tool.get_expire()`` so the sum
    cannot outlive its cursor.
    """
    _redisManager = redis_manager.RedisManager(0)

    @classmethod
    def __get_redis(cls):
        """Return a Redis client from the shared manager."""
        return cls._redisManager.getRedis()

    @staticmethod
    def __num_key(code):
        """Build the Redis key holding the accumulated sell volume."""
        return "limit_up_sell_num-{}".format(code)

    @staticmethod
    def __index_key(code):
        """Build the Redis key holding the last processed row index."""
        return "limit_up_sell_index-{}".format(code)

    @classmethod
    def __incre_sell_data(cls, code, num):
        """Add ``num`` to the stored sell volume of ``code``."""
        key = cls.__num_key(code)
        redis = cls.__get_redis()
        redis.incrby(key, num)
        # INCRBY creates the key without a TTL. Without this the sum would
        # persist after the index key (written with SETEX) has expired, and
        # a later run would start from a stale total.
        redis.expire(key, tool.get_expire())

    @classmethod
    def __get_sell_data(cls, code):
        """Return the stored sell volume of ``code``, or 0 when absent."""
        val = cls.__get_redis().get(cls.__num_key(code))
        if val is None:
            return 0
        return int(val)

    @classmethod
    def __save_process_index(cls, code, index):
        """Persist the index of the last processed row."""
        cls.__get_redis().setex(cls.__index_key(code), tool.get_expire(), index)

    @classmethod
    def __get_process_index(cls, code):
        """Return the last processed row index, or -1 when nothing is stored."""
        val = cls.__get_redis().get(cls.__index_key(code))
        if val is None:
            return -1
        return int(val)

    @classmethod
    def delete(cls, code):
        """Remove both the volume sum and the processing cursor of ``code``."""
        cls.__get_redis().delete(cls.__num_key(code))
        cls.__get_redis().delete(cls.__index_key(code))

    @classmethod
    def process(cls, code, start_index, end_index, buy_exec_index):
        """Count new limit-up sells and report where the threshold is crossed.

        Rows ``start_index..end_index`` (inclusive) of
        ``local_today_datas[code]`` are scanned. Rows before
        ``buy_exec_index`` and rows at or before the stored cursor are
        skipped.

        Returns:
            The index of the row at which the accumulated volume first
            exceeds the threshold, or ``None`` when it is not exceeded.
        """
        # Threshold for limit-up sells, derived from the value returned by
        # get_zyltgb and the limit-up price.
        # NOTE(review): presumably 1.5% of free-float value expressed in lots
        # of 100 shares — confirm units with the data source.
        limit_up_price = gpcode_manager.get_limit_up_price(code)
        zyltgb = l2_trade_factor.L2TradeFactorUtil.get_zyltgb(code)
        threshold_num = zyltgb * 0.015 // (limit_up_price * 100)

        total_num = cls.__get_sell_data(code)
        cancel_index = None
        process_index = cls.__get_process_index(code)
        # The row list does not change while scanning; look it up once.
        total_datas = local_today_datas.get(code)
        for i in range(start_index, end_index + 1):
            if i < buy_exec_index:
                continue
            if i <= process_index:
                continue
            val = total_datas[i]["val"]
            if L2DataUtil.is_limit_up_price_sell(val):
                num = int(val["num"])
                cls.__incre_sell_data(code, num)
                total_num += num
                if total_num > threshold_num:
                    cancel_index = i
                    break

        # Stop at the cancel row when one was found, otherwise at the end of
        # the scanned range.
        reached_index = end_index if cancel_index is None else cancel_index
        # Never move the cursor backwards: rows up to the stored index are
        # already part of the sum and must not be counted again.
        cls.__save_process_index(code, max(process_index, reached_index))
        return cancel_index
if __name__ == "__main__":
    L2TradeDataProcessor.test2()
    L2TradeDataProcessor.test3()
    print("----------------------")
    # L2TradeDataProcessor.test()
l2_data_util.py
@@ -96,9 +96,18 @@
        return __time * 3600, (__time + 1) * 3600
# 获取买入时间范围
def get_buy_time_range(cancel_data):
    """Return ``(min_time, max_time)`` of the buy that ``cancel_data`` cancels.

    The cancel record's ``cancelTime``/``cancelTimeUnit`` pair is converted
    into a span in seconds, and both ends of that span are subtracted from
    the record's own ``time``.
    """
    shortest_gap, longest_gap = compute_time_space_as_second(cancel_data["val"]["cancelTime"],
                                                             cancel_data["val"]["cancelTimeUnit"])
    cancel_time = cancel_data["val"]["time"]
    # The smaller gap gives the latest possible buy time, the larger gap the
    # earliest one.
    latest_buy = __sub_time(cancel_time, shortest_gap)
    earliest_buy = __sub_time(cancel_time, longest_gap)
    return earliest_buy, latest_buy
# 根据买撤数据(与今日总的数据)计算买入数据
def get_buy_data_with_cancel_data(cancel_data, local_today_num_operate_map):
    # 计算时间区间
    min_space, max_space = compute_time_space_as_second(cancel_data["val"]["cancelTime"],
                                                        cancel_data["val"]["cancelTimeUnit"])
    max_time = __sub_time(cancel_data["val"]["time"], min_space)
l2_trade_factor.py
@@ -194,7 +194,7 @@
        return "zyltgb:%s, total_industry_limit_percent:%s, volumn_day60_max:%s, volumn_yest:%s, volumn_today:%s,limit_up_time:%s, big_money_num:%s" % vals
    @classmethod
    def __get_zyltgb(cls, code):
    def get_zyltgb(cls, code):
        zyltgb = global_util.zyltgb_map.get(code)
        if zyltgb is None:
            global_data_loader.load_zyltgb()
@@ -218,7 +218,7 @@
    # 获取安全笔数
    @classmethod
    def get_safe_buy_count(cls, code):
        gb = cls.__get_zyltgb(code)
        gb = cls.get_zyltgb(code)
        if not gb:
            # 默认10笔
            return 8
redis_manager.py
@@ -21,9 +21,9 @@
if __name__ == "__main__":
    _redisManager = RedisManager(1)
    _redisManager = RedisManager(0)
    redis = _redisManager.getRedis()
    keys = redis.keys("*601975*")
    keys = redis.keys("under_water_seconds-*")
    for k in keys:
        redis.delete(k)
server.py
@@ -26,6 +26,7 @@
import ths_industry_util
import ths_util
import tool
import trade_gui
import trade_manager
import l2_code_operate
from code_data_util import ZYLTGBUtil
@@ -49,7 +50,7 @@
    l2_data_error_dict = {}
    last_trade_delegate_data = None
    buy1_volumn_manager = THSBuy1VolumnManager()
    latest_buy1_volumn_dict={}
    latest_buy1_volumn_dict = {}
    buy1_price_manager = Buy1PriceManager()
    def setup(self):
@@ -186,7 +187,6 @@
                        if limit_up_time_manager.get_limit_up_time(d["code"]) is None:
                            limit_up_time_manager.save_limit_up_time(d["code"], d["time"])
                elif type == 3:
                    # 交易成功信息
                    dataList = data_process.parseList(_str)
@@ -197,6 +197,7 @@
                    trade_manager.save_trade_success_data(dataList)
                elif type == 5:
                    logger_trade_delegate.debug("接收到委托信息")
                    # 交易委托信息
                    dataList = data_process.parseList(_str)
                    if self.last_trade_delegate_data != _str:
@@ -208,6 +209,8 @@
                    except Exception as e:
                        logging.exception(e)
                    trade_manager.save_trade_delegate_data(dataList)
                    # 刷新交易界面
                    trade_gui.THSGuiTrade().refresh_data()
                elif type == 4:
                    # 行业代码信息
@@ -242,8 +245,9 @@
                elif type == 50:
                    data = data_process.parse(_str)["data"]
                    if data is not None:
                        print(data)
                        index = data["index"]
                        code_name = data["codeName"]
                        code_name = data["codeName"].replace(" ", "")
                        volumn = data["volumn"]
                        price = data["price"]
                        time_ = data["time"]
@@ -253,12 +257,12 @@
                        code = global_util.name_codes.get(code_name)
                        if code is not None:
                            # 记录日志
                            if self.latest_buy1_volumn_dict.get(code) != "{}-{}".format(volumn,price):
                            if self.latest_buy1_volumn_dict.get(code) != "{}-{}".format(volumn, price):
                                # 记录数据
                                logger_buy_1_volumn_record.info("{}-{}",code,data)
                            self.latest_buy1_volumn_dict[code] = "{}-{}".format(volumn,price)
                                logger_buy_1_volumn_record.info("{}-{}", code, data)
                            self.latest_buy1_volumn_dict[code] = "{}-{}".format(volumn, price)
                            # 保存买1价格
                            self.buy1_price_manager.save(code,price)
                            self.buy1_price_manager.save(code, price)
                            # 校正时间
                            time_ = tool.compute_buy1_real_time(time_)
                            # 保存数据
tool.py
@@ -38,10 +38,10 @@
    date = datetime.datetime.now().strftime("%Y-%m-%d")
    return date
def get_now_time_str():
    """Return the current local time formatted as ``HH:MM:SS``."""
    return datetime.datetime.now().strftime("%H:%M:%S")
# 转为价格,四舍五入保留2位小数
@@ -90,6 +90,17 @@
        return True
    else:
        return False
# 是否为初始化时间
def is_init_time():
    """Tell whether the daily initialisation may run right now.

    The epoch time is reduced to seconds-of-day and shifted forward by eight
    hours. Any moment strictly between 09:30:00 and 15:01:00 on that shifted
    clock is rejected.
    """
    hour = 60 * 60
    # The shifted value is not wrapped at 24h, so it can exceed one day;
    # such values still lie outside the blocked window.
    shifted_seconds = t.time() % (24 * hour) + 8 * hour
    window_open = 9 * hour + 30 * 60
    window_close = 15 * hour + 1 * 60
    return not (window_open < shifted_seconds < window_close)
def is_set_code_time():
@@ -152,6 +163,7 @@
    cha = (s - 2) % 3
    return time_seconds_format(s - 2 - cha)
if __name__ == "__main__":
    print(trade_time_sub("11:29:59", "13:00:00"))
    print(trade_time_sub("11:29:59", "14:00:00"))
trade_data_manager.py
@@ -183,6 +183,8 @@
    def __increment_down_price_time(self, code, seconds):
        key = "under_water_seconds-{}".format(code)
        self.__get_redis().incrby(key, seconds)
        # 设置个失效时间
        self.__get_redis().expire(key, tool.get_expire())
    def __get_down_price_time_as_seconds(self, code):
        key = "under_water_seconds-{}".format(code)
@@ -192,14 +194,24 @@
        else:
            return int(val)
    def __save_current_price_codes_count(self,count):
    # 清除所有的水下捞数据
    def clear_under_water_data(self):
        key = "under_water_*"
        keys = self.__get_redis().keys(key)
        for k in keys:
            self.__get_redis().delete(k)
    def __save_current_price_codes_count(self, count):
        key = "current_price_codes_count"
        self.__get_redis().setex(key,10,count)
        self.__get_redis().setex(key, 10, count)
    def __get_current_price_codes_count(self):
        key = "current_price_codes_count"
        count = self.__get_redis().get(key)
        return 0 if count is None else count
    def process_rate(self, code, rate, time_str):
        # 9点半之前的数据不处理
@@ -232,7 +244,6 @@
    def get_current_price_codes_count(self):
        return self.__get_current_price_codes_count()
    # 是否为水下捞
    def is_under_water(self, code):
trade_gui.py
@@ -771,7 +771,10 @@
        if trade_win is None:
            return None
        code_name_win = win32gui.GetDlgItem(trade_win, 0x000005C2)
        return THSGuiUtil.getText(code_name_win)
        name = THSGuiUtil.getText(code_name_win)
        if name is not None:
            name=name.replace(" ","")
        return name
    @classmethod
    def fill_codes(cls, codes):