| | |
| | | # Whether this is a test run (see the TEST flag) |
| | | import os |
| | | import platform |
| | | |
| | | TEST = False |
| | |
| | | return 'D:' if is_windows() else '/home/userzjj' |
| | | |
| | | |
| | | # Directory for locally stored data files, built on the prefix returned by get_path_prefix(). |
| | | DATA_DIR_PATH = f"{get_path_prefix()}/local_storage_data" |
| | | |
| | | # Create the data directory (including missing parents) if it does not exist yet. |
| | | if not os.path.exists(DATA_DIR_PATH): |
| | | os.makedirs(DATA_DIR_PATH) |
| | | |
| | | # Path of the stock plate (sector) data file |
| | | # NOTE(review): the two consecutive assignments look like the old/new sides of a diff; the second one takes effect. |
| | | ALL_STOCKS_PLATE_PATH = f"{get_path_prefix()}/local_storage_data/all_stocks_plate_dict.json" |
| | | ALL_STOCKS_PLATE_PATH = f"{DATA_DIR_PATH}/all_stocks_plate_dict.json" |
| | | |
| | | # Path of the K-line (candlestick) data file |
| | | # NOTE(review): same duplicated-assignment pattern as above; the second assignment takes effect. |
| | | K_BARS_PATH = f"{get_path_prefix()}/local_storage_data/all_stocks_all_K_line_property_dict.json" |
| | | K_BARS_PATH = f"{DATA_DIR_PATH}/all_stocks_all_K_line_property_dict.json" |
| | | |
| | | # Path of the limit-up data file (presumably KPL-sourced, judging by the name — confirm) |
| | | KPL_LIMIT_UP_DATA_PATH = f"{DATA_DIR_PATH}/limit_up_block_date_data.json" |
| | | |
| | | |
| | | |
| | |
| | | KPL API data retrieval and processing |
| | | """ |
| | | import json |
| | | import os.path |
| | | import time |
| | | import datetime |
| | | |
| | |
| | | # Reader/writer object for daily limit-up information |
| | | class DailyLimitUpInfoStorageManager: |
| | | # Initialize the file path |
| | | # NOTE(review): two def lines appear back to back — presumably the old and new sides of a diff; |
| | | # the second one (default taken from constant.KPL_LIMIT_UP_DATA_PATH) looks like the current version — confirm. |
| | | def __init__(self, file_path='local_storage_data/limit_up_block_date_data.jsonl'): |
| | | def __init__(self, file_path=constant.KPL_LIMIT_UP_DATA_PATH): |
| | | self.file_path = file_path |
| | | |
| | | # 添加单日涨停信息数据到文件中的一行 函数 |
| | | def append_data_to_file(self, data_to_append): |
| | | # print(f"data_to_append=={data_to_append}") |
| | | # 读取所有行并解析为 JSON 对象列表 |
| | | if os.path.exists(self.file_path): |
| | | with open(self.file_path, 'r', encoding='utf-8') as file: |
| | | # 获取当前日期并格式化 |
| | | current_date = datetime.datetime.now().strftime('%Y-%m-%d') |