""" 代码信息对外输出 """ # score_info 得分信息 # 下单参数信息 # 选股宝 # 市场热度 import sys import time import code_attribute from code_attribute import gpcode_manager, code_price_manager from log_module import log_export from third_data import kpl_data_manager, kpl_api from third_data.kpl_data_manager import KPLLimitUpDataRecordManager from trade import l2_trade_util from utils import global_util base_output_content = {} kpl_block_info_dict = {} __kplDataManager = kpl_data_manager.KPLDataManager() def __get_base_html_content(): print("路径", sys.path[0]) if base_output_content.get('css') is None: __base_html_content = "" with open("./output/css/style.css", mode='r') as f: lines = f.readlines() for line in lines: __base_html_content += line base_output_content['css'] = __base_html_content return f"" def money_desc(money): if abs(money) > 100000000: return f"{round(money / 100000000, 2)}亿" else: return f"{round(money / 10000, 2)}万" def get_output_params(code, jingxuan_cache_dict, industry_cache_dict): __start_time = time.time() def format_plate_output(_plat): if _plat in jingxuan_cache_dict: return _plat, money_desc(jingxuan_cache_dict[_plat][3]) elif _plat in industry_cache_dict: return _plat, money_desc(industry_cache_dict[_plat][3]) else: return _plat, '' params = { "base_url": "http://192.168.3.252/kp/", } code_extra_infos = [] if l2_trade_util.BlackListCodeManager().is_in(code): code_extra_infos.append("黑名单") if l2_trade_util.WhiteListCodeManager().is_in(code): code_extra_infos.append("白名单") # 获取白名单,黑名单 if code_attribute.gpcode_manager.WantBuyCodesManager().is_in_cache(code): code_extra_infos.append("想买单") if code_attribute.gpcode_manager.PauseBuyCodesManager().is_in_cache(code): code_extra_infos.append("暂不买") params["code"] = code params["code_name"] = f"{gpcode_manager.get_code_name(code)} {code} ({','.join(code_extra_infos)})" score_info = None buy_params_info = None ##############################主动买,被动买################################## # 返回主动买,被动买,不买的列表(代码, 名称, 得分, 是否涨停) params["initiative_buy_codes"] = [] params["passive_buy_codes"] = [] params["passive_buy_codes"] = params["passive_buy_codes"] __start_time = time.time() trade_info = __load_trade_record(code) params["trade_record"] = {"open_limit_up": trade_info[0], "records": trade_info[2]} __start_time = time.time() ##############################开盘啦相关信息################################## industry = global_util.code_industry_map.get(code) params["kpl_code_info"] = {"industry": format_plate_output(industry)} # 获取开盘啦板块 if code not in kpl_block_info_dict: plate_info = kpl_api.getStockIDPlate(code) else: plate_info = kpl_block_info_dict.get(code) if plate_info: kpl_block_info_dict[code] = plate_info plate_info.sort(key=lambda x: x[2]) plate_info.reverse() params["kpl_code_info"]["plate"] = [(k[0], k[1], k[2], format_plate_output(k[1])[1]) for k in plate_info] __start_time = time.time() # 获取代码的历史涨停数据,(涨停原因,日期,板块) code_records = KPLLimitUpDataRecordManager.get_latest_infos(code, 4, False)[:2] if code_records: code_records = [(format_plate_output(k[0]), k[1], [format_plate_output(k1) for k1 in k[2].split("、")]) for k in code_records] params["kpl_code_info"]["code_records"] = code_records if not KPLLimitUpDataRecordManager.total_datas: KPLLimitUpDataRecordManager.load_total_datas() for d in KPLLimitUpDataRecordManager.total_datas: if d[3] == code: # 获取今日 plates = d[6].split("、") plates = [format_plate_output(p) for p in plates] params["kpl_code_info"]["today"] = (format_plate_output(d[2]), d[1], plates) break __start_time = time.time() return params def __load_trade_record(code): def format_l2_data(item): return f"{item['val']['time']}#{item['val']['num']}手#{round(item['val']['num'] * float(item['val']['price']) * 100 / 10000, 1)}万" # 获取炸板信息 limit_up_info = code_price_manager.Buy1PriceManager().get_limit_up_info(code) break_time = limit_up_info[1] records = [] try: records = log_export.load_buy_score_recod(code) except: pass records_new = [] records_new_data = [] return break_time, records_new, records_new_data