""" L2的数据处理 """ import json from db import redis_manager_delegate as redis_manager from db.redis_manager_delegate import RedisUtils from utils import tool from utils.tool import CodeDataCacheUtil _db = 1 _redisManager = redis_manager.RedisManager(1) # 下单临时信息 class OrderBeginPosInfo(object): MODE_NORMAL = 0 MODE_FAST = 1 # mode: 0-普通交易 1-快速交易 def __init__(self, buy_single_index=None, buy_exec_index=-1, buy_compute_index=None, num=0, count=0, max_num_set=None, buy_volume_rate=None, sell_info=None, threshold_money=None, mode=0): self.buy_single_index = buy_single_index self.buy_exec_index = buy_exec_index self.buy_compute_index = buy_compute_index self.num = num self.count = count self.threshold_money = threshold_money if max_num_set: self.max_num_set = list(max_num_set) else: self.max_num_set = [] self.buy_volume_rate = buy_volume_rate self.sell_info = sell_info self.mode = mode def get_max_num_set(self): if self.max_num_set: return set(self.max_num_set) return None def to_json_str(self): return json.dumps(vars(self)) def to_dict(self): return vars(self) @classmethod def to_object(cls, json_str: str): d = json.loads(json_str) return OrderBeginPosInfo(**d) class L2DataException(Exception): # 价格不匹配 CODE_PRICE_ERROR = 1 # 无收盘价 CODE_NO_CLOSE_PRICE = 2 def __init__(self, code, msg): super().__init__(self) self.code = code self.msg = msg def __str__(self): return self.msg def get_code(self): return self.code # 交易点管理器,用于管理买入点;买撤点;距离买入点的净买入数据;距离买撤点的买撤数据 class TradePointManager: __db = 1 __redisManager = redis_manager.RedisManager(1) __buy_compute_index_info_cache = {} __buy_cancel_single_pos_cache = {} __instance = None def __new__(cls, *args, **kwargs): if not cls.__instance: cls.__instance = super(TradePointManager, cls).__new__(cls, *args, **kwargs) cls.load_data() return cls.__instance @classmethod def __get_redis(cls): return cls.__redisManager.getRedis() @classmethod def load_data(cls): redis_ = cls.__get_redis() keys = RedisUtils.keys(redis_, "buy_compute_index_info-*") for k in keys: code = k.split("-")[-1] val = RedisUtils.get(redis_, k) order = OrderBeginPosInfo.to_object(val) CodeDataCacheUtil.set_cache(cls.__buy_compute_index_info_cache, code, order) keys = RedisUtils.keys(redis_, "buy_cancel_single_pos-*") for k in keys: code = k.split("-")[-1] val = RedisUtils.get(redis_, k) CodeDataCacheUtil.set_cache(cls.__buy_cancel_single_pos_cache, code, int(val)) # 删除买入点数据 def delete_buy_point(self, code): CodeDataCacheUtil.clear_cache(self.__buy_compute_index_info_cache, code) RedisUtils.delete_async(self.__db, "buy_compute_index_info-{}".format(code)) # 获取买入点信息 # 返回数据为:买入点 累计纯买额 已经计算的数据索引 def get_buy_compute_start_data(self, code): _key = "buy_compute_index_info-{}".format(code) _data_json = RedisUtils.get(self.__get_redis(), _key) if _data_json is None: return None, None, None, 0, 0, [], 0 _data = json.loads(_data_json) return _data[0], _data[1], _data[2], _data[3], _data[4], _data[5], _data[6] def get_buy_compute_start_data_cache(self, code) -> OrderBeginPosInfo: cache_result = CodeDataCacheUtil.get_cache(self.__buy_compute_index_info_cache, code) if cache_result[0]: return cache_result[1] return OrderBeginPosInfo() # 设置买入点的值 # buy_single_index 买入信号位 # buy_exec_index 买入执行位 # compute_index 计算位置 # nums 累计纯买额 def set_buy_compute_start_data_v2(self, code, order: OrderBeginPosInfo): expire = tool.get_expire() _key = "buy_compute_index_info-{}".format(code) data_ = None if order.buy_single_index is not None: data_ = order else: old_order = self.get_buy_compute_start_data_cache(code) order.buy_single_index = old_order.buy_single_index data_ = order CodeDataCacheUtil.set_cache(self.__buy_compute_index_info_cache, code, data_) RedisUtils.setex_async(self.__db, _key, expire, data_.to_json_str()) # 清除l2数据 def clear_l2_data(code): redis_l2 = redis_manager.RedisManager(1).getRedis() try: keys = RedisUtils.keys(redis_l2, "l2-{}-*".format(code), auto_free=False) for k in keys: RedisUtils.delete(redis_l2, k, auto_free=False) RedisUtils.delete(redis_l2, "l2-data-latest-{}".format(code), auto_free=False) finally: RedisUtils.realse(redis_l2) second_930 = 9 * 3600 + 30 * 60 + 0 # 初始化l2固定代码库 def init_l2_fixed_codes(): key = "l2-fixed-codes" redis = _redisManager.getRedis() try: count = RedisUtils.scard(redis, key, auto_free=False) if count > 0: RedisUtils.delete(redis, key, auto_free=False) RedisUtils.sadd(redis, key, "000000", auto_free=False) RedisUtils.expire(redis, key, tool.get_expire(), auto_free=False) finally: RedisUtils.realse(redis) # 移除l2固定监控代码 def remove_from_l2_fixed_codes(code): key = "l2-fixed-codes" RedisUtils.srem(_redisManager.getRedis(), key, code) # 添加代码到L2固定监控 def add_to_l2_fixed_codes(code): key = "l2-fixed-codes" RedisUtils.sadd(_redisManager.getRedis(), key, code) RedisUtils.expire(_redisManager.getRedis(), key, tool.get_expire()) # 是否在l2固定监控代码中 def is_in_l2_fixed_codes(code): key = "l2-fixed-codes" return RedisUtils.sismember(_redisManager.getRedis(), key, code) if __name__ == "__main__": code = "002886" TradePointManager().set_buy_compute_start_data_v2(code, OrderBeginPosInfo(buy_single_index=10, buy_exec_index=30, buy_compute_index=40, num=20000, count=10, buy_volume_rate=0.6, mode=OrderBeginPosInfo.MODE_NORMAL, )) print( TradePointManager().get_buy_compute_start_data_cache(code).max_num_set) RedisUtils.run_loop()