Administrator
2023-08-07 20a70116b236a49d68659b451fea0a9f645a0835
redis批量提交数据
3个文件已修改
64 ■■■■■ 已修改文件
code_attribute/gpcode_manager.py 12 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/current_price_process_manager.py 9 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/trade_data_manager.py 43 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
code_attribute/gpcode_manager.py
@@ -576,6 +576,18 @@
    RedisUtils.setex(__redisManager.getRedis(), "price-{}".format(code), tool.get_expire(), price)
# datas:[(code,price)]
def set_prices(datas):
    pipe =  __redisManager.getRedis().pipeline()
    for d in datas:
        code, price = d[0], d[1]
        if code in __current_price_cache and __current_price_cache[code] == price:
            continue
        __current_price_cache[code] = price
        RedisUtils.setex(pipe, "price-{}".format(code), tool.get_expire(), price)
    pipe.execute()
# 获取正在监听的代码
def get_listen_codes():
    redis_instance = __redisManager.getRedis()
trade/current_price_process_manager.py
@@ -39,9 +39,11 @@
    if True:
        _code_list = []
        _delete_list = []
        prices = []
        rates = []
        for d in prices:
            code, price = d["code"], float(d["price"])
            gpcode_manager.set_price(code, price)
            prices.append((code, price))
            # 获取收盘价
            pricePre = gpcode_manager.CodePrePriceManager.get_price_pre_cache(code)
            if pricePre is not None:
@@ -55,7 +57,7 @@
                    # 暂存涨幅为负的代码
                    _delete_list.append((rate, code, 0))
                try:
                    __actualPriceProcessor.process_rate(code, rate, now_str)
                    rates.append((code, rate))
                except Exception as e:
                    logging.exception(e)
@@ -66,7 +68,8 @@
                                                                  decimal.Decimal(d["price"])))
                except Exception as e:
                    logging.exception(e)
        gpcode_manager.set_prices(prices)
        __actualPriceProcessor.process_rates(rates, now_str)
        # -------------------------------处理交易位置分配---------------------------------
        # 排序
        new_code_list = sorted(_code_list, key=lambda e: (e.__getitem__(2), e.__getitem__(0)), reverse=True)
trade/trade_data_manager.py
@@ -67,7 +67,7 @@
    # 获取买入点信息
    @classmethod
    def get_buy_position_info(cls, code):
        val_str = RedisUtils.get( cls.redisManager.getRedis(), "buy_position_info-{}".format(code))
        val_str = RedisUtils.get(cls.redisManager.getRedis(), "buy_position_info-{}".format(code))
        if val_str is None:
            return None, None, None, None
        else:
@@ -77,7 +77,7 @@
    # 删除买入点信息
    @classmethod
    def remove_buy_position_info(cls, code):
        RedisUtils.delete( cls.redisManager.getRedis(), "buy_position_info-{}".format(code))
        RedisUtils.delete(cls.redisManager.getRedis(), "buy_position_info-{}".format(code))
    # 设置买入确认点信息
    @classmethod
@@ -236,6 +236,19 @@
        key = "code_current_rate-{}".format(code)
        RedisUtils.setex(self.__get_redis(), key, tool.get_expire(), rate)
    # 批量保存
    def __save_current_rates(self, datas):
        # 变化之后才会持久化
        pipe = self.__get_redis().pipeline()
        for d in datas:
            if self.__code_current_rate_latest.get(d[0]) == d[1]:
                continue
            self.__code_current_rate_latest[d[0]] = d[1]
            tool.CodeDataCacheUtil.set_cache(self.__code_current_rate_cache, d[0], d[1])
            key = "code_current_rate-{}".format(d[0])
            RedisUtils.setex(pipe, key, tool.get_expire(), d[1])
        pipe.execute()
    # 获取当前涨幅
    def __get_current_rate(self, code):
        key = "code_current_rate-{}".format(code)
@@ -274,6 +287,32 @@
            if self.__get_last_down_price_time_cache(code) is None:
                self.__save_down_price_time(code, time_str)
    # datas:[(代码,比例)]
    def process_rates(self, datas, time_str):
        # 9点半之前的数据不处理
        if int(time_str.replace(":", "")) < int("093000"):
            return
        # 保存目前的代码涨幅
        self.__save_current_rates(datas)
        # now_str = tool.get_now_time_str()
        for d in datas:
            code, rate = d[0], d[1]
            if rate >= 0:
                down_start_time = self.__get_last_down_price_time_cache(code)
                if down_start_time is None:
                    continue
                else:
                    # 累计增加时间
                    time_second = tool.trade_time_sub(time_str, down_start_time)
                    self.__increment_down_price_time(code, time_second)
                    # 删除起始时间
                    self.__remove_down_price_time(code)
            else:
                # 记录开始值
                if self.__get_last_down_price_time_cache(code) is None:
                    self.__save_down_price_time(code, time_str)
    # 保存现价
    def save_current_price(self, code, price, is_limit_up):
        global_util.cuurent_prices[code] = (price, is_limit_up, round(time.time()))