import decimal import json import os import time as t from datetime import datetime import big_money_num_manager import data_process import global_util import l2_data_util import gpcode_manager import l2_trade_factor import redis_manager import tool import trade_manager from log import logger_l2_trade from trade_data_manager import TradeBuyDataManager _redisManager = redis_manager.RedisManager(1) # l2数据管理 # 本地最新一次上传的数据 local_latest_datas = {} # 本地今日数据 local_today_datas = {} # 本地手数+操作那类型组成的临时变量 # 用于加快数据处理,用空换时间 local_today_num_operate_map = {} class L2DataException(Exception): # 价格不匹配 CODE_PRICE_ERROR = 1 # 无收盘价 CODE_NO_CLOSE_PRICE = 2 def __init__(self, code, msg): super().__init__(self) self.code = code self.msg = msg def __str__(self): return self.msg def get_code(self): return self.code # 交易点管理器,用于管理买入点;买撤点;距离买入点的净买入数据;距离买撤点的买撤数据 class TradePointManager: @staticmethod def __get_redis(): return _redisManager.getRedis() # 删除买入点数据 @staticmethod def delete_buy_point(code): redis = TradePointManager.__get_redis() redis.delete("buy_compute_index_info-{}".format(code)) # 获取买入点信息 # 返回数据为:买入点 累计纯买额 已经计算的数据索引 @staticmethod def get_buy_compute_start_data(code): redis = TradePointManager.__get_redis() _key = "buy_compute_index_info-{}".format(code) _data_json = redis.get(_key) if _data_json is None: return None, 0, None _data = json.loads(_data_json) return _data[0], _data[1], _data[2] # 设置买入点的值 @staticmethod def set_buy_compute_start_data(code, nums, compute_index, buy_index): redis = TradePointManager.__get_redis() expire = tool.get_expire() _key = "buy_compute_index_info-{}".format(code) if buy_index is not None: redis.setex(_key, expire, json.dumps((buy_index, nums, compute_index))) else: _buy_index, _nums, _compute_index = TradePointManager.get_buy_compute_start_data(code) redis.setex(_key, expire, json.dumps((_buy_index, nums, compute_index))) # 获取撤买入开始计算的信息 # 返回数据的内容为:撤销点索引 撤买纯买额 计算的数据索引 @staticmethod def get_buy_cancel_compute_start_data(code): redis = TradePointManager.__get_redis() info = redis.get("buy_cancel_compute_info-{}".format(code)) if info is None: return None, None, None else: info = json.loads(info) return info[0], info[1], info[2] # 设置买撤点信息 # buy_num 纯买额 computed_index计算到的下标 index撤买信号起点 @classmethod def set_buy_cancel_compute_start_data(cls, code, buy_num, computed_index, index): redis = TradePointManager.__get_redis() expire = tool.get_expire() redis.setex("buy_cancel_compute_info-{}".format(code), expire, json.dumps((index, buy_num, computed_index))) # 增加撤买的纯买额 @classmethod def add_buy_nums_for_cancel(cls, code, num_add, computed_index): cancel_index, nums, c_index = cls.get_buy_cancel_compute_start_data(code) if cancel_index is None: raise Exception("无撤买信号记录") nums += num_add cls.set_buy_cancel_compute_start_data(code, nums, computed_index) # 删除买撤点数据 @staticmethod def delete_buy_cancel_point(code): redis = TradePointManager.__get_redis() redis.delete("buy_cancel_compute_info-{}".format(code)) def load_l2_data(code, force=False): redis = _redisManager.getRedis() # 加载最近的l2数据 if local_latest_datas.get(code) is None or force: # 获取最近的数据 _data = redis.get("l2-data-latest-{}".format(code)) if _data is not None: if code in local_latest_datas: local_latest_datas[code] = json.loads(_data) else: local_latest_datas.setdefault(code, json.loads(_data)) # 获取今日的数据 if local_today_datas.get(code) is None or force: datas = [] keys = redis.keys("l2-{}-*".format(code)) for k in keys: value = redis.get(k) _data = l2_data_util.l2_data_key_2_obj(k, value) datas.append(_data) # 排序 new_datas = sorted(datas, key=lambda e: (int(e.__getitem__('val')["time"].replace(":", "")), e.__getitem__('index'))) local_today_datas[code] = new_datas # 根据今日数据加载 l2_data_util.load_num_operate_map(local_today_num_operate_map, code, local_today_datas.get(code), force) def saveL2Data(code, datas, msg=""): start_time = round(t.time() * 1000) # 查询票是否在待监听的票里面 if not gpcode_manager.is_in_gp_pool(code): return None # 验证股价的正确性 redis_instance = _redisManager.getRedis() try: if redis_instance.setnx("l2-save-{}".format(code), "1") > 0: # 计算保留的时间 expire = tool.get_expire() start_index = redis_instance.get("l2-maxindex-{}".format(code)) if start_index is None: start_index = -1 else: start_index = int(start_index) max_index = start_index i = 0 for _data in datas: i += 1 key = "l2-" + _data["key"] value = redis_instance.get(key) if value is None: # 新增 max_index = start_index + i value = {"index": start_index + i, "re": _data["re"]} redis_instance.setex(key, expire, json.dumps(value)) else: json_value = json.loads(value) if json_value["re"] != _data["re"]: json_value["re"] = _data["re"] redis_instance.setex(key, expire, json.dumps(json_value)) redis_instance.setex("l2-maxindex-{}".format(code), expire, max_index) finally: redis_instance.delete("l2-save-{}".format(code)) print("保存新数据用时:", msg, round(t.time() * 1000) - start_time) return datas # TODO 获取l2的数据 def get_l2_data_index(code, key): pass def parseL2Data(str): day = datetime.now().strftime("%Y%m%d") dict = json.loads(str) data = dict["data"] client = dict["client"] code = data["code"] channel = data["channel"] capture_time = data["captureTime"] process_time = data["processTime"] data = data["data"] limit_up_price = gpcode_manager.get_limit_up_price(code) datas = L2DataUtil.format_l2_data(data, code, limit_up_price) # 获取涨停价 return day, client, channel, code, capture_time, process_time, datas # 保存l2数据 def save_l2_data(code, datas, add_datas): redis = _redisManager.getRedis() # 保存最近的数据 redis.setex("l2-data-latest-{}".format(code), tool.get_expire(), json.dumps(datas)) # 设置进内存 if code in local_latest_datas: local_latest_datas[code] = datas else: local_latest_datas.setdefault(code, datas) __set_l2_data_latest_count(code, len(datas)) if len(add_datas) > 0: saveL2Data(code, add_datas) class L2DataUtil: @classmethod def is_same_time(cls, time1, time2): # TODO 测试 # if 1 > 0: # return True time1_s = time1.split(":") time1_second = int(time1_s[0]) * 3600 + int(time1_s[1]) * 60 + int(time1_s[2]) time2_s = time2.split(":") time2_second = int(time2_s[0]) * 3600 + int(time2_s[1]) * 60 + int(time2_s[2]) if abs(time2_second - time1_second) < 3: return True else: return False # 获取增量数据 @classmethod def get_add_data(cls, code, datas, _start_index): if datas is not None and len(datas) < 1: return [] last_key = "" __latest_datas = local_latest_datas.get(code) if __latest_datas is not None and len(__latest_datas) > 0: last_key = __latest_datas[-1]["key"] count = 0 start_index = -1 # 如果原来没有数据 # TODO 设置add_data的序号 for n in reversed(datas): count += 1 if n["key"] == last_key: start_index = len(datas) - count break _add_datas = [] if len(last_key) > 0: if start_index < 0 or start_index + 1 >= len(datas): _add_datas = [] else: _add_datas = datas[start_index + 1:] else: _add_datas = datas[start_index + 1:] for i in range(0, len(_add_datas)): _add_datas[i]["index"] = _start_index + i return _add_datas # 纠正数据,将re字段替换为较大值 @classmethod def correct_data(cls, code, _datas): latest_data = local_latest_datas.get(code) if latest_data is None: latest_data = [] save_list = [] for data in _datas: for _ldata in latest_data: if _ldata["key"] == data["key"] and _ldata["re"] != data["re"]: max_re = max(_ldata["re"], data["re"]) _ldata["re"] = max_re data["re"] = max_re # 保存到数据库,更新re的数据 save_list.append(_ldata) if len(save_list) > 0: saveL2Data(code, save_list, "保存纠正数据") return _datas # 处理l2数据 @classmethod def format_l2_data(cls, data, code, limit_up_price): datas = [] dataIndexs = {} same_time_num = {} for item in data: # 解析数据 time = item["time"] if time in same_time_num: same_time_num[time] = same_time_num[time] + 1 else: same_time_num[time] = 1 price = float(item["price"]) num = item["num"] limitPrice = item["limitPrice"] # 涨停价 if limit_up_price is not None and limit_up_price == tool.to_price(decimal.Decimal(price)): limitPrice = 1 item["limitPrice"] = "{}".format(limitPrice) operateType = item["operateType"] cancelTime = item["cancelTime"] cancelTimeUnit = item["cancelTimeUnit"] key = "{}-{}-{}-{}-{}-{}-{}-{}".format(code, operateType, time, num, price, limitPrice, cancelTime, cancelTimeUnit) if key in dataIndexs: # 数据重复次数+1 datas[dataIndexs[key]]['re'] = datas[dataIndexs[key]]['re'] + 1 else: # 数据重复次数默认为1 datas.append({"key": key, "val": item, "re": 1}) dataIndexs.setdefault(key, len(datas) - 1) l2_data_util.save_big_data(code, same_time_num, data) return datas @classmethod def get_time_as_second(time_str): ts = time_str.split(":") return int(ts[0]) * 3600 + int(ts[1]) * 60 + int(ts[2]) # 是否是涨停价买 def is_limit_up_price_buy(val): if int(val["limitPrice"]) != 1: return False if int(val["operateType"]) != 0: return False price = float(val["price"]) num = int(val["num"]) if price * num * 100 < 50 * 10000: return False return True # 是否涨停买撤 def is_limit_up_price_buy_cancel(val): if int(val["limitPrice"]) != 1: return False if int(val["operateType"]) != 1: return False price = float(val["price"]) num = int(val["num"]) if price * num * 100 < 50 * 10000: return False return True @staticmethod def is_index_end(code, index): if index >= len(local_today_datas[code]) - 1: return True else: return False # L2交易数据处理器 class L2TradeDataProcessor: unreal_buy_dict = {} @classmethod # 数据处理入口 # datas: 本次截图数据 # capture_timestamp:截图时间戳 def process(cls, code, datas, capture_timestamp): now_time_str = datetime.now().strftime("%H:%M:%S") __start_time = round(t.time() * 1000) try: if len(datas) > 0: # 判断价格区间是否正确 if not data_process.is_same_code_with_price(code, float(datas[0]["val"]["price"])): raise L2DataException(L2DataException.CODE_PRICE_ERROR, "股价不匹配 code-{} price-{}".format(code, datas[0]["val"]["price"])) # 加载历史数据 load_l2_data(code) # 纠正数据 datas = L2DataUtil.correct_data(code, datas) _start_index = 0 if local_today_datas.get(code) is not None and len(local_today_datas[code]) > 0: _start_index = local_today_datas[code][-1]["index"] add_datas = L2DataUtil.get_add_data(code, datas, _start_index) if len(add_datas) > 0: # 拼接数据 local_today_datas[code].extend(add_datas) l2_data_util.load_num_operate_map(local_today_num_operate_map, code, add_datas) total_datas = local_today_datas[code] # 买入确认点处理 TradeBuyDataManager.process_buy_sure_position_info(code, capture_timestamp, total_datas, total_datas[-1], add_datas) if len(add_datas) > 0: # 计算大单数量 cls.__compute_big_money_data(code, add_datas) latest_time = add_datas[len(add_datas) - 1]["val"]["time"] # 时间差不能太大才能处理 if L2DataUtil.is_same_time(now_time_str, latest_time): # 判断是否已经挂单 state = trade_manager.get_trade_state(code) if state == trade_manager.TRADE_STATE_BUY_DELEGATED or state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER: # 已挂单 cls.__process_order(code, len(total_datas) - len(add_datas) - 3) else: # 未挂单 cls.__process_not_order(code, add_datas, capture_timestamp) # 保存数据 save_l2_data(code, datas, add_datas) finally: if code in cls.unreal_buy_dict: cls.unreal_buy_dict.pop(code) @classmethod def __compute_big_money_data(cls, code, add_datas): # 计算大单 num = 0 for data in add_datas: if l2_trade_factor.L2TradeFactorSourceDataUtil.is_big_money(data): if int(data["val"]["operateType"]) == 0: num += data["re"] elif int(data["val"]["operateType"]) == 1: num -= data["re"] big_money_num_manager.add_num(code, num) # 处理未挂单 @classmethod def __process_not_order(cls, code, add_datas, capture_time): # 获取阈值 threshold_money = cls.__get_threshmoney(code) cls.__start_compute_buy(code, len(local_today_datas[code]) - len(add_datas), threshold_money, capture_time) # 处理已挂单 @classmethod def __process_order(cls, code, start_index, capture_time): if start_index < 0: start_index = 0 # 获取之前是否有记录的撤买信号 cancel_index, buy_num_for_cancel, computed_index = cls.__has_order_cancel_begin_pos(code) buy_index, buy_num = cls.__get_order_begin_pos(code) if cancel_index is None: # 无撤单信号起始点记录 cancel_index = cls.__compute_order_cancel_begin_single(code, start_index, 3) buy_num_for_cancel = 0 computed_index = buy_index if cancel_index is not None: # 获取阈值 有买撤信号,统计撤买纯买额 threshold_money = cls.__get_threshmoney(code) cls.__start_compute_cancel(code, cancel_index, computed_index, buy_num_for_cancel, threshold_money, capture_time) else: # 无买撤信号,终止执行 pass # 开始计算撤的信号 @classmethod def __start_compute_cancel(cls, code, cancel_index, compute_start_index, origin_num, threshold_money, capture_time): # sure_type 0-虚拟挂买位 1-真实挂买位 computed_index, buy_num_for_cancel, sure_type = cls.__sum_buy_num_for_cancel_order(code, compute_start_index, origin_num, threshold_money) total_datas = local_today_datas[code] if computed_index is not None: # 发出撤买信号,需要撤买 if cls.unreal_buy_dict.get(code) is not None: # 有虚拟下单 # 删除虚拟下单标记 cls.unreal_buy_dict.pop(code) # 删除下单标记位置 TradePointManager.delete_buy_point(code) else: # 无虚拟下单,需要执行撤单 logger_l2_trade.info( "执行撤销:{} - {}".format(code, json.dumps(total_datas[computed_index]))) try: trade_manager.start_cancel_buy(code) # 取消买入标识 TradePointManager.delete_buy_point(code) TradePointManager.delete_buy_cancel_point(code) except Exception as e: pass if computed_index < len(local_today_datas[code]) - 1: # 数据尚未处理完,重新进入下单计算流程 cls.__start_compute_buy(code, computed_index + 1, 0, threshold_money, capture_time) pass else: # 无需撤买,记录撤买信号 TradePointManager.set_buy_cancel_compute_start_data(code, buy_num_for_cancel, len(total_datas) - 1, cancel_index) # 判断是否有虚拟下单 unreal_buy_info = cls.unreal_buy_dict.get(code) if unreal_buy_info is not None: # unreal_buy_info 的内容格式为:(触法买操作下标,截图时间) # 真实下单 cls.__buy(code, unreal_buy_info[1], total_datas[unreal_buy_info[0]], unreal_buy_info[0]) pass else: # 终止执行 pass @classmethod def __buy(cls, code, capture_timestamp, last_data, last_data_index): logger_l2_trade.info( "执行买入:{} ".format(code)) try: trade_manager.start_buy(code, capture_timestamp, last_data, last_data_index) TradePointManager.delete_buy_cancel_point(code) except Exception as e: pass @classmethod def __start_compute_buy(cls, code, compute_start_index, threshold_money, capture_time): total_datas = local_today_datas[code] # 获取买入信号计算起始位置 index, num = cls.__get_order_begin_pos(code) # 是否为新获取到的位置 new_get_pos = False if index is None: # 有买入信号 has_single, _index = cls.__compute_order_begin_pos(code, len(total_datas) - compute_start_index, 3) index = _index if has_single: num = 0 new_get_pos = True if index is None: # 未获取到买入信号,终止程序 return None # 买入纯买额统计 compute_index, buy_nums = cls.sum_buy_num_for_order(code, compute_start_index, num, threshold_money) if compute_index is not None: # 记录买入信号位置 cls.__save_order_begin_data(code, compute_index, buy_nums, index) # 虚拟下单 cls.unreal_buy_dict[code] = (compute_index, capture_time) # 删除之前的所有撤单信号 TradePointManager.delete_buy_cancel_point(code) # 数据是否处理完毕 if L2DataUtil.is_index_end(code, compute_index): # 数据已经处理完毕,下单 cls.__buy(code, capture_time, total_datas[compute_index], compute_index) else: # 数据尚未处理完毕,进行下一步处理 cls.__process_order(code, compute_index + 1, capture_time) else: # 未达到下单条件,保存纯买额,设置纯买额 # 记录买入信号位置 cls.__save_order_begin_data(code, len(total_datas) - 1, buy_nums, index) pass # 获取下单起始信号 @classmethod def __get_order_begin_pos(cls, code): index, num, compute_index = TradePointManager.get_buy_compute_start_data(code) return index, num @classmethod def __save_order_begin_data(self, code, compute_index, num, buy_index=None): TradePointManager.set_buy_compute_start_data(code, num, compute_index, buy_index) # 获取撤单起始位置 @classmethod def __has_order_cancel_begin_pos(cls, code): # cancel_index:撤单信号起点 # buy_num_for_cancel:从挂入点计算的纯买额 # computed_index 计算的最后位置 cancel_index, buy_num_for_cancel, computed_index = TradePointManager.get_buy_cancel_compute_start_data(code) return cancel_index, buy_num_for_cancel, computed_index # 计算下单起始信号 # compute_data_count 用于计算的l2数据数量 def __compute_order_begin_pos(self, code, compute_data_count, continue_count): # 倒数100条数据查询 datas = local_today_datas[code] __len = len(datas) if __len < continue_count: return None start_index = 0 if compute_data_count > __len: compute_data_count = __len if __len > compute_data_count: start_index = __len - compute_data_count __time = None _limit_up_count_1s = 0 _limit_up_count_1s_start_index = -1 for i in range(start_index, __len - (continue_count - 1)): _val = datas[i]["val"] # 时间要>=09:30:00 if L2DataUtil.get_time_as_second(_val["time"]) < second_930: continue # 有连续4个涨停买就标记计算起始点 if L2DataUtil.is_limit_up_price_buy(_val): index_0 = i index_1 = -1 index_2 = -1 # index_3 = -1 for j in range(index_0 + 1, __len): # 涨停买 if L2DataUtil.is_limit_up_price_buy(datas[j]["val"]): index_1 = j break if index_1 > 0: for j in range(index_1 + 1, __len): # 涨停买 if L2DataUtil.is_limit_up_price_buy(datas[j]["val"]): index_2 = j break # if index_2 > 0: # for j in range(index_2 + 1, __len): # # 涨停买 # if datas[j]["val"]["limitPrice"] == 1 and datas[j]["val"]["operateType"] == 0: # index_3 = j if index_1 - index_0 == 1 and index_2 - index_1 == 1: # and index_3 - index_2 == 1 logger_l2_trade.info("找到物理连续涨停买 {},{},{}".format(code, i, datas[i])) return i # 同1s内有不连续的4个涨停买(如果遇买撤就重新计算,中间可间隔不涨停买)标记计算起始点 if L2DataUtil.is_limit_up_price_buy(_val): # 涨停买 if __time is None: _time = datas[i]["val"]["time"] _limit_up_count_1s = 1 _limit_up_count_1s_start_index = i elif _time == _val["time"]: _limit_up_count_1s += 1 else: _time = datas[i]["val"]["time"] _limit_up_count_1s = 1 _limit_up_count_1s_start_index = i elif _val["operateType"] == 1: # 买撤 _time = None _limit_up_count_1s = 0 _limit_up_count_1s_start_index = -1 if _limit_up_count_1s >= 4 and _limit_up_count_1s_start_index > -1: logger_l2_trade.info("找到同一秒连续涨停买 {},{},{}".format(code, _limit_up_count_1s_start_index, datas[i])) return _limit_up_count_1s_start_index return None # 是否有撤销信号 @classmethod def __compute_order_cancel_begin_single(cls, code, start_index, continue_count): datas = local_today_datas[code] __len = len(datas) if len(datas) - start_index < continue_count: return None for i in range(start_index, __len - (continue_count - 1)): _val = datas[i]["val"] if L2DataUtil.get_time_as_second(_val["time"]) < second_930: continue # 有连续3个买撤 if L2DataUtil.is_limit_up_price_buy_cancel(_val): index_0 = i index_1 = -1 index_2 = -1 for j in range(index_0 + 1, __len): # 涨停买 if L2DataUtil.is_limit_up_price_buy_cancel(datas[j]["val"]): index_1 = j break if index_1 > 0: for j in range(index_1 + 1, __len): # 涨停买 if L2DataUtil.is_limit_up_price_buy_cancel(datas[j]["val"]): index_2 = j break if index_1 - index_0 == 1 and index_2 - index_1 == 1: logger_l2_trade.info("连续3个涨停买撤 {},{},{}".format(code, i, json.dumps(datas[i]))) return i return None # 是否可以下单 def __is_can_order(self): pass # 虚拟下单 def __unreal_order(self): pass def __get_threshmoney(cls, code): l2_trade_factor.L2TradeFactorUtil.compute_m_value(code) # 获取预估挂买位 @classmethod def __get_sure_order_pos(cls, code): index, data = TradeBuyDataManager.get_buy_sure_position(code) if index is None: return 0, len(local_today_datas[code]) - 1, local_today_datas[code][-1] else: return 1, index, data # 统计买入净买量 @classmethod def __sum_buy_num_for_order(cls, code, compute_start_index, origin_num, threshold_money): total_datas = local_today_datas[code] buy_nums = origin_num limit_up_price = gpcode_manager.get_limit_up_price(code) if limit_up_price is None: raise Exception("涨停价无法获取") threshold_num = threshold_money / (limit_up_price * 100) for i in range(compute_start_index, len(total_datas)): _val = total_datas[i]["val"] # 有连续4个涨停买就标记计算起始点 if L2DataUtil.is_limit_up_price_buy(_val): # 涨停买 buy_nums += int(_val["num"]) * int(total_datas[i]["re"]) if buy_nums >= threshold_num: return i, buy_nums elif L2DataUtil.is_limit_up_price_buy_cancel(_val): # 涨停买撤 buy_nums -= int(_val["num"]) * int(total_datas[i]["re"]) return None, buy_nums # 同一时间买入的概率计算 @classmethod def __get_same_time_property(cls, code): # 计算板块热度 industry = global_util.code_industry_map.get(code) if industry is not None: hot_num = global_util.industry_hot_num.get(industry) if hot_num is not None: return 1 - l2_trade_factor.L2TradeFactorUtil.get_industry_rate(hot_num) return 0.5 # 统计买撤净买量 @classmethod def __sum_buy_num_for_cancel_order(cls, code, start_index, origin_num, threshold_money): buy_nums = origin_num total_datas = local_today_datas[code] limit_up_price = gpcode_manager.get_limit_up_price(code) if limit_up_price is None: raise Exception("涨停价无法获取") threshold_num = threshold_money / (limit_up_price * 100) # 获取预估挂买位 sure_type:0 虚拟挂买 1 实际挂买 sure_type, sure_pos, sure_data = cls.__get_sure_order_pos(code) same_time_property = cls.__get_same_time_property(code) # 同一秒,在预估买入位之后的数据之和 property_buy_num_count = 0 for i in range(start_index, len(total_datas)): data = total_datas[i] _val = data["val"] if L2DataUtil.is_limit_up_price_buy(_val): # 涨停买 if i < sure_pos: buy_nums += int(_val["num"]) * int(data["re"]) elif sure_data["val"]["time"] == _val["time"]: # 同一秒买入,而且还在预估买入位之后 property_buy_num_count += int(_val["num"]) * int(data["re"]) elif L2DataUtil.is_limit_up_price_buy_cancel(_val): # 涨停撤买 # 判断买入位置是否在买入信号之前 buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(data, total_datas) if buy_index is not None: # 找到买撤数据的买入点 if buy_index < sure_pos: buy_nums -= int(_val["num"]) * int(data["re"]) elif sure_data["val"]["time"] == _val["time"]: # 同一秒,而且还在预估买入位之后按概率计算 property_buy_num_count -= int(_val["num"]) * int(data["re"]) else: # TODO 未找到买撤数据的买入点 pass property_buy_num = round(property_buy_num_count * same_time_property) if buy_nums + property_buy_num <= threshold_num: return i, buy_nums + property_buy_num, sure_type return None, buy_nums + round(property_buy_num_count * same_time_property), sure_type def __get_time_second(time_str): ts = time_str.split(":") return int(ts[0]) * 3600 + int(ts[1]) * 60 + int(ts[2]) second_930 = 9 * 3600 + 30 * 60 + 0 # 是否是涨停价买 def __is_limit_up_price_buy(val): if int(val["limitPrice"]) != 1: return False if int(val["operateType"]) != 0: return False price = float(val["price"]) num = int(val["num"]) if price * num * 100 < 50 * 10000: return False return True def __is_limit_up_price_buy_cancel(val): if int(val["limitPrice"]) != 1: return False if int(val["operateType"]) != 1: return False price = float(val["price"]) num = int(val["num"]) if price * num * 100 < 50 * 10000: return False return True # 获取涨停买入起始点 def __get_limit_up_buy_start(code, data_count, __continue_count): # logger.info("__get_limit_up_buy_start:{},{},{}".format(code, data_count, __continue_count)) # 倒数100条数据查询 datas = local_today_datas[code] __len = len(datas) if __len < __continue_count: return None start_index = 0 if data_count > __len: data_count = __len if __len > data_count: start_index = __len - data_count __time = None _limit_up_count_1s = 0 _limit_up_count_1s_start_index = -1 for i in range(start_index, __len - (__continue_count - 1)): _val = datas[i]["val"] # 时间要>=09:30:00 if __get_time_second(_val["time"]) < second_930: continue # 有连续4个涨停买就标记计算起始点 if __is_limit_up_price_buy(_val): index_0 = i index_1 = -1 index_2 = -1 # index_3 = -1 for j in range(index_0 + 1, __len): # 涨停买 if __is_limit_up_price_buy(datas[j]["val"]): index_1 = j break if index_1 > 0: for j in range(index_1 + 1, __len): # 涨停买 if __is_limit_up_price_buy(datas[j]["val"]): index_2 = j break # if index_2 > 0: # for j in range(index_2 + 1, __len): # # 涨停买 # if datas[j]["val"]["limitPrice"] == 1 and datas[j]["val"]["operateType"] == 0: # index_3 = j if index_1 - index_0 == 1 and index_2 - index_1 == 1: # and index_3 - index_2 == 1 logger_l2_trade.info("找到物理连续涨停买 {},{},{}".format(code, i, datas[i])) return i # 同1s内有不连续的4个涨停买(如果遇买撤就重新计算,中间可间隔不涨停买)标记计算起始点 if __is_limit_up_price_buy(_val): # 涨停买 if __time is None: _time = datas[i]["val"]["time"] _limit_up_count_1s = 1 _limit_up_count_1s_start_index = i elif _time == _val["time"]: _limit_up_count_1s += 1 else: _time = datas[i]["val"]["time"] _limit_up_count_1s = 1 _limit_up_count_1s_start_index = i elif _val["operateType"] == 1: # 买撤 _time = None _limit_up_count_1s = 0 _limit_up_count_1s_start_index = -1 if _limit_up_count_1s >= 4 and _limit_up_count_1s_start_index > -1: logger_l2_trade.info("找到同一秒连续涨停买 {},{},{}".format(code, _limit_up_count_1s_start_index, datas[i])) return _limit_up_count_1s_start_index return None # 获取涨停撤销起始点 def __get_limit_up_buy_cancel_start(code, data_count, __continue_count): # logger.info("__get_limit_up_buy_cancel_start:{},{},{}".format(code, data_count, __continue_count)) # 倒数100条数据查询 datas = local_today_datas[code] __len = len(datas) if __len < __continue_count: return None start_index = 0 if data_count > __len: data_count = __len if __len > data_count: start_index = __len - data_count for i in range(start_index, __len - (__continue_count - 1)): _val = datas[i]["val"] if __get_time_second(_val["time"]) < second_930: continue # 有连续3个买撤 if __is_limit_up_price_buy_cancel(_val): index_0 = i index_1 = -1 index_2 = -1 for j in range(index_0 + 1, __len): # 涨停买 if __is_limit_up_price_buy_cancel(datas[j]["val"]): index_1 = j break if index_1 > 0: for j in range(index_1 + 1, __len): # 涨停买 if __is_limit_up_price_buy_cancel(datas[j]["val"]): index_2 = j break if index_1 - index_0 == 1 and index_2 - index_1 == 1: logger_l2_trade.info("连续3个涨停买撤 {},{},{}".format(code, i, json.dumps(datas[i]))) return i return None # 是否有禁止交易特征 def __is_have_forbidden_feature(code, data_count, __continue_count): # 09:30:00出现连续撤销的数量大于一定的值 # 倒数100条数据查询 datas = local_today_datas[code] __len = len(datas) if __len < __continue_count: return None start_index = 0 if data_count > __len: data_count = __len if __len > data_count: start_index = __len - data_count cancel_start = -1 cancel_count = 0 for i in range(start_index, __len): _val = datas[i]["val"] if _val["time"] == "09:30:00" and i - 1 >= 0 and datas[i - 1]["val"]["time"] != "09:30:00": # 09:30第一条数据 if _val["operateType"] == 1: cancel_start = i else: return False elif cancel_start > -1: # 连续撤销 if _val["operateType"] == 1 and i - cancel_start == 1: cancel_start = i cancel_count += 1 if cancel_count >= __continue_count: return True else: return False return False # 设置最新的l2数据采集的数量 def __set_l2_data_latest_count(code, count): redis = _redisManager.getRedis() key = "latest-l2-count-{}".format(code) redis.setex(key, 2, count) pass # 获取代码最近的l2数据数量 def get_l2_data_latest_count(code): if code is None or len(code) < 1: return 0 redis = _redisManager.getRedis() key = "latest-l2-count-{}".format(code) result = redis.get(key) if result is None: return 0 else: return int(result) # 初始化l2固定代码库 def init_l2_fixed_codes(): key = "l2-fixed-codes" redis = _redisManager.getRedis() count = redis.scard(key) if count > 0: redis.delete(key) redis.sadd(key, "000000") redis.expire(key, tool.get_expire()) # 移除l2固定监控代码 def remove_from_l2_fixed_codes(code): key = "l2-fixed-codes" redis = _redisManager.getRedis() redis.srem(key, code) # 添加代码到L2固定监控 def add_to_l2_fixed_codes(code): key = "l2-fixed-codes" redis = _redisManager.getRedis() redis.sadd(key, code) redis.expire(key, tool.get_expire()) # 是否在l2固定监控代码中 def is_in_l2_fixed_codes(code): key = "l2-fixed-codes" redis = _redisManager.getRedis() return redis.sismember(key, code) if __name__ == "__main__": code = "000868" local_today_datas.setdefault(code, []) path = "C:/Users/Administrator/Desktop/demo/000868/" for file_name in os.listdir(path): p = "{}{}".format(path, file_name) f = open(p) for line in f.readlines(): # 依次读取每行 line = line.strip() data = json.loads(line) result = L2DataUtil.format_l2_data(data, code, 10.00) add_datas = L2DataUtil.get_add_data(code, result) print("增加的数量:", len(add_datas)) if len(add_datas) > 0: # 拼接数据 local_today_datas[code].extend(add_datas) if code in local_latest_datas: local_latest_datas[code] = result else: local_latest_datas.setdefault(code, result) f.close() for d in local_today_datas[code]: print(d["val"]["time"], d["val"]["num"], d["val"]["operateType"], d["re"])