""" L2的数据处理 """ import json from db import redis_manager from db.redis_manager import RedisUtils from utils import tool from log_module.log import logger_l2_trade_buy from utils.tool import CodeDataCacheUtil _redisManager = redis_manager.RedisManager(1) class L2DataException(Exception): # 价格不匹配 CODE_PRICE_ERROR = 1 # 无收盘价 CODE_NO_CLOSE_PRICE = 2 def __init__(self, code, msg): super().__init__(self) self.code = code self.msg = msg def __str__(self): return self.msg def get_code(self): return self.code # 交易点管理器,用于管理买入点;买撤点;距离买入点的净买入数据;距离买撤点的买撤数据 class TradePointManager: __buy_compute_index_info_cache = {} @staticmethod def __get_redis(): return _redisManager.getRedis() # 删除买入点数据 @staticmethod def delete_buy_point(code): CodeDataCacheUtil.clear_cache(TradePointManager.__buy_compute_index_info_cache, code) redis = TradePointManager.__get_redis() RedisUtils.delete(redis, "buy_compute_index_info-{}".format(code)) # 获取买入点信息 # 返回数据为:买入点 累计纯买额 已经计算的数据索引 @staticmethod def get_buy_compute_start_data(code): redis = TradePointManager.__get_redis() _key = "buy_compute_index_info-{}".format(code) _data_json = redis.get(_key) if _data_json is None: return None, None, None, 0, 0, [], 0 _data = json.loads(_data_json) return _data[0], _data[1], _data[2], _data[3], _data[4], _data[5], _data[6] @staticmethod def get_buy_compute_start_data_cache(code): cache_result = CodeDataCacheUtil.get_cache(TradePointManager.__buy_compute_index_info_cache, code) if cache_result[0]: return cache_result[1] val = TradePointManager.get_buy_compute_start_data(code) CodeDataCacheUtil.set_cache(TradePointManager.__buy_compute_index_info_cache, code, val) return val # 设置买入点的值 # buy_single_index 买入信号位 # buy_exec_index 买入执行位 # compute_index 计算位置 # nums 累计纯买额 @staticmethod def set_buy_compute_start_data(code, buy_single_index, buy_exec_index, compute_index, nums, count, max_num_sets, volume_rate): redis = TradePointManager.__get_redis() expire = tool.get_expire() _key = "buy_compute_index_info-{}".format(code) data_ = None if buy_single_index is not None: data_ = (buy_single_index, buy_exec_index, compute_index, nums, count, list(max_num_sets), volume_rate) else: _buy_single_index, _buy_exec_index, _compute_index, _nums, _count, _max_num_index, _volume_rate = TradePointManager.get_buy_compute_start_data_cache( code) data_ = (_buy_single_index, buy_exec_index, compute_index, nums, count, list(max_num_sets), volume_rate) CodeDataCacheUtil.set_cache(TradePointManager.__buy_compute_index_info_cache, code, data_) RedisUtils.setex( redis,_key, expire, json.dumps(data_)) # 获取撤买入开始计算的信息 # 返回数据的内容为:撤销点索引 撤买纯买额 计算的数据索引 @staticmethod def get_buy_cancel_single_pos(code): redis = TradePointManager.__get_redis() info = redis.get("buy_cancel_single_pos-{}".format(code)) if info is None: return None else: return int(info) # 设置买撤点信息 # buy_num 纯买额 computed_index计算到的下标 index撤买信号起点 @classmethod def set_buy_cancel_single_pos(cls, code, index): redis = TradePointManager.__get_redis() expire = tool.get_expire() RedisUtils.setex(redis,"buy_cancel_single_pos-{}".format(code), expire, index) # 删除买撤点数据 @classmethod def delete_buy_cancel_point(cls, code): redis = TradePointManager.__get_redis() RedisUtils.delete( redis, "buy_cancel_single_pos-{}".format(code)) # 设置买撤纯买额 @classmethod def set_compute_info_for_cancel_buy(cls, code, index, nums): redis = TradePointManager.__get_redis() expire = tool.get_expire() RedisUtils.setex( redis,"compute_info_for_cancel_buy-{}".format(code), expire, json.dumps((index, nums))) logger_l2_trade_buy.info("{}保存撤单纯买额信息:{},{}", code, index, nums) # 获取买撤纯买额计算信息 @classmethod def get_compute_info_for_cancel_buy(cls, code): redis = TradePointManager.__get_redis() info = redis.get("compute_info_for_cancel_buy-{}".format(code)) if info is None: return None, 0 else: info = json.loads(info) return info[0], info[1] @classmethod def delete_compute_info_for_cancel_buy(cls, code): redis = TradePointManager.__get_redis() RedisUtils.delete(redis, "compute_info_for_cancel_buy-{}".format(code)) # 从买入信号开始设置涨停买与涨停撤的单数 @classmethod def set_count_info_for_cancel_buy(cls, code, index, buy_count, cancel_count): redis = TradePointManager.__get_redis() expire = tool.get_expire() RedisUtils.setex(redis,"count_info_for_cancel_buy-{}".format(code), expire, json.dumps((index, buy_count, cancel_count))) logger_l2_trade_buy.info("{}保存撤单纯买额信息:{},{}", code, index, buy_count, cancel_count) # 获取买撤纯买额计算信息 @classmethod def get_count_info_for_cancel_buy(cls, code): redis = TradePointManager.__get_redis() info = redis.get("count_info_for_cancel_buy-{}".format(code)) if info is None: return None, 0, 0 else: info = json.loads(info) return info[0], info[1], info[2] @classmethod def delete_count_info_for_cancel_buy(cls, code): redis = TradePointManager.__get_redis() RedisUtils.delete(redis, "count_info_for_cancel_buy-{}".format(code)) # 清除l2数据 def clear_l2_data(code): redis_l2 = redis_manager.RedisManager(1).getRedis() keys = RedisUtils.keys( redis_l2, "l2-{}-*".format(code)) for k in keys: RedisUtils.delete(redis_l2,k) RedisUtils.delete(redis_l2, "l2-data-latest-{}".format(code)) second_930 = 9 * 3600 + 30 * 60 + 0 # 初始化l2固定代码库 def init_l2_fixed_codes(): key = "l2-fixed-codes" redis = _redisManager.getRedis() count = redis.scard(key) if count > 0: RedisUtils.delete(redis, key) RedisUtils.sadd(redis, key, "000000") RedisUtils.expire(redis, key, tool.get_expire()) # 移除l2固定监控代码 def remove_from_l2_fixed_codes(code): key = "l2-fixed-codes" redis = _redisManager.getRedis() redis.srem(key, code) # 添加代码到L2固定监控 def add_to_l2_fixed_codes(code): key = "l2-fixed-codes" redis = _redisManager.getRedis() RedisUtils.sadd(redis, key, code) RedisUtils.expire(redis, key, tool.get_expire()) # 是否在l2固定监控代码中 def is_in_l2_fixed_codes(code): key = "l2-fixed-codes" redis = _redisManager.getRedis() return redis.sismember(key, code) if __name__ == "__main__": TradePointManager.get_buy_compute_start_data_cache("603912")