""" L2数据溯源 """ import tool class L2DataSourceUtils(object): __cancel_and_buy_map = {} __buy_and_cancel_map = {} @classmethod def __save_map(cls, code, buy_index, cancel_index): if cls.__cancel_and_buy_map.get(code) is None: cls.__cancel_and_buy_map[code] = {} cls.__cancel_and_buy_map[code][cancel_index] = buy_index if cls.__buy_and_cancel_map.get(code) is None: cls.__buy_and_cancel_map[code] = {} cls.__buy_and_cancel_map[code][buy_index] = cancel_index @classmethod def __get_cancel_index_cache(cls, code, buy_index): if code not in cls.__buy_and_cancel_map: return None return cls.__buy_and_cancel_map[code].get(buy_index) @classmethod def __get_buy_index_cache(cls, code, cancel_index): if code not in cls.__cancel_and_buy_map: return None return cls.__cancel_and_buy_map[code].get(cancel_index) @classmethod def __compare_time(cls, time1, time2): result = int(time1.replace(":", "", 2)) - int(time2.replace(":", "", 2)) return result # 计算时间的区间 @classmethod def __compute_time_space_as_second(cls, cancel_time, cancel_time_unit): __time = int(cancel_time) if int(cancel_time) == 0: return 0, 0 unit = int(cancel_time_unit) if unit == 0: # 秒 return __time, (__time + 1) elif unit == 1: # 分钟 return __time * 60, (__time + 1) * 60 elif unit == 2: # 小时 return __time * 3600, (__time + 1) * 3600 # 华鑫渠道的L2,根据买撤数据查找买入数据 @classmethod def __get_buy_index_with_cancel_data_by_huaxin_l2(cls, code, cancel_data, local_today_num_operate_map): buy_datas = local_today_num_operate_map.get( "{}-{}-{}".format(cancel_data["val"]["num"], "0", cancel_data["val"]["price"])) if buy_datas is None: # 无数据 return None # orderNo for bd in buy_datas: # 根据订单号做匹配 if bd["val"]["orderNo"] == cancel_data["val"]["orderNo"]: return bd return None # 同花顺渠道的L2,根据买撤数据查找买入数据 @classmethod def __get_buy_index_with_cancel_data_by_ths_l2(cls, code, cancel_data, local_today_num_operate_map): buy_datas = local_today_num_operate_map.get( "{}-{}-{}".format(cancel_data["val"]["num"], "0", cancel_data["val"]["price"])) if buy_datas is None: # 无数据 return None min_space, max_space = cls.__compute_time_space_as_second(cancel_data["val"]["cancelTime"], cancel_data["val"]["cancelTimeUnit"]) max_time = tool.trade_time_add_second(cancel_data["val"]["time"], 0 - min_space) min_time = tool.trade_time_add_second(cancel_data["val"]["time"], 0 - max_space) # 匹配到的index suit_indexes = [] for i in range(0, len(buy_datas)): data = buy_datas[i] if int(data["val"]["operateType"]) != 0: continue if int(data["val"]["num"]) != int(cancel_data["val"]["num"]): continue # 如果能找到对应的买撤就需要返回 cancel_index = cls.__get_cancel_index_cache(code, data["index"]) if cancel_index is not None and cancel_index != cancel_data["index"]: continue if min_space == 0 and max_space == 0: if cls.__compare_time(data["val"]["time"], min_time) == 0: suit_indexes.append(data["index"]) elif cls.__compare_time(data["val"]["time"], min_time) > 0 and cls.__compare_time(data["val"]["time"], max_time) <= 0: suit_indexes.append(data["index"]) if len(suit_indexes) >= 2: # 多个匹配项,优先溯源离取消位置最近的数据 suit_indexes.sort(key=lambda t: cancel_data["index"] - t) if len(suit_indexes) >= 1: cls.__save_map(code, suit_indexes[0], cancel_data["index"]) return suit_indexes[0] return None @classmethod def __get_buy_index_with_cancel_data(cls, code, cancel_data, local_today_num_operate_map): buy_index = cls.__get_buy_index_cache(code, cancel_data["index"]) if buy_index is not None: return buy_index return cls.__get_buy_index_with_cancel_data_by_ths_l2(code, cancel_data, local_today_num_operate_map) # 根据买撤数据(与今日总的数据)计算买入数据 @classmethod def get_buy_index_with_cancel_data(cls, code, cancel_data, local_today_num_operate_map): key = "{}-{}-{}".format(cancel_data["val"]["num"], "1", cancel_data["val"]["price"]) cancel_datas = local_today_num_operate_map.get(key) try: cancel_datas.sort(key=lambda t: t["index"]) except Exception as e: print("测试") for item in cancel_datas: # 提前做计算 cls.__get_buy_index_with_cancel_data(code, item, local_today_num_operate_map) return cls.__get_buy_index_with_cancel_data(code, cancel_data, local_today_num_operate_map) # 获取没撤的笔数 @classmethod def get_limit_up_buy_no_canceled_count(cls, code, index, total_data, local_today_num_operate_map): data = None try: data = total_data[index] except: print("") val = data["val"] # 判断当前买是否已经买撤 cancel_datas = local_today_num_operate_map.get( "{}-{}-{}".format(val["num"], "1", val["price"])) canceled = False if cancel_datas: for cancel_data in cancel_datas: buy_index = cls.get_buy_index_with_cancel_data(code, cancel_data, local_today_num_operate_map) if buy_index == index: canceled = True count = data["re"] - cancel_data["re"] if count > 0: return count break if not canceled: count = data["re"] return count return 0 # if __name__ == "__main__": # code = "000925" # l2_data_util.load_l2_data(code) # total_datas = l2_data_util.local_today_datas.get(code) # local_today_num_operate_map = l2_data_util.local_today_num_operate_map.get(code) # cancel_index = 900 # index = L2DataSourceUtils.get_buy_index_with_cancel_data(code, total_datas[cancel_index], # l2_data_util.local_today_num_operate_map.get(code)) # print("溯源位置:", index)