""" 股票代码管理器 """ import copy import json import time from db import redis_manager_delegate as redis_manager from db.mysql_data_delegate import Mysqldb from db.redis_manager_delegate import RedisUtils from log_module import log_export from log_module.log import logger_pre_close_price, logger_debug from utils import tool import decimal __redisManager = redis_manager.RedisManager(0) __db = 0 class CodesNameManager: __mysqldb = Mysqldb() __code_name_dict = {} __instance = None def __new__(cls, *args, **kwargs): if not cls.__instance: cls.__instance = super(CodesNameManager, cls).__new__(cls, *args, **kwargs) cls.__load_data() return cls.__instance @classmethod def __load_data(cls): cls.__code_name_dict = cls.list_code_name_dict() @classmethod def list_code_name_dict(cls): # 获取所有的代码名称对应表 dict_ = {} fresults = cls.__mysqldb.select_all("select code, code_name from code_name") for result in fresults: dict_[result[0]] = result[1] return dict_ @classmethod def get_code_name(cls, code): if code in cls.__code_name_dict: return cls.__code_name_dict[code] fresults = cls.__mysqldb.select_one(f"select code_name from code_name where code = '{code}'") if fresults: cls.__code_name_dict[code] = fresults[0] return fresults[0] return None @classmethod def add_code_name(cls, code, name): cls.__code_name_dict[code] = name fresults = cls.__mysqldb.select_one(f"select code_name from code_name where code = '{code}'") if fresults: cls.__mysqldb.execute(f"update code_name set code_name ='{name}', update_time = now() where code= '{code}'") else: cls.__mysqldb.execute(f"insert into code_name(code,code_name,update_time ) values('{code}','{name}',now())") @classmethod def add_code_names(cls, code_name_dict): """ 批量添加代码名称 @param code_name_dict: @return: """ for code in code_name_dict: if code in cls.__code_name_dict and cls.__code_name_dict[code] == code_name_dict[code]: continue cls.add_code_name(code, code_name_dict[code]) # 想要买的代码 class WantBuyCodesManager: __instance = None __db = 0 redisManager = redis_manager.RedisManager(0) __redis_key = "want_buy_codes" def __new__(cls, *args, **kwargs): if not cls.__instance: cls.__instance = super(WantBuyCodesManager, cls).__new__(cls, *args, **kwargs) # 初始化设置 # 获取交易窗口的锁 cls.__instance.__want_buy_codes_cache = RedisUtils.smembers(cls.__get_redis(), cls.__redis_key) return cls.__instance __want_buy_codes_cache = set() @classmethod def __get_redis(cls): return cls.redisManager.getRedis() def clear(self): RedisUtils.delete(self.__get_redis(), self.__redis_key) def add_code(self, code): self.__want_buy_codes_cache.add(code) RedisUtils.sadd(self.__get_redis(), self.__redis_key, code) RedisUtils.expire(self.__get_redis(), self.__redis_key, tool.get_expire()) def remove_code(self, code): self.__want_buy_codes_cache.discard(code) RedisUtils.srem_async(self.__db, self.__redis_key, code) def sync(self): codes = self.list_code() self.__want_buy_codes_cache.clear() if codes: self.__want_buy_codes_cache |= set(codes) def is_in(self, code): return RedisUtils.sismember(self.__get_redis(), self.__redis_key, code) def is_in_cache(self, code): return code in self.__want_buy_codes_cache def list_code(self): return RedisUtils.smembers(self.__get_redis(), self.__redis_key) def list_code_cache(self): return self.__want_buy_codes_cache @tool.singleton class HumanRemoveForbiddenManager: """ 认为移黑管理 """ __db = 0 redisManager = redis_manager.RedisManager(0) __redis_key = "human_remove_forbidden_codes" __codes_cache = set() def __init__(self): self.__load_data() @classmethod def __get_redis(cls): return cls.redisManager.getRedis() def __load_data(self): self.__codes_cache = RedisUtils.smembers(self.__get_redis(), self.__redis_key) if self.__codes_cache is None: self.__codes_cache = set() def add_code(self, code): self.__codes_cache.add(code) RedisUtils.sadd_async(self.__db, self.__redis_key, code) RedisUtils.expire_async(self.__db, self.__redis_key, tool.get_expire()) def remove_code(self, code): self.__codes_cache.discard(code) RedisUtils.srem_async(self.__db, self.__redis_key, code) def is_in_cache(self, code): return code in self.__codes_cache @tool.singleton class HumanForbiddenManager: """ 人为拉黑管理 """ __db = 0 redisManager = redis_manager.RedisManager(0) __redis_key = "human_forbidden_codes" __codes_cache = set() def __init__(self): self.__load_data() @classmethod def __get_redis(cls): return cls.redisManager.getRedis() def __load_data(self): self.__codes_cache = RedisUtils.smembers(self.__get_redis(), self.__redis_key) if self.__codes_cache is None: self.__codes_cache = set() def add_code(self, code): self.__codes_cache.add(code) RedisUtils.sadd_async(self.__db, self.__redis_key, code) RedisUtils.expire_async(self.__db, self.__redis_key, tool.get_expire()) def remove_code(self, code): self.__codes_cache.discard(code) RedisUtils.srem_async(self.__db, self.__redis_key, code) def is_in_cache(self, code): return code in self.__codes_cache # 暂停下单代码管理 # 与黑名单的区别是暂停交易代码只是不交易,不能移除L2监控位 class PauseBuyCodesManager: __instance = None redisManager = redis_manager.RedisManager(0) __redis_key = "pause_buy_codes" def __new__(cls, *args, **kwargs): if not cls.__instance: cls.__instance = super(PauseBuyCodesManager, cls).__new__(cls, *args, **kwargs) cls.__instance.__pause_buy_codes_cache = RedisUtils.smembers(cls.__get_redis(), cls.__redis_key) return cls.__instance @classmethod def __get_redis(cls): return cls.redisManager.getRedis() def clear(self): self.__pause_buy_codes_cache.clear() RedisUtils.delete(self.__get_redis(), self.__redis_key) def sync(self): data = RedisUtils.smembers(self.__get_redis(), self.__redis_key) self.__pause_buy_codes_cache.clear() if data: self.__pause_buy_codes_cache |= data def add_code(self, code): self.__pause_buy_codes_cache.add(code) RedisUtils.sadd(self.__get_redis(), self.__redis_key, code) RedisUtils.expire(self.__get_redis(), self.__redis_key, tool.get_expire()) def remove_code(self, code): self.__pause_buy_codes_cache.discard(code) RedisUtils.srem(self.__get_redis(), self.__redis_key, code) def is_in(self, code): return RedisUtils.sismember(self.__get_redis(), self.__redis_key, code) def is_in_cache(self, code): return code in self.__pause_buy_codes_cache def list_code(self): return RedisUtils.smembers(self.__get_redis(), self.__redis_key) def list_code_cache(self): return self.__pause_buy_codes_cache # 必买单/红名单 class MustBuyCodesManager: __instance = None __db = 0 redisManager = redis_manager.RedisManager(0) __redis_key = "must_buy_rate-" __must_buy_code_cancel_rate_cache = {} def __new__(cls, *args, **kwargs): if not cls.__instance: cls.__instance = super(MustBuyCodesManager, cls).__new__(cls, *args, **kwargs) keys = RedisUtils.keys(cls.__get_redis(), cls.__redis_key + "*") for k in keys: code = k.split("-")[-1] val = RedisUtils.get(cls.__get_redis(), k) cls.__must_buy_code_cancel_rate_cache[code] = round(float(val), 2) return cls.__instance @classmethod def __get_redis(cls): return cls.redisManager.getRedis() def clear(self): self.__must_buy_code_cancel_rate_cache.clear() keys = RedisUtils.keys(self.__get_redis(), self.__redis_key + "*") for k in keys: RedisUtils.delete(self.__get_redis(), k) def add_code(self, code, rate=0.9): self.__must_buy_code_cancel_rate_cache[code] = round(rate, 2) RedisUtils.setex_async(self.__db, self.__redis_key + str(code), tool.get_expire(), str(round(rate, 2))) def remove_code(self, code): if code in self.__must_buy_code_cancel_rate_cache: self.__must_buy_code_cancel_rate_cache.pop(code) RedisUtils.delete_async(self.__db, self.__redis_key + str(code)) def is_in(self, code): return RedisUtils.get(self.__get_redis(), self.__redis_key + str(code)) def is_in_cache(self, code): return code in self.__must_buy_code_cancel_rate_cache def list_code(self): codes = set() keys = RedisUtils.keys(self.__get_redis(), self.__redis_key + "*") if keys: for k in keys: code = k.split("-")[-1] codes.add(code) return codes def list_code_cache(self): return self.__must_buy_code_cancel_rate_cache.keys() def get_cancel_rate_cache(self, code): if code not in self.__must_buy_code_cancel_rate_cache: return None return self.__must_buy_code_cancel_rate_cache[code] class WhiteListCodeManager: __instance = None __redis_manager = redis_manager.RedisManager(2) def __new__(cls, *args, **kwargs): if not cls.__instance: cls.__instance = super(WhiteListCodeManager, cls).__new__(cls, *args, **kwargs) # 初始化设置 # 获取交易窗口的锁 cls.__instance.__white_codes_cache = RedisUtils.smembers(cls.__get_redis(), "white_list_codes") return cls.__instance @classmethod def __get_redis(cls): return cls.__redis_manager.getRedis() def sync(self): data = RedisUtils.smembers(self.__get_redis(), "white_list_codes") self.__white_codes_cache.clear() if data: self.__white_codes_cache |= data def add_code(self, code): self.__white_codes_cache.add(code) RedisUtils.sadd(self.__get_redis(), "white_list_codes", code) RedisUtils.expire(self.__get_redis(), "white_list_codes", tool.get_expire()) def remove_code(self, code): self.__white_codes_cache.discard(code) RedisUtils.srem(self.__get_redis(), "white_list_codes", code) def is_in(self, code): return RedisUtils.sismember(self.__get_redis(), "white_list_codes", code) def is_in_cache(self, code): return code in self.__white_codes_cache def list_codes(self): return RedisUtils.smembers(self.__get_redis(), "white_list_codes") def list_codes_cache(self): return self.__white_codes_cache def clear(self): self.__white_codes_cache.clear() RedisUtils.delete(self.__get_redis(), "white_list_codes") class BlackListCodeManager: __instance = None __db = 2 __redis_manager = redis_manager.RedisManager(2) def __new__(cls, *args, **kwargs): if not cls.__instance: cls.__instance = super(BlackListCodeManager, cls).__new__(cls, *args, **kwargs) # 初始化设置 # 获取交易窗口的锁 cls.__instance.__forbidden_trade_codes_cache = RedisUtils.smembers(cls.__get_redis(), "forbidden-trade-codes") logger_debug.info(f"加载加黑列表:{cls.__instance.__forbidden_trade_codes_cache}") return cls.__instance @classmethod def __get_redis(cls): return cls.__redis_manager.getRedis() def add_code(self, code): self.__forbidden_trade_codes_cache.add(code) RedisUtils.sadd_async(self.__db, "forbidden-trade-codes", code) RedisUtils.expire_async(self.__db, "forbidden-trade-codes", tool.get_expire()) def sync(self): data = RedisUtils.smembers(self.__get_redis(), "forbidden-trade-codes") self.__forbidden_trade_codes_cache.clear() if data: self.__forbidden_trade_codes_cache |= data def remove_code(self, code): self.__forbidden_trade_codes_cache.discard(code) RedisUtils.srem(self.__get_redis(), "forbidden-trade-codes", code) def is_in(self, code): return RedisUtils.sismember(self.__get_redis(), "forbidden-trade-codes", code) def is_in_cache(self, code): return code in self.__forbidden_trade_codes_cache def list_codes(self): codes = RedisUtils.smembers(self.__get_redis(), "forbidden-trade-codes") self.__forbidden_trade_codes_cache = codes return codes def list_codes_cache(self): return self.__forbidden_trade_codes_cache def clear(self): self.__forbidden_trade_codes_cache.clear() RedisUtils.delete(self.__get_redis(), "forbidden-trade-codes") class GreenListCodeManager: """ 绿名单:买入即加红,撤单不移红 """ __instance = None __db = 2 __redis_manager = redis_manager.RedisManager(2) __codes_set = set() def __new__(cls, *args, **kwargs): if not cls.__instance: cls.__instance = super(GreenListCodeManager, cls).__new__(cls, *args, **kwargs) # 初始化设置 # 获取交易窗口的锁 cls.__codes_set = RedisUtils.smembers(cls.__get_redis(), "green-trade-codes") return cls.__instance @classmethod def __get_redis(cls): return cls.__redis_manager.getRedis() def add_code(self, code): self.__codes_set.add(code) RedisUtils.sadd_async(self.__db, "green-trade-codes", code) RedisUtils.expire_async(self.__db, "green-trade-codes", tool.get_expire()) def sync(self): data = RedisUtils.smembers(self.__get_redis(), "green-trade-codes") self.__codes_set.clear() if data: self.__codes_set |= data def remove_code(self, code): self.__codes_set.discard(code) RedisUtils.srem_async(self.__db, "green-trade-codes", code) def is_in(self, code): return RedisUtils.sismember(self.__get_redis(), "green-trade-codes", code) def is_in_cache(self, code): return code in self.__codes_set def list_codes(self): codes = RedisUtils.smembers(self.__get_redis(), "green-trade-codes") self.__codes_set = codes return codes def list_codes_cache(self): return self.__codes_set def clear(self): self.__codes_set.clear() RedisUtils.delete(self.__get_redis(), "green-trade-codes") def __parse_codes_data(code_datas): codes = [] name_codes = {} for _data in code_datas: # 正常的股票 if _data["sec_type"] == 1 and _data["sec_level"] == 1: code = _data["symbol"].split(".")[1] if tool.is_can_buy_code(code): name = _data["sec_name"] codes.append(code) # 保存代码对应的名称 name_codes[name] = code return codes, name_codes # -------------------------------首板代码管理------------------------------- class FirstGPCodesManager: __db = 0 __redisManager = redis_manager.RedisManager(0) __gp_list_first_codes_cache = set() __instance = None def __new__(cls, *args, **kwargs): if not cls.__instance: cls.__instance = super(FirstGPCodesManager, cls).__new__(cls, *args, **kwargs) cls.__load_datas() return cls.__instance @classmethod def __get_redis(cls): return cls.__redisManager.getRedis() @classmethod def __load_datas(cls): __redis = cls.__get_redis() try: cls.__gp_list_first_codes_cache = RedisUtils.smembers(__redis, "gp_list_first") finally: RedisUtils.realse(__redis) @classmethod def __parse_codes_data(cls, code_datas): codes = [] name_codes = {} for _data in code_datas: # 正常的股票 if _data["sec_type"] == 1 and _data["sec_level"] == 1: code = _data["symbol"].split(".")[1] if tool.is_can_buy_code(code): name = _data["sec_name"] codes.append(code) # 保存代码对应的名称 name_codes[name] = code return codes, name_codes # 添加首板代码 # code_datas 掘金返回的数据 def set_first_gp_codes_with_data(self, code_datas): try: codes, name_codes = self.__parse_codes_data(code_datas) codes_set = set() for code in codes: codes_set.add(code) old_codes_set = self.__gp_list_first_codes_cache if old_codes_set is None: old_codes_set = set() del_set = old_codes_set - codes_set add_codes = codes_set - old_codes_set for code in add_codes: RedisUtils.sadd_async(self.__db, "gp_list_first", code, auto_free=False) for code in del_set: RedisUtils.srem_async(self.__db, "gp_list_first", code, auto_free=False) if add_codes or del_set: RedisUtils.expire_async(self.__db, "gp_list_first", tool.get_expire(), auto_free=False) # 更新缓存 self.__gp_list_first_codes_cache.clear() self.__gp_list_first_codes_cache |= codes_set for key in name_codes: CodesNameManager.add_code_name(name_codes[key], key) finally: pass # 移除首板代码 def remove_first_gp_code(self, codes): redis_instance = self.__get_redis() try: for code in codes: self.__gp_list_first_codes_cache.discard(code) RedisUtils.srem(redis_instance, "gp_list_first", code, auto_free=False) finally: RedisUtils.realse(redis_instance) # 获取首板代码 def get_first_gp_codes(self): return RedisUtils.smembers(self.__get_redis(), "gp_list_first") def get_first_gp_codes_cache(self): return self.__gp_list_first_codes_cache # 是否在首板里面 def is_in_first_gp_codes(self, code): return RedisUtils.sismember(self.__get_redis(), "gp_list_first", code) # 是否在首板里面 def is_in_first_gp_codes_cache(self, code): return code in self.__gp_list_first_codes_cache # 代码名字缓存 __code_name_dict = {} # 获取代码的名称 def get_code_name(code): return CodesNameManager().get_code_name(code) def get_name_codes(): code_name_dict = CodesNameManager.list_code_name_dict() return {code_name_dict[x]: x for x in code_name_dict} # 涨停数据保存 def set_limit_up_list(gpset): if gpset is None: return # 获取基本信息 redis_instance = __redisManager.getRedis() try: # 删除之前的 RedisUtils.delete(redis_instance, "gp_limit_up_list", auto_free=False) for d in gpset: RedisUtils.sadd(redis_instance, "gp_limit_up_list", json.dumps(d), auto_free=False) RedisUtils.expire(redis_instance, "gp_limit_up_list", tool.get_expire(), auto_free=False) RedisUtils.setex(redis_instance, "gp_limit_up_list_update_time", tool.get_expire(), round(time.time() * 1000), auto_free=False) finally: RedisUtils.realse(redis_instance) # 获取涨停列表 def get_limit_up_list(): redis_instance = __redisManager.getRedis() try: return RedisUtils.get(redis_instance, "gp_limit_up_list_update_time", auto_free=False), RedisUtils.smembers(redis_instance, "gp_limit_up_list", auto_free=False) finally: RedisUtils.realse(redis_instance) def rm_gp(code): RedisUtils.srem(__redisManager.getRedis(), "gp_list", code) FirstGPCodesManager().remove_first_gp_code([code]) def is_in_gp_pool(code): return RedisUtils.sismember(__redisManager.getRedis(), "gp_list", code) or FirstGPCodesManager().is_in_first_gp_codes_cache(code) def get_gp_list(): codes = RedisUtils.smembers(__redisManager.getRedis(), "gp_list") first_codes = FirstGPCodesManager().get_first_gp_codes_cache() return set.union(codes, first_codes) # 获取二板代码 def get_second_gp_list(): codes = RedisUtils.smembers(__redisManager.getRedis(), "gp_list") return codes def get_gp_list_with_prefix(data=None): if data is None: data = get_gp_list() list = [] for d in data: if tool.is_sz_code(d): list.append("SZSE.{}".format(d)) elif tool.is_sh_code(d): list.append("SHSE.{}".format(d)) return list @tool.singleton class CodePrePriceManager: __price_pre_cache = {} __redisManager = redis_manager.RedisManager(0) def __init__(self): # 加载数据 fdatas = log_export.load_pre_close_price() if fdatas: for code in fdatas: self.__price_pre_cache[code] = round(float(fdatas.get(code)), 2) # 获取收盘价 def get_price_pre(self, code): fdatas = log_export.load_pre_close_price() if code in fdatas: return round(float(fdatas.get(code)), 2) return None # 获取缓存 def get_price_pre_cache(self, code): if code in self.__price_pre_cache: return self.__price_pre_cache[code] return None # 设置收盘价 def set_price_pre(self, code, price, force=False): if code in self.__price_pre_cache and not force: return price = round(float(price), 2) logger_pre_close_price.info(f"{code}-{price}") self.__price_pre_cache[code] = price __limit_up_price_dict = {} # 获取涨停价 def get_limit_up_price(code): # 读取内存中的值 if code in __limit_up_price_dict: return __limit_up_price_dict[code] price = CodePrePriceManager().get_price_pre_cache(code) if price is None: return None limit_up_price = tool.to_price(decimal.Decimal(str(price)) * decimal.Decimal(tool.get_limit_up_rate(code))) __limit_up_price_dict[code] = limit_up_price return limit_up_price def get_limit_up_price_as_num(code): limit_up_price = get_limit_up_price(code) if limit_up_price: return round(float(limit_up_price), 2) return None def get_limit_up_price_cache(code): if code in __limit_up_price_dict: return __limit_up_price_dict[code] return None def get_limit_up_price_by_preprice(code, price): if price is None: return None return tool.to_price(decimal.Decimal(str(price)) * decimal.Decimal(f"{tool.get_limit_up_rate(code)}")) def get_limit_down_price_by_preprice(code, price): if price is None: return None return tool.to_price(decimal.Decimal(str(price)) * decimal.Decimal(f"{tool.get_limit_down_rate(code)}")) # 获取跌停价 def get_limit_down_price(code): price = CodePrePriceManager().get_price_pre_cache(code) if price is None: return None return tool.to_price(decimal.Decimal(str(price)) * decimal.Decimal(f"{tool.get_limit_down_rate(code)}")) # 获取现价 def get_price(code): result = RedisUtils.get(__redisManager.getRedis(), "price-{}".format(code)) if result is not None: return float(result) return None __current_price_cache = {} # 设置现价 def set_price(code, price): if code in __current_price_cache and __current_price_cache[code] == price: return __current_price_cache[code] = price RedisUtils.setex(__redisManager.getRedis(), "price-{}".format(code), tool.get_expire(), price) # datas:[(code,price)] def set_prices(datas): for d in datas: code, price = d[0], d[1] if code in __current_price_cache and __current_price_cache[code] == price: continue __current_price_cache[code] = price RedisUtils.setex_async(__db, "price-{}".format(code), tool.get_expire(), price) # 获取正在监听的代码 def get_listen_codes(): redis_instance = __redisManager.getRedis() try: keys = RedisUtils.keys(redis_instance, "listen_code-*-*", auto_free=False) codes = set() for k in keys: code = RedisUtils.get(redis_instance, k, auto_free=False) if code is not None and len(code) > 0: codes.add(code) return codes finally: RedisUtils.realse(redis_instance) # 根据位置获取正在监听的代码 def get_listen_code_by_pos(client_id, pos): key = "listen_code-{}-{}".format(client_id, pos) value = RedisUtils.get(__redisManager.getRedis(), key) # print("redis:", key,value) return value # 设置位置的监听代码 def set_listen_code_by_pos(client_id, pos, code): RedisUtils.setex(__redisManager.getRedis(), "listen_code-{}-{}".format(client_id, pos), tool.get_expire(), code) # 同步监听的代码集合 __sync_listen_codes_pos() # 同步监听代码位置信息 def __sync_listen_codes_pos(): redis_instance = __redisManager.getRedis() try: # 获取已经正在监听的代码 keys = RedisUtils.keys(redis_instance, "code_listen_pos-*", auto_free=False) codes_set = set() for key in keys: codes_set.add(key.replace("code_listen_pos-", "")) keys = RedisUtils.keys(redis_instance, "listen_code-*-*", auto_free=False) for key in keys: result = RedisUtils.get(redis_instance, key, auto_free=False) if result: # 移除需要添加的代码 codes_set.discard(result) client_id_, pos_, code_ = int(key.split("-")[1]), int(key.split("-")[2]), result key_ = "code_listen_pos-{}".format(code_) val = RedisUtils.get(redis_instance, key_, auto_free=False) if val is None: RedisUtils.setex(redis_instance, key_, tool.get_expire(), json.dumps((client_id_, pos_)), auto_free=False) else: val = json.loads(val) if val[0] != client_id_ or val[1] != pos_: RedisUtils.setex(redis_instance, key_, tool.get_expire(), json.dumps((client_id_, pos_)), auto_free=False) # 移除没有监听的代码 for code_ in codes_set: RedisUtils.delete(redis_instance, code_, auto_free=False) finally: RedisUtils.realse(redis_instance) # 初始化位置 def init_listen_code_by_pos(client_id, pos): key = "listen_code-{}-{}".format(client_id, pos) RedisUtils.setnx(__redisManager.getRedis(), key, "") RedisUtils.expire(__redisManager.getRedis(), key, tool.get_expire()) # 清除所有监听代码 def clear_listen_codes(): redis_instance = __redisManager.getRedis() try: keys = RedisUtils.keys(redis_instance, "listen_code-*-*", auto_free=False) for key in keys: RedisUtils.setex(redis_instance, key, tool.get_expire(), "", auto_free=False) finally: RedisUtils.realse(redis_instance) def clear_first_codes(): redis_instance = __redisManager.getRedis() try: RedisUtils.delete(redis_instance, "gp_list_first", auto_free=False) RedisUtils.delete(redis_instance, "gp_list_names_first", auto_free=False) RedisUtils.delete(redis_instance, "first_code_record", auto_free=False) RedisUtils.delete(redis_instance, "first_code_limited_up_record", auto_free=False) finally: RedisUtils.realse(redis_instance)