import copy import logging import threading import time import time as t from cancel_strategy.s_l_h_cancel_strategy import HourCancelBigNumComputer, LCancelRateManager from cancel_strategy.s_l_h_cancel_strategy import LCancelBigNumComputer from cancel_strategy.s_l_h_cancel_strategy import SCancelBigNumComputer from code_attribute import big_money_num_manager, code_volumn_manager, code_data_util, limit_up_time_manager, \ gpcode_manager, code_nature_analyse import constant from db.redis_manager_delegate import RedisUtils from l2.huaxin import l2_huaxin_util, huaxin_delegate_postion_manager from l2.l2_limitup_sell_data_manager import L2LimitUpSellDataManager from l2.l2_sell_manager import L2MarketSellManager, L2LimitUpSellManager from l2.l2_transaction_data_manager import HuaXinSellOrderStatisticManager, BigOrderDealManager, HuaXinBuyOrderManager from l2.place_order_single_data_manager import L2TradeSingleDataProcessor from log_module import async_log_util, log_export from third_data import kpl_data_manager, block_info from third_data.kpl_data_constant import LimitUpDataConstant from trade.buy_radical.radical_buy_data_manager import EveryLimitupBigDealOrderManager from utils import global_util, tool, buy_condition_util, buy_strategy_util, trade_util import l2_data_util from db import redis_manager_delegate as redis_manager from third_data.code_plate_key_manager import CodePlateKeyBuyManager, KPLCodeJXBlockManager from trade import trade_manager, trade_queue_manager, l2_trade_factor, l2_trade_util, \ trade_result_manager, current_price_process_manager, trade_data_manager, trade_huaxin, trade_record_log_util, \ trade_constant, buy_open_limit_up_strategy from trade.buy_radical import radical_buy_data_manager from l2 import l2_data_manager, l2_log, l2_data_source_util, code_price_manager, \ transaction_progress, cancel_buy_strategy, place_order_single_data_manager from l2.cancel_buy_strategy import DCancelBigNumComputer, \ LatestCancelIndexManager, \ NewGCancelBigNumComputer, JCancelBigNumComputer, L2DataComputeUtil, RDCancelBigNumComputer from l2.l2_data_manager import L2DataException, OrderBeginPosInfo from l2.l2_data_util import local_today_datas, L2DataUtil, local_today_num_operate_map, local_today_buyno_map, \ local_latest_datas, local_today_canceled_buyno_map, local_today_sellno_map import l2.l2_data_util from log_module.log import logger_l2_trade_buy, logger_l2_process, logger_l2_error, logger_debug, \ logger_l2_not_buy_reasons, logger_real_place_order_position, logger_l2_trade_buy_queue from trade.trade_data_manager import CodeActualPriceProcessor, PlaceOrderCountManager, AccountMoneyManager, \ RadicalBuyDealCodesManager from trade.trade_manager import TradeTargetCodeModeManager, CodesTradeStateManager from settings.trade_setting import MarketSituationManager, TradeBlockBuyModeManager import concurrent.futures class L2DataManager: # 格式化数据 def format_data(self, datas): format_datas = [] for data in datas: format_datas.append({"val": data, "re": 1}) return format_datas # 获取新增数据 def get_add_datas(self, format_datas): pass # 从数据库加载数据 def load_data(self, code=None, force=False): pass # 保存数据 def save_datas(self, add_datas, datas): pass # m值大单处理 class L2BigNumForMProcessor: _db = 1 _redis_manager = redis_manager.RedisManager(1) m_big_money_begin_cache = {} m_big_money_process_index_cache = {} __instance = None def __new__(cls, *args, **kwargs): if not cls.__instance: cls.__instance = super(L2BigNumForMProcessor, cls).__new__(cls, *args, **kwargs) cls.__load_datas() return cls.__instance @classmethod def __get_redis(cls): return cls._redis_manager.getRedis() @classmethod def __load_datas(cls): _redis = cls._redis_manager.getRedis() try: keys = RedisUtils.keys(_redis, "m_big_money_begin-*") for k in keys: code = k.split("-")[-1] val = RedisUtils.get(_redis, k) tool.CodeDataCacheUtil.set_cache(cls.m_big_money_begin_cache, code, int(val)) keys = RedisUtils.keys(_redis, "m_big_money_process_index-*") for k in keys: code = k.split("-")[-1] val = RedisUtils.get(_redis, k) tool.CodeDataCacheUtil.set_cache(cls.m_big_money_process_index_cache, code, int(val)) finally: RedisUtils.realse(_redis) # 保存计算开始位置 def set_begin_pos(self, code, index): if self.__get_begin_pos_cache(code) is None: tool.CodeDataCacheUtil.set_cache(self.m_big_money_begin_cache, code, index) # 保存位置 key = "m_big_money_begin-{}".format(code) RedisUtils.setex_async(self._db, key, tool.get_expire(), index) # 获取计算开始位置 def __get_begin_pos(self, code): key = "m_big_money_begin-{}".format(code) val = RedisUtils.get(self.__get_redis(), key) if val is None: return None return int(val) def __get_begin_pos_cache(self, code): cache_result = tool.CodeDataCacheUtil.get_cache(self.m_big_money_begin_cache, code) if cache_result[0]: return cache_result[1] return None # 清除已经处理的数据 def clear_processed_end_index(self, code): tool.CodeDataCacheUtil.clear_cache(self.m_big_money_process_index_cache, code) key = "m_big_money_process_index-{}".format(code) RedisUtils.delete_async(self._db, key) # 添加已经处理过的单 def __set_processed_end_index(self, code, index): tool.CodeDataCacheUtil.set_cache(self.m_big_money_process_index_cache, code, index) key = "m_big_money_process_index-{}".format(code) RedisUtils.setex_async(self._db, key, tool.get_expire(), index) # 是否已经处理过 def __get_processed_end_index(self, code): key = "m_big_money_process_index-{}".format(code) val = RedisUtils.get(self.__get_redis(), key) if val is None: return None return int(val) def __get_processed_end_index_cache(self, code): cache_result = tool.CodeDataCacheUtil.get_cache(self.m_big_money_process_index_cache, code) if cache_result[0]: return cache_result[1] return None # 处理大单 def process(self, code, start_index, end_index, limit_up_price): begin_pos = self.__get_begin_pos_cache(code) if begin_pos is None: # 没有获取到开始买入信号 return # 上次处理到的坐标 processed_index = self.__get_processed_end_index_cache(code) if processed_index is None: processed_index = 0 if processed_index >= end_index: return start_time = round(t.time() * 1000) total_datas = local_today_datas[code] num_splites = [round(5000 / limit_up_price), round(10000 / limit_up_price), round(20000 / limit_up_price), round(30000 / limit_up_price)] total_num = 0 for i in range(max(start_index, processed_index), end_index + 1): data = total_datas[i] if not L2DataUtil.is_limit_up_price_buy_cancel(data["val"]) and not L2DataUtil.is_limit_up_price_buy( data["val"]): continue # 如果是涨停买撤信号需要看数据位置是否比开始处理时间早 if L2DataUtil.is_limit_up_price_buy_cancel(data["val"]): # 获取买入信号 buy_index = l2_data_source_util.L2DataSourceUtils.get_buy_index_with_cancel_data_v2(total_datas[i], local_today_buyno_map.get( code)) if buy_index is not None and buy_index < begin_pos: continue # 计算成交金额 num = int(data["val"]["num"]) temp = 0 if num < num_splites[0]: pass elif num < num_splites[1]: temp = 1 elif num < num_splites[2]: temp = round(4 / 3, 3) elif num < num_splites[3]: temp = 2 else: temp = 4 count = int(temp * data["re"] * 1000) if L2DataUtil.is_limit_up_price_buy_cancel(data["val"]): count = 0 - count total_num += count self.__set_processed_end_index(code, end_index) big_money_num_manager.add_num(code, total_num) # print("m值大单计算范围:{}-{} 时间:{}".format(max(start_index, processed_index), end_index, # round(t.time() * 1000) - start_time)) class L2TradeDataProcessor: unreal_buy_dict = {} volume_rate_info = {} # 最近的闪电下单位置,格式为{code:(总卖时间,总卖额)} __latest_fast_place_order_info_dict = {} __codeActualPriceProcessor = CodeActualPriceProcessor() __ths_l2_trade_queue_manager = trade_queue_manager.thsl2tradequeuemanager() __thsBuy1VolumnManager = trade_queue_manager.THSBuy1VolumnManager() __l2PlaceOrderParamsManagerDict = {} __last_buy_single_dict = {} __TradeBuyQueue = transaction_progress.TradeBuyQueue() __latest_process_order_unique_keys = {} __latest_process_not_order_unique_keys_count = {} __trade_log_placr_order_info_dict = {} # 下单信息保存 # 初始化 __TradePointManager = l2_data_manager.TradePointManager() __SCancelBigNumComputer = SCancelBigNumComputer() __HourCancelBigNumComputer = HourCancelBigNumComputer() __LCancelBigNumComputer = LCancelBigNumComputer() __GCancelBigNumComputer = NewGCancelBigNumComputer() __TradeStateManager = trade_manager.TradeStateManager() __CodesTradeStateManager = trade_manager.CodesTradeStateManager() __PauseBuyCodesManager = gpcode_manager.PauseBuyCodesManager() __Buy1PriceManager = code_price_manager.Buy1PriceManager() __AccountMoneyManager = AccountMoneyManager() __TradeBuyDataManager = trade_data_manager.TradeBuyDataManager() __LimitUpTimeManager = limit_up_time_manager.LimitUpTimeManager() __BlackListCodeManager = gpcode_manager.BlackListCodeManager() __WhiteListCodeManager = gpcode_manager.WhiteListCodeManager() __WantBuyCodesManager = gpcode_manager.WantBuyCodesManager() __TradeTargetCodeModeManager = TradeTargetCodeModeManager() __TradeOrderIdManager = trade_huaxin.TradeOrderIdManager() __LatestCancelIndexManager = LatestCancelIndexManager() __L2MarketSellManager = L2MarketSellManager() __L2LimitUpSellManager = L2LimitUpSellManager() __PlaceOrderCountManager = PlaceOrderCountManager() __CodeNatureRecordManager = code_nature_analyse.CodeNatureRecordManager() __MarketSituationManager = MarketSituationManager() __re_compute_threading_pool = concurrent.futures.ThreadPoolExecutor(max_workers=10) # 买入锁 __buy_lock_dict = {} # 当前批次正在处理的数据索引 __processing_data_indexes = {} # 下次买的时间 __next_buy_time_dict = {} # 中断本批次买入数据处理 __break_current_batch_data_for_buy_dict = {} # 最近的执行位置 __latest_exec_indexes = {} # 获取代码评分 @classmethod def get_code_scores(cls): score_dict = {} for code in cls.__l2PlaceOrderParamsManagerDict: score = cls.__l2PlaceOrderParamsManagerDict[code].score score_dict[code] = score return score_dict @classmethod # 数据处理入口 # datas: 本次截图数据 # capture_timestamp:截图时间戳 def process(cls, code, datas, capture_timestamp): __start_time = round(t.time() * 1000) try: if len(datas) > 0: # 判断价格区间是否正确 if not code_data_util.is_same_code_with_price(code, float(datas[0]["val"]["price"])): raise L2DataException(L2DataException.CODE_PRICE_ERROR, "股价不匹配 code-{} price-{}".format(code, datas[0]["val"]["price"])) # 加载历史数据,返回数据是否正常 is_normal = l2.l2_data_util.load_l2_data(code) if not is_normal: # print("历史数据异常:", code) # 数据不正常需要禁止交易 l2_trade_util.forbidden_trade(code, msg="L2历史数据异常", force=True) # 纠正数据 if constant.L2_SOURCE_TYPE == constant.L2_SOURCE_TYPE_THS: # 同花顺需要纠正数据,其他渠道不需要 datas = l2.l2_data_util.L2DataUtil.correct_data(code, local_latest_datas.get(code), datas) _start_index = 0 if local_today_datas.get(code) is not None and len( local_today_datas[code]) > 0: _start_index = local_today_datas[code][-1]["index"] + 1 add_datas = l2.l2_data_util.L2DataUtil.get_add_data(code, local_latest_datas.get(code), datas, _start_index) # -------------数据增量处理------------ try: cls.process_add_datas(code, add_datas, capture_timestamp, __start_time) finally: # 保存数据 __start_time = round(t.time() * 1000) l2.l2_data_util.save_l2_data(code, datas, add_datas) # __start_time = l2_data_log.l2_time(code, # round(t.time() * 1000) - __start_time, # "保存数据时间({})".format(len(add_datas))) finally: if code in cls.unreal_buy_dict: cls.unreal_buy_dict.pop(code) @classmethod def set_real_place_order_index(cls, code, index, order_begin_pos: OrderBeginPosInfo): trade_record_log_util.add_real_place_order_position_log(code, index, order_begin_pos.buy_single_index) l2_log.debug(code, "设置真实下单位:{}", index) cancel_buy_strategy.set_real_place_position(code, index, order_begin_pos.buy_single_index, is_default=False) # 获取真实下单位置之后需要判断F撤 try: cancel_result = cancel_buy_strategy.FCancelBigNumComputer().need_cancel_for_deal_fast(code) if cancel_result[0]: L2TradeDataProcessor.cancel_buy(code, f"F撤:{cancel_result[1]}", cancel_type=trade_constant.CANCEL_TYPE_F) return else: l2_log.f_cancel_debug(code, f"获取真实成交位的F撤未生效:{cancel_result[1]}") except Exception as e: logger_debug.exception(e) # 判断与执行位置的间隔时间 if order_begin_pos.mode != OrderBeginPosInfo.MODE_RADICAL: # 非扫入下单要判断执行位置与真实下单位的间隔时间 total_datas = local_today_datas.get(code) if tool.trade_time_sub_with_ms(L2DataUtil.get_time_with_ms(total_datas[index]["val"]), L2DataUtil.get_time_with_ms( total_datas[order_begin_pos.buy_exec_index]["val"])) > 2000: L2TradeDataProcessor.cancel_buy(code, f"真实下单位({index})与执行位({order_begin_pos.buy_exec_index})相差2s以上", cancel_type=trade_constant.CANCEL_TYPE_F) return # 处理华鑫L2数据 @classmethod def process_huaxin(cls, code, origin_datas): datas = None try: # 加载历史的L2数据 is_normal = l2.l2_data_util.l2_data_is_normal(code) if not is_normal: # 数据不正常需要禁止交易 l2_trade_util.forbidden_trade(code, msg="L2历史数据异常", force=True) # 转换数据格式 _start_index = 0 total_datas = local_today_datas.get(code) if code not in local_today_datas: local_today_datas[code] = [] if total_datas: _start_index = total_datas[-1]["index"] + 1 # 如果是板上,就需要剔除板上卖数据, 默认不剔除 filter_limit_up_sell = False trade_price_info = HuaXinSellOrderStatisticManager.get_latest_trade_price_info(code) limit_up_price = gpcode_manager.get_limit_up_price(code) if limit_up_price: limit_up_price = round(float(limit_up_price), 2) # if trade_price_info and limit_up_price and trade_price_info[0] == limit_up_price: # filter_limit_up_sell = True datas = l2_huaxin_util.get_format_l2_datas(code, origin_datas, limit_up_price, _start_index, filter_limit_up_sell) L2LimitUpSellDataManager.add_l2_data(code, datas) __start_time = round(t.time() * 1000) if len(datas) > 0: cls.process_add_datas(code, datas, 0, __start_time) except Exception as e: async_log_util.error(logger_l2_error, f"code:{code}") # async_log_util.exception(logger_l2_error, e) logger_l2_error.exception(e) finally: if datas: l2.l2_data_util.save_l2_data(code, None, datas) origin_datas.clear() @classmethod def __recompute_real_order_index(cls, code, pre_real_order_index, order_info, compute_type): # 1s之后重新计算 time.sleep(1) real_order_index = huaxin_delegate_postion_manager.RealDelegateOrderPositionManager().recompute_l2_place_order_position( code, order_info, pre_real_order_index, compute_type) if real_order_index and pre_real_order_index != real_order_index: try: exec_index = order_info[6] order_begin_pos = cls.__get_order_begin_pos( code) async_log_util.info(logger_debug, f"下单位矫正:真实下单位-{real_order_index} 订单信息-{order_info} 下单信息-{order_begin_pos.to_dict()}") if order_begin_pos and order_begin_pos.buy_exec_index == exec_index: cls.set_real_place_order_index(code, real_order_index, order_begin_pos) async_log_util.info(logger_real_place_order_position, f"真实下单位置矫正:{code}-({real_order_index},1)") except Exception as e: logger_debug.exception(e) @classmethod def process_add_datas(cls, code, add_datas, capture_timestamp, __start_time): now_time_str = tool.get_now_time_str() # 将本次中断设置为 cls.__break_current_batch_data_for_buy_dict[code] = False if len(add_datas) > 0: # 记录当前批数据的索引 cls.__processing_data_indexes[code] = (add_datas[0]["index"], add_datas[-1]["index"]) if code not in cls.__trade_log_placr_order_info_dict: cls.__trade_log_placr_order_info_dict[code] = trade_record_log_util.PlaceOrderInfo() # 拼接数据 local_today_datas[code].extend(add_datas) l2.l2_data_util.load_num_operate_map(local_today_num_operate_map, code, add_datas) l2.l2_data_util.load_buy_no_map(local_today_buyno_map, code, add_datas) l2.l2_data_util.load_sell_no_map(local_today_sellno_map, code, add_datas) l2.l2_data_util.load_canceled_buy_no_map(local_today_canceled_buyno_map, code, add_datas) if constant.L2_SOURCE_TYPE == constant.L2_SOURCE_TYPE_HUAXIN: try: if constant.TEST: pass # order_begin_pos = cls.__get_order_begin_pos(code) # if order_begin_pos.buy_exec_index and order_begin_pos.buy_exec_index>=0: # place_order_index = add_datas[-1]["index"] # cls.set_real_place_order_index(code, place_order_index, order_begin_pos.buy_single_index) else: # ------------09:30:00之前处理排1的问题-------------- if int(add_datas[-1]["val"]["time"].replace(":", "")) < 93000: # 获取是否下单 buy_info = buy_open_limit_up_strategy.BuyOpenLimitupDataManager().get_place_order_info(code) # 设置虚拟下单信息 if buy_info: try: buy_single_index, buy_exec_index, buy_exec_index = add_datas[0]["index"], \ add_datas[0]["index"], \ add_datas[0]["index"] buy_volume_rate = 0 sell_info = ("09:15:00", 0) threshold_money = 0 order_begin_pos_info = OrderBeginPosInfo(buy_single_index=buy_single_index, buy_exec_index=buy_exec_index, buy_compute_index=buy_exec_index, num=1, count=1, max_num_set=set(), buy_volume_rate=buy_volume_rate, mode=OrderBeginPosInfo.MODE_OPEN_LIMIT_UP, mode_desc=f"排1下单", sell_info=sell_info, threshold_money=threshold_money) cls.__save_order_begin_data(code, order_begin_pos_info) # 设置下单信息 limit_up_price = gpcode_manager.get_limit_up_price_as_num(code) shadow_price = tool.get_shadow_price(limit_up_price) huaxin_delegate_postion_manager.place_order(code, limit_up_price, buy_info[0], buy_exec_index, add_datas[0], buy_info[2], shadow_price=shadow_price, shadow_volume=buy_info[1]) cls.__place_order_success(code, order_begin_pos_info, None) except Exception as e: logger_l2_error.exception(e) finally: buy_open_limit_up_strategy.BuyOpenLimitupDataManager().remove_place_order_info(code) # 获取下单位置 if constant.IS_NEW_VERSION_PLACE_ORDER: place_order_index, order_info, compute_type = huaxin_delegate_postion_manager.RealDelegateOrderPositionManager.compute_l2_place_order_position( code, add_datas) else: place_order_index, order_info, compute_type = huaxin_delegate_postion_manager.get_l2_place_order_position( code, float( gpcode_manager.get_limit_up_price(code)), add_datas) if place_order_index: order_begin_pos = cls.__get_order_begin_pos( code) cls.set_real_place_order_index(code, place_order_index, order_begin_pos) try: cls.__re_compute_threading_pool.submit( cls.__recompute_real_order_index, code, place_order_index, order_info, compute_type) except: pass async_log_util.info(logger_l2_process, f"code:{code} 获取到下单真实位置:{place_order_index}") # 处理涨停卖与涨停卖撤 try: for d in add_datas: if L2DataUtil.is_limit_up_price_sell(d['val']): L2TradeSingleDataProcessor.add_l2_delegate_limit_up_sell(code, d) elif L2DataUtil.is_limit_up_price_sell_cancel(d['val']): L2TradeSingleDataProcessor.add_l2_delegate_limit_up_sell_cancel(code, d['val']['orderNo']) except Exception as e: logger_debug.exception(e) except: async_log_util.error(logger_l2_error, f"{code} 处理真实下单位置出错") # 第1条数据是否为09:30:00 if add_datas[0]["val"]["time"] == "09:30:00": if global_util.cuurent_prices.get(code): price_data = global_util.cuurent_prices.get(code) if price_data[1]: # 当前涨停价,设置涨停时间 async_log_util.info(logger_l2_process, f"开盘涨停:{code}") # 保存涨停时间 cls.__LimitUpTimeManager.save_limit_up_time(code, "09:30:00") total_datas = local_today_datas[code] # __start_time = l2_data_log.l2_time(code, round(t.time() * 1000) - __start_time, # "l2数据预处理时间") # 9:29:55开始处理数据 place_ordered_desc = "" if len(add_datas) > 0 and int(tool.get_now_time_str().replace(":", "")) > int("092955"): # 是否为首板代码 is_first_code = True # gpcode_manager.FirstCodeManager().is_in_first_record(code) # 计算量 current_sell = cls.__L2MarketSellManager.get_current_total_sell_data(code) total_sell_volume = 0 if current_sell and len(current_sell) > 2: total_sell_volume = current_sell[2] volume_rate = code_volumn_manager.CodeVolumeManager().get_volume_rate(code, total_sell_volume=total_sell_volume) volume_rate_index = code_volumn_manager.CodeVolumeManager().get_volume_rate_index(volume_rate) # 计算分值 limit_up_time = cls.__LimitUpTimeManager.get_limit_up_time_cache(code) if limit_up_time is None: limit_up_time = tool.get_now_time_str() score = None cls.__l2PlaceOrderParamsManagerDict[code] = l2_trade_factor.L2PlaceOrderParamsManager(code, is_first_code, volume_rate, volume_rate_index, score, total_datas[-1][ 'val']['time']) cls.volume_rate_info[code] = (volume_rate, volume_rate_index) latest_time = add_datas[-1]["val"]["time"] # __start_time = l2_data_log.l2_time(code, round(t.time() * 1000) - __start_time, # "l2数据准备时间") # 时间差不能太大才能处理 if not l2_trade_util.is_in_forbidden_trade_codes(code): # 计算板上卖,当数据少时才计算,否则不计算 try: if len(add_datas) < 20: has_limit_up_sell = False for d in add_datas: if L2DataUtil.is_limit_up_price_sell(d["val"]): if d["val"]["num"] * float(d["val"]["price"]) < 5000: continue cls.__L2LimitUpSellManager.add_limit_up_sell(code, d["index"]) has_limit_up_sell = True if has_limit_up_sell: LCancelRateManager.compute_big_num_deal_info(code) # elif L2DataUtil.is_limit_up_price_sell_cancel(d["val"]): # cls.__L2LimitUpSellManager.add_limit_up_sell(code, d["index"]) except Exception as e: async_log_util.error(logger_l2_error, f"计算板上卖出错:{str(e)}") # 判断是否已经挂单 state = cls.__CodesTradeStateManager.get_trade_state_cache(code) start_index = len(total_datas) - len(add_datas) end_index = len(total_datas) - 1 if state == trade_constant.TRADE_STATE_BUY_DELEGATED or state == trade_constant.TRADE_STATE_BUY_PLACE_ORDER or state == trade_constant.TRADE_STATE_BUY_SUCCESS: # 已挂单 place_ordered_desc = "已下单" cls.__process_order(code, start_index, end_index, capture_timestamp, is_first_code) else: place_ordered_desc = "未下单" # 未挂单,时间相差不大才能挂单 # tool.trade_time_sub(latest_time, "09:32:00") < 0 if l2.l2_data_util.L2DataUtil.is_same_time( now_time_str, latest_time): cls.__process_not_order(code, start_index, end_index, capture_timestamp, is_first_code) else: place_ordered_desc += ":数据延迟" l2_log.info(code, logger_l2_process, "code:{} 处理数据范围: {}-{} 处理时间:{} 线程ID:{} 处理情况:{}", code, add_datas[0]["index"], add_datas[-1]["index"], round(t.time() * 1000) - __start_time, l2_log.threadIds.get(code), place_ordered_desc) # 处理未挂单 @classmethod def __process_not_order(cls, code, start_index, end_index, capture_time, is_first_code): __start_time = round(t.time() * 1000) # 获取阈值 threshold_money, msg = cls.__get_threshmoney(code) cls.__start_compute_buy(code, start_index, end_index, threshold_money, capture_time, is_first_code) # 测试专用 @classmethod def process_order(cls, code, start_index, end_index, capture_time, is_first_code, new_add=True): cls.__process_order(code, start_index, end_index, capture_time, is_first_code, new_add) # 处理已挂单 @classmethod def __process_order(cls, code, start_index, end_index, capture_time, is_first_code, new_add=True): # 增加推出机制 unique_key = f"{start_index}-{end_index}" if cls.__latest_process_order_unique_keys.get(code) == unique_key: async_log_util.error(logger_l2_error, f"重复处理数据:code-{code} start_index-{start_index} end_index-{end_index}") return cls.__latest_process_order_unique_keys[code] = unique_key # H撤 def h_cancel(_buy_single_index, _buy_exec_index): _start_time = round(t.time() * 1000) try: b_need_cancel, b_cancel_data = cls.__HourCancelBigNumComputer.need_cancel(code, _buy_single_index, _buy_exec_index, start_index, end_index, total_data, code_volumn_manager.CodeVolumeManager().get_volume_rate_index( order_begin_pos.buy_volume_rate), cls.volume_rate_info[code][1], is_first_code) if b_need_cancel and b_cancel_data: return b_cancel_data, "H撤" except Exception as e: if constant.TEST: logging.exception(e) # TODO 可能耗时 logger_l2_error.exception(e) async_log_util.error(logger_l2_error, f"H撤出错 参数:buy_single_index-{_buy_single_index} buy_exec_index-{_buy_exec_index} {str(e)}") async_log_util.exception(logger_l2_error, e) # logger_l2_error.exception(e) finally: # l2_data_log.l2_time(code, round(t.time() * 1000) - _start_time, "已下单-H撤大单计算") pass return None, "" # L撤 def l_cancel(_buy_single_index, _buy_exec_index): _start_time = round(t.time() * 1000) try: b_need_cancel, b_cancel_data, extra_msg, b_cancel_type = cls.__LCancelBigNumComputer.need_cancel(code, _buy_exec_index, start_index, end_index, total_data, is_first_code) if b_need_cancel and b_cancel_data: return b_cancel_data, f"L撤({extra_msg})", b_cancel_type except Exception as e: async_log_util.error(logger_l2_error, f"L撤出错 参数:buy_single_index-{_buy_single_index} buy_exec_index-{_buy_exec_index} 错误原因:{str(e)}") # logger_l2_error.exception(e) async_log_util.exception(logger_l2_error, e) finally: # l2_data_log.l2_time(code, round(t.time() * 1000) - _start_time, "已下单-L撤大单计算") pass return None, "", None # G撤 def g_cancel(_buy_single_index, _buy_exec_index): try: b_need_cancel, b_cancel_data, extra_msg = cls.__GCancelBigNumComputer.need_cancel(code, _buy_exec_index, start_index, end_index) if b_need_cancel and b_cancel_data: return b_cancel_data, f"G撤({extra_msg})" except Exception as e: async_log_util.error(logger_l2_error, f"G撤出错 参数:buy_single_index-{_buy_single_index} buy_exec_index-{_buy_exec_index} 错误原因:{str(e)}") # logger_l2_error.exception(e) async_log_util.exception(logger_l2_error, e) finally: # l2_data_log.l2_time(code, round(t.time() * 1000) - _start_time, "已下单-L撤大单计算") pass return None, "" # B撤 def b_cancel(_buy_single_index, _buy_exec_index): try: b_need_cancel, b_cancel_data, extra_msg = cls.__GCancelBigNumComputer.need_cancel_for_b(code, start_index, end_index) if b_need_cancel and b_cancel_data: return b_cancel_data, f"G撤({extra_msg})" except Exception as e: async_log_util.error(logger_l2_error, f"B撤出错 参数:buy_single_index-{_buy_single_index} buy_exec_index-{_buy_exec_index} 错误原因:{str(e)}") return None, "" # J撤 def j_cancel(_buy_single_index, _buy_exec_index): try: b_need_cancel, b_cancel_data, extra_msg = JCancelBigNumComputer().need_cancel(code, start_index, end_index) if b_need_cancel and b_cancel_data: return b_cancel_data, f"J撤({extra_msg})" except Exception as e: async_log_util.error(logger_l2_error, f"J撤出错 参数:buy_single_index-{_buy_single_index} buy_exec_index-{_buy_exec_index} 错误原因:{str(e)}") return None, "" # RD撤 def rd_cancel(_buy_single_index, _buy_exec_index): try: b_need_cancel, b_cancel_data, extra_msg = RDCancelBigNumComputer().need_cancel(code, start_index, end_index) if b_need_cancel and b_cancel_data: big_order_info = radical_buy_data_manager.get_total_deal_big_order_info(code, gpcode_manager.get_limit_up_price_as_num( code)) if big_order_info[0] > 0: return b_cancel_data, f"RD撤({extra_msg})", trade_constant.CANCEL_TYPE_RD except Exception as e: async_log_util.error(logger_l2_error, f"RD撤出错 参数:buy_single_index-{_buy_single_index} buy_exec_index-{_buy_exec_index} 错误原因:{str(e)}") return None, "", trade_constant.CANCEL_TYPE_RD if start_index < 0: start_index = 0 if end_index < start_index: return total_data = local_today_datas.get(code) _start_time = tool.get_now_timestamp() # 获取买入信号起始点 order_begin_pos = cls.__get_order_begin_pos( code) # 默认量为0.2 if order_begin_pos.buy_volume_rate is None: order_begin_pos.buy_volume_rate = 0.2 cancel_data, cancel_msg, cancel_type = None, "", None if order_begin_pos.mode == OrderBeginPosInfo.MODE_RADICAL: if not cancel_data: cancel_data, cancel_msg, cancel_type = rd_cancel(order_begin_pos.buy_single_index, order_begin_pos.buy_exec_index) # 扫入下单只有L撤 if not cancel_data: cancel_data, cancel_msg, cancel_type = l_cancel(order_begin_pos.buy_single_index, order_begin_pos.buy_exec_index) else: if not cancel_data: cancel_data, cancel_msg = g_cancel(order_begin_pos.buy_single_index, order_begin_pos.buy_exec_index) cancel_type = trade_constant.CANCEL_TYPE_G # 依次处理 if not cancel_data: cancel_data, cancel_msg, cancel_type = l_cancel(order_begin_pos.buy_single_index, order_begin_pos.buy_exec_index) # B撤 if not cancel_data: cancel_data, cancel_msg = b_cancel(order_begin_pos.buy_single_index, order_begin_pos.buy_exec_index) cancel_type = trade_constant.CANCEL_TYPE_G if not cancel_data: cancel_data, cancel_msg = h_cancel(order_begin_pos.buy_single_index, order_begin_pos.buy_exec_index) cancel_type = trade_constant.CANCEL_TYPE_H # J撤 if not cancel_data: cancel_data, cancel_msg = j_cancel(order_begin_pos.buy_single_index, order_begin_pos.buy_exec_index) cancel_type = trade_constant.CANCEL_TYPE_J if cancel_data and not DCancelBigNumComputer().has_auto_cancel_rules(code): l2_log.debug(code, "触发撤单,撤单位置:{} ,撤单原因:{}", cancel_data["index"], cancel_msg) # 撤单 cls.cancel_buy(code, cancel_msg, cancel_index=cancel_data["index"], cancel_type=cancel_type) # 撤单成功,继续计算下单 cls.__process_not_order(code, cancel_data["index"] + 1, end_index, capture_time, is_first_code) else: pass @classmethod def __is_pre_can_buy(cls, code): """ 预判断是否可买 @param code: @return: """ if not cls.__TradeStateManager.is_can_buy_cache(): return False, True, f"今日已禁止交易", True if l2_trade_util.is_in_forbidden_trade_codes(code): return False, True, f"代码禁止交易", True if cls.__PauseBuyCodesManager.is_in_cache(code): return False, True, f"该代码被暂停交易", True now_time_int = int(tool.get_now_time_str().replace(":", "")) if now_time_int >= 145700: return False, True, f"14:57后不能交易", True # if now_time_int < 93100: # return False, True, f"09:31之前不能交易", True # 二板以上的票不买 yesterday_codes = kpl_data_manager.get_yesterday_limit_up_codes() if yesterday_codes and code in yesterday_codes: return False, True, f"不买高位板", True if cls.__TradeTargetCodeModeManager.get_mode_cache() == TradeTargetCodeModeManager.MODE_ONLY_BUY_WANT_CODES: if not cls.__WantBuyCodesManager.is_in_cache( code) and not gpcode_manager.GreenListCodeManager().is_in_cache(code): return False, True, f"只买想买:没在想买单和绿单", True if not cls.__WantBuyCodesManager.is_in_cache(code): average_rate = cls.__Buy1PriceManager.get_average_rate(code) if average_rate : if tool.is_ge_code(code): if average_rate <= 0.1: return False, True, f"均价涨幅({average_rate})小于10%", True else: if average_rate <= 0.05: return False, True, f"均价涨幅({average_rate})小于5%", True return True, False, f"", False @classmethod def start_buy(cls, code, last_data, last_data_index, is_first_code, block_info): """ 开始买入 @param code: @param last_data: @param last_data_index: @param is_first_code: @param block_info:板块信息:[(板块,流入信息)] @return: """ return cls.__buy(code, 0, last_data, last_data_index, is_first_code, block_info=block_info) @classmethod def get_active_buy_blocks(cls, code): """ 获取积极买入的板块 @param code: @return: """ if tool.is_sh_code(code): # 上证不积极下单 return None current_total_sell_data = L2MarketSellManager().get_current_total_sell_data(code) place_order_count = trade_data_manager.PlaceOrderCountManager().get_place_order_count(code) # 只有初次下单 + 总抛压大于500w能积极下单 if place_order_count > 0 or current_total_sell_data is None or current_total_sell_data[1] < 500 * 10000: return None if global_util.zyltgb_map.get(code) > 50 * 100000000: # 50亿以上的无法积极下单 return None can_buy_result = CodePlateKeyBuyManager.can_buy(code) if can_buy_result: if can_buy_result[5]: return can_buy_result[5] elif not can_buy_result[0] and can_buy_result[1]: # 10:00:00 前的独苗可积极买入 if tool.trade_time_sub(tool.get_now_time_str(), "10:00:00") <= 0: return ["独苗"] return None @classmethod def __place_order_success(cls, code, order_begin_pos, block_info): ################下单成功处理################ trade_result_manager.real_buy_success(code, cls.__TradePointManager) l2_log.debug(code, "处理买入成功1") cancel_buy_strategy.set_real_place_position(code, local_today_datas.get(code)[-1]["index"], order_begin_pos.buy_single_index, is_default=True) l2_log.debug(code, "处理买入成功2") params_desc = cls.__l2PlaceOrderParamsManagerDict[code].get_buy_rank_desc() l2_log.debug(code, params_desc) #################清除本次下单的大单数据############### EveryLimitupBigDealOrderManager.clear(code) ############记录下单时的数据############ try: jx_blocks, jx_blocks_by = KPLCodeJXBlockManager().get_jx_blocks_cache( code), KPLCodeJXBlockManager().get_jx_blocks_cache(code, by=True) if jx_blocks: jx_blocks = jx_blocks[0] if jx_blocks_by: jx_blocks_by = jx_blocks_by[0] info = cls.__trade_log_placr_order_info_dict[code] info.mode = order_begin_pos.mode info.mode_desc = order_begin_pos.mode_desc info.set_buy_index(order_begin_pos.buy_single_index, order_begin_pos.buy_exec_index) info.set_sell_info(order_begin_pos.sell_info) info.set_block_info(block_info) if jx_blocks: info.set_kpl_blocks(list(jx_blocks)) elif jx_blocks_by: info.set_kpl_blocks(list(jx_blocks_by)) else: info.set_kpl_blocks([]) can_buy_result = CodePlateKeyBuyManager.can_buy(code) if can_buy_result: if not can_buy_result[0] and can_buy_result[1]: info.set_kpl_match_blocks(["独苗"]) elif not can_buy_result[0] and not can_buy_result[1]: info.set_kpl_match_blocks(["非独苗不满足身位"]) else: temps = [] temps.extend([f"{x[0]}" for x in can_buy_result[0]]) if can_buy_result[5]: temps.append(f"积极买入:{can_buy_result[5]}") info.set_kpl_match_blocks(temps) trade_record_log_util.add_place_order_log(code, info) except Exception as e: async_log_util.error(logger_l2_error, f"加入买入记录日志出错:{str(e)}") @classmethod def __buy(cls, code, capture_timestamp, last_data, last_data_index, is_first_code, block_info=None): pre_result = cls.__is_pre_can_buy(code) if not pre_result[0]: l2_log.debug(code, "不可以下单,原因:{}", pre_result[2]) return False # 添加买入锁 if code not in cls.__buy_lock_dict: cls.__buy_lock_dict[code] = threading.Lock() with cls.__buy_lock_dict[code]: # 判断是否可以下单,不处于可下单状态需要返回 state = cls.__CodesTradeStateManager.get_trade_state_cache(code) if not trade_util.is_can_order_by_state(state): # 不处于可下单状态 return False __start_time = tool.get_now_timestamp() order_begin_pos = cls.__get_order_begin_pos( code) if order_begin_pos.MODE_RADICAL == order_begin_pos.mode: # 积极下单不判断条件 can, need_clear_data, reason, is_valid_exec_index = True, False, "扫入下单", True else: return False # can, need_clear_data, reason, is_valid_exec_index = cls.__can_buy_first(code) # 删除虚拟下单 if code in cls.unreal_buy_dict: cls.unreal_buy_dict.pop(code) if not can: if not is_valid_exec_index: if cls.__latest_exec_indexes.get(code) and cls.__latest_exec_indexes[code][ -1] == order_begin_pos.buy_exec_index: # 如果执行位置算做无效,就需要删除当前执行位置 cls.__latest_exec_indexes[code].pop() l2_log.debug(code, "不可以下单,原因:{}", reason) trade_record_log_util.add_cant_place_order_log(code, reason) if need_clear_data: # 中断买入 cls.__break_current_batch_data_for_buy_dict[code] = True trade_result_manager.real_cancel_success(code, order_begin_pos.buy_single_index, order_begin_pos.buy_exec_index, local_today_datas.get(code)) return False else: l2_log.debug(code, "可以下单,原因:{}, 下单模式:{}", reason, order_begin_pos.mode) try: # 判断是否为首封下单 order_begin_pos.first_limit_up_buy = radical_buy_data_manager.is_first_limit_up_buy(code) l2_log.debug(code, "开始执行买入") trade_manager.start_buy(code, capture_timestamp, last_data, last_data_index, order_begin_pos.mode, order_begin_pos.buy_exec_index) l2_log.debug(code, "执行买入成功") cls.__place_order_success(code, order_begin_pos, block_info) except Exception as e: async_log_util.exception(logger_l2_error, e) l2_log.debug(code, "执行买入异常:{}", str(e)) pass finally: # l2_log.debug(code, "m值影响因子:{}", l2_trade_factor.L2TradeFactorUtil.factors_to_string(code)) pass return True # 是否可以取消 @classmethod def __can_cancel(cls, code): if constant.TEST: return True, "" if cls.__WhiteListCodeManager.is_in_cache(code): return False, "代码在白名单中" # 暂时注释掉 # 14点后如果是板块老大就不需要取消了 # now_time_str = tool.get_now_time_str() # if int(now_time_str.replace(":", "")) >= 140000: # industry, codes = ths_industry_util.get_same_industry_codes(code, gpcode_manager.get_gp_list()) # if industry is None: # return True, "没有获取到行业" # codes_index = industry_codes_sort.sort_codes(codes, code) # if codes_index is not None and codes_index.get(code) is not None: # # 同一板块中老二后面的不能买 # if codes_index.get(code) == 0: # return False, "14:00后老大不能撤单" # elif codes_index.get(code) == 1: # # 判断老大是否都是09:30:00涨停的 # # 同1板块老大是09:30:00涨停,老二14:00砸开的不撤 # first_count = 0 # for key in codes_index: # if codes_index[key] == 0: # first_count += 1 # if limit_up_time_manager.get_limit_up_time(key) == "09:30:00": # first_count -= 1 # if first_count == 0: # return False, "14:00后老大都开盘涨停,老二不能撤单" return True, "" @classmethod def __is_at_limit_up_buy(cls, code, buy_exec_index=None): """ 是否是板上放量买 @param code: @param buy_exec_index: @return: """ total_data = local_today_datas.get(code) if buy_exec_index is None: # 执行位置为空,就取最近的一条数据 buy_exec_index = total_data[-1]["index"] latest_exec_indexes = cls.__latest_exec_indexes.get(code) if not latest_exec_indexes: latest_exec_indexes = [] # 判断是否是炸开后买入 last_exec_index = 0 if len(latest_exec_indexes) > 1: last_exec_index = latest_exec_indexes[-2] # 获取最近的非涨停价成交时间 not_limit_up_trade_time_with_ms = current_price_process_manager.get_trade_not_limit_up_time_with_ms( code) is_limit_up_buy = True if not_limit_up_trade_time_with_ms: t1 = int( L2DataUtil.get_time_with_ms(total_data[last_exec_index]["val"]).replace(":", "").replace(".", "")) t2 = int(not_limit_up_trade_time_with_ms.replace(":", "").replace(".", "")) t3 = int(L2DataUtil.get_time_with_ms(total_data[buy_exec_index]["val"]).replace(":", "").replace( ".", "")) if t1 < t2 <= t3: # 炸板时间在两次下单时间中间 is_limit_up_buy = False return is_limit_up_buy # 获取可以买的板块 @classmethod def __get_can_buy_block(cls, code): can_buy_result = CodePlateKeyBuyManager.can_buy(code) if can_buy_result is None: async_log_util.warning(logger_debug, "没有获取到板块缓存,将获取板块") latest_current_limit_up_records = kpl_data_manager.get_latest_current_limit_up_records() codes_delegate = set(CodesTradeStateManager().get_codes_by_trade_states_cache( {trade_constant.TRADE_STATE_BUY_DELEGATED, trade_constant.TRADE_STATE_BUY_PLACE_ORDER})) codes_success = set(CodesTradeStateManager().get_codes_by_trade_states_cache( {trade_constant.TRADE_STATE_BUY_SUCCESS})) CodePlateKeyBuyManager.update_can_buy_blocks(code, kpl_data_manager.KPLLimitUpDataRecordManager.latest_origin_datas, kpl_data_manager.KPLLimitUpDataRecordManager.total_datas, latest_current_limit_up_records, block_info.get_before_blocks_dict(), kpl_data_manager.KPLLimitUpDataRecordManager.get_current_limit_up_reason_codes_dict(), codes_delegate, codes_success) can_buy_result = CodePlateKeyBuyManager.can_buy(code) return can_buy_result @classmethod def can_buy_first(cls, code, limit_up_price): now_timestamp = int(tool.get_now_time_str().replace(":", "")) # 判断板块 # (可以买的板块列表, 是否是独苗, 消息简介,可买的强势主线, 板块关键词) can_buy_result = cls.__get_can_buy_block(code) # l2_log.debug(code, "获取到的板块信息:{}", can_buy_result) if can_buy_result is None: return False, True, "尚未获取到板块信息" # -------量的约束-------- volume_rate_thresholds = buy_condition_util.get_volume_rate_by_level( 1), buy_condition_util.get_volume_rate_by_level(2) k_format = code_nature_analyse.CodeNatureRecordManager().get_k_format_cache(code) # 是否有辨识度 is_special = True if k_format and k_format[8][0] else False # 是否是强势10分钟 is_in_strong_time = now_timestamp <= int("094000") # 是否是强势30分钟 is_in_strong_time_30 = now_timestamp <= int("100000") # 量参考是否在最近30天 # is_refer_volume_date_in_30_days = False # try: # volume_refer_date, volume_refer_date_distance = code_volumn_manager.get_volume_refer_date(code) # if volume_refer_date_distance < 30: # is_refer_volume_date_in_30_days = True # except: # pass # 获取市场行情 situation = cls.__MarketSituationManager.get_situation_cache() zylt_threshold_as_yi = buy_condition_util.get_zyltgb_threshold(situation) zyltgb_as_yi = round(global_util.zyltgb_map.get(code) / 100000000, 2) if zyltgb_as_yi >= zylt_threshold_as_yi[1]: return False, True, f"{zylt_threshold_as_yi[1]}亿以上的都不买({zyltgb_as_yi})" if zyltgb_as_yi < zylt_threshold_as_yi[0]: return False, True, f"{zylt_threshold_as_yi[0]}亿以下的都不买({zyltgb_as_yi})" if constant.ALL_ACTIVE_BUY: return True, False, "买所有" # 最优市值 is_best_zylt = True if zylt_threshold_as_yi[4] <= zyltgb_as_yi <= zylt_threshold_as_yi[5] else False if is_special: # 具有辨识度,算作最优市值 zyltgb_as_yi = zylt_threshold_as_yi[2] + 1 # 是否是最优自由流通市值 is_better_zylt = True if zylt_threshold_as_yi[2] <= zyltgb_as_yi <= zylt_threshold_as_yi[3] else False if is_in_strong_time_30 and is_best_zylt: # 强势30分钟,最优市值, 有可以下单的板块,不看量 return True, False, can_buy_result[2] msg_list = [] if is_in_strong_time: msg_list.append("强势10分钟") # 独苗 if not can_buy_result[0] and can_buy_result[1]: msg_list.append("独苗") if not is_better_zylt: # 如果没有辨识度才不买 if not is_special: return False, True, f"强势10分钟,无辨识度, 独苗({can_buy_result[4]})不下单({can_buy_result[4]})自由流通市值({zyltgb_as_yi})不是特优市值" elif not can_buy_result[0]: return False, True, f"非独苗不满足身位:{can_buy_result[2]}" else: msg_list.append("非独苗") if not is_better_zylt: msg_list.append("不满足自由流通") # 注释量的影响 # if k_format and (k_format[1][0] or k_format[3][0]): # # 股价创新高或者逼近前高 # if code in cls.volume_rate_info and cls.volume_rate_info[code][0] < volume_rate_thresholds[1]: # return False, True, f"强势10分钟,后排,不满足自由市值,股价创新高或者逼近前高,当日量比({cls.volume_rate_info[code][0]})小于{volume_rate_thresholds[1]}" # else: # if code in cls.volume_rate_info and cls.volume_rate_info[code][0] < volume_rate_thresholds[ # 0] and not is_special: # return False, True, f"强势10分钟,后排,无辨识度,不满足自由市值,当日量比({cls.volume_rate_info[code][0]})小于{volume_rate_thresholds[0]}" else: msg_list.append("满足自由流通") # 后排,满足自由流通市值需要下单 return True, False, can_buy_result[2] return True, False, can_buy_result[2] else: # 注释量的影响 # # 上5个交易日有炸板之后 # if has_open_limit_up_in_5: # if code in cls.volume_rate_info and cls.volume_rate_info[code][0] < 0.29: # return False, True, f"非强势10分钟,上5个交易日炸板,量未达到{0.29}({cls.volume_rate_info[code][0]})" # # # 上5个交易日有跌停 # if has_limit_down_in_5: # if code in cls.volume_rate_info and cls.volume_rate_info[code][0] < 0.29: # return False, True, f"非强势10分钟,上5个交易日跌停,量未达到{0.29}({cls.volume_rate_info[code][0]})" # 注释量的影响 # # 获取量的参考日期 # if code in global_util.max60_volumn: # day = global_util.max60_volumn[code][1] # if day in HistoryKDatasUtils.get_latest_trading_date_cache(5): # if code in cls.volume_rate_info and cls.volume_rate_info[code][0] < 0.29 and not is_special: # return False, True, f"参考量在最近5天,无辨识度,量未达到0.29({cls.volume_rate_info[code][0]})" # 非强势10分钟只买主线 if not can_buy_result[0] and can_buy_result[1]: if not is_special and not is_best_zylt: return False, True, f"非强势10分钟,无辨识度,非特优市值,独苗({can_buy_result[4]})不下单" # 注释量的影响 # if can_buy_result[3]: # # 强势主线 # if not is_better_zylt: # if k_format and (k_format[1][0] or k_format[3][0]): # # 股价创新高或者逼近前高 # if code in cls.volume_rate_info and cls.volume_rate_info[code][0] < volume_rate_thresholds[1]: # return False, True, f"非强势10分钟,强势主线后排,不满足自由市值,股价创新高或者逼近前高,当日量比({cls.volume_rate_info[code][0]})小于{volume_rate_thresholds[1]}" # else: # if code in cls.volume_rate_info and cls.volume_rate_info[code][0] < volume_rate_thresholds[ # 0] and not is_special: # return False, True, f"非强势10分钟,强势主线后排,不满足自由市值,无辨识度,当日量比({cls.volume_rate_info[code][0]})小于{volume_rate_thresholds[0]}" # else: # if k_format and (k_format[1][0] or k_format[3][0]): # if code in cls.volume_rate_info and cls.volume_rate_info[code][0] < volume_rate_thresholds[0]: # return False, True, f"非强势10分钟,强势主线后排,满足自由市值,股价创新高或者逼近前高, 当日量比({cls.volume_rate_info[code][0]})小于{volume_rate_thresholds[0]}" # else: # # 非强势主线 # if not is_better_zylt: # if code in cls.volume_rate_info and cls.volume_rate_info[code][0] < volume_rate_thresholds[ # 1] and not is_special: # return False, True, f"非强势10分钟,非强势主线后排【主线后排】,不满足自由市值,无辨识度, 当日量比({cls.volume_rate_info[code][0]})小于{volume_rate_thresholds[1]}" # else: # if k_format and (k_format[1][0] or k_format[3][0]): # # 股价创新高或者逼近前高 # if code in cls.volume_rate_info and cls.volume_rate_info[code][0] < volume_rate_thresholds[1]: # return False, True, f"非强势10分钟,非强势主线后排【主线后排】,满足自由市值,股价创新高或者逼近前高, 当日量比({cls.volume_rate_info[code][0]})小于{volume_rate_thresholds[1]}" # else: # if code in cls.volume_rate_info and cls.volume_rate_info[code][0] < volume_rate_thresholds[ # 0] and not is_special: # return False, True, f"非强势10分钟,非强势主线后排【主线后排】, 满足自由市值,无辨识度,当日量比({cls.volume_rate_info[code][0]})小于{volume_rate_thresholds[0]}" return True, False, can_buy_result[2] @classmethod def can_buy_first_new(cls, code, limit_up_price): now_timestamp = int(tool.get_now_time_str().replace(":", "")) # 判断板块 # (可以买的板块列表, 是否是独苗, 消息简介,可买的强势主线, 板块关键词) can_buy_result = cls.__get_can_buy_block(code) if can_buy_result is None: return False, True, "尚未获取到板块信息" # 是否是强势30分钟 is_in_strong_time_30 = now_timestamp <= int("100000") # 获取市场行情 situation = cls.__MarketSituationManager.get_situation_cache() zylt_threshold_as_yi = buy_condition_util.get_zyltgb_threshold(situation) zyltgb_as_yi = round(global_util.zyltgb_map.get(code) / 100000000, 2) if zyltgb_as_yi < zylt_threshold_as_yi[0]: return False, True, f"{zylt_threshold_as_yi[0]}亿以下的都不买({zyltgb_as_yi})" # 测试时可取消注释 # if 1 > 0: # return True, False, "买所有" if zyltgb_as_yi >= zylt_threshold_as_yi[1]: return False, True, f"{zylt_threshold_as_yi[1]}亿以上的都不买({zyltgb_as_yi})" msg_list = [] if is_in_strong_time_30: msg_list.append("强势30分钟") # 独苗 if not can_buy_result[0] and can_buy_result[1]: msg_list.append("独苗") return True, False, "强势30分钟,独苗" elif not can_buy_result[0]: return False, True, f"强势30分钟,非独苗不满足身位:{can_buy_result[2]}" else: msg_list.append("非独苗") # 后排,满足自由流通市值需要下单 return True, False, can_buy_result[2] else: place_order_count = trade_data_manager.PlaceOrderCountManager().get_place_order_count(code) if place_order_count > 0: return True, False, "之前下过单" if not can_buy_result[0]: # 没有板块 if can_buy_result[1]: # 是独苗 if not TradeBlockBuyModeManager().can_buy_unique_block(): # 不能买独苗 return False, True, f"非强势30分钟,独苗,不满足身位:{can_buy_result[2]}" else: # 非独苗,没有可以买入的板块 return False, True, f"非强势30分钟,非独苗,不满足身位:{can_buy_result[2]}" return True, False, can_buy_result[2] @classmethod def __cancel_buy(cls, code): if code not in cls.__buy_lock_dict: cls.__buy_lock_dict[code] = threading.Lock() with cls.__buy_lock_dict[code]: try: l2_log.debug(code, "开始执行撤单") trade_manager.start_cancel_buy(code) l2_log.debug(code, "执行撤单成功") return True except Exception as e: logging.exception(e) l2_log.debug(code, "执行撤单异常:{}", str(e)) return False finally: pass @classmethod def cancel_buy(cls, code, msg=None, source="l2", cancel_index=None, cancel_type=None): finally_can_cancel = True total_datas = local_today_datas.get(code) order_begin_pos = cls.__get_order_begin_pos( code) if cancel_type != trade_constant.CANCEL_TYPE_HUMAN: # 是否是交易队列触发 # 扫入下单只有L撤能撤单 if order_begin_pos and order_begin_pos.mode == OrderBeginPosInfo.MODE_RADICAL and cancel_type not in { trade_constant.CANCEL_TYPE_L_DOWN, trade_constant.CANCEL_TYPE_L, trade_constant.CANCEL_TYPE_RD, trade_constant.CANCEL_TYPE_P}: l2_log.cancel_debug(code, "撤单中断,原因:{}", "扫入下单不是L撤") return False # 加绿只有L撤/人撤生效 if gpcode_manager.GreenListCodeManager().is_in_cache(code): if cancel_type not in {trade_constant.CANCEL_TYPE_L, trade_constant.CANCEL_TYPE_L_UP, trade_constant.CANCEL_TYPE_L_DOWN, trade_constant.CANCEL_TYPE_RD, trade_constant.CANCEL_TYPE_P}: l2_log.cancel_debug(code, "撤单中断,原因:{}", "加绿不是L撤") return False if not total_datas: return False if source == "trade_queue": # 交易队列触发的需要下单后5s if order_begin_pos.buy_exec_index is not None and order_begin_pos.buy_exec_index > 0: now_time_str = tool.get_now_time_str() if tool.trade_time_sub(now_time_str, total_datas[order_begin_pos.buy_exec_index]["val"]["time"]) < 5: return False if code in cls.unreal_buy_dict: cls.unreal_buy_dict.pop(code) # 取消买入标识 trade_result_manager.virtual_cancel_success(code, order_begin_pos.buy_single_index, order_begin_pos.buy_exec_index, total_datas) else: can_cancel, reason = cls.__can_cancel(code) if not can_cancel: # 不能取消且不是人工撤单 l2_log.cancel_debug(code, "撤单中断,原因:{}", reason) l2_log.debug(code, "撤单中断,原因:{}", reason) return False if finally_can_cancel: if cancel_index is None: cancel_index = total_datas[-1]["index"] cls.__LatestCancelIndexManager.set_latest_cancel_index(code, cancel_index) # 记录撤单日志 real_place_order_index = SCancelBigNumComputer().get_real_place_order_index_cache(code) # 添加撤单日志记录 trade_record_log_util.add_cancel_msg_log(code, real_place_order_index, msg) cancel_result = cls.__cancel_buy(code) if cancel_result: trade_result_manager.real_cancel_success(code, order_begin_pos.buy_single_index, order_begin_pos.buy_exec_index, total_datas, from_real_cancel=True) l2_log.debug(code, "执行撤单结束,原因:{}", msg) return True # 虚拟下单 @classmethod def __virtual_buy(cls, code, buy_single_index, buy_exec_index, capture_time): cls.unreal_buy_dict[code] = (buy_exec_index, capture_time) trade_result_manager.virtual_buy_success(code) @classmethod def __process_with_find_exec_index(cls, code, order_begin_pos: OrderBeginPosInfo, compute_end_index, block_info): """ 处理找到执行位置 @return: """ total_datas = local_today_datas.get(code) l2_log.debug(code, "获取到买入执行位置:{} m值:{} 纯买手数:{} 纯买单数:{} 是否板上买:{} 数据:{} ,量比:{} ,下单模式:{}", order_begin_pos.buy_exec_index, order_begin_pos.threshold_money, order_begin_pos.num, order_begin_pos.count, order_begin_pos.at_limit_up, total_datas[order_begin_pos.buy_exec_index], cls.volume_rate_info[code], order_begin_pos.mode) cls.__save_order_begin_data(code, order_begin_pos) cls.__LimitUpTimeManager.save_limit_up_time(code, total_datas[order_begin_pos.buy_exec_index]["val"]["time"]) l2_log.debug(code, "delete_buy_cancel_point") if code not in cls.__latest_exec_indexes: cls.__latest_exec_indexes[code] = [] cls.__latest_exec_indexes[code].append(order_begin_pos.buy_exec_index) # 保留最近3次的买入执行位置 if len(cls.__latest_exec_indexes[code]) > 3: cls.__latest_exec_indexes[code] = cls.__latest_exec_indexes[code][-3:] # 直接下单 ordered = cls.__buy(code, 0, total_datas[-1], total_datas[-1]["index"], True, block_info=block_info) # 数据是否处理完毕 if order_begin_pos.buy_exec_index < compute_end_index: if ordered: cls.__process_order(code, order_begin_pos.buy_exec_index + 1, compute_end_index, 0, True, False) else: cls.__start_compute_buy(code, order_begin_pos.buy_exec_index + 1, compute_end_index, order_begin_pos.threshold_money, 0, True, False) return ordered @classmethod def __start_compute_buy(cls, code, compute_start_index, compute_end_index, threshold_money, capture_time, is_first_code, new_add=True): # 中断当前批次买入数据处理 if cls.__break_current_batch_data_for_buy_dict.get(code): return # 判断下次买入时间是否正确 if code in cls.__next_buy_time_dict and t.time() < cls.__next_buy_time_dict[code]: l2_log.debug(code, f"下次可下单时间({compute_start_index}-{compute_end_index}):{cls.__next_buy_time_dict[code]}") return if code in cls.__next_buy_time_dict: cls.__next_buy_time_dict.pop(code) if compute_end_index < compute_start_index: return unique_key = f"{code}-{compute_start_index}-{compute_end_index}" if cls.__latest_process_not_order_unique_keys_count.get( unique_key) and cls.__latest_process_not_order_unique_keys_count.get(unique_key) > 2: async_log_util.error(logger_l2_error, f"重复处理数据:code-{code} start_index-{compute_start_index} end_index-{compute_end_index}") return if unique_key not in cls.__latest_process_not_order_unique_keys_count: cls.__latest_process_not_order_unique_keys_count[unique_key] = 0 cls.__latest_process_not_order_unique_keys_count[unique_key] += 1 _start_time = tool.get_now_timestamp() total_datas = local_today_datas[code] # ---------计算激进买入的信号--------- radical_result = cls.__compute_radical_order_begin_pos(code, compute_start_index, compute_end_index) if radical_result[0]: buy_single_index, buy_exec_index = radical_result[1], radical_result[1] buy_volume_rate = cls.volume_rate_info[code][0] refer_sell_data = cls.__L2MarketSellManager.get_refer_sell_data(code, total_datas[buy_single_index]["val"][ "time"]) if refer_sell_data: sell_info = (refer_sell_data[0], refer_sell_data[1]) else: sell_info = (total_datas[buy_single_index]["val"]["time"], 0) threshold_money = 0 order_begin_pos_info = OrderBeginPosInfo(buy_single_index=buy_single_index, buy_exec_index=buy_exec_index, buy_compute_index=buy_exec_index, num=total_datas[buy_single_index]["val"]["num"], count=1, max_num_set=set(), buy_volume_rate=buy_volume_rate, mode=OrderBeginPosInfo.MODE_RADICAL, mode_desc=f"大单不足扫入:{radical_result[2]}", sell_info=sell_info, threshold_money=threshold_money) order_begin_pos_info.at_limit_up = cls.__is_at_limit_up_buy(code) ordered = cls.__process_with_find_exec_index(code, order_begin_pos_info, compute_end_index, block_info=radical_result[3]) if ordered: radical_buy_data_manager.BlockPlaceOrderRecordManager().add_record(code, radical_result[2]) # 监听大单 RDCancelBigNumComputer().set_watch_indexes(code, radical_result[4]) return if RadicalBuyDealCodesManager().get_code_blocks(code): # 已经扫入下过单 return if not constant.CAN_COMMON_BUY: # 常规买不买入了 return # 获取买入信号计算起始位置 order_begin_pos = cls.__get_order_begin_pos(code) # 是否为新获取到的位置 new_get_single = False buy_single_index = order_begin_pos.buy_single_index if buy_single_index is None: # ------------------------------确定信号种类---------------------------------- # 第一步:获取积极下单信号 # if code.find('60') == 0: # 积极买 continue_count = 1 has_single, _index, sell_info, single_msg, mode = cls.__compute_active_order_begin_pos(code, continue_count, compute_start_index, compute_end_index) if not has_single: # 没有信号,如果当前数据是涨停买就记录日志,防止记录过多的日志 if L2DataUtil.is_limit_up_price_buy( total_datas[compute_start_index]["val"]) and L2DataUtil.is_limit_up_price_buy( total_datas[compute_end_index]["val"]): async_log_util.info(logger_l2_trade_buy_queue, "尚未获取到买入信号: code-{} start_index-{} end_index-{} msg-{}", code, compute_start_index, compute_end_index, sell_info) fast_msg = None if has_single: order_begin_pos.mode = mode order_begin_pos.mode_desc = f"委托触发: {single_msg}" order_begin_pos.sell_info = sell_info fast_msg = sell_info # 用了信号就必须清除掉原有信号 place_order_single_data_manager.L2TradeSingleDataManager.clear_data(code) if cls.__last_buy_single_dict.get(code) == _index: has_single = None _index = None buy_single_index = _index if has_single: # 判断是否是板上买 order_begin_pos.at_limit_up = cls.__is_at_limit_up_buy(code) # -----------------------------买入计算初始化,计算纯买额阈值----------------------- cls.__last_buy_single_dict[code] = buy_single_index new_get_single = True order_begin_pos.num = 0 order_begin_pos.count = 0 order_begin_pos.buy_single_index = buy_single_index order_begin_pos.threshold_money = int(sell_info[1] * 1.0) order_begin_pos.max_num_set = set() l2_log.debug(code, "获取到买入信号起始点:{} ,计算范围:{}-{} ,量比:{},是否板上买:{},数据:{} 模式:{}({}) 信号类型:{}", buy_single_index, compute_start_index, compute_end_index, cls.volume_rate_info[code], order_begin_pos.at_limit_up, total_datas[buy_single_index], order_begin_pos.mode, fast_msg, single_msg) # _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time, "下单信号计算时间") if order_begin_pos.buy_single_index is None: # 未获取到买入信号,终止程序 return None # 开始计算的位置 start_process_index = max(order_begin_pos.buy_single_index, compute_start_index) if new_get_single: start_process_index = order_begin_pos.buy_single_index # 设置为总卖额 new_buy_exec_index, buy_nums, buy_count, rebegin_buy_pos, threshold_money_new, max_num_set_new, not_buy_msg, clear_buy_single = cls.__sum_buy_num_for_order_active( code, start_process_index, compute_end_index, order_begin_pos.num, order_begin_pos.count, order_begin_pos.threshold_money, order_begin_pos.buy_single_index, order_begin_pos.max_num_set, order_begin_pos.sell_info[1]) threshold_money = threshold_money_new order_begin_pos.threshold_money = threshold_money l2_log.debug(code, "范围:{}-{} m值-{} 量比:{} rebegin_buy_pos:{} clear_buy_single:{}", compute_start_index, compute_end_index, threshold_money, cls.volume_rate_info[code][0], rebegin_buy_pos, clear_buy_single) # 买入信号位与计算位置间隔2s及以上了 if rebegin_buy_pos is not None: # 需要重新计算纯买额 cls.__start_compute_buy(code, rebegin_buy_pos, compute_end_index, threshold_money, capture_time, is_first_code, False) return if new_buy_exec_index is not None: cls.__process_with_find_exec_index(code, OrderBeginPosInfo(buy_single_index=buy_single_index, buy_exec_index=new_buy_exec_index, buy_compute_index=new_buy_exec_index, num=buy_nums, count=buy_count, max_num_set=max_num_set_new, buy_volume_rate=cls.volume_rate_info[code][0], mode=order_begin_pos.mode, mode_desc=order_begin_pos.mode_desc, sell_info=order_begin_pos.sell_info, threshold_money=threshold_money), compute_end_index, block_info=None) else: # 未达到下单条件,保存纯买额,设置纯买额 # 记录买入信号位置 if not clear_buy_single: # 没有清除信号位置就保存下单位置信息 cls.__save_order_begin_data(code, OrderBeginPosInfo(buy_single_index=buy_single_index, buy_exec_index=-1, buy_compute_index=compute_end_index, num=buy_nums, count=buy_count, mode_desc=order_begin_pos.mode_desc, max_num_set=max_num_set_new, mode=order_begin_pos.mode, sell_info=order_begin_pos.sell_info, threshold_money=threshold_money)) # 记录没下单原因 async_log_util.info(logger_l2_not_buy_reasons, f"{code}#{not_buy_msg}") _start_time = t.time() # 获取下单起始信号 @classmethod def __get_order_begin_pos(cls, code) -> OrderBeginPosInfo: order_begin_pos = cls.__TradePointManager.get_buy_compute_start_data_cache( code) return order_begin_pos # 保存下单起始信号 @classmethod def __save_order_begin_data(cls, code, info: OrderBeginPosInfo): cls.__TradePointManager.set_buy_compute_start_data_v2(code, info) @classmethod def save_order_begin_data(cls, code, info: OrderBeginPosInfo): cls.__save_order_begin_data(code, info) # 计算下单起始信号 # compute_data_count 用于计算的l2数据数量 @classmethod def __compute_order_begin_pos(cls, code, start_index, continue_count, end_index): second_930 = 9 * 3600 + 30 * 60 + 0 # 倒数100条数据查询 datas = local_today_datas[code] if end_index - start_index + 1 < continue_count: return False, None __time = None last_index = None count = 0 start = None # now_time_s = tool.get_time_as_second(tool.get_now_time_str()) for i in range(start_index, end_index + 1): _val = datas[i]["val"] time_s = L2DataUtil.get_time_as_second(_val["time"]) # 时间要>=09:30:00 if time_s < second_930: continue # if not constant.TEST: # if abs(now_time_s - time_s) > 2: # # 正式环境下不处理2s外的数据 # continue if L2DataUtil.is_limit_up_price_buy(_val): # 金额要大于50万 if _val["num"] * float(_val["price"]) < 5000: continue # 寻找前面continue_count-1个涨停买 # for j in range(start_index - 1, -1, -1): # if datas[j]["val"] if last_index is None or (datas[last_index]["val"]["time"] == datas[i]["val"]["time"]): if start is None: start = i last_index = i count += datas[i]["re"] if count >= continue_count: return True, start else: # 本条数据作为起点 last_index = i count = datas[i]["re"] start = i elif not L2DataUtil.is_sell(_val) and not L2DataUtil.is_sell_cancel(_val): # 剔除卖与卖撤 last_index = None count = 0 start = None return False, None # 快速买入法的信号位置查找 @classmethod def __compute_fast_order_begin_pos(cls, code, start_index, end_index): total_datas = local_today_datas[code] start_time_str = total_datas[start_index]["val"]["time"] refer_sell_data = cls.__L2MarketSellManager.get_refer_sell_data(code, start_time_str) if refer_sell_data is None: return False, -1, "总卖为空" if cls.__L2MarketSellManager.is_refer_sell_time_used(code, refer_sell_data[0]): return False, -1, "总卖统计时间已被使用" # 是否大于2000万 if refer_sell_data[1] <= 2000 * 10000: return False, -1, "总卖小于指定金额" # 统计之前的卖 threshold_money = refer_sell_data[1] for i in range(start_index - 1, -1, -1): val = total_datas[i]["val"] if tool.compare_time(val["time"], refer_sell_data[0]) <= 0: break if L2DataUtil.is_sell(val): threshold_money += val["num"] * int(float(val["price"]) * 100) elif L2DataUtil.is_sell_cancel(val): threshold_money -= val["num"] * int(float(val["price"]) * 100) # 是否为本秒的第一个涨停买 for i in range(start_index, end_index + 1): data = total_datas[i] val = data['val'] if not L2DataUtil.is_limit_up_price_buy(val): # 要统计卖与卖撤 if L2DataUtil.is_sell(val): threshold_money += val["num"] * int(float(val["price"]) * 100) elif L2DataUtil.is_sell_cancel(val): threshold_money -= val["num"] * int(float(val["price"]) * 100) continue # 50 万以下的不需要 if val["num"] * float(val["price"]) < 5000: continue # 是否为本s的第一次涨停 is_first_limit_up = True for j in range(i - 1, -1, -1): temp_val = total_datas[j]["val"] if temp_val["time"] == val["time"]: if L2DataUtil.is_limit_up_price_buy(temp_val) and temp_val["num"] * float( temp_val["price"]) >= 5000: is_first_limit_up = True break else: break if is_first_limit_up: return True, i, [refer_sell_data[0], threshold_money] return False, None, None @classmethod def __get_active_single_start_index(cls, code, start_index, end_index, continue_count, valid_single=False): """ 获取主动买的信号起始索引 @param code: @param start_index: @param end_index: @param continue_count: @param valid_single: 是否验证成交买入信号 @return: """ total_datas = local_today_datas[code] last_index = None count = 0 start = None for i in range(start_index, end_index + 1): _val = total_datas[i]["val"] if L2DataUtil.is_limit_up_price_buy(_val): # 时间要>=09:30:00 if tool.trade_time_sub(_val["time"], "09:30:00") < 0: continue # 金额要大于50万 if _val["num"] * float(_val["price"]) < 5000: continue if last_index is None or ( total_datas[last_index]["val"]["time"] == total_datas[i]["val"]["time"]): # 深证非板上放量需要判断是否有信号 if valid_single: _single = place_order_single_data_manager.L2TradeSingleDataManager.get_valid_trade_single( code, tool.to_time_with_ms( _val['time'], _val['tms'])) if not _single: continue if start is None: start = i last_index = i count += total_datas[i]["re"] if count >= continue_count: return start else: # 本条数据作为起点 last_index = i count = total_datas[i]["re"] start = i elif not L2DataUtil.is_sell(_val) and not L2DataUtil.is_sell_cancel(_val): # 剔除卖与卖撤 last_index = None count = 0 start = None return None # 计算积极买的下单信号 @classmethod def __compute_active_order_begin_pos(cls, code, continue_count, start_index, end_index): """ 计算买入信号 @param code: @param continue_count: @param start_index: @param end_index: @return: """ total_datas = local_today_datas[code] start_time_str = total_datas[start_index]["val"]["time"] if end_index - start_index + 1 < continue_count: return False, -1, "信号不连续", '', OrderBeginPosInfo.MODE_NORMAL # 获取最近的总卖信息 # (time_str, round(money), volume, sell_1_info) refer_sell_data = cls.__L2MarketSellManager.get_refer_sell_data(code, start_time_str) active_buy_blocks = cls.get_active_buy_blocks(code) if tool.is_sh_code(code): if refer_sell_data is None: # 设置默认卖信息 refer_sell_data = (start_time_str, 0, 0, (round(float(total_datas[start_index]["val"]["price"]), 2), 0)) # 上证:只要有一个涨停买的就是信号 for i in range(start_index, end_index + 1): _val = total_datas[i]["val"] if not L2DataUtil.is_limit_up_price_buy(_val): continue # 时间要>=09:30:00 if tool.trade_time_sub(_val["time"], "09:30:00") < 0: continue # 金额要大于50万 if _val["num"] * float(_val["price"]) < 5000: continue return True, i, [refer_sell_data[0], 0], '上证买入', OrderBeginPosInfo.MODE_ACTIVE if active_buy_blocks else OrderBeginPosInfo.MODE_FAST return False, -1, "未获取到积极买的起始信号", '', OrderBeginPosInfo.MODE_NORMAL else: # 深证 if refer_sell_data is None: return False, -1, "总卖为空", '', OrderBeginPosInfo.MODE_NORMAL if refer_sell_data is None: refer_sell_data = (start_time_str, 0, 0, (round(float(total_datas[start_index]["val"]["price"]), 2), 0)) l2_log.debug(code, f"丢失总卖额,设置默认为0,计算范围:{start_index}-{end_index}") # 判断是否为真的板上放量 single = place_order_single_data_manager.L2TradeSingleDataManager.get_valid_trade_single( code, tool.to_time_with_ms(total_datas[end_index]['val']['time'], total_datas[end_index]['val']['tms'])) trade_price_info = HuaXinSellOrderStatisticManager.get_latest_trade_price_info(code) limit_up_price = gpcode_manager.get_limit_up_price(code) # p is_limit_up = False if limit_up_price and trade_price_info and abs(trade_price_info[0] - float(limit_up_price)) < 0.001: is_limit_up = True if refer_sell_data[1] > 0 or single or not is_limit_up: # 不是板上放量 # 判断最近有没有涨停卖数据 limit_up_sell_count = L2TradeSingleDataProcessor.get_latest_limit_up_sell_order_count(code) if (limit_up_sell_count == 0 or active_buy_blocks) and not single: # 如果没有涨停卖数据/积极下单而且还没有成交买入信号,就按照原来的总卖额计算 threshold_money, sell_1_price = copy.deepcopy(refer_sell_data[1]), refer_sell_data[3][0] for i in range(start_index - 1, -1, -1): val = total_datas[i]["val"] if tool.compare_time(val["time"], refer_sell_data[0]) < 0: # 读取的L2的总卖额是不包含当前s的数据,所以需要将当前s的数据纳入计算 break if L2DataUtil.is_sell(val): threshold_money += val["num"] * int(float(val["price"]) * 100) elif L2DataUtil.is_sell_cancel(val): threshold_money -= val["num"] * int(float(val["price"]) * 100) elif L2DataUtil.is_buy(val): # 判断价格(大于卖1) 被买吃掉 if round(float(val["price"]), 2) - sell_1_price >= 0: threshold_money -= val["num"] * int(float(val["price"]) * 100) buy_single_index = cls.__get_active_single_start_index(code, start_index, end_index, continue_count, valid_single=False) if buy_single_index is not None: return True, buy_single_index, [refer_sell_data[0], threshold_money], f"上板无涨停卖:涨停卖数据-{limit_up_sell_count}", OrderBeginPosInfo.MODE_ACTIVE if active_buy_blocks else OrderBeginPosInfo.MODE_NORMAL else: # 按照成交买入信号计算 buy_single_index = cls.__get_active_single_start_index(code, start_index, end_index, continue_count, valid_single=True) if buy_single_index is not None: return True, buy_single_index, [refer_sell_data[0], 0], "上板有成交买入信号", OrderBeginPosInfo.MODE_ACTIVE if active_buy_blocks else OrderBeginPosInfo.MODE_FAST else: # 板上放量:只需要一笔涨停买可作为信号 buy_single_index = cls.__get_active_single_start_index(code, start_index, end_index, continue_count, valid_single=False) if buy_single_index is not None: return True, buy_single_index, [refer_sell_data[0], 0], "板上放量", OrderBeginPosInfo.MODE_ACTIVE if active_buy_blocks else OrderBeginPosInfo.MODE_NORMAL return False, -1, "未获取到积极买的起始信号", '', OrderBeginPosInfo.MODE_NORMAL # 计算激进买的下单信号 @classmethod def __compute_radical_order_begin_pos(cls, code, start_index, end_index): """ 计算激进买入信号: 1.非板上放量 2.检测到299万大单买入 @param code: @param start_index: @param end_index: @return: (是否获取到信号, 信号位置, 扫入板块/消息, 扫入板块大单流入信息, 需要监听的大单) """ # 激进买信号的时间 def __can_order(): # 判断是否是板上放量 if cls.__is_at_limit_up_buy(code, start_index): return False, None, "板上放量", None total_datas = local_today_datas[code] limit_up_price = gpcode_manager.get_limit_up_price_as_num(code) bigger_money = l2_data_util.get_big_money_val(limit_up_price, tool.is_ge_code(code)) min_num = int(bigger_money / limit_up_price / 100) refer_sell_data = L2MarketSellManager().get_refer_sell_data(code, radical_data[3]) # 参考总卖额 refer_sell_money = 0 if refer_sell_data: refer_sell_money = refer_sell_data[1] big_order_deal_enough_result = radical_buy_data_manager.is_big_order_deal_enough(code, code_volumn_manager.CodeVolumeManager().get_volume_rate_refer_in_5days( code), refer_sell_money, for_buy=True) # 缺乏的大单金额 lack_money = big_order_deal_enough_result[3] # 如果有大单成交就不需要看大单 if constant.CAN_RADICAL_BUY_NEED_BIG_ORDER_EVERYTIME: # 每次下单都需要大单 current_big_order_deal_money_info = EveryLimitupBigDealOrderManager.get_big_buy_deal_order_money_info( code) if current_big_order_deal_money_info and tool.trade_time_sub(tool.get_now_time_str(), current_big_order_deal_money_info[1]) > 60: # 60s以上就不下单了 return False, None, "距离上次统计大单时间过去60s", set() if lack_money == 0: if not tool.is_sh_code(code): # 非上证的票看50w min_num = int(5000 / limit_up_price) # 需要监听的大单 watch_indexes = set() # 总委托大单金额 total_delegating_big_money = 0 single_index = None # 从成交进度位开始看 trade_index, is_default = cls.__TradeBuyQueue.get_traded_index(code) if trade_index is None: trade_index = 0 canceled_buyno_map = local_today_canceled_buyno_map.get(code) for i in range(trade_index, end_index + 1): data = total_datas[i] val = data["val"] if not L2DataUtil.is_limit_up_price_buy(val): continue if val["num"] < min_num: continue # 撤单不算 left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2(code, i, total_datas, canceled_buyno_map) if left_count == 0: continue # 上证有可能是部分成交的大单 if i == start_index and tool.is_sh_code(code): dealing_active_order_info = HuaXinBuyOrderManager().get_dealing_active_order_info(code) if dealing_active_order_info and dealing_active_order_info[0] == int(val["orderNo"]): # 判断是否为大单 order_money = dealing_active_order_info[2] + round(val["price"], 2) * val["num"] * 100 if order_money >= bigger_money: lack_money -= order_money watch_indexes.add(i) if lack_money < 0: single_index = i break if int(val["orderNo"]) <= radical_data[1]: # 主动买单后的数据不算 continue watch_indexes.add(i) lack_money -= round(val["price"], 2) * val["num"] * 100 if lack_money < 0: single_index = i break if single_index is not None: return True, single_index, "有大单", watch_indexes return False, None, f"大单不足:{trade_index}-{end_index} 缺少的大单-{lack_money}", watch_indexes radical_data = RadicalBuyDealCodesManager.buy_by_l2_delegate_expire_time_dict.get(code) record_codes = radical_buy_data_manager.BlockPlaceOrderRecordManager().get_codes() # 是否扫入过 is_radical_buy = code in record_codes if not radical_data: return False, None, "不满足激进买的条件" if constant.CAN_RADICAL_BUY_AT_LIMIT_UP: # 板上放量可扫入 if t.time() > radical_data[0] and not is_radical_buy: # 没扫入过才需要判断时间 return False, None, "超过生效时间" else: # 板上放量不可扫入 if t.time() > radical_data[0]: return False, None, "超过生效时间" result = __can_order() l2_log.debug(code, f"L2扫入判断:{result}") if result[0]: # 已经扫入下过单且允许板上放量扫入的就需要判断板上放量的距离 if is_radical_buy and constant.CAN_RADICAL_BUY_AT_LIMIT_UP: is_limit_up_buy = cls.__is_at_limit_up_buy(code) if is_limit_up_buy: # 判断成交进度到当前数据的笔数,如果少于10笔且还有未成交的大单(>=299)就可以下单 trade_index, is_default = cls.__TradeBuyQueue.get_traded_index(code) if trade_index is None: trade_index = 0 can_place_order, msg = buy_strategy_util.is_near_by_trade_index(code, trade_index) if not can_place_order: return False, result[1], "扫入过的代码板上放量距离远" else: # 判断该板块前排是否已经有成交 deal_codes = RadicalBuyDealCodesManager().get_deal_codes() buy_blocks = radical_buy_data_manager.is_block_can_radical_buy(code, radical_data[2], deal_codes) if not buy_blocks: return False, result[1], f"板块代码已有成交:{radical_data[2]}" # 如果板上放量不可买入就需要删除信号 if not constant.CAN_RADICAL_BUY_AT_LIMIT_UP and code in RadicalBuyDealCodesManager.buy_by_l2_delegate_expire_time_dict: RadicalBuyDealCodesManager.buy_by_l2_delegate_expire_time_dict.pop(code) return True, result[1], radical_data[2], radical_data[4], result[3] return result @classmethod def test__compute_active_order_begin_pos(cls, code, continue_count, start_index, end_index): return cls.__compute_active_order_begin_pos(code, continue_count, start_index, end_index) @classmethod def __get_threshmoney(cls, code): m, msg = cls.__l2PlaceOrderParamsManagerDict[code].get_m_val() if trade_manager.CodesTradeStateManager().get_trade_state_cache(code) == trade_constant.TRADE_STATE_NOT_TRADE: # 首次下单m值扩大1.5倍 m = int(m * 1.5) return m, msg # 计算万手哥笔数 @classmethod def __compute_big_money_count(cls, total_datas, start_index, end_index): count = 0 for i in range(start_index, end_index + 1): if L2DataUtil.is_limit_up_price_buy(total_datas[i]["val"]): count += total_datas[i]["re"] elif L2DataUtil.is_limit_up_price_buy_cancel(total_datas[i]["val"]): count -= total_datas[i]["re"] return count # 将已经成交的过滤掉 @classmethod def __filter_not_deal_indexes(cls, code, indexes): trade_index, is_default = transaction_progress.TradeBuyQueue().get_traded_index(code) if is_default: return indexes if trade_index is None: return indexes findexes = set() for index in indexes: if index < trade_index: continue findexes.add(index) return findexes # 返回(买入执行点, 总手, 总笔数, 从新计算起点, 纯买额阈值) # 计算快速买入 @classmethod def __sum_buy_num_for_order_fast(cls, code, compute_start_index, compute_end_index, origin_num, origin_count, threshold_money_origin, buy_single_index, max_num_set): _start_time = t.time() total_datas = local_today_datas[code] # is_first_code = gpcode_manager.FirstCodeManager().is_in_first_record_cache(code) buy_nums = origin_num buy_count = origin_count limit_up_price = gpcode_manager.get_limit_up_price(code) if limit_up_price is None: raise Exception("涨停价无法获取") limit_up_price = float(limit_up_price) threshold_money = threshold_money_origin # 目标手数 threshold_num = round(threshold_money / (limit_up_price * 100)) # buy_single_time_seconds = L2DataUtil.get_time_as_second(total_datas[buy_single_index]["val"]["time"]) # 可以触发买,当有涨停买信号时才会触发买 trigger_buy = True # 间隔最大时间为3s max_space_time_ms = 3 * 1000 # 不下单的信息 not_buy_msg = "" max_buy_num_set = set(max_num_set) place_order_count = trade_data_manager.PlaceOrderCountManager().get_place_order_count(code) if place_order_count is None: place_order_count = 0 is_ge_code = tool.is_ge_code(code) for i in range(compute_start_index, compute_end_index + 1): data = total_datas[i] _val = total_datas[i]["val"] trigger_buy = False # 必须为连续2秒内的数据 if L2DataUtil.time_sub_as_ms(_val, total_datas[buy_single_index]["val"]) > max_space_time_ms: cls.__TradePointManager.delete_buy_point(code) if i == compute_end_index: # 数据处理完毕 return None, buy_nums, buy_count, None, threshold_money, max_buy_num_set, f"【{i}】信号不连续,囊括时间-{max_space_time_ms}ms" else: # 计算买入信号,不能同一时间开始计算 for ii in range(buy_single_index + 1, compute_end_index + 1): if total_datas[buy_single_index]["val"]["time"] != total_datas[ii]["val"]["time"]: return None, buy_nums, buy_count, ii, threshold_money, max_buy_num_set, f"【{i}】信号不连续,囊括时间-{max_space_time_ms}ms" if L2DataUtil.is_sell(_val): threshold_money += _val["num"] * int(float(_val["price"]) * 100) threshold_num = round(threshold_money / (limit_up_price * 100)) elif L2DataUtil.is_sell_cancel(_val): threshold_money -= _val["num"] * int(float(_val["price"]) * 100) threshold_num = round(threshold_money / (limit_up_price * 100)) # 涨停买 elif L2DataUtil.is_limit_up_price_buy(_val): if l2_data_util.is_big_money(_val, is_ge_code): max_buy_num_set.add(i) trigger_buy = True # 只统计59万以上的金额 buy_nums += int(_val["num"]) * int(total_datas[i]["re"]) buy_count += int(total_datas[i]["re"]) if buy_nums >= threshold_num: l2_log.info(code, logger_l2_trade_buy, f"{code}获取到买入执行点(快速买入):{i} 统计纯买手数:{buy_nums} 目标纯买手数:{threshold_num} 统计纯买单数:{buy_count}") elif L2DataUtil.is_limit_up_price_buy_cancel(_val): # 判断买入位置是否在买入信号之前 buy_index = l2_data_source_util.L2DataSourceUtils.get_buy_index_with_cancel_data_v2(total_datas[i], local_today_buyno_map.get( code)) if buy_index is not None: # 找到买撤数据的买入点 if buy_index >= buy_single_index: max_buy_num_set.discard(buy_index) buy_nums -= int(_val["num"]) * int(data["re"]) buy_count -= int(data["re"]) l2_log.buy_debug(code, "{}数据在买入信号之后 撤买纯买手数:{} 目标手数:{}", i, buy_nums, threshold_num) else: l2_log.buy_debug(code, "{}数据在买入信号之前,买入位:{}", i, buy_index) if total_datas[buy_single_index]["val"]["time"] == total_datas[buy_index]["val"]["time"]: # 同一秒,当作买入信号之后处理 buy_nums -= int(_val["num"]) * int(data["re"]) buy_count -= int(data["re"]) max_buy_num_set.discard(buy_index) # 大单撤销 l2_log.buy_debug(code, "{}数据买入位与预估买入位在同一秒", i) else: # 未找到买撤数据的买入点 l2_log.buy_debug(code, "未找到买撤数据的买入点: 位置-{} 数据-{}", i, data) buy_nums -= int(_val["num"]) * int(total_datas[i]["re"]) buy_count -= int(total_datas[i]["re"]) l2_log.buy_debug(code, "位置-{},总手数:{},目标手数:{}", i, buy_nums, threshold_num) # 纯买额足够,且笔数大于5笔 if buy_nums < threshold_num: not_buy_msg = f"【{i}】纯买额不够,{buy_nums}/{threshold_num}" continue if not trigger_buy: not_buy_msg = f"【{i}】没有买单触发" continue if buy_count < 5: not_buy_msg = f"【{i}】安全笔数不足,{buy_count}/{5}" continue max_buy_num_set = cls.__filter_not_deal_indexes(code, max_buy_num_set) big_money_count_threshhold = cls.__l2PlaceOrderParamsManagerDict[code].get_big_num_count() if len(max_buy_num_set) < big_money_count_threshhold: not_buy_msg = f"【{i}】大单不满足要求,需要:{big_money_count_threshhold} 总:{len(max_buy_num_set)}" continue try: info = cls.__trade_log_placr_order_info_dict[code] info.set_trade_factor(threshold_money, 0, []) except Exception as e: async_log_util.error(logger_l2_error, f"记录交易因子出错:{str(e)}") return i, buy_nums, buy_count, None, threshold_money, max_buy_num_set, "可以下单" l2_log.buy_debug(code, "尚未获取到买入执行点(快速买入),起始计算位置:{} 统计纯买手数:{} 目标纯买手数:{} 统计纯买单数:{}", compute_start_index, buy_nums, threshold_num, buy_count) return None, buy_nums, buy_count, None, threshold_money, max_buy_num_set, not_buy_msg # 返回(买入执行点, 总手, 总笔数, 从新计算起点, 纯买额阈值) # 计算激进买入 @classmethod def __sum_buy_num_for_order_active(cls, code, compute_start_index, compute_end_index, origin_num, origin_count, threshold_money_origin, buy_single_index, max_num_set, total_sell_money): _start_time = t.time() total_datas = local_today_datas[code] # is_first_code = gpcode_manager.FirstCodeManager().is_in_first_record_cache(code) buy_nums = origin_num buy_count = origin_count limit_up_price = gpcode_manager.get_limit_up_price(code) if limit_up_price is None: raise Exception("涨停价无法获取") limit_up_price = float(limit_up_price) is_at_limit_up = False current_sell_data = cls.__L2MarketSellManager.get_current_total_sell_data(code) if current_sell_data and current_sell_data[1] == 0: # 板上放量买 is_at_limit_up = True threshold_money = threshold_money_origin # 目标手数 threshold_num = round(threshold_money / (limit_up_price * 100)) bigger_threshold_num = round(5000 / (limit_up_price)) # buy_single_time_seconds = L2DataUtil.get_time_as_second(total_datas[buy_single_index]["val"]["time"]) # 可以触发买,当有涨停买信号时才会触发买 trigger_buy = True # 间隔最大时间为3s max_space_time_ms = 3 * 1000 if tool.is_sz_code(code) and not is_at_limit_up: # 深证非板上放量 max_space_time_ms = 1 * 1000 # 上证的间隔时间为1s if tool.is_sh_code(code): max_space_time_ms = 1 * 1000 # 不下单的信息 not_buy_msg = "" max_buy_num_set = set(max_num_set) active_buy_blocks = cls.get_active_buy_blocks(code) is_ge_code = tool.is_ge_code(code) for i in range(compute_start_index, compute_end_index + 1): data = total_datas[i] _val = total_datas[i]["val"] trigger_buy = False # 必须为连续2秒内的数据 if L2DataUtil.time_sub_as_ms(_val, total_datas[buy_single_index]["val"]) > max_space_time_ms: # 清除买点数据 cls.__TradePointManager.delete_buy_point(code) if i == compute_end_index: # 数据处理完毕 return None, buy_nums, buy_count, None, threshold_money, max_buy_num_set, f"【{i}】信号不连续,囊括时间-{max_space_time_ms}ms", True else: # 计算买入信号,不能同一时间开始计算 for ii in range(buy_single_index + 1, compute_end_index + 1): if total_datas[buy_single_index]["val"]["time"] != total_datas[ii]["val"]["time"]: return None, buy_nums, buy_count, ii, threshold_money, max_buy_num_set, f"【{i}】信号不连续,囊括时间-{max_space_time_ms}ms", True # if L2DataUtil.is_sell(_val): # threshold_money += _val["num"] * int(float(_val["price"]) * 100) # threshold_num = round(threshold_money / (limit_up_price * 100)) # elif L2DataUtil.is_sell_cancel(_val): # threshold_money -= _val["num"] * int(float(_val["price"]) * 100) # threshold_num = round(threshold_money / (limit_up_price * 100)) # 涨停买 elif L2DataUtil.is_limit_up_price_buy(_val): if _val['num'] < bigger_threshold_num: continue if l2_data_util.is_big_money(_val, is_ge_code): max_buy_num_set.add(i) trigger_buy = True # 只统计59万以上的金额 buy_nums += int(_val["num"]) * int(total_datas[i]["re"]) buy_count += int(total_datas[i]["re"]) if buy_nums >= threshold_num: l2_log.info(code, logger_l2_trade_buy, f"{code}获取到买入执行点(积极下单):{i} 统计纯买手数:{buy_nums} 目标纯买手数:{threshold_num} 统计纯买单数:{buy_count}") elif L2DataUtil.is_limit_up_price_buy_cancel(_val): # 判断买入位置是否在买入信号之前 buy_index = l2_data_source_util.L2DataSourceUtils.get_buy_index_with_cancel_data_v2(total_datas[i], local_today_buyno_map.get( code)) if buy_index is not None: # 找到买撤数据的买入点 if buy_index >= buy_single_index: max_buy_num_set.discard(buy_index) buy_nums -= int(_val["num"]) * int(data["re"]) buy_count -= int(data["re"]) l2_log.buy_debug(code, "{}数据在买入信号之后 撤买纯买手数:{} 目标手数:{}", i, buy_nums, threshold_num) else: l2_log.buy_debug(code, "{}数据在买入信号之前,买入位:{}", i, buy_index) if total_datas[buy_single_index]["val"]["time"] == total_datas[buy_index]["val"]["time"]: # 同一秒,当作买入信号之后处理 buy_nums -= int(_val["num"]) * int(data["re"]) buy_count -= int(data["re"]) max_buy_num_set.discard(buy_index) # 大单撤销 l2_log.buy_debug(code, "{}数据买入位与预估买入位在同一秒", i) else: # 未找到买撤数据的买入点 l2_log.buy_debug(code, "未找到买撤数据的买入点: 位置-{} 数据-{}", i, data) buy_nums -= int(_val["num"]) * int(total_datas[i]["re"]) buy_count -= int(total_datas[i]["re"]) l2_log.buy_debug(code, "位置-{},总手数:{},目标手数:{}", i, buy_nums, threshold_num) # 计算信号位置之后的主动卖,加入到阈值之中 sell_orders = HuaXinSellOrderStatisticManager.get_latest_transaction_datas(code, min_sell_order_no=int( total_datas[buy_single_index]['val']['orderNo']), min_deal_time=tool.trade_time_add_second( total_datas[buy_single_index]['val']['time'], -1)) sell_order_num = sum([x[1] for x in sell_orders]) // 100 # 纯买额足够,且笔数大于2笔 if buy_nums < threshold_num + sell_order_num: l2_log.buy_debug(code, f"激进买主动卖手数:{sell_order_num}") not_buy_msg = f"【{i}】纯买额不够,{buy_nums}/{threshold_num}" continue if not trigger_buy: not_buy_msg = f"【{i}】没有买单触发" continue safe_count = 1 if active_buy_blocks and current_sell_data[1] > 5000000: # 有激进板块且大于500w才能扫入下单 safe_count = 1 else: # 无激进下单板块 if total_sell_money <= 0: # 所有的板上放量都需要3笔 # 板上下单需要安全笔数3笔 safe_count = 3 else: if tool.is_sz_code(code): # 深证上板 safe_count = 2 elif tool.is_sh_code(code): # 上证安全笔数为3 safe_count = 3 if tool.is_sz_code(code): money_y = code_volumn_manager.CodeVolumeManager().get_reference_volume_as_money_y(code) # 大于8亿的安全笔数必须有8笔 if money_y >= 8: safe_count = 8 # 9:31之前下单,安全笔数最小为5笔 if int(tool.get_now_time_str().replace(":", "")) < int("093100"): safe_count = max(safe_count, 5) # 深证的票,第一次下单必须要有3笔安全笔数 if tool.is_sz_code(code): place_order_count = trade_data_manager.PlaceOrderCountManager().get_place_order_count(code) if place_order_count == 0: safe_count = max(safe_count, 3) if buy_count < safe_count: not_buy_msg = f"【{i}】安全笔数不足,{buy_count}/{safe_count}" continue # max_buy_num_set = cls.__filter_not_deal_indexes(code, max_buy_num_set) # big_money_count_threshhold = cls.__l2PlaceOrderParamsManagerDict[code].get_big_num_count() # if len(max_buy_num_set) < big_money_count_threshhold: # not_buy_msg = f"【{i}】大单不满足要求,需要:{big_money_count_threshhold} 总:{len(max_buy_num_set)}" # continue try: info = cls.__trade_log_placr_order_info_dict[code] info.set_trade_factor(threshold_money, 0, []) except Exception as e: async_log_util.error(logger_l2_error, f"记录交易因子出错:{str(e)}") l2_log.buy_debug(code, f"激进买主动卖手数:{sell_order_num}") return i, buy_nums, buy_count, None, threshold_money, max_buy_num_set, "可以下单", False l2_log.buy_debug(code, "尚未获取到买入执行点(激进买入),起始计算位置:{} 统计纯买手数:{} 目标纯买手数:{} 统计纯买单数:{} ", compute_start_index, buy_nums, threshold_num, buy_count) return None, buy_nums, buy_count, None, threshold_money, max_buy_num_set, not_buy_msg, False def test_trade_record(): code = "000333" __trade_log_placr_order_info_dict = {code: trade_record_log_util.PlaceOrderInfo()} try: jx_blocks, jx_blocks_by = KPLCodeJXBlockManager().get_jx_blocks_cache( code), KPLCodeJXBlockManager().get_jx_blocks_cache(code, by=True) info = __trade_log_placr_order_info_dict[code] info.set_buy_index(0, 1) if jx_blocks: info.set_kpl_blocks(list(jx_blocks)) elif jx_blocks_by: info.set_kpl_blocks(list(jx_blocks_by)) else: info.set_kpl_blocks([]) trade_record_log_util.add_place_order_log(code, info) except: pass if __name__ == "__main__": # test_trade_record() # yesterday_limit_up_data_records = kpl_data_manager.get_current_limit_up_data_records(1)[0][1] # yesterday_codes = set([x[0] for x in yesterday_limit_up_data_records]) # print(yesterday_codes) code = "603003" datas = log_export.load_l2_from_log() datas = datas.get(code) if datas is None: datas = [] l2.l2_data_util.local_today_datas[code] = datas[:191] l2.l2_data_util.load_buy_no_map(l2.l2_data_util.local_today_buyno_map, code, l2.l2_data_util.local_today_datas[code]) l2.l2_data_util.load_canceled_buy_no_map(l2.l2_data_util.local_today_canceled_buyno_map, code, l2.l2_data_util.local_today_datas[code]) start_index = 73 end_index = 190 LCancelBigNumComputer().compute_watch_index(code, start_index, end_index)