""" L1需要订阅的代码管理 """ import json import os import constant from huaxin_client import socket_util from huaxin_client.client_network import SendResponseSkManager # 请求l1订阅的目标代码 from utils import tool def request_l1_subscript_target_codes(): type_ = "get_level1_codes" fdata = json.dumps( {"type": type_, "data": {}}) msg = fdata.encode("utf-8") # 发送消息 for i in range(3): try: sk = SendResponseSkManager.create_send_response_sk() msg = socket_util.load_header(msg) sk.sendall(msg) result, header_str = socket_util.recv_data(sk) # 读取代码 result_json = json.loads(result) if result_json["code"] == 0: codes = result_json["data"] codes_sh = [] codes_sz = [] for code in codes: if tool.is_sz_code(code): codes_sz.append(code.encode("utf-8")) else: codes_sh.append(code.encode("utf-8")) return codes_sh, codes_sz except ConnectionResetError: SendResponseSkManager.del_send_response_sk(type_) except BrokenPipeError: SendResponseSkManager.del_send_response_sk(type_) return None, None __DIR_PATH = f"{constant.get_path_prefix()}/codes" __CODE_SH_PATH = f"{__DIR_PATH}/codes_sh.text" __CODE_SZ_PATH = f"{__DIR_PATH}/codes_sz.text" # 保存目标代码 def save_codes(codes_sh, codes_sz): if not os.path.exists(__DIR_PATH): os.mkdir(__DIR_PATH) with open(__CODE_SH_PATH, 'w') as f: for c in codes_sh: if type(c) == bytes: f.write(c.decode('utf-8')) else: f.write(c) f.write("\n") with open(__CODE_SZ_PATH, 'w') as f: for c in codes_sz: if type(c) == bytes: f.write(c.decode('utf-8')) else: f.write(c) f.write("\n") def get_codes_from_file(only_can_buy=True): codes_sh, codes_sz = [], [] if os.path.exists(__CODE_SH_PATH): with open(__CODE_SH_PATH, 'r') as f: lines = f.readlines() for line in lines: if line: if line.strip(): code = line.strip() if not tool.is_can_buy_code(code) and only_can_buy: continue codes_sh.append(code.encode('utf-8')) if os.path.exists(__CODE_SZ_PATH): with open(__CODE_SZ_PATH, 'r') as f: lines = f.readlines() for line in lines: if line: if line.strip(): code = line.strip() if not tool.is_can_buy_code(code) and only_can_buy: continue codes_sz.append(code.encode('utf-8')) return codes_sh, codes_sz def get_codes(only_can_buy=True): codes_sh, codes_sz = get_codes_from_file(only_can_buy) if not codes_sh or not codes_sz: return request_l1_subscript_target_codes() return codes_sh, codes_sz if __name__ == '__main__': print(get_codes())