import datetime
import json

import l2_data_manager


class L2DataTest:
    """Manual, file-driven checks around ``l2_data_manager``.

    Both entry points read files from fixed locations on the ``D:`` drive,
    so they are meant to be run by hand on a machine that has those files.
    """

    # Substrings that bracket the "start-end" range inside a log line.
    _RANGE_MARKER = "处理数据范围:"
    _TIME_MARKER = "处理时间:"

    @staticmethod
    def _load_formatted(path, code):
        """Read the first line of *path* as JSON and format it for *code*."""
        with open(path, 'r') as f:
            raw = json.loads(f.readline())
        # NOTE(review): 10.92 is passed through unchanged from the original
        # test; presumably a reference price for the stock - confirm.
        return l2_data_manager.L2DataUtil.format_l2_data(raw, code, 10.92)

    @classmethod
    def _append_incremental(cls, path, code):
        """Load *path*, then append its new entries to today's data for *code*."""
        datas = cls._load_formatted(path, code)
        datas = l2_data_manager.L2DataUtil.correct_data(code, datas)

        # Continue numbering right after the last entry already stored.
        start_index = 0
        existing = l2_data_manager.local_today_datas.get(code)
        if existing:
            start_index = existing[-1]["index"] + 1

        add_datas = l2_data_manager.L2DataUtil.get_add_data(
            code, datas, start_index)
        l2_data_manager.local_latest_datas[code] = add_datas
        l2_data_manager.local_today_datas[code].extend(add_datas)

    def test_concat_l2_data(self):
        """Feed three captured snapshots in order and print the merged result.

        The first snapshot seeds ``local_latest_datas`` and
        ``local_today_datas`` for the code; the second and third are passed
        through ``correct_data`` / ``get_add_data`` and appended.
        """
        code = "002750"

        datas = self._load_formatted("D:/test/2750_1.txt", code)
        # The return value is unused; the call is kept because the original
        # test made it and it may have side effects inside l2_data_manager.
        l2_data_manager.L2DataUtil.get_add_data(code, datas, 0)
        l2_data_manager.local_latest_datas[code] = datas
        l2_data_manager.local_today_datas[code] = datas

        for path in ("D:/test/2750_2.txt", "D:/test/2750_3.txt"):
            self._append_incremental(path, code)

        print(l2_data_manager.local_today_datas[code])

    def get_space_position(self, code, log_path=None):
        """Collect the processed ``(start, end)`` ranges logged for *code*.

        Every line of the log that contains *code* and the range marker is
        parsed; the text between the range marker and the time marker (or the
        end of the line when the time marker is absent) must look like
        ``"<int>-<int>"``. Each extracted range text is printed.

        Args:
            code: Substring identifying the lines of interest.
            log_path: Log file to scan. Defaults to today's
                ``D:/logs/gp/l2/l2_process.<YYYY-MM-DD>.log``.

        Returns:
            A list of ``(start, end)`` integer tuples in file order.

        Raises:
            ValueError, IndexError: If a line carries the range marker but
                the range text is not of the form ``"<int>-<int>"``.
        """
        if log_path is None:
            date = datetime.datetime.now().strftime("%Y-%m-%d")
            log_path = "D:/logs/gp/l2/l2_process.{}.log".format(date)

        positions = []
        with open(log_path, encoding="utf-8") as f:
            for line in f:
                if code not in line:
                    continue
                _, marker, rest = line.partition(self._RANGE_MARKER)
                if not marker:
                    # The code appears on an unrelated log line; previously
                    # this fell through to int("") and raised ValueError.
                    continue
                # partition() keeps the whole remainder when the time marker
                # is missing, so no trailing character is lost.
                range_text = rest.partition(self._TIME_MARKER)[0].strip()
                print(range_text)
                parts = range_text.split("-")
                positions.append((int(parts[0]), int(parts[1])))
        return positions


if __name__ == '__main__':
    L2DataTest().get_space_position("002094")