"""Buy-1 (best bid) volume management.

Three managers live here:

* ``THSBuy1VolumnManager`` - records buy-1 volume samples per code, tracks the
  maximum volume seen, and decides whether a placed buy order should be
  cancelled.
* ``JueJinBuy1VolumnManager`` - records buy-1 volume samples under the
  ``buy1_volumn_juejin*`` Redis keys.
* ``thsl2tradequeuemanager`` - stores the latest trade-queue info per code
  under ``ths_l2_latest_trade_info-*``.
"""
import decimal
import json

from code_attribute import gpcode_manager
from db import redis_manager_delegate as redis_manager
from db.redis_manager_delegate import RedisUtils
from utils import tool
from trade import trade_manager, trade_constant


def _time_str_to_int(time_str):
    """Convert an ``HH:MM:SS`` string into an integer such as ``93000``."""
    return int(time_str.replace(":", ""))


class THSBuy1VolumnManager:
    """Singleton that stores buy-1 volume samples and the per-code maximum."""

    # Redis db index used by the async RedisUtils helpers.
    __db = 1
    __redisManager = redis_manager.RedisManager(1)
    # code -> last volume accepted by save()
    __last_data = {}
    # code -> {time_str: volume}; save() keeps at most the two latest entries
    __code_time_volumn_dict = {}
    # Cache of the maximum buy-1 volume, managed via tool.CodeDataCacheUtil
    __max_buy1_volumn_cache = {}
    __instance = None

    def __new__(cls, *args, **kwargs):
        """Return the shared instance, loading cached maxima on first use."""
        if cls.__instance is None:
            # object.__new__ rejects extra arguments, so none are forwarded.
            cls.__instance = super(THSBuy1VolumnManager, cls).__new__(cls)
            cls.__load_datas()
        return cls.__instance

    @classmethod
    def __get_redis(cls):
        return cls.__redisManager.getRedis()

    @classmethod
    def __load_datas(cls):
        """Fill the max-volume cache from every ``max_buy1_volumn-*`` key."""
        __redis = cls.__get_redis()
        try:
            keys = RedisUtils.keys(__redis, "max_buy1_volumn-*")
            for k in keys:
                code = k.split("-")[-1]
                val = RedisUtils.get(__redis, k)
                if val is None:
                    # The key vanished between keys() and get(); skip it.
                    continue
                tool.CodeDataCacheUtil.set_cache(cls.__max_buy1_volumn_cache, code, int(val))
        finally:
            RedisUtils.realse(__redis)

    def __save_max_buy1_volume(self, code, volume):
        """Save the maximum volume to the cache and, asynchronously, to Redis."""
        tool.CodeDataCacheUtil.set_cache(self.__max_buy1_volumn_cache, code, volume)
        key = "max_buy1_volumn-{}".format(code)
        RedisUtils.setex_async(self.__db, key, tool.get_expire(), volume)

    def __get_max_buy1_volume(self, code):
        """Read the maximum volume from Redis; ``None`` when it is not stored."""
        key = "max_buy1_volumn-{}".format(code)
        val = RedisUtils.get(self.__get_redis(), key)
        if val is not None:
            return int(val)
        return None

    def __del_max_buy1_volume(self, code):
        """Drop the maximum volume from the cache and, asynchronously, Redis."""
        tool.CodeDataCacheUtil.clear_cache(self.__max_buy1_volumn_cache, code)
        key = "max_buy1_volumn-{}".format(code)
        RedisUtils.delete_async(self.__db, key)

    def __save_recod(self, code, time_str, volumn):
        """Store this sample under its own key and as the latest record."""
        # Every individual sample
        key = "buy1_volumn-{}-{}".format(code, time_str)
        RedisUtils.setex(self.__get_redis(), key, tool.get_expire(), volumn)
        # The most recent sample
        key = "buy1_volumn_latest_info-{}".format(code)
        RedisUtils.setex(self.__get_redis(), key, tool.get_expire(), json.dumps((time_str, volumn)))

    def __save_last_recod(self, code, time_str, volumn):
        """Store the previous sample (the one before the latest)."""
        key = "buy1_volumn_last_info-{}".format(code)
        RedisUtils.setex(self.__get_redis(), key, tool.get_expire(), json.dumps((time_str, volumn)))

    def __get_last_record(self, code):
        """Return ``(time_str, volume)`` of the previous sample, or ``(None, None)``."""
        key = "buy1_volumn_last_info-{}".format(code)
        val = RedisUtils.get(self.__get_redis(), key)
        if val is None:
            return None, None
        val = json.loads(val)
        return val[0], val[1]

    def __get_latest_record(self, code):
        """Return ``(time_str, volume)`` of the latest sample, or ``(None, None)``."""
        key = "buy1_volumn_latest_info-{}".format(code)
        val = RedisUtils.get(self.__get_redis(), key)
        if val is None:
            return None, None
        val = json.loads(val)
        return val[0], val[1]

    def __add_recod(self, code):
        """Add the code to the ``buy1_volumn_codes`` set and refresh its expiry."""
        key = "buy1_volumn_codes"
        RedisUtils.sadd(self.__get_redis(), key, code)
        RedisUtils.expire(self.__get_redis(), key, 10)

    def get_current_codes(self):
        """Return the members of the ``buy1_volumn_codes`` set (codes being watched)."""
        key = "buy1_volumn_codes"
        return RedisUtils.smembers(self.__get_redis(), key)

    def get_buy_1_volumn(self, code, time_str):
        """Return the raw stored volume for ``code`` at ``time_str``."""
        key = "buy1_volumn-{}-{}".format(code, time_str)
        return RedisUtils.get(self.__get_redis(), key)

    def save(self, code, time_str, volumn, price):
        """Record a buy-1 sample and decide whether the order must be cancelled.

        Args:
            code: Security code.
            time_str: Sample time formatted as ``HH:MM:SS``.
            volumn: Buy-1 volume of the sample.
            price: Buy-1 price; the volume is treated as 0 when it is not the
                limit-up price.

        Returns:
            A tuple ``(need_update, need_cancel, cancel_reason)``.
        """
        # Client data has not been loaded yet
        if volumn < 1:
            return False, False, None
        # Nothing is processed from 14:55:00 onwards
        if _time_str_to_int(time_str) >= 145500:
            return False, False, None

        self.__add_recod(code)
        # Only volume queued at the limit-up price counts
        limit_up_price = gpcode_manager.get_limit_up_price(code)
        if limit_up_price != tool.to_price(decimal.Decimal(price)):
            volumn = 0
        last_volumn = self.__last_data.get(code)
        # Skip samples identical to the previous one
        if code in self.__last_data and last_volumn == volumn:
            return False, False, None
        self.__last_data[code] = volumn

        if _time_str_to_int(time_str) >= 93000:
            # Track the maximum volume seen from 09:30 onwards
            max_volume = self.__get_max_buy1_volume(code)
            if max_volume is None:
                max_volume = 0
            if volumn > max_volume:
                self.__save_max_buy1_volume(code, volumn)

        time_volumns = self.__code_time_volumn_dict.setdefault(code, {})
        time_volumns[time_str] = volumn
        # Keep only the two most recent samples
        keys = sorted(time_volumns, key=_time_str_to_int)
        for stale_key in keys[:-2]:
            time_volumns.pop(stale_key)
        keys = keys[-2:]
        if len(keys) == 2:
            self.__save_last_recod(code, keys[0], time_volumns[keys[0]])

        self.__save_recod(code, time_str, volumn)

        # Cancel checks only apply when a buy order is already placed
        state = trade_manager.CodesTradeStateManager().get_trade_state_cache(code)
        if state in (trade_constant.TRADE_STATE_BUY_DELEGATED, trade_constant.TRADE_STATE_BUY_PLACE_ORDER):
            limit_up_price = gpcode_manager.get_limit_up_price(code)
            # NOTE(review): the arithmetic below assumes limit_up_price is
            # numeric - confirm against gpcode_manager.get_limit_up_price.
            # Volume equivalent to 50,000,000 at the limit-up price
            threshold_num = 50000000 // (limit_up_price * 100)

            # last_volumn is None for the first sample of a code, in which case
            # there is no previous value to compare against.
            if (last_volumn is not None
                    and volumn < threshold_num
                    and last_volumn < threshold_num
                    and volumn < last_volumn
                    and (last_volumn - volumn) / last_volumn > 0.5):
                return True, True, "买1主动触发,连续两次封单量降幅达50%以上,时间:{} 封单量:{}-{}".format(time_str, last_volumn, volumn)
            # Cancel when the sealed amount drops below 10,000,000
            min_num = 10000000 // (limit_up_price * 100)
            if volumn < min_num:
                return True, True, "买1主动触发,最新封单额小于1000万,时间:{} 封单量:{}".format(time_str, volumn)

        return True, False, None

    def get_verify_data(self, code):
        """Return the previous record, falling back to the latest one."""
        time_str, volumn = self.__get_last_record(code)
        if time_str is not None:
            return time_str, volumn
        time_str, volumn = self.__get_latest_record(code)
        return time_str, volumn

    def get_max_buy1_volume(self, code):
        """Return the maximum volume stored in Redis, or ``-1`` when absent."""
        val = self.__get_max_buy1_volume(code)
        if val is None:
            return -1
        return val

    def get_max_buy1_volume_cache(self, code):
        """Return the cached maximum volume, or ``-1`` on a cache miss."""
        cache_result = tool.CodeDataCacheUtil.get_cache(self.__max_buy1_volumn_cache, code)
        if cache_result[0]:
            return cache_result[1]
        return -1

    def clear_max_buy1_volume(self, code):
        """Remove the stored maximum volume for ``code``."""
        self.__del_max_buy1_volume(code)


class JueJinBuy1VolumnManager:
    """Stores buy-1 volume samples under the ``buy1_volumn_juejin*`` keys."""

    __redisManager = redis_manager.RedisManager(1)
    # code -> last volume accepted by save()
    __last_data = {}

    def __get_redis(self):
        return self.__redisManager.getRedis()

    def __save_recod(self, code, time_str, volumn):
        """Store this sample under its own key and as the latest record."""
        # Every individual sample
        key = "buy1_volumn_juejin-{}-{}".format(code, time_str)
        RedisUtils.setex(self.__get_redis(), key, tool.get_expire(), volumn)
        # The latest record is read back as a (time_str, volume) pair, so it
        # has to be written as one.
        key = "buy1_volumn_juejin_latest_info-{}".format(code)
        RedisUtils.setex(self.__get_redis(), key, tool.get_expire(), json.dumps((time_str, volumn)))

    def __get_latest_record(self, code):
        """Return ``(time_str, volume)`` of the latest sample, or ``(None, None)``."""
        key = "buy1_volumn_juejin_latest_info-{}".format(code)
        val = RedisUtils.get(self.__get_redis(), key)
        if val is None:
            return None, None
        val = json.loads(val)
        return val[0], val[1]

    def save(self, code, time_str, volumn, price):
        """Record a sample; return ``True`` when it differs from the last one."""
        # Only volume queued at the limit-up price counts
        limit_up_price = gpcode_manager.get_limit_up_price(code)
        if limit_up_price != tool.to_price(decimal.Decimal(price)):
            volumn = 0
        # Skip samples identical to the previous one
        if code in self.__last_data and self.__last_data[code] == volumn:
            return False
        self.__last_data[code] = volumn

        self.__save_recod(code, time_str, volumn)
        return True

    def get_verify_data(self, code):
        """Return ``(time_str, volume)`` of the latest stored record."""
        time_str, volumn = self.__get_latest_record(code)
        return time_str, volumn


class thsl2tradequeuemanager:
    """Stores the latest trade-queue info per code."""

    __redisManager = redis_manager.RedisManager(0)
    # code -> JSON text of the last payload accepted by save_recod()
    __filter_dict = {}

    def __get_redis(self):
        return self.__redisManager.getRedis()

    def __save_latest_recod(self, code, info):
        """Store ``info`` as JSON under ``ths_l2_latest_trade_info-<code>``."""
        key = "ths_l2_latest_trade_info-{}".format(code)
        RedisUtils.setex(self.__get_redis(), key, tool.get_expire(), json.dumps(info))

    def __get_latest_record(self, code):
        """Return the decoded latest info for ``code``, or ``None``."""
        key = "ths_l2_latest_trade_info-{}".format(code)
        val = RedisUtils.get(self.__get_redis(), key)
        if val is None:
            return None
        return json.loads(val)

    def __add_buy1_code(self, code):
        """Add the code to the ``buy1_volumn_codes`` set and refresh its expiry."""
        key = "buy1_volumn_codes"
        RedisUtils.sadd(self.__get_redis(), key, code)
        RedisUtils.expire(self.__get_redis(), key, 10)

    def get_current_codes(self):
        """Return the members of the ``buy1_volumn_codes`` set (codes being watched)."""
        key = "buy1_volumn_codes"
        return RedisUtils.smembers(self.__get_redis(), key)

    def save_recod(self, code, data):
        """Store ``data`` for ``code``; return ``False`` for an unchanged payload."""
        _str = json.dumps(data)
        if code in self.__filter_dict and self.__filter_dict[code] == _str:
            return False
        # Register the code as having buy-1 data
        self.__add_buy1_code(code)
        self.__filter_dict[code] = _str
        self.__save_latest_recod(code, data)
        return True

    def get_sell1_info(self, code):
        """Return ``(sell_time, sell_one_price, sell_one_volume)`` for ``code``.

        All three values are ``None`` when nothing is stored for the code.
        """
        data = self.__get_latest_record(code)
        if data is None:
            return None, None, None
        sell_time = data["sellTime"]
        sell_one_price = data["sellOnePrice"]
        sell_one_volumn = data["sellOneVolumn"]
        return sell_time, sell_one_price, int(sell_one_volumn)


if __name__ == '__main__':
    # thsl2tradequeuemanager defines no test() method, so the former call
    # thsl2tradequeuemanager().test() could only raise AttributeError.
    pass