import datetime
import logging
import random
import time as t

import big_money_num_manager
import data_process
import global_util
import gpcode_manager
import l2_data_log
import l2_data_manager
import l2_data_util
import l2_trade_factor
import l2_trade_test
import limit_up_time_manager
import log
import redis_manager
import ths_industry_util
import tool
import trade_manager
from l2_data_manager import L2DataException, TradePointManager, local_today_datas, L2DataUtil, load_l2_data, \
    local_today_num_operate_map, L2LimitUpMoneyStatisticUtil
from log import logger_l2_trade, logger_l2_trade_cancel, logger_l2_trade_buy, logger_l2_process


# TODO: L2 data management
class L2DataManager:
    """Skeleton for L2 data management; only ``format_data`` is implemented."""

    def format_data(self, datas):
        """Wrap every raw record as ``{"val": record, "re": 1}`` and return the list."""
        return [{"val": data, "re": 1} for data in datas]

    def get_add_datas(self, format_datas):
        """Get the newly added records (not implemented)."""
        pass

    def load_data(self, code=None, force=False):
        """Load records from the database (not implemented)."""
        pass

    def save_datas(self, add_datas, datas):
        """Persist records (not implemented)."""
        pass


class L2BigNumForMProcessor:
    """Accumulates the weighted big-order figure used for the m value.

    Per-code progress (start position and last processed index) is kept in
    redis under the ``m_big_money_begin-*`` and ``m_big_money_process_index-*``
    keys.
    """

    def __init__(self):
        self._redis_manager = redis_manager.RedisManager(1)

    def __get_redis(self):
        return self._redis_manager.getRedis()

    def set_begin_pos(self, code, index):
        """Store the start position for ``code``; an already stored position is kept."""
        if self.__get_begin_pos(code) is None:
            key = "m_big_money_begin-{}".format(code)
            self.__get_redis().setex(key, tool.get_expire(), index)

    def __get_begin_pos(self, code):
        """Return the stored start position as ``int``, or ``None`` if absent."""
        key = "m_big_money_begin-{}".format(code)
        val = self.__get_redis().get(key)
        if val is None:
            return None
        return int(val)

    def clear_processed_end_index(self, code):
        """Forget how far the records of ``code`` have been processed."""
        key = "m_big_money_process_index-{}".format(code)
        self.__get_redis().delete(key)

    def __set_processed_end_index(self, code, index):
        """Remember ``index`` as the last (inclusive) processed record index."""
        key = "m_big_money_process_index-{}".format(code)
        self.__get_redis().setex(key, tool.get_expire(), index)

    def __get_processed_end_index(self, code):
        """Return the last processed record index as ``int``, or ``None``."""
        key = "m_big_money_process_index-{}".format(code)
        val = self.__get_redis().get(key)
        if val is None:
            return None
        return int(val)

    def process(self, code, start_index, end_index, limit_up_price):
        """Fold limit-up buys / buy-cancels in ``[start_index, end_index]`` into the big-order figure.

        Nothing happens when no start position was stored for ``code`` or when
        the range has already been processed. Each record is processed at most
        once: the scan starts right after the stored last-processed index.

        :param code: stock code
        :param start_index: first candidate index into ``local_today_datas[code]``
        :param end_index: last index (inclusive)
        :param limit_up_price: limit-up price used to derive the size thresholds
        """
        begin_pos = self.__get_begin_pos(code)
        if begin_pos is None:
            # No buy start signal has been recorded yet.
            return
        processed_index = self.__get_processed_end_index(code)
        if processed_index is None:
            # -1 means "nothing processed", so that index 0 is still eligible.
            processed_index = -1
        if processed_index >= end_index:
            return
        start_time = round(t.time() * 1000)
        total_datas = local_today_datas[code]
        num_splites = [round(5000 / limit_up_price), round(10000 / limit_up_price),
                       round(20000 / limit_up_price), round(30000 / limit_up_price)]
        # The stored index is inclusive, so resume one past it to avoid
        # counting that record twice.
        first_index = max(start_index, processed_index + 1)
        total_num = 0
        for i in range(first_index, end_index + 1):
            data = total_datas[i]
            val = data["val"]
            is_cancel = L2DataUtil.is_limit_up_price_buy_cancel(val)
            if not is_cancel and not L2DataUtil.is_limit_up_price_buy(val):
                continue
            if is_cancel:
                # A cancel whose matching buy lies before the start position
                # does not count.
                buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(
                    data, local_today_num_operate_map.get(code))
                if buy_index is not None and buy_index < begin_pos:
                    continue
            # Map the order size onto a weight tier.
            num = int(val["num"])
            temp = 0
            if num < num_splites[0]:
                pass
            elif num < num_splites[1]:
                temp = 1
            elif num < num_splites[2]:
                temp = round(4 / 3, 3)
            elif num < num_splites[3]:
                temp = 2
            else:
                temp = 4
            count = int(temp * data["re"] * 1000)
            if is_cancel:
                count = 0 - count
            total_num += count
        self.__set_processed_end_index(code, end_index)
        big_money_num_manager.add_num(code, total_num)
        print("m值大单计算范围:{}-{} 时间:{}".format(first_index, end_index,
                                                      round(t.time() * 1000) - start_time))


class L2TradeDataProcessor:
    """Drives buy / cancel decisions from incoming L2 records."""

    # code -> (buy execution index, capture time) of a pending virtual order
    unreal_buy_dict = {}
    # code -> random id used to correlate the log lines of one processing run
    random_key = {}
    l2BigNumForMProcessor = L2BigNumForMProcessor()

    @classmethod
    def __format_log(cls, code, content, *args):
        """Build the log line shared by ``debug``, ``cancel_debug`` and ``buy_debug``.

        ``random_key`` is read with ``.get`` so logging never raises when no
        run id has been assigned to ``code`` yet (e.g. ``cancel_buy`` invoked
        from outside ``process``).
        """
        prefix = "thread-id={} code={} ".format(cls.random_key.get(code), code)
        return (prefix + content).format(*args)

    @classmethod
    def debug(cls, code, content, *args):
        """Write a formatted line to the trade log."""
        logger_l2_trade.debug(cls.__format_log(code, content, *args))

    @classmethod
    def cancel_debug(cls, code, content, *args):
        """Write a formatted line to the trade-cancel log."""
        logger_l2_trade_cancel.debug(cls.__format_log(code, content, *args))

    @classmethod
    def buy_debug(cls, code, content, *args):
        """Write a formatted line to the trade-buy log."""
        logger_l2_trade_buy.debug(cls.__format_log(code, content, *args))

    @classmethod
    def process(cls, code, datas, capture_timestamp):
        """Entry point for one batch of captured L2 records.

        :param code: stock code
        :param datas: records of the current capture
        :param capture_timestamp: timestamp of the capture
        :raises L2DataException: when the first record's price does not match ``code``
        """
        cls.random_key[code] = random.randint(0, 100000)
        now_time_str = datetime.datetime.now().strftime("%H:%M:%S")
        __start_time = round(t.time() * 1000)
        try:
            if len(datas) > 0:
                # Validate that the price belongs to this code.
                if not data_process.is_same_code_with_price(code, float(datas[0]["val"]["price"])):
                    raise L2DataException(L2DataException.CODE_PRICE_ERROR,
                                          "股价不匹配 code-{} price-{}".format(code, datas[0]["val"]["price"]))
                # Load history, then correct the incoming records.
                l2_data_manager.load_l2_data(code)
                datas = l2_data_manager.L2DataUtil.correct_data(code, datas)
                _start_index = 0
                if local_today_datas.get(code) is not None and len(local_today_datas[code]) > 0:
                    _start_index = local_today_datas[code][-1]["index"] + 1
                add_datas = l2_data_manager.L2DataUtil.get_add_data(code, datas, _start_index)
                if len(add_datas) > 0:
                    # Append the new records to today's data.
                    local_today_datas[code].extend(add_datas)
                    l2_data_util.load_num_operate_map(l2_data_manager.local_today_num_operate_map, code,
                                                      add_datas)
                total_datas = local_today_datas[code]
                __start_time = l2_data_log.l2_time(code, round(t.time() * 1000) - __start_time,
                                                   "l2数据预处理时间")
                if len(add_datas) > 0:
                    latest_time = add_datas[len(add_datas) - 1]["val"]["time"]
                    # Only act on data whose time is close enough to now.
                    # TODO: processing temporarily disabled (original note)
                    if l2_data_manager.L2DataUtil.is_same_time(now_time_str, latest_time):
                        state = trade_manager.get_trade_state(code)
                        start_index = len(total_datas) - len(add_datas)
                        end_index = len(total_datas) - 1
                        if state == trade_manager.TRADE_STATE_BUY_DELEGATED or \
                                state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER:
                            # An order is already placed.
                            cls.__process_order(code, start_index, end_index, capture_timestamp)
                        else:
                            # No order placed yet.
                            cls.__process_not_order(code, start_index, end_index, capture_timestamp)
                    logger_l2_process.info("code:{} 处理数据范围: {}-{} 处理时间:{}", code, add_datas[0]["index"],
                                           add_datas[-1]["index"], round(t.time() * 1000) - __start_time)
                __start_time = l2_data_log.l2_time(code, round(t.time() * 1000) - __start_time,
                                                   "l2数据处理时间")
                # Persist the data.
                l2_data_manager.save_l2_data(code, datas, add_datas)
                __start_time = l2_data_log.l2_time(code, round(t.time() * 1000) - __start_time,
                                                   "保存数据时间")
        finally:
            # A virtual order never outlives the batch that created it.
            cls.unreal_buy_dict.pop(code, None)

    @classmethod
    def __process_not_order(cls, code, start_index, end_index, capture_time):
        """Handle a range of records while no order is placed."""
        threshold_money = cls.__get_threshmoney(code)
        cls.__start_compute_buy(code, start_index, end_index, threshold_money, capture_time)

    @classmethod
    def __statistic_count_l2_data_for_cancel(cls, code, start_index, end_index, has_cancel_single=False):
        """Accumulate buy / buy-cancel counts and report whether the cancel ratio is reached.

        Returns ``(index, True)`` at the first index where less than 30% of
        the buys remain and ``has_cancel_single`` is set, otherwise stores the
        counters and returns ``(end_index, False)``.

        NOTE(review): ``__count_l2_data_for_cancel`` is not defined in the
        visible part of this file and nothing visible calls this method —
        confirm whether it is dead code.
        """
        index, old_buy_count, old_cancel_count = \
            l2_data_manager.TradePointManager.get_count_info_for_cancel_buy(code)
        for i in range(start_index, end_index + 1):
            buy_count, buy_cancel_count = cls.__count_l2_data_for_cancel(code, i, i)
            old_buy_count += buy_count
            old_cancel_count += buy_cancel_count
            if old_buy_count > 0 and (old_buy_count - old_cancel_count) / old_buy_count < 0.3 \
                    and has_cancel_single:
                return i, True
        l2_data_manager.TradePointManager.set_count_info_for_cancel_buy(code, end_index, old_buy_count,
                                                                        old_cancel_count)
        return end_index, False

    @classmethod
    def __process_order(cls, code, start_index, end_index, capture_time, new_add=True):
        """Handle a range of records while an order is placed (real or virtual)."""
        if start_index < 0:
            start_index = 0
        if end_index < start_index:
            return
        # Where the buy signal started.
        buy_single_index, buy_exec_index, buy_compute_index, num, count = cls.__get_order_begin_pos(code)
        # Cancel computation.
        cancel_data, cancel_msg = L2LimitUpMoneyStatisticUtil.process_data(code, start_index, end_index,
                                                                           buy_single_index)
        # Update the m-value big-order figure.
        cls.l2BigNumForMProcessor.process(code, max(buy_single_index, start_index), end_index,
                                          gpcode_manager.get_limit_up_price(code))
        if cancel_data:
            cls.debug(code, "触发撤单,撤单位置:{} ,撤单原因:{}", cancel_data["index"], cancel_msg)
            cls.cancel_buy(code)
            # Keep looking for a new buy after the cancel position.
            cls.__process_not_order(code, cancel_data["index"] + 1, end_index, capture_time)
        else:
            # A pending virtual order with no cancel signal becomes a real order.
            unreal_buy_info = cls.unreal_buy_dict.get(code)
            if unreal_buy_info is not None:
                cls.debug(code, "有虚拟下单,无买撤信号,开始执行买入,执行位置:{},截图时间:{}", unreal_buy_info[0],
                          capture_time)
                # unreal_buy_info is (index that triggered the buy, capture time)
                cls.__buy(code, unreal_buy_info[1], local_today_datas[code][unreal_buy_info[0]],
                          unreal_buy_info[0])

    @classmethod
    def __buy(cls, code, capture_timestamp, last_data, last_data_index):
        """Place the real buy order if ``__can_buy`` allows it; failures are logged, not raised."""
        can, reason = cls.__can_buy(code)
        if not can:
            cls.debug(code, "不可以下单,原因:{}", reason)
            return
        cls.debug(code, "可以下单,原因:{}", reason)
        # The virtual order is consumed by the real one.
        cls.unreal_buy_dict.pop(code, None)
        cls.debug(code, "开始执行买入")
        try:
            trade_manager.start_buy(code, capture_timestamp, last_data, last_data_index)
            l2_data_manager.TradePointManager.delete_buy_cancel_point(code)
            cls.debug(code, "执行买入成功")
        except Exception as e:
            # Best effort: a failed buy is logged and must not abort processing.
            cls.debug(code, "执行买入异常:{}", str(e))
        finally:
            cls.debug(code, "m值影响因子:{}", l2_trade_factor.L2TradeFactorUtil.factors_to_string(code))

    @classmethod
    def __can_buy(cls, code):
        """Return ``(allowed, reason)`` for buying ``code``."""
        limit_up_time = limit_up_time_manager.get_limit_up_time(code)
        if limit_up_time is not None and l2_data_manager.L2DataUtil.get_time_as_second(
                limit_up_time) >= l2_data_manager.L2DataUtil.get_time_as_second("14:30:00"):
            return False, "14:30后涨停的不能买,涨停时间为{}".format(limit_up_time)
        # Within one industry only the first two codes (by limit-up time) may be bought.
        industry, codes = ths_industry_util.get_same_industry_codes(code, gpcode_manager.get_gp_list())
        if industry is None:
            return True, "没有获取到行业"
        codes_index = limit_up_time_manager.sort_code_by_limit_time(codes)
        # Guard once: the sorter may return None.
        rank = codes_index.get(code) if codes_index is not None else None
        if rank is not None and rank > 1:
            return False, "同一板块中老三,老四,...不能买"
        hot_num = global_util.industry_hot_num.get(industry)
        # Limit-up at or after 13:00 requires at least 29 hot codes in the industry.
        if limit_up_time is not None:
            if int(limit_up_time.replace(":", "")) >= 130000 and hot_num is not None:
                if hot_num < 29:
                    return False, "13:00后涨停,本板块中涨停票数<29不能买"
        # The second code in the industry has the same requirement.
        if rank is not None and rank == 1 and hot_num is not None:
            if hot_num < 29:
                return False, "老二,本板块中涨停票数<29不能买"
        return True, None

    @classmethod
    def __cancel_buy(cls, code):
        """Cancel the real order and clear the stored trade points; failures are logged."""
        try:
            cls.debug(code, "开始执行撤单")
            trade_manager.start_cancel_buy(code)
            # Clear the buy markers.
            l2_data_manager.TradePointManager.delete_buy_point(code)
            l2_data_manager.TradePointManager.delete_buy_cancel_point(code)
            l2_data_manager.TradePointManager.delete_compute_info_for_cancel_buy(code)
            l2_data_manager.TradePointManager.delete_count_info_for_cancel_buy(code)
            # Drop the big orders recorded for batch-cancel detection.
            l2_data_manager.L2BetchCancelBigNumProcessor.del_recod(code)
            cls.debug(code, "执行撤单成功")
        except Exception as e:
            cls.debug(code, "执行撤单异常:{}", str(e))

    @classmethod
    def cancel_buy(cls, code):
        """Cancel the pending order of ``code``.

        A virtual order is simply dropped together with its trade points; a
        real order is cancelled through ``trade_manager``.
        """
        # Drop the big orders recorded for batch-cancel detection.
        l2_data_manager.L2BetchCancelBigNumProcessor.del_recod(code)
        l2_data_manager.L2ContinueLimitUpCountManager.del_data(code)
        if code in cls.unreal_buy_dict:
            cls.unreal_buy_dict.pop(code)
            # Clear the buy markers.
            l2_data_manager.TradePointManager.delete_buy_point(code)
            l2_data_manager.TradePointManager.delete_buy_cancel_point(code)
            l2_data_manager.TradePointManager.delete_compute_info_for_cancel_buy(code)
            l2_data_manager.TradePointManager.delete_count_info_for_cancel_buy(code)
        else:
            cls.__cancel_buy(code)
        l2_data_manager.L2BigNumProcessor.del_big_num_pos(code)

    @classmethod
    def __start_compute_buy(cls, code, compute_start_index, compute_end_index, threshold_money, capture_time,
                            new_add=True):
        """Look for a buy signal and a buy execution point in the given range."""
        if compute_end_index < compute_start_index:
            return
        total_datas = local_today_datas[code]
        # Stored buy-signal state, if any.
        buy_single_index, buy_exec_index, buy_compute_index, num, count = cls.__get_order_begin_pos(code)
        if buy_single_index is None:
            # No stored signal: search for one; for newly added data also look
            # two records back.
            has_single, _index = cls.__compute_order_begin_pos(
                code, max(compute_start_index - 2 if new_add else compute_start_index, 0), 3,
                compute_end_index)
            buy_single_index = _index
            if has_single:
                num = 0
                count = 0
                cls.debug(code, "获取到买入信号起始点:{} 数据:{}", buy_single_index,
                          total_datas[buy_single_index])
                # Only the first signal of the day sets the big-order start
                # position (set_begin_pos keeps an existing value).
                cls.l2BigNumForMProcessor.set_begin_pos(code, buy_single_index)
        if buy_single_index is None:
            # No buy signal: nothing more to do.
            return None
        # Update the m-value big-order figure.
        cls.l2BigNumForMProcessor.process(code, max(buy_single_index, compute_start_index), compute_end_index,
                                          gpcode_manager.get_limit_up_price(code))
        threshold_money = cls.__get_threshmoney(code)
        # Accumulate the net buy amount.
        compute_index, buy_nums, buy_count, rebegin_buy_pos = cls.__sum_buy_num_for_order_3(
            code, max(buy_single_index, compute_start_index), compute_end_index, num, count, threshold_money,
            buy_single_index, capture_time)
        if rebegin_buy_pos is not None:
            # The signal is stale: restart the computation from the returned position.
            cls.__start_compute_buy(code, rebegin_buy_pos, compute_end_index, threshold_money, capture_time,
                                    False)
            return
        if compute_index is not None:
            cls.debug(code, "获取到买入执行位置:{} m值:{} 纯买手数:{} 纯买单数:{} 数据:{}", compute_index,
                      threshold_money, buy_nums, buy_count, total_datas[compute_index])
            # Record the buy signal and execution position.
            cls.__save_order_begin_data(code, buy_single_index, compute_index, compute_index, buy_nums,
                                        buy_count)
            # Save the time of the execution record as the limit-up time.
            limit_up_time_manager.save_limit_up_time(code, total_datas[compute_index]["val"]["time"])
            # Virtual order.
            cls.unreal_buy_dict[code] = (compute_index, capture_time)
            # Remove all earlier cancel signals.
            l2_data_manager.TradePointManager.delete_buy_cancel_point(code)
            # Limit-up sealed-amount statistics.
            L2LimitUpMoneyStatisticUtil.process_data(code, buy_single_index, compute_index, buy_single_index,
                                                     False)
            if compute_index >= compute_end_index:
                cls.debug(code, "数据处理完毕,下单, 数据截图时间-{}", capture_time)
                # Everything is processed and nothing cancelled: place the real order.
                cls.__buy(code, capture_time, total_datas[compute_index], compute_index)
            else:
                cls.debug(code, "数据尚未处理完毕,进行下一步处理,处理进度:{}", compute_index)
                # Run the remaining records through the cancel logic.
                cls.__process_order(code, compute_index + 1, compute_end_index, capture_time, False)
        else:
            # Threshold not reached yet: store the running totals.
            # NOTE(review): when __sum_buy_num_for_order_3 deleted the buy
            # point because the signal was stale at the last record, this
            # re-saves it — confirm that this is intended.
            cls.__save_order_begin_data(code, buy_single_index, -1, compute_end_index, buy_nums, buy_count)

    @classmethod
    def __get_order_begin_pos(cls, code):
        """Return ``(buy_single_index, buy_exec_index, compute_index, num, count)`` from storage."""
        buy_single_index, buy_exec_index, compute_index, num, count = \
            l2_data_manager.TradePointManager.get_buy_compute_start_data(code)
        return buy_single_index, buy_exec_index, compute_index, num, count

    @classmethod
    def __save_order_begin_data(cls, code, buy_single_index, buy_exec_index, compute_index, num, count):
        """Store the buy-signal state for ``code``."""
        TradePointManager.set_buy_compute_start_data(code, buy_single_index, buy_exec_index, compute_index,
                                                     num, count)

    @classmethod
    def __compute_order_begin_pos(cls, code, start_index, continue_count, end_index):
        """Search ``[start_index, end_index]`` for a buy start signal.

        A signal is a run of limit-up buys sharing one timestamp whose summed
        ``re`` reaches ``continue_count``. Sells and sell-cancels do not break
        a run; any other record does. Records before 09:30:00 are ignored.

        :return: ``(True, index of the run's first record)`` or ``(False, None)``
        """
        second_930 = 9 * 3600 + 30 * 60 + 0
        datas = local_today_datas[code]
        if end_index - start_index + 1 < continue_count:
            return False, None
        last_index = None
        count = 0
        start = None
        for i in range(start_index, end_index + 1):
            _val = datas[i]["val"]
            if L2DataUtil.get_time_as_second(_val["time"]) < second_930:
                continue
            if L2DataUtil.is_limit_up_price_buy(_val):
                if last_index is None or (datas[last_index]["val"]["time"] == datas[i]["val"]["time"]):
                    if start is None:
                        start = i
                    last_index = i
                    count += datas[i]["re"]
                    if count >= continue_count:
                        return True, start
                else:
                    # Different timestamp: this record starts a new run.
                    # NOTE(review): the threshold is not checked here, so a
                    # single record with re >= continue_count that starts a
                    # new run is not reported — confirm whether intended.
                    last_index = i
                    count = datas[i]["re"]
                    start = i
            elif not L2DataUtil.is_sell(_val) and not L2DataUtil.is_sell_cancel(_val):
                # Anything other than a sell / sell-cancel resets the run.
                last_index = None
                count = 0
                start = None
        return False, None

    @classmethod
    def __get_threshmoney(cls, code):
        """Return the m value computed by ``L2TradeFactorUtil`` for ``code``."""
        return l2_trade_factor.L2TradeFactorUtil.compute_m_value(code)

    @classmethod
    def __sum_buy_num_for_order_3(cls, code, compute_start_index, compute_end_index, origin_num, origin_count,
                                  threshold_money, buy_single_index, capture_time):
        """Accumulate net limit-up buys and look for the buy execution point.

        Buy-cancels whose matching buy precedes the buy signal (and is not in
        the same second as the signal) are not subtracted.

        :return: ``(exec_index, buy_nums, buy_count, rebegin_index)``;
            ``exec_index`` is the index where both thresholds were reached (or
            ``None``), ``rebegin_index`` is set when the signal is stale and
            the search has to restart from that index.
        :raises Exception: when the limit-up price is unavailable
        """
        total_datas = local_today_datas[code]
        buy_nums = origin_num
        buy_count = origin_count
        limit_up_price = gpcode_manager.get_limit_up_price(code)
        if limit_up_price is None:
            raise Exception("涨停价无法获取")
        # Target size.
        threshold_num = threshold_money / (limit_up_price * 100)
        # Target number of orders.
        threshold_count = l2_trade_factor.L2TradeFactorUtil.get_safe_buy_count(code)
        buy_single_time_seconds = L2DataUtil.get_time_as_second(total_datas[buy_single_index]["val"]["time"])
        for i in range(compute_start_index, compute_end_index + 1):
            data = total_datas[i]
            _val = data["val"]
            if L2DataUtil.get_time_as_second(_val["time"]) - buy_single_time_seconds > 1:
                # More than one second after the signal: the signal is stale.
                TradePointManager.delete_buy_point(code)
                if i == compute_end_index:
                    # No records left.
                    return None, buy_nums, buy_count, None
                # Restart from the first record whose time differs from the signal's.
                for ii in range(buy_single_index + 1, compute_end_index + 1):
                    if total_datas[buy_single_index]["val"]["time"] != total_datas[ii]["val"]["time"]:
                        return None, buy_nums, buy_count, ii
            if L2DataUtil.is_limit_up_price_buy(_val):
                # Limit-up buy.
                buy_nums += int(_val["num"]) * int(data["re"])
                buy_count += int(data["re"])
                if buy_nums >= threshold_num and buy_count >= threshold_count:
                    logger_l2_trade_buy.info(
                        "{}获取到买入执行点:{} 统计纯买手数:{} 目标纯买手数:{} 统计纯买单数:{} 目标纯买单数:{}", code, i,
                        buy_nums, threshold_num, buy_count, threshold_count)
            elif L2DataUtil.is_limit_up_price_buy_cancel(_val):
                # Limit-up buy-cancel: find the buy it cancels.
                buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(
                    data, local_today_num_operate_map.get(code))
                if buy_index is not None:
                    if buy_index >= buy_single_index:
                        buy_nums -= int(_val["num"]) * int(data["re"])
                        buy_count -= int(data["re"])
                        cls.buy_debug(code, "{}数据在买入信号之后 撤买纯买手数:{} 目标手数:{}", i, buy_nums,
                                      threshold_num)
                    else:
                        cls.buy_debug(code, "{}数据在买入信号之前,买入位:{}", i, buy_index)
                        if total_datas[buy_single_index]["val"]["time"] == buy_data["val"]["time"]:
                            # Same second as the signal: treat as after the signal.
                            buy_nums -= int(_val["num"]) * int(data["re"])
                            buy_count -= int(data["re"])
                            cls.buy_debug(code, "{}数据买入位与预估买入位在同一秒", i)
                else:
                    # The matching buy was not found: subtract anyway.
                    cls.buy_debug(code, "未找到买撤数据的买入点: 位置-{} 数据-{}", i, data)
                    buy_nums -= int(_val["num"]) * int(data["re"])
                    buy_count -= int(data["re"])
            cls.buy_debug(code, "位置-{},总手数:{},目标手数:{}", i, buy_nums, threshold_num)
            if buy_nums >= threshold_num and buy_count >= threshold_count:
                return i, buy_nums, buy_count, None
        cls.buy_debug(code, "尚未获取到买入执行点,起始计算位置:{} 统计纯买手数:{} 目标纯买手数:{} 统计纯买单数:{} 目标纯买单数:{}",
                      compute_start_index, buy_nums, threshold_num, buy_count, threshold_count)
        return None, buy_nums, buy_count, None

    @classmethod
    def test(cls):
        """Manual replay harness for one hard-coded code."""
        code = "002898"
        l2_trade_test.clear_trade_data(code)
        load_l2_data(code, True)
        # Developer toggle: replay the fixed range 0-140 once and stop. The
        # per-second replay below only runs when this is switched off.
        if True:
            state = trade_manager.get_trade_state(code)
            cls.random_key[code] = random.randint(0, 100000)
            capture_timestamp = 1999988888
            try:
                if state == trade_manager.TRADE_STATE_BUY_DELEGATED or \
                        state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER:
                    # An order is already placed.
                    cls.__process_order(code, 0, 140, capture_timestamp)
                else:
                    # No order placed yet.
                    cls.__process_not_order(code, 0, 140, capture_timestamp)
            except Exception as e:
                logging.exception(e)
            return

        _start = t.time()
        # Replay the records in batches of one timestamp each.
        total_datas = local_today_datas[code]
        start_time = total_datas[0]["val"]["time"]
        start_index = 0
        for i in range(0, len(total_datas)):
            if total_datas[i]["val"]["time"] != start_time:
                cls.random_key[code] = random.randint(0, 100000)
                start = start_index
                end = i - 1
                print("处理进度:{},{}".format(start, end))
                capture_timestamp = 1999999999
                state = trade_manager.get_trade_state(code)
                try:
                    if state == trade_manager.TRADE_STATE_BUY_DELEGATED or \
                            state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER:
                        # An order is already placed.
                        cls.__process_order(code, start, end, capture_timestamp)
                    else:
                        # No order placed yet.
                        cls.__process_not_order(code, start, end, capture_timestamp)
                except Exception as e:
                    logging.exception(e)
                start_index = i
                start_time = total_datas[i]["val"]["time"]
        print("时间花费:", round((t.time() - _start) * 1000))


if __name__ == "__main__":
    L2TradeDataProcessor.test()