"""Trade (fill) progress tracking based on the buy-1 order queue."""

import itertools
import json

import constant
from db import redis_manager
import tool
import l2.l2_data_util
from log import logger_l2_trade_buy_queue


class TradeBuyQueue:
    """Persist buy-queue snapshots and the traded-progress index per stock code.

    All persistent state lives in Redis under keys suffixed with the stock
    code; every key is written with the expiry returned by
    ``tool.get_expire()``.
    """

    __redis_manager = redis_manager.RedisManager(0)

    def __init__(self):
        # code -> last ``queues`` value accepted by save(); used to skip
        # identical consecutive snapshots.
        self.last_buy_queue_data = {}

    def __getRedis(self):
        """Return a Redis client from the shared manager."""
        return self.__redis_manager.getRedis()

    def __save_buy_queue_data(self, code, num_list):
        """Store ``num_list`` together with the current time string."""
        key = "trade_buy_queue_data-{}".format(code)
        payload = json.dumps((num_list, tool.get_now_time_str()))
        self.__getRedis().setex(key, tool.get_expire(), payload)

    def __get_buy_queue_data(self, code):
        """Return ``(num_list, update_time)``, or ``(None, None)`` if absent."""
        key = "trade_buy_queue_data-{}".format(code)
        val = self.__getRedis().get(key)
        if val is None:
            return None, None
        val = json.loads(val)
        # The stored payload is (num_list, time_str); return both elements.
        return val[0], val[1]

    def __save_buy_progress_index(self, code, index, is_default):
        """Store the traded-progress index and whether it is a default value."""
        key = "trade_buy_progress_index-{}".format(code)
        self.__getRedis().setex(key, tool.get_expire(), json.dumps((index, is_default)))

    def __get_buy_progress_index(self, code):
        """Return ``(index, is_default)``, or ``(None, True)`` if absent."""
        key = "trade_buy_progress_index-{}".format(code)
        val = self.__getRedis().get(key)
        if val is None:
            return None, True
        val = json.loads(val)
        return int(val[0]), bool(val[1])

    def __save_latest_not_limit_up_time(self, code, time_str):
        """Store the latest time at which buy-1 was not at the limit-up price."""
        key = "latest_not_limit_up_time-{}".format(code)
        self.__getRedis().setex(key, tool.get_expire(), time_str)

    def __get_latest_not_limit_up_time(self, code):
        """Return the stored latest non-limit-up buy-1 time, or ``None``."""
        key = "latest_not_limit_up_time-{}".format(code)
        return self.__getRedis().get(key)

    def save(self, code, limit_up_price, buy_1_price, buy_1_time, queues):
        """Save the large entries of a buy-1 queue snapshot.

        Args:
            code: Stock code used as the Redis key suffix.
            limit_up_price: Limit-up price; anything ``float()`` accepts.
            buy_1_price: Current buy-1 price; anything ``float()`` accepts.
            buy_1_time: Time string associated with this snapshot.
            queues: Sequence of order sizes in the buy-1 queue.

        Returns:
            The list of saved sizes (entries after the first one that exceed
            the minimum size derived from ``constant.L2_MIN_MONEY``), or
            ``None`` when nothing was saved (snapshot unchanged, or buy-1 is
            not at the limit-up price).
        """
        # Identical to the previously accepted snapshot: nothing to do.
        if queues == self.last_buy_queue_data.get(code):
            return None

        limit_up = float(limit_up_price)
        if abs(float(buy_1_price) - limit_up) >= 0.01:
            # Buy-1 is not at the limit-up price: remember when that was seen
            # and do not save the queue.
            self.__save_latest_not_limit_up_time(code, buy_1_time)
            return None

        self.last_buy_queue_data[code] = queues
        # Use the converted float so a string price cannot break the arithmetic.
        min_num = round(constant.L2_MIN_MONEY / (limit_up * 100))
        # The first entry is ignored; keep only entries above the threshold.
        num_list = [num for num in queues[1:] if num > min_num]
        self.__save_buy_queue_data(code, num_list)
        return num_list

    @staticmethod
    def __candidate_orderings(queue_slice):
        """Return the orderings of ``queue_slice`` to try, original first.

        For slices of 3 or 4 items every other permutation is appended so a
        match can still be found when the items arrive out of order.
        """
        candidates = [queue_slice]
        if 3 <= len(queue_slice) <= 4:
            candidates.extend(
                perm
                for perm in itertools.permutations(queue_slice)
                if list(perm) != queue_slice
            )
        return candidates

    def compute_traded_index(self, code, buy1_price, buyQueueBig):
        """Locate the traded-progress index matching the given buy queue.

        Progressively shorter suffixes of ``buyQueueBig`` (and, for 3-4 item
        suffixes, their permutations) are passed to
        ``L2TradeQueueUtils.find_traded_progress_index`` until one yields an
        index. The result is logged but not persisted here; callers use
        ``set_traded_index`` for that.

        Args:
            code: Stock code.
            buy1_price: Buy-1 price forwarded to the lookup helper.
            buyQueueBig: Sequence of large order sizes from the buy queue.

        Returns:
            The index found, or ``None`` if no candidate matched.
        """
        if not buyQueueBig:
            return None

        total_datas = l2.l2_data_util.local_today_datas.get(code)
        today_num_operate_map = l2.l2_data_util.local_today_num_operate_map.get(code)

        # Both values are loop-invariant (nothing below writes them), so read
        # them from Redis once instead of once per slice/permutation.
        last_index, is_default = self.get_traded_index(code)
        start_index = 0
        if not is_default and last_index is not None:
            start_index = last_index
        latest_not_limit_up_time = self.__get_latest_not_limit_up_time(code)

        find_index = l2.l2_data_util.L2TradeQueueUtils.find_traded_progress_index
        for offset in range(len(buyQueueBig)):
            queue_slice = buyQueueBig[offset:]
            if offset > 0 and len(queue_slice) < 2:
                # Already tried once and fewer than 2 items remain: stop.
                break
            for candidate in self.__candidate_orderings(queue_slice):
                try:
                    index = find_index(buy1_price, total_datas, today_num_operate_map,
                                       candidate, start_index, latest_not_limit_up_time)
                except Exception:
                    # Best effort: a failing candidate just means "no match";
                    # move on to the next ordering.
                    continue
                if index is not None:
                    logger_l2_trade_buy_queue.info(f"确定交易进度:code-{code} index-{index}")
                    return index
        return None

    def get_traded_index(self, code):
        """Return ``(index, is_default)`` for the stored traded progress."""
        index, is_default = self.__get_buy_progress_index(code)
        return index, is_default

    def set_default_traded_index(self, code, index):
        """Store ``index`` as a default (not yet confirmed) traded progress."""
        self.__save_buy_progress_index(code, index, True)

    def set_traded_index(self, code, index):
        """Store ``index`` as the confirmed traded progress."""
        self.__save_buy_progress_index(code, index, False)


if __name__ == '__main__':
    pass