From 6bbfbbb16d792f7737ec86cabdba5c0e98dcf4b4 Mon Sep 17 00:00:00 2001 From: Administrator <admin@example.com> Date: 星期五, 29 八月 2025 17:41:29 +0800 Subject: [PATCH] 有涨停买撤单要触发撤单计算 --- l2/transaction_progress.py | 185 +++++++++++++++++++++++++++++++++++++-------- 1 files changed, 151 insertions(+), 34 deletions(-) diff --git a/l2/transaction_progress.py b/l2/transaction_progress.py index 6d570b6..f53fbea 100644 --- a/l2/transaction_progress.py +++ b/l2/transaction_progress.py @@ -3,96 +3,213 @@ ''' # 涔板叆闃熷垪 +import itertools import json +import time import constant -from db import redis_manager -import tool +from db import redis_manager_delegate as redis_manager +from db.redis_manager_delegate import RedisUtils +from utils import tool import l2.l2_data_util +from log_module.log import logger_l2_trade_buy_queue, logger_l2_trade_buy_progress class TradeBuyQueue: + buy_progress_index_cache = {} + latest_buy_progress_index_cache = {} + # 鎴愪氦閫熺巼 + trade_speed_cache = {} + + __db = 0 __redis_manager = redis_manager.RedisManager(0) + __instance = None + + def __new__(cls, *args, **kwargs): + if not cls.__instance: + cls.__instance = super(TradeBuyQueue, cls).__new__(cls, *args, **kwargs) + cls.__load_datas() + return cls.__instance def __init__(self): self.last_buy_queue_data = {} - def __getRedis(self): - return self.__redis_manager.getRedis() + @classmethod + def __get_redis(cls): + return cls.__redis_manager.getRedis() + + @classmethod + def __load_datas(cls): + __redis = cls.__get_redis() + try: + keys = RedisUtils.keys(__redis, "trade_buy_progress_index-*") + for k in keys: + code = k.split("-")[-1] + val = RedisUtils.get(__redis, k) + val = json.loads(val) + tool.CodeDataCacheUtil.set_cache(cls.buy_progress_index_cache, code, val) + + finally: + RedisUtils.realse(__redis) def __save_buy_queue_data(self, code, num_list): key = "trade_buy_queue_data-{}".format(code) - self.__getRedis().setex(key, tool.get_expire(), json.dumps((num_list, tool.get_now_time_str()))) + RedisUtils.setex(self.__get_redis(), key, tool.get_expire(), json.dumps((num_list, tool.get_now_time_str()))) # 杩斿洖鏁版嵁涓庢洿鏂版椂闂� def __get_buy_queue_data(self, code): key = "trade_buy_queue_data-{}".format(code) - val = self.__getRedis().get(key) + val = RedisUtils.get(self.__get_redis(), key) if val is None: return None, None val = json.loads(val) return val[0], [1] - def __save_buy_progress_index(self, code, index): + def __save_buy_progress_index(self, code, index, is_default): + tool.CodeDataCacheUtil.set_cache(self.buy_progress_index_cache, code, (index, is_default)) key = "trade_buy_progress_index-{}".format(code) - self.__getRedis().setex(key, tool.get_expire(), index) + RedisUtils.setex_async(self.__db, key, tool.get_expire(), json.dumps((index, is_default))) # 杩斿洖鏁版嵁涓庢洿鏂版椂闂� def __get_buy_progress_index(self, code): key = "trade_buy_progress_index-{}".format(code) - val = self.__getRedis().get(key) + val = RedisUtils.get(self.__get_redis(), key) if val is None: - return None - return int(val) + return None, True + val = json.loads(val) + return int(val[0]), bool(val[1]) + + def __get_buy_progress_index_cache(self, code): + cache_result = tool.CodeDataCacheUtil.get_cache(self.buy_progress_index_cache, code) + if cache_result[0]: + return cache_result[1] + return None, True # 鏈�杩戠殑闈炴定鍋滀拱1鐨勬椂闂� def __save_latest_not_limit_up_time(self, code, time_str): key = "latest_not_limit_up_time-{}".format(code) - self.__getRedis().setex(key, tool.get_expire(), time_str) + RedisUtils.setex(self.__get_redis(), key, tool.get_expire(), time_str) def __get_latest_not_limit_up_time(self, code): key = "latest_not_limit_up_time-{}".format(code) - self.__getRedis().get(key) + if not constant.TEST: + return RedisUtils.get(self.__get_redis(), key) + return None # 淇濆瓨鏁版嵁,杩斿洖淇濆瓨鏁版嵁鐨勬潯鏁� def save(self, code, limit_up_price, buy_1_price, buy_1_time, queues): - # 濡傛灉涔�1涓嶄负娑ㄥ仠浠峰氨涓嶉渶瑕佷繚瀛� - if queues == self.last_buy_queue_data.get(code): + # 2涓互涓婄殑鏁版嵁鎵嶆湁澶勭悊浠峰�� + if not queues or len(queues) < 2: return None - if abs(float(buy_1_price) - float(limit_up_price)) >= 0.01: + # 濡傛灉涔�1涓嶄负娑ㄥ仠浠峰氨涓嶉渶瑕佷繚瀛� + old_queues = self.last_buy_queue_data.get(code) + if old_queues and len(old_queues) == len(queues): + # 鍏冪礌鐩稿悓灏变笉闇�瑕佸啀娆″鐞� + old_str = ",".join([str(k) for k in old_queues[1:]]) + new_str = ",".join([str(k) for k in queues[1:]]) + if old_str == new_str: + return None + self.last_buy_queue_data[code] = queues + + if abs(float(buy_1_price) - float(limit_up_price)) >= 0.001: # 淇濆瓨鏈�杩戠殑娑ㄥ仠璧峰鏃堕棿 self.__save_latest_not_limit_up_time(code, buy_1_time) return None - - self.last_buy_queue_data[code] = queues min_num = round(constant.L2_MIN_MONEY / (limit_up_price * 100)) num_list = [] - for num in queues: - if num > min_num: + # 蹇界暐绗竴鏉℃暟鎹� + for i in range(1, len(queues)): + num = queues[i] + if num > min_num and len(num_list) < 4: num_list.append(num) # 淇濆瓨鍒楄〃 self.__save_buy_queue_data(code, num_list) return num_list # 淇濆瓨鎴愪氦绱㈠紩 - def save_traded_index(self, code, buy1_price, buyQueueBig): + def compute_traded_index(self, code, buy1_price, buyQueueBig, exec_time=None): total_datas = l2.l2_data_util.local_today_datas.get(code) today_num_operate_map = l2.l2_data_util.local_today_num_operate_map.get(code) - index = l2.l2_data_util.L2TradeQueueUtils.find_traded_progress_index(buy1_price, total_datas, - today_num_operate_map, buyQueueBig,self.__get_latest_not_limit_up_time(code)) - if index is not None: - # 淇濆瓨鎴愪氦杩涘害 - self.__save_buy_progress_index(code, index) - return index - return None + index = None + if True: + # 鏈�澶�5涓暟鎹� + buyQueueBigTemp = buyQueueBig + last_index, is_default = self.get_traded_index(code) + c_last_index = 0 + if not is_default and last_index is not None: + c_last_index = last_index + latest_not_limit_up_time = self.__get_latest_not_limit_up_time(code) + # 濡傛灉鏄�3涓�/4涓暟鎹壘涓嶅埌灏辫皟鏁撮『搴� + fbuyQueueBigTempList = [] + if 3 <= len(buyQueueBigTemp) <= 4: + buyQueueBigTempList = itertools.permutations(buyQueueBigTemp, len(buyQueueBigTemp)) + for tempQueue in buyQueueBigTempList: + if list(tempQueue) != buyQueueBigTemp: + fbuyQueueBigTempList.append(tempQueue) + fbuyQueueBigTempList.insert(0, buyQueueBigTemp) + for temp in fbuyQueueBigTempList: + try: + index = l2.l2_data_util.L2TradeQueueUtils.find_traded_progress_index(code, buy1_price, total_datas, + today_num_operate_map, + temp, + c_last_index, + latest_not_limit_up_time + ) + if index is not None: + # 鍒ゆ柇浣嶇疆鏄惁澶т簬鎵ц浣�2s + if exec_time and tool.trade_time_sub(total_datas[index]["val"]["time"], exec_time) > 5: + # 浣嶇疆鏄惁澶т簬鎵ц浣�2s琛ㄧず鏃犳晥 + index = None + continue + # 鍙兘鍓婂噺涓�鍗婁互涓嬫墠鑳界粓姝� + if len(temp) * 2 < len(buyQueueBig): + index = None + break + except: + pass + + if index is not None: + logger_l2_trade_buy_queue.info(f"纭畾浜ゆ槗杩涘害锛歝ode-{code} index-{index}") + logger_l2_trade_buy_progress.info( + f"纭畾浜ゆ槗杩涘害鎴愬姛锛歝ode-{code} index-{index} queues:{buyQueueBig} last_index-{c_last_index} latest_not_limit_up_time-{latest_not_limit_up_time} exec_time-{exec_time}") + # 淇濆瓨鎴愪氦杩涘害 + # self.__save_buy_progress_index(code, index, False) + return index + else: + logger_l2_trade_buy_progress.warning( + f"纭畾浜ゆ槗杩涘害澶辫触锛歝ode-{code} queues:{buyQueueBig} last_index-{c_last_index} latest_not_limit_up_time-{latest_not_limit_up_time} exec_time-{exec_time}") + return index # 鑾峰彇鎴愪氦杩涘害绱㈠紩 def get_traded_index(self, code): - index = self.__get_buy_progress_index(code) - return index + index, is_default = self.__get_buy_progress_index_cache(code) + return index, is_default + + # 璁剧疆浜ゆ槗杩涘害 + def set_traded_index(self, code, index, total_datas=None): + last_info = self.latest_buy_progress_index_cache.get(code) + # 浜ゆ槗杩涘害鏄惁鏀瑰彉 + traded_index_changed = False + if not last_info or last_info[0] != index: + if last_info and total_datas: + val = total_datas[last_info[0]]['val'] + if time.time() - last_info[1] > 0: + rate = round(val["num"] * float(val["price"]) * 100 / (time.time() - last_info[1])) + # 鎴愪氦閫熺巼 + self.trade_speed_cache[code] = rate + self.latest_buy_progress_index_cache[code] = (index, time.time()) + traded_index_changed = True + self.__save_buy_progress_index(code, index, False) + return traded_index_changed + + # 鑾峰彇鎴愪氦閫熺巼 + def get_trade_speed(self, code): + return self.trade_speed_cache.get(code) -if __name__ == '__main': - - - pass +if __name__ == '__main__': + a = [1, 2, 3, 4] + results = [str(k) for k in a] + b = [1, 2, 3] + result = (",".join([str(k) for k in a]) == ",".join([str(k) for k in b])) + print(result) -- Gitblit v1.8.0