Administrator
2 天以前 ad3cc1a24c4b413bae3069cc6d1c2a1923540ce3
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
"""
TSL-TCP版的接口
"""
import codecs
import logging
import re
import socket
import ssl
 
 
class DataParseUtil:
    @classmethod
    def parse_limit_up_data(cls, hex_str, enable_log=False):
        limit_up_data_list = []
        results = re.split(r"\sA2\s01\s\w{2}\s01\s0A\s06\s", hex_str)
        # 涨停的日期
        day = None
        for i in range(1, len(results)):
            try:
                result = results[i]
                if enable_log:
                    print(bytes.fromhex(result))
                temp_results = re.finditer(r"\s12\s\w{2}\s", result)
                for match in temp_results:
                    code = bytes.fromhex(result[:match.start()]).decode('utf-8')
                    result = result[match.end():]
                    break
                # print(bytes.fromhex(result))
                # print(result)
                temp_results = re.split(r"\sA2\s06\s[A-Z0-9]{2}\s", result)
                contents = [code]
                pattern = r'[^\u4e00-\u9fa5a-zA-Z0-9\u3001\.\-]'  # 中文/字母/数字/中文顿号
                for j in range(len(temp_results)):
                    content = temp_results[j]
                    if j == 0:
                        # 代码名称
                        code_name = re.split(r"\s1A\s06", content)[0]
                        code_name = bytes.fromhex(code_name).decode('utf-8')
                        # 替换所有非中英文数字字符为空字符串
                        code_name = re.sub(pattern, '', code_name)
                        contents.append(code_name)
                    else:
                        content_str = bytes.fromhex(content).decode('utf-8', errors='ignore')
                        content_str = re.sub(pattern, '', content_str)
 
                        if len(contents) >= 20:
                            # 最多20项数据
                            day = content_str
                            break
                        contents.append(content_str)
 
                # contents 格式: [代码,名称,未知,未知,未知,成交额,涨停时间,高度, 涨停原因, 推荐原因,封单金额, 主力净额, 实际换手百分比, 实际流通市值, 最大封单金额, 最近涨停时间, 涨停原因的代码个数, 涨停原因的编码, 未知, 未知]
                if enable_log:
                    print(i, len(contents), contents)
                limit_up_data_list.append(contents)
            except Exception as e:
                logging.exception(e)
        return limit_up_data_list, day
 
 
def __recieve_data(ssl_sock):
    """
    接收数据请求
    """
    response = ssl_sock.read(3)
    hex_str = codecs.encode(response, 'hex').decode('utf-8')
    content_length = int(hex_str[2:6], 16)
    total_content = b''
    while content_length > len(total_content):
        content = ssl_sock.recv(4096)
        total_content += content
    return total_content
 
 
def __base_request(request_body, ssl_sock=None):
    """
 
    """
    if not ssl_sock:
        host, port = "hwsockapp.longhuvip.com", 14000
        sock = socket.create_connection((host, port))
        sock.settimeout(2)
        context = ssl.create_default_context()
        # 不验证证书
        context.check_hostname = False
        context.verify_mode = ssl.CERT_NONE
        ssl_sock = context.wrap_socket(sock, server_hostname=host)
        try:
            # 注册使用
            register_body = "30 00 65 00 02 02 59 10 00 00 0A 24 62 36 39 32 65 35 31 63 2D 31 62 63 34 2D 33 65 38 63 2D 61 30 31 62 2D 36 32 30 61 61 36 32 34 30 65 32 38 10 01 1A 08 35 2E 32 30 2E 30 2E 36 20 01 2A 20 63 65 30 36 34 63 66 38 62 35 34 62 62 66 35 30 63 33 66 36 62 37 36 63 30 38 38 64 64 63 33 36 32 01 30 3A 01 30 40 63"
            ssl_sock.sendall(bytes.fromhex(register_body))
 
            __recieve_data(ssl_sock)
            __recieve_data(ssl_sock)
 
        except Exception as e:
            raise e
    ssl_sock.sendall(bytes.fromhex(request_body))
    response = __recieve_data(ssl_sock)
    hex_str = codecs.encode(response, 'hex').decode('utf-8').upper()
    hex_array = []
    for i in range(0, len(hex_str), 2):
        str_ = hex_str[i: i + 2]
        hex_array.append(str_)
    return " ".join(hex_array), ssl_sock
 
 
def get_limit_up_list():
    # 获取涨停列表
    pages = ["400014006F0837080003020837080110061801281E3801",
             "400016008B08370800030208370801100618012011282B3801",
             "400016008C0837080003020837080110061801202F282B3801",
             "400016008D0837080003020837080110061801202F282B3801",
             "400016008E0837080003020837080110061801202F282B3801",
             "400016008F0837080003020837080110061801202F282B3801"
             ]
    limit_up_list = []
    ssl_sock = None
    day = None
    for page in pages:
        # print("======================")
        result, ssl_sock = __base_request(page, ssl_sock)
        fresults, day = DataParseUtil.parse_limit_up_data(result)
        limit_up_list.extend(fresults)
        if len(fresults) < 30:
            break
    seen = set()
    # 去重
    unique_limit_up_list = [x for x in limit_up_list if x[0] not in seen and not seen.add(x[0])]
 
    unique_limit_up_list.sort(key=lambda e: int(e[6], ), reverse=True)
 
    # 格式化数据
    format_limit_up_data_list = []
    if unique_limit_up_list:
        for d in unique_limit_up_list:
            format_data = [0] * 21
            format_data[0], format_data[1], format_data[3], format_data[4], format_data[5] = d[0], d[1], '', int(d[6]), \
                                                                                             d[8]
            format_data[6], format_data[7], format_data[8] = int(d[10]), int(d[14]), int(d[11])
 
            format_data[12], format_data[13], format_data[14] = d[9], int(d[13]), round(float(d[12]), 2)
            format_data[18], format_data[19], format_data[20] = d[7], d[17], int(d[16])
 
            format_limit_up_data_list.append(format_data)
    fdata = {"errcode": 0, "list": format_limit_up_data_list, "day": day}
    return fdata
 
 
if __name__ == "__main__":
    result = get_limit_up_list()
    print(len(result["list"]))