"""L2 (level-2 quote) incremental data processing and buy / cancel decisions.

The module exposes:

* ``L2DataManager``          - placeholder interface for L2 data storage.
* ``L2BigNumForMProcessor``  - accumulates the weighted "big order" count that
  feeds the m-value, persisting its progress in Redis.
* ``L2TradeDataProcessor``   - entry point (``process``) that merges newly
  captured L2 rows and decides whether to place, keep or cancel a buy order.
"""
import logging
import random
import time as t

import big_money_num_manager
import code_data_util
import code_volumn_manager
import constant
import global_util
import gpcode_manager
import industry_codes_sort
import l2_data_util
import l2_trade_test
import limit_up_time_manager
from db import redis_manager
import ths_industry_util
import tool
from trade import trade_data_manager, trade_manager, trade_queue_manager, l2_trade_factor, l2_trade_util, \
    trade_result_manager, first_code_score_manager
from l2 import safe_count_manager, l2_data_manager, l2_data_log, l2_log, l2_data_source_util, code_price_manager
from l2.cancel_buy_strategy import SecondCancelBigNumComputer, HourCancelBigNumComputer, L2LimitUpMoneyStatisticUtil, \
    L2LimitUpSellStatisticUtil
from l2.l2_data_manager import L2DataException, TradePointManager
from l2.l2_data_util import local_today_datas, L2DataUtil, load_l2_data, local_today_num_operate_map, local_latest_datas
import l2.l2_data_util
from log import logger_l2_trade, logger_l2_trade_cancel, logger_l2_trade_buy, logger_l2_process, logger_l2_error

# TODO: L2 data management
from trade.trade_data_manager import CodeActualPriceProcessor
import dask


class L2DataManager:
    """Skeleton of an L2 data store; only ``format_data`` is implemented."""

    def format_data(self, datas):
        """Wrap every raw row as ``{"val": row, "re": 1}`` and return the new list."""
        return [{"val": data, "re": 1} for data in datas]

    def get_add_datas(self, format_datas):
        """Return the newly added rows (not implemented)."""
        pass

    def load_data(self, code=None, force=False):
        """Load data from the database (not implemented)."""
        pass

    def save_datas(self, add_datas, datas):
        """Persist data (not implemented)."""
        pass


class L2BigNumForMProcessor:
    """Accumulates the weighted big-order count used for the m-value.

    Progress (begin position and last processed end index) is stored in Redis
    under per-code keys that expire after ``tool.get_expire()``.
    """

    def __init__(self):
        self._redis_manager = redis_manager.RedisManager(1)

    def __get_redis(self):
        return self._redis_manager.getRedis()

    def set_begin_pos(self, code, index):
        """Store the begin position for ``code`` unless one is already stored."""
        if self.__get_begin_pos(code) is None:
            key = "m_big_money_begin-{}".format(code)
            self.__get_redis().setex(key, tool.get_expire(), index)

    def __get_begin_pos(self, code):
        """Return the stored begin position as ``int``, or ``None`` if absent."""
        key = "m_big_money_begin-{}".format(code)
        val = self.__get_redis().get(key)
        if val is None:
            return None
        return int(val)

    def clear_processed_end_index(self, code):
        """Delete the stored "processed up to" index for ``code``."""
        key = "m_big_money_process_index-{}".format(code)
        self.__get_redis().delete(key)

    def __set_processed_end_index(self, code, index):
        """Record ``index`` as the last end index handed to ``process``."""
        key = "m_big_money_process_index-{}".format(code)
        self.__get_redis().setex(key, tool.get_expire(), index)

    def __get_processed_end_index(self, code):
        """Return the stored processed end index as ``int``, or ``None``."""
        key = "m_big_money_process_index-{}".format(code)
        val = self.__get_redis().get(key)
        if val is None:
            return None
        return int(val)

    def process(self, code, start_index, end_index, limit_up_price):
        """Add the weighted count of limit-up buys (minus cancels) to the big-money total.

        Rows ``max(start_index, processed_index)..end_index`` of
        ``local_today_datas[code]`` are scanned. Nothing happens when no begin
        position is stored or when ``end_index`` was already processed.
        """
        begin_pos = self.__get_begin_pos(code)
        if begin_pos is None:
            # No buy-begin signal has been recorded yet.
            return
        # Index reached by the previous call.
        processed_index = self.__get_processed_end_index(code)
        if processed_index is None:
            processed_index = 0
        if processed_index >= end_index:
            return
        start_time = round(t.time() * 1000)
        total_datas = local_today_datas[code]
        # Hand-count boundaries separating the weight tiers below.
        num_splites = [round(5000 / limit_up_price), round(10000 / limit_up_price), round(20000 / limit_up_price),
                       round(30000 / limit_up_price)]
        # NOTE(review): the range starts AT processed_index, so the row stored as
        # "processed" is visited again when start_index <= processed_index - confirm
        # whether that is intended.
        range_start = max(start_index, processed_index)
        total_num = 0
        for i in range(range_start, end_index + 1):
            data = total_datas[i]
            is_cancel = L2DataUtil.is_limit_up_price_buy_cancel(data["val"])
            if not is_cancel and not L2DataUtil.is_limit_up_price_buy(data["val"]):
                continue
            # A cancel only counts when the buy it cancels is not before begin_pos.
            if is_cancel:
                buy_index = l2_data_source_util.L2DataSourceUtils.get_buy_index_with_cancel_data(
                    code, total_datas[i], local_today_num_operate_map.get(code))
                if buy_index is not None and buy_index < begin_pos:
                    continue
            # Weight the row by its size tier.
            num = int(data["val"]["num"])
            temp = 0
            if num < num_splites[0]:
                pass
            elif num < num_splites[1]:
                temp = 1
            elif num < num_splites[2]:
                temp = round(4 / 3, 3)
            elif num < num_splites[3]:
                temp = 2
            else:
                temp = 4
            count = int(temp * data["re"] * 1000)
            if is_cancel:
                count = 0 - count
            total_num += count
        self.__set_processed_end_index(code, end_index)
        big_money_num_manager.add_num(code, total_num)
        print("m值大单计算范围:{}-{}  时间:{}".format(range_start, end_index,
                                              round(t.time() * 1000) - start_time))


class L2TradeDataProcessor:
    """Processes captured L2 data per stock code and drives buy / cancel actions."""

    # code -> (buy execution index, capture time) of a pending virtual order
    unreal_buy_dict = {}
    # code -> (volume rate, volume rate index)
    volume_rate_info = {}
    l2BigNumForMProcessor = L2BigNumForMProcessor()
    __codeActualPriceProcessor = CodeActualPriceProcessor()
    __ths_l2_trade_queue_manager = trade_queue_manager.thsl2tradequeuemanager()
    __thsBuy1VolumnManager = trade_queue_manager.THSBuy1VolumnManager()
    __buyL2SafeCountManager = safe_count_manager.BuyL2SafeCountManager()
    # code -> L2PlaceOrderParamsManager built for the latest batch
    __l2PlaceOrderParamsManagerDict = {}

    @classmethod
    def process(cls, code, datas, capture_timestamp):
        """Entry point: merge one capture of L2 data and run the trade logic.

        :param code: stock code
        :param datas: rows of the current capture
        :param capture_timestamp: timestamp of the capture
        :raises L2DataException: when the first row's price does not match ``code``
        """
        __start_time = round(t.time() * 1000)
        try:
            if len(datas) > 0:
                # The price must be plausible for this code.
                if not code_data_util.is_same_code_with_price(code, float(datas[0]["val"]["price"])):
                    raise L2DataException(L2DataException.CODE_PRICE_ERROR,
                                          "股价不匹配 code-{} price-{}".format(code, datas[0]["val"]["price"]))
                # Load history; the return value tells whether it is consistent.
                is_normal = l2.l2_data_util.load_l2_data(code)
                if not is_normal:
                    print("历史数据异常:", code)
                    # Inconsistent history: forbid trading this code.
                    l2_trade_util.forbidden_trade(code)
                # Correct the captured rows against the latest stored rows.
                datas = l2.l2_data_util.L2DataUtil.correct_data(code, local_latest_datas.get(code), datas)
                _start_index = 0
                if local_today_datas.get(code) is not None and len(
                        local_today_datas[code]) > 0:
                    _start_index = local_today_datas[code][-1]["index"] + 1
                add_datas = l2.l2_data_util.L2DataUtil.get_add_data(code, local_latest_datas.get(code), datas,
                                                                    _start_index)
                # ------------- incremental processing -------------
                try:
                    cls.process_add_datas(code, add_datas, capture_timestamp, __start_time)
                finally:
                    # Always persist, even if processing raised.
                    __start_time = round(t.time() * 1000)
                    l2.l2_data_util.save_l2_data(code, datas, add_datas)
                    __start_time = l2_data_log.l2_time(code, round(t.time() * 1000) - __start_time,
                                                       "保存数据时间({})".format(len(add_datas)))
        finally:
            # A virtual order never survives the end of a capture.
            if code in cls.unreal_buy_dict:
                cls.unreal_buy_dict.pop(code)

    @classmethod
    def process_add_datas(cls, code, add_datas, capture_timestamp, __start_time):
        """Append ``add_datas`` to today's rows and dispatch to the ordered / not-ordered path."""
        now_time_str = tool.get_now_time_str()
        if len(add_datas) > 0:
            print(id(local_today_datas))
            # Append the new rows.
            local_today_datas[code].extend(add_datas)
            l2.l2_data_util.load_num_operate_map(local_today_num_operate_map, code, add_datas)

            # First new row stamped 09:30:00 with the limit-up flag set: record an
            # opening limit-up.
            if add_datas[0]["val"]["time"] == "09:30:00":
                if global_util.cuurent_prices.get(code):
                    price_data = global_util.cuurent_prices.get(code)
                    if price_data[1]:
                        logger_l2_process.info("开盘涨停:{}", code)
                        limit_up_time_manager.save_limit_up_time(code, "09:30:00")

        total_datas = local_today_datas[code]
        __start_time = l2_data_log.l2_time(code, round(t.time() * 1000) - __start_time,
                                           "l2数据预处理时间")

        if len(add_datas) > 0:
            # Whether the code is recorded as a first-board code.
            is_first_code = gpcode_manager.FirstCodeManager.is_in_first_record(code)
            # Volume rate and its index.
            volume_rate = code_volumn_manager.get_volume_rate(code)
            volume_rate_index = code_volumn_manager.get_volume_rate_index(volume_rate)
            # Score; falls back to "now" when no limit-up time is stored.
            limit_up_time = limit_up_time_manager.get_limit_up_time(code)
            if limit_up_time is None:
                limit_up_time = tool.get_now_time_str()
            score = first_code_score_manager.get_score(code, volume_rate, limit_up_time)
            cls.__l2PlaceOrderParamsManagerDict[code] = l2_trade_factor.L2PlaceOrderParamsManager(code, is_first_code,
                                                                                                  volume_rate,
                                                                                                  volume_rate_index,
                                                                                                  score)
            l2_log.debug(code, "量比:{},量索引:{}", volume_rate, volume_rate_index)
            cls.volume_rate_info[code] = (volume_rate, volume_rate_index)

            latest_time = add_datas[len(add_datas) - 1]["val"]["time"]
            if not l2_trade_util.is_in_forbidden_trade_codes(code):
                # Has an order already been placed?
                state = trade_manager.get_trade_state(code)
                start_index = len(total_datas) - len(add_datas)
                end_index = len(total_datas) - 1
                if state == trade_manager.TRADE_STATE_BUY_DELEGATED or state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER or state == trade_manager.TRADE_STATE_BUY_SUCCESS:
                    # Order already placed.
                    cls.__process_order(code, start_index, end_index,
                                        capture_timestamp, is_first_code)
                else:
                    # No order yet: only act when the data is close to "now".
                    if l2.l2_data_util.L2DataUtil.is_same_time(now_time_str, latest_time):
                        cls.__process_not_order(code, start_index, end_index, capture_timestamp, is_first_code)

            logger_l2_process.info("code:{} 处理数据范围: {}-{} 处理时间:{} 截图时间戳:{}", code,
                                   add_datas[0]["index"],
                                   add_datas[-1]["index"], round(t.time() * 1000) - __start_time,
                                   capture_timestamp)
            __start_time = l2_data_log.l2_time(code, round(t.time() * 1000) - __start_time,
                                               "l2数据处理时间")

    @classmethod
    def __process_not_order(cls, code, start_index, end_index, capture_time, is_first_code):
        """Handle rows when no order is placed: fetch the m-value and compute a buy."""
        __start_time = round(t.time() * 1000)
        # Threshold (m-value).
        threshold_money, msg = cls.__get_threshmoney(code)
        # Only log the lookup when it was slow (> 10 ms).
        if round(t.time() * 1000) - __start_time > 10:
            __start_time = l2_data_log.l2_time(code, round(t.time() * 1000) - __start_time,
                                               "获取m值数据耗时")
        cls.__start_compute_buy(code, start_index, end_index, threshold_money, capture_time, is_first_code)

    @classmethod
    def process_order(cls, code, start_index, end_index, capture_time, is_first_code, new_add=True):
        """Public wrapper around ``__process_order`` (for tests only)."""
        cls.__process_order(code, start_index, end_index, capture_time, is_first_code, new_add)

    @classmethod
    def __process_order(cls, code, start_index, end_index, capture_time, is_first_code, new_add=True):
        """Handle rows while an order is placed: evaluate every cancel rule.

        The rules are built as ``dask.delayed`` tasks; each returns
        ``(cancel_data, cancel_msg)`` and the first truthy ``cancel_data`` wins.
        The nested tasks read ``total_data`` and the buy positions that are
        assigned further down, before ``compute()`` runs them.
        """

        @dask.delayed
        def compute_safe_count():
            # Safe-count bookkeeping; never triggers a cancel.
            _start_time = round(t.time() * 1000)
            cls.__buyL2SafeCountManager.compute_left_rate(code, start_index, end_index, total_data,
                                                          local_today_num_operate_map.get(code))
            l2_data_log.l2_time(code, round(t.time() * 1000) - _start_time,
                                "已下单-获取买入信息耗时")
            return None, ""

        @dask.delayed
        def compute_m_big_num():
            # m-value big-order accumulation; never triggers a cancel.
            _start_time = round(t.time() * 1000)
            cls.l2BigNumForMProcessor.process(code, max(buy_single_index, start_index), end_index,
                                              gpcode_manager.get_limit_up_price(code))
            l2_data_log.l2_time(code, round(t.time() * 1000) - _start_time,
                                "已下单-m值大单计算")
            return None, ""

        @dask.delayed
        def buy_1_cancel():
            # Buy-1 statistics; the result is deliberately ignored (buy-1 does not cancel).
            _start_time = round(t.time() * 1000)
            L2LimitUpMoneyStatisticUtil.process_data(code, start_index, end_index, buy_single_index,
                                                     buy_exec_index)
            l2_data_log.l2_time(code, round(t.time() * 1000) - _start_time,
                                "已下单-买1统计耗时")
            return None, ""

        @dask.delayed
        def s_cancel():
            # "S" cancel: second-level big-order cancels.
            _start_time = round(t.time() * 1000)
            try:
                b_need_cancel, b_cancel_data = SecondCancelBigNumComputer.need_cancel(
                    code, buy_single_index, buy_exec_index, start_index, end_index, total_data,
                    code_volumn_manager.get_volume_rate_index(buy_volume_rate),
                    cls.volume_rate_info[code][1], is_first_code)
                if b_need_cancel:
                    return b_cancel_data, "S大单撤销比例触发阈值"
            except Exception as e:
                logging.exception(e)
            finally:
                l2_data_log.l2_time(code, round(t.time() * 1000) - _start_time,
                                    "已下单-s级大单估算")
            return None, ""

        @dask.delayed
        def h_cancel():
            # "H" cancel.
            _start_time = round(t.time() * 1000)
            try:
                b_need_cancel, b_cancel_data = HourCancelBigNumComputer.need_cancel(
                    code, buy_exec_index, start_index, end_index, total_data,
                    local_today_num_operate_map.get(code),
                    code_volumn_manager.get_volume_rate_index(buy_volume_rate),
                    cls.volume_rate_info[code][1], is_first_code)
                if b_need_cancel and b_cancel_data:
                    return b_cancel_data, "H撤销比例触发阈值"
            except Exception as e:
                logging.exception(e)
            finally:
                l2_data_log.l2_time(code, round(t.time() * 1000) - _start_time, "已下单-H撤大单计算")
            return None, ""

        @dask.delayed
        def sell_cancel():
            # Cancel based on sells at the limit-up price.
            _start_time = round(t.time() * 1000)
            try:
                cancel_data, cancel_msg = L2LimitUpSellStatisticUtil.process(code, start_index,
                                                                             end_index,
                                                                             buy_exec_index)
                return cancel_data, cancel_msg
            except Exception as e:
                logging.exception(e)
            finally:
                l2_data_log.l2_time(code, round(t.time() * 1000) - _start_time,
                                    "已下单-板上卖耗时")
            return None, ""

        @dask.delayed
        def is_need_cancel(*args):
            # Return the first rule result that carries cancel data.
            try:
                for _cancel_data, _cancel_msg in args:
                    if _cancel_data:
                        return _cancel_data, _cancel_msg
            except Exception as e:
                logging.exception(e)
            return None, ""

        if start_index < 0:
            start_index = 0

        if end_index < start_index:
            return
        total_data = local_today_datas.get(code)
        _start_time = tool.get_now_timestamp()
        # Stored buy positions.
        buy_single_index, buy_exec_index, buy_compute_index, num, count, max_num_set, buy_volume_rate = cls.__get_order_begin_pos(
            code)

        f1 = compute_safe_count()
        f2 = compute_m_big_num()
        f3 = s_cancel()
        f4 = h_cancel()
        f5 = buy_1_cancel()
        f6 = sell_cancel()
        dask_result = is_need_cancel(f1, f2, f3, f4, f5, f6)
        if is_first_code:
            # First-board codes only evaluate the S and H rules.
            dask_result = is_need_cancel(f3, f4)

        cancel_data, cancel_msg = dask_result.compute()

        _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time,
                                          "已下单-撤单 判断是否需要撤单")
        if cancel_data:
            l2_log.debug(code, "触发撤单,撤单位置:{} ,撤单原因:{}", cancel_data["index"], cancel_msg)
            if cls.cancel_buy(code, cancel_msg):
                _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time,
                                                  "已下单-撤单 耗时")
                # Cancelled: keep looking for a new buy in the remaining rows.
                cls.__process_not_order(code, cancel_data["index"] + 1, end_index, capture_time, is_first_code)
                _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time,
                                                  "处理剩余数据 耗时")
            else:
                # Cancel did not go through yet.
                pass
        else:
            # No cancel signal: turn a pending virtual order into a real one.
            unreal_buy_info = cls.unreal_buy_dict.get(code)
            if unreal_buy_info is not None:
                l2_log.debug(code, "有虚拟下单,无买撤信号,开始执行买入,执行位置:{},截图时间:{}",
                             unreal_buy_info[0], capture_time)
                # unreal_buy_info is (index that triggered the buy, capture time).
                cls.__buy(code, unreal_buy_info[1], local_today_datas[code][unreal_buy_info[0]],
                          unreal_buy_info[0], is_first_code)
                _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time,
                                                  "已虚拟下单-执行真实下单 外部耗时")

    @classmethod
    def __buy(cls, code, capture_timestamp, last_data, last_data_index, is_first_code):
        """Run the final can-buy check and, if it passes, place the real order."""
        __start_time = tool.get_now_timestamp()
        can, need_clear_data, reason = False, False, ""
        if not is_first_code:
            can, need_clear_data, reason = cls.__can_buy(code)
        else:
            can, need_clear_data, reason = cls.__can_buy_first(code)

        __start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - __start_time, "最后判断是否能下单", force=True)
        # The virtual order is consumed either way.
        if code in cls.unreal_buy_dict:
            cls.unreal_buy_dict.pop(code)

        if not can:
            l2_log.debug(code, "不可以下单,原因:{}", reason)
            if need_clear_data:
                buy_single_index, buy_exec_index, buy_compute_index, num, count, max_num_set, buy_volume_rate = cls.__get_order_begin_pos(
                    code)
                trade_result_manager.real_cancel_success(code, buy_single_index, buy_exec_index,
                                                         local_today_datas.get(code))
            return
        else:
            l2_log.debug(code, "可以下单,原因:{}", reason)
            try:
                l2_log.debug(code, "开始执行买入")
                trade_manager.start_buy(code, capture_timestamp, last_data,
                                        last_data_index)
                # ---------------- order placed ----------------
                trade_result_manager.real_buy_success(code)
                l2_log.debug(code, "执行买入成功")
            except Exception as e:
                l2_log.debug(code, "执行买入异常:{}", str(e))
            finally:
                l2_log.debug(code, "m值影响因子:{}", l2_trade_factor.L2TradeFactorUtil.factors_to_string(code))

    @classmethod
    def __can_cancel(cls, code):
        """Return ``(can_cancel, reason)``; white-listed codes cannot be cancelled."""
        if constant.TEST:
            return True, ""
        if l2_trade_util.WhiteListCodeManager.is_in(code):
            return False, "代码在白名单中"

        # Disabled for now: a rule that refused cancels after 14:00 for the
        # industry leader (and for the runner-up when every leader hit limit-up
        # at 09:30:00).
        return True, ""

    @classmethod
    def __can_buy(cls, code):
        """Decide whether a non-first-board code may be bought.

        :return: ``(can_buy, need_clear_previous_buy_info, reason)``
        """
        __start_time = t.time()
        # Disabled earlier rules: "first-board code that never hit limit-up" and
        # "buy-1 price must equal the limit-up price".
        try:
            total_datas = local_today_datas[code]
            if total_datas[-1]["index"] + 1 > len(total_datas):
                return False, True, "L2数据错误"

            # Net buy volume from the execution position to the end must reach
            # 49% of the sell-1 volume.
            try:
                sell1_time, sell1_price, sell1_volumn = cls.__ths_l2_trade_queue_manager.get_sell1_info(code)
                l2_log.buy_debug(code, "卖1信息为:({},{},{})", sell1_time, sell1_price, sell1_volumn)
                if sell1_time is not None and sell1_volumn > 0:
                    buy_single_index, buy_exec_index, buy_compute_index, num, count, max_num_set, buy_volume_rate = cls.__get_order_begin_pos(
                        code)
                    buy_nums = num
                    for i in range(buy_exec_index + 1, total_datas[-1]["index"] + 1):
                        _val = total_datas[i]["val"]
                        if L2DataUtil.is_limit_up_price_buy(_val):
                            buy_nums += _val["num"] * total_datas[i]["re"]
                        elif L2DataUtil.is_limit_up_price_buy_cancel(_val):
                            buy_nums -= _val["num"] * total_datas[i]["re"]
                    if buy_nums < sell1_volumn * 0.49:
                        return False, False, "纯买量({})小于卖1量的49%{} 卖1时间:{}".format(buy_nums, sell1_volumn, sell1_time)
            except Exception as e:
                # Best effort: a failure here must not block the remaining checks.
                logging.exception(e)

            # Volume rate of 1.3 or more: do not buy.
            volumn_rate = cls.volume_rate_info[code][0]
            if volumn_rate >= 1.3:
                return False, False, "最大量比超过1.3不能买"

            limit_up_time = limit_up_time_manager.get_limit_up_time(code)
            if limit_up_time is not None:
                limit_up_time_seconds = l2.l2_data_util.L2DataUtil.get_time_as_second(
                    limit_up_time)
                # The later threshold is tested first; in the opposite order the
                # 13:00 rule shadows it and its reason can never be reported.
                if limit_up_time_seconds >= l2.l2_data_util.L2DataUtil.get_time_as_second("14:55:00"):
                    return False, False, "14:55后涨停的不能买,涨停时间为{}".format(limit_up_time)
                if limit_up_time_seconds >= l2.l2_data_util.L2DataUtil.get_time_as_second(
                        "13:00:00"):
                    return False, False, "二板下午涨停的不能买,涨停时间为{}".format(limit_up_time)

            # Ranks beyond the runner-up inside the same industry cannot be bought.
            industry, codes = ths_industry_util.get_same_industry_codes(code, gpcode_manager.get_gp_list())
            if industry is None:
                return True, False, "没有获取到行业"
            codes_index = industry_codes_sort.sort_codes(codes, code)
            if codes_index is None:
                # Without a ranking the checks below cannot run. The previous code
                # dereferenced None here and raised, which also prevented the buy.
                return False, False, "未获取到板块内排序"
            rank = codes_index.get(code)
            if rank is not None and rank > 1:
                return False, False, "同一板块中老三,老四,...不能买"

            if cls.__codeActualPriceProcessor.is_under_water(code, total_datas[-1]["val"]["time"]):
                # Under-water pick-up: a non-leader whose free-float market cap
                # exceeds a leader's is rejected.
                if rank != 0:
                    for c in codes_index:
                        if codes_index.get(c) == 0 and global_util.zyltgb_map.get(code) > global_util.zyltgb_map.get(c):
                            return False, False, "水下捞,不是老大,且自由流通市值大于老大"

            # Disabled for now: extra rules for the runner-up (leader already
            # bought / leader not limit-up at 09:30:00 / too few limit-up codes
            # in the industry).
            return True, False, None
        finally:
            l2_data_log.l2_time(code, round((t.time() - __start_time) * 1000), "是否可以下单计算")

    @classmethod
    def __can_buy_first(cls, code):
        """Decide whether a first-board code may be bought.

        :return: ``(can_buy, need_clear_previous_buy_info, reason)``
        """
        if not gpcode_manager.WantBuyCodesManager.is_in(code):
            # Score tier of the code.
            score_index = cls.__l2PlaceOrderParamsManagerDict[code].score_index
            score = cls.__l2PlaceOrderParamsManagerDict[code].score
            if score_index < 0:
                return False, True, f"分值:{score}未达到需要买入的分数线"
            if -1 < score_index < 3:
                # A stray literal "f" in front of the placeholder was removed here.
                return True, False, f"分值:{score}达到主动买入的分数线,买入等级:{score_index}"
            # Read the flag before recording the code, so it reflects earlier batches.
            is_limited_up = gpcode_manager.FirstCodeManager.is_limited_up(code)
            gpcode_manager.FirstCodeManager.add_limited_up_record([code])
            if not code_price_manager.Buy1PriceManager.is_can_buy(code):
                return False, True, f"首板代码,没在想要买名单中且未打开涨停板,分数:{score}"
            if not is_limited_up:
                return False, True, f"首板代码,没在想要买名单中且未涨停过,分数:{score}"
            return True, False, ""
        else:
            return True, False, "在想买名单中"

    @classmethod
    def __cancel_buy(cls, code):
        """Send the cancel request; return ``True`` on success, ``False`` on any error."""
        try:
            l2_log.debug(code, "开始执行撤单")
            trade_manager.start_cancel_buy(code)
            l2_log.debug(code, "执行撤单成功")
            return True
        except Exception as e:
            logging.exception(e)
            l2_log.debug(code, "执行撤单异常:{}", str(e))
            return False

    @classmethod
    def cancel_buy(cls, code, msg=None, source="l2"):
        """Cancel the virtual or real buy of ``code``.

        :param msg: reason, only used for logging
        :param source: ``"trade_queue"`` applies the 5-second guard below
        :return: ``False`` when the cancel was skipped or refused, else ``True``
        """
        buy_single_index, buy_exec_index, buy_compute_index, num, count, max_num_set, buy_volume_rate = cls.__get_order_begin_pos(
            code)
        total_datas = local_today_datas[code]
        if source == "trade_queue":
            # Trade-queue triggers must wait 5s after the execution row's time.
            if buy_exec_index is not None and buy_exec_index > 0:
                now_time_str = tool.get_now_time_str()
                if tool.trade_time_sub(now_time_str, total_datas[buy_exec_index]["val"]["time"]) < 5:
                    return False

        if code in cls.unreal_buy_dict:
            cls.unreal_buy_dict.pop(code)
            # Only a virtual order existed: clear the buy markers.
            trade_result_manager.virtual_cancel_success(code, buy_single_index, buy_exec_index, total_datas)
        else:
            can_cancel, reason = cls.__can_cancel(code)
            if not can_cancel:
                l2_log.cancel_debug(code, "撤单中断,原因:{}", reason)
                l2_log.debug(code, "撤单中断,原因:{}", reason)
                return False
            cancel_result = cls.__cancel_buy(code)
            if cancel_result:
                trade_result_manager.real_cancel_success(code, buy_single_index, buy_exec_index, total_datas)
        l2_log.debug(code, "执行撤单结束,原因:{}", msg)
        return True

    @classmethod
    def __virtual_buy(cls, code, buy_single_index, buy_exec_index, capture_time):
        """Register a virtual order: remember ``(buy_exec_index, capture_time)``."""
        cls.unreal_buy_dict[code] = (buy_exec_index, capture_time)
        trade_result_manager.virtual_buy_success(code)

    @classmethod
    def __start_compute_buy(cls, code, compute_start_index, compute_end_index, threshold_money, capture_time,
                            is_first_code, new_add=True):
        """Look for a buy signal / execution position in the given row range.

        Finds (or reuses) the buy-signal index, accumulates net buys up to the
        m-value threshold and, once an execution index is found, virtually buys
        and then either buys for real or continues with the cancel rules.
        """
        if compute_end_index < compute_start_index:
            return
        _start_time = tool.get_now_timestamp()
        total_datas = local_today_datas[code]
        # Safe-count bookkeeping.
        cls.__buyL2SafeCountManager.compute_left_rate(code, compute_start_index, compute_end_index, total_datas,
                                                      local_today_num_operate_map.get(code))

        # Stored buy positions.
        buy_single_index, buy_exec_index, buy_compute_index, num, count, max_num_set, buy_volume_rate = cls.__get_order_begin_pos(
            code)

        # Whether the signal index was found in this call.
        new_get_single = False
        if buy_single_index is None:
            continue_count = cls.__l2PlaceOrderParamsManagerDict[code].get_begin_continue_buy_count()
            # For newly added rows, look back far enough to cover a run that
            # started before compute_start_index.
            has_single, _index = cls.__compute_order_begin_pos(code, max(
                (compute_start_index - continue_count - 1) if new_add else compute_start_index, 0), continue_count,
                                                               compute_end_index)
            buy_single_index = _index
            if has_single:
                new_get_single = True
                num = 0
                count = 0
                l2_log.debug(code, "获取到买入信号起始点:{} ,计算范围:{}-{} ,数据:{}", buy_single_index, compute_start_index,
                             compute_end_index, total_datas[buy_single_index])
                # First signal of the day fixes the big-order begin position.
                cls.l2BigNumForMProcessor.set_begin_pos(code, buy_single_index)

        _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time, "下单信号计算时间")

        if buy_single_index is None:
            # No buy signal: stop.
            return None

        # Position from which the accumulation starts.
        start_process_index = max(buy_single_index, compute_start_index)
        if new_get_single:
            start_process_index = buy_single_index

        # m-value big-order accumulation.
        cls.l2BigNumForMProcessor.process(code, start_process_index,
                                          compute_end_index,
                                          gpcode_manager.get_limit_up_price(code))

        _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time, "计算m值大单")

        threshold_money, msg = cls.__get_threshmoney(code)

        _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time, "m值阈值计算")

        # Net buy statistics.
        compute_index, buy_nums, buy_count, rebegin_buy_pos, max_num_set_new = cls.__sum_buy_num_for_order_3(
            code, start_process_index, compute_end_index, num, count, threshold_money, buy_single_index,
            max_num_set)
        _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time, "纯买额统计时间")

        l2_log.debug(code, "m值-{} 量比:{}", threshold_money, cls.volume_rate_info[code][0])

        # The signal expired (too far from the computed rows): restart from the
        # returned position.
        if rebegin_buy_pos is not None:
            cls.__start_compute_buy(code, rebegin_buy_pos, compute_end_index, threshold_money, capture_time,
                                    is_first_code, False)
            return

        if compute_index is not None:
            l2_log.debug(code, "获取到买入执行位置:{} m值:{} 纯买手数:{} 纯买单数:{} 数据:{} ,量比:{} ", compute_index, threshold_money,
                         buy_nums,
                         buy_count, total_datas[compute_index], cls.volume_rate_info[code][0])

            # The five bookkeeping steps run through dask: store the buy positions,
            # store the limit-up time, register the virtual order, drop old cancel
            # points, compute the limit-up sealed money.
            f1 = dask.delayed(cls.__save_order_begin_data)(code, buy_single_index, compute_index, compute_index,
                                                           buy_nums, buy_count, max_num_set_new,
                                                           cls.volume_rate_info[code][0])
            f2 = dask.delayed(limit_up_time_manager.save_limit_up_time)(code,
                                                                        total_datas[compute_index]["val"]["time"])
            f3 = dask.delayed(cls.__virtual_buy)(code, buy_single_index, compute_index, capture_time)
            f4 = dask.delayed(l2_data_manager.TradePointManager.delete_buy_cancel_point)(code)
            f5 = dask.delayed(L2LimitUpMoneyStatisticUtil.process_data)(code, buy_single_index, compute_index,
                                                                        buy_single_index,
                                                                        buy_exec_index, False)
            dask.compute(f1, f2, f3, f4, f5)

            _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time,
                                              "记录执行买入数据", force=True)

            # NOTE(review): both need_cancel calls below pass is_first_code as the
            # 7th argument and a 10th boolean, unlike the call in __process_order -
            # confirm against SecondCancelBigNumComputer.need_cancel's signature.
            if compute_index >= compute_end_index:
                # All rows consumed.
                need_cancel, cancel_data = SecondCancelBigNumComputer.need_cancel(code, buy_single_index,
                                                                                  compute_index,
                                                                                  buy_single_index, compute_index,
                                                                                  total_datas, is_first_code,
                                                                                  cls.volume_rate_info[code][1],
                                                                                  cls.volume_rate_info[code][1],
                                                                                  True)
                _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time,
                                                  "S级大单处理耗时", force=True)
                l2_log.debug(code, "数据处理完毕,下单, 数据截图时间-{}", capture_time)
                # Buy for real unless the S rule asks for a cancel.
                if need_cancel:
                    if cls.cancel_buy(code, "S级大单撤销"):
                        # Cancel succeeded.
                        pass
                else:
                    cls.__buy(code, capture_time, total_datas[compute_index], compute_index, is_first_code)
            else:
                SecondCancelBigNumComputer.need_cancel(code, buy_single_index, compute_index, buy_single_index,
                                                       compute_index, total_datas, is_first_code,
                                                       cls.volume_rate_info[code][1],
                                                       cls.volume_rate_info[code][1], False)
                _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time,
                                                  "S级大单处理耗时", force=True)
                # Rows remain: run the cancel rules on the rest.
                l2_log.debug(code, "数据尚未处理完毕,进行下一步处理,处理进度:{}", compute_index)
                cls.__process_order(code, compute_index + 1, compute_end_index, capture_time, is_first_code, False)
                _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time,
                                                  f"处理撤单步骤耗时,范围:{compute_index + 1}-{compute_end_index}", force=True)
        else:
            # Threshold not reached: store the running totals for the next batch.
            cls.__save_order_begin_data(code, buy_single_index, -1, compute_end_index, buy_nums, buy_count,
                                        max_num_set_new, None)
            # _start_time is a timestamp in the units of tool.get_now_timestamp()
            # (milliseconds, judging by how both feed l2_data_log.l2_time), so the
            # elapsed time is taken with the same clock.
            print("保存大单时间", tool.get_now_timestamp() - _start_time)

    @classmethod
    def __get_order_begin_pos(cls, code):
        """Return the stored 7-tuple of buy positions and totals for ``code``."""
        buy_single_index, buy_exec_index, compute_index, num, count, max_num_set, volume_rate = l2_data_manager.TradePointManager.get_buy_compute_start_data(
            code)
        return buy_single_index, buy_exec_index, compute_index, num, count, max_num_set, volume_rate

    @classmethod
    def __save_order_begin_data(cls, code, buy_single_index, buy_exec_index, compute_index, num, count, max_num_set,
                                volume_rate):
        """Store the buy positions and running totals for ``code``."""
        TradePointManager.set_buy_compute_start_data(code, buy_single_index, buy_exec_index, compute_index, num, count,
                                                     max_num_set, volume_rate)

    @classmethod
    def __compute_order_begin_pos(cls, code, start_index, continue_count, end_index):
        """Find the start of a run of same-second limit-up buys.

        :param continue_count: required summed ``re`` of the run
        :return: ``(True, start_index_of_run)`` or ``(False, None)``
        """
        second_930 = 9 * 3600 + 30 * 60 + 0
        datas = local_today_datas[code]
        if end_index - start_index + 1 < continue_count:
            return False, None
        last_index = None
        count = 0
        start = None

        for i in range(start_index, end_index + 1):
            _val = datas[i]["val"]
            # Rows before 09:30:00 are ignored.
            if L2DataUtil.get_time_as_second(_val["time"]) < second_930:
                continue

            if L2DataUtil.is_limit_up_price_buy(_val):
                if last_index is None or (datas[last_index]["val"]["time"] == datas[i]["val"]["time"]):
                    if start is None:
                        start = i
                    last_index = i
                    count += datas[i]["re"]
                    if count >= continue_count:
                        return True, start
                else:
                    # Different second: this row becomes the new run start.
                    last_index = i
                    count = datas[i]["re"]
                    start = i
            elif not L2DataUtil.is_sell(_val) and not L2DataUtil.is_sell_cancel(_val):
                # Anything except sells / sell-cancels breaks the run.
                last_index = None
                count = 0
                start = None

        return False, None

    @classmethod
    def __get_threshmoney(cls, code):
        """Return the m-value result of the code's place-order params manager."""
        return cls.__l2PlaceOrderParamsManagerDict[code].get_m_val()

    @classmethod
    def __compute_big_money_count(cls, total_datas, start_index, end_index):
        """Return limit-up buy count minus limit-up buy-cancel count (weighted by ``re``)."""
        count = 0
        for i in range(start_index, end_index + 1):
            if L2DataUtil.is_limit_up_price_buy(total_datas[i]["val"]):
                count += total_datas[i]["re"]
            elif L2DataUtil.is_limit_up_price_buy_cancel(total_datas[i]["val"]):
                count -= total_datas[i]["re"]
        return count

    @classmethod
    def __sum_buy_num_for_order_3(cls, code, compute_start_index, compute_end_index, origin_num, origin_count,
                                  threshold_money, buy_single_index, max_num_set):
        """Accumulate net limit-up buys and look for the buy execution index.

        Cancels whose original buy lies before the buy signal (and not in the
        same second) are not subtracted.

        :return: ``(exec_index | None, buy_nums, buy_count, restart_index | None, big_order_index_set)``
        :raises Exception: when the limit-up price is unavailable
        """
        total_datas = local_today_datas[code]
        buy_nums = origin_num
        buy_count = origin_count
        limit_up_price = gpcode_manager.get_limit_up_price(code)
        if limit_up_price is None:
            raise Exception("涨停价无法获取")
        # Target number of hands.
        threshold_num = round(threshold_money / (limit_up_price * 100))
        # Target number of orders.
        threshold_count = cls.__l2PlaceOrderParamsManagerDict[code].get_safe_count()

        buy_single_time_seconds = L2DataUtil.get_time_as_second(total_datas[buy_single_index]["val"]["time"])

        # Set per row: only a sufficiently large limit-up buy may trigger the buy.
        trigger_buy = True
        # Maximum seconds allowed between the signal and a counted row.
        max_space_time = cls.__l2PlaceOrderParamsManagerDict[code].get_time_range()
        max_buy_num_set = set(max_num_set)

        # Minimum number of big orders required.
        big_num_count = cls.__l2PlaceOrderParamsManagerDict[code].get_big_num_count()
        # Hand count from which a row is counted at all.
        bigger_num = round(5900 / limit_up_price)

        for i in range(compute_start_index, compute_end_index + 1):
            data = total_datas[i]
            _val = total_datas[i]["val"]
            trigger_buy = False
            # The row must lie within max_space_time seconds of the signal.
            if L2DataUtil.get_time_as_second(_val["time"]) - buy_single_time_seconds > max_space_time:
                TradePointManager.delete_buy_point(code)
                if i == compute_end_index:
                    # All rows consumed.
                    return None, buy_nums, buy_count, None, max_buy_num_set
                else:
                    # Restart from the first row whose second differs from the signal's.
                    for ii in range(buy_single_index + 1, compute_end_index + 1):
                        if total_datas[buy_single_index]["val"]["time"] != total_datas[ii]["val"]["time"]:
                            return None, buy_nums, buy_count, ii, max_buy_num_set
            if L2DataUtil.is_limit_up_price_buy(_val):
                # Limit-up buy.
                if l2_data_util.is_big_money(_val):
                    max_buy_num_set.add(i)
                if _val["num"] >= bigger_num:
                    trigger_buy = True
                    buy_nums += int(_val["num"]) * int(total_datas[i]["re"])
                    buy_count += int(total_datas[i]["re"])
                    if buy_nums >= threshold_num and buy_count >= threshold_count:
                        logger_l2_trade_buy.info(
                            f"{code}获取到买入执行点:{i} 统计纯买手数:{buy_nums} 目标纯买手数:{threshold_num} 统计纯买单数:{buy_count} 目标纯买单数:{threshold_count}, 大单数量:{len(max_buy_num_set)}")
            elif L2DataUtil.is_limit_up_price_buy_cancel(_val):
                # Limit-up buy cancel.
                if _val["num"] >= bigger_num:
                    # Locate the buy this cancel refers to.
                    buy_index = l2_data_source_util.L2DataSourceUtils.get_buy_index_with_cancel_data(
                        code, total_datas[i], local_today_num_operate_map.get(code))
                    if buy_index is not None:
                        if buy_index >= buy_single_index:
                            buy_nums -= int(_val["num"]) * int(data["re"])
                            buy_count -= int(data["re"])
                            # The cancelled buy no longer counts as a big order.
                            max_buy_num_set.discard(buy_index)
                            l2_log.buy_debug(code, "{}数据在买入信号之后 撤买纯买手数:{} 目标手数:{}", i, buy_nums, threshold_num)
                        else:
                            l2_log.buy_debug(code, "{}数据在买入信号之前,买入位:{}", i, buy_index)
                            if total_datas[buy_single_index]["val"]["time"] == total_datas[buy_index]["val"]["time"]:
                                # Same second as the signal: treat as after it.
                                buy_nums -= int(_val["num"]) * int(data["re"])
                                buy_count -= int(data["re"])
                                max_buy_num_set.discard(buy_index)
                                l2_log.buy_debug(code, "{}数据买入位与预估买入位在同一秒", i)
                    else:
                        # Original buy not found: subtract anyway.
                        l2_log.buy_debug(code, "未找到买撤数据的买入点: 位置-{} 数据-{}", i, data)
                        buy_nums -= int(_val["num"]) * int(total_datas[i]["re"])
                        buy_count -= int(total_datas[i]["re"])
            l2_log.buy_debug(code, "位置-{},总手数:{},目标手数:{}", i,
                             buy_nums, threshold_num)
            # All thresholds met on a row that may trigger the buy.
            if buy_nums >= threshold_num and buy_count >= threshold_count and trigger_buy and len(
                    max_buy_num_set) >= big_num_count:
                return i, buy_nums, buy_count, None, max_buy_num_set

        l2_log.buy_debug(code, "尚未获取到买入执行点,起始计算位置:{} 统计纯买手数:{} 目标纯买手数:{}  统计纯买单数:{} 目标纯买单数:{} 大单数量:{} 目标大单数量:{}",
                         compute_start_index,
                         buy_nums,
                         threshold_num, buy_count, threshold_count, len(max_buy_num_set), big_num_count)

        return None, buy_nums, buy_count, None, max_buy_num_set


if __name__ == "__main__":
    # Manual experiments (cancel a buy, verify statistics, load L2 data for a
    # code) used to live here; only this scratch check remains.
    dict_ = {"code": 0}
    dict_.clear()
    print(dict_)