constant.py | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
main.py | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
strategy/all_K_line.py | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
strategy/data_cache.py | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
strategy/kpl_api.py | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 |
constant.py
@@ -56,5 +56,13 @@ return 'D:' if is_windows() else '/home/userzjj' # 板块数据路径 ALL_STOCKS_PLATE_PATH = f"{get_path_prefix()}/local_storage_data/all_stocks_plate_dict.json" # K线路径 K_BARS_PATH = f"{get_path_prefix()}/local_storage_data/all_stocks_all_K_line_property_dict.json" # 订阅L2代码数据 SUBSCRIPT_L2_CODES = set() main.py
@@ -3,10 +3,13 @@ import logging import json import multiprocessing import os.path # import multiprocessing # from log import logger import threading import time import constant # 引入掘金桥梁API import utils.juejin_api # 引入开盘啦API模块 @@ -55,10 +58,11 @@ all_K_line.all_stocks_all_k_line_dict_write() # 先使用json.load()直接从文件中读取【已经存储在本地的K线指标属性字典】并解析JSON数据 with open('strategy/local_storage_data/all_stocks_all_K_line_property_dict.json', 'r', encoding='utf-8') as f: data_cache.all_stocks_all_K_line_property_dict = json.load(f) print( f"data_cache.all_stocks_all_K_line_property_dict的个数==={len(data_cache.all_stocks_all_K_line_property_dict)}") if os.path.exists(constant.K_BARS_PATH): with open(constant.K_BARS_PATH, 'r', encoding='utf-8') as f: data_cache.all_stocks_all_K_line_property_dict = json.load(f) print( f"data_cache.all_stocks_all_K_line_property_dict的个数==={len(data_cache.all_stocks_all_K_line_property_dict)}") # current_data_info = current(symbols='SHSE.603839', fields='open') # print(f"current_data_info==={current_data_info}") strategy/all_K_line.py
@@ -378,7 +378,7 @@ try: json_data = json.dumps(convert_datetime(all_stocks_all_K_line_property_dict), ensure_ascii=False, indent=4) # 将转换后的JSON字符串写入文件 with open('local_storage_data/all_stocks_all_K_line_property_dict.json', 'w', encoding='utf-8') as f: with open(constant.K_BARS_PATH, 'w', encoding='utf-8') as f: f.write(json_data) except Exception as error: print(f"An error occurred while converting the data to JSON: {error}") @@ -451,11 +451,12 @@ # 调用指标K线写入本地文件 all_stocks_all_k_line_dict_write() # 读取已经获取到并存储在本地的所有票的指标K线字典,并赋值给data_cache全局缓存 with open('local_storage_data/all_stocks_all_K_line_property_dict.json', 'r', encoding='utf-8') as f: data_cache.all_stocks_all_K_line_property_dict = json.load(f) print( f"data_cache.all_stocks_all_K_line_property_dict的个数{len(data_cache.all_stocks_all_K_line_property_dict)}") if os.path.exists(constant.K_BARS_PATH): with open(constant.K_BARS_PATH, 'r', encoding='utf-8') as f: data_cache.all_stocks_all_K_line_property_dict = json.load(f) print( f"data_cache.all_stocks_all_K_line_property_dict的个数{len(data_cache.all_stocks_all_K_line_property_dict)}") else: check_today_date = check_data_date(data_cache.DataCache.today_date) @@ -469,11 +470,12 @@ # 调用指标K线写入本地文件 all_stocks_all_k_line_dict_write() # 读取已经获取到并存储在本地的所有票的指标K线字典,并赋值给data_cache全局缓存 with open('local_storage_data/all_stocks_all_K_line_property_dict.json', 'r', encoding='utf-8') as f: data_cache.all_stocks_all_K_line_property_dict = json.load(f) print( f"data_cache.all_stocks_all_K_line_property_dict的个数{len(data_cache.all_stocks_all_K_line_property_dict)}") if os.path.exists(constant.K_BARS_PATH): with open(constant.K_BARS_PATH, 'r', encoding='utf-8') as f: data_cache.all_stocks_all_K_line_property_dict = json.load(f) print( f"data_cache.all_stocks_all_K_line_property_dict的个数{len(data_cache.all_stocks_all_K_line_property_dict)}") except Exception as error: logging.exception(error) print(f"实时检测是否拉取K线线程报错An error occurred: {error}") strategy/data_cache.py
@@ -136,9 +136,8 @@ # 读取已经获取到并存储在本地的目标范围的个股的板块概念 # 读取JSON文件并解析为字典 stocks_plate_path = f"{constant.get_path_prefix()}/local_storage_data/all_stocks_plate_dict.json" if os.path.exists(stocks_plate_path): with open(stocks_plate_path, 'r', if os.path.exists(constant.ALL_STOCKS_PLATE_PATH): with open(constant.ALL_STOCKS_PLATE_PATH, 'r', encoding='utf-8') as f: json_data = f.read() else: strategy/kpl_api.py
@@ -7,6 +7,7 @@ import requests import constant # import requests from strategy import data_cache from strategy import basic_methods @@ -615,7 +616,7 @@ # 将字典转换为JSON格式的字符串 json_data = json.dumps(all_stocks_plate_dict) # 写入文件 with open('local_storage_data/all_stocks_plate_dict.json', 'w', encoding='utf-8') as f: with open(constant.ALL_STOCKS_PLATE_PATH, 'w', encoding='utf-8') as f: f.write(json_data) now_time = datetime.datetime.now() # 获取本机时间 logger.info(f"写入所有个股板块文件完成!::{now_time}")