| | |
| | | kpl API数据获取与处理 |
| | | """ |
| | | import json |
| | | import os.path |
| | | import time |
| | | import datetime |
| | | |
| | |
| | | # 构建涨停信息读写对象 |
| | | class DailyLimitUpInfoStorageManager: |
| | | # 初始化文件路径 |
| | | def __init__(self, file_path='local_storage_data/limit_up_block_date_data.jsonl'): |
| | | def __init__(self, file_path=constant.KPL_LIMIT_UP_DATA_PATH): |
| | | self.file_path = file_path |
| | | |
| | | # 添加单日涨停信息数据到文件中的一行 函数 |
| | | def append_data_to_file(self, data_to_append): |
| | | # print(f"data_to_append=={data_to_append}") |
| | | # 读取所有行并解析为 JSON 对象列表 |
| | | with open(self.file_path, 'r', encoding='utf-8') as file: |
| | | # 获取当前日期并格式化 |
| | | current_date = datetime.datetime.now().strftime('%Y-%m-%d') |
| | | lines = [json.loads(line.strip()) for line in file if line.strip()] |
| | | # print(f"lines type=={type(lines)}") |
| | | # print(f"lines=={lines}") |
| | | # 检查当前日期是否已存在于文件中 |
| | | if lines: # 如果读取到的行文件列表不为空(为真) |
| | | if lines[-1].get(current_date) is None: # 如果列表中的倒数最后一行获取不到当日的日期(最后一行的键 为 当日日期) |
| | | # 将日期和data_to_append转换为JSON格式的字符串 |
| | | if os.path.exists(self.file_path): |
| | | with open(self.file_path, 'r', encoding='utf-8') as file: |
| | | # 获取当前日期并格式化 |
| | | current_date = datetime.datetime.now().strftime('%Y-%m-%d') |
| | | lines = [json.loads(line.strip()) for line in file if line.strip()] |
| | | # print(f"lines type=={type(lines)}") |
| | | # print(f"lines=={lines}") |
| | | # 检查当前日期是否已存在于文件中 |
| | | if lines: # 如果读取到的行文件列表不为空(为真) |
| | | if lines[-1].get(current_date) is None: # 如果列表中的倒数最后一行获取不到当日的日期(最后一行的键 为 当日日期) |
| | | # 将日期和data_to_append转换为JSON格式的字符串 |
| | | json_line = json.dumps({current_date: data_to_append}, ensure_ascii=False) + '\n' |
| | | # 打开文件并追加JSON行 |
| | | with open(self.file_path, 'a', encoding='utf-8') as file:file.write(json_line) |
| | | else: |
| | | print(f"(当日日期已存在于文件的最后一行了,不再重复追加写入)") |
| | | else: |
| | | json_line = json.dumps({current_date: data_to_append}, ensure_ascii=False) + '\n' |
| | | # 打开文件并追加JSON行 |
| | | with open(self.file_path, 'a', encoding='utf-8') as file:file.write(json_line) |
| | | else: |
| | | print(f"(当日日期已存在于文件的最后一行了,不再重复追加写入)") |
| | | else: |
| | | json_line = json.dumps({current_date: data_to_append}, ensure_ascii=False) + '\n' |
| | | # 打开文件并追加JSON行 |
| | | with open(self.file_path, 'a', encoding='utf-8') as file: |
| | | file.write(json_line) |
| | | with open(self.file_path, 'a', encoding='utf-8') as file: |
| | | file.write(json_line) |
| | | |
| | | # 清理多余数据函数 |
| | | def check_and_remove_oldest_entry(self, max_entries): |