"""
|
L1需要订阅的代码管理
|
"""
|
import json
|
import os
|
|
import constant
|
from huaxin_client.client_network import SendResponseSkManager
|
|
|
# 请求l1订阅的目标代码
|
from log_module.log import printlog
|
from utils import socket_util, tool
|
|
|
def request_l1_subscript_target_codes():
|
type_ = "get_level1_codes"
|
fdata = json.dumps(
|
{"type": type_, "data": {}})
|
msg = fdata.encode("utf-8")
|
# 发送消息
|
for i in range(3):
|
try:
|
sk = SendResponseSkManager.create_send_response_sk()
|
msg = socket_util.load_header(msg)
|
sk.sendall(msg)
|
result, header_str = socket_util.recv_data(sk)
|
# 读取代码
|
result_json = json.loads(result)
|
if result_json["code"] == 0:
|
codes = result_json["data"]
|
codes_sh = []
|
codes_sz = []
|
for code in codes:
|
if tool.is_sz_code(code):
|
codes_sz.append(code.encode("utf-8"))
|
else:
|
codes_sh.append(code.encode("utf-8"))
|
printlog("获取订阅目标代数量:", len(codes_sh), len(codes_sz))
|
return codes_sh, codes_sz
|
except ConnectionResetError:
|
SendResponseSkManager.del_send_response_sk(type_)
|
except BrokenPipeError:
|
SendResponseSkManager.del_send_response_sk(type_)
|
return None, None
|
|
|
__DIR_PATH = f"{constant.get_path_prefix()}/codes"
|
__CODE_SH_PATH = f"{__DIR_PATH}/codes_sh.text"
|
__CODE_SZ_PATH = f"{__DIR_PATH}/codes_sz.text"
|
|
|
# 保存目标代码
|
def save_codes(codes_sh, codes_sz):
|
if not os.path.exists(__DIR_PATH):
|
os.mkdir(__DIR_PATH)
|
with open(__CODE_SH_PATH, 'w') as f:
|
for c in codes_sh:
|
if type(c) == bytes:
|
f.write(c.decode('utf-8'))
|
else:
|
f.write(c)
|
f.write("\n")
|
|
with open(__CODE_SZ_PATH, 'w') as f:
|
for c in codes_sz:
|
if type(c) == bytes:
|
f.write(c.decode('utf-8'))
|
else:
|
f.write(c)
|
f.write("\n")
|
|
|
def get_codes_from_file():
|
codes_sh, codes_sz = [], []
|
if os.path.exists(__CODE_SH_PATH):
|
with open(__CODE_SH_PATH, 'r') as f:
|
line = f.readline()
|
while line:
|
if line.strip():
|
codes_sh.append(line.strip().encode('utf-8'))
|
line = f.readline()
|
if os.path.exists(__CODE_SZ_PATH):
|
with open(__CODE_SZ_PATH, 'r') as f:
|
line = f.readline()
|
while line:
|
if line.strip():
|
codes_sz.append(line.strip().encode('utf-8'))
|
line = f.readline()
|
return codes_sh, codes_sz
|
|
|
def get_codes():
|
codes_sh, codes_sz = get_codes_from_file()
|
if not codes_sh or not codes_sz:
|
return request_l1_subscript_target_codes()
|
return codes_sh, codes_sz
|
|
|
if __name__ == '__main__':
|
pass
|