admin
2025-06-10 568c763084b926a6f2d632b7ac65b9ec8280752f
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
"""
L1 data interface
"""
import hashlib
import json
import socket
import time
 
 
class SocketUtil:
    """Helpers for the length-prefixed socket protocol used by this module.

    A frame is ``b"##"`` followed by the payload's byte length as 8
    zero-padded decimal digits, followed by the payload itself.
    """

    # Frame layout constants: 2-byte marker + 8 length digits = 10-byte header.
    HEADER_PREFIX = b"##"
    HEADER_SIZE = 10

    @classmethod
    def md5_encrypt(cls, value):
        """Return the hexadecimal MD5 digest of ``value`` encoded as UTF-8.

        :param value: text to hash
        :return: 32-character lowercase hex digest
        """
        return hashlib.md5(value.encode('utf-8')).hexdigest()

    @classmethod
    def create_socket(cls, addr, port):
        """Create an IPv4 TCP socket connected to ``(addr, port)``.

        :param addr: server host name or IPv4 address
        :param port: server TCP port
        :return: the connected socket; the caller is responsible for closing it
        :raises OSError: if the connection cannot be established
        """
        client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            client.connect((addr, port))
        except BaseException:
            # Do not leak the descriptor when the connection attempt fails.
            client.close()
            raise
        return client

    @classmethod
    def load_header(cls, data_bytes):
        """Prefix ``data_bytes`` with the ``##`` + 8-digit length header.

        The 8-digit field only fits lengths up to 99,999,999 bytes.

        :param data_bytes: payload bytes
        :return: header bytes followed by the payload
        """
        slen = '##%08d' % len(data_bytes)
        return slen.encode("utf-8") + data_bytes

    @classmethod
    def recv_data(cls, sk):
        """Receive one message from ``sk`` and strip its header.

        All length accounting is done on raw bytes and the payload is decoded
        once at the end, so a multi-byte UTF-8 character that straddles two
        ``recv`` calls is handled correctly.

        :param sk: connected socket (anything exposing ``recv(bufsize)``)
        :return: ``(text, header)`` where ``text`` is the UTF-8 decoded payload
            and ``header`` is the first (up to) 10 raw bytes received.
            ``("", b"")`` when the peer closed without sending anything.
        :raises ConnectionError: if the peer closes the connection before the
            header or the announced payload length has been fully received
        :raises ValueError: if the length field of the header is not numeric
        """
        header_size = cls.HEADER_SIZE
        prefix = cls.HEADER_PREFIX
        buf = sk.recv(1024)

        # A short first read may cut the header itself; top it up while the
        # bytes seen so far are still compatible with the "##" marker.
        while buf and len(buf) < header_size and prefix.startswith(buf[:len(prefix)]):
            more = sk.recv(header_size - len(buf))
            if not more:
                break
            buf += more

        header_str = buf[:header_size]
        if not buf:
            return "", header_str

        if not buf.startswith(prefix):
            # Unframed reply: keep the original behaviour of one more large
            # read, but join the bytes before decoding.
            buf += sk.recv(1024 * 1024)
            return buf.decode('utf-8'), header_str

        if len(buf) < header_size:
            raise ConnectionError("connection closed before the full header was received")

        content_length = int(header_str[len(prefix):].decode('ascii'))
        # Include the payload bytes that arrived together with the header.
        chunks = [buf[header_size:header_size + content_length]]
        remaining = content_length - len(chunks[0])
        while remaining > 0:
            # Never ask for more than is left, so the next frame is not consumed.
            chunk = sk.recv(min(10240, remaining))
            if not chunk:
                # recv() returning b"" means the peer closed the connection.
                raise ConnectionError(
                    "connection closed with %d of %d payload bytes missing"
                    % (remaining, content_length))
            chunks.append(chunk)
            remaining -= len(chunk)
        return b"".join(chunks).decode('utf-8'), header_str
 
 
def get_current_info(addr="43.138.167.68", port=11008):
    """
    Fetch the current L1 information.

    Opens a new connection, sends a ``get_l1_data`` request and closes the
    connection again once the reply has been read.

    :param addr: server address; defaults to the endpoint this module has always used
    :param port: server port; defaults to the endpoint this module has always used
    :return: [(code, previous close price, latest price, total volume, total turnover,
              5 bid levels, 5 ask levels, update time)]; an empty list when the
              reply's ``code`` field is not 0
    """
    request = json.dumps({"type": "get_l1_data", "sign": ''}).encode("utf-8")
    # The context manager closes the socket even if sending or parsing fails.
    with SocketUtil.create_socket(addr, port) as sk:
        # sendall() keeps writing until the whole frame is out; send() may stop early.
        sk.sendall(SocketUtil.load_header(request))
        result_str, _ = SocketUtil.recv_data(sk)
    result_json = json.loads(result_str)
    if result_json['code'] == 0:
        return result_json['data']
    return []
 
 
def set_target_codes(codes, addr="43.138.167.68", port=11008):
    """
    Send a ``set_l1_codes`` request carrying the given codes to the server.

    Opens a new connection for the request and closes it once the reply has
    been read.

    :param codes: iterable of codes; converted to a list before being sent
    :param addr: server address; defaults to the endpoint this module has always used
    :param port: server port; defaults to the endpoint this module has always used
    :return: None when the reply's ``code`` field is 0
    :raises Exception: when the reply's ``code`` field is not 0
    """
    request = json.dumps(
        {"type": "set_l1_codes", "data": {"codes": list(codes)}, "sign": ''}).encode("utf-8")
    # The context manager closes the socket even if sending or parsing fails.
    with SocketUtil.create_socket(addr, port) as sk:
        # sendall() keeps writing until the whole frame is out; send() may stop early.
        sk.sendall(SocketUtil.load_header(request))
        result_str, _ = SocketUtil.recv_data(sk)
    result_json = json.loads(result_str)
    if result_json['code'] == 0:
        return
    raise Exception("设置出错")
 
 
if __name__ == "__main__":
    # One-shot manual check: fetch the current L1 data and print it.
    snapshot = get_current_info()
    print(snapshot)
    # Polling variant, left disabled:
    # while True:
    #     datas = get_current_info()
    #     print(len(datas))
    #     time.sleep(3)