| | |
| | | from huaxin_client import socket_util |
| | | from huaxin_client.client_network import SendResponseSkManager |
| | | |
| | | |
| | | # 请求l1订阅的目标代码 |
| | | from utils import tool |
| | | |
| | | |
| | | def request_l1_subscript_target_codes(): |
| | | type_ = "get_level1_codes" |
| | | fdata = json.dumps( |
| | |
| | | codes_sh = [] |
| | | codes_sz = [] |
| | | for code in codes: |
| | | if code.find("00") == 0: |
| | | if tool.is_sz_code(code): |
| | | codes_sz.append(code.encode("utf-8")) |
| | | else: |
| | | codes_sh.append(code.encode("utf-8")) |
| | | print("获取订阅目标代数量:", len(codes_sh), len(codes_sz)) |
| | | return codes_sh, codes_sz |
| | | except ConnectionResetError: |
| | | SendResponseSkManager.del_send_response_sk(type_) |
| | |
| | | f.write("\n") |
| | | |
| | | |
def _read_codes_from_path(path, only_can_buy):
    """Read stock codes from a text file holding one code per line.

    Args:
        path: Location of the code file. A missing file yields an empty list.
        only_can_buy: When true, codes rejected by ``tool.is_can_buy_code``
            are skipped.

    Returns:
        A list of the remaining codes, each encoded as UTF-8 ``bytes``,
        in file order.
    """
    codes = []
    if not os.path.exists(path):
        return codes
    with open(path, 'r') as f:
        # Iterate the file exactly once; blank and whitespace-only lines
        # carry no code and are ignored.
        for line in f:
            code = line.strip()
            if not code:
                continue
            # Check the flag first so the helper is only consulted when
            # filtering was actually requested.
            if only_can_buy and not tool.is_can_buy_code(code):
                continue
            codes.append(code.encode('utf-8'))
    return codes


def get_codes_from_file(only_can_buy=True):
    """Load the locally stored SH and SZ code lists.

    Args:
        only_can_buy: When true (the default), keep only codes accepted by
            ``tool.is_can_buy_code``; when false, keep every non-blank line.

    Returns:
        A ``(codes_sh, codes_sz)`` tuple of lists of UTF-8 encoded codes.
        A list is empty when its file does not exist or holds no usable
        lines.
    """
    codes_sh = _read_codes_from_path(__CODE_SH_PATH, only_can_buy)
    codes_sz = _read_codes_from_path(__CODE_SZ_PATH, only_can_buy)
    return codes_sh, codes_sz
| | | |
| | | |
def get_codes(only_can_buy=True):
    """Return the SH and SZ code lists, preferring the local code files.

    Args:
        only_can_buy: Forwarded to ``get_codes_from_file`` to control
            whether codes that cannot be bought are filtered out.

    Returns:
        The ``(codes_sh, codes_sz)`` pair read from the files when both
        lists are non-empty; otherwise whatever
        ``request_l1_subscript_target_codes()`` returns.
    """
    codes_sh, codes_sz = get_codes_from_file(only_can_buy)
    if codes_sh and codes_sz:
        return codes_sh, codes_sz
    # At least one list is empty, so the files are treated as unusable and
    # the codes are requested instead.
    # NOTE(review): only_can_buy is not applied on this fallback path --
    # confirm that is intended.
    return request_l1_subscript_target_codes()
| | | |
| | | |
if __name__ == '__main__':
    # Manual check when run as a script: print the resolved code lists.
    print(get_codes())