""" L2的数据处理 """ import decimal import json import logging import random import time as t from datetime import datetime import big_money_num_manager import code_data_util import constant import global_data_loader import global_util import industry_codes_sort import l2_data_log import l2_data_util import gpcode_manager import l2_trade_factor import log import redis_manager import ths_industry_util import tool import trade_manager from log import logger_l2_trade, logger_l2_trade_cancel, logger_l2_trade_buy, logger_l2_process, logger_l2_data import trade_data_manager import limit_up_time_manager _redisManager = redis_manager.RedisManager(1) # l2数据管理 # 本地最新一次上传的数据 local_latest_datas = {} # 本地今日数据 local_today_datas = {} # 本地手数+操作那类型组成的临时变量 # 用于加快数据处理,用空换时间 local_today_num_operate_map = {} class L2DataException(Exception): # 价格不匹配 CODE_PRICE_ERROR = 1 # 无收盘价 CODE_NO_CLOSE_PRICE = 2 def __init__(self, code, msg): super().__init__(self) self.code = code self.msg = msg def __str__(self): return self.msg def get_code(self): return self.code # 交易点管理器,用于管理买入点;买撤点;距离买入点的净买入数据;距离买撤点的买撤数据 class TradePointManager: @staticmethod def __get_redis(): return _redisManager.getRedis() # 删除买入点数据 @staticmethod def delete_buy_point(code): redis = TradePointManager.__get_redis() redis.delete("buy_compute_index_info-{}".format(code)) # 获取买入点信息 # 返回数据为:买入点 累计纯买额 已经计算的数据索引 @staticmethod def get_buy_compute_start_data(code): redis = TradePointManager.__get_redis() _key = "buy_compute_index_info-{}".format(code) _data_json = redis.get(_key) if _data_json is None: return None, None, None, 0, 0, [] _data = json.loads(_data_json) return _data[0], _data[1], _data[2], _data[3], _data[4], _data[5] # 设置买入点的值 # buy_single_index 买入信号位 # buy_exec_index 买入执行位 # compute_index 计算位置 # nums 累计纯买额 @staticmethod def set_buy_compute_start_data(code, buy_single_index, buy_exec_index, compute_index, nums, count, max_num_sets): redis = TradePointManager.__get_redis() expire = tool.get_expire() _key = "buy_compute_index_info-{}".format(code) if buy_single_index is not None: redis.setex(_key, expire, json.dumps((buy_single_index, buy_exec_index, compute_index, nums, count, list(max_num_sets)))) else: _buy_single_index, _buy_exec_index, _compute_index, _nums, _count, _max_num_index = TradePointManager.get_buy_compute_start_data( code) redis.setex(_key, expire, json.dumps((_buy_single_index, buy_exec_index, compute_index, nums, count, list(max_num_sets)))) # 获取撤买入开始计算的信息 # 返回数据的内容为:撤销点索引 撤买纯买额 计算的数据索引 @staticmethod def get_buy_cancel_single_pos(code): redis = TradePointManager.__get_redis() info = redis.get("buy_cancel_single_pos-{}".format(code)) if info is None: return None else: return int(info) # 设置买撤点信息 # buy_num 纯买额 computed_index计算到的下标 index撤买信号起点 @classmethod def set_buy_cancel_single_pos(cls, code, index): redis = TradePointManager.__get_redis() expire = tool.get_expire() redis.setex("buy_cancel_single_pos-{}".format(code), expire, index) # 删除买撤点数据 @classmethod def delete_buy_cancel_point(cls, code): redis = TradePointManager.__get_redis() redis.delete("buy_cancel_single_pos-{}".format(code)) # 设置买撤纯买额 @classmethod def set_compute_info_for_cancel_buy(cls, code, index, nums): redis = TradePointManager.__get_redis() expire = tool.get_expire() redis.setex("compute_info_for_cancel_buy-{}".format(code), expire, json.dumps((index, nums))) logger_l2_trade_buy.info("{}保存撤单纯买额信息:{},{}", code, index, nums) # 获取买撤纯买额计算信息 @classmethod def get_compute_info_for_cancel_buy(cls, code): redis = TradePointManager.__get_redis() info = redis.get("compute_info_for_cancel_buy-{}".format(code)) if info is None: return None, 0 else: info = json.loads(info) return info[0], info[1] @classmethod def delete_compute_info_for_cancel_buy(cls, code): redis = TradePointManager.__get_redis() redis.delete("compute_info_for_cancel_buy-{}".format(code)) # 从买入信号开始设置涨停买与涨停撤的单数 @classmethod def set_count_info_for_cancel_buy(cls, code, index, buy_count, cancel_count): redis = TradePointManager.__get_redis() expire = tool.get_expire() redis.setex("count_info_for_cancel_buy-{}".format(code), expire, json.dumps((index, buy_count, cancel_count))) logger_l2_trade_buy.info("{}保存撤单纯买额信息:{},{}", code, index, buy_count, cancel_count) # 获取买撤纯买额计算信息 @classmethod def get_count_info_for_cancel_buy(cls, code): redis = TradePointManager.__get_redis() info = redis.get("count_info_for_cancel_buy-{}".format(code)) if info is None: return None, 0, 0 else: info = json.loads(info) return info[0], info[1], info[2] @classmethod def delete_count_info_for_cancel_buy(cls, code): redis = TradePointManager.__get_redis() redis.delete("count_info_for_cancel_buy-{}".format(code)) def load_l2_data(code, force=False): redis = _redisManager.getRedis() # 加载最近的l2数据 if local_latest_datas.get(code) is None or force: # 获取最近的数据 _data = redis.get("l2-data-latest-{}".format(code)) if _data is not None: if code in local_latest_datas: local_latest_datas[code] = json.loads(_data) else: local_latest_datas.setdefault(code, json.loads(_data)) # 获取今日的数据 if local_today_datas.get(code) is None or force: datas = log.load_l2_from_log() datas = datas.get(code) if datas is None: datas = [] local_today_datas[code] = datas # 从数据库加载 # datas = [] # keys = redis.keys("l2-{}-*".format(code)) # for k in keys: # value = redis.get(k) # _data = l2_data_util.l2_data_key_2_obj(k, value) # datas.append(_data) # # 排序 # new_datas = sorted(datas, # key=lambda e: (int(e.__getitem__('val')["time"].replace(":", "")), e.__getitem__('index'))) # local_today_datas[code] = new_datas # 根据今日数据加载 l2_data_util.load_num_operate_map(local_today_num_operate_map, code, local_today_datas.get(code), force) @tool.async_call def saveL2Data(code, datas, msg=""): start_time = round(t.time() * 1000) # 查询票是否在待监听的票里面 if not gpcode_manager.is_in_gp_pool(code): return None # 验证股价的正确性 redis_instance = _redisManager.getRedis() try: if redis_instance.setnx("l2-save-{}".format(code), "1") > 0: # 计算保留的时间 expire = tool.get_expire() i = 0 for _data in datas: i += 1 key = "l2-" + _data["key"] value = redis_instance.get(key) if value is None: # 新增 try: value = {"index": _data["index"], "re": _data["re"]} redis_instance.setex(key, expire, json.dumps(value)) except: logging.error("更正L2数据出错:{} key:{}".format(code, key)) else: json_value = json.loads(value) if json_value["re"] != _data["re"]: json_value["re"] = _data["re"] redis_instance.setex(key, expire, json.dumps(json_value)) finally: redis_instance.delete("l2-save-{}".format(code)) print("保存新数据用时:", msg, "耗时:{}".format(round(t.time() * 1000) - start_time)) return datas def parseL2Data(str): day = datetime.now().strftime("%Y%m%d") dict = json.loads(str) data = dict["data"] client = dict["client"] code = data["code"] channel = data["channel"] capture_time = data["captureTime"] process_time = data["processTime"] data = data["data"] limit_up_price = gpcode_manager.get_limit_up_price(code) datas = L2DataUtil.format_l2_data(data, code, limit_up_price) # 获取涨停价 return day, client, channel, code, capture_time, process_time, datas, data # 保存l2数据 def save_l2_data(code, datas, add_datas): redis = _redisManager.getRedis() # 只有有新曾数据才需要保存 if len(add_datas) > 0: # 保存最近的数据 __start_time = round(t.time() * 1000) redis.setex("l2-data-latest-{}".format(code), tool.get_expire(), json.dumps(datas)) l2_data_log.l2_time(code, round(t.time() * 1000) - __start_time, "保存最近l2数据用时") # 设置进内存 local_latest_datas[code] = datas __set_l2_data_latest_count(code, len(datas)) try: logger_l2_data.info("{}-{}", code, add_datas) except Exception as e: logging.exception(e) saveL2Data(code, add_datas) # 清除l2数据 def clear_l2_data(code): redis_l2 = redis_manager.RedisManager(1).getRedis() keys = redis_l2.keys("l2-{}-*".format(code)) for k in keys: redis_l2.delete(k) redis_l2.delete("l2-data-latest-{}".format(code)) class L2DataUtil: @classmethod def is_same_time(cls, time1, time2): if constant.TEST: return True time1_s = time1.split(":") time1_second = int(time1_s[0]) * 3600 + int(time1_s[1]) * 60 + int(time1_s[2]) time2_s = time2.split(":") time2_second = int(time2_s[0]) * 3600 + int(time2_s[1]) * 60 + int(time2_s[2]) if abs(time2_second - time1_second) < 3: return True else: return False # 获取增量数据 @classmethod def get_add_data(cls, code, datas, _start_index): if datas is not None and len(datas) < 1: return [] last_data = None latest_datas_ = local_latest_datas.get(code) if latest_datas_ is not None and len(latest_datas_) > 0: last_data = latest_datas_[-1] count = 0 start_index = -1 # 如果原来没有数据 # 设置add_data的序号 for n in reversed(datas): count += 1 if n["key"] == (last_data["key"] if last_data is not None else ""): start_index = len(datas) - count break _add_datas = [] if last_data is not None: if start_index < 0: if L2DataUtil.get_time_as_second(datas[0]["val"]["time"]) >= L2DataUtil.get_time_as_second( last_data["val"]["time"]): _add_datas = datas else: _add_datas = [] elif start_index + 1 >= len(datas): _add_datas = [] else: _add_datas = datas[start_index + 1:] else: _add_datas = datas[start_index + 1:] for i in range(0, len(_add_datas)): _add_datas[i]["index"] = _start_index + i return _add_datas # 纠正数据,将re字段替换为较大值 @classmethod def correct_data(cls, code, _datas): latest_data = local_latest_datas.get(code) if latest_data is None: latest_data = [] save_list = [] for data in _datas: for _ldata in latest_data: # 新数据条数比旧数据多才保存 if _ldata["key"] == data["key"] and _ldata["re"] < data["re"]: max_re = max(_ldata["re"], data["re"]) _ldata["re"] = max_re data["re"] = max_re # 保存到数据库,更新re的数据 save_list.append(_ldata) if len(save_list) > 0: saveL2Data(code, save_list, "保存纠正数据") local_latest_datas[code] = latest_data return _datas # 处理l2数据 @classmethod def format_l2_data(cls, data, code, limit_up_price): datas = [] dataIndexs = {} same_time_num = {} for item in data: # 解析数据 time = item["time"] if time in same_time_num: same_time_num[time] = same_time_num[time] + 1 else: same_time_num[time] = 1 price = float(item["price"]) num = item["num"] limitPrice = item["limitPrice"] # 涨停价 if limit_up_price is not None: if limit_up_price == tool.to_price(decimal.Decimal(price)): limitPrice = 1 else: limitPrice = 0 item["limitPrice"] = "{}".format(limitPrice) operateType = item["operateType"] # 不需要非涨停买与买撤 if int(item["limitPrice"]) != 1 and (int(operateType) == 0 or int(operateType) == 1): continue cancelTime = item["cancelTime"] cancelTimeUnit = item["cancelTimeUnit"] key = "{}-{}-{}-{}-{}-{}-{}-{}".format(code, operateType, time, num, price, limitPrice, cancelTime, cancelTimeUnit) if key in dataIndexs: # 数据重复次数+1 datas[dataIndexs[key]]['re'] = datas[dataIndexs[key]]['re'] + 1 else: # 数据重复次数默认为1 datas.append({"key": key, "val": item, "re": 1}) dataIndexs.setdefault(key, len(datas) - 1) # TODO 测试的时候开启,方便记录大单数据 # l2_data_util.save_big_data(code, same_time_num, data) return datas @classmethod def get_time_as_second(cls, time_str): ts = time_str.split(":") return int(ts[0]) * 3600 + int(ts[1]) * 60 + int(ts[2]) # @classmethod # def get_time_as_str(cls, time_seconds): # ts = time_str.split(":") # return int(ts[0]) * 3600 + int(ts[1]) * 60 + int(ts[2]) # 是否是涨停价买 @classmethod def is_limit_up_price_buy(cls, val): if int(val["limitPrice"]) != 1: return False if int(val["operateType"]) != 0: return False price = float(val["price"]) num = int(val["num"]) if price * num * 100 < 50 * 10000: return False return True # 是否为涨停卖 @classmethod def is_limit_up_price_sell(cls, val): if int(val["limitPrice"]) != 1: return False if int(val["operateType"]) != 2: return False price = float(val["price"]) num = int(val["num"]) if price * num * 100 < 50 * 10000: return False return True # 是否涨停买撤 @classmethod def is_limit_up_price_buy_cancel(cls, val): if int(val["limitPrice"]) != 1: return False if int(val["operateType"]) != 1: return False price = float(val["price"]) num = int(val["num"]) if price * num * 100 < 50 * 10000: return False return True # 是否卖撤 @classmethod def is_sell_cancel(cls, val): if int(val["operateType"]) == 3: return True return False # 是否为卖 @classmethod def is_sell(cls, val): if int(val["operateType"]) == 2: return True return False # 连续涨停买单数最大值管理器 class L2ContinueLimitUpCountManager: @classmethod def del_data(cls, code): cls.__del_last_record(code) cls.__del_max(code) # 获取最大值 @classmethod def __get_max(cls, code): key = "max_same_time_buy_count-{}".format(code) redis = _redisManager.getRedis() val = redis.get(key) if val is not None: return int(val) else: return None # 保存最大值 @classmethod def __save_max(cls, code, max_num): key = "max_same_time_buy_count-{}".format(code) redis = _redisManager.getRedis() redis.setex(key, tool.get_expire(), max_num) @classmethod def __del_max(cls, code): key = "max_same_time_buy_count-{}".format(code) redis = _redisManager.getRedis() redis.delete(key) # 保存上一条数据最大值 @classmethod def __save_last_record(cls, code, _time, count, index): key = "same_time_buy_last_count-{}".format(code) redis = _redisManager.getRedis() redis.setex(key, tool.get_expire(), json.dumps((_time, count, index))) @classmethod def __del_last_record(cls, code): key = "same_time_buy_last_count-{}".format(code) redis = _redisManager.getRedis() redis.delete(key) @classmethod def __get_last_record(cls, code): key = "same_time_buy_last_count-{}".format(code) redis = _redisManager.getRedis() val = redis.get(key) if val is None: return None, None, None else: val = json.loads(val) return val[0], val[1], val[2] @classmethod def process(cls, code, start_index, end_index): last_time, last_count, last_index = cls.__get_last_record(code) total_datas = local_today_datas[code] time_count_dict = {} for index in range(start_index, end_index + 1): if last_index is not None and last_index >= index: continue if L2DataUtil.is_limit_up_price_buy(total_datas[index]["val"]): if last_count is None: last_count = 0 last_time = total_datas[index]["val"]["time"] last_index = index if last_time == total_datas[index]["val"]["time"]: last_count += total_datas[index]["re"] last_index = index else: if last_count is not None and last_count > 0: time_count_dict[last_time] = last_count last_count = total_datas[index]["re"] last_time = total_datas[index]["val"]["time"] last_index = index else: if last_count is not None and last_count > 0: time_count_dict[last_time] = last_count last_count = 0 last_time = None last_index = None if last_count is not None and last_count > 0: time_count_dict[last_time] = last_count # 保存latest cls.__save_last_record(code, last_time, last_count, last_index) else: # 移除 cls.__del_last_record(code) # 查找这批数据中的最大数量 max_time = None max_num = None for key in time_count_dict: if max_time is None: max_time = key max_num = time_count_dict[key] if time_count_dict[key] > max_num: max_num = time_count_dict[key] max_time = key if max_num is not None: old_max = cls.__get_max(code) if old_max is None or max_num > old_max: cls.__save_max(code, max_num) @classmethod def get_continue_count(cls, code): count = cls.__get_max(code) if count is None: count = 0 count = count // 3 if count < 15: count = 15 return count # 大单处理器 class L2BigNumProcessor: # 是否需要根据大单撤单,返回是否需要撤单与撤单信号的数据 @classmethod def __need_cancel_with_max_num(cls, code, max_num_info, start_index, end_index): if max_num_info is None: return False, None # 如果是买入单,需要看他前面同一秒是否有撤单 if int(max_num_info["val"]["operateType"]) == 0: # 只有买撤信号在买入信号之前的同一秒的单才会撤单情况 _map = local_today_num_operate_map.get(code) if _map is not None: cancel_datas = _map.get( "{}-{}-{}".format(max_num_info["val"]["num"], "1", max_num_info["val"]["price"])) if cancel_datas is not None: for cancel_data in cancel_datas: # 只能在当前规定的数据范围查找,以防出现重复查找 if cancel_data["index"] < start_index or cancel_data["index"] > end_index: continue if cancel_data["index"] > max_num_info["index"]: buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(cancel_data, local_today_num_operate_map[ code]) if buy_index is None: continue if buy_data["val"]["time"] != max_num_info["val"]["time"]: continue min_space, max_space = l2_data_util.compute_time_space_as_second( cancel_data["val"]["cancelTime"], cancel_data["val"][ "cancelTimeUnit"]) if min_space < 60: L2TradeDataProcessor.cancel_debug(code, "找到大单撤单,但撤单间隔时间小于60s,撤单数据-{}", json.dumps(cancel_data)) return True, cancel_data else: # 如果间隔时间大于等于60s,这判断小群撤事件 L2TradeDataProcessor.cancel_debug(code, "找到大单撤单,但撤单间隔时间大于60s,撤单数据-{}", json.dumps(cancel_data)) return False, cancel_data return False, None else: return True, None # 计算数量最大的涨停买/涨停撤 @classmethod def __compute_max_num(cls, code, start_index, end_index, max_num_info, buy_exec_time): new_max_info = max_num_info max_num = 0 if max_num_info is not None: max_num = int(max_num_info["val"]["num"]) # 计算大单 total_data = local_today_datas[code] for i in range(start_index, end_index + 1): data = total_data[i] val = data["val"] if not L2DataUtil.is_limit_up_price_buy(val) and not L2DataUtil.is_limit_up_price_buy_cancel( val): continue # 下单时间与买入执行时间之差大于60s的不做处理 if l2_data_util.get_time_as_seconds(val["time"]) - l2_data_util.get_time_as_seconds(buy_exec_time) > 1: continue if L2DataUtil.is_limit_up_price_buy(val): pass elif L2DataUtil.is_limit_up_price_buy_cancel(val): min_space, max_space = l2_data_util.compute_time_space_as_second(val["cancelTime"], val["cancelTimeUnit"]) # 只能处理1s内的撤单 if min_space > 1: continue # buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(data, # local_today_num_operate_map.get(code)) # if buy_index is None: # continue # if l2_data_util.get_time_as_seconds(buy_data["val"]["time"]) - l2_data_util.get_time_as_seconds( # buy_exec_time) > 1: # continue num = int(total_data[i]["val"]["num"]) if num > max_num: max_num = num new_max_info = data return new_max_info @classmethod def __save_big_num_pos(cls, code, index): redis = _redisManager.getRedis() redis.setex("big_num_pos-{}".format(code), tool.get_expire(), index) @classmethod def __get_big_num_pos(cls, code): redis = _redisManager.getRedis() index = redis.get("big_num_pos-{}".format(code)) if index is not None: return int(index) return index @classmethod def del_big_num_pos(cls, code): redis = _redisManager.getRedis() redis.delete("big_num_pos-{}".format(code)) @classmethod def __cancel_buy(cls, code, index): L2TradeDataProcessor.debug(code, "撤买,触发位置-{},触发条件:大单,数据:{}", index, local_today_datas[code][index]) L2TradeDataProcessor.cancel_buy(code) # 处理数据中的大单,返回是否已经撤单和撤单数据的时间 @classmethod def process_cancel_with_big_num(cls, code, start_index, end_index): total_data = local_today_datas[code] # 如果无下单信号就无需处理 buy_single_index, buy_exec_index, compute_index, nums, max_num_index = TradePointManager.get_buy_compute_start_data( code) if buy_single_index is None or buy_exec_index is None or buy_exec_index < 0: return False, None # 判断是否有大单记录 index = cls.__get_big_num_pos(code) # 无大单记录 if index is None: # 计算大单 new_max_info = cls.__compute_max_num(code, start_index, end_index, None, total_data[buy_exec_index]["val"]["time"]) if new_max_info is None: return False, None L2TradeDataProcessor.debug(code, "获取到大单位置信息:{}", json.dumps(new_max_info)) index = new_max_info["index"] # 大单是否有撤单信号 need_cancel, cancel_data = cls.__need_cancel_with_max_num(code, new_max_info, start_index, end_index) if need_cancel: # 需要撤单 # 撤单 L2TradeDataProcessor.cancel_debug(code, "新找到大单-{},需要撤买", new_max_info["index"]) cls.__cancel_buy(code, new_max_info["index"]) return True, cancel_data, else: # 无需撤单 # 保存大单记录 cls.__save_big_num_pos(code, index) return False, None else: # 有大单记录 need_cancel = False cancel_index = -1 need_cancel, cancel_data = cls.__need_cancel_with_max_num(code, total_data[index], start_index, end_index) # 需要撤单 if need_cancel: # 撤单 cls.__cancel_buy(code, cancel_index) return True, cancel_data # 无需撤单 else: # 计算新的大单 max_num_data = cls.__compute_max_num(code, start_index, end_index, total_data[index], total_data[buy_exec_index]["val"]["time"]) if index == int(max_num_data["index"]): return False, cancel_data L2TradeDataProcessor.debug(code, "找到大单位置信息:{}", json.dumps(max_num_data)) # 大单是否有撤单信号 need_cancel, cancel_data = cls.__need_cancel_with_max_num(code, max_num_data, max_num_data["index"], end_index) if need_cancel: # 需要撤单 # 撤单 cls.__cancel_buy(code, max_num_data["index"] if cancel_data is None else cancel_data) L2TradeDataProcessor.cancel_debug(code, "原来跟踪到大单无撤买信号-{},新跟踪的大单需要撤买-{}", index, max_num_data["index"]) return True, cancel_data else: # 无需撤单 # 保存大单记录 cls.__save_big_num_pos(code, max_num_data["index"]) return False, cancel_data @classmethod def test(cls): code = "000036" load_l2_data(code, True) new_max_info = cls.__compute_max_num(code, 470, 476, None, "09:32:59") print(new_max_info) # 大群撤大单跟踪 class L2BetchCancelBigNumProcessor: @classmethod def __get_recod(cls, code): redis = _redisManager.getRedis() _val = redis.get("betch_cancel_big_num-{}".format(code)) if _val is None: return None, None else: datas = json.loads(_val) return datas[0], datas[1] @classmethod def del_recod(cls, code): redis = _redisManager.getRedis() key = "betch_cancel_big_num-{}".format(code) redis.delete(key) @classmethod def __save_recod(cls, code, max_big_num_info, big_nums_info): redis = _redisManager.getRedis() key = "betch_cancel_big_num-{}".format(code) redis.setex(key, tool.get_expire(), json.dumps((max_big_num_info, big_nums_info))) # 暂时弃用 @classmethod def need_cancel(cls, code, start_index, end_index): # 是否需要撤单 max_big_num_info, big_nums_info = cls.__get_recod(code) if big_nums_info is None: # 无大单信息 return True nums_set = set() index_set = set() for d in big_nums_info: nums_set.add(d[0]) index_set.add(d[1]) total_datas = local_today_datas[code] count = 0 latest_buy_index = end_index for index in range(start_index, end_index + 1): if not nums_set.__contains__(total_datas[index]["val"]["num"]): continue buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(total_datas[index], local_today_num_operate_map[code]) if buy_index is None: continue if index_set.__contains__(buy_index): count += buy_data["re"] latest_buy_index = buy_index # 获取大单数量 total_count = 0 for i in index_set: if i <= latest_buy_index: total_count += total_datas[i]["re"] L2TradeDataProcessor.debug(code, "大群撤大单数量:{}/{}", count, total_count) # 大单小于5笔无脑撤,后修改为无大单无脑撤 if total_count <= 0: return True # 大单撤单笔数大于总大单笔数的1/5就撤单 if count / total_count >= 0.2: return True else: return False pass # def need_cancel(cls, code, start_index, end_index): # total_datas = local_today_datas[code] # for index in range(start_index,end_index+1): # price = total_datas[index]["val"]["price"] # num = total_datas[index]["val"]["num"] # if total_datas[index] # 过时 @classmethod def process(cls, code, start_index, end_index): # 处理大单 # 获取大单列表,大单格式为:((num,index,re),[(num,index,re),(num,index,re)]) total_datas = local_today_datas[code] max_big_num_info, big_nums_info = cls.__get_recod(code) # 寻找最大值 for index in range(start_index, end_index + 1): # 只处理涨停买与涨停买撤 if not L2DataUtil.is_limit_up_price_buy( total_datas[index]["val"]): continue if max_big_num_info is None: max_big_num_info = ( int(total_datas[start_index]["val"]["num"]), total_datas[start_index]["index"]) if int(total_datas[index]["val"]["num"]) > max_big_num_info[0]: max_big_num_info = ( int(total_datas[index]["val"]["num"]), total_datas[index]["index"]) # 将大于最大值90%的数据加入 if max_big_num_info is not None: min_num = round(max_big_num_info[0] * 0.9) for index in range(start_index, end_index + 1): # 只统计涨停买 if not L2DataUtil.is_limit_up_price_buy( total_datas[index]["val"]): continue if int(total_datas[index]["val"]["num"]) >= min_num: if big_nums_info is None: big_nums_info = [] big_nums_info.append((int(total_datas[index]["val"]["num"]), total_datas[index]["index"])) # 移除小于90%的数据 big_nums_info_new = [] index_set = set() for d in big_nums_info: if d[0] >= min_num: if not index_set.__contains__(d[1]): index_set.add(d[1]) big_nums_info_new.append(d) cls.__save_recod(code, max_big_num_info, big_nums_info_new) # 最新方法 @classmethod def process_new(cls, code, start_index, end_index): # 处理大单 # 获取大单列表,大单格式为:((num,index,re),[(num,index,re),(num,index,re)]) total_datas = local_today_datas[code] max_big_num_info, big_nums_info = cls.__get_recod(code) # 大于等于8000手或者金额>=300万就是大单 for index in range(start_index, end_index + 1): # 只统计涨停买 if not L2DataUtil.is_limit_up_price_buy( total_datas[index]["val"]): continue # 大于等于8000手或者金额 >= 300 # 万就是大单 if int(total_datas[index]["val"]["num"]) >= 8000 or int(total_datas[index]["val"]["num"]) * float( total_datas[index]["val"]["price"]) >= 30000: if big_nums_info is None: big_nums_info = [] big_nums_info.append((int(total_datas[index]["val"]["num"]), total_datas[index]["index"])) # 移除小于90%的数据 big_nums_info_new = [] index_set = set() if big_nums_info is not None: for d in big_nums_info: if not index_set.__contains__(d[1]): index_set.add(d[1]) big_nums_info_new.append(d) cls.__save_recod(code, max_big_num_info, big_nums_info_new) # 卖跟踪 class L2SellProcessor: @classmethod def __get_recod(cls, code): redis = _redisManager.getRedis() _val = redis.get("sell_num-{}".format(code)) if _val is None: return None, None else: datas = json.loads(_val) return datas[0], datas[1] @classmethod def del_recod(cls, code): redis = _redisManager.getRedis() key = "sell_num-{}".format(code) redis.delete(key) @classmethod def __save_recod(cls, code, process_index, count): redis = _redisManager.getRedis() key = "sell_num-{}".format(code) redis.setex(key, tool.get_expire(), json.dumps((process_index, count))) # 暂时弃用 @classmethod def need_cancel(cls, code, start_index, end_index): # 是否需要撤单 process_index, count = cls.__get_recod(code) if process_index is None: # 无卖的信息 return False if count is None: count = 0 limit_up_price = gpcode_manager.get_limit_up_price(code) if limit_up_price is None: return False if float(limit_up_price) * count * 100 >= l2_trade_factor.L2TradeFactorUtil.get_base_safe_val( global_util.zyltgb_map[code]): return True return False @classmethod def process(cls, code, start_index, end_index): # 处理大单 # 获取大单列表,大单格式为:((num,index,re),[(num,index,re),(num,index,re)]) total_datas = local_today_datas[code] process_index, count = cls.__get_recod(code) # 寻找最大值 for index in range(start_index, end_index + 1): # 只处理涨停卖 if not L2DataUtil.is_limit_up_price_sell( total_datas[index]["val"]): continue # 不处理历史数据 if process_index is not None and process_index >= index: continue if count is None: count = 0 count += int(total_datas[index]["val"]["num"]) if process_index is None: process_index = end_index cls.__save_recod(code, process_index, count) def __get_time_second(time_str): ts = time_str.split(":") return int(ts[0]) * 3600 + int(ts[1]) * 60 + int(ts[2]) second_930 = 9 * 3600 + 30 * 60 + 0 # 是否是涨停价买 def __is_limit_up_price_buy(val): if int(val["limitPrice"]) != 1: return False if int(val["operateType"]) != 0: return False price = float(val["price"]) num = int(val["num"]) if price * num * 100 < 50 * 10000: return False return True def __is_limit_up_price_buy_cancel(val): if int(val["limitPrice"]) != 1: return False if int(val["operateType"]) != 1: return False price = float(val["price"]) num = int(val["num"]) if price * num * 100 < 50 * 10000: return False return True # 获取涨停买入起始点 def __get_limit_up_buy_start(code, data_count, __continue_count): # logger.info("__get_limit_up_buy_start:{},{},{}".format(code, data_count, __continue_count)) # 倒数100条数据查询 datas = local_today_datas[code] __len = len(datas) if __len < __continue_count: return None start_index = 0 if data_count > __len: data_count = __len if __len > data_count: start_index = __len - data_count __time = None _limit_up_count_1s = 0 _limit_up_count_1s_start_index = -1 for i in range(start_index, __len - (__continue_count - 1)): _val = datas[i]["val"] # 时间要>=09:30:00 if __get_time_second(_val["time"]) < second_930: continue # 有连续4个涨停买就标记计算起始点 if __is_limit_up_price_buy(_val): index_0 = i index_1 = -1 index_2 = -1 # index_3 = -1 for j in range(index_0 + 1, __len): # 涨停买 if __is_limit_up_price_buy(datas[j]["val"]): index_1 = j break if index_1 > 0: for j in range(index_1 + 1, __len): # 涨停买 if __is_limit_up_price_buy(datas[j]["val"]): index_2 = j break # if index_2 > 0: # for j in range(index_2 + 1, __len): # # 涨停买 # if datas[j]["val"]["limitPrice"] == 1 and datas[j]["val"]["operateType"] == 0: # index_3 = j if index_1 - index_0 == 1 and index_2 - index_1 == 1: # and index_3 - index_2 == 1 logger_l2_trade.info("找到物理连续涨停买 {},{},{}".format(code, i, datas[i])) return i # 同1s内有不连续的4个涨停买(如果遇买撤就重新计算,中间可间隔不涨停买)标记计算起始点 if __is_limit_up_price_buy(_val): # 涨停买 if __time is None: _time = datas[i]["val"]["time"] _limit_up_count_1s = 1 _limit_up_count_1s_start_index = i elif _time == _val["time"]: _limit_up_count_1s += 1 else: _time = datas[i]["val"]["time"] _limit_up_count_1s = 1 _limit_up_count_1s_start_index = i elif _val["operateType"] == 1: # 买撤 _time = None _limit_up_count_1s = 0 _limit_up_count_1s_start_index = -1 if _limit_up_count_1s >= 4 and _limit_up_count_1s_start_index > -1: logger_l2_trade.info("找到同一秒连续涨停买 {},{},{}".format(code, _limit_up_count_1s_start_index, datas[i])) return _limit_up_count_1s_start_index return None # 获取涨停撤销起始点 def __get_limit_up_buy_cancel_start(code, data_count, __continue_count): # logger.info("__get_limit_up_buy_cancel_start:{},{},{}".format(code, data_count, __continue_count)) # 倒数100条数据查询 datas = local_today_datas[code] __len = len(datas) if __len < __continue_count: return None start_index = 0 if data_count > __len: data_count = __len if __len > data_count: start_index = __len - data_count for i in range(start_index, __len - (__continue_count - 1)): _val = datas[i]["val"] if __get_time_second(_val["time"]) < second_930: continue # 有连续3个买撤 if __is_limit_up_price_buy_cancel(_val): index_0 = i index_1 = -1 index_2 = -1 for j in range(index_0 + 1, __len): # 涨停买 if __is_limit_up_price_buy_cancel(datas[j]["val"]): index_1 = j break if index_1 > 0: for j in range(index_1 + 1, __len): # 涨停买 if __is_limit_up_price_buy_cancel(datas[j]["val"]): index_2 = j break if index_1 - index_0 == 1 and index_2 - index_1 == 1: # logger_l2_trade.info("连续3个涨停买撤 {},{},{}".format(code, i, json.dumps(datas[i]))) return i return None # 是否有禁止交易特征 def __is_have_forbidden_feature(code, data_count, __continue_count): # 09:30:00出现连续撤销的数量大于一定的值 # 倒数100条数据查询 datas = local_today_datas[code] __len = len(datas) if __len < __continue_count: return None start_index = 0 if data_count > __len: data_count = __len if __len > data_count: start_index = __len - data_count cancel_start = -1 cancel_count = 0 for i in range(start_index, __len): _val = datas[i]["val"] if _val["time"] == "09:30:00" and i - 1 >= 0 and datas[i - 1]["val"]["time"] != "09:30:00": # 09:30第一条数据 if _val["operateType"] == 1: cancel_start = i else: return False elif cancel_start > -1: # 连续撤销 if _val["operateType"] == 1 and i - cancel_start == 1: cancel_start = i cancel_count += 1 if cancel_count >= __continue_count: return True else: return False return False # 设置最新的l2数据采集的数量 def __set_l2_data_latest_count(code, count): redis = _redisManager.getRedis() key = "latest-l2-count-{}".format(code) redis.setex(key, 2, count) pass # 获取代码最近的l2数据数量 def get_l2_data_latest_count(code): if code is None or len(code) < 1: return 0 redis = _redisManager.getRedis() key = "latest-l2-count-{}".format(code) result = redis.get(key) if result is None: return 0 else: return int(result) # 初始化l2固定代码库 def init_l2_fixed_codes(): key = "l2-fixed-codes" redis = _redisManager.getRedis() count = redis.scard(key) if count > 0: redis.delete(key) redis.sadd(key, "000000") redis.expire(key, tool.get_expire()) # 移除l2固定监控代码 def remove_from_l2_fixed_codes(code): key = "l2-fixed-codes" redis = _redisManager.getRedis() redis.srem(key, code) # 添加代码到L2固定监控 def add_to_l2_fixed_codes(code): key = "l2-fixed-codes" redis = _redisManager.getRedis() redis.sadd(key, code) redis.expire(key, tool.get_expire()) # 是否在l2固定监控代码中 def is_in_l2_fixed_codes(code): key = "l2-fixed-codes" redis = _redisManager.getRedis() return redis.sismember(key, code) if __name__ == "__main__": clear_l2_data("603912")