""" L2的数据处理 """ import decimal import json import logging import random import time as t from datetime import datetime import big_money_num_manager import data_process import global_util import l2_data_util import gpcode_manager import l2_trade_factor import redis_manager import ths_industry_util import tool import trade_manager from log import logger_l2_trade, logger_l2_trade_cancel, logger_l2_trade_buy, logger_l2_process from trade_data_manager import TradeBuyDataManager import limit_up_time_manager _redisManager = redis_manager.RedisManager(1) # l2数据管理 # 本地最新一次上传的数据 local_latest_datas = {} # 本地今日数据 local_today_datas = {} # 本地手数+操作那类型组成的临时变量 # 用于加快数据处理,用空换时间 local_today_num_operate_map = {} class L2DataException(Exception): # 价格不匹配 CODE_PRICE_ERROR = 1 # 无收盘价 CODE_NO_CLOSE_PRICE = 2 def __init__(self, code, msg): super().__init__(self) self.code = code self.msg = msg def __str__(self): return self.msg def get_code(self): return self.code # 交易点管理器,用于管理买入点;买撤点;距离买入点的净买入数据;距离买撤点的买撤数据 class TradePointManager: @staticmethod def __get_redis(): return _redisManager.getRedis() # 删除买入点数据 @staticmethod def delete_buy_point(code): redis = TradePointManager.__get_redis() redis.delete("buy_compute_index_info-{}".format(code)) # 获取买入点信息 # 返回数据为:买入点 累计纯买额 已经计算的数据索引 @staticmethod def get_buy_compute_start_data(code): redis = TradePointManager.__get_redis() _key = "buy_compute_index_info-{}".format(code) _data_json = redis.get(_key) if _data_json is None: return None, None, None, 0, 0 _data = json.loads(_data_json) return _data[0], _data[1], _data[2], _data[3], _data[4] # 设置买入点的值 # buy_single_index 买入信号位 # buy_exec_index 买入执行位 # compute_index 计算位置 # nums 累计纯买额 @staticmethod def set_buy_compute_start_data(code, buy_single_index, buy_exec_index, compute_index, nums, count): redis = TradePointManager.__get_redis() expire = tool.get_expire() _key = "buy_compute_index_info-{}".format(code) if buy_single_index is not None: redis.setex(_key, expire, json.dumps((buy_single_index, buy_exec_index, compute_index, nums, count))) else: _buy_single_index, _buy_exec_index, _compute_index, _nums, _count = TradePointManager.get_buy_compute_start_data( code) redis.setex(_key, expire, json.dumps((_buy_single_index, buy_exec_index, compute_index, nums, count))) # 获取撤买入开始计算的信息 # 返回数据的内容为:撤销点索引 撤买纯买额 计算的数据索引 @staticmethod def get_buy_cancel_single_pos(code): redis = TradePointManager.__get_redis() info = redis.get("buy_cancel_single_pos-{}".format(code)) if info is None: return None else: return int(info) # 设置买撤点信息 # buy_num 纯买额 computed_index计算到的下标 index撤买信号起点 @classmethod def set_buy_cancel_single_pos(cls, code, index): redis = TradePointManager.__get_redis() expire = tool.get_expire() redis.setex("buy_cancel_single_pos-{}".format(code), expire, index) # 删除买撤点数据 @classmethod def delete_buy_cancel_point(cls, code): redis = TradePointManager.__get_redis() redis.delete("buy_cancel_single_pos-{}".format(code)) # 设置买撤纯买额 @classmethod def set_compute_info_for_cancel_buy(cls, code, index, nums): redis = TradePointManager.__get_redis() expire = tool.get_expire() redis.setex("compute_info_for_cancel_buy-{}".format(code), expire, json.dumps((index, nums))) logger_l2_trade_buy.info("{}保存撤单纯买额信息:{},{}", code, index, nums) # 获取买撤纯买额计算信息 @classmethod def get_compute_info_for_cancel_buy(cls, code): redis = TradePointManager.__get_redis() info = redis.get("compute_info_for_cancel_buy-{}".format(code)) if info is None: return None, 0 else: info = json.loads(info) return info[0], info[1] @classmethod def delete_compute_info_for_cancel_buy(cls, code): redis = TradePointManager.__get_redis() redis.delete("compute_info_for_cancel_buy-{}".format(code)) # 从买入信号开始设置涨停买与涨停撤的单数 @classmethod def set_count_info_for_cancel_buy(cls, code, index, buy_count, cancel_count): redis = TradePointManager.__get_redis() expire = tool.get_expire() redis.setex("count_info_for_cancel_buy-{}".format(code), expire, json.dumps((index, buy_count, cancel_count))) logger_l2_trade_buy.info("{}保存撤单纯买额信息:{},{}", code, index, buy_count, cancel_count) # 获取买撤纯买额计算信息 @classmethod def get_count_info_for_cancel_buy(cls, code): redis = TradePointManager.__get_redis() info = redis.get("count_info_for_cancel_buy-{}".format(code)) if info is None: return None, 0, 0 else: info = json.loads(info) return info[0], info[1], info[2] @classmethod def delete_count_info_for_cancel_buy(cls, code): redis = TradePointManager.__get_redis() redis.delete("count_info_for_cancel_buy-{}".format(code)) def load_l2_data(code, force=False): redis = _redisManager.getRedis() # 加载最近的l2数据 if local_latest_datas.get(code) is None or force: # 获取最近的数据 _data = redis.get("l2-data-latest-{}".format(code)) if _data is not None: if code in local_latest_datas: local_latest_datas[code] = json.loads(_data) else: local_latest_datas.setdefault(code, json.loads(_data)) # 获取今日的数据 if local_today_datas.get(code) is None or force: datas = [] keys = redis.keys("l2-{}-*".format(code)) for k in keys: value = redis.get(k) _data = l2_data_util.l2_data_key_2_obj(k, value) datas.append(_data) # 排序 new_datas = sorted(datas, key=lambda e: (int(e.__getitem__('val')["time"].replace(":", "")), e.__getitem__('index'))) local_today_datas[code] = new_datas # 根据今日数据加载 l2_data_util.load_num_operate_map(local_today_num_operate_map, code, local_today_datas.get(code), force) def saveL2Data(code, datas, msg=""): start_time = round(t.time() * 1000) # 查询票是否在待监听的票里面 if not gpcode_manager.is_in_gp_pool(code): return None # 验证股价的正确性 redis_instance = _redisManager.getRedis() try: if redis_instance.setnx("l2-save-{}".format(code), "1") > 0: # 计算保留的时间 expire = tool.get_expire() i = 0 for _data in datas: i += 1 key = "l2-" + _data["key"] value = redis_instance.get(key) if value is None: # 新增 try: value = {"index": _data["index"], "re": _data["re"]} redis_instance.setex(key, expire, json.dumps(value)) except: logging.error("更正L2数据出错:{} key:{}".format(code, key)) else: json_value = json.loads(value) if json_value["re"] != _data["re"]: json_value["re"] = _data["re"] redis_instance.setex(key, expire, json.dumps(json_value)) finally: redis_instance.delete("l2-save-{}".format(code)) print("保存新数据用时:", msg, "耗时:{}".format(round(t.time() * 1000) - start_time)) return datas def parseL2Data(str): day = datetime.now().strftime("%Y%m%d") dict = json.loads(str) data = dict["data"] client = dict["client"] code = data["code"] channel = data["channel"] capture_time = data["captureTime"] process_time = data["processTime"] data = data["data"] limit_up_price = gpcode_manager.get_limit_up_price(code) datas = L2DataUtil.format_l2_data(data, code, limit_up_price) # 获取涨停价 return day, client, channel, code, capture_time, process_time, datas # 保存l2数据 def save_l2_data(code, datas, add_datas): redis = _redisManager.getRedis() # 保存最近的数据 redis.setex("l2-data-latest-{}".format(code), tool.get_expire(), json.dumps(datas)) # 设置进内存 if code in local_latest_datas: local_latest_datas[code] = datas else: local_latest_datas.setdefault(code, datas) __set_l2_data_latest_count(code, len(datas)) if len(add_datas) > 0: saveL2Data(code, add_datas) # 清除l2数据 def clear_l2_data(code): redis_l2 = redis_manager.RedisManager(1).getRedis() keys = redis_l2.keys("l2-{}-*".format(code)) for k in keys: redis_l2.delete(k) redis_l2.delete("l2-data-latest-{}".format(code)) class L2DataUtil: @classmethod def is_same_time(cls, time1, time2): if global_util.TEST: return True time1_s = time1.split(":") time1_second = int(time1_s[0]) * 3600 + int(time1_s[1]) * 60 + int(time1_s[2]) time2_s = time2.split(":") time2_second = int(time2_s[0]) * 3600 + int(time2_s[1]) * 60 + int(time2_s[2]) if abs(time2_second - time1_second) < 3: return True else: return False # 获取增量数据 @classmethod def get_add_data(cls, code, datas, _start_index): if datas is not None and len(datas) < 1: return [] last_data = None latest_datas_ = local_latest_datas.get(code) if latest_datas_ is not None and len(latest_datas_) > 0: last_data = latest_datas_[-1] count = 0 start_index = -1 # 如果原来没有数据 # 设置add_data的序号 for n in reversed(datas): count += 1 if n["key"] == (last_data["key"] if last_data is not None else ""): start_index = len(datas) - count break _add_datas = [] if last_data is not None: if start_index < 0: if L2DataUtil.get_time_as_second(datas[0]["val"]["time"]) >= L2DataUtil.get_time_as_second( last_data["val"]["time"]): _add_datas = datas else: _add_datas = [] elif start_index + 1 >= len(datas): _add_datas = [] else: _add_datas = datas[start_index + 1:] else: _add_datas = datas[start_index + 1:] for i in range(0, len(_add_datas)): _add_datas[i]["index"] = _start_index + i return _add_datas # 纠正数据,将re字段替换为较大值 @classmethod def correct_data(cls, code, _datas): latest_data = local_latest_datas.get(code) if latest_data is None: latest_data = [] save_list = [] for data in _datas: for _ldata in latest_data: # 新数据条数比旧数据多才保存 if _ldata["key"] == data["key"] and _ldata["re"] < data["re"]: max_re = max(_ldata["re"], data["re"]) _ldata["re"] = max_re data["re"] = max_re # 保存到数据库,更新re的数据 save_list.append(_ldata) if len(save_list) > 0: saveL2Data(code, save_list, "保存纠正数据") local_latest_datas[code] = latest_data return _datas # 处理l2数据 @classmethod def format_l2_data(cls, data, code, limit_up_price): datas = [] dataIndexs = {} same_time_num = {} for item in data: # 解析数据 time = item["time"] if time in same_time_num: same_time_num[time] = same_time_num[time] + 1 else: same_time_num[time] = 1 price = float(item["price"]) num = item["num"] limitPrice = item["limitPrice"] # 涨停价 if limit_up_price is not None: if limit_up_price == tool.to_price(decimal.Decimal(price)): limitPrice = 1 else: limitPrice = 0 item["limitPrice"] = "{}".format(limitPrice) operateType = item["operateType"] # 不需要非涨停买与买撤 if int(item["limitPrice"]) != 1 and (int(operateType) == 0 or int(operateType) == 1): continue cancelTime = item["cancelTime"] cancelTimeUnit = item["cancelTimeUnit"] key = "{}-{}-{}-{}-{}-{}-{}-{}".format(code, operateType, time, num, price, limitPrice, cancelTime, cancelTimeUnit) if key in dataIndexs: # 数据重复次数+1 datas[dataIndexs[key]]['re'] = datas[dataIndexs[key]]['re'] + 1 else: # 数据重复次数默认为1 datas.append({"key": key, "val": item, "re": 1}) dataIndexs.setdefault(key, len(datas) - 1) l2_data_util.save_big_data(code, same_time_num, data) return datas @classmethod def get_time_as_second(cls, time_str): ts = time_str.split(":") return int(ts[0]) * 3600 + int(ts[1]) * 60 + int(ts[2]) # @classmethod # def get_time_as_str(cls, time_seconds): # ts = time_str.split(":") # return int(ts[0]) * 3600 + int(ts[1]) * 60 + int(ts[2]) # 是否是涨停价买 @classmethod def is_limit_up_price_buy(cls, val): if int(val["limitPrice"]) != 1: return False if int(val["operateType"]) != 0: return False price = float(val["price"]) num = int(val["num"]) if price * num * 100 < 50 * 10000: return False return True @classmethod def is_limit_up_price_sell(cls, val): if int(val["limitPrice"]) != 1: return False if int(val["operateType"]) != 2: return False price = float(val["price"]) num = int(val["num"]) if price * num * 100 < 50 * 10000: return False return True # 是否涨停买撤 @classmethod def is_limit_up_price_buy_cancel(cls, val): if int(val["limitPrice"]) != 1: return False if int(val["operateType"]) != 1: return False price = float(val["price"]) num = int(val["num"]) if price * num * 100 < 50 * 10000: return False return True # 是否卖撤 @classmethod def is_sell_cancel(cls, val): if int(val["operateType"]) == 3: return True return False # 是否为卖 @classmethod def is_sell(cls, val): if int(val["operateType"]) == 2: return True return False # L2交易数据处理器 # 一些常见的概念: # 买入信号位置(出现下单信号的第一条数据的位置):buy_single_index # 买入执行位置(符合下单信号的最后一条数据):buy_exec_index # 计算位置(当前计算的整个计算的位置):compute_index # class L2TradeDataProcessor: unreal_buy_dict = {} random_key = {} @classmethod def debug(cls, code, content, *args): logger_l2_trade.debug(("thread-id={} code={} ".format(cls.random_key[code], code) + content).format(*args)) @classmethod def cancel_debug(cls, code, content, *args): logger_l2_trade_cancel.debug( ("thread-id={} code={} ".format(cls.random_key[code], code) + content).format(*args)) @classmethod def buy_debug(cls, code, content, *args): logger_l2_trade_buy.debug( ("thread-id={} code={} ".format(cls.random_key[code], code) + content).format(*args)) @classmethod # 数据处理入口 # datas: 本次截图数据 # capture_timestamp:截图时间戳 def process(cls, code, datas, capture_timestamp): cls.random_key[code] = random.randint(0, 100000) now_time_str = datetime.now().strftime("%H:%M:%S") __start_time = round(t.time() * 1000) try: if len(datas) > 0: # 判断价格区间是否正确 if not data_process.is_same_code_with_price(code, float(datas[0]["val"]["price"])): raise L2DataException(L2DataException.CODE_PRICE_ERROR, "股价不匹配 code-{} price-{}".format(code, datas[0]["val"]["price"])) # 加载历史数据 load_l2_data(code) # 纠正数据 datas = L2DataUtil.correct_data(code, datas) _start_index = 0 if local_today_datas.get(code) is not None and len(local_today_datas[code]) > 0: _start_index = local_today_datas[code][-1]["index"] + 1 add_datas = L2DataUtil.get_add_data(code, datas, _start_index) if len(add_datas) > 0: # 拼接数据 local_today_datas[code].extend(add_datas) l2_data_util.load_num_operate_map(local_today_num_operate_map, code, add_datas) total_datas = local_today_datas[code] # 过时 买入确认点处理 # TradeBuyDataManager.process_buy_sure_position_info(code, capture_timestamp, total_datas, # total_datas[-1], # add_datas) if len(add_datas) > 0: _start_time = round(t.time() * 1000) latest_time = add_datas[len(add_datas) - 1]["val"]["time"] # 时间差不能太大才能处理 # TODO 暂时关闭处理 # if L2DataUtil.is_same_time(now_time_str, latest_time): # # 判断是否已经挂单 # state = trade_manager.get_trade_state(code) # start_index = len(total_datas) - len(add_datas) # end_index = len(total_datas) - 1 # if state == trade_manager.TRADE_STATE_BUY_DELEGATED or state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER: # # 已挂单 # cls.__process_order(code, start_index, end_index, capture_timestamp) # else: # # 未挂单 # cls.__process_not_order(code, start_index, end_index, capture_timestamp) logger_l2_process.info("code:{} 处理数据范围: {}-{} 处理时间:{}", code, add_datas[0]["index"], add_datas[-1]["index"], round(t.time() * 1000) - __start_time) # 保存数据 save_l2_data(code, datas, add_datas) finally: if code in cls.unreal_buy_dict: cls.unreal_buy_dict.pop(code) @classmethod def __compute_big_money_data(cls, code, start_index, end_index): # 计算大单 total_datas = local_today_datas[code] num = 0 for index in range(start_index, end_index + 1): data = total_datas[index] if l2_trade_factor.L2TradeFactorSourceDataUtil.is_big_money(data): if int(data["val"]["operateType"]) == 0: num += data["re"] elif int(data["val"]["operateType"]) == 1: num -= data["re"] big_money_num_manager.add_num(code, num) # 处理未挂单 @classmethod def __process_not_order(cls, code, start_index, end_index, capture_time): # 获取阈值 threshold_money = cls.__get_threshmoney(code) cls.__start_compute_buy(code, start_index, end_index, threshold_money, capture_time) @classmethod def __statistic_count_l2_data_for_cancel(cls, code, start_index, end_index, has_cancel_single=False): index, old_buy_count, old_cancel_count = TradePointManager.get_count_info_for_cancel_buy(code) for i in range(start_index, end_index + 1): buy_count, buy_cancel_count = cls.__count_l2_data_for_cancel(code, i, i) old_buy_count += buy_count old_cancel_count += buy_cancel_count if old_buy_count > 0 and (old_buy_count - old_cancel_count) / old_buy_count < 0.3 and has_cancel_single: return i, True TradePointManager.set_count_info_for_cancel_buy(code, end_index, old_buy_count, old_cancel_count) return end_index, False # 处理已挂单 @classmethod def __process_order(cls, code, start_index, end_index, capture_time, new_add=True): if start_index < 0: start_index = 0 if end_index < start_index: return # 获取之前是否有记录的撤买信号 # cancel_index = TradePointManager.get_buy_cancel_single_pos(code) # cancel_computed_index, cancel_buy_num = TradePointManager.get_compute_info_for_cancel_buy(code) # if cancel_computed_index is None: # logger_l2_trade.error("{} 未获取到买撤纯买额,起始计算位:{}", code, start_index) # 统计群撤大单 L2BetchCancelBigNumProcessor.process_new(code, start_index, end_index) # 统计最大连续买单 L2ContinueLimitUpCountManager.process(code, start_index, end_index) # 计算大单撤销 need_cancel, cancel_data = L2BigNumProcessor.process_cancel_with_big_num(code, start_index, end_index) if need_cancel: # 已经撤单了 threshold_money = cls.__get_threshmoney(code) # 重新处理下单 cls.__start_compute_buy(code, cancel_data["index"] + 1, end_index, threshold_money, capture_time) return # buy_single_index, buy_exec_index, buy_compute_index, buy_num = cls.__get_order_begin_pos(code) # if cancel_index is None: # 无撤单信号起始点记录 continue_cancel = L2ContinueLimitUpCountManager.get_continue_count(code) order_cancel_begin_start = max(start_index - (continue_cancel - 1), 0) if new_add else start_index order_cancel_begin_end = end_index total_datas = local_today_datas[code] little_cancel = False # 大单撤单的数据不为空 if cancel_data is not None: # 小群撤事件 continue_cancel = 5 cancel_time_seconds = L2DataUtil.get_time_as_second(cancel_data["val"]["time"]) # 查找上一秒与下一秒 for i in range(int(cancel_data["index"]), 0, -1): # 查找上一秒和下一秒 if total_datas[i]["val"]["time"] != cancel_data["val"][ "time"] and cancel_time_seconds - L2DataUtil.get_time_as_second(total_datas[i]["val"]["time"]) > 1: order_cancel_begin_start = i + 1 break for i in range(int(cancel_data["index"]), len(local_today_datas[code])): # 查找上一秒和下一秒 if total_datas[i]["val"]["time"] != cancel_data["val"]["time"] and L2DataUtil.get_time_as_second( total_datas[i]["val"]["time"]) - cancel_time_seconds > 1: order_cancel_begin_end = i - 1 break cls.cancel_debug(code, "小群撤事件计算范围:{},{}", order_cancel_begin_start, order_cancel_begin_end) little_cancel = True cancel_start_index = None cancel_end_index = None need_cancel = False if little_cancel: # 小群撤事件 cancel_start_index, cancel_end_index = cls.__compute_order_cancel_little_begin_single(code, order_cancel_begin_start , continue_cancel, order_cancel_begin_end) if cancel_start_index is not None: cls.debug(code, "找到小群撤信号,撤单信号范围:{}-{}", cancel_start_index, cancel_end_index) # 有小群撤信号 need_cancel = True else: # 不满足小群撤,从小群撤后面一条数据继续处理 cls.__process_order(code, cancel_data["index"] + 1, end_index, capture_time, False) return else: # 大群撤事件 cancel_start_index, cancel_end_index = cls.__compute_order_cancel_begin_single( code, order_cancel_begin_start , continue_cancel, order_cancel_begin_end) if cancel_start_index is not None: cls.debug(code, "找到大群撤信号,连续笔数阈值:{}, 撤单信号范围:{}-{}", continue_cancel, cancel_start_index, cancel_end_index) # 判断是否有大群撤大单撤 need_cancel = L2BetchCancelBigNumProcessor.need_cancel(code, cancel_start_index, cancel_end_index) if need_cancel: cls.debug(code, "大群撤信号有大单撤销") else: cls.debug(code, "大群撤信号无大单撤销") if need_cancel: # 需要撤买 cls.cancel_buy(code) if cancel_end_index >= end_index: return # 继续处理下单信号 threshold_money = cls.__get_threshmoney(code) cls.__start_compute_buy(code, cancel_end_index + 1, end_index, threshold_money, capture_time, False) else: # 是否有虚拟下单 unreal_buy_info = cls.unreal_buy_dict.get(code) if unreal_buy_info is not None: cls.debug(code, "有虚拟下单,无买撤信号,开始执行买入,执行位置:{},截图时间:{}", unreal_buy_info[0], capture_time) # unreal_buy_info 的内容格式为:(触法买操作下标,截图时间) # 真实下单 cls.__buy(code, unreal_buy_info[1], local_today_datas[code][unreal_buy_info[0]], unreal_buy_info[0]) # 过时 开始计算撤的信号 @classmethod def __start_compute_cancel(cls, code, cancel_index, compute_start_index, origin_num, threshold_money, capture_time): # sure_type 0-虚拟挂买位 1-真实挂买位 cancel_single = cancel_index is not None computed_index, buy_num_for_cancel, sure_type = cls.__sum_buy_num_for_cancel_order(code, compute_start_index, origin_num, threshold_money, cancel_single) total_datas = local_today_datas[code] if computed_index is not None: cls.debug(code, "获取到撤单执行信号,信号位置:{},m2:{} 数据:{}", computed_index, threshold_money, total_datas[computed_index]) # 发出撤买信号,需要撤买 if cls.unreal_buy_dict.get(code) is not None: # 有虚拟下单 cls.debug(code, "之前有虚拟下单,执行虚拟撤买") # 删除虚拟下单标记 cls.unreal_buy_dict.pop(code) # 删除下单标记位置 TradePointManager.delete_buy_point(code) else: # 无虚拟下单,需要执行撤单 cls.debug(code, "之前无虚拟下单,执行真实撤单") cls.__cancel_buy(code) if computed_index < len(local_today_datas[code]) - 1: # 数据尚未处理完,重新进入下单计算流程 cls.__start_compute_buy(code, computed_index + 1, threshold_money, capture_time, False) pass else: cls.debug(code, "撤买纯买额计算,计算位置:{}-{},目前为止纯买手数:{}", compute_start_index, total_datas[-1]["index"], buy_num_for_cancel) # 无需撤买,设置计算信息 TradePointManager.set_compute_info_for_cancel_buy(code, int(total_datas[-1]["index"]), buy_num_for_cancel) # 判断是否有虚拟下单 unreal_buy_info = cls.unreal_buy_dict.get(code) if unreal_buy_info is not None: # unreal_buy_info 的内容格式为:(触法买操作下标,截图时间) # 真实下单 cls.debug(code, "无撤单执行信号,有虚拟下单,执行真实下单") cls.__buy(code, unreal_buy_info[1], total_datas[unreal_buy_info[0]], unreal_buy_info[0]) pass else: # 终止执行 pass @classmethod def __buy(cls, code, capture_timestamp, last_data, last_data_index): can, reason = cls.__can_buy(code) # 不能购买 if not can: cls.debug(code, "不可以下单,原因:{}", reason) return else: cls.debug(code, "可以下单,原因:{}", reason) # 删除虚拟下单 if code in cls.unreal_buy_dict: cls.unreal_buy_dict.pop(code) cls.debug(code, "开始执行买入") try: trade_manager.start_buy(code, capture_timestamp, last_data, last_data_index) TradePointManager.delete_buy_cancel_point(code) cls.debug(code, "执行买入成功") except Exception as e: cls.debug(code, "执行买入异常:{}", str(e)) pass finally: cls.debug(code, "m值影响因子:", l2_trade_factor.L2TradeFactorUtil.factors_to_string(code)) # 是否可以买 @classmethod def __can_buy(cls, code): limit_up_time = limit_up_time_manager.get_limit_up_time(code) if limit_up_time is not None and L2DataUtil.get_time_as_second(limit_up_time) >= L2DataUtil.get_time_as_second( "14:30:00"): return False, "14:30后涨停的不能买,涨停时间为{}".format(limit_up_time) # 同一板块中老二后面的不能买 industry, codes = ths_industry_util.get_same_industry_codes(code, gpcode_manager.get_gp_list()) if industry is None: return True, "没有获取到行业" codes_index = limit_up_time_manager.sort_code_by_limit_time(codes) if codes_index is not None and codes_index.get(code) is not None and codes_index.get(code) > 1: return False, "同一板块中老三,老四,...不能买" # 13:00后涨停,本板块中涨停票数<29不能买 limit_up_time = limit_up_time_manager.get_limit_up_time(code) if limit_up_time is not None: if int(limit_up_time.replace(":", "")) >= 130000 and global_util.industry_hot_num.get(industry) is not None: if global_util.industry_hot_num.get(industry) < 29: return False, "13:00后涨停,本板块中涨停票数<29不能买" # 老二,本板块中涨停票数<29 不能买 if codes_index.get(code) is not None and codes_index.get(code) == 1 and global_util.industry_hot_num.get( industry) is not None: if global_util.industry_hot_num.get(industry) < 29: return False, "老二,本板块中涨停票数<29不能买" # 可以下单 return True, None @classmethod def __cancel_buy(cls, code): try: cls.debug(code, "开始执行撤单") trade_manager.start_cancel_buy(code) # 取消买入标识 TradePointManager.delete_buy_point(code) TradePointManager.delete_buy_cancel_point(code) TradePointManager.delete_compute_info_for_cancel_buy(code) TradePointManager.delete_count_info_for_cancel_buy(code) # 删除大群撤事件的大单 L2BetchCancelBigNumProcessor.del_recod(code) cls.debug(code, "执行撤单成功") except Exception as e: cls.debug(code, "执行撤单异常:{}", str(e)) @classmethod def cancel_buy(cls, code): # 删除大群撤事件的大单 L2BetchCancelBigNumProcessor.del_recod(code) L2ContinueLimitUpCountManager.del_data(code) if code in cls.unreal_buy_dict: cls.unreal_buy_dict.pop(code) # 取消买入标识 TradePointManager.delete_buy_point(code) TradePointManager.delete_buy_cancel_point(code) TradePointManager.delete_compute_info_for_cancel_buy(code) TradePointManager.delete_count_info_for_cancel_buy(code) # 删除大群撤事件的大单 L2BetchCancelBigNumProcessor.del_recod(code) else: cls.__cancel_buy(code) L2BigNumProcessor.del_big_num_pos(code) @classmethod def __start_compute_buy(cls, code, compute_start_index, compute_end_index, threshold_money, capture_time, new_add=True): if compute_end_index < compute_start_index: return total_datas = local_today_datas[code] # 获取买入信号计算起始位置 buy_single_index, buy_exec_index, buy_compute_index, num = cls.__get_order_begin_pos(code) # 是否为新获取到的位置 new_get_pos = False if buy_single_index is None: # 有买入信号 has_single, _index = cls.__compute_order_begin_pos(code, max( compute_start_index - 2 if new_add else compute_start_index, 0), 3, compute_end_index) buy_single_index = _index if has_single: num = 0 new_get_pos = True cls.debug(code, "获取到买入信号起始点:{} 数据:{}", buy_single_index, total_datas[buy_single_index]) limit_up_time_manager.save_limit_up_time(code, total_datas[buy_single_index]["val"]["time"]) # 重置大单计算 big_money_num_manager.reset(code) if buy_single_index is None: # 未获取到买入信号,终止程序 return None # TODO 可能存在问题 计算大单数量 cls.__compute_big_money_data(code, max(compute_start_index, buy_single_index), compute_end_index) # 买入纯买额统计 compute_index, buy_nums, rebegin_buy_pos = cls.__sum_buy_num_for_order_3(code, max(buy_single_index, compute_start_index), compute_end_index, num, threshold_money, buy_single_index, capture_time) if rebegin_buy_pos is not None: # 需要重新计算纯买额 cls.__start_compute_buy(code, rebegin_buy_pos, compute_end_index, threshold_money, capture_time, False) return if compute_index is not None: cls.debug(code, "获取到买入执行位置:{} m值:{} 纯买手数:{} 数据:{}", compute_index, threshold_money, buy_nums, total_datas[compute_index]) # 记录买入信号位置 cls.__save_order_begin_data(code, buy_single_index, compute_index, compute_index, buy_nums) # 虚拟下单 cls.unreal_buy_dict[code] = (compute_index, capture_time) # 删除之前的所有撤单信号 TradePointManager.delete_buy_cancel_point(code) TradePointManager.delete_compute_info_for_cancel_buy(code) TradePointManager.delete_count_info_for_cancel_buy(code) TradeBuyDataManager.remove_buy_position_info(code) # 已过时 为买撤保存基础纯买额 # TradePointManager.set_compute_info_for_cancel_buy(code, compute_index, buy_nums) b_buy_count, b_buy_cancel_count = cls.__count_l2_data_before_for_cancel(code, buy_single_index) buy_count, buy_cancel_count = cls.__count_l2_data_for_cancel(code, buy_single_index, compute_index) TradePointManager.set_count_info_for_cancel_buy(code, compute_index, b_buy_count + buy_count, b_buy_cancel_count + buy_cancel_count) # 计算大单(从买入信号起始点到挂单执行点),返回是否取消 cancel_result, cancel_data = L2BigNumProcessor.process_cancel_with_big_num(code, buy_single_index, compute_index) # 计算大群撤的大单 L2BetchCancelBigNumProcessor.process_new(code, buy_single_index, compute_index) # 连续涨停数计算 L2ContinueLimitUpCountManager.process(code, buy_single_index, compute_index) # 数据是否处理完毕 if compute_index >= compute_end_index: cls.debug(code, "数据处理完毕,下单, 数据截图时间-{}", capture_time) # 数据已经处理完毕,如果还没撤单就实际下单 if not cancel_result: cls.__buy(code, capture_time, total_datas[compute_index], compute_index) else: # 数据尚未处理完毕,进行下一步处理 cls.debug(code, "数据尚未处理完毕,进行下一步处理,处理进度:{}", compute_index) # 如果还没撤单,就继续处理已下单的步骤 if not cancel_result: cls.__process_order(code, compute_index + 1, compute_end_index, capture_time, False) else: cls.__start_compute_buy(code, compute_index + 1, compute_end_index, threshold_money, capture_time, False) else: # 未达到下单条件,保存纯买额,设置纯买额 # 记录买入信号位置 cls.__save_order_begin_data(code, buy_single_index, -1, compute_end_index, buy_nums) pass # 获取下单起始信号 @classmethod def __get_order_begin_pos(cls, code): buy_single_index, buy_exec_index, compute_index, num = TradePointManager.get_buy_compute_start_data(code) return buy_single_index, buy_exec_index, compute_index, num @classmethod def __save_order_begin_data(self, code, buy_single_index, buy_exec_index, compute_index, num): TradePointManager.set_buy_compute_start_data(code, buy_single_index, buy_exec_index, compute_index, num) # 计算下单起始信号 # compute_data_count 用于计算的l2数据数量 @classmethod def __compute_order_begin_pos(cls, code, start_index, continue_count, end_index): # 倒数100条数据查询 datas = local_today_datas[code] if end_index - start_index + 1 < continue_count: return False, None __time = None last_index = None count = 0 start = None for i in range(start_index, end_index + 1): _val = datas[i]["val"] # 时间要>=09:30:00 if L2DataUtil.get_time_as_second(_val["time"]) < second_930: continue if L2DataUtil.is_limit_up_price_buy(_val) and (last_index is None or ( i - last_index == 1 and datas[last_index]["val"]["time"] == datas[i]["val"]["time"])): if start is None: start = i last_index = i count += datas[i]["re"] if count >= continue_count: return True, start elif not L2DataUtil.is_limit_up_price_sell(_val): last_index = None count = 0 start = None return False, None # 大群撤事件,最多相隔1s @classmethod def __compute_order_cancel_begin_single(cls, code, start_index, continue_count, end_index): datas = local_today_datas[code] if end_index - start_index + 1 < continue_count: return None, None count = 0 start = -1 start_time = None for i in range(start_index, end_index + 1): _val = datas[i]["val"] _timestamp = L2DataUtil.get_time_as_second(_val["time"]) if L2DataUtil.get_time_as_second(_val["time"]) < second_930: continue if L2DataUtil.is_limit_up_price_buy_cancel(_val) and (start_time is None or _timestamp - start_time < 2): if start == -1: start = i start_time = L2DataUtil.get_time_as_second(_val["time"]) count += datas[i]["re"] elif not L2DataUtil.is_limit_up_price_sell(_val): if count >= continue_count: return start, i - 1 start = -1 count = 0 start_time = None if count >= continue_count: return start, end_index else: return None, None # 小群撤事件 @classmethod def __compute_order_cancel_little_begin_single(cls, code, start_index, continue_count, end_index=None): # 必须为同一秒的数据 same_second = True datas = local_today_datas[code] __len = len(datas) if len(datas) - start_index < continue_count: return None, None count = 0 start = -1 start_time = None if end_index is None: end_index = __len - continue_count for i in range(start_index, end_index + 1): _val = datas[i]["val"] _timestamp = L2DataUtil.get_time_as_second(_val["time"]) if _timestamp < second_930: continue # 间隔时间不能多于1s if L2DataUtil.is_limit_up_price_buy_cancel(_val) and (start_time is None or _timestamp - start_time < 2): if start == -1: start = i start_time = L2DataUtil.get_time_as_second(_val["time"]) count += int(datas[i]["re"]) elif not L2DataUtil.is_limit_up_price_sell(_val): if count >= continue_count: return start, i - 1 start = -1 count = 0 start_time = None if count >= continue_count: return start, end_index else: return None, None # 虚拟下单 def __unreal_order(self): pass @classmethod def __get_threshmoney(cls, code): return l2_trade_factor.L2TradeFactorUtil.compute_m_value(code) # 获取预估挂买位 @classmethod def __get_sure_order_pos(cls, code): index, data = TradeBuyDataManager.get_buy_sure_position(code) if index is None: return 0, len(local_today_datas[code]) - 1, local_today_datas[code][-1] else: return 1, index, data # 过时 统计买入净买量 @classmethod def __sum_buy_num_for_order(cls, code, compute_start_index, origin_num, threshold_money): total_datas = local_today_datas[code] buy_nums = origin_num limit_up_price = gpcode_manager.get_limit_up_price(code) if limit_up_price is None: raise Exception("涨停价无法获取") threshold_num = threshold_money / (limit_up_price * 100) for i in range(compute_start_index, len(total_datas)): _val = total_datas[i]["val"] # 有连续4个涨停买就标记计算起始点 if L2DataUtil.is_limit_up_price_buy(_val): # 涨停买 buy_nums += int(_val["num"]) * int(total_datas[i]["re"]) if buy_nums >= threshold_num: cls.debug(code, "获取到买入执行点:{} 统计纯买手数:{} 目标纯买手数:{}", i, buy_nums, threshold_num) return i, buy_nums elif L2DataUtil.is_limit_up_price_buy_cancel(_val): # 涨停买撤 buy_nums -= int(_val["num"]) * int(total_datas[i]["re"]) cls.debug(code, "尚未获取到买入执行点,起始计算位置:{} 统计纯买手数:{} 目标纯买手数:{}", compute_start_index, buy_nums, threshold_num) return None, buy_nums # 过时 统计买入净买量,不计算在买入信号之前的买撤单 @classmethod def __sum_buy_num_for_order_2(cls, code, compute_start_index, origin_num, threshold_money, buy_single_index): total_datas = local_today_datas[code] buy_nums = origin_num limit_up_price = gpcode_manager.get_limit_up_price(code) if limit_up_price is None: raise Exception("涨停价无法获取") threshold_num = threshold_money / (limit_up_price * 100) property_buy_num_count = 0 same_time_property = cls.__get_same_time_property(code) for i in range(compute_start_index, len(total_datas)): data = total_datas[i] _val = total_datas[i]["val"] # 有连续4个涨停买就标记计算起始点 if L2DataUtil.is_limit_up_price_buy(_val): # 涨停买 buy_nums += int(_val["num"]) * int(total_datas[i]["re"]) if buy_nums >= threshold_num: logger_l2_trade_buy.info("{}获取到买入执行点:{} 统计纯买手数:{} 目标纯买手数:{}", code, i, buy_nums, threshold_num) elif L2DataUtil.is_limit_up_price_buy_cancel(_val): # 涨停买撤 # 判断买入位置是否在买入信号之前 buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(total_datas[i], local_today_num_operate_map.get(code)) if buy_index is not None: # 找到买撤数据的买入点 if buy_index >= buy_single_index: buy_nums -= int(_val["num"]) * int(data["re"]) cls.buy_debug(code, "{}数据在买入信号之后 撤买纯买手数:{} 目标手数:{}", i, buy_nums, threshold_num) else: cls.buy_debug(code, "{}数据在买入信号之前,买入位:{}", i, buy_index) if total_datas[buy_single_index]["val"]["time"] == buy_data["val"]["time"]: # 同一秒,而且还在预估买入位之后按概率计算 property_buy_num_count -= int(_val["num"]) * int(data["re"]) cls.buy_debug(code, "{}数据买入位与预估买入位在同一秒", i) else: # 未找到买撤数据的买入点 cls.cancel_debug(code, "未找到买撤数据的买入点: 位置-{} 数据-{}", i, data) buy_nums -= int(_val["num"]) * int(total_datas[i]["re"]) property_buy_num = round(property_buy_num_count * same_time_property) cls.buy_debug(code, "买入信号点之前同一秒买入手数-{},位置-{},总手数:{},目标手数:{}", property_buy_num, i, buy_nums + property_buy_num, threshold_num) # 有撤单信号,且小于阈值 if buy_nums + property_buy_num >= threshold_num: return i, buy_nums + property_buy_num cls.buy_debug(code, "尚未获取到买入执行点,起始计算位置:{} 统计纯买手数:{} 目标纯买手数:{}", compute_start_index, buy_nums + property_buy_num, threshold_num) return None, buy_nums + property_buy_num # 统计买入净买量,不计算在买入信号之前的买撤单 @classmethod def __sum_buy_num_for_order_3(cls, code, compute_start_index, compute_end_index, origin_num, threshold_money, buy_single_index, capture_time): total_datas = local_today_datas[code] buy_nums = origin_num limit_up_price = gpcode_manager.get_limit_up_price(code) if limit_up_price is None: raise Exception("涨停价无法获取") threshold_num = threshold_money / (limit_up_price * 100) buy_single_time_seconds = L2DataUtil.get_time_as_second(total_datas[buy_single_index]["val"]["time"]) for i in range(compute_start_index, compute_end_index + 1): data = total_datas[i] _val = total_datas[i]["val"] if L2DataUtil.get_time_as_second(_val["time"]) - buy_single_time_seconds > 1: TradePointManager.delete_buy_point(code) if i == compute_end_index: # 数据处理完毕 return None, buy_nums, None else: # 计算买入信号,不能同一时间开始计算 for ii in range(buy_single_index + 1, compute_end_index + 1): if total_datas[buy_single_index]["val"]["time"] != total_datas[ii]["val"]["time"]: return None, buy_nums, ii # 涨停买 if L2DataUtil.is_limit_up_price_buy(_val): # 涨停买 buy_nums += int(_val["num"]) * int(total_datas[i]["re"]) if buy_nums >= threshold_num: logger_l2_trade_buy.info("{}获取到买入执行点:{} 统计纯买手数:{} 目标纯买手数:{}", code, i, buy_nums, threshold_num) elif L2DataUtil.is_limit_up_price_buy_cancel(_val): # 涨停买撤 # 判断买入位置是否在买入信号之前 buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(total_datas[i], local_today_num_operate_map.get(code)) if buy_index is not None: # 找到买撤数据的买入点 if buy_index >= buy_single_index: buy_nums -= int(_val["num"]) * int(data["re"]) cls.buy_debug(code, "{}数据在买入信号之后 撤买纯买手数:{} 目标手数:{}", i, buy_nums, threshold_num) else: cls.buy_debug(code, "{}数据在买入信号之前,买入位:{}", i, buy_index) if total_datas[buy_single_index]["val"]["time"] == buy_data["val"]["time"]: # 同一秒,当作买入信号之后处理 buy_nums -= int(_val["num"]) * int(data["re"]) cls.buy_debug(code, "{}数据买入位与预估买入位在同一秒", i) else: # 未找到买撤数据的买入点 cls.buy_debug(code, "未找到买撤数据的买入点: 位置-{} 数据-{}", i, data) buy_nums -= int(_val["num"]) * int(total_datas[i]["re"]) cls.buy_debug(code, "位置-{},总手数:{},目标手数:{}", i, buy_nums, threshold_num) # 有撤单信号,且小于阈值 if buy_nums >= threshold_num: return i, buy_nums, None cls.buy_debug(code, "尚未获取到买入执行点,起始计算位置:{} 统计纯买手数:{} 目标纯买手数:{}", compute_start_index, buy_nums, threshold_num) return None, buy_nums, None # 计算买入信号之前的且和买入信号数据在同一时间的数量 @classmethod def __count_l2_data_before_for_cancel(cls, code, buy_single_index): total_data = local_today_datas[code] single_time = total_data[buy_single_index]["val"]["time"] buy_count = 0 cancel_count = 0 for i in range(buy_single_index, -1, -1): if single_time == total_data[i]["val"]["time"]: if L2DataUtil.is_limit_up_price_buy(total_data[i]["val"]): buy_count += int(total_data[i]["re"]) elif L2DataUtil.is_limit_up_price_buy(total_data[i]["val"]): cancel_count += int(total_data[i]["re"]) else: break return buy_count, cancel_count @classmethod def __count_l2_data_for_cancel(cls, code, start_index, end_index): total_data = local_today_datas[code] buy_count = 0 cancel_count = 0 for i in range(start_index, end_index + 1): if L2DataUtil.is_limit_up_price_buy(total_data[i]["val"]): buy_count += int(total_data[i]["re"]) elif L2DataUtil.is_limit_up_price_buy_cancel(total_data[i]["val"]): cancel_count += int(total_data[i]["re"]) return buy_count, cancel_count # 同一时间买入的概率计算 @classmethod def __get_same_time_property(cls, code): # 计算板块热度 industry = global_util.code_industry_map.get(code) if industry is not None: hot_num = global_util.industry_hot_num.get(industry) if hot_num is not None: return 1 - l2_trade_factor.L2TradeFactorUtil.get_industry_rate(hot_num) return 0.5 # 过时 统计买撤净买量 @classmethod def __sum_buy_num_for_cancel_order(cls, code, start_index, origin_num, threshold_money, cancel_single=True): buy_nums = origin_num total_datas = local_today_datas[code] limit_up_price = gpcode_manager.get_limit_up_price(code) if limit_up_price is None: raise Exception("涨停价无法获取") threshold_num = threshold_money / (limit_up_price * 100) # 获取预估挂买位 sure_type:0 虚拟挂买 1 实际挂买 sure_type, sure_pos, sure_data = cls.__get_sure_order_pos(code) same_time_property = cls.__get_same_time_property(code) # 同一秒,在预估买入位之后的数据之和 property_buy_num_count = 0 cls.cancel_debug(code, "撤单纯买额计算位置:{}-{} 预估挂买位:{} 是否有撤单信号:{}", start_index, len(total_datas) - 1, sure_pos, cancel_single) for i in range(start_index, len(total_datas)): data = total_datas[i] _val = data["val"] if L2DataUtil.is_limit_up_price_buy(_val): # 涨停买 if i < sure_pos: buy_nums += int(_val["num"]) * int(data["re"]) elif sure_data["val"]["time"] == _val["time"]: # 同一秒买入,而且还在预估买入位之后 property_buy_num_count += int(_val["num"]) * int(data["re"]) elif L2DataUtil.is_limit_up_price_buy_cancel(_val): # 涨停撤买 # 判断买入位置是否在买入信号之前 buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(data, local_today_num_operate_map.get(code)) if buy_index is not None: # 找到买撤数据的买入点 if buy_index < sure_pos: buy_nums -= int(_val["num"]) * int(data["re"]) cls.cancel_debug(code, "{}数据在预估买入位之前 撤买纯买手数:{} 目标手数:{}", i, buy_nums, threshold_num) else: cls.cancel_debug(code, "{}数据在预估买入位之后,买入位:{}", i, buy_index) if sure_data["val"]["time"] == buy_data["val"]["time"]: # 同一秒,而且还在预估买入位之后按概率计算 property_buy_num_count -= int(_val["num"]) * int(data["re"]) cls.debug(code, "{}数据买入位与预估买入位在同一秒", i) else: # 未找到买撤数据的买入点 cls.cancel_debug(code, "未找到买撤数据的买入点: 位置-{} 数据-{}", i, data) property_buy_num = round(property_buy_num_count * same_time_property) cls.cancel_debug(code, "预估买入点之后同一秒买入手数-{},位置-{},总手数:{},目标手数:{}", property_buy_num, i, buy_nums + property_buy_num, threshold_num) # 有撤单信号,且小于阈值 if buy_nums + property_buy_num <= threshold_num and cancel_single: return i, buy_nums + property_buy_num, sure_type buy_num_news = buy_nums + round(property_buy_num_count * same_time_property) cls.cancel_debug(code, "处理起始位置:{} 最终纯买额:{}", start_index, buy_num_news) return None, buy_num_news, sure_type # 统计买撤净买量 @classmethod def __count_num_for_cancel_order(cls, code, start_index, origin_buy_num, origin_cancel_num, min_rate, betch_cancel_single=True): buy_nums = origin_buy_num buy_cancel_num = origin_cancel_num total_datas = local_today_datas[code] limit_up_price = gpcode_manager.get_limit_up_price(code) if limit_up_price is None: raise Exception("涨停价无法获取") # 获取预估挂买位 sure_type:0 虚拟挂买 1 实际挂买 for i in range(start_index, len(total_datas)): data = total_datas[i] _val = data["val"] if L2DataUtil.is_limit_up_price_buy(_val): # 涨停买 buy_nums += int(data["re"]) elif L2DataUtil.is_limit_up_price_buy_cancel(_val): buy_cancel_num += int(data["re"]) # 有撤单信号,且小于阈值 if (buy_nums - buy_cancel_num) / buy_cancel_num <= min_rate and betch_cancel_single: return i, buy_nums, buy_cancel_num return None, buy_nums, buy_cancel_num @classmethod def test(cls): code = "000593" load_l2_data(code, True) if False: state = trade_manager.get_trade_state(code) cls.random_key[code] = random.randint(0, 100000) capture_timestamp = 1999988888 try: if state == trade_manager.TRADE_STATE_BUY_DELEGATED or state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER: # 已挂单 cls.__process_order(code, 201, 237, capture_timestamp) else: # 未挂单 cls.__process_not_order(code, 201, 237, capture_timestamp) except Exception as e: logging.exception(e) return _start = t.time() # 按s批量化数据 total_datas = local_today_datas[code] start_time = total_datas[0]["val"]["time"] start_index = 0 for i in range(0, len(total_datas)): if total_datas[i]["val"]["time"] != start_time: cls.random_key[code] = random.randint(0, 100000) # 处理数据 start = start_index # if start != 201: # continue end = i - 1 print("处理进度:{},{}".format(start, end)) capture_timestamp = 1999999999 state = trade_manager.get_trade_state(code) try: if state == trade_manager.TRADE_STATE_BUY_DELEGATED or state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER: # 已挂单 cls.__process_order(code, start, end, capture_timestamp) else: # 未挂单 cls.__process_not_order(code, start, end, capture_timestamp) except Exception as e: logging.exception(e) # t.sleep(1) start_index = i start_time = total_datas[i]["val"]["time"] print("时间花费:", round((t.time() - _start) * 1000)) @classmethod def test1(cls): code = "000593" load_l2_data(code, True) print(cls.__compute_order_begin_pos(code, 232, 3, 239)) @classmethod def test2(cls): code = "600082" load_l2_data(code, True) cls.random_key[code] = random.randint(0, 100000) need_cancel, cancel_data = L2BigNumProcessor.process_cancel_with_big_num(code, 121, 123) @classmethod def test_can_order(cls): code = "000948" global_util.load_industry() limit_up_time_manager.load_limit_up_time() print(cls.__can_buy(code)) # 连续涨停买单数最大值管理器 class L2ContinueLimitUpCountManager: @classmethod def del_data(cls, code): cls.__del_last_record(code) cls.__del_max(code) # 获取最大值 @classmethod def __get_max(cls, code): key = "max_same_time_buy_count-{}".format(code) redis = _redisManager.getRedis() val = redis.get(key) if val is not None: return int(val) else: return None # 保存最大值 @classmethod def __save_max(cls, code, max_num): key = "max_same_time_buy_count-{}".format(code) redis = _redisManager.getRedis() redis.setex(key, tool.get_expire(), max_num) @classmethod def __del_max(cls, code): key = "max_same_time_buy_count-{}".format(code) redis = _redisManager.getRedis() redis.delete(key) # 保存上一条数据最大值 @classmethod def __save_last_record(cls, code, _time, count, index): key = "same_time_buy_last_count-{}".format(code) redis = _redisManager.getRedis() redis.setex(key, tool.get_expire(), json.dumps((_time, count, index))) @classmethod def __del_last_record(cls, code): key = "same_time_buy_last_count-{}".format(code) redis = _redisManager.getRedis() redis.delete(key) @classmethod def __get_last_record(cls, code): key = "same_time_buy_last_count-{}".format(code) redis = _redisManager.getRedis() val = redis.get(key) if val is None: return None, None, None else: val = json.loads(val) return val[0], val[1], val[2] @classmethod def process(cls, code, start_index, end_index): last_time, last_count, last_index = cls.__get_last_record(code) total_datas = local_today_datas[code] time_count_dict = {} for index in range(start_index, end_index + 1): if last_index is not None and last_index >= index: continue if L2DataUtil.is_limit_up_price_buy(total_datas[index]["val"]): if last_count is None: last_count = 0 last_time = total_datas[index]["val"]["time"] last_index = index if last_time == total_datas[index]["val"]["time"]: last_count += total_datas[index]["re"] last_index = index else: if last_count is not None and last_count > 0: time_count_dict[last_time] = last_count last_count = total_datas[index]["re"] last_time = total_datas[index]["val"]["time"] last_index = index else: if last_count is not None and last_count > 0: time_count_dict[last_time] = last_count last_count = 0 last_time = None last_index = None if last_count is not None and last_count > 0: time_count_dict[last_time] = last_count # 保存latest cls.__save_last_record(code, last_time, last_count, last_index) else: # 移除 cls.__del_last_record(code) # 查找这批数据中的最大数量 max_time = None max_num = None for key in time_count_dict: if max_time is None: max_time = key max_num = time_count_dict[key] if time_count_dict[key] > max_num: max_num = time_count_dict[key] max_time = key if max_num is not None: old_max = cls.__get_max(code) if old_max is None or max_num > old_max: cls.__save_max(code, max_num) @classmethod def get_continue_count(cls, code): count = cls.__get_max(code) if count is None: count = 0 count = count // 3 if count < 15: count = 15 return count # 大单处理器 class L2BigNumProcessor: # 是否需要根据大单撤单,返回是否需要撤单与撤单信号的数据 @classmethod def __need_cancel_with_max_num(cls, code, max_num_info, start_index, end_index): if max_num_info is None: return False, None # 如果是买入单,需要看他前面同一秒是否有撤单 if int(max_num_info["val"]["operateType"]) == 0: # 只有买撤信号在买入信号之前的同一秒的单才会撤单情况 _map = local_today_num_operate_map.get(code) if _map is not None: cancel_datas = _map.get( "{}-{}-{}".format(max_num_info["val"]["num"], "1", max_num_info["val"]["price"])) if cancel_datas is not None: for cancel_data in cancel_datas: # 只能在当前规定的数据范围查找,以防出现重复查找 if cancel_data["index"] < start_index or cancel_data["index"] > end_index: continue if cancel_data["index"] > max_num_info["index"]: buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(cancel_data, local_today_num_operate_map[ code]) if buy_index is None: continue if buy_data["val"]["time"] != max_num_info["val"]["time"]: continue min_space, max_space = l2_data_util.compute_time_space_as_second( cancel_data["val"]["cancelTime"], cancel_data["val"][ "cancelTimeUnit"]) if min_space < 60: L2TradeDataProcessor.cancel_debug(code, "找到大单撤单,但撤单间隔时间小于60s,撤单数据-{}", json.dumps(cancel_data)) return True, cancel_data else: # 如果间隔时间大于等于60s,这判断小群撤事件 L2TradeDataProcessor.cancel_debug(code, "找到大单撤单,但撤单间隔时间大于60s,撤单数据-{}", json.dumps(cancel_data)) return False, cancel_data return False, None else: return True, None # 计算数量最大的涨停买/涨停撤 @classmethod def __compute_max_num(cls, code, start_index, end_index, max_num_info, buy_exec_time): new_max_info = max_num_info max_num = 0 if max_num_info is not None: max_num = int(max_num_info["val"]["num"]) # 计算大单 total_data = local_today_datas[code] for i in range(start_index, end_index + 1): data = total_data[i] val = data["val"] if not L2DataUtil.is_limit_up_price_buy(val) and not L2DataUtil.is_limit_up_price_buy_cancel( val): continue # 下单时间与买入执行时间之差大于60s的不做处理 if l2_data_util.get_time_as_seconds(val["time"]) - l2_data_util.get_time_as_seconds(buy_exec_time) > 1: continue if L2DataUtil.is_limit_up_price_buy(val): pass elif L2DataUtil.is_limit_up_price_buy_cancel(val): min_space, max_space = l2_data_util.compute_time_space_as_second(val["cancelTime"], val["cancelTimeUnit"]) # 只能处理1s内的撤单 if min_space > 1: continue # buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(data, # local_today_num_operate_map.get(code)) # if buy_index is None: # continue # if l2_data_util.get_time_as_seconds(buy_data["val"]["time"]) - l2_data_util.get_time_as_seconds( # buy_exec_time) > 1: # continue num = int(total_data[i]["val"]["num"]) if num > max_num: max_num = num new_max_info = data return new_max_info @classmethod def __save_big_num_pos(cls, code, index): redis = _redisManager.getRedis() redis.setex("big_num_pos-{}".format(code), tool.get_expire(), index) @classmethod def __get_big_num_pos(cls, code): redis = _redisManager.getRedis() index = redis.get("big_num_pos-{}".format(code)) if index is not None: return int(index) return index @classmethod def del_big_num_pos(cls, code): redis = _redisManager.getRedis() redis.delete("big_num_pos-{}".format(code)) @classmethod def __cancel_buy(cls, code, index): L2TradeDataProcessor.debug(code, "撤买,触发位置-{},触发条件:大单,数据:{}", index, local_today_datas[code][index]) L2TradeDataProcessor.cancel_buy(code) # 处理数据中的大单,返回是否已经撤单和撤单数据的时间 @classmethod def process_cancel_with_big_num(cls, code, start_index, end_index): total_data = local_today_datas[code] # 如果无下单信号就无需处理 buy_single_index, buy_exec_index, compute_index, nums = TradePointManager.get_buy_compute_start_data(code) if buy_single_index is None or buy_exec_index is None or buy_exec_index < 0: return False, None # 判断是否有大单记录 index = cls.__get_big_num_pos(code) # 无大单记录 if index is None: # 计算大单 new_max_info = cls.__compute_max_num(code, start_index, end_index, None, total_data[buy_exec_index]["val"]["time"]) if new_max_info is None: return False, None L2TradeDataProcessor.debug(code, "获取到大单位置信息:{}", json.dumps(new_max_info)) index = new_max_info["index"] # 大单是否有撤单信号 need_cancel, cancel_data = cls.__need_cancel_with_max_num(code, new_max_info, start_index, end_index) if need_cancel: # 需要撤单 # 撤单 L2TradeDataProcessor.cancel_debug(code, "新找到大单-{},需要撤买", new_max_info["index"]) cls.__cancel_buy(code, new_max_info["index"]) return True, cancel_data, else: # 无需撤单 # 保存大单记录 cls.__save_big_num_pos(code, index) return False, None else: # 有大单记录 need_cancel = False cancel_index = -1 need_cancel, cancel_data = cls.__need_cancel_with_max_num(code, total_data[index], start_index, end_index) # 需要撤单 if need_cancel: # 撤单 cls.__cancel_buy(code, cancel_index) return True, cancel_data # 无需撤单 else: # 计算新的大单 max_num_data = cls.__compute_max_num(code, start_index, end_index, total_data[index], total_data[buy_exec_index]["val"]["time"]) if index == int(max_num_data["index"]): return False, cancel_data L2TradeDataProcessor.debug(code, "找到大单位置信息:{}", json.dumps(max_num_data)) # 大单是否有撤单信号 need_cancel, cancel_data = cls.__need_cancel_with_max_num(code, max_num_data, max_num_data["index"], end_index) if need_cancel: # 需要撤单 # 撤单 cls.__cancel_buy(code, max_num_data["index"] if cancel_data is None else cancel_data) L2TradeDataProcessor.cancel_debug(code, "原来跟踪到大单无撤买信号-{},新跟踪的大单需要撤买-{}", index, max_num_data["index"]) return True, cancel_data else: # 无需撤单 # 保存大单记录 cls.__save_big_num_pos(code, max_num_data["index"]) return False, cancel_data @classmethod def test(cls): code = "000036" load_l2_data(code, True) new_max_info = cls.__compute_max_num(code, 470, 476, None, "09:32:59") print(new_max_info) # 大群撤大单跟踪 class L2BetchCancelBigNumProcessor: @classmethod def __get_recod(cls, code): redis = _redisManager.getRedis() _val = redis.get("betch_cancel_big_num-{}".format(code)) if _val is None: return None, None else: datas = json.loads(_val) return datas[0], datas[1] @classmethod def del_recod(cls, code): redis = _redisManager.getRedis() key = "betch_cancel_big_num-{}".format(code) redis.delete(key) @classmethod def __save_recod(cls, code, max_big_num_info, big_nums_info): redis = _redisManager.getRedis() key = "betch_cancel_big_num-{}".format(code) redis.setex(key, tool.get_expire(), json.dumps((max_big_num_info, big_nums_info))) # 暂时弃用 @classmethod def need_cancel(cls, code, start_index, end_index): # 是否需要撤单 max_big_num_info, big_nums_info = cls.__get_recod(code) if big_nums_info is None: # 无大单信息 return True nums_set = set() index_set = set() for d in big_nums_info: nums_set.add(d[0]) index_set.add(d[1]) total_datas = local_today_datas[code] count = 0 latest_buy_index = end_index for index in range(start_index, end_index + 1): if not nums_set.__contains__(total_datas[index]["val"]["num"]): continue buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(total_datas[index], local_today_num_operate_map[code]) if buy_index is None: continue if index_set.__contains__(buy_index): count += buy_data["re"] latest_buy_index = buy_index # 获取大单数量 total_count = 0 for i in index_set: if i <= latest_buy_index: total_count += total_datas[i]["re"] L2TradeDataProcessor.debug(code, "大群撤大单数量:{}/{}", count, total_count) # 大单小于5笔无脑撤,后修改为无大单无脑撤 if total_count <= 0: return True # 大单撤单笔数大于总大单笔数的1/5就撤单 if count / total_count >= 0.2: return True else: return False pass # def need_cancel(cls, code, start_index, end_index): # total_datas = local_today_datas[code] # for index in range(start_index,end_index+1): # price = total_datas[index]["val"]["price"] # num = total_datas[index]["val"]["num"] # if total_datas[index] # 过时 @classmethod def process(cls, code, start_index, end_index): # 处理大单 # 获取大单列表,大单格式为:((num,index,re),[(num,index,re),(num,index,re)]) total_datas = local_today_datas[code] max_big_num_info, big_nums_info = cls.__get_recod(code) # 寻找最大值 for index in range(start_index, end_index + 1): # 只处理涨停买与涨停买撤 if not L2DataUtil.is_limit_up_price_buy( total_datas[index]["val"]): continue if max_big_num_info is None: max_big_num_info = ( int(total_datas[start_index]["val"]["num"]), total_datas[start_index]["index"]) if int(total_datas[index]["val"]["num"]) > max_big_num_info[0]: max_big_num_info = ( int(total_datas[index]["val"]["num"]), total_datas[index]["index"]) # 将大于最大值90%的数据加入 if max_big_num_info is not None: min_num = round(max_big_num_info[0] * 0.9) for index in range(start_index, end_index + 1): # 只统计涨停买 if not L2DataUtil.is_limit_up_price_buy( total_datas[index]["val"]): continue if int(total_datas[index]["val"]["num"]) >= min_num: if big_nums_info is None: big_nums_info = [] big_nums_info.append((int(total_datas[index]["val"]["num"]), total_datas[index]["index"])) # 移除小于90%的数据 big_nums_info_new = [] index_set = set() for d in big_nums_info: if d[0] >= min_num: if not index_set.__contains__(d[1]): index_set.add(d[1]) big_nums_info_new.append(d) cls.__save_recod(code, max_big_num_info, big_nums_info_new) # 最新方法 @classmethod def process_new(cls, code, start_index, end_index): # 处理大单 # 获取大单列表,大单格式为:((num,index,re),[(num,index,re),(num,index,re)]) total_datas = local_today_datas[code] max_big_num_info, big_nums_info = cls.__get_recod(code) # 大于等于8000手或者金额>=300万就是大单 for index in range(start_index, end_index + 1): # 只统计涨停买 if not L2DataUtil.is_limit_up_price_buy( total_datas[index]["val"]): continue # 大于等于8000手或者金额 >= 300 # 万就是大单 if int(total_datas[index]["val"]["num"]) >= 8000 or int(total_datas[index]["val"]["num"]) * float( total_datas[index]["val"]["price"]) >= 30000: if big_nums_info is None: big_nums_info = [] big_nums_info.append((int(total_datas[index]["val"]["num"]), total_datas[index]["index"])) # 移除小于90%的数据 big_nums_info_new = [] index_set = set() if big_nums_info is not None: for d in big_nums_info: if not index_set.__contains__(d[1]): index_set.add(d[1]) big_nums_info_new.append(d) cls.__save_recod(code, max_big_num_info, big_nums_info_new) # 卖跟踪 class L2SellProcessor: @classmethod def __get_recod(cls, code): redis = _redisManager.getRedis() _val = redis.get("sell_num-{}".format(code)) if _val is None: return None, None else: datas = json.loads(_val) return datas[0], datas[1] @classmethod def del_recod(cls, code): redis = _redisManager.getRedis() key = "sell_num-{}".format(code) redis.delete(key) @classmethod def __save_recod(cls, code, process_index, count): redis = _redisManager.getRedis() key = "sell_num-{}".format(code) redis.setex(key, tool.get_expire(), json.dumps((process_index, count))) # 暂时弃用 @classmethod def need_cancel(cls, code, start_index, end_index): # 是否需要撤单 process_index, count = cls.__get_recod(code) if process_index is None: # 无卖的信息 return False if count is None: count = 0 limit_up_price = gpcode_manager.get_limit_up_price(code) if limit_up_price is None: return False if float(limit_up_price) * count * 100 >= l2_trade_factor.L2TradeFactorUtil.get_base_safe_val( global_util.zyltgb_map[code]): return True return False @classmethod def process(cls, code, start_index, end_index): # 处理大单 # 获取大单列表,大单格式为:((num,index,re),[(num,index,re),(num,index,re)]) total_datas = local_today_datas[code] process_index, count = cls.__get_recod(code) # 寻找最大值 for index in range(start_index, end_index + 1): # 只处理涨停卖 if not L2DataUtil.is_limit_up_price_sell( total_datas[index]["val"]): continue # 不处理历史数据 if process_index is not None and process_index >= index: continue if count is None: count = 0 count += int(total_datas[index]["val"]["num"]) if process_index is None: process_index = end_index cls.__save_recod(code, process_index, count) # 涨停封单额统计 class L2LimitUpMoneyStatisticUtil: _redisManager = redis_manager.RedisManager(1) @classmethod def __get_redis(cls): return cls._redisManager.getRedis() # 设置l2的每一秒涨停封单额数据 @classmethod def __set_l2_second_money_record(cls, code, time, num, from_index, to_index): old_num, old_from, old_to = cls.__get_l2_second_money_record(code, time) if old_num is None: old_num = num old_from = from_index old_to = to_index else: old_num += num old_to = to_index key = "l2_limit_up_second_money-{}-{}".format(code, time.replace(":", "")) cls.__get_redis().setex(key, tool.get_expire(), json.dumps((old_num, old_from, old_to))) @classmethod def __get_l2_second_money_record(cls, code, time): key = "l2_limit_up_second_money-{}-{}".format(code, time.replace(":", "")) val = cls.__get_redis().get(key) return cls.__format_second_money_record_val(val) @classmethod def __format_second_money_record_val(cls, val): if val is None: return None, None, None val = json.loads(val) return val[0], val[1], val[2] @classmethod def __get_l2_second_money_record_keys(cls, code, time_regex): key = "l2_limit_up_second_money-{}-{}".format(code, time_regex) keys = cls.__get_redis().keys(key) return keys # 设置l2最新的封单额数据 @classmethod def __set_l2_latest_money_record(cls, code, index, num): key = "l2_limit_up_money-{}".format(code) cls.__get_redis().setex(key, tool.get_expire(), json.dumps((num, index))) # 返回数量,索引 @classmethod def __get_l2_latest_money_record(cls, code): key = "l2_limit_up_money-{}".format(code) result = cls.__get_redis().get(key) if result: result = json.loads(result) return result[0], result[1] else: return 0, -1 # 矫正数据 # 矫正方法为取矫正时间两侧的秒分布数据,用于确定计算结束坐标 @classmethod def verify_num(cls, code, num, time_str): time_ = time_str.replace(":", "") key = None for i in range(4, -2, -2): # 获取本(分钟/小时/天)内秒分布数据 time_regex = "{}*".format(time_[:i]) keys_ = cls.__get_l2_second_money_record_keys(code, time_regex) if keys_ and len(keys_) > 1: # 需要排序 keys = [] for k in keys_: keys.append(k) keys.sort(key=lambda tup: int(tup.split("-")[-1])) # 有2个元素 for index in range(0, len(keys) - 1): time_1 = keys[index].split("-")[-1] time_2 = keys[index + 1].split("-")[-1] if int(time_1) <= int(time_) <= int(time_2): # 在此时间范围内 if time_ == time_2: key = keys[index + 1] else: key = keys[index] break if key: val = cls.__get_redis().get(key) old_num, old_from, old_to = cls.__format_second_money_record_val(val) end_index = old_to # 保存最近的数据 cls.__set_l2_latest_money_record(code, end_index, num) break # 计算量,用于涨停封单量的计算 @classmethod def __compute_num(cls, code, data, buy_single_data): if L2DataUtil.is_limit_up_price_buy_cancel(data["val"]) or L2DataUtil.is_sell(data["val"]): # 涨停买撤与卖 return 0 - int(data["val"]["num"]) * data["re"] else: # 卖撤 if L2DataUtil.is_sell_cancel(data["val"]): # 卖撤的买数据是否在买入信号之前,如果在之前就不计算,不在之前就计算 if l2_data_util.is_sell_index_before_target(data, buy_single_data, local_today_num_operate_map.get(code)): return 0 return int(data["val"]["num"]) * data["re"] @classmethod def clear(cls, code): key = "l2_limit_up_money-{}".format(code) cls.__get_redis().delete(key) # 返回取消的标志数据 # with_cancel 是否需要判断是否撤销 @classmethod def process_data(cls, code, start_index, end_index, buy_single_begin_index, with_cancel=True): start_time = round(t.time() * 1000) total_datas = local_today_datas[code] time_dict_num = {} # 记录计算的坐标 time_dict_num_index = {} num_dict = {} # 统计时间分布 time_dict = {} for i in range(start_index, end_index + 1): data = total_datas[i] val = data["val"] time_ = val["time"] if time_ not in time_dict: time_dict[time_] = i for i in range(start_index, end_index + 1): data = total_datas[i] val = data["val"] time_ = val["time"] if time_ not in time_dict_num: time_dict_num[time_] = 0 time_dict_num_index[time_] = {"s": i, "e": i} time_dict_num_index[time_]["e"] = i num = cls.__compute_num(code, data, total_datas[buy_single_begin_index]) num_dict[i] = num time_dict_num[time_] = time_dict_num[time_] + num for t_ in time_dict_num: cls.__set_l2_second_money_record(code, t_, time_dict_num[t_], time_dict_num_index[t_]["s"], time_dict_num_index[t_]["e"]) print("保存涨停封单额时间:", round(t.time() * 1000) - start_time) # 累计最新的金额 total_num, index = cls.__get_l2_latest_money_record(code) if index == -1: # 没有获取到最新的矫正封单额,需要从买入信号开始点计算 index = buy_single_begin_index - 1 total_num = 0 # TODO 待优化计算 cancel_index = None cancel_msg = None # 待计算量 limit_up_price = gpcode_manager.get_limit_up_price(code) min_volumn = round(10000000 / (limit_up_price * 100)) # 不同时间的数据开始坐标 time_start_index_dict = {} # 数据时间分布 time_list = [] # 到当前时间累积的买1量 time_total_num_dict = {} for i in range(index + 1, end_index + 1): data = total_datas[i] time_ = data["val"]["time"] if time_ not in time_start_index_dict: # 记录每一秒的开始位置 time_start_index_dict[time_] = i # 记录时间分布 time_list.append(time_) # 上一段时间的总数 time_total_num_dict[time_] = total_num val = num_dict.get(i) if val is None: val = cls.__compute_num(code, data, total_datas[buy_single_begin_index]) total_num += val # 如果是减小项,且在处理数据的范围内,就需要判断是否要撤单了 if val < 0 and start_index <= i <= end_index: # 累计封单金额小于1000万 if total_num < min_volumn: cancel_index = i cancel_msg = "封单金额小于1000万" break # 相邻2s内的数据减小50% # 上1s的总数 last_second_total_volumn = time_total_num_dict.get(time_list[-1]) if last_second_total_volumn > 0 and ( last_second_total_volumn - total_num) / last_second_total_volumn >= 0.5: # 相邻2s内的数据减小50% cancel_index = i cancel_msg = "相邻2s({})内的封单量减小50%({}->{})".format(time_, last_second_total_volumn, total_num) break if not with_cancel: cancel_index = None print("封单额计算时间:", round(t.time() * 1000) - start_time) process_end_index = end_index if cancel_index: process_end_index = cancel_index # 保存最新累计金额 # cls.__set_l2_latest_money_record(code, process_end_index, total_num) if cancel_index: return total_datas[cancel_index], cancel_msg return None, None def __get_time_second(time_str): ts = time_str.split(":") return int(ts[0]) * 3600 + int(ts[1]) * 60 + int(ts[2]) second_930 = 9 * 3600 + 30 * 60 + 0 # 是否是涨停价买 def __is_limit_up_price_buy(val): if int(val["limitPrice"]) != 1: return False if int(val["operateType"]) != 0: return False price = float(val["price"]) num = int(val["num"]) if price * num * 100 < 50 * 10000: return False return True def __is_limit_up_price_buy_cancel(val): if int(val["limitPrice"]) != 1: return False if int(val["operateType"]) != 1: return False price = float(val["price"]) num = int(val["num"]) if price * num * 100 < 50 * 10000: return False return True # 获取涨停买入起始点 def __get_limit_up_buy_start(code, data_count, __continue_count): # logger.info("__get_limit_up_buy_start:{},{},{}".format(code, data_count, __continue_count)) # 倒数100条数据查询 datas = local_today_datas[code] __len = len(datas) if __len < __continue_count: return None start_index = 0 if data_count > __len: data_count = __len if __len > data_count: start_index = __len - data_count __time = None _limit_up_count_1s = 0 _limit_up_count_1s_start_index = -1 for i in range(start_index, __len - (__continue_count - 1)): _val = datas[i]["val"] # 时间要>=09:30:00 if __get_time_second(_val["time"]) < second_930: continue # 有连续4个涨停买就标记计算起始点 if __is_limit_up_price_buy(_val): index_0 = i index_1 = -1 index_2 = -1 # index_3 = -1 for j in range(index_0 + 1, __len): # 涨停买 if __is_limit_up_price_buy(datas[j]["val"]): index_1 = j break if index_1 > 0: for j in range(index_1 + 1, __len): # 涨停买 if __is_limit_up_price_buy(datas[j]["val"]): index_2 = j break # if index_2 > 0: # for j in range(index_2 + 1, __len): # # 涨停买 # if datas[j]["val"]["limitPrice"] == 1 and datas[j]["val"]["operateType"] == 0: # index_3 = j if index_1 - index_0 == 1 and index_2 - index_1 == 1: # and index_3 - index_2 == 1 logger_l2_trade.info("找到物理连续涨停买 {},{},{}".format(code, i, datas[i])) return i # 同1s内有不连续的4个涨停买(如果遇买撤就重新计算,中间可间隔不涨停买)标记计算起始点 if __is_limit_up_price_buy(_val): # 涨停买 if __time is None: _time = datas[i]["val"]["time"] _limit_up_count_1s = 1 _limit_up_count_1s_start_index = i elif _time == _val["time"]: _limit_up_count_1s += 1 else: _time = datas[i]["val"]["time"] _limit_up_count_1s = 1 _limit_up_count_1s_start_index = i elif _val["operateType"] == 1: # 买撤 _time = None _limit_up_count_1s = 0 _limit_up_count_1s_start_index = -1 if _limit_up_count_1s >= 4 and _limit_up_count_1s_start_index > -1: logger_l2_trade.info("找到同一秒连续涨停买 {},{},{}".format(code, _limit_up_count_1s_start_index, datas[i])) return _limit_up_count_1s_start_index return None # 获取涨停撤销起始点 def __get_limit_up_buy_cancel_start(code, data_count, __continue_count): # logger.info("__get_limit_up_buy_cancel_start:{},{},{}".format(code, data_count, __continue_count)) # 倒数100条数据查询 datas = local_today_datas[code] __len = len(datas) if __len < __continue_count: return None start_index = 0 if data_count > __len: data_count = __len if __len > data_count: start_index = __len - data_count for i in range(start_index, __len - (__continue_count - 1)): _val = datas[i]["val"] if __get_time_second(_val["time"]) < second_930: continue # 有连续3个买撤 if __is_limit_up_price_buy_cancel(_val): index_0 = i index_1 = -1 index_2 = -1 for j in range(index_0 + 1, __len): # 涨停买 if __is_limit_up_price_buy_cancel(datas[j]["val"]): index_1 = j break if index_1 > 0: for j in range(index_1 + 1, __len): # 涨停买 if __is_limit_up_price_buy_cancel(datas[j]["val"]): index_2 = j break if index_1 - index_0 == 1 and index_2 - index_1 == 1: # logger_l2_trade.info("连续3个涨停买撤 {},{},{}".format(code, i, json.dumps(datas[i]))) return i return None # 是否有禁止交易特征 def __is_have_forbidden_feature(code, data_count, __continue_count): # 09:30:00出现连续撤销的数量大于一定的值 # 倒数100条数据查询 datas = local_today_datas[code] __len = len(datas) if __len < __continue_count: return None start_index = 0 if data_count > __len: data_count = __len if __len > data_count: start_index = __len - data_count cancel_start = -1 cancel_count = 0 for i in range(start_index, __len): _val = datas[i]["val"] if _val["time"] == "09:30:00" and i - 1 >= 0 and datas[i - 1]["val"]["time"] != "09:30:00": # 09:30第一条数据 if _val["operateType"] == 1: cancel_start = i else: return False elif cancel_start > -1: # 连续撤销 if _val["operateType"] == 1 and i - cancel_start == 1: cancel_start = i cancel_count += 1 if cancel_count >= __continue_count: return True else: return False return False # 设置最新的l2数据采集的数量 def __set_l2_data_latest_count(code, count): redis = _redisManager.getRedis() key = "latest-l2-count-{}".format(code) redis.setex(key, 2, count) pass # 获取代码最近的l2数据数量 def get_l2_data_latest_count(code): if code is None or len(code) < 1: return 0 redis = _redisManager.getRedis() key = "latest-l2-count-{}".format(code) result = redis.get(key) if result is None: return 0 else: return int(result) # 初始化l2固定代码库 def init_l2_fixed_codes(): key = "l2-fixed-codes" redis = _redisManager.getRedis() count = redis.scard(key) if count > 0: redis.delete(key) redis.sadd(key, "000000") redis.expire(key, tool.get_expire()) # 移除l2固定监控代码 def remove_from_l2_fixed_codes(code): key = "l2-fixed-codes" redis = _redisManager.getRedis() redis.srem(key, code) # 添加代码到L2固定监控 def add_to_l2_fixed_codes(code): key = "l2-fixed-codes" redis = _redisManager.getRedis() redis.sadd(key, code) redis.expire(key, tool.get_expire()) # 是否在l2固定监控代码中 def is_in_l2_fixed_codes(code): key = "l2-fixed-codes" redis = _redisManager.getRedis() return redis.sismember(key, code) if __name__ == "__main__": # 处理数据 code = "002898" load_l2_data(code) L2LimitUpMoneyStatisticUtil.verify_num(code, 70582, "09:42:00")