""" 股票代码管理器 """ import json import random import time import client_manager import constant from db import redis_manager import tool import decimal from ths import l2_listen_pos_health_manager __redisManager = redis_manager.RedisManager(0) class CodesNameManager: redisManager = redis_manager.RedisManager(0) @classmethod def __get_redis(cls): return cls.redisManager.getRedis() @classmethod def list_code_name_dict(cls): dict_ = {} val = cls.list_first_code_name_dict() if val is not None: for k in val: dict_[k] = val[k] val = cls.list_second_code_name_dict() if val is not None: for k in val: dict_[k] = val[k] return dict_ @classmethod def list_first_code_name_dict(cls): val = cls.__get_redis().get("gp_list_names_first") if val is not None: val = json.loads(val) return val return None @classmethod def list_second_code_name_dict(cls): val = cls.__get_redis().get("gp_list_names") if val is not None: val = json.loads(val) return val return None @classmethod def get_first_code_name(cls, code): val = cls.__get_redis().get("gp_list_names_first") if not val: return None val = json.loads(val) for k in val: if val[k] == code: return k return None @classmethod def get_second_code_name(cls, code): val = cls.__get_redis().get("gp_list_names") if not val: return None val = json.loads(val) for k in val: if val[k] == code: return k @classmethod def get_first_name_code(cls, name): val = cls.__get_redis().get("gp_list_names_first") if not val: return None val = json.loads(val) return val.get(name) @classmethod def add_first_code_name(cls, code, name): val = cls.__get_redis().get("gp_list_names_first") if not val: return None val = json.loads(val) val[name] = code cls.set_first_code_names(val) @classmethod def get_second_name_code(cls, name): val = cls.__get_redis().get("gp_list_names") if not val: return None val = json.loads(val) return val.get(name) # 设置首板代码名称 @classmethod def set_first_code_names(cls, datas): cls.__get_redis().set("gp_list_names_first", json.dumps(datas)) # 设置二板代码名称 @classmethod def set_second_code_names(cls, datas): cls.__get_redis().set("gp_list_names", json.dumps(datas)) # 删除首板代码名称 @classmethod def clear_first_code_names(cls): cls.__get_redis().delete("gp_list_names_first") # 设置二板代码名称 @classmethod def clear_second_code_names(cls): cls.__get_redis().delete("gp_list_names") # 首板代码管理 class FirstCodeManager: redisManager = redis_manager.RedisManager(0) @classmethod def __get_redis(cls): return cls.redisManager.getRedis() # 加入首板历史记录 @classmethod def add_record(cls, codes): for code in codes: cls.__get_redis().sadd("first_code_record", code) cls.__get_redis().expire("first_code_record", tool.get_expire()) @classmethod def is_in_first_record(cls, code): if cls.__get_redis().sismember("first_code_record", code): return True else: return False # 加入首板涨停过代码集合 @classmethod def add_limited_up_record(cls, codes): for code in codes: cls.__get_redis().sadd("first_code_limited_up_record", code) cls.__get_redis().expire("first_code_limited_up_record", tool.get_expire()) # 是否涨停过 @classmethod def is_limited_up(cls, code): if cls.__get_redis().sismember("first_code_limited_up_record", code): return True else: return False # 想要买的代码 class WantBuyCodesManager: redisManager = redis_manager.RedisManager(0) __redis_key = "want_buy_codes" @classmethod def __get_redis(cls): return cls.redisManager.getRedis() @classmethod def clear(cls): cls.__get_redis().delete(cls.__redis_key) @classmethod def add_code(cls, code): cls.__get_redis().sadd(cls.__redis_key, code) @classmethod def remove_code(cls, code): cls.__get_redis().srem(cls.__redis_key, code) @classmethod def is_in(cls, code): return cls.__get_redis().sismember(cls.__redis_key, code) @classmethod def list_code(cls): return cls.__get_redis().smembers(cls.__redis_key) def __parse_codes_data(code_datas): codes = [] name_codes = {} for _data in code_datas: # 正常的股票 if _data["sec_type"] == 1 and _data["sec_level"] == 1: code = _data["symbol"].split(".")[1] if code.find("30") != 0 and code.find("68") != 0: name = _data["sec_name"] codes.append(code) # 保存代码对应的名称 name_codes[name] = code return codes, name_codes # -------------------------------二板代码管理--------------------------------- def set_gp_list(code_datas): codes, name_codes = __parse_codes_data(code_datas) redis_instance = __redisManager.getRedis() # 删除之前的 redis_instance.delete("gp_list") CodesNameManager.clear_second_code_names() for d in codes: redis_instance.sadd("gp_list", d) CodesNameManager.set_second_code_names(name_codes) # 新增代码 def add_gp_list(code_datas): if len(code_datas) > 200: raise Exception("不能超过200个数据") redis_instance = __redisManager.getRedis() codes, name_codes = __parse_codes_data(code_datas) for d in codes: redis_instance.sadd("gp_list", d) old_name_codes = CodesNameManager.list_second_code_name_dict() if old_name_codes is None: old_name_codes = name_codes else: for key in name_codes: old_name_codes[key] = name_codes[key] CodesNameManager.set_second_code_names(old_name_codes) # -------------------------------首板代码管理------------------------------- # 添加首板代码 # code_datas 掘金返回的数据 def set_first_gp_codes_with_data(code_datas): redis_instance = __redisManager.getRedis() codes, name_codes = __parse_codes_data(code_datas) codes_set = set() for code in codes: codes_set.add(code) old_codes_set = redis_instance.smembers("gp_list_first") if old_codes_set is None: old_codes_set = set() del_set = old_codes_set - codes_set add_codes = codes_set - old_codes_set for code in add_codes: redis_instance.sadd("gp_list_first", code) for code in del_set: redis_instance.srem("gp_list_first", code) redis_instance.expire("gp_list_first", tool.get_expire()) old_name_codes = CodesNameManager.list_first_code_name_dict() if old_name_codes is None: old_name_codes = name_codes else: for key in name_codes: old_name_codes[key] = name_codes[key] CodesNameManager.set_first_code_names(old_name_codes) # 移除首板代码 def remove_first_gp_code(codes): redis_instance = __redisManager.getRedis() for code in codes: redis_instance.srem("gp_list_first", code) # 获取首板代码 def get_first_gp_codes(): redis_instance = __redisManager.getRedis() return redis_instance.smembers("gp_list_first") # 是否在首板里面 def is_in_first_gp_codes(code): redis_instance = __redisManager.getRedis() return redis_instance.sismember("gp_list_first", code) # 获取名称对应的代码 def get_name_code(name): code = CodesNameManager.get_second_name_code(name) if code is not None: return code code = CodesNameManager.get_first_name_code(name) return code def get_code_name(code): name = CodesNameManager.get_second_code_name(code) if name is not None: return name name = CodesNameManager.get_first_code_name(code) return name def get_name_codes(): return CodesNameManager.list_code_name_dict() # 涨停数据保存 def set_limit_up_list(gpset): if gpset is None: return # 获取基本信息 redis_instance = __redisManager.getRedis() # 删除之前的 redis_instance.delete("gp_limit_up_list") for d in gpset: redis_instance.sadd("gp_limit_up_list", json.dumps(d)) redis_instance.expire("gp_limit_up_list", tool.get_expire()) redis_instance.setex("gp_limit_up_list_update_time", tool.get_expire(), round(time.time() * 1000)) # 获取涨停列表 def get_limit_up_list(): redis_instance = __redisManager.getRedis() return redis_instance.get("gp_limit_up_list_update_time"), redis_instance.smembers("gp_limit_up_list") def rm_gp(code): redis_instance = __redisManager.getRedis() redis_instance.srem("gp_list", code) remove_first_gp_code([code]) def is_in_gp_pool(code): redis_instance = __redisManager.getRedis() return redis_instance.sismember("gp_list", code) or is_in_first_gp_codes(code) def get_gp_list(): redis_instance = __redisManager.getRedis() codes = redis_instance.smembers("gp_list") first_codes = get_first_gp_codes() return set.union(codes, first_codes) # 获取二板代码 def get_second_gp_list(): redis_instance = __redisManager.getRedis() codes = redis_instance.smembers("gp_list") return codes def get_gp_list_with_prefix(data=None): if data is None: data = get_gp_list() list = [] for d in data: if d[0:2] == '00': list.append("SZSE.{}".format(d)) elif d[0:2] == '60': list.append("SHSE.{}".format(d)) return list # 获取收盘价 def get_price_pre(code): redis_instance = __redisManager.getRedis() result = redis_instance.get("price-pre-{}".format(code)) if result is not None: return float(result) return None # 设置收盘价 def set_price_pre(code, price): codes = get_gp_list() if code not in codes: return redis_instance = __redisManager.getRedis() redis_instance.setex("price-pre-{}".format(code), tool.get_expire(), str(price)) __limit_up_price_dict = {} # 获取涨停价 def get_limit_up_price(code): # 读取内存中的值 if code in __limit_up_price_dict: return __limit_up_price_dict[code] price = get_price_pre(code) if price is None: return None limit_up_price = tool.to_price(decimal.Decimal(str(price)) * decimal.Decimal("1.1")) __limit_up_price_dict[code] = limit_up_price return limit_up_price def get_limit_up_price_by_preprice(price): if price is None: return None return tool.to_price(decimal.Decimal(str(price)) * decimal.Decimal("1.1")) # 获取跌停价 def get_limit_down_price(code): price = get_price_pre(code) if price is None: return None return tool.to_price(decimal.Decimal(str(price)) * decimal.Decimal("0.9")) # 获取现价 def get_price(code): redis_instance = __redisManager.getRedis() result = redis_instance.get("price-{}".format(code)) if result is not None: return float(result) return None # 设置现价 def set_price(code, price): redis_instance = __redisManager.getRedis() redis_instance.setex("price-{}".format(code), tool.get_expire(), price) # 获取正在监听的代码 def get_listen_codes(): redis_instance = __redisManager.getRedis() keys = redis_instance.keys("listen_code-*-*") codes = set() for k in keys: code = redis_instance.get(k) if code is not None and len(code) > 0: codes.add(code) return codes # 根据位置获取正在监听的代码 def get_listen_code_by_pos(client_id, pos): redis_instance = __redisManager.getRedis() key = "listen_code-{}-{}".format(client_id, pos) value = redis_instance.get(key) # print("redis:", key,value) return value # 设置位置的监听代码 def set_listen_code_by_pos(client_id, pos, code): redis_instance = __redisManager.getRedis() redis_instance.setex("listen_code-{}-{}".format(client_id, pos), tool.get_expire(), code) # 同步监听的代码集合 __sync_listen_codes_pos() # 同步监听代码位置信息 def __sync_listen_codes_pos(): redis_instance = __redisManager.getRedis() # 获取已经正在监听的代码 keys = redis_instance.keys("code_listen_pos-*") codes_set = set() for key in keys: codes_set.add(key.replace("code_listen_pos-", "")) keys = redis_instance.keys("listen_code-*-*") for key in keys: result = redis_instance.get(key) if result: # 移除需要添加的代码 codes_set.discard(result) client_id_, pos_, code_ = int(key.split("-")[1]), int(key.split("-")[2]), result key_ = "code_listen_pos-{}".format(code_) val = redis_instance.get(key_) if val is None: redis_instance.setex(key_, tool.get_expire(), json.dumps((client_id_, pos_))) else: val = json.loads(val) if val[0] != client_id_ or val[1] != pos_: redis_instance.setex(key_, tool.get_expire(), json.dumps((client_id_, pos_))) # 移除没有监听的代码 for code_ in codes_set: redis_instance.delete(code_) # 初始化位置 def init_listen_code_by_pos(client_id, pos): redis_instance = __redisManager.getRedis() key = "listen_code-{}-{}".format(client_id, pos) redis_instance.setnx(key, "") redis_instance.expire(key, tool.get_expire()) # 清除所有监听代码 def clear_listen_codes(): redis_instance = __redisManager.getRedis() keys = redis_instance.keys("listen_code-*-*") for key in keys: redis_instance.setex(key, tool.get_expire(), "") def clear_first_codes(): redis_instance = __redisManager.getRedis() redis_instance.delete("gp_list_first") redis_instance.delete("gp_list_names_first") redis_instance.delete("first_code_record") redis_instance.delete("first_code_limited_up_record") # 获取可以操作的位置 def get_can_listen_pos(client_id=0): client_ids = [] if client_id <= 0: client_ids = client_manager.getValidL2Clients() else: client_ids.append(client_id) # random.shuffle(client_ids) available_positions = [] for client_id in client_ids: redis_instance = __redisManager.getRedis() k = "listen_code-{}-*".format(client_id) keys = redis_instance.keys(k) # random.shuffle(keys) codes = [] for key in keys: index = key.split("-")[-1] if int(index) + 1 > constant.L2_CODE_COUNT_PER_DEVICE: continue result = redis_instance.get(key) if result is None or len(result) == 0: available_positions.append((client_id, int(key.replace("listen_code-{}-".format(client_id), "")))) else: codes.append((key, result)) # 查询是否有重复的代码 codes_set = set() count = 0 for code in codes: count = count + 1 codes_set.add(code[1]) if len(codes_set) < count: return client_id, int(code[0].replace("listen_code-{}-".format(client_id), "")) if available_positions: # 获取健康状态 available_positions_health_states = l2_listen_pos_health_manager.list_health_state(available_positions) available_positions.sort(key=lambda x: available_positions_health_states[x], reverse=True) # 取第1个数据 return available_positions[0][0], available_positions[0][1] return None, None # 获取可以操作的位置 def get_free_listen_pos_count(): client_ids = client_manager.getValidL2Clients() free_count = 0 for client_id in client_ids: redis_instance = __redisManager.getRedis() k = "listen_code-{}-*".format(client_id) keys = redis_instance.keys(k) for key in keys: code = redis_instance.get(key) if not code: free_count += 1 return free_count # 获取正在监听的代码的位置 def get_listen_code_pos(code): redis_instance = __redisManager.getRedis() val = redis_instance.get("code_listen_pos-{}".format(code)) if val is None: return None, None val = json.loads(val) return val[0], val[1] # 是否正在监听 def is_listen(code): redis_instance = __redisManager.getRedis() val = redis_instance.get("code_listen_pos-{}".format(code)) if val is None: return False else: return True # codes = get_listen_codes() # return codes.__contains__(code) def is_listen_old(code): codes = get_listen_codes() return codes.__contains__(code) # 监听是否满了 def is_listen_full(): clients = client_manager.getValidL2Clients() codes = get_listen_codes() return len(codes) >= constant.L2_CODE_COUNT_PER_DEVICE * len(clients) # 是否正在操作 def is_operate(code): redis_instance = __redisManager.getRedis() return redis_instance.get("gp_operate-{}".format(code)) is not None # 设置正在操作的代码 def set_operate(code): redis_instance = __redisManager.getRedis() redis_instance.setex("gp_operate-{}".format(code), 30, "1") # 批量设置正在操作的代码 def set_operates(codes): redis_instance = __redisManager.getRedis() for code in codes: redis_instance.setex("gp_operate-{}".format(code), 30, "1") # 移除正在操作的代码 def rm_operate(code): redis_instance = __redisManager.getRedis() redis_instance.delete("gp_operate-{}".format(code)) # 批量移除正在操作的代码 def rm_operates(codes): redis_instance = __redisManager.getRedis() for code in codes: redis_instance.delete("gp_operate-{}".format(code)) if __name__ == '__main__': print(get_can_listen_pos())