"""
|
TSL-TCP版的接口
|
"""
|
import codecs
|
import logging
|
import re
|
import socket
|
import ssl
|
|
|
class DataParseUtil:
|
@classmethod
|
def parse_limit_up_data(cls, hex_str, enable_log=False):
|
limit_up_data_list = []
|
results = re.split(r"\sA2\s01\s\w{2}\s01\s0A\s06\s", hex_str)
|
# 涨停的日期
|
day = None
|
for i in range(1, len(results)):
|
try:
|
result = results[i]
|
if enable_log:
|
print(bytes.fromhex(result))
|
temp_results = re.finditer(r"\s12\s\w{2}\s", result)
|
for match in temp_results:
|
code = bytes.fromhex(result[:match.start()]).decode('utf-8')
|
result = result[match.end():]
|
break
|
# print(bytes.fromhex(result))
|
# print(result)
|
temp_results = re.split(r"\sA2\s06\s[A-Z0-9]{2}\s", result)
|
contents = [code]
|
pattern = r'[^\u4e00-\u9fa5a-zA-Z0-9\u3001\.\-]' # 中文/字母/数字/中文顿号
|
for j in range(len(temp_results)):
|
content = temp_results[j]
|
if j == 0:
|
# 代码名称
|
code_name = re.split(r"\s1A\s06", content)[0]
|
code_name = bytes.fromhex(code_name).decode('utf-8')
|
# 替换所有非中英文数字字符为空字符串
|
code_name = re.sub(pattern, '', code_name)
|
contents.append(code_name)
|
else:
|
content_str = bytes.fromhex(content).decode('utf-8', errors='ignore')
|
content_str = re.sub(pattern, '', content_str)
|
|
if len(contents) >= 20:
|
# 最多20项数据
|
day = content_str
|
break
|
contents.append(content_str)
|
|
# contents 格式: [代码,名称,未知,未知,未知,成交额,涨停时间,高度, 涨停原因, 推荐原因,封单金额, 主力净额, 实际换手百分比, 实际流通市值, 最大封单金额, 最近涨停时间, 涨停原因的代码个数, 涨停原因的编码, 未知, 未知]
|
if enable_log:
|
print(i, len(contents), contents)
|
limit_up_data_list.append(contents)
|
except Exception as e:
|
logging.exception(e)
|
return limit_up_data_list, day
|
|
|
def __recieve_data(ssl_sock):
|
"""
|
接收数据请求
|
"""
|
response = ssl_sock.read(3)
|
hex_str = codecs.encode(response, 'hex').decode('utf-8')
|
content_length = int(hex_str[2:6], 16)
|
total_content = b''
|
while content_length > len(total_content):
|
content = ssl_sock.recv(4096)
|
total_content += content
|
return total_content
|
|
|
def __base_request(request_body, ssl_sock=None):
|
"""
|
|
"""
|
if not ssl_sock:
|
host, port = "hwsockapp.longhuvip.com", 14000
|
sock = socket.create_connection((host, port))
|
sock.settimeout(2)
|
context = ssl.create_default_context()
|
# 不验证证书
|
context.check_hostname = False
|
context.verify_mode = ssl.CERT_NONE
|
ssl_sock = context.wrap_socket(sock, server_hostname=host)
|
try:
|
# 注册使用
|
register_body = "30 00 65 00 02 02 59 10 00 00 0A 24 62 36 39 32 65 35 31 63 2D 31 62 63 34 2D 33 65 38 63 2D 61 30 31 62 2D 36 32 30 61 61 36 32 34 30 65 32 38 10 01 1A 08 35 2E 32 30 2E 30 2E 36 20 01 2A 20 63 65 30 36 34 63 66 38 62 35 34 62 62 66 35 30 63 33 66 36 62 37 36 63 30 38 38 64 64 63 33 36 32 01 30 3A 01 30 40 63"
|
ssl_sock.sendall(bytes.fromhex(register_body))
|
|
__recieve_data(ssl_sock)
|
__recieve_data(ssl_sock)
|
|
except Exception as e:
|
raise e
|
ssl_sock.sendall(bytes.fromhex(request_body))
|
response = __recieve_data(ssl_sock)
|
hex_str = codecs.encode(response, 'hex').decode('utf-8').upper()
|
hex_array = []
|
for i in range(0, len(hex_str), 2):
|
str_ = hex_str[i: i + 2]
|
hex_array.append(str_)
|
return " ".join(hex_array), ssl_sock
|
|
|
def get_limit_up_list():
|
# 获取涨停列表
|
pages = ["400014006F0837080003020837080110061801281E3801",
|
"400016008B08370800030208370801100618012011282B3801",
|
"400016008C0837080003020837080110061801202F282B3801",
|
"400016008D0837080003020837080110061801202F282B3801",
|
"400016008E0837080003020837080110061801202F282B3801",
|
"400016008F0837080003020837080110061801202F282B3801"
|
]
|
limit_up_list = []
|
ssl_sock = None
|
day = None
|
for page in pages:
|
# print("======================")
|
result, ssl_sock = __base_request(page, ssl_sock)
|
fresults, day = DataParseUtil.parse_limit_up_data(result)
|
limit_up_list.extend(fresults)
|
if len(fresults) < 30:
|
break
|
seen = set()
|
# 去重
|
unique_limit_up_list = [x for x in limit_up_list if x[0] not in seen and not seen.add(x[0])]
|
|
unique_limit_up_list.sort(key=lambda e: int(e[6], ), reverse=True)
|
|
# 格式化数据
|
format_limit_up_data_list = []
|
if unique_limit_up_list:
|
for d in unique_limit_up_list:
|
format_data = [0] * 21
|
format_data[0], format_data[1], format_data[3], format_data[4], format_data[5] = d[0], d[1], '', int(d[6]), \
|
d[8]
|
format_data[6], format_data[7], format_data[8] = int(d[10]), int(d[14]), int(d[11])
|
|
format_data[12], format_data[13], format_data[14] = d[9], int(d[13]), round(float(d[12]), 2)
|
format_data[18], format_data[19], format_data[20] = d[7], d[17], int(d[16])
|
|
format_limit_up_data_list.append(format_data)
|
fdata = {"errcode": 0, "list": format_limit_up_data_list, "day": day}
|
return fdata
|
|
|
if __name__ == "__main__":
|
result = get_limit_up_list()
|
print(len(result["list"]))
|