# Buy-1 volume management
import decimal
import json

import gpcode_manager
import redis_manager
import tool


class THSBuy1VolumnManager:
    """Track buy-1 volume records per stock code and persist them to Redis.

    For each code the two most recent (by time string) volumes are kept in
    memory; the older of the two is persisted as the "last" record and the
    value just received is persisted as the "latest" record.
    """

    __redisManager = redis_manager.RedisManager(1)
    # Class-level state, shared by every instance of this class.
    # code -> most recently accepted volume
    __last_data = {}
    # code -> {time_str: volume}, pruned to at most two entries per code
    __code_time_volumn_dict = {}

    def __get_redis(self):
        """Return the Redis client provided by the shared RedisManager."""
        return self.__redisManager.getRedis()

    def __save_recod(self, code, time_str, volumn):
        """Persist one volume sample and refresh the "latest" record."""
        # Save every individual sample.
        key = "buy1_volumn-{}-{}".format(code, time_str)
        self.__get_redis().setex(key, tool.get_expire(), volumn)
        # Save the most recent sample as a JSON [time_str, volumn] pair.
        key = "buy1_volumn_latest_info-{}".format(code)
        self.__get_redis().setex(key, tool.get_expire(), json.dumps((time_str, volumn)))

    def __save_last_recod(self, code, time_str, volumn):
        """Persist the previous sample as a JSON [time_str, volumn] pair."""
        key = "buy1_volumn_last_info-{}".format(code)
        self.__get_redis().setex(key, tool.get_expire(), json.dumps((time_str, volumn)))

    def __get_last_record(self, code):
        """Return (time_str, volumn) of the previous sample, or (None, None)."""
        key = "buy1_volumn_last_info-{}".format(code)
        val = self.__get_redis().get(key)
        if val is None:
            return None, None
        val = json.loads(val)
        return val[0], val[1]

    def __get_latest_record(self, code):
        """Return (time_str, volumn) of the latest sample, or (None, None)."""
        key = "buy1_volumn_latest_info-{}".format(code)
        val = self.__get_redis().get(key)
        if val is None:
            return None, None
        val = json.loads(val)
        return val[0], val[1]

    def save(self, code, time_str, volumn):
        """Record a buy-1 volume sample.

        Args:
            code: stock code.
            time_str: sample time; it is ordered by the integer obtained after
                removing ":" characters (e.g. "15:00:00" -> 150000).
            volumn: buy-1 volume; values below 1 are ignored.

        Returns:
            True if the sample was stored (i.e. data needs updating), False if
            it was ignored because it was below 1 or equal to the previously
            accepted volume for this code.
        """
        if volumn < 1:
            return False
        # Do not store a value identical to the previous one.
        if code in self.__last_data and self.__last_data[code] == volumn:
            return False
        self.__last_data[code] = volumn

        history = self.__code_time_volumn_dict.setdefault(code, {})
        history[time_str] = volumn

        # Drop everything older than the second most recent sample.
        keys = sorted(history, key=lambda val: int(val.replace(":", "")))
        for stale_key in keys[:-2]:
            history.pop(stale_key)
        keys = keys[-2:]
        if len(keys) == 2:
            # The older of the two remaining samples is the "last" record.
            self.__save_last_recod(code, keys[0], history[keys[0]])

        self.__save_recod(code, time_str, volumn)
        return True

    def get_verify_data(self, code):
        """Return the data used for verification.

        Returns the previous record as (time_str, volumn); if there is no
        previous record, returns the latest record instead (which is
        (None, None) when nothing is stored).
        """
        time_str, volumn = self.__get_last_record(code)
        if time_str is not None:
            return time_str, volumn
        time_str, volumn = self.__get_latest_record(code)
        return time_str, volumn


class JueJinBuy1VolumnManager:
    """Persist buy-1 volume samples taken at the limit-up price to Redis."""

    __redisManager = redis_manager.RedisManager(1)
    # Class-level state, shared by every instance: code -> last accepted volume
    __last_data = {}

    def __get_redis(self):
        """Return the Redis client provided by the shared RedisManager."""
        return self.__redisManager.getRedis()

    def __save_recod(self, code, time_str, volumn):
        """Persist one volume sample and refresh the "latest" record."""
        # Save every individual sample.
        key = "buy1_volumn_juejin-{}-{}".format(code, time_str)
        self.__get_redis().setex(key, tool.get_expire(), volumn)
        # Store the latest record as a JSON [time_str, volumn] pair: that is
        # the shape __get_latest_record unpacks. Writing the bare volume here
        # made the reader fail when indexing the decoded integer.
        key = "buy1_volumn_juejin_latest_info-{}".format(code)
        self.__get_redis().setex(key, tool.get_expire(), json.dumps((time_str, volumn)))

    def __get_latest_record(self, code):
        """Return (time_str, volumn) of the latest sample, or (None, None)."""
        key = "buy1_volumn_juejin_latest_info-{}".format(code)
        val = self.__get_redis().get(key)
        if val is None:
            return None, None
        val = json.loads(val)
        if not isinstance(val, list):
            # Value written in the older bare-volume format: no time recorded.
            return None, val
        return val[0], val[1]

    def save(self, code, time_str, volumn, price):
        """Record a buy-1 volume sample if it was taken at the limit-up price.

        Args:
            code: stock code.
            time_str: sample time string.
            volumn: buy-1 volume; values below 1 are ignored.
            price: buy-1 price, compared (after tool.to_price) with
                gpcode_manager.get_limit_up_price(code).

        Returns:
            True if the sample was stored (i.e. data needs updating), False if
            the price is not the limit-up price, the volume is below 1, or the
            volume equals the previously accepted one for this code.
        """
        # Only samples at the limit-up price are of interest.
        limit_up_price = gpcode_manager.get_limit_up_price(code)
        if limit_up_price != tool.to_price(decimal.Decimal(price)):
            # Not the limit-up price.
            return False

        if volumn < 1:
            return False
        # Do not store a value identical to the previous one.
        if code in self.__last_data and self.__last_data[code] == volumn:
            return False
        self.__last_data[code] = volumn

        self.__save_recod(code, time_str, volumn)
        return True

    def get_verify_data(self, code):
        """Return the latest stored record as (time_str, volumn).

        Returns (None, None) when nothing is stored for the code.
        """
        time_str, volumn = self.__get_latest_record(code)
        return time_str, volumn


if __name__ == '__main__':
    JueJinBuy1VolumnManager().save("001203", "15:00:00", 40586553, 12.12)