| | |
| | | import logging |
| | | import time |
| | | import datetime |
| | | |
| | | import dask |
| | | |
| | | import utils |
| | | from log_module.log import logger_common |
| | | from log_module import async_log_util |
| | | from log_module.log import logger_common, logger_debug |
| | | from strategy.trade_setting import TradeSetting |
| | | from utils import huaxin_util, tool |
| | | # 引入华鑫API(小辉整理) |
| | |
| | | # logger.info(f"【没有】在集合竞价内启动,采用【掘金数据】记录") |
| | | print(f"【没有】在开盘前内启动,采用【掘金数据】记录 开盘价") |
| | | data_cache.record_current_open_execution = True |
| | | current_datas = utils.juejin_api.JueJinApi.get_codes_open(data_cache.DataCache().min_stocks, fields='symbol,open') |
| | | current_datas = utils.juejin_api.JueJinApi.get_codes_open(data_cache.DataCache().min_stocks, |
| | | fields='symbol,open') |
| | | # print(f"current_datas=={current_datas}") |
| | | for current_data in current_datas: |
| | | # print(f"current_data=={current_data}") |
| | |
| | | __current_high_and_low_dict = {} |
| | | |
| | | |
| | | # 生成所有个股的最高价、最低价 字典 |
| | | # |
| | | def get_all_stocks_current_high_and_low(current_infos): |
| | | """ |
| | | 生成所有个股的最高价、最低价 字典 |
| | | :param current_infos: |
| | | :return: |
| | | """ |
| | | # 获取当前时间 |
| | | now_time = tool.get_now_time_str() |
| | | # 如果当前时间大于09:25:06才运行最高价和最低价的运算 |
| | |
| | | # print(f"current_info=={current_infos}") |
| | | for current_info in current_infos: |
| | | symbol = basic_methods.format_stock_symbol(current_info[0]) #股票代码 |
| | | pre_close = current_info[1] # 昨日收盘价 |
| | | current_price = current_info[2] # 最新价 |
| | | current_quotes_buy_1_price = current_info[5][0][0] #买一价格 |
| | | pre_close = current_info[1] # 昨日收盘价 |
| | | current_price = current_info[2] # 最新价 |
| | | current_quotes_buy_1_price = current_info[5][0][0] #买一价格 |
| | | price_tracker = __current_high_or_low_dict.get(symbol) |
| | | if not price_tracker: |
| | | # 赋初值 |
| | |
| | | # print(f"current_price>>>>>>==={current_price}") |
| | | if current_price is not None: |
| | | # 为避免L1数据中最新价偶发为0,在最新价为0时使用买一价记录 |
| | | if current_price != 0 or current_price != 0.0: |
| | | if current_price > 0: |
| | | # logger.info( |
| | | # f"《current_price 不为空 也不为0.0 也不为0 当日当时最新价:{current_price}》") |
| | | get_current_high_or_low.set_current_price(current_price) |
| | | data_cache.all_stocks_current_high_and_low[symbol] = { |
| | | 'current_high': get_current_high_or_low.current_high, |
| | | 'current_low': get_current_high_or_low.current_low} |
| | | elif current_quotes_buy_1_price != 0 or current_quotes_buy_1_price != 0.0: |
| | | elif current_quotes_buy_1_price > 0: |
| | | # logger.info( |
| | | # f"代码:{symbol}::《current_price 未获取成功 或 值为0.0 临时采用买一价作为最新价,买一价:{current_quotes_buy_1_price}》 ") |
| | | get_current_high_or_low.set_current_price(current_quotes_buy_1_price) |
| | | data_cache.all_stocks_current_high_and_low[symbol] = { |
| | | 'current_high': get_current_high_or_low.current_high, |
| | | 'current_low': get_current_high_or_low.current_low} |
| | | elif pre_close != 0 or pre_close != 0.0: |
| | | elif pre_close > 0: |
| | | logger.info( |
| | | f"最新价和买一价获取失败或有误,获取到的当日当时最新价:{current_price},买一价:{current_quotes_buy_1_price}》,临时性采用昨收价作为最新价,昨收价:{pre_close}") |
| | | get_current_high_or_low.set_current_price(pre_close) |
| | |
| | | |
| | | # 把current_infos灌入相应的线程 |
| | | def set_current_info(current_infos): |
| | | @dask.delayed |
| | | def process_current_infos(current_info_list): |
| | | for current_info in current_info_list: |
| | | try: |
| | | if current_info is not None: |
| | | strategic_thread_manager(current_info) |
| | | except Exception as error: |
| | | logging.exception(error) |
| | | # print("异常:", current_info) |
| | | logger_debug.exception(error) |
| | | logger_debug.error(f"L1处理出错:{current_info}") |
| | | |
| | | @dask.delayed |
| | | def batch_process_current_infos(fs): |
| | | return fs |
| | | |
| | | logging.info(f"set_current_info进入") |
| | | now_start = time.time() |
| | | try: |
| | | now_start = time.time() |
| | | now_time = tool.get_now_time_str() |
| | | if len(current_infos) == 0 and now_time > data_cache.L1_DATA_START_TIME: |
| | | print(f"9:15后 l1数据为空=l1_data_current_infos===={current_infos}") |
| | |
| | | # print(f"i===={i}") |
| | | get_all_stocks_current_open(current_infos) |
| | | get_all_stocks_current_high_and_low(current_infos) |
| | | for current_info in current_infos: |
| | | try: |
| | | if current_info is not None: |
| | | strategic_thread_manager(current_info) |
| | | except Exception as error: |
| | | logging.exception(error) |
| | | # print("异常:", current_info) |
| | | if current_infos: |
| | | # 分批处理数据 |
| | | ds = [] |
| | | total_count = len(current_infos) |
| | | page = 5 |
| | | page_size = total_count // page + 1 |
| | | for p in range(page): |
| | | temp_list = current_infos[p * page_size:(p + 1) * page_size] |
| | | ds.append(process_current_infos(temp_list)) |
| | | dask_result = batch_process_current_infos(ds) |
| | | dask_result.compute() |
| | | now_end: float = time.time() |
| | | start_to_end = now_end - now_start |
| | | print(f"运行中=={round(start_to_end, 2)} 秒") |
| | |
| | | except Exception as error: |
| | | logging.exception(error) |
| | | finally: |
| | | pass |
| | | |
| | | async_log_util.info(logger_debug, f"L1处理时间:{time.time() - now_start}") |
| | | |
| | | # 仅仅用于测试数据进入策略后的数据情况 |
| | | # get_current_info() |