Administrator
2023-08-07 8e163037d6334645ac0ab88f3a2f70841ce4ec55
redis批量提交数据
2个文件已修改
112 ■■■■ 已修改文件
l2/cancel_buy_strategy.py 61 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/code_price_manager.py 51 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/cancel_buy_strategy.py
@@ -41,7 +41,7 @@
        CodeDataCacheUtil.set_cache(cls.__s_big_num_cancel_compute_data_cache, code,
                                    (process_index, buy_num, cancel_num))
        key = "s_big_num_cancel_compute_data-{}".format(code)
        RedisUtils.setex( cls.__get_redis(),key, tool.get_expire(), json.dumps((process_index, buy_num, cancel_num)))
        RedisUtils.setex(cls.__get_redis(), key, tool.get_expire(), json.dumps((process_index, buy_num, cancel_num)))
    @classmethod
    def __get_compute_data(cls, code):
@@ -74,7 +74,7 @@
        for key in ks:
            keys = RedisUtils.keys(cls.__get_redis(), key)
            for k in keys:
                code = k.replace("s_big_num_cancel_compute_data-","")
                code = k.replace("s_big_num_cancel_compute_data-", "")
                cls.__clear_data(code)
    # 计算净大单
@@ -269,7 +269,7 @@
    def __save_watch_index_set(cls, code, datas, process_index, finish):
        CodeDataCacheUtil.set_cache(cls.__cancel_watch_indexs_cache, code, (list(datas), process_index, finish))
        key = f"h_cancel_watch_indexs-{code}"
        RedisUtils.setex(cls.__get_redis(),key, tool.get_expire(), json.dumps((list(datas), process_index, finish)))
        RedisUtils.setex(cls.__get_redis(), key, tool.get_expire(), json.dumps((list(datas), process_index, finish)))
    # 保存成交进度
    @classmethod
@@ -296,8 +296,8 @@
        CodeDataCacheUtil.set_cache(cls.__cancel_watch_indexs_exec_cache, code,
                                    (list(datas), process_index, total_count, big_num_count, finished))
        key = f"h_cancel_watch_indexs_exec-{code}"
        RedisUtils.setex( cls.__get_redis(),key, tool.get_expire(),
                               json.dumps((list(datas), process_index, total_count, big_num_count, finished)))
        RedisUtils.setex(cls.__get_redis(), key, tool.get_expire(),
                         json.dumps((list(datas), process_index, total_count, big_num_count, finished)))
    # 保存成交进度
    @classmethod
@@ -320,11 +320,11 @@
    # 保存已经撤单的监听位置
    @classmethod
    def __add_watch_canceled_index(cls, code, index):
    def __add_watch_canceled_index(cls, redis, code, index):
        CodeDataCacheUtil.clear_cache(cls.__cancel_watch_canceled_indexs_cache, code)
        key = f"h_cancel_watch_canceled_indexs-{code}"
        RedisUtils.sadd(cls.__get_redis(), key, index)
        RedisUtils.expire(cls.__get_redis(), key, tool.get_expire())
        RedisUtils.sadd(redis, key, index)
        RedisUtils.expire(redis, key, tool.get_expire())
    @classmethod
    def __get_watch_canceled_index(cls, code):
@@ -344,7 +344,7 @@
    def __del_watch_canceled_index(cls, code):
        CodeDataCacheUtil.clear_cache(cls.__cancel_watch_canceled_indexs_cache, code)
        key = f"h_cancel_watch_canceled_indexs-{code}"
        RedisUtils.delete( cls.__get_redis(), key)
        RedisUtils.delete(cls.__get_redis(), key)
    # 保存成交进度
    @classmethod
@@ -352,7 +352,8 @@
        CodeDataCacheUtil.set_cache(cls.__cancel_traded_progress_cache, code,
                                    (origin_process_index, latest_process_index))
        key = "h_cancel_traded_progress-{}".format(code)
        RedisUtils.setex(cls.__get_redis(),key, tool.get_expire(), json.dumps((origin_process_index, latest_process_index)))
        RedisUtils.setex(cls.__get_redis(), key, tool.get_expire(),
                         json.dumps((origin_process_index, latest_process_index)))
    @classmethod
    def __get_traded_progress(cls, code):
@@ -378,7 +379,7 @@
        CodeDataCacheUtil.set_cache(cls.__cancel_compute_data_cache, code,
                                    (process_index, cancel_num))
        key = "h_cancel_compute_data-{}".format(code)
        RedisUtils.setex(cls.__get_redis(),key, tool.get_expire(), json.dumps((process_index, cancel_num)))
        RedisUtils.setex(cls.__get_redis(), key, tool.get_expire(), json.dumps((process_index, cancel_num)))
    @classmethod
    def __get_compute_data(cls, code):
@@ -487,6 +488,7 @@
                cancel_num += total_data[nx]["re"] * num
        try:
            pipe = cls.__get_redis().pipeline()
            for i in range(start_index, end_index + 1):
                if i <= processed_index:
                    # 已经处理过了
@@ -502,7 +504,7 @@
                        has_watch_canceled = True
                        cancel_num += data["re"] * val["num"]
                        # 加入
                        cls.__add_watch_canceled_index(code, f"{buy_index}-{val['num']}")
                        cls.__add_watch_canceled_index(pipe, code, f"{buy_index}-{val['num']}")
                        rate__ = round(cancel_num / total_nums, 4)
                        if rate__ > cancel_rate_threshold:
                            indexs__ = list(watch_indexs_dict.keys())
@@ -512,7 +514,9 @@
                                                len(watch_indexs_dict.keys()))
                            l2_log.trade_record(code, "H撤", "'index':{} , 'rate':{} ,'target_rate':{}", i, rate__,
                                                cancel_rate_threshold)
                            pipe.execute()
                            return True, data
            pipe.execute()
            rate__ = round(cancel_num / total_nums, 4)
            if rate__ > cancel_rate_threshold:
@@ -755,7 +759,7 @@
    @classmethod
    def __set_real_order_index(cls, code, index):
        CodeDataCacheUtil.set_cache(cls.__cancel_real_order_index_cache, code, index)
        RedisUtils.setex(cls.__get_redis(),f"d_cancel_real_order_index-{code}", tool.get_expire(), f"{index}")
        RedisUtils.setex(cls.__get_redis(), f"d_cancel_real_order_index-{code}", tool.get_expire(), f"{index}")
    @classmethod
    def __get_real_order_index(cls, code):
@@ -776,7 +780,7 @@
    @classmethod
    def clear(cls, code=None):
        if code:
            RedisUtils.delete( cls.__get_redis(), f"d_cancel_real_order_index-{code}")
            RedisUtils.delete(cls.__get_redis(), f"d_cancel_real_order_index-{code}")
        else:
            keys = RedisUtils.keys(cls.__get_redis(), "d_cancel_real_order_index-*")
            if keys:
@@ -847,15 +851,15 @@
        return cls.__redis_manager.getRedis()
    @classmethod
    def __add_watch_index(cls, code, index):
    def __add_watch_index(cls, redis, code, index):
        CodeDataCacheUtil.clear_cache(cls.__cancel_watch_index_cache, code)
        RedisUtils.sadd(cls.__get_redis(), f"l_cancel_watch_index-{code}", index)
        RedisUtils.expire(cls.__get_redis(), f"l_cancel_watch_index-{code}", tool.get_expire())
        RedisUtils.sadd(redis, f"l_cancel_watch_index-{code}", index)
        RedisUtils.expire(redis, f"l_cancel_watch_index-{code}", tool.get_expire())
    @classmethod
    def __del_watch_index(cls, code, index):
    def __del_watch_index(cls, pipe, code, index):
        CodeDataCacheUtil.clear_cache(cls.__cancel_watch_index_cache, code)
        RedisUtils.srem(cls.__get_redis(), f"l_cancel_watch_index-{code}", index)
        RedisUtils.srem(pipe, f"l_cancel_watch_index-{code}", index)
    @classmethod
    def __get_watch_indexes(cls, code):
@@ -921,10 +925,13 @@
        # 数据维护
        add_indexes = watch_indexes - old_watch_indexes
        delete_indexes = old_watch_indexes - watch_indexes
        pipe = cls.__get_redis().pipeline()
        for i in add_indexes:
            cls.__add_watch_index(code, i)
            cls.__add_watch_index(pipe, code, i)
        for i in delete_indexes:
            cls.__del_watch_index(code, i)
            cls.__del_watch_index(pipe, code, i)
        if add_indexes or delete_indexes:
            pipe.execute()
    @classmethod
    def need_cancel(cls, code, buy_exec_index, start_index, end_index, total_data, local_today_num_operate_map,
@@ -1003,7 +1010,7 @@
        key = "l2_limit_up_second_money-{}-{}".format(code, time.replace(":", ""))
        RedisUtils.setex(cls.__get_redis(),key, tool.get_expire(), json.dumps((old_num, old_from, old_to)))
        RedisUtils.setex(cls.__get_redis(), key, tool.get_expire(), json.dumps((old_num, old_from, old_to)))
    @classmethod
    def __get_l2_second_money_record(cls, code, time):
@@ -1028,7 +1035,7 @@
    @classmethod
    def __set_l2_latest_money_record(cls, code, index, num):
        key = "l2_limit_up_money-{}".format(code)
        RedisUtils.setex(cls.__get_redis(),key, tool.get_expire(), json.dumps((num, index)))
        RedisUtils.setex(cls.__get_redis(), key, tool.get_expire(), json.dumps((num, index)))
    # 返回数量,索引
    @classmethod
@@ -1339,7 +1346,7 @@
    @classmethod
    def __incre_sell_data(cls, code, num):
        key = "limit_up_sell_num-{}".format(code)
        RedisUtils.incrby( cls.__get_redis(),key, num)
        RedisUtils.incrby(cls.__get_redis(), key, num)
    @classmethod
    def __get_sell_data(cls, code):
@@ -1352,7 +1359,7 @@
    @classmethod
    def __save_process_index(cls, code, index):
        key = "limit_up_sell_index-{}".format(code)
        RedisUtils.setex( cls.__get_redis(),key, tool.get_expire(), index)
        RedisUtils.setex(cls.__get_redis(), key, tool.get_expire(), index)
    @classmethod
    def __get_process_index(cls, code):
@@ -1366,9 +1373,9 @@
    @classmethod
    def delete(cls, code):
        key = "limit_up_sell_num-{}".format(code)
        RedisUtils.delete( cls.__get_redis(), key)
        RedisUtils.delete(cls.__get_redis(), key)
        key = "limit_up_sell_index-{}".format(code)
        RedisUtils.delete( cls.__get_redis(), key)
        RedisUtils.delete(cls.__get_redis(), key)
    @classmethod
    def clear(cls):
l2/code_price_manager.py
@@ -51,6 +51,20 @@
        cls.__current_buy_1_price[code] = buy_1_price
        RedisUtils.setex(cls.__get_redis(), f"buy1_price-{code}", tool.get_expire(), buy_1_price)
    # datas:[(code, buy_1_price)]
    @classmethod
    def __save_buy1_prices(cls, datas):
        pipe = cls.__get_redis().pipeline()
        for d in datas:
            code = d[0]
            buy_1_price = d[1]
            # 不保存重复的数据
            if code in cls.__current_buy_1_price and cls.__current_buy_1_price[code] == buy_1_price:
                continue
            cls.__current_buy_1_price[code] = buy_1_price
            RedisUtils.setex(pipe, f"buy1_price-{code}", tool.get_expire(), buy_1_price)
        pipe.execute()
    @classmethod
    def __get_buy1_price(cls, code):
        return RedisUtils.get(cls.__get_redis(), f"buy1_price-{code}")
@@ -113,6 +127,43 @@
            # 之前涨停过且现在尚未涨停
            cls.set_open_limit_up_lowest_price(code, buy_1_price)
    # datas:[ (code, buy_1_price, time_str, limit_up_price, sell_1_price, sell_1_volumn)  ]
    @classmethod
    def processes(cls, datas):
        temp_buy1_prices = []
        for d in datas:
            code, buy_1_price, time_str, limit_up_price, sell_1_price, sell_1_volumn = d
            data_str = f"{buy_1_price},{time_str},{limit_up_price},{sell_1_price},{sell_1_volumn}"
            if cls.__latest_data.get(code) == data_str:
                continue
            cls.__latest_data[code] = data_str
            # 保存买1价格
            temp_buy1_prices.append((code, buy_1_price))
            # 记录日志
            logger_trade_queue_price_info.info(
                f"code={code} data: time_str-{time_str}, buy_1_price-{buy_1_price},limit_up_price-{limit_up_price},sell_1_price-{sell_1_price},sell_1_volumn-{sell_1_volumn}")
            # 买1价格不能小于1块
            if float(buy_1_price) < 1.0:
                continue
            is_limit_up = abs(float(limit_up_price) - float(buy_1_price)) < 0.01
            old_limit_up_time, old_open_limit_up_time = cls.__get_buy1_price_info_cache(code)
            if old_limit_up_time and old_open_limit_up_time:
                continue
            if is_limit_up and old_limit_up_time is None and float(sell_1_price) < 0.1 and int(sell_1_volumn) <= 0:
                # 卖1消失,买1为涨停价则表示涨停
                cls.__save_buy1_price_info(code, time_str, None)
            elif old_limit_up_time and not is_limit_up and old_open_limit_up_time is None:
                # 有涨停时间,当前没有涨停,之前没有打开涨停
                cls.__save_buy1_price_info(code, old_limit_up_time, time_str)
            if old_limit_up_time and not is_limit_up:
                # 之前涨停过且现在尚未涨停
                cls.set_open_limit_up_lowest_price(code, buy_1_price)
        if temp_buy1_prices:
            cls.__save_buy1_prices(temp_buy1_prices)
    # 是否可以下单
    @classmethod
    def is_can_buy(cls, code):