"""Trade (fill) progress: buy-queue persistence and traded-index computation."""

# Buy queue
import itertools
import json

import constant
from db import redis_manager
from db.redis_manager import RedisUtils
from utils import tool
import l2.l2_data_util
from log_module.log import logger_l2_trade_buy_queue, logger_l2_trade_buy_progress

# In-process cache of (index, is_default) per code, accessed through
# tool.CodeDataCacheUtil.
buy_progress_index_cache = {}


class TradeBuyQueue:
    """Stores buy-queue snapshots and the traded-progress index per stock code.

    Queue data and the "latest not-limit-up time" are written to Redis with the
    expiry returned by ``tool.get_expire()``; the progress index is additionally
    kept in the module-level ``buy_progress_index_cache``.
    """

    __db = 0
    __redis_manager = redis_manager.RedisManager(0)

    def __init__(self):
        # Last raw queue passed to save() for each code; lets save() skip
        # snapshots whose tail did not change.
        self.last_buy_queue_data = {}

    def __getRedis(self):
        """Return the Redis connection from the class-level manager."""
        return self.__redis_manager.getRedis()

    def __save_buy_queue_data(self, code, num_list):
        """Store ``(num_list, current time string)`` as JSON for ``code``."""
        key = "trade_buy_queue_data-{}".format(code)
        payload = json.dumps((num_list, tool.get_now_time_str()))
        RedisUtils.setex(self.__getRedis(), key, tool.get_expire(), payload)

    def __get_buy_queue_data(self, code):
        """Return ``(num_list, update_time)`` for ``code``, or ``(None, None)``."""
        key = "trade_buy_queue_data-{}".format(code)
        val = RedisUtils.get(self.__getRedis(), key)
        if val is None:
            return None, None
        val = json.loads(val)
        # The stored payload is (num_list, time_str); return the stored time
        # rather than the literal list ``[1]`` the previous code produced.
        return val[0], val[1]

    def __save_buy_progress_index(self, code, index, is_default):
        """Cache ``(index, is_default)`` locally and write it to Redis asynchronously."""
        tool.CodeDataCacheUtil.set_cache(buy_progress_index_cache, code, (index, is_default))
        key = "trade_buy_progress_index-{}".format(code)
        RedisUtils.setex_async(self.__db, key, tool.get_expire(), json.dumps((index, is_default)))

    def __get_buy_progress_index(self, code):
        """Read ``(index, is_default)`` from Redis; ``(None, True)`` when absent."""
        key = "trade_buy_progress_index-{}".format(code)
        val = RedisUtils.get(self.__getRedis(), key)
        if val is None:
            return None, True
        val = json.loads(val)
        return int(val[0]), bool(val[1])

    def __get_buy_progress_index_cache(self, code):
        """Return ``(index, is_default)`` from the local cache, loading from Redis on a miss."""
        cache_result = tool.CodeDataCacheUtil.get_cache(buy_progress_index_cache, code)
        # get_cache returns a pair whose first item flags a hit.
        if cache_result[0]:
            return cache_result[1]
        val = self.__get_buy_progress_index(code)
        tool.CodeDataCacheUtil.set_cache(buy_progress_index_cache, code, val)
        return val

    # Most recent time at which buy-1 was not at the limit-up price
    def __save_latest_not_limit_up_time(self, code, time_str):
        """Store ``time_str`` as the latest not-limit-up time for ``code``."""
        key = "latest_not_limit_up_time-{}".format(code)
        RedisUtils.setex(self.__getRedis(), key, tool.get_expire(), time_str)

    def __get_latest_not_limit_up_time(self, code):
        """Return the stored not-limit-up time, or ``None`` when ``constant.TEST`` is set."""
        key = "latest_not_limit_up_time-{}".format(code)
        if not constant.TEST:
            return RedisUtils.get(self.__getRedis(), key)
        return None

    # Save data; returns the list of saved queue numbers
    def save(self, code, limit_up_price, buy_1_price, buy_1_time, queues):
        """Filter and persist a buy-queue snapshot for ``code``.

        Args:
            code: Stock code used to build the Redis keys.
            limit_up_price: Limit-up price; converted with ``float()``.
            buy_1_price: Current buy-1 price; converted with ``float()``.
            buy_1_time: Time recorded when buy-1 is not at the limit-up price.
            queues: Queue numbers; the first entry is always ignored.

        Returns:
            The saved list (at most 4 numbers greater than the minimum size),
            or ``None`` when there are fewer than 2 entries, the snapshot's
            tail is unchanged, or buy-1 is not at the limit-up price.
        """
        # Fewer than 2 entries carry nothing worth processing.
        if not queues or len(queues) < 2:
            return None

        old_queues = self.last_buy_queue_data.get(code)
        if old_queues and len(old_queues) == len(queues):
            # Identical tails (compared by string form) need no reprocessing.
            old_str = ",".join(str(k) for k in old_queues[1:])
            new_str = ",".join(str(k) for k in queues[1:])
            if old_str == new_str:
                return None
        self.last_buy_queue_data[code] = queues

        # Convert once so a string price works for both the comparison and the
        # minimum-size computation below.
        limit_up = float(limit_up_price)
        if abs(float(buy_1_price) - limit_up) >= 0.01:
            # Buy-1 is not at limit-up: remember when, and save nothing.
            self.__save_latest_not_limit_up_time(code, buy_1_time)
            return None

        min_num = round(constant.L2_MIN_MONEY / (limit_up * 100))
        # Skip the first entry and keep the first 4 numbers above the minimum.
        num_list = [num for num in queues[1:] if num > min_num][:4]
        self.__save_buy_queue_data(code, num_list)
        return num_list

    # Compute the traded index
    def compute_traded_index(self, code, buy1_price, buyQueueBig, exec_time=None):
        """Locate the traded-progress index matching ``buyQueueBig``.

        The queue is tried as given; when it has 3 or 4 entries, every other
        ordering of it is tried as well. The search itself is delegated to
        ``L2TradeQueueUtils.find_traded_progress_index``.

        Args:
            code: Stock code.
            buy1_price: Buy-1 price forwarded to the search helper.
            buyQueueBig: Queue numbers to match.
            exec_time: Optional execution time; a match whose data time exceeds
                it by more than 5 (per ``tool.trade_time_sub``) is rejected.

        Returns:
            The matched index, or ``None`` when no candidate matched. The
            index is not persisted here; see ``set_traded_index``.
        """
        total_datas = l2.l2_data_util.local_today_datas.get(code)
        today_num_operate_map = l2.l2_data_util.local_today_num_operate_map.get(code)

        last_index, is_default = self.get_traded_index(code)
        c_last_index = 0
        if not is_default and last_index is not None:
            c_last_index = last_index
        latest_not_limit_up_time = self.__get_latest_not_limit_up_time(code)

        # The queue as given goes first; with 3 or 4 entries the remaining
        # orderings follow as fallbacks.
        candidates = [buyQueueBig]
        if 3 <= len(buyQueueBig) <= 4:
            original_order = list(buyQueueBig)
            candidates.extend(
                perm for perm in itertools.permutations(buyQueueBig, len(buyQueueBig))
                if list(perm) != original_order
            )

        index = None
        for temp in candidates:
            try:
                index = l2.l2_data_util.L2TradeQueueUtils.find_traded_progress_index(
                    code, buy1_price, total_datas, today_num_operate_map, temp,
                    c_last_index, latest_not_limit_up_time)
                if index is None:
                    continue
                # Reject a position too far past the execution position.
                # NOTE(review): the original comment said 2s but the code
                # compares against 5 - confirm the intended threshold.
                if exec_time and tool.trade_time_sub(total_datas[index]["val"]["time"], exec_time) > 5:
                    index = None
                    continue
                # Guard against accepting a heavily shortened queue. Every
                # candidate currently has the same length as buyQueueBig, so
                # this cannot trigger; kept as a safety check.
                if len(temp) * 2 < len(buyQueueBig):
                    index = None
                break
            except Exception as e:
                # Best effort: a failing candidate must not abort the search,
                # but it must not be reported as a match either.
                index = None
                logger_l2_trade_buy_progress.warning(
                    f"compute_traded_index candidate failed: code-{code} queue-{temp} error-{e!r}")

        if index is not None:
            logger_l2_trade_buy_queue.info(f"确定交易进度:code-{code} index-{index}")
            logger_l2_trade_buy_progress.info(
                f"确定交易进度成功:code-{code} index-{index} queues:{buyQueueBig} last_index-{c_last_index} latest_not_limit_up_time-{latest_not_limit_up_time} exec_time-{exec_time}")
        else:
            logger_l2_trade_buy_progress.warning(
                f"确定交易进度失败:code-{code} queues:{buyQueueBig} last_index-{c_last_index} latest_not_limit_up_time-{latest_not_limit_up_time} exec_time-{exec_time}")
        return index

    # Get the traded-progress index
    def get_traded_index(self, code):
        """Return ``(index, is_default)`` for ``code`` (cache first, then Redis)."""
        index, is_default = self.__get_buy_progress_index_cache(code)
        return index, is_default

    def set_traded_index(self, code, index):
        """Persist ``index`` for ``code`` as a non-default traded index."""
        self.__save_buy_progress_index(code, index, False)


if __name__ == '__main__':
    # Ad-hoc check of the string-join comparison used by save().
    a = [1, 2, 3, 4]
    b = [1, 2, 3]
    result = (",".join([str(k) for k in a]) == ",".join([str(k) for k in b]))
    print(result)