"""L2 data processing: trade-point bookkeeping and the L2 fixed-code set, stored in Redis."""
import json

from db import redis_manager_delegate as redis_manager
from db.redis_manager_delegate import RedisUtils
from utils import tool
from utils.tool import CodeDataCacheUtil

# Redis database index used by this module.
_db = 1
_redisManager = redis_manager.RedisManager(_db)


class L2DataException(Exception):
    """Exception carrying a numeric error code together with a message."""

    # Price mismatch
    CODE_PRICE_ERROR = 1
    # No closing price
    CODE_NO_CLOSE_PRICE = 2

    def __init__(self, code, msg):
        """Store ``code`` and ``msg`` on the exception.

        Args:
            code: One of the ``CODE_*`` constants.
            msg: Human-readable description; returned by ``str(exc)``.
        """
        # Pass the message, not ``self``, to the base class: an exception that
        # holds itself in ``args`` makes ``repr(exc)`` recurse without bound.
        super().__init__(msg)
        self.code = code
        self.msg = msg

    def __str__(self):
        return self.msg

    def get_code(self):
        """Return the error code given at construction time."""
        return self.code


# Trade point manager: manages buy points, buy-cancel points, the net-buy data
# accumulated since the buy point and the cancel data since the buy-cancel point.
class TradePointManager:
    """Singleton keeping per-code trade-point data in memory and in Redis.

    Writes update the in-memory cache synchronously and Redis through the
    ``*_async`` helpers of ``RedisUtils``. The ``*_cache`` getters read only
    from memory; the plain getters read Redis directly.
    """

    __db = 1
    __redisManager = redis_manager.RedisManager(__db)
    # code -> buy-compute info, as written by set_buy_compute_start_data
    __buy_compute_index_info_cache = {}
    # code -> buy-cancel signal index (int)
    __buy_cancel_single_pos_cache = {}
    __instance = None

    def __new__(cls, *args, **kwargs):
        """Return the shared instance, loading the caches on first creation."""
        if cls.__instance is None:
            # object.__new__ rejects extra arguments, so none are forwarded.
            cls.__instance = super().__new__(cls)
            cls.load_data()
        return cls.__instance

    @classmethod
    def __get_redis(cls):
        return cls.__redisManager.getRedis()

    @classmethod
    def load_data(cls):
        """Fill both in-memory caches from the values currently in Redis.

        The code is taken from the part of each key after the last ``-``.
        """
        redis_ = cls.__get_redis()

        for key in RedisUtils.keys(redis_, "buy_compute_index_info-*"):
            raw = RedisUtils.get(redis_, key)
            if raw is None:
                # The key expired or was deleted between KEYS and GET.
                continue
            code = key.split("-")[-1]
            CodeDataCacheUtil.set_cache(cls.__buy_compute_index_info_cache, code, json.loads(raw))

        for key in RedisUtils.keys(redis_, "buy_cancel_single_pos-*"):
            raw = RedisUtils.get(redis_, key)
            if raw is None:
                continue
            code = key.split("-")[-1]
            CodeDataCacheUtil.set_cache(cls.__buy_cancel_single_pos_cache, code, int(raw))

    # Delete the buy-point data
    def delete_buy_point(self, code):
        """Drop the buy-compute info of ``code`` from the cache and from Redis."""
        CodeDataCacheUtil.clear_cache(self.__buy_compute_index_info_cache, code)
        RedisUtils.delete_async(self.__db, "buy_compute_index_info-{}".format(code))

    # Get the buy-point info
    def get_buy_compute_start_data(self, code):
        """Read the buy-compute info of ``code`` directly from Redis.

        Returns:
            A 7-tuple in the order stored by ``set_buy_compute_start_data``:
            ``(buy_single_index, buy_exec_index, compute_index, nums, count,
            max_num_sets, volume_rate)``. When nothing is stored the result is
            ``(None, None, None, 0, 0, [], 0)``.
        """
        _key = "buy_compute_index_info-{}".format(code)
        _data_json = RedisUtils.get(self.__get_redis(), _key)
        if _data_json is None:
            return None, None, None, 0, 0, [], 0
        _data = json.loads(_data_json)
        return _data[0], _data[1], _data[2], _data[3], _data[4], _data[5], _data[6]

    def get_buy_compute_start_data_cache(self, code):
        """Return the cached buy-compute info of ``code`` without touching Redis.

        Falls back to ``(None, None, None, 0, 0, [], 0)`` when the cache has no
        entry for ``code``.
        """
        cache_result = CodeDataCacheUtil.get_cache(self.__buy_compute_index_info_cache, code)
        # get_cache is used here as returning (found, value).
        if cache_result[0]:
            return cache_result[1]
        return None, None, None, 0, 0, [], 0

    # Set the buy-point values
    def set_buy_compute_start_data(self, code, buy_single_index, buy_exec_index, compute_index, nums, count,
                                   max_num_sets, volume_rate):
        """Store the buy-compute info of ``code`` in the cache and in Redis.

        Args:
            code: Security code.
            buy_single_index: Buy signal position. When ``None`` the value
                currently cached for ``code`` is kept.
            buy_exec_index: Buy execution position.
            compute_index: Position the computation has reached.
            nums: Cumulative net buy amount.
            count: Stored as given (NOTE(review): meaning not visible here).
            max_num_sets: Iterable, stored as a list.
            volume_rate: Stored as given (NOTE(review): meaning not visible here).
        """
        expire = tool.get_expire()
        _key = "buy_compute_index_info-{}".format(code)
        if buy_single_index is None:
            # Keep the previously cached signal position; every other field is
            # overwritten with the values passed in.
            buy_single_index = self.get_buy_compute_start_data_cache(code)[0]
        data_ = (buy_single_index, buy_exec_index, compute_index, nums, count, list(max_num_sets), volume_rate)
        CodeDataCacheUtil.set_cache(self.__buy_compute_index_info_cache, code, data_)
        RedisUtils.setex_async(self.__db, _key, expire, json.dumps(data_))

    # Get the position where the buy-cancel computation starts
    def get_buy_cancel_single_pos(self, code):
        """Read the buy-cancel signal index of ``code`` from Redis.

        Returns:
            The index as ``int``, or ``None`` when nothing is stored.
        """
        info = RedisUtils.get(self.__get_redis(), "buy_cancel_single_pos-{}".format(code))
        return None if info is None else int(info)

    def get_buy_cancel_single_pos_cache(self, code):
        """Return the cached buy-cancel signal index of ``code``, or ``None``."""
        cache_result = CodeDataCacheUtil.get_cache(self.__buy_cancel_single_pos_cache, code)
        if cache_result[0]:
            return cache_result[1]
        return None

    # Set the buy-cancel point info
    def set_buy_cancel_single_pos(self, code, index):
        """Store ``index`` (start of the buy-cancel signal) for ``code``."""
        CodeDataCacheUtil.set_cache(self.__buy_cancel_single_pos_cache, code, index)
        expire = tool.get_expire()
        RedisUtils.setex_async(self.__db, "buy_cancel_single_pos-{}".format(code), expire, index)

    # Delete the buy-cancel point data
    def delete_buy_cancel_point(self, code):
        """Drop the buy-cancel signal index of ``code`` from the cache and from Redis."""
        CodeDataCacheUtil.clear_cache(self.__buy_cancel_single_pos_cache, code)
        RedisUtils.delete_async(self.__db, "buy_cancel_single_pos-{}".format(code))


# Clear the L2 data
def clear_l2_data(code):
    """Delete every ``l2-<code>-*`` key and ``l2-data-latest-<code>`` from Redis."""
    # Use the module-level manager, as the other helpers in this module do,
    # rather than building a new RedisManager on every call.
    redis_l2 = _redisManager.getRedis()
    try:
        for k in RedisUtils.keys(redis_l2, "l2-{}-*".format(code), auto_free=False):
            RedisUtils.delete(redis_l2, k, auto_free=False)
        RedisUtils.delete(redis_l2, "l2-data-latest-{}".format(code), auto_free=False)
    finally:
        RedisUtils.realse(redis_l2)


# 09:30:00 expressed as seconds since midnight
second_930 = 9 * 3600 + 30 * 60 + 0


# Initialise the L2 fixed-code set
def init_l2_fixed_codes():
    """Reset the ``l2-fixed-codes`` set so it holds only the placeholder "000000"."""
    key = "l2-fixed-codes"
    redis = _redisManager.getRedis()
    try:
        count = RedisUtils.scard(redis, key, auto_free=False)
        if count > 0:
            RedisUtils.delete(redis, key, auto_free=False)
        RedisUtils.sadd(redis, key, "000000", auto_free=False)
        RedisUtils.expire(redis, key, tool.get_expire(), auto_free=False)
    finally:
        RedisUtils.realse(redis)


# Remove a code from the L2 fixed-monitoring set
def remove_from_l2_fixed_codes(code):
    """Remove ``code`` from the ``l2-fixed-codes`` set."""
    key = "l2-fixed-codes"
    RedisUtils.srem(_redisManager.getRedis(), key, code)


# Add a code to the L2 fixed-monitoring set
def add_to_l2_fixed_codes(code):
    """Add ``code`` to the ``l2-fixed-codes`` set and refresh the set's expiry."""
    key = "l2-fixed-codes"
    # One connection for both commands, released explicitly, matching
    # init_l2_fixed_codes.
    redis = _redisManager.getRedis()
    try:
        RedisUtils.sadd(redis, key, code, auto_free=False)
        RedisUtils.expire(redis, key, tool.get_expire(), auto_free=False)
    finally:
        RedisUtils.realse(redis)


# Whether a code is in the L2 fixed-monitoring set
def is_in_l2_fixed_codes(code):
    """Return the result of the set-membership check of ``code`` in ``l2-fixed-codes``."""
    key = "l2-fixed-codes"
    return RedisUtils.sismember(_redisManager.getRedis(), key, code)


if __name__ == "__main__":
    TradePointManager().get_buy_compute_start_data_cache("603912")