"""L2 (level-2) order data management.

Parses uploaded L2 batches, keeps per-code caches in memory and in Redis,
and watches the stream of limit-up orders to decide when to place or cancel
a buy through ``trade_manager``.
"""
import decimal
import json
import time as t
from datetime import datetime

import data_process
import mysql
import gpcode_manager
import mongo_data
import redis_manager
import tool
import trade_manager
from log import logger_l2_trade

_redisManager = redis_manager.RedisManager(1)

# Most recently uploaded L2 batch, keyed by stock code.
local_latest_datas = {}
# All L2 records collected today, keyed by stock code.
local_today_datas = {}

# Values of a record's "operateType" field as they are interpreted below.
_OPERATE_BUY = 0
_OPERATE_BUY_CANCEL = 1
# A single limit-up order only counts towards a start point when
# price * num * 100 reaches this amount (500,000).
_MIN_ORDER_AMOUNT = 50 * 10000
# Accumulated num * limit-up price * 100 above which a buy (or a buy
# cancel) is triggered (10,000,000).
_TRIGGER_AMOUNT = 1000 * 10000
# Number of limit-up buys within the same second that marks a buy start.
_SAME_SECOND_BUY_COUNT = 4


class L2DataException(Exception):
    """Raised when an uploaded L2 batch cannot be processed.

    ``code`` is one of the ``CODE_*`` constants and ``msg`` the readable
    description returned by ``str()``.
    """

    # The price does not match the stock code.
    CODE_PRICE_ERROR = 1
    # No closing price is available.
    CODE_NO_CLOSE_PRICE = 2

    def __init__(self, code, msg):
        # Pass the message rather than ``self``: the original stored the
        # exception inside its own ``args``, a self-referencing tuple.
        super().__init__(msg)
        self.code = code
        self.msg = msg

    def __str__(self):
        return self.msg

    def get_code(self):
        """Return the ``CODE_*`` value given at construction."""
        return self.code


class TradePointManager:
    """Redis-backed bookkeeping of trade points.

    Two points are tracked per stock code, each as an index into today's
    records plus an accumulated order number:

    * the buy point (keys ``buy_compute_index-<code>`` / ``buy_compute_num-<code>``)
    * the buy-cancel point (keys ``buy_cancel_compute_index-<code>`` /
      ``buy_cancel_compute_num-<code>``)
    """

    __BUY_PREFIX = "buy_compute"
    __BUY_CANCEL_PREFIX = "buy_cancel_compute"

    @staticmethod
    def __get_redis():
        return _redisManager.getRedis()

    @staticmethod
    def __delete_point(prefix, code):
        """Delete both the index key and the num key of one point."""
        redis = TradePointManager.__get_redis()
        redis.delete("{}_index-{}".format(prefix, code))
        redis.delete("{}_num-{}".format(prefix, code))

    @staticmethod
    def __read_point(prefix, code):
        """Return ``(index, total_num)``, or ``(None, 0)`` when no index is stored."""
        redis = TradePointManager.__get_redis()
        index = redis.get("{}_index-{}".format(prefix, code))
        total_num = redis.get("{}_num-{}".format(prefix, code))
        if index is None:
            return None, 0
        # The num key may be missing while the index key exists; treat it
        # as 0 instead of failing on int(None).
        return int(index), 0 if total_num is None else int(total_num)

    @staticmethod
    def __accumulate_point(prefix, code, num_add, index):
        """Add ``num_add`` to the stored num; store ``index`` when it is given."""
        redis = TradePointManager.__get_redis()
        expire = tool.get_expire()
        if index is not None:
            redis.setex("{}_index-{}".format(prefix, code), expire, index)
        num_key = "{}_num-{}".format(prefix, code)
        if redis.get(num_key) is None:
            redis.setex(num_key, expire, num_add)
        else:
            redis.incrby(num_key, num_add)

    @staticmethod
    def delete_buy_point(code):
        """Remove the stored buy point of ``code``."""
        TradePointManager.__delete_point(TradePointManager.__BUY_PREFIX, code)

    @staticmethod
    def delete_buy_cancel_point(code):
        """Remove the stored buy-cancel point of ``code``."""
        TradePointManager.__delete_point(TradePointManager.__BUY_CANCEL_PREFIX, code)

    @staticmethod
    def get_buy_compute_start_data(code):
        """Return ``(index, total_num)`` of the buy point, ``(None, 0)`` if unset."""
        return TradePointManager.__read_point(TradePointManager.__BUY_PREFIX, code)

    @staticmethod
    def set_buy_compute_start_data(code, num_add, index=None):
        """Add ``num_add`` to the buy point's num; set its index when given."""
        TradePointManager.__accumulate_point(
            TradePointManager.__BUY_PREFIX, code, num_add, index)

    @staticmethod
    def get_buy_cancel_compute_start_data(code):
        """Return ``(index, total_num)`` of the buy-cancel point, ``(None, 0)`` if unset."""
        return TradePointManager.__read_point(TradePointManager.__BUY_CANCEL_PREFIX, code)

    @staticmethod
    def set_buy_cancel_compute_start_data(code, num_add, index=None):
        """Add ``num_add`` to the buy-cancel point's num; set its index when given."""
        TradePointManager.__accumulate_point(
            TradePointManager.__BUY_CANCEL_PREFIX, code, num_add, index)


def load_l2_data(code, force=False):
    """Fill the in-memory caches of ``code`` from Redis.

    Each cache is loaded only when it has no entry for ``code`` yet, or
    always when ``force`` is true.
    """
    redis = _redisManager.getRedis()

    # Latest uploaded batch.
    if local_latest_datas.get(code) is None or force:
        latest_raw = redis.get("l2-data-latest-{}".format(code))
        if latest_raw is not None:
            local_latest_datas[code] = json.loads(latest_raw)

    # Today's records: one Redis key per record, fields encoded in the key.
    if local_today_datas.get(code) is None or force:
        records = []
        for redis_key in redis.keys("l2-{}-*".format(code)):
            key = redis_key.replace("l2-", "")
            # Field 0 is the stock code itself and is not needed again.
            fields = key.split("-")
            item = {"operateType": fields[1], "time": fields[2], "num": fields[3],
                    "price": fields[4], "limitPrice": fields[5],
                    "cancelTime": fields[6], "cancelTimeUnit": fields[7]}
            stored = json.loads(redis.get(redis_key))
            records.append({"key": key, "val": item, "re": stored["re"],
                            "index": int(stored["index"])})
        # Order by time of day, then by the stored insertion index.
        records.sort(key=lambda e: (int(e["val"]["time"].replace(":", "")), e["index"]))
        # Assign instead of setdefault(): with setdefault() a forced reload
        # was computed and then thrown away whenever an entry already existed.
        local_today_datas[code] = records


def saveL2Data(code, datas):
    """Persist ``datas`` for ``code`` as one Redis key per record.

    A new record gets the next running index after ``l2-maxindex-<code>``;
    an existing record only has its repeat count ``re`` refreshed. Writers
    are serialised through the ``l2-save-<code>`` key; when that key is held
    by someone else nothing is written.

    Returns ``None`` when ``code`` is not in the watched pool, else ``datas``.
    """
    if not gpcode_manager.is_in_gp_pool(code):
        return None
    redis_instance = _redisManager.getRedis()
    lock_key = "l2-save-{}".format(code)

    acquired = redis_instance.setnx(lock_key, "1")
    if not acquired:
        # Do not touch a lock we do not own; the original deleted it in
        # ``finally`` even in this case, releasing another writer's lock.
        return datas
    try:
        expire = tool.get_expire()
        max_index_key = "l2-maxindex-{}".format(code)
        start_index = redis_instance.get(max_index_key)
        start_index = 0 if start_index is None else int(start_index)
        max_index = start_index
        for offset, record in enumerate(datas, start=1):
            key = "l2-" + record["key"]
            existing = redis_instance.get(key)
            if existing is None:
                # New record.
                max_index = start_index + offset
                value = {"index": start_index + offset, "re": record["re"]}
                redis_instance.setex(key, expire, json.dumps(value))
            else:
                stored = json.loads(existing)
                if stored["re"] != record["re"]:
                    stored["re"] = record["re"]
                    redis_instance.setex(key, expire, json.dumps(stored))
        redis_instance.setex(max_index_key, expire, max_index)
    finally:
        redis_instance.delete(lock_key)
    return datas


def parseL2Data(str):
    """Parse an uploaded L2 JSON payload.

    The payload must provide ``client`` and ``data`` (with ``code``,
    ``channel`` and a ``data`` list of items). Items producing the same key
    are merged into one record whose ``re`` counts the repetitions.

    Returns ``(day, client, channel, code, datas)`` where ``day`` is today's
    date as ``YYYYMMDD`` and each element of ``datas`` is
    ``{"key": ..., "val": item, "re": n}``.
    """
    day = datetime.now().strftime("%Y%m%d")
    payload = json.loads(str)
    body = payload["data"]
    client = payload["client"]
    code = body["code"]
    channel = body["channel"]
    items = body["data"]

    datas = []
    index_by_key = {}
    limit_up_price = gpcode_manager.get_limit_up_price(code)
    for item in items:
        price = float(item["price"])
        limitPrice = item["limitPrice"]
        # Flag the item as a limit-up order when its price equals the
        # limit-up price of the code.
        if limit_up_price is not None and limit_up_price == tool.to_price(decimal.Decimal(price)):
            limitPrice = 1
        # NOTE(review): the collapsed source does not show whether this
        # assignment was inside the ``if`` above; it is applied to every
        # item here so "val" always agrees with the key -- confirm.
        item["limitPrice"] = "{}".format(limitPrice)
        key = "{}-{}-{}-{}-{}-{}-{}-{}".format(
            code, item["operateType"], item["time"], item["num"], price,
            limitPrice, item["cancelTime"], item["cancelTimeUnit"])
        if key in index_by_key:
            # Same record seen again: bump its repeat count.
            datas[index_by_key[key]]["re"] += 1
        else:
            datas.append({"key": key, "val": item, "re": 1})
            index_by_key[key] = len(datas) - 1
    return day, client, channel, code, datas


def correct_data(code, _datas):
    """Align repeat counts between ``_datas`` and the cached latest batch.

    For every record present in both with a different ``re``, both sides are
    set to the larger value. ``_datas`` is modified in place and returned.
    """
    latest_data = local_latest_datas.get(code) or []
    # Index the cached batch by key once instead of rescanning it per record.
    cached_by_key = {}
    for cached in latest_data:
        cached_by_key.setdefault(cached["key"], []).append(cached)
    for data in _datas:
        for cached in cached_by_key.get(data["key"], ()):
            if cached["re"] != data["re"]:
                max_re = max(cached["re"], data["re"])
                cached["re"] = max_re
                data["re"] = max_re
    return _datas


def save_l2_data(code, datas, add_datas):
    """Store ``datas`` as the latest batch and persist the new records.

    ``datas`` is written to Redis and to the in-memory latest cache, its
    length is published as the latest count, and ``add_datas`` (if any) is
    handed to ``saveL2Data``.
    """
    redis = _redisManager.getRedis()
    redis.setex("l2-data-latest-{}".format(code), tool.get_expire(), json.dumps(datas))
    local_latest_datas[code] = datas
    __set_l2_data_latest_count(code, len(datas))
    if len(add_datas) > 0:
        saveL2Data(code, add_datas)


def get_add_data(code, datas):
    """Return the records of ``datas`` that follow the last cached record.

    The last key of the cached latest batch is searched in ``datas`` from
    the end. Without a cached batch all of ``datas`` is returned; when the
    last cached key is not found, or is the final element, the result is
    empty.
    """
    # ``None`` is treated like an empty batch (it used to reach reversed()).
    if not datas:
        return []
    last_key = ""
    latest = local_latest_datas.get(code)
    if latest is not None and len(latest) > 0:
        last_key = latest[-1]["key"]

    start_index = -1
    for position in range(len(datas) - 1, -1, -1):
        if datas[position]["key"] == last_key:
            start_index = position
            break

    if len(last_key) > 0 and (start_index < 0 or start_index + 1 >= len(datas)):
        return []
    return datas[start_index + 1:]


def __is_same_time(time1, time2):
    """Return True when two ``HH:MM:SS`` strings are less than 3 seconds apart."""
    return abs(__get_time_second(time2) - __get_time_second(time1)) < 3


def __net_limit_up_buy_num(records):
    """Sum ``num * re`` of limit-up buys minus that of limit-up buy cancels."""
    net = 0
    for record in records:
        val = record["val"]
        if int(val["limitPrice"]) != 1:
            continue
        operate_type = int(val["operateType"])
        if operate_type == _OPERATE_BUY:
            net += int(val["num"]) * int(record["re"])
        elif operate_type == _OPERATE_BUY_CANCEL:
            net -= int(val["num"]) * int(record["re"])
    return net


def __limit_up_cancel_num(records):
    """Sum ``num * re`` of the limit-up buy cancels in ``records``."""
    total = 0
    for record in records:
        val = record["val"]
        if int(val["limitPrice"]) == 1 and int(val["operateType"]) == _OPERATE_BUY_CANCEL:
            total += int(val["num"]) * int(record["re"])
    return total


def __buy_if_amount_reached(code, total_num, end_record):
    """Start a buy when ``total_num`` at the limit-up price exceeds the trigger."""
    limit_up_price = gpcode_manager.get_limit_up_price(code)
    if limit_up_price is None:
        return
    if total_num * limit_up_price * 100 > _TRIGGER_AMOUNT:
        logger_l2_trade.info("执行买入:{} - 计算结束点: {}".format(code, json.dumps(end_record)))
        try:
            trade_manager.start_buy(code)
            TradePointManager.delete_buy_cancel_point(code)
        except Exception as e:
            # Still best effort, but leave a trace instead of hiding it.
            logger_l2_trade.info("执行买入异常:{} - {}".format(code, e))


def __track_buy_point(code, add_datas):
    """Find or advance the buy point of ``code``; return its index or None."""
    c_index, c_num = TradePointManager.get_buy_compute_start_data(code)
    if c_index is not None:
        # A buy point exists: only the new records need to be added.
        buy_nums = __net_limit_up_buy_num(add_datas)
        TradePointManager.set_buy_compute_start_data(code, buy_nums)
        __buy_if_amount_reached(code, c_num + buy_nums, add_datas[-1])
        return c_index

    # No buy point yet: check the forbidden-trade signal, then search.
    if __is_have_forbidden_feature(code, len(add_datas) + 6, 6):
        trade_manager.forbidden_trade(code)
    c_index = __get_limit_up_buy_start(code, len(add_datas) + 3, 3)
    if c_index is not None:
        total_datas = local_today_datas[code]
        logger_l2_trade.info("找到买点:{} - {}".format(code, json.dumps(total_datas[c_index])))
        buy_nums = __net_limit_up_buy_num(total_datas[c_index:])
        TradePointManager.set_buy_compute_start_data(code, buy_nums, c_index)
        __buy_if_amount_reached(code, buy_nums, total_datas[-1])
    return c_index


def __track_buy_cancel_point(code, add_datas, c_index):
    """While a buy is delegated, find or advance the buy-cancel point."""
    state = trade_manager.get_trade_state(code)
    if state != trade_manager.TRADE_STATE_BUY_DELEGATED \
            and state != trade_manager.TRADE_STATE_BUY_PLACE_ORDER:
        return

    cancel_index, cancel_num = TradePointManager.get_buy_cancel_compute_start_data(code)
    if cancel_index is None:
        # No buy-cancel point detected so far.
        cancel_index = __get_limit_up_buy_cancel_start(code, len(add_datas) + 3, 3)
        if cancel_index is not None:
            total_datas = local_today_datas[code]
            logger_l2_trade.info(
                "找到买撤点:{} - {}".format(code, json.dumps(total_datas[cancel_index])))
            # NOTE(review): the sum starts at the buy point (c_index), not at
            # cancel_index, exactly as in the original -- confirm this is
            # intended.
            nums = __limit_up_cancel_num(total_datas[c_index:])
            TradePointManager.set_buy_cancel_compute_start_data(code, nums, cancel_index)
        return

    # A buy-cancel point exists: add the cancels of the new records.
    cancel_nums_add = __limit_up_cancel_num(add_datas)
    TradePointManager.set_buy_cancel_compute_start_data(code, cancel_nums_add)
    latest_num = cancel_num + cancel_nums_add
    limit_up_price = gpcode_manager.get_limit_up_price(code)
    if limit_up_price is None:
        return
    if latest_num * limit_up_price * 100 > _TRIGGER_AMOUNT:
        logger_l2_trade.info("执行撤销:{} - {}".format(code, json.dumps(add_datas[-1])))
        try:
            trade_manager.start_cancel_buy(code)
            # Clear both points once the cancel has been issued.
            TradePointManager.delete_buy_point(code)
            TradePointManager.delete_buy_cancel_point(code)
        except Exception as e:
            # Still best effort, but leave a trace instead of hiding it.
            logger_l2_trade.info("执行撤销异常:{} - {}".format(code, e))


def process_data(code, datas):
    """Process one parsed L2 batch of ``code``.

    Validates the price of the first record, merges the batch with the
    cached data, and -- when the newest record is less than 3 seconds away
    from the current time -- updates the buy and buy-cancel points, which
    may start or cancel a buy. The batch is saved afterwards.

    Raises:
        L2DataException: the first record's price does not fit ``code``.
    """
    if not datas:
        return
    now_time_str = datetime.now().strftime("%H:%M:%S")

    if not data_process.is_same_code_with_price(code, float(datas[0]["val"]["price"])):
        raise L2DataException(L2DataException.CODE_PRICE_ERROR, "股价不匹配")

    load_l2_data(code)
    datas = correct_data(code, datas)
    add_datas = get_add_data(code, datas)
    if len(add_datas) > 0:
        local_today_datas[code].extend(add_datas)
        latest_time = add_datas[-1]["val"]["time"]
        # Only act on data that is close enough to the current time.
        if __is_same_time(now_time_str, latest_time):
            c_index = __track_buy_point(code, add_datas)
            if c_index is not None:
                __track_buy_cancel_point(code, add_datas, c_index)
    save_l2_data(code, datas, add_datas)


def __get_time_second(time_str):
    """Convert ``HH:MM:SS`` into seconds since midnight."""
    ts = time_str.split(":")
    return int(ts[0]) * 3600 + int(ts[1]) * 60 + int(ts[2])


# 09:30:00 as seconds since midnight; earlier records are skipped when
# searching for start points.
second_930 = 9 * 3600 + 30 * 60 + 0


def __is_big_limit_up_order(val, operate_type):
    """True for a limit-up order of ``operate_type`` worth at least 500,000."""
    if int(val["limitPrice"]) != 1:
        return False
    if int(val["operateType"]) != operate_type:
        return False
    # presumably ``num`` counts lots of 100 shares -- TODO confirm
    return float(val["price"]) * int(val["num"]) * 100 >= _MIN_ORDER_AMOUNT


def __is_limit_up_price_buy(val):
    """True when ``val`` is a limit-up buy worth at least 500,000."""
    return __is_big_limit_up_order(val, _OPERATE_BUY)


def __is_limit_up_price_buy_cancel(val):
    """True when ``val`` is a limit-up buy cancel worth at least 500,000."""
    return __is_big_limit_up_order(val, _OPERATE_BUY_CANCEL)


def __tail_window_start(total, data_count):
    """Index where the last ``data_count`` of ``total`` records begin."""
    return max(total - data_count, 0)


def __followers_match(datas, first, run_length, predicate):
    """True when the ``run_length - 1`` records after ``first`` satisfy ``predicate``."""
    return all(predicate(datas[first + k]["val"]) for k in range(1, run_length))


def __get_limit_up_buy_start(code, data_count, __continue_count):
    """Search the last ``data_count`` records of today for a buy start point.

    A start point is either the first of ``__continue_count`` adjacent
    limit-up buys, or the first of 4 limit-up buys sharing the same second
    (other records may lie in between, a buy cancel restarts the count).
    Records before 09:30:00 are skipped.

    Returns the index into today's records, or ``None``.
    """
    datas = local_today_datas[code]
    total = len(datas)
    if total < __continue_count:
        return None

    same_second = None
    same_second_count = 0
    same_second_start = -1
    first = __tail_window_start(total, data_count)
    for i in range(first, total - (__continue_count - 1)):
        _val = datas[i]["val"]
        if __get_time_second(_val["time"]) < second_930:
            continue
        is_buy = __is_limit_up_price_buy(_val)

        # Adjacent limit-up buys starting at i.
        if is_buy and __followers_match(datas, i, __continue_count, __is_limit_up_price_buy):
            logger_l2_trade.info("找到物理连续涨停买 {},{},{}".format(code, i, datas[i]))
            return i

        # Limit-up buys within the same second. The original initialised
        # ``__time`` but tested and assigned ``_time``, so this counter could
        # never pass 1; a single variable is used now.
        if is_buy:
            if same_second == _val["time"]:
                same_second_count += 1
            else:
                same_second = _val["time"]
                same_second_count = 1
                same_second_start = i
        elif int(_val["operateType"]) == _OPERATE_BUY_CANCEL:
            # int() because operateType is a string for records loaded from
            # Redis; the bare ``== 1`` never matched those.
            same_second = None
            same_second_count = 0
            same_second_start = -1

        if same_second_count >= _SAME_SECOND_BUY_COUNT and same_second_start > -1:
            logger_l2_trade.info(
                "找到同一秒连续涨停买 {},{},{}".format(code, same_second_start, datas[i]))
            return same_second_start
    return None


def __get_limit_up_buy_cancel_start(code, data_count, __continue_count):
    """Search the last ``data_count`` records of today for a buy-cancel start.

    The start is the first of ``__continue_count`` adjacent limit-up buy
    cancels at or after 09:30:00.

    Returns the index into today's records, or ``None``.
    """
    datas = local_today_datas[code]
    total = len(datas)
    if total < __continue_count:
        return None

    first = __tail_window_start(total, data_count)
    for i in range(first, total - (__continue_count - 1)):
        _val = datas[i]["val"]
        if __get_time_second(_val["time"]) < second_930:
            continue
        if __is_limit_up_price_buy_cancel(_val) and __followers_match(
                datas, i, __continue_count, __is_limit_up_price_buy_cancel):
            logger_l2_trade.info("连续3个涨停买撤 {},{},{}".format(code, i, json.dumps(datas[i])))
            return i
    return None


def __is_have_forbidden_feature(code, data_count, __continue_count):
    """Detect a run of cancels right at 09:30:00 in the last ``data_count`` records.

    The run starts at the first 09:30:00 record (which must itself be a
    cancel) and must be followed, without a gap, by ``__continue_count``
    further cancels.

    Returns ``True`` when found, ``False`` otherwise, and ``None`` when
    fewer than ``__continue_count`` records exist.
    """
    datas = local_today_datas[code]
    total = len(datas)
    if total < __continue_count:
        return None

    cancel_start = -1
    cancel_count = 0
    for i in range(__tail_window_start(total, data_count), total):
        _val = datas[i]["val"]
        # int() because operateType is a string for records loaded from
        # Redis; the bare ``== 1`` never matched those.
        is_cancel = int(_val["operateType"]) == _OPERATE_BUY_CANCEL
        if _val["time"] == "09:30:00" and i - 1 >= 0 \
                and datas[i - 1]["val"]["time"] != "09:30:00":
            # First record of 09:30:00.
            if not is_cancel:
                return False
            cancel_start = i
        elif cancel_start > -1:
            # The run has to continue without a gap.
            if not (is_cancel and i - cancel_start == 1):
                return False
            cancel_start = i
            cancel_count += 1
            if cancel_count >= __continue_count:
                return True
    return False


def __set_l2_data_latest_count(code, count):
    """Publish the size of the latest batch of ``code``; the key lives 2 seconds."""
    redis = _redisManager.getRedis()
    redis.setex("latest-l2-count-{}".format(code), 2, count)


def get_l2_data_latest_count(code):
    """Return the published size of the latest batch, 0 if absent or ``code`` is empty."""
    if code is None or len(code) < 1:
        return 0
    redis = _redisManager.getRedis()
    result = redis.get("latest-l2-count-{}".format(code))
    return 0 if result is None else int(result)


def init_l2_fixed_codes():
    """Reset the ``l2-fixed-codes`` set to the single placeholder ``000000``."""
    key = "l2-fixed-codes"
    redis = _redisManager.getRedis()
    if redis.scard(key) > 0:
        redis.delete(key)
    # NOTE(review): the collapsed source does not show whether the two calls
    # below were inside the ``if`` above; they are run unconditionally here
    # so the set always exists with an expiry -- confirm.
    redis.sadd(key, "000000")
    redis.expire(key, tool.get_expire())


def remove_from_l2_fixed_codes(code):
    """Remove ``code`` from the ``l2-fixed-codes`` set."""
    redis = _redisManager.getRedis()
    redis.srem("l2-fixed-codes", code)


def add_to_l2_fixed_codes(code):
    """Add ``code`` to the ``l2-fixed-codes`` set."""
    redis = _redisManager.getRedis()
    redis.sadd("l2-fixed-codes", code)


def is_in_l2_fixed_codes(code):
    """Return whether ``code`` is a member of the ``l2-fixed-codes`` set."""
    redis = _redisManager.getRedis()
    return redis.sismember("l2-fixed-codes", code)


if __name__ == "__main__":
    pass