import datetime import json import logging import random import time as t import big_money_num_manager import code_data_util import constant import global_util import gpcode_manager import industry_codes_sort import l2_data_log import l2_data_manager import l2_data_util import l2_trade_factor import l2_trade_test import l2_trade_util import limit_up_time_manager import redis_manager import ths_industry_util import tool import trade_manager import trade_queue_manager import trade_data_manager from l2_data_manager import L2DataException, TradePointManager, local_today_datas, L2DataUtil, load_l2_data, \ local_today_num_operate_map from log import logger_l2_trade, logger_l2_trade_cancel, logger_l2_trade_buy, logger_l2_process, logger_buy_1_volumn, \ logger_l2_error # TODO l2数据管理 from trade_data_manager import CodeActualPriceProcessor class L2DataManager: # 格式化数据 def format_data(self, datas): format_datas = [] for data in datas: format_datas.append({"val": data, "re": 1}) return format_datas # 获取新增数据 def get_add_datas(self, format_datas): pass # 从数据库加载数据 def load_data(self, code=None, force=False): pass # 保存数据 def save_datas(self, add_datas, datas): pass # m值大单处理 class L2BigNumForMProcessor: def __init__(self): self._redis_manager = redis_manager.RedisManager(1) def __get_redis(self): return self._redis_manager.getRedis() # 保存计算开始位置 def set_begin_pos(self, code, index): if self.__get_begin_pos(code) is None: # 保存位置 key = "m_big_money_begin-{}".format(code) self.__get_redis().setex(key, tool.get_expire(), index) # 获取计算开始位置 def __get_begin_pos(self, code): key = "m_big_money_begin-{}".format(code) val = self.__get_redis().get(key) if val is None: return None return int(val) # 清除已经处理的数据 def clear_processed_end_index(self, code): key = "m_big_money_process_index-{}".format(code) self.__get_redis().delete(key) # 添加已经处理过的单 def __set_processed_end_index(self, code, index): key = "m_big_money_process_index-{}".format(code) self.__get_redis().setex(key, tool.get_expire(), index) # 是否已经处理过 def __get_processed_end_index(self, code): key = "m_big_money_process_index-{}".format(code) val = self.__get_redis().get(key) if val is None: return None return int(val) # 处理大单 def process(self, code, start_index, end_index, limit_up_price): begin_pos = self.__get_begin_pos(code) if begin_pos is None: # 没有获取到开始买入信号 return # 上次处理到的坐标 processed_index = self.__get_processed_end_index(code) if processed_index is None: processed_index = 0 if processed_index >= end_index: return start_time = round(t.time() * 1000) total_datas = local_today_datas[code] num_splites = [round(5000 / limit_up_price), round(10000 / limit_up_price), round(20000 / limit_up_price), round(30000 / limit_up_price)] total_num = 0 for i in range(max(start_index, processed_index), end_index + 1): data = total_datas[i] if not L2DataUtil.is_limit_up_price_buy_cancel(data["val"]) and not L2DataUtil.is_limit_up_price_buy( data["val"]): continue # 如果是涨停买撤信号需要看数据位置是否比开始处理时间早 if L2DataUtil.is_limit_up_price_buy_cancel(data["val"]): # 获取买入信号 buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(total_datas[i], local_today_num_operate_map.get(code)) if buy_index is not None and buy_index < begin_pos: continue # 计算成交金额 num = int(data["val"]["num"]) temp = 0 if num < num_splites[0]: pass elif num < num_splites[1]: temp = 1 elif num < num_splites[2]: temp = round(4 / 3, 3) elif num < num_splites[3]: temp = 2 else: temp = 4 count = int(temp * data["re"] * 1000) if L2DataUtil.is_limit_up_price_buy_cancel(data["val"]): count = 0 - count total_num += count self.__set_processed_end_index(code, end_index) big_money_num_manager.add_num(code, total_num) print("m值大单计算范围:{}-{} 时间:{}".format(max(start_index, processed_index), end_index, round(t.time() * 1000) - start_time)) class L2TradeDataProcessor: unreal_buy_dict = {} random_key = {} l2BigNumForMProcessor = L2BigNumForMProcessor() __codeActualPriceProcessor = CodeActualPriceProcessor() buy1PriceManager = trade_queue_manager.Buy1PriceManager() __ths_l2_trade_queue_manager = trade_queue_manager.thsl2tradequeuemanager() @classmethod def debug(cls, code, content, *args): logger_l2_trade.debug(("thread-id={} code={} ".format(cls.random_key[code], code) + content).format(*args)) @classmethod def cancel_debug(cls, code, content, *args): logger_l2_trade_cancel.debug( ("thread-id={} code={} ".format(cls.random_key[code], code) + content).format(*args)) @classmethod def buy_debug(cls, code, content, *args): logger_l2_trade_buy.debug( ("thread-id={} code={} ".format(cls.random_key[code], code) + content).format(*args)) @classmethod # 数据处理入口 # datas: 本次截图数据 # capture_timestamp:截图时间戳 def process(cls, code, datas, capture_timestamp): cls.random_key[code] = random.randint(0, 100000) __start_time = round(t.time() * 1000) try: if len(datas) > 0: # 判断价格区间是否正确 if not code_data_util.is_same_code_with_price(code, float(datas[0]["val"]["price"])): raise L2DataException(L2DataException.CODE_PRICE_ERROR, "股价不匹配 code-{} price-{}".format(code, datas[0]["val"]["price"])) # 加载历史数据 l2_data_manager.load_l2_data(code) # 纠正数据 datas = l2_data_manager.L2DataUtil.correct_data(code, datas) _start_index = 0 if local_today_datas.get(code) is not None and len( local_today_datas[code]) > 0: _start_index = local_today_datas[code][-1]["index"] + 1 add_datas = l2_data_manager.L2DataUtil.get_add_data(code, datas, _start_index) # -------------数据增量处理------------ try: cls.process_add_datas(code, add_datas, capture_timestamp, __start_time) finally: # 保存数据 l2_data_manager.save_l2_data(code, datas, add_datas) __start_time = l2_data_log.l2_time(code, round(t.time() * 1000) - __start_time, "保存数据时间({})".format(len(add_datas))) finally: if code in cls.unreal_buy_dict: cls.unreal_buy_dict.pop(code) @classmethod def process_add_datas(cls, code, add_datas, capture_timestamp, __start_time): if len(add_datas) > 0: now_time_str = tool.get_now_time_str() # 拼接数据 local_today_datas[code].extend(add_datas) l2_data_util.load_num_operate_map(l2_data_manager.local_today_num_operate_map, code, add_datas) # ---------- 判断是否需要计算大单 ----------- try: average_need, buy_single_index, buy_exec_index = AverageBigNumComputer.is_need_compute_average( code, local_today_datas[code][-1]) # 计算平均大单 if average_need: end_index = local_today_datas[code][-1]["index"] if len(add_datas) > 0: end_index = add_datas[-1]["index"] AverageBigNumComputer.compute_average_big_num(code, buy_single_index, buy_single_index, end_index) except Exception as e: logging.exception(e) try: average_need, buy_single_index, buy_exec_index = SecondAverageBigNumComputer.is_need_compute_average( code, local_today_datas[code][-1]) # 计算平均大单 if average_need: end_index = local_today_datas[code][-1]["index"] if len(add_datas) > 0: end_index = add_datas[-1]["index"] SecondAverageBigNumComputer.compute_average_big_num(code, buy_single_index, buy_single_index, end_index) except Exception as e: logging.exception(e) # 第1条数据是否为09:30:00 if add_datas[0]["val"]["time"] == "09:30:00": if global_util.cuurent_prices.get(code): price_data = global_util.cuurent_prices.get(code) if price_data[1]: # 当前涨停价,设置涨停时间 logger_l2_process.info("开盘涨停:{}", code) # 保存涨停时间 limit_up_time_manager.save_limit_up_time(code, "09:30:00") total_datas = local_today_datas[code] __start_time = l2_data_log.l2_time(code, round(t.time() * 1000) - __start_time, "l2数据预处理时间") if len(add_datas) > 0: latest_time = add_datas[len(add_datas) - 1]["val"]["time"] # 时间差不能太大才能处理 if l2_data_manager.L2DataUtil.is_same_time(now_time_str, latest_time) and not l2_trade_util.is_in_forbidden_trade_codes( code): # 判断是否已经挂单 state = trade_manager.get_trade_state(code) start_index = len(total_datas) - len(add_datas) end_index = len(total_datas) - 1 if state == trade_manager.TRADE_STATE_BUY_DELEGATED or state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER or state == trade_manager.TRADE_STATE_BUY_SUCCESS: # 已挂单 cls.__process_order(code, start_index, end_index, capture_timestamp) else: # 未挂单 cls.__process_not_order(code, start_index, end_index, capture_timestamp) logger_l2_process.info("code:{} 处理数据范围: {}-{} 处理时间:{} 截图时间戳:{}", code, add_datas[0]["index"], add_datas[-1]["index"], round(t.time() * 1000) - __start_time, capture_timestamp) __start_time = l2_data_log.l2_time(code, round(t.time() * 1000) - __start_time, "l2数据处理时间") # 处理未挂单 @classmethod def __process_not_order(cls, code, start_index, end_index, capture_time): __start_time = round(t.time() * 1000) # 获取阈值 threshold_money, msg = cls.__get_threshmoney(code) if round(t.time() * 1000) - __start_time > 10: __start_time = l2_data_log.l2_time(code, round(t.time() * 1000) - __start_time, "获取m值数据耗时") cls.__start_compute_buy(code, start_index, end_index, threshold_money, capture_time) # 测试专用 @classmethod def process_order(cls, code, start_index, end_index, capture_time, new_add=True): cls.__process_order(code, start_index, end_index, capture_time, new_add) # 处理已挂单 @classmethod def __process_order(cls, code, start_index, end_index, capture_time, new_add=True): if start_index < 0: start_index = 0 if end_index < start_index: return # 获取买入信号起始点 buy_single_index, buy_exec_index, buy_compute_index, num, count, max_num_set = cls.__get_order_begin_pos( code) # 撤单计算,只看买1 cancel_data, cancel_msg = L2LimitUpMoneyStatisticUtil.process_data(code, start_index, end_index, buy_single_index, buy_exec_index) # 撤单计算,看秒级大单撤单 try: b_need_cancel, b_cancel_data = SecondAverageBigNumComputer.need_cancel(code, buy_single_index, buy_exec_index, start_index, end_index) if b_need_cancel and not cancel_data: cancel_data = b_cancel_data cancel_msg = "申报时间截至大单撤销比例触发阈值" except Exception as e: logging.exception(e) # 撤单计算,看分钟级大单撤单 try: b_need_cancel, b_cancel_data = AverageBigNumComputer.need_cancel(code, buy_single_index, buy_exec_index, start_index, end_index) if b_need_cancel and not cancel_data: cancel_data = b_cancel_data cancel_msg = "1分钟内大单撤销比例触发阈值" except Exception as e: logging.exception(e) if not cancel_data: # 统计板上卖 try: cancel_data, cancel_msg = L2LimitUpSellStatisticUtil.process(code, start_index, end_index, buy_exec_index) except Exception as e: logging.exception(e) # 计算m值大单 cls.l2BigNumForMProcessor.process(code, max(buy_single_index, start_index), end_index, gpcode_manager.get_limit_up_price(code)) if cancel_data: if cancel_data["index"] == 175: print("进入调试") cls.debug(code, "触发撤单,撤单位置:{} ,撤单原因:{}", cancel_data["index"], cancel_msg) # 撤单 if cls.cancel_buy(code, cancel_msg): # 撤单成功,继续计算下单 cls.__process_not_order(code, cancel_data["index"] + 1, end_index, capture_time) else: # 撤单尚未成功 pass else: # 如果有虚拟下单需要真实下单 unreal_buy_info = cls.unreal_buy_dict.get(code) if unreal_buy_info is not None: cls.debug(code, "有虚拟下单,无买撤信号,开始执行买入,执行位置:{},截图时间:{}", unreal_buy_info[0], capture_time) # unreal_buy_info 的内容格式为:(触法买操作下标,截图时间) # 真实下单 cls.__buy(code, unreal_buy_info[1], local_today_datas[code][unreal_buy_info[0]], unreal_buy_info[0]) # 判断是否需要计算长大单的信息 try: LongAverageBigNumComputer.compute_average_big_num(code, buy_single_index, buy_exec_index) except Exception as e: logging.exception(e) @classmethod def __buy(cls, code, capture_timestamp, last_data, last_data_index): can, reason = cls.__can_buy(code) # 删除虚拟下单 if code in cls.unreal_buy_dict: cls.unreal_buy_dict.pop(code) if not can: cls.debug(code, "不可以下单,原因:{}", reason) if not reason.startswith("买1价不为涨停价"): # 中断买入 trade_manager.break_buy(code, reason) return else: cls.debug(code, "可以下单,原因:{}", reason) try: cls.debug(code, "开始执行买入") trade_manager.start_buy(code, capture_timestamp, last_data, last_data_index) trade_data_manager.placeordercountmanager.place_order(code) # 获取买入位置信息 try: buy_single_index, buy_exec_index, buy_compute_index, num, count, max_num_set = cls.__get_order_begin_pos( code) SecondAverageBigNumComputer.place_order_success(code, buy_single_index, buy_exec_index) AverageBigNumComputer.place_order_success(code, buy_single_index, buy_exec_index) LongAverageBigNumComputer.place_order_success(code, buy_single_index, buy_exec_index) except Exception as e: logging.exception(e) logger_l2_error.exception(e) l2_data_manager.TradePointManager.delete_buy_cancel_point(code) cls.debug(code, "执行买入成功") except Exception as e: cls.debug(code, "执行买入异常:{}", str(e)) pass finally: cls.debug(code, "m值影响因子:{}", l2_trade_factor.L2TradeFactorUtil.factors_to_string(code)) # 是否可以取消 @classmethod def __can_cancel(cls, code): if constant.TEST: return True, "" # 暂时注释掉 # 14点后如果是板块老大就不需要取消了 # now_time_str = tool.get_now_time_str() # if int(now_time_str.replace(":", "")) >= 140000: # industry, codes = ths_industry_util.get_same_industry_codes(code, gpcode_manager.get_gp_list()) # if industry is None: # return True, "没有获取到行业" # codes_index = industry_codes_sort.sort_codes(codes, code) # if codes_index is not None and codes_index.get(code) is not None: # # 同一板块中老二后面的不能买 # if codes_index.get(code) == 0: # return False, "14:00后老大不能撤单" # elif codes_index.get(code) == 1: # # 判断老大是否都是09:30:00涨停的 # # 同1板块老大是09:30:00涨停,老二14:00砸开的不撤 # first_count = 0 # for key in codes_index: # if codes_index[key] == 0: # first_count += 1 # if limit_up_time_manager.get_limit_up_time(key) == "09:30:00": # first_count -= 1 # if first_count == 0: # return False, "14:00后老大都开盘涨停,老二不能撤单" return True, "" # 是否可以买 @classmethod def __can_buy(cls, code): # 买1价格必须为涨停价才能买 # buy1_price = cls.buy1PriceManager.get_price(code) # if buy1_price is None: # return False, "买1价尚未获取到" # limit_up_price = gpcode_manager.get_limit_up_price(code) # if limit_up_price is None: # return False, "尚未获取到涨停价" # if abs(float(buy1_price) - float(limit_up_price)) >= 0.01: # return False, "买1价不为涨停价,买1价-{} 涨停价-{}".format(buy1_price, limit_up_price) # 从买入信号起始点到当前数据末尾的纯买手数与当前的卖1做比较,如果比卖1小则不能买入 total_datas = local_today_datas[code] try: sell1_time, sell1_price, sell1_volumn = cls.__ths_l2_trade_queue_manager.get_sell1_info(code) cls.buy_debug(code, "卖1信息为:({},{},{})", sell1_time, sell1_price, sell1_volumn) if sell1_time is not None and sell1_volumn > 0: # 获取执行位信息 buy_single_index, buy_exec_index, buy_compute_index, num, count, max_num_set = cls.__get_order_begin_pos( code) buy_nums = num for i in range(buy_exec_index + 1, total_datas[-1]["index"] + 1): _val = total_datas[i]["val"] # 涨停买 if L2DataUtil.is_limit_up_price_buy(_val): # 涨停买 buy_nums += int(_val["num"]) * int(total_datas[i]["re"]) elif L2DataUtil.is_limit_up_price_buy_cancel(_val): buy_nums -= int(_val["num"]) * int(total_datas[i]["re"]) if buy_nums < sell1_volumn * 0.49: return False, "纯买量({})小于卖1量的49%{} 卖1时间:{}".format(buy_nums, sell1_volumn, sell1_time) except Exception as e: logging.exception(e) # 量比超过1.3的不能买 volumn_rate = l2_trade_factor.L2TradeFactorUtil.get_volumn_rate_by_code(code) if volumn_rate >= 1.3: return False, "最大量比超过1.3不能买" limit_up_time = limit_up_time_manager.get_limit_up_time(code) if limit_up_time is not None and l2_data_manager.L2DataUtil.get_time_as_second( limit_up_time) >= l2_data_manager.L2DataUtil.get_time_as_second( "14:30:00"): return False, "14:30后涨停的不能买,涨停时间为{}".format(limit_up_time) # 同一板块中老二后面的不能买 industry, codes = ths_industry_util.get_same_industry_codes(code, gpcode_manager.get_gp_list()) if industry is None: return True, "没有获取到行业" codes_index = industry_codes_sort.sort_codes(codes, code) if codes_index is not None and codes_index.get(code) is not None and codes_index.get(code) > 1: # 当老大老二当前没涨停 return False, "同一板块中老三,老四,...不能买" if cls.__codeActualPriceProcessor.is_under_water(code,total_datas[-1]["val"]["time"]): # 水下捞且板块中的票小于16不能买 # if global_util.industry_hot_num.get(industry) is not None and global_util.industry_hot_num.get( # industry) <= 16: # return False, "水下捞,板块中的票小于2只,为{}".format(global_util.industry_hot_num.get(industry)) # 水下捞自由流通市值大于老大的不要买 if codes_index.get(code) != 0: # 获取老大的市值 for c in codes_index: if codes_index.get(c) == 0 and global_util.zyltgb_map.get(code) > global_util.zyltgb_map.get(c): return False, "水下捞,不是老大,且自由流通市值大于老大" # 13:30后涨停,本板块中涨停票数<29不能买 # if limit_up_time is not None: # if int(limit_up_time.replace(":", "")) >= 133000 and global_util.industry_hot_num.get(industry) is not None: # if global_util.industry_hot_num.get(industry) < 16: # return False, "13:30后涨停,本板块中涨停票数<16不能买" if codes_index.get(code) is not None and codes_index.get(code) == 1: # 如果老大已经买成功了, 老二就不需要买了 first_codes = [] for key in codes_index: if codes_index.get(key) == 0: first_codes.append(key) # 暂时注释掉 # for key in first_codes: # state = trade_manager.get_trade_state(key) # if state == trade_manager.TRADE_STATE_BUY_SUCCESS: # # 老大已经买成功了 # return False, "老大{}已经买成功,老二无需购买".format(key) # # # 有9点半涨停的老大才能买老二,不然不能买 # # 获取老大的涨停时间 # for key in first_codes: # # 找到了老大 # time_ = limit_up_time_manager.get_limit_up_time(key) # if time_ == "09:30:00": # return True, "9:30涨停的老大,老二可以下单" # return False, "老大非9:30涨停,老二不能下单" # 过时 老二,本板块中涨停票数<29 不能买 # if codes_index.get(code) is not None and codes_index.get(code) == 1 and global_util.industry_hot_num.get( # industry) is not None: # if global_util.industry_hot_num.get(industry) < 29: # return False, "老二,本板块中涨停票数<29不能买" # 可以下单 return True, None @classmethod def __cancel_buy(cls, code): try: cls.debug(code, "开始执行撤单") trade_manager.start_cancel_buy(code) # 取消买入标识 l2_data_manager.TradePointManager.delete_buy_point(code) l2_data_manager.TradePointManager.delete_buy_cancel_point(code) l2_data_manager.TradePointManager.delete_compute_info_for_cancel_buy(code) l2_data_manager.TradePointManager.delete_count_info_for_cancel_buy(code) # 删除大群撤事件的大单 l2_data_manager.L2BetchCancelBigNumProcessor.del_recod(code) cls.debug(code, "执行撤单成功") except Exception as e: logging.exception(e) cls.debug(code, "执行撤单异常:{}", str(e)) @classmethod def cancel_buy(cls, code, msg=None, source="l2"): # 是否是交易队列触发 if source == "trade_queue": # 交易队列触发的需要下单后5s buy_single_index, buy_exec_index, buy_compute_index, num, count, max_num_set = cls.__get_order_begin_pos( code) total_datas = local_today_datas[code] if buy_exec_index is not None and buy_exec_index > 0: now_time_str = tool.get_now_time_str() if tool.trade_time_sub(now_time_str, total_datas[buy_exec_index]["val"]["time"]) < 5: return False l2_data_manager.L2ContinueLimitUpCountManager.del_data(code) if code in cls.unreal_buy_dict: cls.unreal_buy_dict.pop(code) # 取消买入标识 l2_data_manager.TradePointManager.delete_buy_point(code) l2_data_manager.TradePointManager.delete_buy_cancel_point(code) l2_data_manager.TradePointManager.delete_compute_info_for_cancel_buy(code) l2_data_manager.TradePointManager.delete_count_info_for_cancel_buy(code) else: can_cancel, reason = cls.__can_cancel(code) if not can_cancel: # 不能取消 cls.cancel_debug(code, "撤单中断,原因:{}", reason) cls.debug(code, "撤单中断,原因:{}", reason) return False cls.__cancel_buy(code) l2_data_manager.L2BigNumProcessor.del_big_num_pos(code) cls.debug(code, "执行撤单成功,原因:{}", msg) return True # 虚拟下单 @classmethod def __virtual_buy(cls, code, buy_single_index, buy_exec_index, capture_time): cls.unreal_buy_dict[code] = (buy_exec_index, capture_time) SecondAverageBigNumComputer.place_order_success(code, buy_single_index, buy_exec_index) AverageBigNumComputer.place_order_success(code, buy_single_index, buy_exec_index) # 删除之前的板上卖信息 L2LimitUpSellStatisticUtil.delete(code) @classmethod def __start_compute_buy(cls, code, compute_start_index, compute_end_index, threshold_money, capture_time, new_add=True): if compute_end_index < compute_start_index: return _start_time = round(t.time() * 1000) total_datas = local_today_datas[code] # 获取买入信号计算起始位置 buy_single_index, buy_exec_index, buy_compute_index, num, count, max_num_set = cls.__get_order_begin_pos( code) # 是否为新获取到的位置 if buy_single_index is None: place_order_count = trade_data_manager.placeordercountmanager.get_place_order_count(code) continue_count = 3 # 前2次的信号连续笔数为3,后面为2 if place_order_count > 2: continue_count = 2 # 有买入信号 has_single, _index = cls.__compute_order_begin_pos(code, max( compute_start_index - 2 if new_add else compute_start_index, 0), continue_count, compute_end_index) buy_single_index = _index if has_single: num = 0 count = 0 cls.debug(code, "获取到买入信号起始点:{} ,计算范围:{}-{} ,数据:{}", buy_single_index, compute_start_index, compute_end_index, total_datas[buy_single_index]) # 如果是今天第一次有下单开始信号,需要设置大单起始点 cls.l2BigNumForMProcessor.set_begin_pos(code, buy_single_index) _start_time = l2_data_log.l2_time(code, round(t.time() * 1000) - _start_time, "下单信号计算时间") if buy_single_index is None: # 未获取到买入信号,终止程序 return None # 计算m值大单 cls.l2BigNumForMProcessor.process(code, max(buy_single_index, compute_start_index), compute_end_index, gpcode_manager.get_limit_up_price(code)) _start_time = l2_data_log.l2_time(code, round(t.time() * 1000) - _start_time, "计算m值大单") threshold_money, msg = cls.__get_threshmoney(code) # 买入纯买额统计 compute_index, buy_nums, buy_count, rebegin_buy_pos, max_num_set_new = cls.__sum_buy_num_for_order_3(code, max(buy_single_index,compute_start_index),compute_end_index,num,count,threshold_money,buy_single_index,max_num_set) _start_time = l2_data_log.l2_time(code, round(t.time() * 1000) - _start_time, "纯买额统计时间") cls.debug(code, "m值-{} m值因子-{}", threshold_money, msg) # 买入信号位与计算位置间隔2s及以上了 if rebegin_buy_pos is not None: # 需要重新计算纯买额 cls.__start_compute_buy(code, rebegin_buy_pos, compute_end_index, threshold_money, capture_time, False) return if compute_index is not None: cls.debug(code, "获取到买入执行位置:{} m值:{} 纯买手数:{} 纯买单数:{} 数据:{}", compute_index, threshold_money, buy_nums, buy_count, total_datas[compute_index]) # 记录买入信号位置 cls.__save_order_begin_data(code, buy_single_index, compute_index, compute_index, buy_nums, buy_count, max_num_set_new) # 如果是今天第一次有下单执行信号,涨停时间(买入执行位时间) limit_up_time_manager.save_limit_up_time(code, total_datas[compute_index]["val"]["time"]) # 虚拟下单 cls.__virtual_buy(code, buy_single_index, compute_index, capture_time) # 删除之前的所有撤单信号 l2_data_manager.TradePointManager.delete_buy_cancel_point(code) # 涨停封单额计算 L2LimitUpMoneyStatisticUtil.process_data(code, buy_single_index, compute_index, buy_single_index, buy_exec_index, False) _start_time = l2_data_log.l2_time(code, round(t.time() * 1000) - _start_time, "记录执行买入数据") # 数据是否处理完毕 if compute_index >= compute_end_index: need_cancel, cancel_data = SecondAverageBigNumComputer.need_cancel(code, buy_single_index, compute_index, buy_single_index, compute_index, True) # 分钟级大单计算 # need_cancel, cancel_data = AverageBigNumComputer.need_cancel(code, buy_single_index, compute_index, # buy_single_index, compute_index, True) cls.debug(code, "数据处理完毕,下单, 数据截图时间-{}", capture_time) # 数据已经处理完毕,如果还没撤单就实际下单 if need_cancel: if cls.cancel_buy(code, "S级大单撤销"): # 执行撤单成功 pass else: cls.__buy(code, capture_time, total_datas[compute_index], compute_index) else: # AverageBigNumComputer.need_cancel(code, buy_single_index, compute_index, # buy_single_index, compute_index, False) SecondAverageBigNumComputer.need_cancel(code, buy_single_index, compute_index, buy_single_index, compute_index, False) # 数据尚未处理完毕,进行下一步处理 cls.debug(code, "数据尚未处理完毕,进行下一步处理,处理进度:{}", compute_index) # 处理撤单步骤 cls.__process_order(code, compute_index + 1, compute_end_index, capture_time, False) else: # 未达到下单条件,保存纯买额,设置纯买额 # 记录买入信号位置 cls.__save_order_begin_data(code, buy_single_index, -1, compute_end_index, buy_nums, buy_count, max_num_set_new) print("保存大单时间", round((t.time() - _start_time) * 1000)) _start_time = t.time() pass # 获取下单起始信号 @classmethod def __get_order_begin_pos(cls, code): buy_single_index, buy_exec_index, compute_index, num, count, max_num_set = l2_data_manager.TradePointManager.get_buy_compute_start_data( code) return buy_single_index, buy_exec_index, compute_index, num, count, max_num_set # 保存下单起始信号 @classmethod def __save_order_begin_data(self, code, buy_single_index, buy_exec_index, compute_index, num, count, max_num_set): TradePointManager.set_buy_compute_start_data(code, buy_single_index, buy_exec_index, compute_index, num, count, max_num_set) # 计算下单起始信号 # compute_data_count 用于计算的l2数据数量 @classmethod def __compute_order_begin_pos(cls, code, start_index, continue_count, end_index): second_930 = 9 * 3600 + 30 * 60 + 0 # 倒数100条数据查询 datas = local_today_datas[code] if end_index - start_index + 1 < continue_count: return False, None __time = None last_index = None count = 0 start = None for i in range(start_index, end_index + 1): _val = datas[i]["val"] # 时间要>=09:30:00 if L2DataUtil.get_time_as_second(_val["time"]) < second_930: continue if L2DataUtil.is_limit_up_price_buy(_val): if last_index is None or (datas[last_index]["val"]["time"] == datas[i]["val"]["time"]): if start is None: start = i last_index = i count += datas[i]["re"] if count >= continue_count: return True, start else: # 本条数据作为起点 last_index = i count = datas[i]["re"] start = i elif not L2DataUtil.is_sell(_val) and not L2DataUtil.is_sell_cancel(_val): # 剔除卖与卖撤 last_index = None count = 0 start = None return False, None @classmethod def __get_threshmoney(cls, code): return l2_trade_factor.L2TradeFactorUtil.compute_m_value(code) # 是否为万手哥 @classmethod def __is_big_money(cls, limit_up_price, val): if int(val["num"]) >= constant.BIG_MONEY_NUM: return True if int(val["num"]) * limit_up_price >= constant.BIG_MONEY_AMOUNT: return True return False # 计算万手哥笔数 @classmethod def __compute_big_money_count(cls, total_datas, start_index, end_index): count = 0 for i in range(start_index, end_index + 1): if L2DataUtil.is_limit_up_price_buy(total_datas[i]["val"]): count += total_datas[i]["re"] elif L2DataUtil.is_limit_up_price_buy_cancel(total_datas[i]["val"]): count -= total_datas[i]["re"] return count # 统计买入净买量,不计算在买入信号之前的买撤单 @classmethod def __sum_buy_num_for_order_3(cls, code, compute_start_index, compute_end_index, origin_num, origin_count, threshold_money, buy_single_index, max_num_set): def get_threshold_count(): count = threshold_count - sub_threshold_count if count < 3: count = 3 count = round(count * buy1_factor) # 最高30笔,最低8笔 if count > 21: count = 21 if count < 8: count = 8 return count _start_time = t.time() total_datas = local_today_datas[code] # 计算从买入信号开始到计算开始位置的大单数量 sub_threshold_count = cls.__compute_big_money_count(total_datas, buy_single_index, compute_start_index - 1) if sub_threshold_count < 0: sub_threshold_count = 0 buy_nums = origin_num buy_count = origin_count limit_up_price = gpcode_manager.get_limit_up_price(code) buy1_price = cls.buy1PriceManager.get_price(code) if limit_up_price is None: raise Exception("涨停价无法获取") # 目标手数 threshold_num = threshold_money / (limit_up_price * 100) buy1_factor = 1 # 获取买1是否为涨停价 if buy1_price is None: buy1_factor = 1.3 elif limit_up_price is None: buy1_factor = 1.3 elif abs(float(buy1_price) - float(limit_up_price)) >= 0.01: print("买1价不为涨停价,买1价-{} 涨停价-{}".format(buy1_price, limit_up_price)) buy1_factor = 1.3 # 目标订单数量 threshold_count = l2_trade_factor.L2TradeFactorUtil.get_safe_buy_count(code) buy_single_time_seconds = L2DataUtil.get_time_as_second(total_datas[buy_single_index]["val"]["time"]) # 可以触发买,当有涨停买信号时才会触发买 trigger_buy = True place_order_count = trade_data_manager.placeordercountmanager.get_place_order_count(code) if place_order_count > 3: place_order_count = 3 # 间隔最大时间依次为:3,9,27,81 max_space_time = pow(3, place_order_count + 1) - 1 # 最大买量 max_buy_num = 0 max_buy_num_set = set(max_num_set) for i in range(compute_start_index, compute_end_index + 1): data = total_datas[i] _val = total_datas[i]["val"] trigger_buy = False # 必须为连续3秒内的数据 if L2DataUtil.get_time_as_second(_val["time"]) - buy_single_time_seconds > max_space_time: TradePointManager.delete_buy_point(code) if i == compute_end_index: # 数据处理完毕 return None, buy_nums, buy_count, None, max_buy_num_set else: # 计算买入信号,不能同一时间开始计算 for ii in range(buy_single_index + 1, compute_end_index + 1): if total_datas[buy_single_index]["val"]["time"] != total_datas[ii]["val"]["time"]: return None, buy_nums, buy_count, ii, max_buy_num_set # 涨停买 if L2DataUtil.is_limit_up_price_buy(_val): if cls.__is_big_money(limit_up_price, _val): sub_threshold_count += int(total_datas[i]["re"]) max_buy_num_set.add(i) if round(int(_val["num"]) * float(_val["price"])) >= 5900: trigger_buy = True # 只统计59万以上的金额 buy_nums += int(_val["num"]) * int(total_datas[i]["re"]) buy_count += int(total_datas[i]["re"]) if buy_nums >= threshold_num and buy_count >= get_threshold_count(): logger_l2_trade_buy.info("{}获取到买入执行点:{} 统计纯买手数:{} 目标纯买手数:{} 统计纯买单数:{} 目标纯买单数:{}, 大单数量:{}", code, i, buy_nums, threshold_num, buy_count, get_threshold_count(), sub_threshold_count) elif L2DataUtil.is_limit_up_price_buy_cancel(_val): if cls.__is_big_money(limit_up_price, _val): sub_threshold_count -= int(total_datas[i]["re"]) if round(int(_val["num"]) * float(_val["price"])) >= 5900: # 只统计59万以上的金额 # 涨停买撤 # 判断买入位置是否在买入信号之前 buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(total_datas[i], local_today_num_operate_map.get( code)) if buy_index is not None: # 找到买撤数据的买入点 if buy_index >= buy_single_index: buy_nums -= int(_val["num"]) * int(data["re"]) buy_count -= int(data["re"]) cls.buy_debug(code, "{}数据在买入信号之后 撤买纯买手数:{} 目标手数:{}", i, buy_nums, threshold_num) else: cls.buy_debug(code, "{}数据在买入信号之前,买入位:{}", i, buy_index) if total_datas[buy_single_index]["val"]["time"] == buy_data["val"]["time"]: # 同一秒,当作买入信号之后处理 buy_nums -= int(_val["num"]) * int(data["re"]) buy_count -= int(data["re"]) cls.buy_debug(code, "{}数据买入位与预估买入位在同一秒", i) else: # 未找到买撤数据的买入点 cls.buy_debug(code, "未找到买撤数据的买入点: 位置-{} 数据-{}", i, data) buy_nums -= int(_val["num"]) * int(total_datas[i]["re"]) buy_count -= int(total_datas[i]["re"]) cls.buy_debug(code, "位置-{},总手数:{},目标手数:{}", i, buy_nums, threshold_num) # 有撤单信号,且小于阈值 if buy_nums >= threshold_num and buy_count >= get_threshold_count() and trigger_buy and len(max_buy_num_set)>1: return i, buy_nums, buy_count, None, max_buy_num_set cls.buy_debug(code, "尚未获取到买入执行点,起始计算位置:{} 统计纯买手数:{} 目标纯买手数:{} 统计纯买单数:{} 目标纯买单数:{} 大单数量:{}", compute_start_index, buy_nums, threshold_num, buy_count, get_threshold_count(), sub_threshold_count) return None, buy_nums, buy_count, None, max_buy_num_set @classmethod def test(cls): code = "002556" l2_trade_test.clear_trade_data(code) load_l2_data(code, True) _start = t.time() if True: state = trade_manager.get_trade_state(code) cls.random_key[code] = random.randint(0, 100000) capture_timestamp = 1999988888 try: if state == trade_manager.TRADE_STATE_BUY_DELEGATED or state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER: # 已挂单 cls.__process_order(code, 1552, 1641, capture_timestamp) else: # 未挂单 cls.__process_not_order(code, 1552, 1641, capture_timestamp) except Exception as e: logging.exception(e) print("处理时间", round((t.time() - _start) * 1000)) return # 按s批量化数据 total_datas = local_today_datas[code] start_time = total_datas[0]["val"]["time"] start_index = 0 for i in range(0, len(total_datas)): if total_datas[i]["val"]["time"] != start_time: cls.random_key[code] = random.randint(0, 100000) # 处理数据 start = start_index # if start != 201: # continue end = i - 1 print("处理进度:{},{}".format(start, end)) capture_timestamp = 1999999999 state = trade_manager.get_trade_state(code) try: if state == trade_manager.TRADE_STATE_BUY_DELEGATED or state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER: # 已挂单 cls.__process_order(code, start, end, capture_timestamp) else: # 未挂单 cls.__process_not_order(code, start, end, capture_timestamp) except Exception as e: logging.exception(e) # t.sleep(1) start_index = i start_time = total_datas[i]["val"]["time"] print("时间花费:", round((t.time() - _start) * 1000)) @classmethod def test1(cls): code = "002556" l2_trade_test.clear_trade_data(code) l2_data_manager.local_latest_datas[code] = [] load_l2_data(code, True) _start = t.time() capture_timestamp = 1999999999 cls.process(code, l2_data_manager.local_today_datas[code][1552:1641], capture_timestamp) print("时间花费:", round((t.time() - _start) * 1000)) pass @classmethod def test2(cls): code = "002864" load_l2_data(code) limit_up_time_manager.load_limit_up_time() limit_up_time = limit_up_time_manager.get_limit_up_time(code) if limit_up_time is not None and l2_data_manager.L2DataUtil.get_time_as_second( limit_up_time) >= l2_data_manager.L2DataUtil.get_time_as_second( "14:30:00"): return False, "14:30后涨停的不能买,涨停时间为{}".format(limit_up_time) # 同一板块中老二后面的不能买 industry, codes = ths_industry_util.get_same_industry_codes(code, gpcode_manager.get_gp_list()) if industry is None: return True, "没有获取到行业" codes_index = industry_codes_sort.sort_codes(codes, code) if codes_index is not None and codes_index.get(code) is not None and codes_index.get(code) > 1: return False, "同一板块中老三,老四,...不能买" if cls.__codeActualPriceProcessor.is_under_water(code): # 水下捞且板块中的票小于21不能买 if global_util.industry_hot_num.get(industry) is not None and global_util.industry_hot_num.get( industry) <= 16: return False, "水下捞,板块中的票小于2只,为{}".format(global_util.industry_hot_num.get(industry)) if codes_index.get(code) != 0: return False, "水下捞,不是老大,是老{}".format(codes_index.get(code)) # 13:30后涨停,本板块中涨停票数<29不能买 limit_up_time = limit_up_time_manager.get_limit_up_time(code) if limit_up_time is not None: if int(limit_up_time.replace(":", "")) >= 133000 and global_util.industry_hot_num.get(industry) is not None: if global_util.industry_hot_num.get(industry) < 16: return False, "13:30后涨停,本板块中涨停票数<16不能买" if codes_index.get(code) is not None and codes_index.get(code) == 1: # ----此条注释----- # 如果老大已经买成功了,老二就不需要买了 # first_codes = [] # for key in codes_index: # if codes_index.get(key) == 0: # first_codes.append(key) # # for key in first_codes: # state = trade_manager.get_trade_state(key) # if state == trade_manager.TRADE_STATE_BUY_SUCCESS: # # 老大已经买成功了 # return False, "老大{}已经买成功,老二无需购买".format(key) # ----此条注释----- # ----此条注释----- # 有9点半涨停的老大才能买老二,不然不能买 # 获取老大的涨停时间 # for key in first_codes: # # 找到了老大 # time_ = limit_up_time_manager.get_limit_up_time(key) # if time_ == "09:30:00": # return True, "9:30涨停的老大,老二可以下单" # return False, "老大非9:30涨停,老二不能下单" # ----此条注释----- return True, "老二可以下单" @classmethod def test3(cls): code = "002094" load_l2_data(code, True) cls.random_key[code] = random.randint(0, 100000) buy_single_begin_index, buy_exec_index = 426, 479 L2LimitUpMoneyStatisticUtil.process_data(code, 480, 519, buy_single_begin_index, buy_exec_index, False) L2LimitUpMoneyStatisticUtil.process_data(code, 480, 519, buy_single_begin_index, buy_exec_index, False) @classmethod def test_can_buy(cls): code = "002923" load_l2_data(code, True) limit_up_time_manager.load_limit_up_time() can, msg = cls.__can_buy(code) print(can, msg) # 涨停封单额统计 class L2LimitUpMoneyStatisticUtil: _redisManager = redis_manager.RedisManager(1) _thsBuy1VolumnManager = trade_queue_manager.THSBuy1VolumnManager() @classmethod def __get_redis(cls): return cls._redisManager.getRedis() # 设置l2的每一秒涨停封单额数据 @classmethod def __set_l2_second_money_record(cls, code, time, num, from_index, to_index): old_num, old_from, old_to = cls.__get_l2_second_money_record(code, time) if old_num is None: old_num = num old_from = from_index old_to = to_index else: old_num += num old_to = to_index key = "l2_limit_up_second_money-{}-{}".format(code, time.replace(":", "")) cls.__get_redis().setex(key, tool.get_expire(), json.dumps((old_num, old_from, old_to))) @classmethod def __get_l2_second_money_record(cls, code, time): key = "l2_limit_up_second_money-{}-{}".format(code, time.replace(":", "")) val = cls.__get_redis().get(key) return cls.__format_second_money_record_val(val) @classmethod def __format_second_money_record_val(cls, val): if val is None: return None, None, None val = json.loads(val) return val[0], val[1], val[2] @classmethod def __get_l2_second_money_record_keys(cls, code, time_regex): key = "l2_limit_up_second_money-{}-{}".format(code, time_regex) keys = cls.__get_redis().keys(key) return keys # 设置l2最新的封单额数据 @classmethod def __set_l2_latest_money_record(cls, code, index, num): key = "l2_limit_up_money-{}".format(code) cls.__get_redis().setex(key, tool.get_expire(), json.dumps((num, index))) # 返回数量,索引 @classmethod def __get_l2_latest_money_record(cls, code): key = "l2_limit_up_money-{}".format(code) result = cls.__get_redis().get(key) if result: result = json.loads(result) return result[0], result[1] else: return 0, -1 # 矫正数据 # 矫正方法为取矫正时间两侧的秒分布数据,用于确定计算结束坐标 @classmethod def verify_num(cls, code, num, time_str): # 记录买1矫正日志 logger_buy_1_volumn.info("涨停封单量矫正:代码-{} 量-{} 时间-{}", code, num, time_str) time_ = time_str.replace(":", "") key = None for i in range(4, -2, -2): # 获取本(分钟/小时/天)内秒分布数据 time_regex = "{}*".format(time_[:i]) keys_ = cls.__get_l2_second_money_record_keys(code, time_regex) if keys_ and len(keys_) > 1: # 需要排序 keys = [] for k in keys_: keys.append(k) keys.sort(key=lambda tup: int(tup.split("-")[-1])) # 有2个元素 for index in range(0, len(keys) - 1): time_1 = keys[index].split("-")[-1] time_2 = keys[index + 1].split("-")[-1] if int(time_1) <= int(time_) <= int(time_2): # 在此时间范围内 if time_ == time_2: key = keys[index + 1] else: key = keys[index] break if key: val = cls.__get_redis().get(key) old_num, old_from, old_to = cls.__format_second_money_record_val(val) end_index = old_to # 保存最近的数据 cls.__set_l2_latest_money_record(code, end_index, num) logger_buy_1_volumn.info("涨停封单量矫正结果:代码-{} 位置-{} 量-{}", code, end_index, num) break # 计算量,用于涨停封单量的计算 @classmethod def __compute_num(cls, code, data, buy_single_data): if L2DataUtil.is_limit_up_price_buy_cancel(data["val"]) or L2DataUtil.is_sell(data["val"]): # 涨停买撤与卖 return 0 - int(data["val"]["num"]) * data["re"] else: # 卖撤 if L2DataUtil.is_sell_cancel(data["val"]): # 卖撤的买数据是否在买入信号之前,如果在之前就不计算,不在之前就计算 if l2_data_util.is_sell_index_before_target(data, buy_single_data, local_today_num_operate_map.get(code)): return 0 return int(data["val"]["num"]) * data["re"] @classmethod def clear(cls, code): key = "l2_limit_up_money-{}".format(code) cls.__get_redis().delete(key) # 返回取消的标志数据 # with_cancel 是否需要判断是否撤销 @classmethod def process_data(cls, code, start_index, end_index, buy_single_begin_index, buy_exec_index, with_cancel=True): if buy_single_begin_index is None or buy_exec_index is None: return None, None start_time = round(t.time() * 1000) total_datas = local_today_datas[code] time_dict_num = {} # 记录计算的坐标 time_dict_num_index = {} # 坐标-量的map num_dict = {} # 统计时间分布 time_dict = {} for i in range(start_index, end_index + 1): data = total_datas[i] val = data["val"] time_ = val["time"] if time_ not in time_dict: time_dict[time_] = i for i in range(start_index, end_index + 1): data = total_datas[i] val = data["val"] time_ = val["time"] if time_ not in time_dict_num: time_dict_num[time_] = 0 time_dict_num_index[time_] = {"s": i, "e": i} time_dict_num_index[time_]["e"] = i num = cls.__compute_num(code, data, total_datas[buy_single_begin_index]) num_dict[i] = num time_dict_num[time_] = time_dict_num[time_] + num for t_ in time_dict_num: cls.__set_l2_second_money_record(code, t_, time_dict_num[t_], time_dict_num_index[t_]["s"], time_dict_num_index[t_]["e"]) print("保存涨停封单额时间:", round(t.time() * 1000) - start_time) # 累计最新的金额 total_num, index = cls.__get_l2_latest_money_record(code) record_msg = f"同花顺买1信息 {total_num},{index}" if index == -1: # 没有获取到最新的矫正封单额,需要从买入信号开始点计算 index = buy_single_begin_index - 1 total_num = 0 cancel_index = None cancel_msg = None # 待计算量 limit_up_price = gpcode_manager.get_limit_up_price(code) min_volumn = round(10000000 / (limit_up_price * 100)) min_volumn_big = min_volumn * 5 # 不同时间的数据开始坐标 time_start_index_dict = {} # 数据时间分布 time_list = [] # 到当前时间累积的买1量 time_total_num_dict = {} # 大单撤销笔数 cancel_big_num_count = 0 buy_exec_time = tool.get_time_as_second(total_datas[buy_exec_index]["val"]["time"]) # 获取最大封单额 max_buy1_volume = cls._thsBuy1VolumnManager.get_max_buy1_volume(code) # 从同花顺买1矫正过后的位置开始计算,到end_index结束 for i in range(index + 1, end_index + 1): data = total_datas[i] # 统计撤销数量 try: if big_money_num_manager.is_big_num(data["val"]): if L2DataUtil.is_limit_up_price_buy_cancel(data["val"]): cancel_big_num_count += int(data["re"]) # TODO 大量重复的工作需要处理,可以暂存在内存中,从而减少计算 # 获取是否在买入执行信号周围2s buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(data, local_today_num_operate_map.get( code)) if buy_index is not None and buy_data is not None: # 相差1s buy_time = buy_data["val"]["time"] if abs(buy_exec_time - tool.get_time_as_second(buy_time)) < 2: cancel_big_num_count += int(data["re"]) elif L2DataUtil.is_limit_up_price_buy(data["val"]): cancel_big_num_count -= int(data["re"]) except Exception as e: logging.exception(e) threshold_rate = 0.5 if cancel_big_num_count >= 0: if cancel_big_num_count < 10: threshold_rate = threshold_rate - cancel_big_num_count * 0.01 else: threshold_rate = threshold_rate - 10 * 0.01 time_ = data["val"]["time"] if time_ not in time_start_index_dict: # 记录每一秒的开始位置 time_start_index_dict[time_] = i # 记录时间分布 time_list.append(time_) # 上一段时间的总数 time_total_num_dict[time_] = total_num exec_time_offset = tool.trade_time_sub(data["val"]["time"], total_datas[buy_exec_index]["val"]["time"]) val = num_dict.get(i) if val is None: val = cls.__compute_num(code, data, total_datas[buy_single_begin_index]) total_num += val # 在处理数据的范围内,就需要判断是否要撤单了 if start_index <= i <= end_index: # 如果是减小项 if val < 0: # 当前量小于最大量的24%则需要取消 if exec_time_offset >= 30: if total_num <= min_volumn_big and max_buy1_volume * 0.24 > total_num: cancel_index = i cancel_msg = "封板额小于最高封板额的24% {}/{}".format(total_num, max_buy1_volume) break # 累计封单金额小于1000万 if total_num < min_volumn: # 与执行位相隔>=5s时规则生效 if exec_time_offset >= 5: cancel_index = i cancel_msg = "封单金额小于1000万,为{}".format(total_num) break # 相邻2s内的数据减小50% # 上1s的总数 last_second_total_volumn = time_total_num_dict.get(time_list[-1]) if last_second_total_volumn > 0 and ( last_second_total_volumn - total_num) / last_second_total_volumn >= threshold_rate: # 与执行位相隔>=5s时规则生效 if exec_time_offset >= 5: # 相邻2s内的数据减小50% cancel_index = i cancel_msg = "相邻2s({})内的封单量减小50%({}->{})".format(time_, last_second_total_volumn, total_num) break # 记录中有上2个数据 if len(time_list) >= 2: # 倒数第2个数据 last_2_second_total_volumn = time_total_num_dict.get(time_list[-2]) if last_2_second_total_volumn > 0: if last_2_second_total_volumn > last_second_total_volumn > total_num: dif = last_2_second_total_volumn - total_num if dif / last_2_second_total_volumn >= threshold_rate: # 与执行位相隔>=5s时规则生效 if exec_time_offset >= 5: cancel_index = i cancel_msg = "相邻3s({})内的封单量(第3秒 与 第1的 减小比例)减小50%({}->{}->{})".format(time_, last_2_second_total_volumn, last_second_total_volumn, total_num) break # ------大单撤处理------- # if total_num < min_volumn_big: if exec_time_offset < 31: pass # try: # b_need_cancel, b_cancel_index = AverageBigNumComputer.need_cancel(code, i, i) # if b_need_cancel: # cancel_index = b_cancel_index # cancel_msg = "1分钟内大单撤销比例触发阈值" # break # except Exception as e: # logging.exception(e) # 30s外才执行 elif 31 <= exec_time_offset: try: b_need_cancel, b_cancel_index = LongAverageBigNumComputer.need_cancel(code, buy_exec_index, i, i) if b_need_cancel: cancel_index = b_cancel_index cancel_msg = "30s后内大单撤销比例触发阈值" break except Exception as e: logging.exception(e) # ------大单撤处理结束------- if not with_cancel: cancel_index = None print("封单额计算时间:", round(t.time() * 1000) - start_time) process_end_index = end_index if cancel_index: process_end_index = cancel_index # 保存最新累计金额 # cls.__set_l2_latest_money_record(code, process_end_index, total_num) l2_data_log.l2_time(code, round(t.time() * 1000) - start_time, "l2数据封单额计算时间", False) if cancel_index: L2TradeDataProcessor.cancel_debug(code, "数据处理位置:{}-{},{},最终买1为:{}", start_index, end_index, record_msg, total_num) return total_datas[cancel_index], cancel_msg return None, None # 涨停卖统计 class L2LimitUpSellStatisticUtil: _redisManager = redis_manager.RedisManager(0) @classmethod def __get_redis(cls): return cls._redisManager.getRedis() # 新增卖数据 @classmethod def __incre_sell_data(cls, code, num): key = "limit_up_sell_num-{}".format(code) cls.__get_redis().incrby(key, num) @classmethod def __get_sell_data(cls, code): key = "limit_up_sell_num-{}".format(code) val = cls.__get_redis().get(key) if val is None: return 0 return int(val) @classmethod def __save_process_index(cls, code, index): key = "limit_up_sell_index-{}".format(code) cls.__get_redis().setex(key, tool.get_expire(), index) @classmethod def __get_process_index(cls, code): key = "limit_up_sell_index-{}".format(code) val = cls.__get_redis().get(key) if val is None: return -1 return int(val) # 清除数据,当取消成功与买入之前需要清除数据 @classmethod def delete(cls, code): key = "limit_up_sell_num-{}".format(code) cls.__get_redis().delete(key) key = "limit_up_sell_index-{}".format(code) cls.__get_redis().delete(key) @classmethod def clear(cls): keys = cls.__get_redis().keys("limit_up_sell_num-*") for k in keys: cls.__get_redis().delete(k) # 处理数据,返回是否需要撤单 # 处理范围:买入执行位-当前最新位置 @classmethod def process(cls, code, start_index, end_index, buy_exec_index): # 获取涨停卖的阈值 limit_up_price = gpcode_manager.get_limit_up_price(code) zyltgb = l2_trade_factor.L2TradeFactorUtil.get_zyltgb(code) # 大于自由流通市值的4.8% threshold_num = int(zyltgb * 0.048) // (limit_up_price * 100) total_num = cls.__get_sell_data(code) cancel_index = None process_index = cls.__get_process_index(code) total_datas = local_today_datas.get(code) for i in range(start_index, end_index + 1): if i < buy_exec_index: continue if i <= process_index: continue if L2DataUtil.is_limit_up_price_sell(total_datas[i]["val"]) or L2DataUtil.is_sell(total_datas[i]["val"]): num = int(total_datas[i]["val"]["num"]) cls.__incre_sell_data(code, num) total_num += num if total_num > threshold_num: cancel_index = i break if cancel_index is not None: process_index = cancel_index else: process_index = end_index L2TradeDataProcessor.cancel_debug(code, "板上卖信息:计算位置:{}-{} 板上卖数据{}/{}", start_index, end_index, total_num, threshold_num) cls.__save_process_index(code, process_index) if cancel_index is not None: return total_datas[cancel_index], "板上卖的手数{} 超过{}".format(total_num, threshold_num) return None, "" @classmethod def test(cls): code = "003005" load_l2_data(code) L2TradeDataProcessor.random_key[code] = 123123 cls.process(code, 126, 171, 126) # s级平均大单计算 # 计算范围到申报时间的那一秒 class SecondAverageBigNumComputer: __redis_manager = redis_manager.RedisManager(0) __place_order_time_dict = {} @classmethod def __getRedis(cls): return cls.__redis_manager.getRedis() @classmethod def __save_average_data(cls, code, average_num, average_up_count, start_index, end_index): key = "s_average_big_num-{}".format(code) cls.__getRedis().setex(key, 2000, json.dumps((average_num, average_up_count, start_index, end_index))) L2TradeDataProcessor.cancel_debug(code, "保存秒级大单位置信息:平均手数-{} 大单数量-{} 计算开始范围-{}:{}".format(average_num, average_up_count, start_index, end_index)) @classmethod def __get_average_data(cls, code): key = "s_average_big_num-{}".format(code) val = cls.__getRedis().get(key) if val is None: return None, None, None, None val = json.loads(val) return val[0], val[1], val[2], val[3] # 保存买撤数据 @classmethod def __save_cancel_data(cls, code, cancel_index): key = "s_average_big_num_comput_info-{}".format(code) cls.__getRedis().sadd(key, cancel_index) # 获取买撤的数据 @classmethod def __get_cancel_datas(cls, code): key = "s_average_big_num_comput_info-{}".format(code) val = cls.__getRedis().smembers(key) return val # 保存买撤数据 @classmethod def __save_apply_time(cls, code, time_str): key = "s_average_big_num_apply_time-{}".format(code) cls.__getRedis().setex(key, tool.get_expire(), time_str) # 获取买撤的数据 @classmethod def __get_apply_time(cls, code): key = "s_average_big_num_apply_time-{}".format(code) val = cls.__getRedis().get(key) return val # 保存结束位置 @classmethod def __save_end_index(cls, code, end_index): key = "s_average_big_num_end_index_set-{}".format(code) cls.__getRedis().sadd(key, end_index) @classmethod def __list_end_indexs(cls, code): key = "s_average_big_num_end_index_set-{}".format(code) vals = cls.__getRedis().smembers(key) if vals is None: return None results = [] for val in vals: results.append(int(val)) results.sort() return results @classmethod def __clear_data(cls, code): ks = ["s_average_big_num_comput_info-{}".format(code), "s_average_big_num-{}".format(code), "s_average_big_num_end_index_set-{}".format(code)] for key in ks: cls.__getRedis().delete(key) @classmethod def clear_data(cls): ks = ["s_average_big_num_comput_info-*", "s_average_big_num-*", "s_average_big_num_end_index_set-*"] for key in ks: keys = cls.__getRedis().keys(key) for k in keys: cls.__getRedis().delete(k) # 计算平均手数 # 计算范围:买入信号起始点到买入执行位的下一张图结束点数据为止 @classmethod def compute_average_big_num(cls, code, buy_single_index, start_index, end_index): cls.__save_end_index(code, end_index) # 保存结束位置 end_indexs = cls.__list_end_indexs(code) print("compute_average_big_num", code, buy_single_index, start_index, end_index) L2TradeDataProcessor.cancel_debug(code, "开始计算短大单位置") total_data = local_today_datas[code] num = 0 count = 0 apply_time = cls.get_apply_time(code) apply_time_second = int(apply_time.replace(":", "")) for ei in end_indexs: if int(total_data[ei]["val"]["time"].replace(":", "")) >= apply_time_second: end_index = ei break for i in range(start_index, end_index + 1): data = total_data[i] val = data["val"] # if int(val["time"].replace(":", "")) > apply_time_second: # # 重新设置计算结束位置 # end_index = i - 1 # break if L2DataUtil.is_limit_up_price_buy(val): # and float(val["price"]) * int(val["num"]) > 7500: # 75万以上的才参与计算平均大单 count += data["re"] num += int(val["num"]) # 如果没有找到75万以上的单就不添加75w的筛选条件 if count == 0: for i in range(start_index, end_index + 1): data = total_data[i] val = data["val"] if L2DataUtil.is_limit_up_price_buy(val): if int(val["time"].replace(":", "")) > apply_time_second: break # 75万以上的才参与计算平均大单 count += data["re"] num += int(val["num"]) average_num = num // count average_num = min(constant.BIG_MONEY_NUM, round(constant.BIG_MONEY_AMOUNT / gpcode_manager.get_limit_up_price(code))) average_up_count = 0 for i in range(start_index, end_index + 1): data = total_data[i] val = data["val"] if L2DataUtil.is_limit_up_price_buy(val): if int(val["time"].replace(":", "")) > apply_time_second: break if int(val["num"]) >= average_num: average_up_count += data["re"] print("平均手数:", average_num, "大单总数:", average_up_count) # 保存数据 cls.__save_average_data(code, average_num, average_up_count, start_index, end_index) # 是否需要撤单 @classmethod def need_cancel(cls, code, buy_single_index, buy_exec_index, start_index, end_index, need_cancel=True): average_num, average_up_count, a_start_index, a_end_index = cls.__get_average_data(code) L2TradeDataProcessor.cancel_debug(code, "s级是否需要撤单,数据范围:{}-{} 平均大单信息-({},{},{},{})", start_index, end_index, average_num, average_up_count, a_start_index, a_end_index) if average_num is None: return False, None total_data = local_today_datas[code] # 只守护30s if tool.trade_time_sub(total_data[start_index]["val"]["time"], total_data[buy_exec_index]["val"]["time"]) > 30: return False, None # 如果start_index与buy_single_index相同,即是下单后的第一次计算 # 需要查询买入信号之前的同1s是否有涨停撤的数据 if buy_single_index == start_index: for i in range(buy_single_index - 1, 0, -1): data = total_data[i] val = data["val"] if val["time"] != total_data[buy_single_index]["val"]["time"]: break if L2DataUtil.is_limit_up_price_buy_cancel(val) and int(val["cancelTime"]) == 0: # 涨停买撤销且撤销的间隔时间为0 # 查询买入信号,如果无法查询到或者是买入位置比买入信号小就不算 buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(data, local_today_num_operate_map.get( code)) if buy_index is not None and a_start_index <= buy_index <= a_end_index: # 在买入信号之后 cls.__save_cancel_data(code, i) for i in range(start_index, end_index + 1): data = total_data[i] val = data["val"] # print("处理进度", i) if L2DataUtil.is_limit_up_price_buy_cancel(val): # 查询买入位置 buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(data, local_today_num_operate_map.get( code)) if buy_index is not None and a_start_index <= buy_index <= a_end_index: cls.__save_cancel_data(code, i) else: # 有部分撤销从而导致的无法溯源,这时就需要判断预估买入时间是否在a_start_index到a_end_index的时间区间 min_space, max_space = l2_data_util.compute_time_space_as_second(val["cancelTime"], val["cancelTimeUnit"]) # 只判断S级撤销,只有s级撤销才有可能相等 if max_space - min_space <= 1: buy_time = tool.trade_time_add_second(val["time"], 0 - min_space) if int(total_data[a_start_index]["val"]["time"].replace(":", "")) <= int( buy_time.replace(":", "")) <= int( total_data[a_end_index]["val"]["time"].replace(":", "")): cls.__save_cancel_data(code, i) if need_cancel: # 计算买撤大单暂比 cancel_datas = cls.__get_cancel_datas(code) if cancel_datas is not None and len(cancel_datas) > 0: L2TradeDataProcessor.cancel_debug(code, "s级大单 取消数量:{}", len(cancel_datas)) cancel_rate_threshold = 0.49 place_order_count = trade_data_manager.placeordercountmanager.get_place_order_count(code) if place_order_count <= 1: cancel_rate_threshold = 0.49 elif place_order_count <= 2: cancel_rate_threshold = 0.59 else: cancel_rate_threshold = 0.69 cancel_indexs = [] for index in cancel_datas: cancel_indexs.append(int(index)) cancel_indexs.sort() # print("取消的数据", cancel_indexs) cancel_count = 0 for index in cancel_indexs: data = total_data[index] if int(data["val"]["num"]) >= average_num: cancel_count += data["re"] if cancel_count / average_up_count > cancel_rate_threshold: return True, total_data[index] return False, None # 是否需要计算 @classmethod def is_need_compute_average(cls, code, latest_data): total_datas = local_today_datas[code] data = cls.__place_order_time_dict.get(code) if data is None: return False, None, None elif tool.trade_time_sub(latest_data["val"]["time"], cls.get_apply_time(code)) < 5: # 有5s时间上传申报时间 return True, data[1], data[2] else: cls.__place_order_time_dict.pop(code) return False, None, None # 设置申报时间 @classmethod def set_apply_time(cls, code, time_str, force=False): old_time_str = cls.get_apply_time(code) if not force: if old_time_str is not None: sub_time = tool.trade_time_sub(time_str, old_time_str) if sub_time <= 0 or sub_time > 4: # 申报时间与下单时间不能操过4s return cls.__save_apply_time(code, time_str) @classmethod def get_apply_time(cls, code): return cls.__get_apply_time(code) # 下单成功 @classmethod def place_order_success(cls, code, buy_single_index, buy_exec_index): cls.__clear_data(code) cls.__place_order_time_dict[code] = (t.time(), buy_single_index, buy_exec_index) # 以防万一,先保存下单信息 total_data = local_today_datas[code] cls.set_apply_time(code, total_data[buy_exec_index]["val"]["time"], True) cls.compute_average_big_num(code, buy_single_index, buy_single_index, total_data[-1]["index"]) @classmethod def __test(cls, datas): code = datas[0] load_l2_data(code) L2TradeDataProcessor.random_key[code] = 123123 # 先执行下单 buy_single_index = datas[1] buy_exec_index = datas[2] local_today_datas[code] = local_today_datas[code][0:datas[4]] cls.place_order_success(code, buy_single_index, buy_exec_index) # 执行是否需要计算average cls.compute_average_big_num(code, buy_single_index, buy_single_index, datas[3]) cancel, cancel_data = cls.need_cancel(code, buy_single_index, buy_exec_index, buy_single_index, buy_exec_index, False) for i in range(buy_exec_index + 1, datas[4]): cancel, cancel_data = cls.need_cancel(code, buy_single_index, buy_exec_index, i, i) if cancel: print("需要撤单", cancel, cancel_data["index"]) break @classmethod def test(cls): # cls.__test(("000909", 607, 646, 646, 694)) # 代码 买入信号起始点 买入信息执行位置 计算末位 最远计算位置 # cls.__test(("002793", 292, 308, 314, 410)) cls.__save_end_index("000333", 200) cls.__save_end_index("000333", 101) cls.__save_end_index("000333", 99) cls.__save_end_index("000333", 120) cls.__save_end_index("000333", 126) cls.__save_end_index("000333", 126) print(cls.__list_end_indexs("000333")) # 执行是否需要撤销 # 平均大单计算 class AverageBigNumComputer: __redis_manager = redis_manager.RedisManager(0) __place_order_time_dict = {} @classmethod def __getRedis(cls): return cls.__redis_manager.getRedis() @classmethod def __save_average_data(cls, code, average_num, average_up_count, start_index, end_index): key = "average_big_num-{}".format(code) cls.__getRedis().setex(key, 2000, json.dumps((average_num, average_up_count, start_index, end_index))) L2TradeDataProcessor.cancel_debug(code, "保存短大单位置信息:平均手数-{} 大单数量-{} 计算开始范围-{}:{}".format(average_num, average_up_count, start_index, end_index)) @classmethod def __get_average_data(cls, code): key = "average_big_num-{}".format(code) val = cls.__getRedis().get(key) if val is None: return None, None, None, None val = json.loads(val) return val[0], val[1], val[2], val[3] # 保存买撤数据 @classmethod def __save_cancel_data(cls, code, cancel_index): key = "average_big_num_comput_info-{}".format(code) cls.__getRedis().sadd(key, cancel_index) # 获取买撤的数据 @classmethod def __get_cancel_datas(cls, code): key = "average_big_num_comput_info-{}".format(code) val = cls.__getRedis().smembers(key) return val @classmethod def __clear_data(cls, code): key = "average_big_num_comput_info-{}".format(code) cls.__getRedis().delete(key) key = "average_big_num-{}".format(code) cls.__getRedis().delete(key) @classmethod def clear_data(cls): key = "average_big_num_comput_info-*" keys = cls.__getRedis().keys(key) for k in keys: cls.__getRedis().delete(k) key = "average_big_num-*" keys = cls.__getRedis().keys(key) for k in keys: cls.__getRedis().delete(k) # 计算平均手数 # 计算范围:买入信号起始点到买入执行位的下一张图结束点数据为止 @classmethod def compute_average_big_num(cls, code, buy_single_index, start_index, end_index): print("compute_average_big_num", code, buy_single_index, start_index, end_index) L2TradeDataProcessor.cancel_debug(code, "开始计算短大单位置") total_data = local_today_datas[code] num = 0 count = 0 for i in range(start_index, end_index + 1): data = total_data[i] val = data["val"] if L2DataUtil.is_limit_up_price_buy(val) and float(val["price"]) * int(val["num"]) >= 5000: # 75万以上的才参与计算平均大单 count += data["re"] num += int(val["num"]) # 如果没有找到75万以上的单就不添加75w的筛选条件 if count == 0: for i in range(start_index, end_index + 1): data = total_data[i] val = data["val"] if L2DataUtil.is_limit_up_price_buy(val): # 75万以上的才参与计算平均大单 count += data["re"] num += int(val["num"]) average_num = num // count # average_num = 0 average_num = min(constant.BIG_MONEY_NUM, round(constant.BIG_MONEY_AMOUNT / gpcode_manager.get_limit_up_price(code))) average_up_count = 0 for i in range(start_index, end_index + 1): data = total_data[i] val = data["val"] if L2DataUtil.is_limit_up_price_buy(val): if int(val["num"]) >= average_num: average_up_count += data["re"] print("平均手数:", average_num, "大单总数:", average_up_count) # 保存数据 cls.__save_average_data(code, average_num, average_up_count, start_index, end_index) # 是否需要撤单 @classmethod def need_cancel(cls, code, buy_single_index, buy_exec_index, start_index, end_index, need_cancel=True): # 暂时取消此撤单条件 return False, None average_num, average_up_count, a_start_index, a_end_index = cls.__get_average_data(code) if average_num is None: return False, None total_data = local_today_datas[code] # 如果start_index与buy_single_index相同,即是下单后的第一次计算 # 需要查询买入信号之前的同1s是否有涨停撤的数据 if buy_single_index == start_index: for i in range(buy_single_index - 1, 0, -1): data = total_data[i] val = data["val"] if val["time"] != total_data[buy_single_index]["val"]["time"]: break if L2DataUtil.is_limit_up_price_buy_cancel(val) and int(val["cancelTime"]) == 0: # 涨停买撤销且撤销的间隔时间为0 # 查询买入信号,如果无法查询到或者是买入位置比买入信号小就不算 buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(data, local_today_num_operate_map.get( code)) if buy_index is not None and a_start_index <= buy_index <= a_end_index: # 在买入信号之后 cls.__save_cancel_data(code, i) for i in range(start_index, end_index + 1): data = total_data[i] val = data["val"] # print("处理进度", i) if L2DataUtil.is_limit_up_price_buy_cancel(val): # 查询买入位置 buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(data, local_today_num_operate_map.get( code)) if buy_index is not None and a_start_index <= buy_index <= a_end_index: cls.__save_cancel_data(code, i) if need_cancel: # 计算买撤大单暂比 cancel_datas = cls.__get_cancel_datas(code) if cancel_datas is not None and len(cancel_datas) > 0: cancel_rate_threshold = 0.49 place_order_count = trade_data_manager.placeordercountmanager.get_place_order_count(code) if place_order_count <= 1: cancel_rate_threshold = 0.49 elif place_order_count <= 2: cancel_rate_threshold = 0.549 else: cancel_rate_threshold = 0.59 cancel_indexs = [] for index in cancel_datas: cancel_indexs.append(int(index)) cancel_indexs.sort() # print("取消的数据", cancel_indexs) cancel_count = 0 for index in cancel_indexs: data = total_data[index] if int(data["val"]["num"]) >= average_num: cancel_count += data["re"] if cancel_count / average_up_count > cancel_rate_threshold: return True, total_data[index] return False, None # 是否需要计算 @classmethod def is_need_compute_average(cls, code, latest_data): total_datas = local_today_datas[code] data = cls.__place_order_time_dict.get(code) if data is None: return False, None, None elif tool.trade_time_sub(latest_data["val"]["time"], total_datas[data[2]]["val"]["time"]) < 3: # 3s内的数据才需要计算average return True, data[1], data[2] else: cls.__place_order_time_dict.pop(code) return False, None, None # 下单成功 @classmethod def place_order_success(cls, code, buy_single_index, buy_exec_index): cls.__clear_data(code) cls.__place_order_time_dict[code] = (t.time(), buy_single_index, buy_exec_index) # 以防万一,先保存下单信息 total_data = local_today_datas[code] cls.compute_average_big_num(code, buy_single_index, buy_single_index, total_data[-1]["index"]) @classmethod def __test(cls, datas): code = datas[0] load_l2_data(code) L2TradeDataProcessor.random_key[code] = 123123 # 先执行下单 buy_single_index = datas[1] buy_exec_index = datas[2] local_today_datas[code] = local_today_datas[code][0:datas[4]] cls.place_order_success(code, buy_single_index, buy_exec_index) # 执行是否需要计算average cls.compute_average_big_num(code, buy_single_index, buy_single_index, datas[3]) for i in range(buy_single_index, datas[4]): cancel, cancel_data = cls.need_cancel(code, i, i) if cancel: print("需要撤单", cancel, cancel_data["index"]) break @classmethod def test(cls): cls.__test(("000716", 410, 420, 461, 536)) # 代码 买入信号起始点 买入信息执行位置 计算末位 最远计算位置 # cls.__test(("002793", 292, 308, 314, 410)) # 执行是否需要撤销 # 平均大单计算 class LongAverageBigNumComputer: __redis_manager = redis_manager.RedisManager(0) @classmethod def __getRedis(cls): return cls.__redis_manager.getRedis() @classmethod def __save_average_data(cls, code, average_num, average_up_count, total_count, start_index, end_index): L2TradeDataProcessor.cancel_debug(code, "获取到长大单位置信息:平均手数-{} 大单数量-{} 样本数量-{} 计算开始范围-{}:{}".format(average_num, average_up_count, total_count, start_index, end_index)) key = "l_average_big_num-{}".format(code) cls.__getRedis().setex(key, tool.get_expire(), json.dumps((average_num, average_up_count, total_count, start_index, end_index))) @classmethod def __get_average_data(cls, code): key = "l_average_big_num-{}".format(code) val = cls.__getRedis().get(key) if val is None: return None, None, None, None, None val = json.loads(val) return val[0], val[1], val[2], val[3], val[4] @classmethod def __save_compute_info(cls, code, cancel_count, process_index): key = "l_average_big_num_comput_info-{}".format(code) cls.__getRedis().setex(key, tool.get_expire(), json.dumps((cancel_count, process_index))) @classmethod def __get_compute_info(cls, code): key = "l_average_big_num_comput_info-{}".format(code) val = cls.__getRedis().get(key) if val is None: return None, None val = json.loads(val) return val[0], val[1] @classmethod def __clear_data(cls, code): key = "l_average_big_num_comput_info-{}".format(code) cls.__getRedis().delete(key) key = "l_average_big_num-{}".format(code) cls.__getRedis().delete(key) # 计算平均手数 # 计算范围:买入信号起始点到买入执行位的下一张图结束点数据为止 @classmethod def compute_average_big_num(cls, code, buy_single_index, buy_exec_index): total_data = local_today_datas[code] latest_index = total_data[-1]["index"] end_index = total_data[-1]["index"] if end_index >= 434: print("测试") start_index = buy_exec_index if tool.trade_time_sub(total_data[end_index]["val"]["time"], total_data[buy_exec_index]["val"]["time"]) < 3: return exec_time = total_data[buy_exec_index]["val"]["time"] o_average_num, o_average_up_count, o_count, o_start_index, o_start_index = cls.__get_average_data(code) if o_average_num is not None and o_count >= constant.H_CANCEL_BUY_COUNT: return # 获取买入执行位后2s的数据末位 for i in range(end_index, buy_exec_index, - 1): time_ = total_data[i]["val"]["time"] if tool.trade_time_sub(time_, exec_time) <= 2: end_index = i break num = 0 count = 0 for i in range(start_index, end_index + 1): data = total_data[i] val = data["val"] if L2DataUtil.is_limit_up_price_buy(val) and int(val["num"]) * float(val["price"]) >= 5900: count += data["re"] num += int(val["num"]) * data["re"] # 如果小于30笔,需要再往后计算 if count < constant.H_CANCEL_BUY_COUNT: for i in range(end_index + 1, latest_index + 1, 1): data = total_data[i] val = data["val"] if L2DataUtil.is_limit_up_price_buy(val) and int(val["num"]) * float(val["price"]) >= 5900: count += data["re"] num += int(val["num"]) * data["re"] if count >= constant.H_CANCEL_BUY_COUNT: end_index = i break # 获取大单数量 average_up_count = 0 average_num = round(num / count) for i in range(start_index, end_index + 1): data = total_data[i] val = data["val"] if int(val["num"]) >= average_num: average_up_count += data["re"] # 保存数据 cls.__save_average_data(code, average_num, average_up_count, count, start_index, end_index) cls.__save_compute_info(code, 0, buy_exec_index) # 是否需要撤单 @classmethod def need_cancel(cls, code, buy_exec_index, start_index, end_index): average_num, average_up_count, total_count, a_start_index, a_end_index = cls.__get_average_data(code) if average_num is None: return False, None cancel_count, process_index = cls.__get_compute_info(code) total_data = local_today_datas[code] # 14:30过后不再守护 if int(total_data[end_index]["val"]["time"].replace(":", "")) > int("143000"): return False, None try: for i in range(min(start_index, buy_exec_index + 1), end_index + 1): if i <= buy_exec_index: continue if process_index >= i: continue data = total_data[i] val = data["val"] if L2DataUtil.is_limit_up_price_buy_cancel(val) and int(val["num"]) >= average_num: # 查询买入位置 buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(data, local_today_num_operate_map.get( code)) if buy_index is not None and a_start_index <= buy_index <= a_end_index: # 买入位置要在平均值计算范围内 cancel_count += data["re"] process_index = i sj = 0 # 5 * tool.trade_time_sub(val["time"],total_data[buy_exec_index]["val"]["time"]) print("h平均大单计算结果:", "取消数量", cancel_count, "大单总数", average_up_count, sj) if cancel_count / (average_up_count - sj) >= 0.75: return True, i finally: cls.__save_compute_info(code, cancel_count, process_index) return False, None # 下单成功 @classmethod def place_order_success(cls, code, buy_single_index, buy_exec_index): cls.__clear_data(code) @classmethod def __test(cls, datas): code = datas[0] load_l2_data(code) L2TradeDataProcessor.random_key[code] = random.randint(0, 100000) # 先执行下单 buy_single_index = datas[1] buy_exec_index = datas[2] cls.__clear_data(code) cls.place_order_success(code, buy_single_index, buy_exec_index) # 执行是否需要计算average cls.compute_average_big_num(code, buy_single_index, buy_exec_index) for i in range(buy_exec_index + 1, datas[4]): cancel, index = cls.need_cancel(code, buy_exec_index, i, i) if cancel: print("需要撤单", cancel, index) break @classmethod def test(cls): # 代码 买入信号起始点 买入信息执行位置 计算末位 最远计算位置 cls.__test(("002793", 292, 308, 332, 410)) # 执行是否需要撤销 if __name__ == "__main__": # trade_manager.start_cancel_buy("000637") # t.sleep(10) # AverageBigNumComputer.test() # LongAverageBigNumComputer.test() # L2TradeDataProcessor.test() SecondAverageBigNumComputer.test() # load_l2_data("600213") # # buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(local_today_datas["600213"][84], # local_today_num_operate_map.get( # "600213")) # print(buy_index, buy_data)