Administrator
2023-08-07 8e163037d6334645ac0ab88f3a2f70841ce4ec55
redis批量提交数据
2个文件已修改
82 ■■■■ 已修改文件
l2/cancel_buy_strategy.py 31 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/code_price_manager.py 51 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/cancel_buy_strategy.py
@@ -320,11 +320,11 @@
    # 保存已经撤单的监听位置
    @classmethod
    def __add_watch_canceled_index(cls, code, index):
    def __add_watch_canceled_index(cls, redis, code, index):
        CodeDataCacheUtil.clear_cache(cls.__cancel_watch_canceled_indexs_cache, code)
        key = f"h_cancel_watch_canceled_indexs-{code}"
        RedisUtils.sadd(cls.__get_redis(), key, index)
        RedisUtils.expire(cls.__get_redis(), key, tool.get_expire())
        RedisUtils.sadd(redis, key, index)
        RedisUtils.expire(redis, key, tool.get_expire())
    @classmethod
    def __get_watch_canceled_index(cls, code):
@@ -352,7 +352,8 @@
        CodeDataCacheUtil.set_cache(cls.__cancel_traded_progress_cache, code,
                                    (origin_process_index, latest_process_index))
        key = "h_cancel_traded_progress-{}".format(code)
        RedisUtils.setex(cls.__get_redis(),key, tool.get_expire(), json.dumps((origin_process_index, latest_process_index)))
        RedisUtils.setex(cls.__get_redis(), key, tool.get_expire(),
                         json.dumps((origin_process_index, latest_process_index)))
    @classmethod
    def __get_traded_progress(cls, code):
@@ -487,6 +488,7 @@
                cancel_num += total_data[nx]["re"] * num
        try:
            pipe = cls.__get_redis().pipeline()
            for i in range(start_index, end_index + 1):
                if i <= processed_index:
                    # 已经处理过了
@@ -502,7 +504,7 @@
                        has_watch_canceled = True
                        cancel_num += data["re"] * val["num"]
                        # 加入
                        cls.__add_watch_canceled_index(code, f"{buy_index}-{val['num']}")
                        cls.__add_watch_canceled_index(pipe, code, f"{buy_index}-{val['num']}")
                        rate__ = round(cancel_num / total_nums, 4)
                        if rate__ > cancel_rate_threshold:
                            indexs__ = list(watch_indexs_dict.keys())
@@ -512,7 +514,9 @@
                                                len(watch_indexs_dict.keys()))
                            l2_log.trade_record(code, "H撤", "'index':{} , 'rate':{} ,'target_rate':{}", i, rate__,
                                                cancel_rate_threshold)
                            pipe.execute()
                            return True, data
            pipe.execute()
            rate__ = round(cancel_num / total_nums, 4)
            if rate__ > cancel_rate_threshold:
@@ -847,15 +851,15 @@
        return cls.__redis_manager.getRedis()
    @classmethod
    def __add_watch_index(cls, code, index):
    def __add_watch_index(cls, redis, code, index):
        CodeDataCacheUtil.clear_cache(cls.__cancel_watch_index_cache, code)
        RedisUtils.sadd(cls.__get_redis(), f"l_cancel_watch_index-{code}", index)
        RedisUtils.expire(cls.__get_redis(), f"l_cancel_watch_index-{code}", tool.get_expire())
        RedisUtils.sadd(redis, f"l_cancel_watch_index-{code}", index)
        RedisUtils.expire(redis, f"l_cancel_watch_index-{code}", tool.get_expire())
    @classmethod
    def __del_watch_index(cls, code, index):
    def __del_watch_index(cls, pipe, code, index):
        CodeDataCacheUtil.clear_cache(cls.__cancel_watch_index_cache, code)
        RedisUtils.srem(cls.__get_redis(), f"l_cancel_watch_index-{code}", index)
        RedisUtils.srem(pipe, f"l_cancel_watch_index-{code}", index)
    @classmethod
    def __get_watch_indexes(cls, code):
@@ -921,10 +925,13 @@
        # 数据维护
        add_indexes = watch_indexes - old_watch_indexes
        delete_indexes = old_watch_indexes - watch_indexes
        pipe = cls.__get_redis().pipeline()
        for i in add_indexes:
            cls.__add_watch_index(code, i)
            cls.__add_watch_index(pipe, code, i)
        for i in delete_indexes:
            cls.__del_watch_index(code, i)
            cls.__del_watch_index(pipe, code, i)
        if add_indexes or delete_indexes:
            pipe.execute()
    @classmethod
    def need_cancel(cls, code, buy_exec_index, start_index, end_index, total_data, local_today_num_operate_map,
l2/code_price_manager.py
@@ -51,6 +51,20 @@
        cls.__current_buy_1_price[code] = buy_1_price
        RedisUtils.setex(cls.__get_redis(), f"buy1_price-{code}", tool.get_expire(), buy_1_price)
    # datas:[(code, buy_1_price)]
    @classmethod
    def __save_buy1_prices(cls, datas):
        pipe = cls.__get_redis().pipeline()
        for d in datas:
            code = d[0]
            buy_1_price = d[1]
            # 不保存重复的数据
            if code in cls.__current_buy_1_price and cls.__current_buy_1_price[code] == buy_1_price:
                continue
            cls.__current_buy_1_price[code] = buy_1_price
            RedisUtils.setex(pipe, f"buy1_price-{code}", tool.get_expire(), buy_1_price)
        pipe.execute()
    @classmethod
    def __get_buy1_price(cls, code):
        return RedisUtils.get(cls.__get_redis(), f"buy1_price-{code}")
@@ -113,6 +127,43 @@
            # 之前涨停过且现在尚未涨停
            cls.set_open_limit_up_lowest_price(code, buy_1_price)
    # datas:[ (code, buy_1_price, time_str, limit_up_price, sell_1_price, sell_1_volumn)  ]
    @classmethod
    def processes(cls, datas):
        temp_buy1_prices = []
        for d in datas:
            code, buy_1_price, time_str, limit_up_price, sell_1_price, sell_1_volumn = d
            data_str = f"{buy_1_price},{time_str},{limit_up_price},{sell_1_price},{sell_1_volumn}"
            if cls.__latest_data.get(code) == data_str:
                continue
            cls.__latest_data[code] = data_str
            # 保存买1价格
            temp_buy1_prices.append((code, buy_1_price))
            # 记录日志
            logger_trade_queue_price_info.info(
                f"code={code} data: time_str-{time_str}, buy_1_price-{buy_1_price},limit_up_price-{limit_up_price},sell_1_price-{sell_1_price},sell_1_volumn-{sell_1_volumn}")
            # 买1价格不能小于1块
            if float(buy_1_price) < 1.0:
                continue
            is_limit_up = abs(float(limit_up_price) - float(buy_1_price)) < 0.01
            old_limit_up_time, old_open_limit_up_time = cls.__get_buy1_price_info_cache(code)
            if old_limit_up_time and old_open_limit_up_time:
                continue
            if is_limit_up and old_limit_up_time is None and float(sell_1_price) < 0.1 and int(sell_1_volumn) <= 0:
                # 卖1消失,买1为涨停价则表示涨停
                cls.__save_buy1_price_info(code, time_str, None)
            elif old_limit_up_time and not is_limit_up and old_open_limit_up_time is None:
                # 有涨停时间,当前没有涨停,之前没有打开涨停
                cls.__save_buy1_price_info(code, old_limit_up_time, time_str)
            if old_limit_up_time and not is_limit_up:
                # 之前涨停过且现在尚未涨停
                cls.set_open_limit_up_lowest_price(code, buy_1_price)
        if temp_buy1_prices:
            cls.__save_buy1_prices(temp_buy1_prices)
    # 是否可以下单
    @classmethod
    def is_can_buy(cls, code):