# coding=utf-8
|
from __future__ import print_function, absolute_import, unicode_literals
|
import logging
|
import json
|
import multiprocessing
|
# import multiprocessing
|
# from log import logger
|
import threading
|
import time
|
# 引入掘金桥梁API
|
import utils.juejin_api
|
# 引入开盘啦API模块
|
# 引入全局变量模块
|
# 引入定时器模块
|
# 引入历史K线方法模块
|
# 引入瞬时分时行情模块
|
# 引入账户管理模块【进行资金和仓位管理】
|
from strategy import kpl_api, data_cache, check_timer, all_K_line, instant_time_market, account_management
|
from huaxin_client import l2_market_client
|
from log_module import async_log_util
|
# 引入日志模块
|
from strategy.logging_config import get_logger
|
from strategy import order_methods
|
from trade import huaxin_trade_data_update
|
from utils import hx_qc_value_util
|
|
# 获取logger实例
|
logger = get_logger()
|
# 引入行情订阅模块
|
# import subscribe_market
|
|
# 全局日志配置 如果不想用就把 logging.ERROR 如果要打就=logging.INFO
|
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
|
|
|
# 计算当前线程数量
|
def get_current_thread_count():
|
# print(f"threading.active_count()=={threading.active_count()}")
|
return threading.active_count()
|
|
|
# 初始化数据 函数
|
def init_data():
|
# logging.info("main初始化数据开始")
|
logger.info(f"main初始化数据开始")
|
# 初始化所有目标票标的信息
|
data_cache.all_stocks = utils.juejin_api.JueJinApi.get_target_codes()
|
# 获取K线
|
data_cache.DataCache()
|
|
all_K_line.k_line_history.init(data_cache.DataCache().today_date, data_cache.DataCache().next_trading_day,
|
data_cache.DataCache().filtered_stocks)
|
|
# 调用指标K线写入本地文件
|
all_K_line.all_stocks_all_k_line_dict_write()
|
|
# 先使用json.load()直接从文件中读取【已经存储在本地的K线指标属性字典】并解析JSON数据
|
with open('strategy/local_storage_data/all_stocks_all_K_line_property_dict.json', 'r', encoding='utf-8') as f:
|
data_cache.all_stocks_all_K_line_property_dict = json.load(f)
|
print(
|
f"data_cache.all_stocks_all_K_line_property_dict的个数==={len(data_cache.all_stocks_all_K_line_property_dict)}")
|
|
# current_data_info = current(symbols='SHSE.603839', fields='open')
|
# print(f"current_data_info==={current_data_info}")
|
|
|
# 第一步:初始化context函数,并开启获取实时数据的线程
|
def init():
|
# 在初始化函数里面就调用全局化变量函数(即上文缩写的init_data函数)
|
init_data()
|
# 实时运行定时器线程【定时器函数目前 只管理 15:00 后运行一次 整理当日涨停信息 和 获取所有个股的板块概念】
|
threading.Thread(target=lambda: check_timer.check_time(), daemon=True).start()
|
# 获取实时大盘行情情绪综合强度 [分数] 线程
|
threading.Thread(target=lambda: kpl_api.get_real_time_market_strong(), daemon=True).start()
|
# 实时检测是否拉取K线线程
|
threading.Thread(target=lambda: all_K_line.check_time_and_data_date(), daemon=True).start()
|
# print(f"all_stocks_all_K_line_property_dict== {type(data_cache.all_stocks_all_K_line_property_dict)}")
|
|
'''
|
目前设想,买只需要进行账户管理,卖才需要仓位管理,而卖在通过行情订阅实现
|
并且行情订阅在另一个进程里面,所以目前不需要同时调用
|
# 初始化资金管理,下单买逻辑才会有数据
|
account_management.position_management(context)
|
# 初始化仓位管理,下单卖逻辑才会有数据
|
account_management.position_management(context)
|
'''
|
|
# TODO 此处注释,要替换context
|
# threading.Thread(target=account_management.finance_management_process, args=(context,), daemon=True).start()
|
# threading.Thread(target=account_management.position_management_process, args=(context,), daemon=True).start()
|
|
# 开盘啦的涨停概念的回调函数
|
def kpl_limit_up_process(datas):
|
# print(f"回调成功==={datas}")
|
data_cache.limit_up_block_names = datas
|
|
# # 计算当前线程数量
|
# get_current_thread_count()
|
|
# 开启开盘啦 涨停列表 和 全盘个股概念板块 接口线程
|
# 涨停概念线程
|
# threading.Thread(target=kpl_api.kpl_limit_up_process, daemon=True).start() #该行代码为只运行单一线程不回调数据的方式
|
threading.Thread(target=kpl_api.kpl_limit_up_process, args=(kpl_limit_up_process,), daemon=True).start()
|
|
# # 开盘啦的板块强度下的个股强度回调函数
|
def get_market_sift_plate_its_stock_power_process(market_sift_plate_stock_dict):
|
# print(f"回调成功===精选板块股票强度数据更新==={market_sift_plate_stock_dict}")
|
data_cache.market_sift_plate_stock_dict = market_sift_plate_stock_dict
|
|
# 板块强度下个股强度线程
|
threading.Thread(target=kpl_api.get_market_sift_plate_its_stock_power_process,
|
args=(get_market_sift_plate_its_stock_power_process,), daemon=True).start()
|
|
# 行情订阅,沪深300指数【在on_tick函数中去打印tick 数据】
|
# subscribe('SHSE.000300', frequency='tick', count=1, unsubscribe_previous=False)
|
|
# 初始化get_current_data方法函数,下单买逻辑才会运行中。。。【核心主线程,随时考虑其启动顺序】>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
|
# threading.Thread(target=lambda: instant_time_market.get_current_data(), daemon=True).start()
|
|
# 计算开盘啦昨日拉取的概念数据中为空的股票数量
|
kpl_api.get_have_no_plate_num()
|
|
# 获取历史涨停信息数据并整理
|
kpl_api.get_handling_limit_up_info()
|
|
# 检查K线昨日涨停是否正确
|
all_K_line.check_limit_up_attribute()
|
|
# # 获取所有个股的板块概念并写入文件【耗时较长应该放在 核心主线程 和 仓位管理 后面】
|
# kpl_api.get_all_stocks_plate_dict(data_cache.min_stocks)
|
|
|
# 委托状态更新事件
|
def on_order_status(context, order):
|
# 可在后面执行其他处理逻辑
|
logger.info(f"context====main————{context}")
|
logger.info(f"order====main————{order}")
|
|
|
# 委托执行回报事件
|
def on_execution_report(context, execrpt):
|
logger.info(f"execrpt>context===={context}")
|
logger.info(f"execrpt===={execrpt}")
|
|
|
# 交易账户状态更新事件 (交易账户状态对象,仅响应 已连接,已登录,已断开 和 错误 事件。)
|
def on_account_status(context, account):
|
logger.info(f"account>context===={context}")
|
logger.info(f"account===={account}")
|
logger.info(account)
|
|
|
# 第三步:执行策略的初始设置
|
if __name__ == '__main__':
|
class MyMarketDataCallback(l2_market_client.L2MarketDataCallback):
|
def on_markets(self, datas):
|
"""
|
L1数据回调
|
:param datas:
|
:return:
|
"""
|
data_cache.latest_code_market_info_dict = {x[0]: x for x in datas}
|
instant_time_market.process_current_infos(datas)
|
|
|
# 启动异步日志
|
threading.Thread(target=async_log_util.run_sync, daemon=True).start()
|
|
# 启动交易
|
order_methods.run()
|
|
# 运行华鑫增值服务进程,用于获取K线与交易日历
|
threading.Thread(target=hx_qc_value_util.run, daemon=True).start()
|
|
# 运行交易数据更新服务
|
huaxin_trade_data_update.run()
|
|
# 等待5s,等其他线程/进程启动完毕
|
time.sleep(5)
|
|
# 初始化数据
|
init()
|
|
# 需要订阅的目标代码
|
target_codes = [x["sec_id"] for x in data_cache.DataCache().all_stocks]
|
|
# 订阅L2 market行情
|
l2_market_client.run(target_codes, MyMarketDataCallback())
|