""" 安全笔数管理 """ # 下单L2的安全笔数管理 import json from db.redis_manager_delegate import RedisUtils from l2 import l2_data_source_util from trade import l2_trade_factor from db import redis_manager_delegate as redis_manager from utils import tool from l2.l2_data_util import L2DataUtil, local_today_buyno_map class BuyL2SafeCountManager(object): __db = 0 __redis_manager = redis_manager.RedisManager(0) __instance = None latest_place_order_info_cache = {} safe_count_l2_cache = {} def __new__(cls, *args, **kwargs): if not cls.__instance: cls.__instance = super(BuyL2SafeCountManager, cls).__new__(cls, *args, **kwargs) cls.__load_datas() return cls.__instance def __init__(self): self.last_buy_queue_data = {} @classmethod def __get_redis(cls): return cls.__redis_manager.getRedis() @classmethod def __load_datas(cls): __redis = cls.__get_redis() try: keys = RedisUtils.keys(__redis, "safe_count_l2-*") for k in keys: ks = k.split("-") code, last_buy_single_index = ks[1], int(ks[2]) val = RedisUtils.get(__redis, k) val = json.loads(val) tool.CodeDataCacheUtil.set_cache(cls.safe_count_l2_cache, f"{code}-{last_buy_single_index}", val) keys = RedisUtils.keys(__redis, "latest_place_order_info-*") for k in keys: ks = k.split("-") code = ks[-1] val = RedisUtils.get(__redis, k) val = json.loads(val) tool.CodeDataCacheUtil.set_cache(cls.latest_place_order_info_cache, code, val) finally: RedisUtils.realse(__redis) # 记录每一次的处理进度 def __save_compute_progress(self, code, last_buy_single_index, process_index, buy_num, cancel_num): key = "safe_count_l2-{}-{}".format(code, last_buy_single_index) tool.CodeDataCacheUtil.set_cache(self.safe_count_l2_cache, f"{code}-{last_buy_single_index}", (last_buy_single_index, process_index, buy_num, cancel_num)) RedisUtils.setex_async(self.__db, key, tool.get_expire(), json.dumps((last_buy_single_index, process_index, buy_num, cancel_num))) # 返回数据与更新时间 def __get_compute_progress(self, code, last_buy_single_index): key = "safe_count_l2-{}-{}".format(code, last_buy_single_index) val = RedisUtils.get(self.__get_redis(), key) if val is None: return None, -1, 0, 0 val = json.loads(val) return val[0], val[1], val[2], val[3] def __get_compute_progress_cache(self, code, last_buy_single_index): cache_result = tool.CodeDataCacheUtil.get_cache(self.safe_count_l2_cache, f"{code}-{last_buy_single_index}") if cache_result[0]: return cache_result[1] return None, -1, 0, 0 # 保存最近的下单信息 def __save_latest_place_order_info(self, code, buy_single_index, buy_exec_index, cancel_index): tool.CodeDataCacheUtil.set_cache(self.latest_place_order_info_cache, code, (buy_single_index, buy_exec_index, cancel_index)) key = "latest_place_order_info-{}".format(code) RedisUtils.setex_async(self.__db, key, tool.get_expire(), json.dumps((buy_single_index, buy_exec_index, cancel_index))) def __get_latest_place_order_info(self, code): key = "latest_place_order_info-{}".format(code) val = RedisUtils.get(self.__get_redis(), key) if val is None: return None, None, None val = json.loads(val) return val[0], val[1], val[2] def __get_latest_place_order_info_cache(self, code): cache_result = tool.CodeDataCacheUtil.get_cache(self.latest_place_order_info_cache, code) if cache_result[0]: return cache_result[1] return None, None, None def __get_all_compute_progress(self, code): key_regex = f"safe_count_l2-{code}-*" keys = RedisUtils.keys(self.__get_redis(), key_regex) vals = [] for k in keys: val = RedisUtils.get(self.__get_redis(), k) val = json.loads(val) vals.append(val) return vals def clear_data(self, code): key_regex = f"safe_count_l2-{code}-*" keys = RedisUtils.keys(self.__get_redis(), key_regex) for k in keys: RedisUtils.delete(self.__get_redis(), k) tool.CodeDataCacheUtil.clear_cache(self.latest_place_order_info_cache, code) key = f"latest_place_order_info-{code}" RedisUtils.delete_async(self.__db, key) # 获取基础的安全笔数 def __get_base_save_count(self, code, is_first): return l2_trade_factor.L2TradeFactorUtil.get_safe_buy_count(code, is_first) # 获取最后的安全笔数 def get_safe_count(self, code, is_first_code, rate): # rate = self.__get_rate(code) count, min_count, max_count = self.__get_base_save_count(code, is_first_code) # 第4次下单按第一次算 # if place_order_count and place_order_count >= 3: # rate = 1 # print("--------------------------------") # print("安全笔数比例:", rate) # print("--------------------------------") # count, min_count, max_count = self.__get_base_save_count(code, is_first_code) # count = round(count * rate) # if count < min_count: # count = min_count # if count > max_count: # count = max_count return int(round(count * (1 + rate), 0)) # 计算留下来的比例 # last_buy_single_index 上一次下单信号起始位置 # cancel_index 上一次取消下单的位置 # start_index 数据开始位置 # end_index 数据结束位置 def compute_left_rate(self, code, start_index, end_index, total_datas, local_today_num_operate_map): last_buy_single_index, buy_exec_index, cancel_index = self.__get_latest_place_order_info_cache(code) if last_buy_single_index is None: return cancel_time = None if cancel_index is not None: cancel_time = total_datas[cancel_index]["val"]["time"] # 获取处理的进度 last_buy_single_index_, process_index, buy_num, cancel_num = self.__get_compute_progress_cache(code, last_buy_single_index) break_index = -1 for i in range(start_index, end_index): data = total_datas[i] val = data["val"] # 如果没有取消位置就一直计算下去, 计算截至时间不能大于取消时间 if cancel_time and int(cancel_time.replace(":", "")) < int(val["time"].replace(":", "")): break_index = i break if break_index >= 0: end_index = break_index - 1 # 获取开始计算的位置 start_compute_index = min(start_index, last_buy_single_index) if start_compute_index <= process_index: start_compute_index = process_index + 1 for i in range(start_compute_index, end_index): data = total_datas[i] val = data["val"] if process_index >= i: continue if L2DataUtil.is_limit_up_price_buy(val): # 涨停买 buy_num += int(val["num"]) * data["re"] elif L2DataUtil.is_limit_up_price_buy_cancel(val): # 获取买入信息 buy_index = l2_data_source_util.L2DataSourceUtils.get_buy_index_with_cancel_data_v2(data, local_today_buyno_map.get(code)) if buy_index is not None: if last_buy_single_index <= buy_index <= end_index: cancel_num += int(val["num"]) * data["re"] process_index = end_index # 保存处理进度与数量 self.__save_compute_progress(code, last_buy_single_index, process_index, buy_num, cancel_num) # 获取比例 def __get_rate(self, code): vals = self.__get_all_compute_progress(code) rate = (1 - 0) for val in vals: temp_rate = (1 - round((val[2] - val[3]) / val[2], 4)) if temp_rate > 1: temp_rate = 1 rate *= temp_rate return rate # 下单成功 def save_place_order_info(self, code, buy_single_index, buy_exec_index, cancel_index): self.__save_latest_place_order_info(code, buy_single_index, buy_exec_index, cancel_index)