Administrator
2022-12-18 86e0061f9cf211b98252a9e6b71d6c9801e4a16b
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
import datetime
import json
 
import l2_data_manager
 
 
class L2DataTest:
    def test_concat_l2_data(self):
        path = "D:/test/2750_1.txt"
        code = "002750"
        data = None
        with open(path, 'r') as f:
            temp = f.readline()
            data = json.loads(temp)
        datas = l2_data_manager.L2DataUtil.format_l2_data(data, code, 10.92)
        l2_data_manager.L2DataUtil.get_add_data(code, datas, 0)
        l2_data_manager.local_latest_datas[code] = datas
        l2_data_manager.local_today_datas[code] = datas
        path = "D:/test/2750_2.txt"
        with open(path, 'r') as f:
            temp = f.readline()
            data = json.loads(temp)
            datas = l2_data_manager.L2DataUtil.format_l2_data(data, code, 10.92)
            datas = l2_data_manager.L2DataUtil.correct_data(code, datas)
            _start_index = 0
            if l2_data_manager.local_today_datas.get(code) is not None and len(
                    l2_data_manager.local_today_datas[code]) > 0:
                _start_index = l2_data_manager.local_today_datas[code][-1]["index"] + 1
            add_datas = l2_data_manager.L2DataUtil.get_add_data(code, datas, _start_index)
            l2_data_manager.local_latest_datas[code] = add_datas
            l2_data_manager.local_today_datas[code].extend(add_datas)
 
        path = "D:/test/2750_3.txt"
        with open(path, 'r') as f:
            temp = f.readline()
            data = json.loads(temp)
            datas = l2_data_manager.L2DataUtil.format_l2_data(data, code, 10.92)
            datas = l2_data_manager.L2DataUtil.correct_data(code, datas)
            _start_index = 0
            if l2_data_manager.local_today_datas.get(code) is not None and len(
                    l2_data_manager.local_today_datas[code]) > 0:
                _start_index = l2_data_manager.local_today_datas[code][-1]["index"] + 1
            add_datas = l2_data_manager.L2DataUtil.get_add_data(code, datas, _start_index)
            l2_data_manager.local_latest_datas[code] = add_datas
            l2_data_manager.local_today_datas[code].extend(add_datas)
            print(l2_data_manager.local_today_datas[code])
 
    def get_space_position(self, code):
        date = datetime.datetime.now().strftime("%Y-%m-%d")
        path = "D:/logs/gp/l2/l2_process.{}.log".format(date)
        list = []
 
        with open(path, encoding="utf-8") as f:
            while True:
                line = f.readline()
                if line:
                    if line.find(code) > -1:
                        start = line.find("处理数据范围:")
                        end = line.find("处理时间:")
                        line = (line[start:end])
                        line = line[line.find(":") + 1:len(line)]
                        line = line.strip()
                        print(line)
                        list.append((int(line.split("-")[0]), int(line.split("-")[1])))
                else:
                    break
        return list
 
 
 
 
if __name__ == '__main__':
    L2DataTest().get_space_position("002094")