""" 代码价格管理 """ import json import tool from db import redis_manager class Buy1PriceManager: __redisManager = redis_manager.RedisManager(1) @classmethod def __get_redis(cls): return cls.__redisManager.getRedis() # 保存买1价格信息 @classmethod def __save_buy1_price_info(cls, code, limit_up_time, open_limit_up_time): cls.__get_redis().setex(f"buy1_price_limit_up_info-{code}", tool.get_expire(), json.dumps((limit_up_time, open_limit_up_time))) @classmethod def __get_buy1_price_info(cls, code): data = cls.__get_redis().get(f"buy1_price_limit_up_info-{code}") if not data: return None, None data = json.loads(data) return data[0], data[1] # 处理 @classmethod def process(cls, code, buy_1_price, time_str, limit_up_price): # 买1价格不能小于1块 if float(buy_1_price) < 1.0: return is_limit_up = abs(float(limit_up_price) - float(buy_1_price)) < 0.01 old_limit_up_time, old_open_limit_up_time = cls.__get_buy1_price_info(code) if old_limit_up_time and old_open_limit_up_time: return if is_limit_up and old_limit_up_time is None: cls.__save_buy1_price_info(code, time_str, None) elif old_limit_up_time and not is_limit_up and old_open_limit_up_time is None: # 有涨停时间,当前没有涨停,之前没有打开涨停 cls.__save_buy1_price_info(code, old_limit_up_time, time_str) # 是否可以下单 @classmethod def is_can_buy(cls, code): old_limit_up_time, old_open_limit_up_time = cls.__get_buy1_price_info(code) if old_limit_up_time and old_open_limit_up_time: return True return False if __name__ == "__main__": code = "000333" limit_up_price = "54.00" Buy1PriceManager.process("000333", "53.00", "09:56:00", limit_up_price) print(Buy1PriceManager.is_can_buy(code)) Buy1PriceManager.process("000333", "54.00", "09:57:00", limit_up_price) print(Buy1PriceManager.is_can_buy(code)) Buy1PriceManager.process("000333", "53.00", "09:58:00", limit_up_price) print(Buy1PriceManager.is_can_buy(code)) Buy1PriceManager.process("000333", "54.00", "09:59:00", limit_up_price) print(Buy1PriceManager.is_can_buy(code))