""" TSL-TCP版的接口 """ import codecs import logging import re import socket import ssl class DataParseUtil: @classmethod def parse_limit_up_data(cls, hex_str, enable_log=False): limit_up_data_list = [] results = re.split(r"\sA2\s01\s\w{2}\s01\s0A\s06\s", hex_str) # 涨停的日期 day = None for i in range(1, len(results)): try: result = results[i] if enable_log: print(bytes.fromhex(result)) temp_results = re.finditer(r"\s12\s\w{2}\s", result) for match in temp_results: code = bytes.fromhex(result[:match.start()]).decode('utf-8') result = result[match.end():] break # print(bytes.fromhex(result)) # print(result) temp_results = re.split(r"\sA2\s06\s[A-Z0-9]{2}\s", result) contents = [code] pattern = r'[^\u4e00-\u9fa5a-zA-Z0-9\u3001\.\-]' # 中文/字母/数字/中文顿号 for j in range(len(temp_results)): content = temp_results[j] if j == 0: # 代码名称 code_name = re.split(r"\s1A\s06", content)[0] code_name = bytes.fromhex(code_name).decode('utf-8') # 替换所有非中英文数字字符为空字符串 code_name = re.sub(pattern, '', code_name) contents.append(code_name) else: content_str = bytes.fromhex(content).decode('utf-8', errors='ignore') content_str = re.sub(pattern, '', content_str) if len(contents) >= 20: # 最多20项数据 day = content_str break contents.append(content_str) # contents 格式: [代码,名称,未知,未知,未知,成交额,涨停时间,高度, 涨停原因, 推荐原因,封单金额, 主力净额, 实际换手百分比, 实际流通市值, 最大封单金额, 最近涨停时间, 涨停原因的代码个数, 涨停原因的编码, 未知, 未知] if enable_log: print(i, len(contents), contents) limit_up_data_list.append(contents) except Exception as e: logging.exception(e) return limit_up_data_list, day def __recieve_data(ssl_sock): """ 接收数据请求 """ response = ssl_sock.read(3) hex_str = codecs.encode(response, 'hex').decode('utf-8') content_length = int(hex_str[2:6], 16) total_content = b'' while content_length > len(total_content): content = ssl_sock.recv(4096) total_content += content return total_content def __base_request(request_body, ssl_sock=None): """ """ if not ssl_sock: host, port = "hwsockapp.longhuvip.com", 14000 sock = socket.create_connection((host, port)) sock.settimeout(2) context = ssl.create_default_context() # 不验证证书 context.check_hostname = False context.verify_mode = ssl.CERT_NONE ssl_sock = context.wrap_socket(sock, server_hostname=host) try: # 注册使用 register_body = "30 00 65 00 02 02 59 10 00 00 0A 24 62 36 39 32 65 35 31 63 2D 31 62 63 34 2D 33 65 38 63 2D 61 30 31 62 2D 36 32 30 61 61 36 32 34 30 65 32 38 10 01 1A 08 35 2E 32 30 2E 30 2E 36 20 01 2A 20 63 65 30 36 34 63 66 38 62 35 34 62 62 66 35 30 63 33 66 36 62 37 36 63 30 38 38 64 64 63 33 36 32 01 30 3A 01 30 40 63" ssl_sock.sendall(bytes.fromhex(register_body)) __recieve_data(ssl_sock) __recieve_data(ssl_sock) except Exception as e: raise e ssl_sock.sendall(bytes.fromhex(request_body)) response = __recieve_data(ssl_sock) hex_str = codecs.encode(response, 'hex').decode('utf-8').upper() hex_array = [] for i in range(0, len(hex_str), 2): str_ = hex_str[i: i + 2] hex_array.append(str_) return " ".join(hex_array), ssl_sock def get_limit_up_list(): # 获取涨停列表 pages = ["400014006F0837080003020837080110061801281E3801", "400016008B08370800030208370801100618012011282B3801", "400016008C0837080003020837080110061801202F282B3801", "400016008D0837080003020837080110061801202F282B3801", "400016008E0837080003020837080110061801202F282B3801", "400016008F0837080003020837080110061801202F282B3801" ] limit_up_list = [] ssl_sock = None day = None for page in pages: # print("======================") result, ssl_sock = __base_request(page, ssl_sock) fresults, day = DataParseUtil.parse_limit_up_data(result) limit_up_list.extend(fresults) if len(fresults) < 30: break seen = set() # 去重 unique_limit_up_list = [x for x in limit_up_list if x[0] not in seen and not seen.add(x[0])] unique_limit_up_list.sort(key=lambda e: int(e[6], ), reverse=True) # 格式化数据 format_limit_up_data_list = [] if unique_limit_up_list: for d in unique_limit_up_list: format_data = [0] * 21 format_data[0], format_data[1], format_data[3], format_data[4], format_data[5] = d[0], d[1], '', int(d[6]), \ d[8] format_data[6], format_data[7], format_data[8] = int(d[10]), int(d[14]), int(d[11]) format_data[12], format_data[13], format_data[14] = d[9], int(d[13]), round(float(d[12]), 2) format_data[18], format_data[19], format_data[20] = d[7], d[17], int(d[16]) format_limit_up_data_list.append(format_data) fdata = {"errcode": 0, "list": format_limit_up_data_list, "day": day} return fdata if __name__ == "__main__": result = get_limit_up_list() print(len(result["list"]))