import copy import logging import random import time as t from code_attribute import big_money_num_manager, code_volumn_manager, code_data_util, industry_codes_sort, \ limit_up_time_manager, global_data_loader, gpcode_manager, code_nature_analyse import constant from code_attribute.code_nature_analyse import HighIncreaseCodeManager from db.redis_manager_delegate import RedisUtils from l2.huaxin import l2_huaxin_util, huaxin_delegate_postion_manager from l2.l2_sell_manager import L2MarketSellManager, L2LimitUpSellManager from l2.transaction_progress import TradeBuyQueue from log_module import async_log_util, log_export from third_data import kpl_data_manager, block_info from utils import global_util, ths_industry_util, tool import l2_data_util from db import redis_manager_delegate as redis_manager from third_data.code_plate_key_manager import CodePlateKeyBuyManager, KPLCodeJXBlockManager, LimitUpCodesPlateKeyManager from trade import trade_manager, trade_queue_manager, l2_trade_factor, l2_trade_util, \ trade_result_manager, current_price_process_manager, trade_data_manager, trade_huaxin, trade_record_log_util from l2 import l2_data_manager, l2_log, l2_data_source_util, code_price_manager, \ transaction_progress, cancel_buy_strategy, l2_data_log from l2.cancel_buy_strategy import SecondCancelBigNumComputer, HourCancelBigNumComputer, DCancelBigNumComputer, \ LCancelBigNumComputer, LatestCancelIndexManager, FastCancelBigNumComputer, LCancelRateManager from l2.l2_data_manager import L2DataException, OrderBeginPosInfo from l2.l2_data_util import local_today_datas, L2DataUtil, local_today_num_operate_map, local_today_buyno_map, \ local_latest_datas, local_today_canceled_buyno_map import l2.l2_data_util from log_module.log import logger_l2_trade_buy, logger_l2_process, logger_l2_error, logger_debug from trade.trade_data_manager import CodeActualPriceProcessor, PlaceOrderCountManager from trade.trade_manager import TradeTargetCodeModeManager, AccountAvailableMoneyManager class L2DataManager: # 格式化数据 def format_data(self, datas): format_datas = [] for data in datas: format_datas.append({"val": data, "re": 1}) return format_datas # 获取新增数据 def get_add_datas(self, format_datas): pass # 从数据库加载数据 def load_data(self, code=None, force=False): pass # 保存数据 def save_datas(self, add_datas, datas): pass # m值大单处理 class L2BigNumForMProcessor: _db = 1 _redis_manager = redis_manager.RedisManager(1) m_big_money_begin_cache = {} m_big_money_process_index_cache = {} __instance = None def __new__(cls, *args, **kwargs): if not cls.__instance: cls.__instance = super(L2BigNumForMProcessor, cls).__new__(cls, *args, **kwargs) cls.__load_datas() return cls.__instance @classmethod def __get_redis(cls): return cls._redis_manager.getRedis() @classmethod def __load_datas(cls): _redis = cls._redis_manager.getRedis() try: keys = RedisUtils.keys(_redis, "m_big_money_begin-*") for k in keys: code = k.split("-")[-1] val = RedisUtils.get(_redis, k) tool.CodeDataCacheUtil.set_cache(cls.m_big_money_begin_cache, code, int(val)) keys = RedisUtils.keys(_redis, "m_big_money_process_index-*") for k in keys: code = k.split("-")[-1] val = RedisUtils.get(_redis, k) tool.CodeDataCacheUtil.set_cache(cls.m_big_money_process_index_cache, code, int(val)) finally: RedisUtils.realse(_redis) # 保存计算开始位置 def set_begin_pos(self, code, index): if self.__get_begin_pos_cache(code) is None: tool.CodeDataCacheUtil.set_cache(self.m_big_money_begin_cache, code, index) # 保存位置 key = "m_big_money_begin-{}".format(code) RedisUtils.setex_async(self._db, key, tool.get_expire(), index) # 获取计算开始位置 def __get_begin_pos(self, code): key = "m_big_money_begin-{}".format(code) val = RedisUtils.get(self.__get_redis(), key) if val is None: return None return int(val) def __get_begin_pos_cache(self, code): cache_result = tool.CodeDataCacheUtil.get_cache(self.m_big_money_begin_cache, code) if cache_result[0]: return cache_result[1] return None # 清除已经处理的数据 def clear_processed_end_index(self, code): tool.CodeDataCacheUtil.clear_cache(self.m_big_money_process_index_cache, code) key = "m_big_money_process_index-{}".format(code) RedisUtils.delete_async(self._db, key) # 添加已经处理过的单 def __set_processed_end_index(self, code, index): tool.CodeDataCacheUtil.set_cache(self.m_big_money_process_index_cache, code, index) key = "m_big_money_process_index-{}".format(code) RedisUtils.setex_async(self._db, key, tool.get_expire(), index) # 是否已经处理过 def __get_processed_end_index(self, code): key = "m_big_money_process_index-{}".format(code) val = RedisUtils.get(self.__get_redis(), key) if val is None: return None return int(val) def __get_processed_end_index_cache(self, code): cache_result = tool.CodeDataCacheUtil.get_cache(self.m_big_money_process_index_cache, code) if cache_result[0]: return cache_result[1] return None # 处理大单 def process(self, code, start_index, end_index, limit_up_price): begin_pos = self.__get_begin_pos_cache(code) if begin_pos is None: # 没有获取到开始买入信号 return # 上次处理到的坐标 processed_index = self.__get_processed_end_index_cache(code) if processed_index is None: processed_index = 0 if processed_index >= end_index: return start_time = round(t.time() * 1000) total_datas = local_today_datas[code] num_splites = [round(5000 / limit_up_price), round(10000 / limit_up_price), round(20000 / limit_up_price), round(30000 / limit_up_price)] total_num = 0 for i in range(max(start_index, processed_index), end_index + 1): data = total_datas[i] if not L2DataUtil.is_limit_up_price_buy_cancel(data["val"]) and not L2DataUtil.is_limit_up_price_buy( data["val"]): continue # 如果是涨停买撤信号需要看数据位置是否比开始处理时间早 if L2DataUtil.is_limit_up_price_buy_cancel(data["val"]): # 获取买入信号 buy_index = l2_data_source_util.L2DataSourceUtils.get_buy_index_with_cancel_data_v2(total_datas[i], local_today_buyno_map.get( code)) if buy_index is not None and buy_index < begin_pos: continue # 计算成交金额 num = int(data["val"]["num"]) temp = 0 if num < num_splites[0]: pass elif num < num_splites[1]: temp = 1 elif num < num_splites[2]: temp = round(4 / 3, 3) elif num < num_splites[3]: temp = 2 else: temp = 4 count = int(temp * data["re"] * 1000) if L2DataUtil.is_limit_up_price_buy_cancel(data["val"]): count = 0 - count total_num += count self.__set_processed_end_index(code, end_index) big_money_num_manager.add_num(code, total_num) print("m值大单计算范围:{}-{} 时间:{}".format(max(start_index, processed_index), end_index, round(t.time() * 1000) - start_time)) class L2TradeDataProcessor: unreal_buy_dict = {} volume_rate_info = {} # 最近的闪电下单位置,格式为{code:(总卖时间,总卖额)} __latest_fast_place_order_info_dict = {} __codeActualPriceProcessor = CodeActualPriceProcessor() __ths_l2_trade_queue_manager = trade_queue_manager.thsl2tradequeuemanager() __thsBuy1VolumnManager = trade_queue_manager.THSBuy1VolumnManager() __l2PlaceOrderParamsManagerDict = {} __last_buy_single_dict = {} __TradeBuyQueue = transaction_progress.TradeBuyQueue() __latest_process_order_unique_keys = {} __latest_process_not_order_unique_keys_count = {} __trade_log_placr_order_info_dict = {} # 下单信息保存 # 初始化 __TradePointManager = l2_data_manager.TradePointManager() __SecondCancelBigNumComputer = SecondCancelBigNumComputer() __HourCancelBigNumComputer = HourCancelBigNumComputer() __LCancelBigNumComputer = LCancelBigNumComputer() __TradeStateManager = trade_manager.TradeStateManager() __CodesTradeStateManager = trade_manager.CodesTradeStateManager() __PauseBuyCodesManager = gpcode_manager.PauseBuyCodesManager() __Buy1PriceManager = code_price_manager.Buy1PriceManager() __AccountAvailableMoneyManager = AccountAvailableMoneyManager() __TradeBuyDataManager = trade_data_manager.TradeBuyDataManager() __LimitUpTimeManager = limit_up_time_manager.LimitUpTimeManager() __BlackListCodeManager = l2_trade_util.BlackListCodeManager() __WhiteListCodeManager = l2_trade_util.WhiteListCodeManager() __WantBuyCodesManager = gpcode_manager.WantBuyCodesManager() __TradeTargetCodeModeManager = TradeTargetCodeModeManager() __TradeOrderIdManager = trade_huaxin.TradeOrderIdManager() __LatestCancelIndexManager = LatestCancelIndexManager() __L2MarketSellManager = L2MarketSellManager() __L2LimitUpSellManager = L2LimitUpSellManager() __PlaceOrderCountManager = PlaceOrderCountManager() # 获取代码评分 @classmethod def get_code_scores(cls): score_dict = {} for code in cls.__l2PlaceOrderParamsManagerDict: score = cls.__l2PlaceOrderParamsManagerDict[code].score score_dict[code] = score return score_dict @classmethod # 数据处理入口 # datas: 本次截图数据 # capture_timestamp:截图时间戳 def process(cls, code, datas, capture_timestamp): __start_time = round(t.time() * 1000) try: if len(datas) > 0: # 判断价格区间是否正确 if not code_data_util.is_same_code_with_price(code, float(datas[0]["val"]["price"])): raise L2DataException(L2DataException.CODE_PRICE_ERROR, "股价不匹配 code-{} price-{}".format(code, datas[0]["val"]["price"])) # 加载历史数据,返回数据是否正常 is_normal = l2.l2_data_util.load_l2_data(code) if not is_normal: print("历史数据异常:", code) # 数据不正常需要禁止交易 l2_trade_util.forbidden_trade(code, msg="L2历史数据异常") # 纠正数据 if constant.L2_SOURCE_TYPE == constant.L2_SOURCE_TYPE_THS: # 同花顺需要纠正数据,其他渠道不需要 datas = l2.l2_data_util.L2DataUtil.correct_data(code, local_latest_datas.get(code), datas) _start_index = 0 if local_today_datas.get(code) is not None and len( local_today_datas[code]) > 0: _start_index = local_today_datas[code][-1]["index"] + 1 add_datas = l2.l2_data_util.L2DataUtil.get_add_data(code, local_latest_datas.get(code), datas, _start_index) # -------------数据增量处理------------ try: cls.process_add_datas(code, add_datas, capture_timestamp, __start_time) finally: # 保存数据 __start_time = round(t.time() * 1000) l2.l2_data_util.save_l2_data(code, datas, add_datas) # __start_time = l2_data_log.l2_time(code, # round(t.time() * 1000) - __start_time, # "保存数据时间({})".format(len(add_datas))) finally: if code in cls.unreal_buy_dict: cls.unreal_buy_dict.pop(code) @classmethod def set_real_place_order_index(cls, code, index, order_begin_pos: OrderBeginPosInfo): trade_record_log_util.add_real_place_order_position_log(code, index, order_begin_pos.buy_single_index) if order_begin_pos.mode == OrderBeginPosInfo.MODE_FAST: need_cancel = FastCancelBigNumComputer().set_real_order_index(code, index) if need_cancel: l2_log.debug(code, "触发撤单,撤单位置:{} ,撤单原因:{}", index, "F撤不够2笔触发撤单") cls.cancel_buy(code, msg="F撤不够2笔触发撤单") return l2_log.debug(code, "设置真实下单位:{}", index) cancel_buy_strategy.set_real_place_position(code, index, order_begin_pos.buy_single_index) # 处理华鑫L2数据 @classmethod def process_huaxin(cls, code, origin_datas): datas = None try: l2_data_log.l2_time_log(code, "开始加载历史数据") # 加载历史的L2数据 is_normal = l2.l2_data_util.load_l2_data(code, load_latest=False) if not is_normal: # 数据不正常需要禁止交易 l2_trade_util.forbidden_trade(code, msg="L2历史数据异常") # 转换数据格式 _start_index = 0 total_datas = local_today_datas.get(code) if total_datas: _start_index = total_datas[-1]["index"] + 1 l2_data_log.l2_time_log(code, "开始格式化原始数据") datas = l2_huaxin_util.get_format_l2_datas(code, origin_datas, gpcode_manager.get_limit_up_price(code), _start_index) __start_time = round(t.time() * 1000) l2_data_log.l2_time_log(code, "开始处理数据") if len(datas) > 0: cls.process_add_datas(code, datas, 0, __start_time) except Exception as e: async_log_util.error(logger_l2_error, f"code:{code}") # async_log_util.exception(logger_l2_error, e) logger_l2_error.exception(e) finally: if datas: l2_data_log.l2_time_log(code, "开始保存数据") l2.l2_data_util.save_l2_data(code, None, datas) origin_datas.clear() @classmethod def process_add_datas(cls, code, add_datas, capture_timestamp, __start_time): now_time_str = tool.get_now_time_str() if len(add_datas) > 0: if code not in cls.__trade_log_placr_order_info_dict: cls.__trade_log_placr_order_info_dict[code] = trade_record_log_util.PlaceOrderInfo() # 拼接数据 local_today_datas[code].extend(add_datas) l2.l2_data_util.load_num_operate_map(local_today_num_operate_map, code, add_datas) l2.l2_data_util.load_buy_no_map(local_today_buyno_map, code, add_datas) l2.l2_data_util.load_canceled_buy_no_map(local_today_canceled_buyno_map, code, add_datas) l2_data_log.l2_time_log(code, "process_add_datas 加载完数据") if constant.L2_SOURCE_TYPE == constant.L2_SOURCE_TYPE_HUAXIN: try: if constant.TEST: pass # order_begin_pos = cls.__get_order_begin_pos(code) # if order_begin_pos.buy_exec_index and order_begin_pos.buy_exec_index>=0: # place_order_index = add_datas[-1]["index"] # cls.set_real_place_order_index(code, place_order_index, order_begin_pos.buy_single_index) else: # 获取下单位置 place_order_index = huaxin_delegate_postion_manager.get_l2_place_order_position(code, float( gpcode_manager.get_limit_up_price(code)), add_datas) if place_order_index: order_begin_pos = cls.__get_order_begin_pos( code) cls.set_real_place_order_index(code, place_order_index, order_begin_pos) async_log_util.info(logger_l2_process, "code:{} 获取到下单真实位置:{}", code, place_order_index) except: async_log_util.error(logger_l2_error, f"{code} 处理真实下单位置出错") # 第1条数据是否为09:30:00 if add_datas[0]["val"]["time"] == "09:30:00": if global_util.cuurent_prices.get(code): price_data = global_util.cuurent_prices.get(code) if price_data[1]: # 当前涨停价,设置涨停时间 async_log_util.info(logger_l2_process, "开盘涨停:{}", code) # 保存涨停时间 cls.__LimitUpTimeManager.save_limit_up_time(code, "09:30:00") total_datas = local_today_datas[code] # __start_time = l2_data_log.l2_time(code, round(t.time() * 1000) - __start_time, # "l2数据预处理时间") if len(add_datas) > 0: # 是否为首板代码 is_first_code = True # gpcode_manager.FirstCodeManager().is_in_first_record(code) # 计算量 current_sell = cls.__L2MarketSellManager.get_current_total_sell_data(code) total_sell_volume = 0 if current_sell and len(current_sell) > 2: total_sell_volume = current_sell[2] volume_rate = code_volumn_manager.get_volume_rate(code, total_sell_volume=total_sell_volume) volume_rate_index = code_volumn_manager.get_volume_rate_index(volume_rate) # 计算分值 limit_up_time = cls.__LimitUpTimeManager.get_limit_up_time_cache(code) if limit_up_time is None: limit_up_time = tool.get_now_time_str() score = None cls.__l2PlaceOrderParamsManagerDict[code] = l2_trade_factor.L2PlaceOrderParamsManager(code, is_first_code, volume_rate, volume_rate_index, score, total_datas[-1][ 'val']['time']) cls.volume_rate_info[code] = (volume_rate, volume_rate_index) latest_time = add_datas[len(add_datas) - 1]["val"]["time"] # __start_time = l2_data_log.l2_time(code, round(t.time() * 1000) - __start_time, # "l2数据准备时间") # 时间差不能太大才能处理 if not l2_trade_util.is_in_forbidden_trade_codes(code): # 计算板上卖,当数据少时才计算,否则不计算 try: if len(add_datas) < 20: has_limit_up_sell = False for d in add_datas: if L2DataUtil.is_limit_up_price_sell(d["val"]): if d["val"]["num"] * float(d["val"]["price"]) < 5000: continue cls.__L2LimitUpSellManager.add_limit_up_sell(code, d["index"]) has_limit_up_sell = True if has_limit_up_sell: LCancelRateManager.compute_big_num_deal_rate(code) # elif L2DataUtil.is_limit_up_price_sell_cancel(d["val"]): # cls.__L2LimitUpSellManager.add_limit_up_sell(code, d["index"]) except Exception as e: async_log_util.error(logger_l2_error, f"计算板上卖出错:{str(e)}") # 判断是否已经挂单 state = cls.__CodesTradeStateManager.get_trade_state_cache(code) start_index = len(total_datas) - len(add_datas) end_index = len(total_datas) - 1 l2_data_log.l2_time_log(code, "process_add_datas 开始处理") if state == trade_manager.TRADE_STATE_BUY_DELEGATED or state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER or state == trade_manager.TRADE_STATE_BUY_SUCCESS: # 已挂单 cls.__process_order(code, start_index, end_index, capture_timestamp, is_first_code) else: # 未挂单,时间相差不大才能挂单 if l2.l2_data_util.L2DataUtil.is_same_time(now_time_str, latest_time): cls.__process_not_order(code, start_index, end_index, capture_timestamp, is_first_code) async_log_util.info(logger_l2_process, "code:{} 处理数据范围: {}-{} 处理时间:{} 截图时间戳:{}", code, add_datas[0]["index"], add_datas[-1]["index"], round(t.time() * 1000) - __start_time, capture_timestamp) # 处理未挂单 @classmethod def __process_not_order(cls, code, start_index, end_index, capture_time, is_first_code): __start_time = round(t.time() * 1000) # 获取阈值 threshold_money, msg = cls.__get_threshmoney(code) cls.__start_compute_buy(code, start_index, end_index, threshold_money, capture_time, is_first_code) # 测试专用 @classmethod def process_order(cls, code, start_index, end_index, capture_time, is_first_code, new_add=True): cls.__process_order(code, start_index, end_index, capture_time, is_first_code, new_add) # 处理已挂单 @classmethod def __process_order(cls, code, start_index, end_index, capture_time, is_first_code, new_add=True): # 增加推出机制 unique_key = f"{start_index}-{end_index}" if cls.__latest_process_order_unique_keys.get(code) == unique_key: async_log_util.error(logger_l2_error, f"重复处理数据:code-{code} start_index-{start_index} end_index-{end_index}") return cls.__latest_process_order_unique_keys[code] = unique_key # S撤 def s_cancel(_buy_single_index, _buy_exec_index): _start_time = round(t.time() * 1000) # S撤单计算,看秒级大单撤单 try: b_need_cancel, b_cancel_data = cls.__SecondCancelBigNumComputer.need_cancel(code, _buy_single_index, _buy_exec_index, start_index, end_index, total_data, code_volumn_manager.get_volume_rate_index( buy_volume_rate), cls.volume_rate_info[code][ 1], is_first_code) if b_need_cancel: return b_cancel_data, "S大单撤销比例触发阈值" except Exception as e: logging.exception(e) async_log_util.error(logger_l2_error, f"S撤出错 参数:buy_single_index-{_buy_single_index} buy_exec_index-{_buy_exec_index}") async_log_util.exception(logger_l2_error, e) finally: # l2_data_log.l2_time(code, round(t.time() * 1000) - _start_time, # "已下单-s级大单估算") pass return None, "" # H撤 def h_cancel(_buy_single_index, _buy_exec_index): _start_time = round(t.time() * 1000) try: b_need_cancel, b_cancel_data = cls.__HourCancelBigNumComputer.need_cancel(code, _buy_single_index, _buy_exec_index, start_index, end_index, total_data, code_volumn_manager.get_volume_rate_index( order_begin_pos.buy_volume_rate), cls.volume_rate_info[code][1], is_first_code) if b_need_cancel and b_cancel_data: return b_cancel_data, "H撤" except Exception as e: if constant.TEST: logging.exception(e) async_log_util.error(logger_l2_error, f"H撤出错 参数:buy_single_index-{_buy_single_index} buy_exec_index-{_buy_exec_index} {str(e)}") async_log_util.exception(logger_l2_error, e) # logger_l2_error.exception(e) finally: # l2_data_log.l2_time(code, round(t.time() * 1000) - _start_time, "已下单-H撤大单计算") pass return None, "" # F撤 def f_cancel(_buy_single_index, _buy_exec_index): try: b_need_cancel, b_cancel_data = FastCancelBigNumComputer().need_cancel(code, start_index, end_index, order_begin_pos) if b_need_cancel and b_cancel_data: return b_cancel_data, f"F撤" except Exception as e: if constant.TEST: logging.exception(e) async_log_util.error(logger_l2_error, f"F撤出错 参数:buy_single_index-{_buy_single_index} buy_exec_index-{_buy_exec_index} 错误原因:{str(e)}") async_log_util.exception(logger_l2_error, e) return None, "" # L撤 def l_cancel(_buy_single_index, _buy_exec_index): _start_time = round(t.time() * 1000) try: b_need_cancel, b_cancel_data, extra_msg = cls.__LCancelBigNumComputer.need_cancel(code, _buy_exec_index, start_index, end_index, total_data, is_first_code) if b_need_cancel and b_cancel_data: return b_cancel_data, f"L撤({extra_msg})" except Exception as e: async_log_util.error(logger_l2_error, f"L撤出错 参数:buy_single_index-{_buy_single_index} buy_exec_index-{_buy_exec_index} 错误原因:{str(e)}") # logger_l2_error.exception(e) async_log_util.exception(logger_l2_error, e) finally: # l2_data_log.l2_time(code, round(t.time() * 1000) - _start_time, "已下单-L撤大单计算") pass return None, "" if start_index < 0: start_index = 0 if end_index < start_index: return total_data = local_today_datas.get(code) _start_time = tool.get_now_timestamp() # 获取买入信号起始点 order_begin_pos = cls.__get_order_begin_pos( code) # 默认量为0.2 if order_begin_pos.buy_volume_rate is None: buy_volume_rate = 0.2 cancel_data, cancel_msg = None, "" if order_begin_pos.mode == OrderBeginPosInfo.MODE_FAST: cancel_data, cancel_msg = f_cancel(order_begin_pos.buy_single_index, order_begin_pos.buy_exec_index) # 依次处理 if not cancel_data: cancel_data, cancel_msg = l_cancel(order_begin_pos.buy_single_index, order_begin_pos.buy_exec_index) # 暂时取消S撤 # if not cancel_data: # cancel_data, cancel_msg = s_cancel(buy_single_index, buy_exec_index) if not cancel_data: cancel_data, cancel_msg = h_cancel(order_begin_pos.buy_single_index, order_begin_pos.buy_exec_index) if cancel_data: l2_log.debug(code, "触发撤单,撤单位置:{} ,撤单原因:{}", cancel_data["index"], cancel_msg) # 撤单 cls.cancel_buy(code, cancel_msg, cancel_index=cancel_data["index"]) # 撤单成功,继续计算下单 cls.__process_not_order(code, cancel_data["index"] + 1, end_index, capture_time, is_first_code) else: pass @classmethod def __buy(cls, code, capture_timestamp, last_data, last_data_index, is_first_code): __start_time = tool.get_now_timestamp() can, need_clear_data, reason = False, False, "" if not is_first_code: can, need_clear_data, reason = cls.__can_buy(code) else: can, need_clear_data, reason = cls.__can_buy_first(code) # __start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - __start_time, "最后判断是否能下单", force=True) # 删除虚拟下单 if code in cls.unreal_buy_dict: cls.unreal_buy_dict.pop(code) order_begin_pos = cls.__get_order_begin_pos( code) if not can: l2_log.debug(code, "不可以下单,原因:{}", reason) trade_record_log_util.add_cant_place_order_log(code, reason) if need_clear_data: trade_result_manager.real_cancel_success(code, order_begin_pos.buy_single_index, order_begin_pos.buy_exec_index, local_today_datas.get(code)) return False else: l2_log.debug(code, "可以下单,原因:{}", reason) try: l2_log.debug(code, "开始执行买入") trade_manager.start_buy(code, capture_timestamp, last_data, last_data_index, order_begin_pos.mode) l2_log.debug(code, "执行买入成功") ################下单成功处理################ trade_result_manager.real_buy_success(code, cls.__TradePointManager) l2_log.debug(code, "处理买入成功1") cancel_buy_strategy.set_real_place_position(code, local_today_datas.get(code)[-1]["index"], order_begin_pos.buy_single_index) l2_log.debug(code, "处理买入成功2") params_desc = cls.__l2PlaceOrderParamsManagerDict[code].get_buy_rank_desc() l2_log.debug(code, params_desc) ############记录下单时的数据############ try: jx_blocks, jx_blocks_by = KPLCodeJXBlockManager().get_jx_blocks_cache( code), KPLCodeJXBlockManager().get_jx_blocks_cache(code, by=True) if jx_blocks: jx_blocks = jx_blocks[0] if jx_blocks_by: jx_blocks_by = jx_blocks_by[0] info = cls.__trade_log_placr_order_info_dict[code] info.mode = order_begin_pos.mode info.set_buy_index(order_begin_pos.buy_single_index, order_begin_pos.buy_exec_index) if jx_blocks: info.set_kpl_blocks(list(jx_blocks)) elif jx_blocks_by: info.set_kpl_blocks(list(jx_blocks_by)) else: info.set_kpl_blocks([]) can_buy_result = CodePlateKeyBuyManager.can_buy(code) if can_buy_result: info.set_kpl_match_blocks(can_buy_result[0]) trade_record_log_util.add_place_order_log(code, info) except Exception as e: async_log_util.error(logger_l2_error, f"加入买入记录日志出错:{str(e)}") except Exception as e: async_log_util.exception(logger_l2_error, e) l2_log.debug(code, "执行买入异常:{}", str(e)) pass finally: # l2_log.debug(code, "m值影响因子:{}", l2_trade_factor.L2TradeFactorUtil.factors_to_string(code)) pass return True # 是否可以取消 @classmethod def __can_cancel(cls, code): if constant.TEST: return True, "" if cls.__WhiteListCodeManager.is_in_cache(code): return False, "代码在白名单中" # 暂时注释掉 # 14点后如果是板块老大就不需要取消了 # now_time_str = tool.get_now_time_str() # if int(now_time_str.replace(":", "")) >= 140000: # industry, codes = ths_industry_util.get_same_industry_codes(code, gpcode_manager.get_gp_list()) # if industry is None: # return True, "没有获取到行业" # codes_index = industry_codes_sort.sort_codes(codes, code) # if codes_index is not None and codes_index.get(code) is not None: # # 同一板块中老二后面的不能买 # if codes_index.get(code) == 0: # return False, "14:00后老大不能撤单" # elif codes_index.get(code) == 1: # # 判断老大是否都是09:30:00涨停的 # # 同1板块老大是09:30:00涨停,老二14:00砸开的不撤 # first_count = 0 # for key in codes_index: # if codes_index[key] == 0: # first_count += 1 # if limit_up_time_manager.get_limit_up_time(key) == "09:30:00": # first_count -= 1 # if first_count == 0: # return False, "14:00后老大都开盘涨停,老二不能撤单" return True, "" # 是否可以买 # 返回是否可以买,是否需要清除之前的买入信息,原因 @classmethod def __can_buy(cls, code): __start_time = t.time() if not cls.__TradeStateManager.is_can_buy_cache(): return False, True, f"今日已禁止交易" # 之前的代码 # 首板代码且尚未涨停过的不能下单 # is_limited_up = gpcode_manager.FirstCodeManager().is_limited_up(code) # if not is_limited_up: # gpcode_manager.FirstCodeManager().add_limited_up_record([code]) # place_order_count = trade_data_manager.PlaceOrderCountManager().get_place_order_count( # code) # if place_order_count == 0: # trade_data_manager.PlaceOrderCountManager().place_order(code) # return False, True, "首板代码,且尚未涨停过" try: # 买1价格必须为涨停价才能买 # buy1_price = cls.buy1PriceManager().get_price(code) # if buy1_price is None: # return False, "买1价尚未获取到" # limit_up_price = gpcode_manager.get_limit_up_price(code) # if limit_up_price is None: # return False, "尚未获取到涨停价" # if abs(float(buy1_price) - float(limit_up_price)) >= 0.01: # return False, "买1价不为涨停价,买1价-{} 涨停价-{}".format(buy1_price, limit_up_price) # 从买入信号起始点到当前数据末尾的纯买手数与当前的卖1做比较,如果比卖1小则不能买入 total_datas = local_today_datas[code] if total_datas[-1]["index"] + 1 > len(total_datas): return False, True, "L2数据错误" try: sell1_time, sell1_price, sell1_volumn = cls.__ths_l2_trade_queue_manager.get_sell1_info(code) l2_log.buy_debug(code, "卖1信息为:({},{},{})", sell1_time, sell1_price, sell1_volumn) if sell1_time is not None and sell1_volumn > 0: # 获取执行位信息 order_begin_pos = cls.__get_order_begin_pos(code) buy_nums = order_begin_pos.num for i in range(order_begin_pos.buy_exec_index + 1, total_datas[-1]["index"] + 1): _val = total_datas[i]["val"] # 涨停买 if L2DataUtil.is_limit_up_price_buy(_val): # 涨停买 buy_nums += _val["num"] * total_datas[i]["re"] elif L2DataUtil.is_limit_up_price_buy_cancel(_val): buy_nums -= _val["num"] * total_datas[i]["re"] if buy_nums < sell1_volumn * 0.49: return False, False, "纯买量({})小于卖1量的49%{} 卖1时间:{}".format(buy_nums, sell1_volumn, sell1_time) except Exception as e: logging.exception(e) # 量比超过1.3的不能买 volumn_rate = cls.volume_rate_info[code][0] if volumn_rate >= 1.3: return False, False, "最大量比超过1.3不能买" limit_up_time = cls.__LimitUpTimeManager.get_limit_up_time_cache(code) if limit_up_time is not None: limit_up_time_seconds = l2.l2_data_util.L2DataUtil.get_time_as_second( limit_up_time) if limit_up_time_seconds >= l2.l2_data_util.L2DataUtil.get_time_as_second( "13:00:00"): return False, False, "二板下午涨停的不能买,涨停时间为{}".format(limit_up_time) if limit_up_time_seconds >= l2.l2_data_util.L2DataUtil.get_time_as_second("14:55:00"): return False, False, "14:55后涨停的不能买,涨停时间为{}".format(limit_up_time) # 同一板块中老二后面的不能买 industry, codes = ths_industry_util.get_same_industry_codes(code, gpcode_manager.get_gp_list()) if industry is None: return True, False, "没有获取到行业" codes_index = industry_codes_sort.sort_codes(codes, code) if codes_index is not None and codes_index.get(code) is not None and codes_index.get(code) > 1: # 当老大老二当前没涨停 return False, False, "同一板块中老三,老四,...不能买" if cls.__codeActualPriceProcessor.is_under_water(code, total_datas[-1]["val"]["time"]): # 水下捞且板块中的票小于16不能买 # if global_util.industry_hot_num.get(industry) is not None and global_util.industry_hot_num.get( # industry) <= 16: # return False, "水下捞,板块中的票小于2只,为{}".format(global_util.industry_hot_num.get(industry)) # 水下捞自由流通市值大于老大的不要买 if codes_index.get(code) != 0: # 获取老大的市值 for c in codes_index: if codes_index.get(c) == 0 and global_util.zyltgb_map.get(code) > global_util.zyltgb_map.get(c): return False, False, "水下捞,不是老大,且自由流通市值大于老大" # 13:30后涨停,本板块中涨停票数<29不能买 # if limit_up_time is not None: # if int(limit_up_time.replace(":", "")) >= 133000 and global_util.industry_hot_num.get(industry) is not None: # if global_util.industry_hot_num.get(industry) < 16: # return False, "13:30后涨停,本板块中涨停票数<16不能买" if codes_index.get(code) is not None and codes_index.get(code) == 1: # 如果老大已经买成功了, 老二就不需要买了 first_codes = [] for key in codes_index: if codes_index.get(key) == 0: first_codes.append(key) # 暂时注释掉 # for key in first_codes: # state = trade_manager.get_trade_state(key) # if state == trade_manager.TRADE_STATE_BUY_SUCCESS: # # 老大已经买成功了 # return False, "老大{}已经买成功,老二无需购买".format(key) # # # 有9点半涨停的老大才能买老二,不然不能买 # # 获取老大的涨停时间 # for key in first_codes: # # 找到了老大 # time_ = limit_up_time_manager.get_limit_up_time(key) # if time_ == "09:30:00": # return True, "9:30涨停的老大,老二可以下单" # return False, "老大非9:30涨停,老二不能下单" # 过时 老二,本板块中涨停票数<29 不能买 # if codes_index.get(code) is not None and codes_index.get(code) == 1 and global_util.industry_hot_num.get( # industry) is not None: # if global_util.industry_hot_num.get(industry) < 29: # return False, "老二,本板块中涨停票数<29不能买" # 可以下单 return True, False, None finally: # l2_data_log.l2_time(code, round((t.time() - __start_time) * 1000), "是否可以下单计算") pass @classmethod def __can_buy_first(cls, code): if not cls.__TradeStateManager.is_can_buy_cache(): return False, True, f"今日已禁止交易" if cls.__PauseBuyCodesManager.is_in_cache(code): return False, True, f"该代码被暂停交易" limit_up_price = gpcode_manager.get_limit_up_price(code) if float(limit_up_price) >= constant.MAX_CODE_PRICE: HighIncreaseCodeManager().add_code(code) return False, True, f"股价大于{constant.MAX_CODE_PRICE}块" place_order_count = cls.__PlaceOrderCountManager.get_place_order_count(code) if place_order_count and place_order_count >= 10: l2_trade_util.forbidden_trade(code, msg="当日下单次数已达10次") return False, True, f"当日下单次数已达10次" # ---------均价约束------------- average_rate = cls.__Buy1PriceManager.get_average_rate(code) if average_rate and average_rate <= 0.01: return False, True, f"均价涨幅({average_rate})小于1%" # -------量的约束-------- if float(limit_up_price) < 3.0 and cls.volume_rate_info[code][0] < 0.6: return False, True, f"涨停价小于3块,当日量比({cls.volume_rate_info[code][0]})小于0.6" if HighIncreaseCodeManager().is_in(code) and cls.volume_rate_info[code][0] < 0.6: return False, True, f"股价涨得过高,当日量比({cls.volume_rate_info[code][0]})小于0.6" if int(tool.get_now_time_str().replace(":", "")) <= int("100000"): if cls.volume_rate_info[code][0] < 0.1: return False, True, f"当日量比({cls.volume_rate_info[code][0]})小于0.1" else: if cls.volume_rate_info[code][0] < 0.2: return False, True, f"当日量比({cls.volume_rate_info[code][0]})小于0.2" k_format = code_nature_analyse.CodeNatureRecordManager().get_k_format_cache(code) if k_format and (k_format[1][0] or k_format[3][0]): # 股价创新高或者逼近前高 if cls.volume_rate_info[code][0] < 0.3: return False, True, f"股价创新高或者逼近前高,当日量比({cls.volume_rate_info[code][0]})小于0.3" if code_nature_analyse.LatestMaxVolumeManager().is_latest_max_volume(code): # 最近几天有最大量,判断量比是否大于60% if cls.volume_rate_info[code][0] < 0.6: # HighIncreaseCodeManager().add_code(code) return False, True, f"近日出现最大量,当日量比({cls.volume_rate_info[code][0]})小于0.6" total_data = local_today_datas.get(code) if constant.L2_SOURCE_TYPE == constant.L2_SOURCE_TYPE_HUAXIN: trade_price = current_price_process_manager.get_trade_price(code) if trade_price is None: return False, True, f"尚未获取到当前成交价" if False and float(limit_up_price) - float(trade_price) > 0.00001: # 计算信号起始位置到当前的手数 order_begin_pos = cls.__get_order_begin_pos( code) num_operate_map = local_today_num_operate_map.get(code) total_num = 0 for i in range(order_begin_pos.buy_single_index, total_data[-1]["index"] + 1): data = total_data[i] val = data["val"] if not L2DataUtil.is_limit_up_price_buy(val): continue left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2(code, data[ "index"], total_data, local_today_canceled_buyno_map.get( code)) total_num += left_count * val["num"] m_base_val = l2_trade_factor.L2PlaceOrderParamsManager.get_base_m_val(code) thresh_hold_num = m_base_val // (float(gpcode_manager.get_limit_up_price(code)) * 100) if total_num < thresh_hold_num * 2: return False, False, f"当前成交价({trade_price})尚未在0档及以内 且 纯买额({total_num})小于2倍M值({thresh_hold_num * 2})" # 判断成交进度是否距离我们的位置很近 trade_index, is_default = cls.__TradeBuyQueue.get_traded_index(code) if False and not is_default and trade_index: not_cancel_num = 0 num_operate_map = local_today_num_operate_map.get(code) for i in range(trade_index + 1, total_data[-1]["index"] + 1): if L2DataUtil.is_limit_up_price_buy(total_data[i]["val"]): left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2(code, total_data[ i][ "index"], total_data, local_today_canceled_buyno_map.get( code)) if left_count > 0: not_cancel_num += total_data[i]["val"]["num"] m_base_val = l2_trade_factor.L2PlaceOrderParamsManager.get_base_m_val(code) not_cancel_money = not_cancel_num * 100 * float(gpcode_manager.get_limit_up_price(code)) if m_base_val > not_cancel_money: return False, False, f"成交位置距离当前位置纯买额({not_cancel_money})小于m值({m_base_val})" else: # 判断买1价格档位 zyltgb = global_util.zyltgb_map.get(code) if zyltgb is None: global_data_loader.load_zyltgb() zyltgb = global_util.zyltgb_map.get(code) if zyltgb >= 200 * 100000000: buy1_price = cls.__Buy1PriceManager.get_buy1_price(code) if buy1_price is None: return False, True, f"尚未获取到买1价" dif = float(limit_up_price) - float(buy1_price) # 大于10档 if dif > 0.10001: return False, True, f"自由流通200亿以上,买1剩余档数大于10档,买一({buy1_price})涨停({limit_up_price})" open_limit_up_lowest_price = cls.__Buy1PriceManager.get_open_limit_up_lowest_price(code) price_pre_close = gpcode_manager.CodePrePriceManager.get_price_pre_cache(code) if open_limit_up_lowest_price and ( float(open_limit_up_lowest_price) - price_pre_close) / price_pre_close < 0.05: return False, True, f"炸板后最低价跌至5%以下" # 回封的票,下13:15买入需要判断板块是否为独苗 if open_limit_up_lowest_price and int(total_data[-1]["val"]["time"].replace(":", "")) > 131500: # 获取当前票的涨停原因 if code in LimitUpCodesPlateKeyManager.today_total_limit_up_reason_dict: limit_up_reason = kpl_data_manager.KPLLimitUpDataRecordManager.get_current_block(code) if limit_up_reason and limit_up_reason not in constant.KPL_INVALID_BLOCKS: # 判断是否是独苗 codes = kpl_data_manager.KPLLimitUpDataRecordManager.get_current_codes_by_block(limit_up_reason) if codes: codes = copy.deepcopy(codes) codes.discard(code) if not codes: return False, True, f"13:15以后炸板之后下单,({limit_up_reason}) 为独苗" # limit_up_info = cls.__Buy1PriceManager.get_limit_up_info(code) # if limit_up_info[0] is None and False: # total_data = local_today_datas.get(code) # buy_single_index, buy_exec_index, buy_compute_index, num, count, max_num_set, buy_volume_rate = cls.__get_order_begin_pos( # code) # # 之前没有涨停过 # # 统计买入信号位到当前位置没有撤的大单金额 # min_money_w = l2_data_util.get_big_money_val(float(total_data[buy_single_index]["val"]["price"])) // 10000 # left_big_num = cls.__SecondCancelBigNumComputer.compute_left_big_num(code, # buy_single_index, # buy_exec_index, # total_data[-1][ # "index"], # total_data, # 0, min_money_w) # if left_big_num > 0: # # 重新获取分数与分数索引 # limit_up_time = cls.__LimitUpTimeManager.get_limit_up_time_cache(code) # if limit_up_time is None: # limit_up_time = tool.get_now_time_str() # logger_place_order_score.info("code={},data='score_index':{},'score_info':{}", code, # cls.__l2PlaceOrderParamsManagerDict[code].score_index, # cls.__l2PlaceOrderParamsManagerDict[code].score_info) if not cls.__WantBuyCodesManager.is_in_cache(code): if cls.__TradeTargetCodeModeManager.get_mode_cache() == TradeTargetCodeModeManager.MODE_ONLY_BUY_WANT_CODES: return False, True, f"只买想买单中的代码" score_index = None # cls.__l2PlaceOrderParamsManagerDict[code].score_index score = None # cls.__l2PlaceOrderParamsManagerDict[code].score score_info = None # cls.__l2PlaceOrderParamsManagerDict[code].score_info # lp = LineProfiler() # lp.enable() # lp_wrap = lp(cls.can_buy_first) # results = lp_wrap(code, limit_up_price, score_index, score, score_info, cls.volume_rate_info[code]) # output = io.StringIO() # lp.print_stats(stream=output) # lp.disable() # with open(f"{constant.get_path_prefix()}/logs/profile/{code}_can_buy_first.txt", 'w') as f: # f.write(output.getvalue()) # return results return cls.can_buy_first(code, limit_up_price) else: return True, False, "在想买名单中" @classmethod def can_buy_first(cls, code, limit_up_price): # 判断板块 can_buy_result = CodePlateKeyBuyManager.can_buy(code) if can_buy_result is None: async_log_util.warning(logger_debug, "没有获取到板块缓存,将获取板块") yesterday_codes = kpl_data_manager.get_yesterday_limit_up_codes() CodePlateKeyBuyManager.update_can_buy_blocks(code, kpl_data_manager.KPLLimitUpDataRecordManager.latest_origin_datas, kpl_data_manager.KPLLimitUpDataRecordManager.total_datas, yesterday_codes, block_info.get_before_blocks_dict(), kpl_data_manager.KPLLimitUpDataRecordManager.get_current_reason_codes_dict()) can_buy_result = CodePlateKeyBuyManager.can_buy(code) if can_buy_result is None: return False, True, "尚未获取到板块信息" if can_buy_result[1]: # ------自由流通市值约束------ zyltgb = global_util.zyltgb_map.get(code) if zyltgb: if zyltgb < 10 * 100000000: return False, True, f"独苗,自由流通小于10亿({zyltgb})" if code in cls.volume_rate_info and cls.volume_rate_info[code][0] < 0.6: return False, True, f"独苗:量比({cls.volume_rate_info[code][0]})未达到60%" # 获取K线形态,判断是否近2天是否为10天内最大量 k_format = code_nature_analyse.CodeNatureRecordManager().get_k_format_cache(code) if k_format and len(k_format) >= 10 and k_format[9]: # 是独苗 if code in cls.volume_rate_info and cls.volume_rate_info[code][0] < 0.3: return False, True, f"近2天有10日内最高量,量比({cls.volume_rate_info[code][0]})未达到30%" # 是独苗 if can_buy_result[1]: now_time_int = int(tool.get_now_time_str().replace(":", "")) if now_time_int < int("100000") or int("130000") <= now_time_int < int("133000"): # 独苗必须9:30-10:00和13:00-13:30时间段内买 return True, False, f"独苗:{can_buy_result[2]}" else: return False, True, f"独苗:当前时间不能买" else: if can_buy_result[0]: return True, False, can_buy_result[2] return False, True, can_buy_result[2] @classmethod def __cancel_buy(cls, code): try: l2_log.debug(code, "开始执行撤单") trade_manager.start_cancel_buy(code) l2_log.debug(code, "执行撤单成功") return True except Exception as e: logging.exception(e) l2_log.debug(code, "执行撤单异常:{}", str(e)) return False finally: pass @classmethod def cancel_buy(cls, code, msg=None, source="l2", cancel_index=None): # 是否是交易队列触发 order_begin_pos = cls.__get_order_begin_pos( code) total_datas = local_today_datas.get(code) if not total_datas: return False if source == "trade_queue": # 交易队列触发的需要下单后5s if order_begin_pos.buy_exec_index is not None and order_begin_pos.buy_exec_index > 0: now_time_str = tool.get_now_time_str() if tool.trade_time_sub(now_time_str, total_datas[order_begin_pos.buy_exec_index]["val"]["time"]) < 5: return False if code in cls.unreal_buy_dict: cls.unreal_buy_dict.pop(code) # 取消买入标识 trade_result_manager.virtual_cancel_success(code, order_begin_pos.buy_single_index, order_begin_pos.buy_exec_index, total_datas) else: can_cancel, reason = cls.__can_cancel(code) if not can_cancel: # 不能取消 l2_log.cancel_debug(code, "撤单中断,原因:{}", reason) l2_log.debug(code, "撤单中断,原因:{}", reason) return False if cancel_index is None: cancel_index = total_datas[-1]["index"] cls.__LatestCancelIndexManager.set_latest_cancel_index(code, cancel_index) # 添加撤单日志记录 trade_record_log_util.add_cancel_msg_log(code, msg) cancel_result = cls.__cancel_buy(code) if cancel_result: trade_result_manager.real_cancel_success(code, order_begin_pos.buy_single_index, order_begin_pos.buy_exec_index, total_datas) l2_log.debug(code, "执行撤单结束,原因:{}", msg) return True # 虚拟下单 @classmethod def __virtual_buy(cls, code, buy_single_index, buy_exec_index, capture_time): cls.unreal_buy_dict[code] = (buy_exec_index, capture_time) trade_result_manager.virtual_buy_success(code) # 是否是在板上买 @classmethod def __is_at_limit_up_buy(cls, code_): # 总卖为0,当前成交价为涨停价就判断为板上买 current_sell_data = cls.__L2MarketSellManager.get_current_total_sell_data(code_) if current_sell_data and tool.trade_time_sub(tool.get_now_time_str(), current_sell_data[0]) < 5: # 5s内的数据才有效 if current_sell_data[1] <= 0: # 总卖为0 trade_price = current_price_process_manager.get_trade_price(code_) limit_up_price = gpcode_manager.get_limit_up_price(code_) if trade_price and limit_up_price and abs(float(trade_price) - float(limit_up_price)) <= 0.001: # 当前成交价为涨停价 return True return False @classmethod def __start_compute_buy(cls, code, compute_start_index, compute_end_index, threshold_money, capture_time, is_first_code, new_add=True): if compute_end_index < compute_start_index: return unique_key = f"{code}-{compute_start_index}-{compute_end_index}" if cls.__latest_process_not_order_unique_keys_count.get( unique_key) and cls.__latest_process_not_order_unique_keys_count.get(unique_key) > 2: async_log_util.error(logger_l2_error, f"重复处理数据:code-{code} start_index-{compute_start_index} end_index-{compute_end_index}") return if unique_key not in cls.__latest_process_not_order_unique_keys_count: cls.__latest_process_not_order_unique_keys_count[unique_key] = 0 cls.__latest_process_not_order_unique_keys_count[unique_key] += 1 _start_time = tool.get_now_timestamp() total_datas = local_today_datas[code] # 获取买入信号计算起始位置 order_begin_pos = cls.__get_order_begin_pos( code) # 是否为新获取到的位置 new_get_single = False buy_single_index = order_begin_pos.buy_single_index if buy_single_index is None: # 尝试计算快速成交信号 has_single, _index, sell_info = cls.__compute_fast_order_begin_pos(code, compute_start_index, compute_end_index) fast_msg = None if has_single: order_begin_pos.mode = OrderBeginPosInfo.MODE_FAST order_begin_pos.sell_info = sell_info elif _index is not None and _index < 0: fast_msg = sell_info continue_count = cls.__l2PlaceOrderParamsManagerDict[code].get_begin_continue_buy_count() # 有买入信号 has_single, _index = cls.__compute_order_begin_pos(code, max( (compute_start_index - continue_count - 1) if new_add else compute_start_index, 0), continue_count, compute_end_index) order_begin_pos.mode = OrderBeginPosInfo.MODE_NORMAL # 如果买入信号与上次的买入信号一样就不能算新的信号 if cls.__last_buy_single_dict.get(code) == _index: has_single = None _index = None buy_single_index = _index if has_single: # 判断是否是板上买 order_begin_pos.at_limit_up = cls.__is_at_limit_up_buy(code) # -------------计算信号位到成交位的纯买额(过远则为无效位置)------------------- if order_begin_pos.at_limit_up: # 获取成交进度位置 trade_index, is_default = TradeBuyQueue().get_traded_index(code) if trade_index and not is_default: m_base_val = l2_trade_factor.L2PlaceOrderParamsManager.get_base_m_val(code) thresh_hold_num = m_base_val * 3 // (float(gpcode_manager.get_limit_up_price(code)) * 100) # 真实下单位到成交位置的纯买额 * 3 total_num = 0 for i in range(trade_index + 1, buy_single_index): data = total_datas[i] val = data["val"] if not L2DataUtil.is_limit_up_price_buy(val): continue left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2( code, data["index"], total_datas, local_today_canceled_buyno_map.get( code)) total_num += left_count * val["num"] if total_num > thresh_hold_num: break if total_num > thresh_hold_num: # 距离成交进度位置过远 l2_log.debug(code, f"获取的信号位无效(板上买,范围:{trade_index + 1}-{order_begin_pos.buy_single_index} 未成交总手{total_num}/阈值{thresh_hold_num})") return None cls.__last_buy_single_dict[code] = buy_single_index new_get_single = True order_begin_pos.num = 0 order_begin_pos.count = 0 order_begin_pos.buy_single_index = buy_single_index if order_begin_pos.sell_info: k_format = code_nature_analyse.CodeNatureRecordManager().get_k_format_cache(code) if k_format and (k_format[1][0] or k_format[3][0]): # 股价新高或者逼近前高 order_begin_pos.threshold_money = int(sell_info[1]) else: if float(total_datas[buy_single_index]["val"]["price"]) >= 3 and cls.volume_rate_info[code][ 0] > 0.3 and sell_info[1] > 2000 * 10000: # 暂时打8折 # order_begin_pos.threshold_money = int(sell_info[1] * 0.8) # 深证总卖大于1000万的票,m值打5折 if code.find('00') == 0: order_begin_pos.threshold_money = int(sell_info[1] * 0.6) else: order_begin_pos.threshold_money = int(sell_info[1] * 0.8) else: order_begin_pos.threshold_money = int(sell_info[1]) l2_log.debug(code, "获取到买入信号起始点:{} ,计算范围:{}-{} ,量比:{},是否板上买:{},数据:{} 模式:{}({})", buy_single_index, compute_start_index, compute_end_index, cls.volume_rate_info[code], order_begin_pos.at_limit_up, total_datas[buy_single_index], order_begin_pos.mode, fast_msg) # _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time, "下单信号计算时间") if order_begin_pos.buy_single_index is None: # 未获取到买入信号,终止程序 return None # 开始计算的位置 start_process_index = max(order_begin_pos.buy_single_index, compute_start_index) if new_get_single: start_process_index = order_begin_pos.buy_single_index # _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time, "计算m值大单") threshold_money, msg = cls.__get_threshmoney(code) # _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time, "m值阈值计算") # 买入纯买额统计 new_buy_exec_index, buy_nums, buy_count, rebegin_buy_pos, max_num_set_new = None, None, None, None, [] if order_begin_pos.mode == OrderBeginPosInfo.MODE_FAST: threshold_money = order_begin_pos.threshold_money new_buy_exec_index, buy_nums, buy_count, rebegin_buy_pos, threshold_money_new = cls.__sum_buy_num_for_order_fast( code, start_process_index, compute_end_index, order_begin_pos.num, order_begin_pos.count, threshold_money, order_begin_pos.buy_single_index) threshold_money = threshold_money_new order_begin_pos.threshold_money = threshold_money else: new_buy_exec_index, buy_nums, buy_count, rebegin_buy_pos, max_num_set_new = cls.__sum_buy_num_for_order_3( code, start_process_index, compute_end_index, order_begin_pos.num, order_begin_pos.count, threshold_money, order_begin_pos.buy_single_index, order_begin_pos.max_num_set, order_begin_pos.at_limit_up) l2_log.debug(code, "m值-{} 量比:{} rebegin_buy_pos:{}", threshold_money, cls.volume_rate_info[code][0], rebegin_buy_pos) # 买入信号位与计算位置间隔2s及以上了 if rebegin_buy_pos is not None: # 需要重新计算纯买额 cls.__start_compute_buy(code, rebegin_buy_pos, compute_end_index, threshold_money, capture_time, is_first_code, False) return if new_buy_exec_index is not None: l2_log.debug(code, "获取到买入执行位置:{} m值:{} 纯买手数:{} 纯买单数:{} 是否板上买:{} 数据:{} ,量比:{} ,下单模式:{}", new_buy_exec_index, threshold_money, buy_nums, buy_count, order_begin_pos.at_limit_up, total_datas[new_buy_exec_index], cls.volume_rate_info[code], order_begin_pos.mode) cls.__save_order_begin_data(code, OrderBeginPosInfo(buy_single_index=buy_single_index, buy_exec_index=new_buy_exec_index, buy_compute_index=new_buy_exec_index, num=buy_nums, count=buy_count, max_num_set=max_num_set_new, buy_volume_rate=cls.volume_rate_info[code][0], mode=order_begin_pos.mode, sell_info=order_begin_pos.sell_info, threshold_money=threshold_money)) cls.__LimitUpTimeManager.save_limit_up_time(code, total_datas[new_buy_exec_index]["val"]["time"]) l2_log.debug(code, "delete_buy_cancel_point") # 直接下单 ordered = cls.__buy(code, capture_time, total_datas[-1], total_datas[-1]["index"], is_first_code) # 保存闪电下单的买入信息 if order_begin_pos.mode == OrderBeginPosInfo.MODE_FAST: cls.__latest_fast_place_order_info_dict[code] = ( order_begin_pos.sell_info[0], order_begin_pos.sell_info[1]) # 数据是否处理完毕 if new_buy_exec_index < compute_end_index: if ordered: cls.__process_order(code, new_buy_exec_index + 1, compute_end_index, capture_time, is_first_code, False) else: cls.__start_compute_buy(code, new_buy_exec_index + 1, compute_end_index, threshold_money, capture_time, is_first_code, False) else: # 未达到下单条件,保存纯买额,设置纯买额 # 记录买入信号位置 cls.__save_order_begin_data(code, OrderBeginPosInfo(buy_single_index=buy_single_index, buy_exec_index=-1, buy_compute_index=compute_end_index, num=buy_nums, count=buy_count, max_num_set=max_num_set_new, mode=order_begin_pos.mode, sell_info=order_begin_pos.sell_info, threshold_money=threshold_money)) _start_time = t.time() l2_data_log.l2_time_log(code, "__start_compute_buy 结束") # 获取下单起始信号 @classmethod def __get_order_begin_pos(cls, code) -> OrderBeginPosInfo: order_begin_pos = cls.__TradePointManager.get_buy_compute_start_data_cache( code) return order_begin_pos # 保存下单起始信号 @classmethod def __save_order_begin_data(cls, code, info: OrderBeginPosInfo): cls.__TradePointManager.set_buy_compute_start_data_v2(code, info) # 计算下单起始信号 # compute_data_count 用于计算的l2数据数量 @classmethod def __compute_order_begin_pos(cls, code, start_index, continue_count, end_index): second_930 = 9 * 3600 + 30 * 60 + 0 # 倒数100条数据查询 datas = local_today_datas[code] if end_index - start_index + 1 < continue_count: return False, None __time = None last_index = None count = 0 start = None now_time_s = tool.get_time_as_second(tool.get_now_time_str()) for i in range(start_index, end_index + 1): _val = datas[i]["val"] time_s = L2DataUtil.get_time_as_second(_val["time"]) # 时间要>=09:30:00 if time_s < second_930: continue if not constant.TEST: if abs(now_time_s - time_s) > 2: # 正式环境下不处理2s外的数据 continue if L2DataUtil.is_limit_up_price_buy(_val): # 金额要大于50万 if _val["num"] * float(_val["price"]) < 5000: continue # 寻找前面continue_count-1个涨停买 # for j in range(start_index - 1, -1, -1): # if datas[j]["val"] if last_index is None or (datas[last_index]["val"]["time"] == datas[i]["val"]["time"]): if start is None: start = i last_index = i count += datas[i]["re"] if count >= continue_count: return True, start else: # 本条数据作为起点 last_index = i count = datas[i]["re"] start = i elif not L2DataUtil.is_sell(_val) and not L2DataUtil.is_sell_cancel(_val): # 剔除卖与卖撤 last_index = None count = 0 start = None return False, None # 快速买入法的信号位置查找 @classmethod def __compute_fast_order_begin_pos(cls, code, start_index, end_index): limit_up_price = gpcode_manager.get_limit_up_price(code) # if float(limit_up_price) < 3: # return False, -1, "股价小于3块" total_datas = local_today_datas[code] start_time_str = total_datas[start_index]["val"]["time"] # if tool.trade_time_sub(start_time_str, "13:00:00") > 0: # return False, -1, "超过规定时间" refer_sell_data = cls.__L2MarketSellManager.get_refer_sell_data(code, start_time_str) if refer_sell_data is None: return False, -1, "总卖为空" if cls.__L2MarketSellManager.is_refer_sell_time_used(code, refer_sell_data[0]): return False, -1, "总卖统计时间已被使用" # 是否大于500万 if refer_sell_data[1] <= 500 * 10000: return False, -1, "总卖小于指定金额" # 统计之前的卖 threshold_money = refer_sell_data[1] for i in range(start_index - 1, -1, -1): val = total_datas[i]["val"] if tool.compare_time(val["time"], refer_sell_data[0]) <= 0: break if L2DataUtil.is_sell(val): threshold_money += val["num"] * int(float(val["price"]) * 100) elif L2DataUtil.is_sell_cancel(val): threshold_money -= val["num"] * int(float(val["price"]) * 100) # 是否为本秒的第一个涨停买 for i in range(start_index, end_index + 1): data = total_datas[i] val = data['val'] if not L2DataUtil.is_limit_up_price_buy(val): # 要统计卖与卖撤 if L2DataUtil.is_sell(val): threshold_money += val["num"] * int(float(val["price"]) * 100) elif L2DataUtil.is_sell_cancel(val): threshold_money -= val["num"] * int(float(val["price"]) * 100) continue # 50 万以下的不需要 if val["num"] * float(val["price"]) < 5000: continue # 是否为本s的第一次涨停 is_first_limit_up = True for j in range(i - 1, -1, -1): temp_val = total_datas[j]["val"] if temp_val["time"] == val["time"]: if L2DataUtil.is_limit_up_price_buy(temp_val) and temp_val["num"] * float( temp_val["price"]) >= 5000: is_first_limit_up = True break else: break if is_first_limit_up: return True, i, [refer_sell_data[0], threshold_money] return False, None, None @classmethod def __get_threshmoney(cls, code): m, msg = cls.__l2PlaceOrderParamsManagerDict[code].get_m_val() if trade_manager.CodesTradeStateManager().get_trade_state_cache(code) == trade_manager.TRADE_STATE_NOT_TRADE: # 首次下单m值扩大1.5倍 m = int(m * 1.5) return m, msg # 计算万手哥笔数 @classmethod def __compute_big_money_count(cls, total_datas, start_index, end_index): count = 0 for i in range(start_index, end_index + 1): if L2DataUtil.is_limit_up_price_buy(total_datas[i]["val"]): count += total_datas[i]["re"] elif L2DataUtil.is_limit_up_price_buy_cancel(total_datas[i]["val"]): count -= total_datas[i]["re"] return count # 统计买入净买量,不计算在买入信号之前的买撤单 @classmethod def __sum_buy_num_for_order_3(cls, code, compute_start_index, compute_end_index, origin_num, origin_count, threshold_money, buy_single_index, max_num_set, at_limit_up=False): _start_time = t.time() total_datas = local_today_datas[code] # is_first_code = gpcode_manager.FirstCodeManager().is_in_first_record_cache(code) buy_nums = origin_num buy_count = origin_count limit_up_price = gpcode_manager.get_limit_up_price(code) if limit_up_price is None: raise Exception("涨停价无法获取") threshold_num = None # 大目标手数(满足这个就不需要看安全笔数) threshold_max_num = None # ----------------调整板上下单的m值与安全笔数---------------- if at_limit_up: # 板上买,获取最近一次闪电下单的总卖额 sell_data = cls.__latest_fast_place_order_info_dict.get(code) if sell_data: # 有过闪电下单 # 总卖的一半作为m值 threshold_num = int(sell_data[1] / (limit_up_price * 100)) // 2 threshold_max_num = 1 if not threshold_num: # 目标手数 threshold_num = round(threshold_money / (limit_up_price * 100)) if not threshold_max_num: threshold_max_num = int(threshold_num * 1.2) # place_order_count = trade_data_manager.PlaceOrderCountManager().get_place_order_count(code) # 目标订单数量 threshold_count = cls.__l2PlaceOrderParamsManagerDict[code].get_safe_count() buy_single_time_seconds = L2DataUtil.get_time_as_second(total_datas[buy_single_index]["val"]["time"]) # 可以触发买,当有涨停买信号时才会触发买 trigger_buy = True # 间隔最大时间依次为:3,9,27,81 max_space_time = cls.__l2PlaceOrderParamsManagerDict[code].get_time_range() # 最大买量 max_buy_num = 0 max_buy_num_set = set(max_num_set) # 需要的最小大单笔数 big_num_count = cls.__l2PlaceOrderParamsManagerDict[code].get_big_num_count() # 较大单的手数 bigger_num = round(5000 / limit_up_price) for i in range(compute_start_index, compute_end_index + 1): data = total_datas[i] _val = total_datas[i]["val"] trigger_buy = False # 必须为连续2秒内的数据 if L2DataUtil.get_time_as_second(_val["time"]) - buy_single_time_seconds + 1 > max_space_time: cls.__TradePointManager.delete_buy_point(code) if i == compute_end_index: # 数据处理完毕 return None, buy_nums, buy_count, None, max_buy_num_set else: # 计算买入信号,不能同一时间开始计算 for ii in range(buy_single_index + 1, compute_end_index + 1): if total_datas[buy_single_index]["val"]["time"] != total_datas[ii]["val"]["time"]: return None, buy_nums, buy_count, ii, max_buy_num_set # 涨停买 if L2DataUtil.is_limit_up_price_buy(_val): if l2_data_util.is_big_money(_val): max_buy_num_set.add(i) if _val["num"] >= bigger_num: trigger_buy = True # 只统计59万以上的金额 buy_nums += int(_val["num"]) * int(total_datas[i]["re"]) buy_count += int(total_datas[i]["re"]) if (buy_nums >= threshold_num and buy_count >= threshold_count) or buy_nums >= threshold_max_num: async_log_util.info(logger_l2_trade_buy, f"{code}获取到买入执行点:{i} 统计纯买手数:{buy_nums} 目标纯买手数:{threshold_num}/{threshold_max_num} 统计纯买单数:{buy_count} 目标纯买单数:{threshold_count}, 大单数量:{len(max_buy_num_set)}") elif L2DataUtil.is_limit_up_price_buy_cancel(_val): if _val["num"] >= bigger_num: # 只统计59万以上的金额 # 涨停买撤 # 判断买入位置是否在买入信号之前 buy_index = l2_data_source_util.L2DataSourceUtils.get_buy_index_with_cancel_data_v2(total_datas[i], local_today_buyno_map.get( code)) if buy_index is not None: # 找到买撤数据的买入点 if buy_index >= buy_single_index: buy_nums -= int(_val["num"]) * int(data["re"]) buy_count -= int(data["re"]) # 大单撤销 max_buy_num_set.discard(buy_index) l2_log.buy_debug(code, "{}数据在买入信号之后 撤买纯买手数:{} 目标手数:{}", i, buy_nums, threshold_num) else: l2_log.buy_debug(code, "{}数据在买入信号之前,买入位:{}", i, buy_index) if total_datas[buy_single_index]["val"]["time"] == total_datas[buy_index]["val"]["time"]: # 同一秒,当作买入信号之后处理 buy_nums -= int(_val["num"]) * int(data["re"]) buy_count -= int(data["re"]) # 大单撤销 max_buy_num_set.discard(buy_index) l2_log.buy_debug(code, "{}数据买入位与预估买入位在同一秒", i) else: # 未找到买撤数据的买入点 l2_log.buy_debug(code, "未找到买撤数据的买入点: 位置-{} 数据-{}", i, data) buy_nums -= int(_val["num"]) * int(total_datas[i]["re"]) buy_count -= int(total_datas[i]["re"]) l2_log.buy_debug(code, "位置-{},总手数:{},目标手数:{}", i, buy_nums, threshold_num) max_buy_num_set_count = 0 for i1 in max_buy_num_set: max_buy_num_set_count += total_datas[i1]["re"] # 有撤单信号,且小于阈值 if buy_nums >= threshold_num and buy_count >= threshold_count and trigger_buy and max_buy_num_set_count >= big_num_count: try: info = cls.__trade_log_placr_order_info_dict[code] info.set_trade_factor(threshold_money, threshold_count, list(max_buy_num_set)) except Exception as e: async_log_util.error(logger_l2_error, f"记录交易因子出错:{str(e)}") return i, buy_nums, buy_count, None, max_buy_num_set l2_log.buy_debug(code, "尚未获取到买入执行点,起始计算位置:{} 统计纯买手数:{} 目标纯买手数:{} 统计纯买单数:{} 目标纯买单数:{} 大单数量:{} 目标大单数量:{}", compute_start_index, buy_nums, threshold_num, buy_count, threshold_count, max_buy_num_set_count, big_num_count) return None, buy_nums, buy_count, None, max_buy_num_set # 返回(买入执行点, 总手, 总笔数, 从新计算起点, 纯买额阈值) # 计算快速买入 @classmethod def __sum_buy_num_for_order_fast(cls, code, compute_start_index, compute_end_index, origin_num, origin_count, threshold_money_origin, buy_single_index): _start_time = t.time() total_datas = local_today_datas[code] # is_first_code = gpcode_manager.FirstCodeManager().is_in_first_record_cache(code) buy_nums = origin_num buy_count = origin_count limit_up_price = gpcode_manager.get_limit_up_price(code) if limit_up_price is None: raise Exception("涨停价无法获取") limit_up_price = float(limit_up_price) threshold_money = threshold_money_origin # 目标手数 threshold_num = round(threshold_money / (limit_up_price * 100)) buy_single_time_seconds = L2DataUtil.get_time_as_second(total_datas[buy_single_index]["val"]["time"]) # 可以触发买,当有涨停买信号时才会触发买 trigger_buy = True # 间隔最大时间为3s max_space_time = 3 for i in range(compute_start_index, compute_end_index + 1): data = total_datas[i] _val = total_datas[i]["val"] trigger_buy = False # 必须为连续2秒内的数据 if L2DataUtil.get_time_as_second(_val["time"]) - buy_single_time_seconds + 1 > max_space_time: cls.__TradePointManager.delete_buy_point(code) if i == compute_end_index: # 数据处理完毕 return None, buy_nums, buy_count, None, threshold_money else: # 计算买入信号,不能同一时间开始计算 for ii in range(buy_single_index + 1, compute_end_index + 1): if total_datas[buy_single_index]["val"]["time"] != total_datas[ii]["val"]["time"]: return None, buy_nums, buy_count, ii, threshold_money if L2DataUtil.is_sell(_val): threshold_money += _val["num"] * int(float(_val["price"]) * 100) threshold_num = round(threshold_money / (limit_up_price * 100)) elif L2DataUtil.is_sell_cancel(_val): threshold_money -= _val["num"] * int(float(_val["price"]) * 100) threshold_num = round(threshold_money / (limit_up_price * 100)) # 涨停买 elif L2DataUtil.is_limit_up_price_buy(_val): trigger_buy = True # 只统计59万以上的金额 buy_nums += int(_val["num"]) * int(total_datas[i]["re"]) buy_count += int(total_datas[i]["re"]) if buy_nums >= threshold_num: async_log_util.info(logger_l2_trade_buy, f"{code}获取到买入执行点(快速买入):{i} 统计纯买手数:{buy_nums} 目标纯买手数:{threshold_num} 统计纯买单数:{buy_count}") elif L2DataUtil.is_limit_up_price_buy_cancel(_val): # 判断买入位置是否在买入信号之前 buy_index = l2_data_source_util.L2DataSourceUtils.get_buy_index_with_cancel_data_v2(total_datas[i], local_today_buyno_map.get( code)) if buy_index is not None: # 找到买撤数据的买入点 if buy_index >= buy_single_index: buy_nums -= int(_val["num"]) * int(data["re"]) buy_count -= int(data["re"]) l2_log.buy_debug(code, "{}数据在买入信号之后 撤买纯买手数:{} 目标手数:{}", i, buy_nums, threshold_num) else: l2_log.buy_debug(code, "{}数据在买入信号之前,买入位:{}", i, buy_index) if total_datas[buy_single_index]["val"]["time"] == total_datas[buy_index]["val"]["time"]: # 同一秒,当作买入信号之后处理 buy_nums -= int(_val["num"]) * int(data["re"]) buy_count -= int(data["re"]) # 大单撤销 l2_log.buy_debug(code, "{}数据买入位与预估买入位在同一秒", i) else: # 未找到买撤数据的买入点 l2_log.buy_debug(code, "未找到买撤数据的买入点: 位置-{} 数据-{}", i, data) buy_nums -= int(_val["num"]) * int(total_datas[i]["re"]) buy_count -= int(total_datas[i]["re"]) l2_log.buy_debug(code, "位置-{},总手数:{},目标手数:{}", i, buy_nums, threshold_num) # 有撤单信号,且小于阈值 if buy_nums >= threshold_num and trigger_buy: try: info = cls.__trade_log_placr_order_info_dict[code] info.set_trade_factor(threshold_money, 0, []) except Exception as e: async_log_util.error(logger_l2_error, f"记录交易因子出错:{str(e)}") return i, buy_nums, buy_count, None, threshold_money l2_log.buy_debug(code, "尚未获取到买入执行点(快速买入),起始计算位置:{} 统计纯买手数:{} 目标纯买手数:{} 统计纯买单数:{}", compute_start_index, buy_nums, threshold_num, buy_count) return None, buy_nums, buy_count, None, threshold_money def test_trade_record(): code = "000333" __trade_log_placr_order_info_dict = {code: trade_record_log_util.PlaceOrderInfo()} try: jx_blocks, jx_blocks_by = KPLCodeJXBlockManager().get_jx_blocks_cache( code), KPLCodeJXBlockManager().get_jx_blocks_cache(code, by=True) info = __trade_log_placr_order_info_dict[code] info.set_buy_index(0, 1) if jx_blocks: info.set_kpl_blocks(list(jx_blocks)) elif jx_blocks_by: info.set_kpl_blocks(list(jx_blocks_by)) else: info.set_kpl_blocks([]) trade_record_log_util.add_place_order_log(code, info) except: pass if __name__ == "__main__": # test_trade_record() # yesterday_limit_up_data_records = kpl_data_manager.get_current_limit_up_data_records(1)[0][1] # yesterday_codes = set([x[0] for x in yesterday_limit_up_data_records]) # print(yesterday_codes) code = "603003" datas = log_export.load_l2_from_log() datas = datas.get(code) if datas is None: datas = [] l2.l2_data_util.local_today_datas[code] = datas[:191] l2.l2_data_util.load_buy_no_map(l2.l2_data_util.local_today_buyno_map, code, l2.l2_data_util.local_today_datas[code]) l2.l2_data_util.load_canceled_buy_no_map(l2.l2_data_util.local_today_canceled_buyno_map, code, l2.l2_data_util.local_today_datas[code]) start_index = 73 end_index = 190 LCancelBigNumComputer().compute_watch_index(code, start_index, end_index)