""" 交易数据股那里器 用于对交易临时数据(交易状态,代码状态等)进行管理 """ import json import time # 交易撤销数据管理器 import constant from db.redis_manager import RedisUtils from utils import global_util, tool import l2_data_util from db import redis_manager from log_module.log import logger_trade class TradeCancelDataManager: capture_time_dict = {} # 保存截图时间 @classmethod def save_l2_capture_time(cls, client_id, pos, code, capture_time): cls.capture_time_dict["{}-{}-{}".format(client_id, pos, code)] = {"create_time": round(time.time() * 1000), "capture_time": capture_time} # 获取最近一次的截图时间 @classmethod def get_latest_l2_capture_time(cls, client_id, pos, code): val = cls.capture_time_dict.get("{}-{}-{}".format(client_id, pos, code)) if val is None: return -1 # 间隔时间不能大于1s if round(time.time() * 1000) - val["create_time"] > 1000: return -1 return val["capture_time"] # 获取l2数据的增长速度 @classmethod def get_l2_data_grow_speed(cls, client_id, pos, code, add_datas, capture_time): count = 0 for data in add_datas: count += data["re"] lastest_capture_time = cls.get_latest_l2_capture_time(client_id, pos, code) if lastest_capture_time < 0: raise Exception("获取上次l2数据截图时间出错") return count / (capture_time - lastest_capture_time) # 获取买入确认点的位置 @classmethod def get_buy_sure_position(cls, index, speed, trade_time): return index + round(speed * trade_time) class TradeBuyDataManager: redisManager = redis_manager.RedisManager(0) buy_sure_position_dict = {} # 设置买入点的信息 # trade_time: 买入点截图时间与下单提交时间差值 # capture_time: 买入点截图时间 # last_data: 买入点最后一条数据 @classmethod def set_buy_position_info(cls, code, capture_time, trade_time, last_data, last_data_index): RedisUtils.setex(cls.redisManager.getRedis(), "buy_position_info-{}".format(code), tool.get_expire(), json.dumps((capture_time, trade_time, last_data, last_data_index))) # 获取买入点信息 @classmethod def get_buy_position_info(cls, code): val_str = RedisUtils.get( cls.redisManager.getRedis(), "buy_position_info-{}".format(code)) if val_str is None: return None, None, None, None else: val = json.loads(val_str) return val[0], val[1], val[2], val[3] # 删除买入点信息 @classmethod def remove_buy_position_info(cls, code): RedisUtils.delete( cls.redisManager.getRedis(), "buy_position_info-{}".format(code)) # 设置买入确认点信息 @classmethod def __set_buy_sure_position(cls, code, index, data): logger_trade.debug("买入确认点信息: code:{} index:{} data:{}", code, index, data) key = "buy_sure_position-{}".format(code) RedisUtils.setex(cls.redisManager.getRedis(), key, tool.get_expire(), json.dumps((index, data))) cls.buy_sure_position_dict[code] = (index, data) # 移除下单信号的详细信息 cls.remove_buy_position_info(code) # 清除买入确认点信息 @classmethod def __clear_buy_sure_position(cls, code): key = "buy_sure_position-{}".format(code) RedisUtils.delete(cls.redisManager.getRedis(), key) if code in cls.buy_sure_position_dict: cls.buy_sure_position_dict.pop(code) # 获取买入确认点信息 @classmethod def get_buy_sure_position(cls, code): temp = cls.buy_sure_position_dict.get(code) if temp is not None: return temp[0], temp[1] key = "buy_sure_position-{}".format(code) val = RedisUtils.get(cls.redisManager.getRedis(), key) if val is None: return None, None else: val = json.loads(val) cls.buy_sure_position_dict[code] = (val[0], val[1]) return val[0], val[1] # 处理买入确认点信息 @classmethod def process_buy_sure_position_info(cls, code, capture_time, l2_today_datas, l2_latest_data, l2_add_datas): buy_capture_time, trade_time, l2_data, l2_data_index = cls.get_buy_position_info(code) if buy_capture_time is None: # 没有购买者信息 return None if capture_time - buy_capture_time < trade_time: # 时间未等待足够 return None # 时间差是否相差2s及以上 old_time = l2_data["val"]["time"] new_time = l2_latest_data["val"]["time"] old_time_int = l2_data_util.get_time_as_seconds(old_time) new_time_int = l2_data_util.get_time_as_seconds(new_time) if new_time_int - old_time_int >= 2: # 间隔2s及其以上表示数据异常 # 间隔2s以上的就以下单时间下一秒末尾作为确认点 start_index = l2_data_index if len(l2_today_datas) - 1 > start_index: for i in range(start_index + 1, len(l2_today_datas)): _time = l2_today_datas[i]["val"]["time"] if l2_data_util.get_time_as_seconds(_time) - old_time_int >= 2: index = i - 1 data = l2_today_datas[index] cls.__set_buy_sure_position(code, index, data) break else: cls.__set_buy_sure_position(code, l2_data_index, l2_data) elif new_time_int - old_time_int >= 0: # 间隔2s内表示数据正常,将其位置设置为新增数据的中间位置 index = len(l2_today_datas) - 1 - (len(l2_add_datas)) // 2 data = l2_today_datas[index] cls.__set_buy_sure_position(code, index, data) else: # 间隔时间小于0 ,一般产生原因是数据回溯产生,故不做处理 logger_trade.warning("预估委托位置错误:数据间隔时间小于0 code-{}", code) pass # 代码实时价格管理器 class CodeActualPriceProcessor: __instance = None def __new__(cls, *args, **kwargs): if not cls.__instance: cls.__instance = super(CodeActualPriceProcessor, cls).__new__(cls, *args, **kwargs) # 初始化设置 # 获取交易窗口的锁 cls.__instance.__redisManager = redis_manager.RedisManager(0) cls.__instance.__under_water_last_time_cache = {} cls.__instance.__code_current_rate_cache = {} cls.__instance.__code_current_rate_latest = {} return cls.__instance def __get_redis(self): return self.__redisManager.getRedis() # 保存跌价的时间 def __save_down_price_time(self, code, time_str): key = "under_water_last_time-{}".format(code) tool.CodeDataCacheUtil.set_cache(self.__under_water_last_time_cache, code, time_str) RedisUtils.setex(self.__get_redis(), key, tool.get_expire(), time_str) def __remove_down_price_time(self, code): key = "under_water_last_time-{}".format(code) tool.CodeDataCacheUtil.clear_cache(self.__under_water_last_time_cache, code) RedisUtils.delete(self.__get_redis(), key) def __get_last_down_price_time(self, code): key = "under_water_last_time-{}".format(code) return RedisUtils.get(self.__get_redis(), key) def __get_last_down_price_time_cache(self, code): cache_result = tool.CodeDataCacheUtil.get_cache(self.__under_water_last_time_cache, code) if cache_result[0]: return cache_result[1] val = self.__get_last_down_price_time(code) tool.CodeDataCacheUtil.set_cache(self.__under_water_last_time_cache, code, val) return val def __increment_down_price_time(self, code, seconds): key = "under_water_seconds-{}".format(code) RedisUtils.incrby( self.__get_redis(), key, seconds) # 设置个失效时间 RedisUtils.expire(self.__get_redis(), key, tool.get_expire()) def __get_down_price_time_as_seconds(self, code): key = "under_water_seconds-{}".format(code) val = RedisUtils.get(self.__get_redis(), key) if val is None: return None else: return int(val) # 清除所有的水下捞数据 def clear_under_water_data(self): key = "under_water_*" keys = RedisUtils.keys(self.__get_redis(), key) for k in keys: RedisUtils.delete(self.__get_redis(), k) def __save_current_price_codes_count(self, count): key = "current_price_codes_count" RedisUtils.setex(self.__get_redis(), key, 10, count) def __get_current_price_codes_count(self): key = "current_price_codes_count" count = RedisUtils.get(self.__get_redis(), key) return 0 if count is None else count # 保存当前涨幅 def __save_current_rate(self, code, rate): # 变化之后才会持久化 if self.__code_current_rate_latest.get(code) == rate: return self.__code_current_rate_latest[code] = rate tool.CodeDataCacheUtil.set_cache(self.__code_current_rate_cache, code, rate) key = "code_current_rate-{}".format(code) RedisUtils.setex(self.__get_redis(), key, tool.get_expire(), rate) # 获取当前涨幅 def __get_current_rate(self, code): key = "code_current_rate-{}".format(code) rate = RedisUtils.get(self.__get_redis(), key) if rate is not None: return float(rate) return None def get_current_rate(self, code): cache_result = tool.CodeDataCacheUtil.get_cache(self.__code_current_rate_cache, code) if cache_result[0]: return cache_result[1] val = self.__get_current_rate(code) tool.CodeDataCacheUtil.set_cache(self.__code_current_rate_cache, code, val) return val def process_rate(self, code, rate, time_str): # 保存目前的代码涨幅 self.__save_current_rate(code, rate) # 9点半之前的数据不处理 if int(time_str.replace(":", "")) < int("093000"): return # now_str = tool.get_now_time_str() if rate >= 0: down_start_time = self.__get_last_down_price_time_cache(code) if down_start_time is None: return else: # 累计增加时间 time_second = tool.trade_time_sub(time_str, down_start_time) self.__increment_down_price_time(code, time_second) # 删除起始时间 self.__remove_down_price_time(code) else: # 记录开始值 if self.__get_last_down_price_time_cache(code) is None: self.__save_down_price_time(code, time_str) # 保存现价 def save_current_price(self, code, price, is_limit_up): global_util.cuurent_prices[code] = (price, is_limit_up, round(time.time())) pass # 获取现价 def get_current_price(self, code): return global_util.cuurent_prices.get(code) # 现价代码数量 def save_current_price_codes_count(self, count): self.__save_current_price_codes_count(count) def get_current_price_codes_count(self): return self.__get_current_price_codes_count() # 是否为水下捞 def is_under_water(self, code, now_time=None): time_seconds = self.__get_down_price_time_as_seconds(code) if time_seconds is None: return False else: if time_seconds >= constant.UNDER_WATER_PRICE_TIME_AS_SECONDS: if now_time is None: now_time = tool.get_now_time_str() space = tool.trade_time_sub(now_time, "09:30:00") if space > 0 and time_seconds / space >= 0.2: return True return False # 当前代码是否涨停 def current_is_limit_up(self, code): data = self.get_current_price(code) if data is None: return None return data[1] # 获取涨幅前几的代码 def get_top_rate_codes(self, top_n): keys = "code_current_rate-*" keys = RedisUtils.keys(self.__get_redis(), keys) infos = [] for k in keys: code = k.split("-")[1] rate = self.get_current_rate(code) infos.append((code, rate)) # 排序信息 sorted_infos = sorted(infos, key=lambda tup: tup[1], reverse=True) sorted_infos = sorted_infos[:top_n] codes = [] for data in sorted_infos: codes.append(data[0]) return codes # 涨停次数管理 class placeordercountmanager: __redisManager = redis_manager.RedisManager(0) @classmethod def __get_redis(cls): return cls.__redisManager.getRedis() @classmethod def __incre_place_order_count(cls, code): key = "place_order_count-{}".format(code) RedisUtils.incrby(cls.__get_redis(), key, 1) RedisUtils.expire(cls.__get_redis(), key, tool.get_expire()) @classmethod def __get_place_order_count(cls, code): key = "place_order_count-{}".format(code) count = RedisUtils.get(cls.__get_redis(), key) if count is not None: return int(count) return 0 @classmethod def place_order(cls, code): cls.__incre_place_order_count(code) @classmethod def get_place_order_count(cls, code): return cls.__get_place_order_count(code) @classmethod def clear_place_order_count(cls, code): key = "place_order_count-{}".format(code) RedisUtils.delete(cls.__get_redis(), key) if __name__ == "__main__": processor = CodeActualPriceProcessor() print(processor.get_top_rate_codes(30))