import datetime
|
import json
|
|
import l2_data_manager
|
|
|
class L2DataTest:
|
def test_concat_l2_data(self):
|
path = "D:/test/2750_1.txt"
|
code = "002750"
|
data = None
|
with open(path, 'r') as f:
|
temp = f.readline()
|
data = json.loads(temp)
|
datas = l2_data_manager.L2DataUtil.format_l2_data(data, code, 10.92)
|
l2_data_manager.L2DataUtil.get_add_data(code, datas, 0)
|
l2_data_manager.local_latest_datas[code] = datas
|
l2_data_manager.local_today_datas[code] = datas
|
path = "D:/test/2750_2.txt"
|
with open(path, 'r') as f:
|
temp = f.readline()
|
data = json.loads(temp)
|
datas = l2_data_manager.L2DataUtil.format_l2_data(data, code, 10.92)
|
datas = l2_data_manager.L2DataUtil.correct_data(code, datas)
|
_start_index = 0
|
if l2_data_manager.local_today_datas.get(code) is not None and len(
|
l2_data_manager.local_today_datas[code]) > 0:
|
_start_index = l2_data_manager.local_today_datas[code][-1]["index"] + 1
|
add_datas = l2_data_manager.L2DataUtil.get_add_data(code, datas, _start_index)
|
l2_data_manager.local_latest_datas[code] = add_datas
|
l2_data_manager.local_today_datas[code].extend(add_datas)
|
|
path = "D:/test/2750_3.txt"
|
with open(path, 'r') as f:
|
temp = f.readline()
|
data = json.loads(temp)
|
datas = l2_data_manager.L2DataUtil.format_l2_data(data, code, 10.92)
|
datas = l2_data_manager.L2DataUtil.correct_data(code, datas)
|
_start_index = 0
|
if l2_data_manager.local_today_datas.get(code) is not None and len(
|
l2_data_manager.local_today_datas[code]) > 0:
|
_start_index = l2_data_manager.local_today_datas[code][-1]["index"] + 1
|
add_datas = l2_data_manager.L2DataUtil.get_add_data(code, datas, _start_index)
|
l2_data_manager.local_latest_datas[code] = add_datas
|
l2_data_manager.local_today_datas[code].extend(add_datas)
|
print(l2_data_manager.local_today_datas[code])
|
|
def get_space_position(self, code):
|
date = datetime.datetime.now().strftime("%Y-%m-%d")
|
path = "D:/logs/gp/l2/l2_process.{}.log".format(date)
|
list = []
|
|
with open(path, encoding="utf-8") as f:
|
while True:
|
line = f.readline()
|
if line:
|
if line.find(code) > -1:
|
start = line.find("处理数据范围:")
|
end = line.find("处理时间:")
|
line = (line[start:end])
|
line = line[line.find(":") + 1:len(line)]
|
line = line.strip()
|
print(line)
|
list.append((int(line.split("-")[0]), int(line.split("-")[1])))
|
else:
|
break
|
return list
|
|
|
|
|
if __name__ == '__main__':
|
L2DataTest().get_space_position("002094")
|