import json
|
|
import l2_data_manager
|
|
|
class L2DataTest:
    """Manual scenario that feeds three L2 snapshot files through ``l2_data_manager``.

    The snapshots are read from fixed local paths, so the scenario only runs on
    a machine where those files exist.
    """

    @staticmethod
    def _load_formatted(path, code):
        """Read the first line of *path* as JSON and format it for *code*.

        Returns whatever ``L2DataUtil.format_l2_data`` returns for the parsed
        payload.
        """
        # Only the first line of the file is consumed; the handle is released
        # before any further processing happens.
        with open(path, 'r') as f:
            first_line = f.readline()
        payload = json.loads(first_line)
        # 10.92 is passed through unchanged as the third argument;
        # presumably a reference price for the stock -- TODO confirm.
        return l2_data_manager.L2DataUtil.format_l2_data(payload, code, 10.92)

    @staticmethod
    def _append_snapshot(path, code):
        """Load a follow-up snapshot and append its new entries to today's data.

        The snapshot is formatted, passed through ``correct_data``, and the
        result of ``get_add_data`` becomes the new "latest" entry and is
        appended to the "today" list for *code*.
        """
        datas = TEST_UTIL.correct_data(code, L2DataTest._load_formatted(path, code)) \
            if False else l2_data_manager.L2DataUtil.correct_data(
                code, L2DataTest._load_formatted(path, code))

        # Continue numbering after the last entry stored for today, or start
        # from 0 when nothing has been stored yet.
        today = l2_data_manager.local_today_datas.get(code)
        start_index = today[-1]["index"] + 1 if today else 0

        add_datas = l2_data_manager.L2DataUtil.get_add_data(code, datas, start_index)
        l2_data_manager.local_latest_datas[code] = add_datas
        # Look the list up again rather than reusing ``today`` so the append
        # targets whatever is stored under *code* at this point.
        l2_data_manager.local_today_datas[code].extend(add_datas)

    def test_concat_l2_data(self):
        """Seed the caches with snapshot 1, append snapshots 2 and 3, print the result."""
        code = "002750"

        # The first snapshot seeds both caches directly (no correct_data step).
        datas = self._load_formatted("D:/test/2750_1.txt", code)
        # The return value is deliberately ignored here; the call is kept
        # because it may update state inside l2_data_manager -- TODO confirm.
        l2_data_manager.L2DataUtil.get_add_data(code, datas, 0)
        # Both caches are bound to the very same list object (no copy is made).
        l2_data_manager.local_latest_datas[code] = datas
        l2_data_manager.local_today_datas[code] = datas

        # Every later snapshot goes through the same incremental path.
        for path in ("D:/test/2750_2.txt", "D:/test/2750_3.txt"):
            self._append_snapshot(path, code)

        print(l2_data_manager.local_today_datas[code])
|
|
|
if __name__ == '__main__':
    # Run the concatenation scenario directly when executed as a script.
    scenario = L2DataTest()
    scenario.test_concat_l2_data()
|