| | |
| | | import logging |
| | | import time |
| | | import datetime |
| | | |
| | | import dask |
| | | |
| | | import utils |
| | | from log_module.log import logger_common |
| | | from utils import huaxin_util |
| | | from log_module import async_log_util |
| | | from log_module.log import logger_common, logger_debug |
| | | from strategy.trade_setting import TradeSetting |
| | | from utils import huaxin_util, tool |
| | | # 引入华鑫API(小辉整理) |
| | | from strategy import l1_data_api |
| | | from strategy import data_cache |
| | | from strategy import basic_methods |
| | | from strategy import buying_strategy, selling_strategy, index_market_trend_strategy |
| | | from strategy import buying_strategy, selling_strategy, market_sentiment_analysis |
| | | |
| | | # from low_suction.shared_memory_util import SharedMemoryObj |
| | | |
| | |
| | | def index_market_current(): |
| | | logging.info(f"index_market_trend进入") |
| | | while True: |
| | | if not TradeSetting().get_running(): |
| | | # 已经暂停 |
| | | time.sleep(1) |
| | | continue |
| | | try: |
| | | # 在data_cache中获取到推送过来的实时指数行情数据 |
| | | stock_index_dict = data_cache.stock_index_dict |
| | | now_time = datetime.datetime.now().strftime("%H:%M:%S") |
| | | if len(stock_index_dict) == 0 and data_cache.L1_data_start_time < now_time < data_cache.closing_time: |
| | | now_time = tool.get_now_time_str() |
| | | if len(stock_index_dict) == 0 and data_cache.L1_DATA_START_TIME < now_time < data_cache.CLOSING_TIME: |
| | | print(f"9:15--15:00 实时指数数据为空===={stock_index_dict}") |
| | | index_judge_thread_manager(stock_index_dict) |
| | | except Exception as error: |
| | |
| | | if index_market_info is not None: |
| | | # 调用交易策略模块中的涨幅视界策略 |
| | | # 指数行情调用 |
| | | index_market_trend_strategy.instant_trend_strategy(index_market_info) |
| | | market_sentiment_analysis.instant_trend_strategy(index_market_info) |
| | | |
| | | |
| | | # 生成所有个股的开盘价字典 |
| | | def get_all_stocks_current_open(current_infos): |
| | | pass |
| | | # 获取当前时间 |
| | | now_time = datetime.datetime.now().strftime("%H:%M:%S") |
| | | now_time = tool.get_now_time_str() |
| | | # 如果当前时间大于09:25:06才运行最高价和最低价的运算 |
| | | if data_cache.later_open_bidding_time < now_time: |
| | | # if data_cache.after_closing_time < now_time: |
| | | if now_time < data_cache.opening_time and data_cache.record_current_open_execution is False: |
| | | # if now_time < data_cache.after_closing_time: |
| | | if data_cache.LATER_OPEN_BIDDING_TIME < now_time: |
| | | # if data_cache.AFTER_CLOSING_TIME < now_time: |
| | | if now_time < data_cache.OPENING_TIME and data_cache.record_current_open_execution is False: |
| | | # if now_time < data_cache.AFTER_CLOSING_TIME: |
| | | logger.info(f"在开盘前启动,采用【华鑫数据】记录 开盘价") |
| | | data_cache.record_current_open_execution = True |
| | | # print(f"current_info=={current_infos}") |
| | |
| | | # logger.info(f"【没有】在集合竞价内启动,采用【掘金数据】记录") |
| | | print(f"【没有】在开盘前内启动,采用【掘金数据】记录 开盘价") |
| | | data_cache.record_current_open_execution = True |
| | | current_datas = utils.juejin_api.JueJinApi.get_codes_open(data_cache.DataCache().filtered_stocks, fields='symbol,open') |
| | | current_datas = utils.juejin_api.JueJinApi.get_codes_open(data_cache.DataCache().min_stocks, |
| | | fields='symbol,open') |
| | | # print(f"current_datas=={current_datas}") |
| | | for current_data in current_datas: |
| | | # print(f"current_data=={current_data}") |
| | |
| | | __current_high_and_low_dict = {} |
| | | |
| | | |
| | | # 生成所有个股的最高价、最低价 字典 |
| | | # |
| | | def get_all_stocks_current_high_and_low(current_infos): |
| | | """ |
| | | 生成所有个股的最高价、最低价 字典 |
| | | :param current_infos: |
| | | :return: |
| | | """ |
| | | # 获取当前时间 |
| | | now_time = datetime.datetime.now().strftime("%H:%M:%S") |
| | | now_time = tool.get_now_time_str() |
| | | # 如果当前时间大于09:25:06才运行最高价和最低价的运算 |
| | | if data_cache.later_open_bidding_time < now_time: |
| | | # if data_cache.after_closing_time < now_time: |
| | | if data_cache.Local_startup_time < data_cache.opening_time: |
| | | if data_cache.LATER_OPEN_BIDDING_TIME < now_time: |
| | | # if data_cache.AFTER_CLOSING_TIME < now_time: |
| | | if data_cache.now_time < data_cache.OPENING_TIME: |
| | | logger.info(f"【在】开盘前启动,采用【华鑫数据】记录 最高最低价") |
| | | # # 如果在9:30前启动,则只采用【华鑫数据】记录 最高最低价 |
| | | # while True: |
| | | # print(f"current_info=={current_infos}") |
| | | for current_info in current_infos: |
| | | symbol = basic_methods.format_stock_symbol(current_info[0]) #股票代码 |
| | | pre_close = current_info[1] # 昨日收盘价 |
| | | current_price = current_info[2] # 最新价 |
| | | current_quotes_buy_1_price = current_info[5][0][0] #买一价格 |
| | | pre_close = current_info[1] # 昨日收盘价 |
| | | current_price = current_info[2] # 最新价 |
| | | current_quotes_buy_1_price = current_info[5][0][0] #买一价格 |
| | | price_tracker = __current_high_or_low_dict.get(symbol) |
| | | if not price_tracker: |
| | | # 赋初值 |
| | |
| | | # print(f"current_price>>>>>>==={current_price}") |
| | | if current_price is not None: |
| | | # 为避免L1数据中最新价偶发为0,在最新价为0时使用买一价记录 |
| | | if current_price != 0 or current_price != 0.0: |
| | | if current_price > 0: |
| | | # logger.info( |
| | | # f"《current_price 不为空 也不为0.0 也不为0 当日当时最新价:{current_price}》") |
| | | get_current_high_or_low.set_current_price(current_price) |
| | | data_cache.all_stocks_current_high_and_low[symbol] = { |
| | | 'current_high': get_current_high_or_low.current_high, |
| | | 'current_low': get_current_high_or_low.current_low} |
| | | elif current_quotes_buy_1_price != 0 or current_quotes_buy_1_price != 0.0: |
| | | elif current_quotes_buy_1_price > 0: |
| | | # logger.info( |
| | | # f"代码:{symbol}::《current_price 未获取成功 或 值为0.0 零食采用买一价作为最新价,买一价:{current_quotes_buy_1_price}》 ") |
| | | get_current_high_or_low.set_current_price(current_quotes_buy_1_price) |
| | | data_cache.all_stocks_current_high_and_low[symbol] = { |
| | | 'current_high': get_current_high_or_low.current_high, |
| | | 'current_low': get_current_high_or_low.current_low} |
| | | elif pre_close != 0 or pre_close != 0.0: |
| | | elif pre_close > 0: |
| | | logger.info( |
| | | f"最新价和买一价获取失败或有误,获取到的当日当时最新价:{current_price},买一价:{current_quotes_buy_1_price}》,临时性采用昨收价作为最新价,昨收价:{pre_close}") |
| | | get_current_high_or_low.set_current_price(pre_close) |
| | |
| | | # print(f"当前时间更新时间:{now_time}") |
| | | if not __current_high_or_low_dict: |
| | | # 还没初始化 |
| | | # current_datas = current(symbols=data_cache.DataCache().filtered_stocks, fields='symbol,high,low') |
| | | # current_datas = current(symbols=data_cache.DataCache().min_stocks, fields='symbol,high,low') |
| | | current_datas = utils.juejin_api.JueJinApi.get_codes_high_and_low( |
| | | data_cache.DataCache().filtered_stocks, fields='symbol,high,low') |
| | | data_cache.DataCache().min_stocks, fields='symbol,high,low') |
| | | # print(f"current_datas=={current_datas}") |
| | | for current_data in current_datas: |
| | | symbol, high, low = current_data['symbol'], current_data['high'], current_data['low'] |
| | |
| | | for current_info in current_infos: |
| | | # print(f"开始循环current_infos") |
| | | symbol = basic_methods.format_stock_symbol(current_info[0]) |
| | | if symbol not in data_cache.DataCache().filtered_stocks: |
| | | if symbol not in data_cache.DataCache().min_stocks: |
| | | continue |
| | | # if symbol.find("300810") > 0: |
| | | # print(f"开始循环current_infos:"+symbol) |
| | |
| | | # 初始化 |
| | | # current_datas = current(symbols=[symbol], fields='symbol,high,low') |
| | | current_datas = utils.juejin_api.JueJinApi.get_codes_high_and_low( |
| | | data_cache.DataCache().filtered_stocks, fields='symbol,high,low') |
| | | data_cache.DataCache().min_stocks, fields='symbol,high,low') |
| | | current_data = current_datas[0] |
| | | # print(f"开始实例化对象") |
| | | symbol, high, low = current_data['symbol'], current_data['high'], current_data['low'] |
| | |
| | | else: |
| | | return None |
| | | |
| | | |
| | | # 获取当前L1行情数据 |
| | | def get_current_info(): |
| | | logging.info(f"get_current_info进入") |
| | |
| | | try: |
| | | now_start = time.time() |
| | | current_infos = l1_data_api.get_current_info() |
| | | now_time = datetime.datetime.now().strftime("%H:%M:%S") |
| | | if len(current_infos) == 0 and now_time > data_cache.L1_data_start_time: |
| | | now_time = tool.get_now_time_str() |
| | | if len(current_infos) == 0 and now_time > data_cache.L1_DATA_START_TIME: |
| | | print(f"9:15后 l1数据为空=l1_data_current_infos===={current_infos}") |
| | | # for i in current_infos: |
| | | # if i[0] == '000001': |
| | | # print(f"i===={i}") |
| | | get_all_stocks_current_open(current_infos) |
| | | get_all_stocks_current_high_and_low(current_infos) |
| | | |
| | | # 保存现价 |
| | | for current_info in current_infos: |
| | | data_cache.current_l1_dict[current_info[0]] = current_info |
| | | |
| | | for current_info in current_infos: |
| | | try: |
| | | if current_info is not None: |
| | |
| | | time.sleep(0.5) |
| | | |
| | | |
| | | |
| | | |
| | | # 把current_infos灌入相应的线程 |
| | | def set_current_info(current_infos): |
| | | # @dask.delayed |
| | | def process_current_infos(current_info_list): |
| | | __start_time = time.time() |
| | | use_time_list = [] |
| | | for current_info in current_info_list: |
| | | try: |
| | | if current_info is not None: |
| | | _start_time = time.time() |
| | | strategic_thread_manager(current_info) |
| | | use_time_list.append((time.time() - _start_time, current_info[0])) |
| | | except Exception as error: |
| | | logging.exception(error) |
| | | # print("异常:", current_info) |
| | | logger_debug.exception(error) |
| | | logger_debug.error(f"L1处理出错:{current_info}") |
| | | use_time = time.time() - __start_time |
| | | if use_time > 0.5: |
| | | # 记录超过1s的数据 |
| | | async_log_util.info(logger_debug, "L1数据处理时间统计:thread-{} 总计用时-{} 平均耗时-{} 最大耗时-{}", |
| | | tool.get_thread_id(), use_time, sum([x[0] for x in use_time_list]) / len(use_time_list), |
| | | max(use_time_list, key=lambda e: e[0])) |
| | | |
| | | # @dask.delayed |
| | | # def batch_process_current_infos(fs): |
| | | # return fs |
| | | |
| | | logging.info(f"set_current_info进入") |
| | | now_start = time.time() |
| | | try: |
| | | now_start = time.time() |
| | | now_time = datetime.datetime.now().strftime("%H:%M:%S") |
| | | if len(current_infos) == 0 and now_time > data_cache.L1_data_start_time: |
| | | now_time = tool.get_now_time_str() |
| | | if len(current_infos) == 0 and now_time > data_cache.L1_DATA_START_TIME: |
| | | print(f"9:15后 l1数据为空=l1_data_current_infos===={current_infos}") |
| | | # for i in current_infos: |
| | | # if i[0] == '000001': |
| | | # print(f"i===={i}") |
| | | get_all_stocks_current_open(current_infos) |
| | | get_all_stocks_current_high_and_low(current_infos) |
| | | for current_info in current_infos: |
| | | try: |
| | | if current_info is not None: |
| | | strategic_thread_manager(current_info) |
| | | except Exception as error: |
| | | logging.exception(error) |
| | | # print("异常:", current_info) |
| | | if current_infos: |
| | | # 保存现价 |
| | | for current_info in current_infos: |
| | | data_cache.current_l1_dict[current_info[0]] = current_info |
| | | |
| | | # 分批处理数据 |
| | | # ds = [] |
| | | # total_count = len(current_infos) |
| | | # page = 2 |
| | | # page_size = total_count // page + 1 |
| | | # for p in range(page): |
| | | # temp_list = current_infos[p * page_size:(p + 1) * page_size] |
| | | # ds.append(process_current_infos(temp_list)) |
| | | # dask_result = batch_process_current_infos(ds) |
| | | # dask_result.compute() |
| | | process_current_infos(current_infos) |
| | | now_end: float = time.time() |
| | | start_to_end = now_end - now_start |
| | | print(f"运行中=={round(start_to_end, 2)} 秒") |
| | |
| | | except Exception as error: |
| | | logging.exception(error) |
| | | finally: |
| | | pass |
| | | |
| | | async_log_util.info(logger_debug, f"L1处理时间:{time.time() - now_start}") |
| | | |
| | | # 仅仅用于测试数据进入策略后的数据情况 |
| | | # get_current_info() |