""" 代码信息对外输出 """ # score_info 得分信息 # 下单参数信息 # 选股宝 # 市场热度 import logging import sys import time import code_attribute from code_attribute import code_volumn_manager, limit_up_time_manager, global_data_loader, gpcode_manager, \ code_nature_analyse from l2.l2_data_manager import OrderBeginPosInfo from l2.l2_data_util import L2DataUtil from third_data.code_plate_key_manager import KPLCodeJXBlockManager from third_data.kpl_data_constant import LimitUpCodesBlockRecordManager from trade.buy_radical.block_special_codes_manager import BlockSpecialCodesManager from utils import global_util, tool from log_module import log, log_export from l2 import l2_data_manager, l2_data_util, transaction_progress, l2_data_manager_new, code_price_manager from cancel_strategy.s_l_h_cancel_strategy import HourCancelBigNumComputer import l2.l2_data_manager_new from third_data import kpl_data_manager, kpl_api from third_data.kpl_data_manager import KPLLimitUpDataRecordManager from trade import l2_trade_factor, trade_manager, l2_trade_util, trade_record_log_util, trade_constant from trade.l2_trade_factor import L2TradeFactorUtil base_output_content = {} kpl_block_info_dict = {} __kplDataManager = kpl_data_manager.KPLDataManager() def __get_base_html_content(): if base_output_content.get('css') is None: __base_html_content = "" with open("./output/css/style.css", mode='r') as f: lines = f.readlines() for line in lines: __base_html_content += line base_output_content['css'] = __base_html_content return f"" def money_desc(money): if abs(money) > 100000000: return f"{round(money / 100000000, 2)}亿" else: return f"{round(money / 10000, 2)}万" def get_output_params(code, jingxuan_cache_dict, industry_cache_dict): __start_time = time.time() def format_plate_output(_plat): if _plat in jingxuan_cache_dict: return _plat, money_desc(jingxuan_cache_dict[_plat][3]) elif _plat in industry_cache_dict: return _plat, money_desc(industry_cache_dict[_plat][3]) else: return _plat, '' params = { "base_url": "http://192.168.3.252/kp/", } day = tool.get_now_date_str() is_target_code = gpcode_manager.FirstCodeManager().is_in_first_record_cache(code) code_extra_infos = [] if gpcode_manager.BlackListCodeManager().is_in_cache(code): code_extra_infos.append("黑名单") if gpcode_manager.WhiteListCodeManager().is_in_cache(code): code_extra_infos.append("白名单") # 获取白名单,黑名单 if code_attribute.gpcode_manager.WantBuyCodesManager().is_in_cache(code): code_extra_infos.append("想买单") if code_attribute.gpcode_manager.PauseBuyCodesManager().is_in_cache(code): code_extra_infos.append("暂不买") params["code"] = code params["code_name"] = f"{gpcode_manager.get_code_name(code)} {code} ({','.join(code_extra_infos)})" total_datas = l2_data_util.local_today_datas.get(code) if total_datas is None: total_datas = [] # l2_data_util.load_l2_data(code) # total_datas = l2_data_util.local_today_datas.get(code) if is_target_code: limit_up_price = gpcode_manager.get_limit_up_price(code) limit_up_time = limit_up_time_manager.LimitUpTimeManager().get_limit_up_time_cache(code) volume_rate, volume_info = code_volumn_manager.CodeVolumeManager().get_volume_rate(code, with_info=True) ################################买前评分################################ # zyltgb, limit_price, bidding, k_form, code_nature, hot_block, volume_rate, limit_up_time, # deal_big_money # log.logger_debug.info(f"板块热度耗时:{time.time() - __start_time}") __start_time = time.time() ###############################下单信息############################### params["trade_data"] = {} # 获取买入意愿 __L2PlaceOrderParamsManager = l2_trade_factor.L2PlaceOrderParamsManager(code, True, volume_rate, code_volumn_manager.CodeVolumeManager().get_volume_rate_index( volume_rate), None) # 是否可以买入的信息 try: can_buy_info = l2.l2_data_manager_new.L2TradeDataProcessor.can_buy_first_new(code, limit_up_price) params["trade_data"]["can_buy_info"] = can_buy_info except: pass # 获取量参考日期 try: volume_refer_date, volume_refer_date_distance = code_volumn_manager.CodeVolumeManager().get_volume_refer_date( code) params["trade_data"]["volume_refer_date"] = volume_refer_date except: pass # 获取是否 k_format = code_nature_analyse.CodeNatureRecordManager().get_k_format_cache(code) if k_format: params["trade_data"]["special_info"] = k_format[8] __base_L2PlaceOrderParamsManager = l2_trade_factor.L2PlaceOrderParamsManager(code, False, volume_rate, code_volumn_manager.CodeVolumeManager().get_volume_rate_index( volume_rate), None) if -1 < __L2PlaceOrderParamsManager.score_index < 3: params["trade_data"]["star"] = {"desc": "主动买入"} if __L2PlaceOrderParamsManager.score_index == 0: params["trade_data"]["star"]["count"] = 3 elif __L2PlaceOrderParamsManager.score_index == 1: params["trade_data"]["star"]["count"] = 2 else: params["trade_data"]["star"]["count"] = 1 elif __L2PlaceOrderParamsManager.score_index < 0: params["trade_data"]["star"] = {"desc": "不执行买入", "count": 0} else: params["trade_data"]["star"] = {"desc": "被动买入", "count": 0} # 安全笔数 safe_count = __L2PlaceOrderParamsManager.get_safe_count() base_safe_count, min_count, max_count = L2TradeFactorUtil.get_safe_buy_count(code, True) params["trade_data"]["safe_count"] = {"base": base_safe_count, "now": safe_count} # 动态M值 m = __L2PlaceOrderParamsManager.get_m_val() zyltgb = global_util.zyltgb_map.get(code) if zyltgb is None: global_data_loader.load_zyltgb() zyltgb = global_util.zyltgb_map.get(code) base_m = L2TradeFactorUtil.get_base_safe_val(zyltgb) params["trade_data"]["m_val"] = {"base": base_m // 10000, "now": m[0] // 10000} # 买前大单 big_num = __L2PlaceOrderParamsManager.get_big_num_count() base_big_num = __base_L2PlaceOrderParamsManager.get_big_num_count() params["trade_data"]["big_num"] = {"base": base_big_num, "now": big_num} # 成交进度 trade_progress, is_default = transaction_progress.TradeBuyQueue().get_traded_index(code) if trade_progress is None or trade_progress < 0 or is_default: # buy_params_info.append("未识别") pass else: if trade_progress < len(total_datas): trade_progress_datas = [] for min_money in [30000, 20000, 10000]: for i in range(trade_progress - 1, -1, -1): # 是否为涨停买 data = total_datas[i] if L2DataUtil.is_limit_up_price_buy(total_datas[i]['val']): if data['val']['num'] * float(data['val']['price']) > min_money: trade_progress_datas.append({"time": data['val']['time'], "num": data['val']['num'], "money": money_desc(round( data['val']['num'] * float(data['val']['price']) * 100))}) break if trade_progress_datas: break data = total_datas[trade_progress] trade_progress_datas.append({"time": data['val']['time'], "num": data['val']['num'], "money": money_desc(round( data['val']['num'] * float(data['val']['price']) * 100))}) params["trade_data"]["trade_progress"] = trade_progress_datas # 买入信号 order_begin_pos = l2_data_manager.TradePointManager().get_buy_compute_start_data_cache( code) if order_begin_pos.buy_single_index is None: # buy_params_info.append("无信号") pass else: data = total_datas[order_begin_pos.buy_single_index] params["trade_data"]["buy_single"] = {"time": data['val']['time'], "num": data['val']['num'], "money": round(data['val']['num'] * float( data['val']['price']) * 100 / 10000, 1)} if order_begin_pos.buy_exec_index is None or order_begin_pos.buy_exec_index < 0: # buy_params_info.append("未下单") pass else: data = total_datas[order_begin_pos.buy_exec_index] params["trade_data"]["buy_exec"] = {"time": data['val']['time'], "num": data['val']['num'], "money": round(data['val']['num'] * float( data['val']['price']) * 100 / 10000, 1)} params["trade_data"]["trade_state"] = {} trade_state = trade_manager.CodesTradeStateManager().get_trade_state_cache(code) if trade_state == trade_constant.TRADE_STATE_BUY_PLACE_ORDER or trade_state == trade_constant.TRADE_STATE_BUY_DELEGATED: params["trade_data"]["trade_state"]["order"] = True params["trade_data"]["trade_state"]["desc"] = "已下单" else: params["trade_data"]["trade_state"]["order"] = False if trade_state == trade_constant.TRADE_STATE_NOT_TRADE: params["trade_data"]["trade_state"]["desc"] = "未交易" elif trade_state == trade_constant.TRADE_STATE_BUY_CANCEL_ING: params["trade_data"]["trade_state"]["desc"] = "撤单中" elif trade_state == trade_constant.TRADE_STATE_BUY_CANCEL_SUCCESS: params["trade_data"]["trade_state"]["desc"] = "撤单成功" elif trade_state == trade_constant.TRADE_STATE_BUY_SUCCESS: params["trade_data"]["trade_state"]["desc"] = "已成交" # log.logger_debug.info(f"下单信息耗时:{time.time() - __start_time}") __start_time = time.time() # H撤监听范围 if trade_state == trade_constant.TRADE_STATE_BUY_PLACE_ORDER or trade_state == trade_constant.TRADE_STATE_BUY_DELEGATED or trade_state == trade_constant.TRADE_STATE_BUY_SUCCESS: hcancel_datas_dict, cancel_indexes_set = HourCancelBigNumComputer().get_watch_index_dict(code) # 根据日志读取实时的计算数据 h_cancel_latest_compute_info = log_export.get_h_cancel_compute_info(code) if hcancel_datas_dict: temp_list = [(k, hcancel_datas_dict[k][0]) for k in hcancel_datas_dict] canceled_indexs = set([int(k.split("-")[0]) for k in cancel_indexes_set]) temp_list.sort(key=lambda x: x[0]) params["trade_data"]["h_cancel"] = { "computed_info": list(h_cancel_latest_compute_info) if h_cancel_latest_compute_info else None, "datas": []} for i in range(0, len(temp_list)): temp = temp_list[i] val = total_datas[temp[0]]["val"] canceled = temp[0] in canceled_indexs params["trade_data"]["h_cancel"]["datas"].append( (val["time"], val["num"], money_desc(val["num"] * float(val["price"]) * 100), (1 if canceled else 0))) # log.logger_debug.info(f"H撤监听范围耗时:{time.time() - __start_time}") __start_time = time.time() ##############################主动买,被动买################################## # 返回主动买,被动买,不买的列表(代码, 名称, 得分, 是否涨停) # codes_score = __load_codes_scores() params["initiative_buy_codes"] = [] # for d in codes_score[0]: # params["initiative_buy_codes"].append( # {"name": d[1], "code": d[0], "score": d[2], "limit_up": d[3], "open_limit_up": d[4]}) params["passive_buy_codes"] = [] # for d in codes_score[1]: # params["passive_buy_codes"].append( # {"name": d[1], "code": d[0], "score": d[2], "limit_up": d[3], "open_limit_up": d[4]}) params["passive_buy_codes"] = params["passive_buy_codes"] # log.logger_debug.info(f"主动买,被动买耗时:{time.time() - __start_time}") __start_time = time.time() trade_info = load_trade_record(code, total_datas) params["trade_record"] = {"open_limit_up": trade_info[0], "records": trade_info[2]} # log.logger_debug.info(f"读取交易记录耗时:{time.time() - __start_time}") __start_time = time.time() ##############################开盘啦相关信息################################## industry = global_util.code_industry_map.get(code) params["kpl_code_info"] = { "industry": format_plate_output(industry)} # 获取开盘啦板块 plate_info = None jingxuan_block_info = KPLCodeJXBlockManager().get_jx_blocks_cache(code) if not jingxuan_block_info: jingxuan_block_info = KPLCodeJXBlockManager().get_jx_blocks_cache(code, by=True) if jingxuan_block_info: jingxuan_blocks = jingxuan_block_info[0] plate_info = [(0, x, 0) for x in jingxuan_blocks] # set([x[1] for x in jingxuan_blocks]) if not plate_info: if code not in kpl_block_info_dict: plate_info = kpl_api.getStockIDPlate(code) else: plate_info = kpl_block_info_dict.get(code) if plate_info: kpl_block_info_dict[code] = plate_info plate_info.sort(key=lambda x: x[2]) plate_info.reverse() params["kpl_code_info"]["plate"] = [(k[0], k[1], k[2], format_plate_output(k[1])[1]) for k in plate_info] # log.logger_debug.info(f"开盘啦板块耗时:{time.time() - __start_time}") __start_time = time.time() # 获取代码的历史涨停数据,(涨停原因,日期,板块) code_records = KPLLimitUpDataRecordManager.get_latest_infos(code, 4, False)[:2] if code_records: code_records = [(format_plate_output(k[0]), k[1], [format_plate_output(k1) for k1 in k[2].split("、")]) for k in code_records] # 修改历史 # code_records = LimitUpCodesBlockRecordManager().get_radical_buy_blocks_origin_data(code) # if code_records: # code_records = [(f"{x[0]}x{x[1]}", x[2], '') for x in code_records] # else: # code_records = [] params["kpl_code_info"]["code_records"] = code_records if not KPLLimitUpDataRecordManager.total_datas: KPLLimitUpDataRecordManager.load_total_datas() for d in KPLLimitUpDataRecordManager.total_datas: if d[3] == code: # 获取今日 plates = d[6].split("、") plates = [format_plate_output(p) for p in plates] params["kpl_code_info"]["today"] = (format_plate_output(d[2]), d[1], plates) break # log.logger_debug.info(f"获取代码的历史涨停数据耗时:{time.time() - __start_time}") __start_time = time.time() return params # 获取开盘啦板块信息 def get_kpl_block_info(code): def format_plate_output(_plat): return _plat, '' ##############################开盘啦相关信息################################## industry = global_util.code_industry_map.get(code) kpl_code_info = { "industry": format_plate_output(industry)} # 获取开盘啦板块 plate_info = None jingxuan_block_info = KPLCodeJXBlockManager().get_jx_blocks_cache(code) if not jingxuan_block_info: jingxuan_block_info = KPLCodeJXBlockManager().get_jx_blocks_cache(code, by=True) if jingxuan_block_info: jingxuan_blocks = jingxuan_block_info[0] plate_info = [(0, x, 0) for x in jingxuan_blocks] # set([x[1] for x in jingxuan_blocks]) if not plate_info: if code not in kpl_block_info_dict: plate_info = kpl_api.getStockIDPlate(code) else: plate_info = kpl_block_info_dict.get(code) if plate_info: kpl_block_info_dict[code] = plate_info plate_info.sort(key=lambda x: x[2]) plate_info.reverse() kpl_code_info["plate"] = [(k[0], k[1], k[2], format_plate_output(k[1])[1]) for k in plate_info] code_records = KPLLimitUpDataRecordManager.get_latest_infos(code, 4, False)[:2] if code_records: code_records = [(format_plate_output(k[0]), k[1], [format_plate_output(k1) for k1 in k[2].split("、")]) for k in code_records] kpl_code_info["code_records"] = code_records if not KPLLimitUpDataRecordManager.total_datas: KPLLimitUpDataRecordManager.load_total_datas() for d in KPLLimitUpDataRecordManager.total_datas: if d[3] == code: # 获取今日 plates = d[6].split("、") plates = [format_plate_output(p) for p in plates] kpl_code_info["today"] = (format_plate_output(d[2]), d[1], plates) break return kpl_code_info def load_trade_record(code, total_datas, date=tool.get_now_date_str()): def format_l2_data(item): return f"{item['val']['time']}#{item['val']['num']}手#{round(item['val']['num'] * float(item['val']['price']) * 100 / 10000, 1)}万" def load_cancel_watch_index(latest_cancel_watch_index_dict_): for k in latest_cancel_watch_index_dict_: records_new_data.append(latest_cancel_watch_index_dict_[k]) latest_cancel_watch_index_dict_.clear() # 获取炸板信息 limit_up_info = code_price_manager.Buy1PriceManager().get_limit_up_info(code) break_time = limit_up_info[1] records = [] try: records = log_export.load_trade_recod(code, date=date) except: pass records_new = [] records_new_data = [] index = 0 if records: try: latest_cancel_watch_index_dict = {} for record in records: time_ = record[0] type = record[1] data = record[2] if type == trade_record_log_util.TYPE_PLACE_ORDER: # if data['kpl_blocks'] and (type(data['kpl_blocks'][0]) == list or type(data['kpl_blocks'][0]) == tuple): # records_new_data.append((time_, "开盘啦推荐原因", # f"{'、'.join([k[1] for k in data['kpl_blocks']])}", # None)) # else: records_new_data.append((time_, "开盘啦推荐原因", f"{'、'.join(data['kpl_blocks'])}", None)) if "kpl_match_blocks" in data: if data["kpl_match_blocks"]: records_new_data.append((time_, "匹配原因", f"{'、'.join(data['kpl_match_blocks'])}", None)) else: records_new_data.append((time_, "匹配原因", f"独苗", None)) extra_datas = [] if data.get('big_num_indexes'): big_num_desc = [] for i in data['big_num_indexes']: big_num_desc.append(format_l2_data(total_datas[i])) extra_datas.append(f"包含大单:{' & '.join(big_num_desc)}") if data.get('m_val'): extra_datas.append(f"M值:{money_desc(data['m_val'])}") if data.get('safe_count'): extra_datas.append(f"安全笔数:{data['safe_count']}") extra_datas.append(f"总卖额:{data.get('sell_info')}") records_new_data.append((time_, "", "-------------------------", [])) mode = data.get('mode') mode_desc = data.get('mode_desc') if mode == OrderBeginPosInfo.MODE_ACTIVE: records_new_data.append((time_, "积极下单", mode_desc, extra_datas)) elif mode == OrderBeginPosInfo.MODE_FAST: records_new_data.append((time_, "闪电下单", mode_desc, extra_datas)) elif mode == OrderBeginPosInfo.MODE_RADICAL: records_new_data.append((time_, "扫入下单", mode_desc, extra_datas)) else: records_new_data.append((time_, "常规下单", mode_desc, extra_datas)) elif type == trade_record_log_util.TYPE_REAL_PLACE_ORDER_POSITION: _datas = [] MIN_MONEYS = [30000, 20000, 10000] for min_money in MIN_MONEYS: for i in range(data['index'] - 2, 0, -1): if L2DataUtil.is_limit_up_price_buy(total_datas[i]['val']) and total_datas[i]['val'][ 'num'] * float(total_datas[i]['val']['price']) >= min_money: _datas.append(f"【{format_l2_data(total_datas[i])}】") if len(_datas) >= 1: break if len(_datas) >= 1: break for i in range(data['index'], 0, -1): if L2DataUtil.is_limit_up_price_buy(total_datas[i]['val']) and total_datas[i]['val'][ 'num'] * float(total_datas[i]['val']['price']) >= 3000: _datas.append(f"【{format_l2_data(total_datas[i])}】") break records_new_data.append( (time_, "实际挂单位", "".join(_datas), [])) elif type == trade_record_log_util.TYPE_CANCEL_WATCH_INDEXES: indexes = data['watch_indexes'] if indexes: cancel_type = data['cancel_type'] indexes.sort() indexes_data = [] for index in indexes: indexes_data.append(format_l2_data(total_datas[index])) desc = f"【{format_l2_data(total_datas[indexes[0]])}】-【{format_l2_data(total_datas[indexes[-1]])}】" if cancel_type == trade_record_log_util.CancelWatchIndexesInfo.CANCEL_TYPE_L_DOWN: latest_cancel_watch_index_dict[cancel_type] = (time_, "L撤后囊括", desc, indexes_data) elif cancel_type == trade_record_log_util.CancelWatchIndexesInfo.CANCEL_TYPE_L_UP: latest_cancel_watch_index_dict[cancel_type] = (time_, "L撤前囊括", desc, indexes_data) elif cancel_type == trade_record_log_util.CancelWatchIndexesInfo.CANCEL_TYPE_H: latest_cancel_watch_index_dict[cancel_type] = (time_, "H撤囊括", desc, indexes_data) elif cancel_type == trade_record_log_util.CancelWatchIndexesInfo.CANCEL_TYPE_D: latest_cancel_watch_index_dict[cancel_type] = (time_, "D撤囊括", desc, indexes_data) elif cancel_type == trade_record_log_util.CancelWatchIndexesInfo.CANCEL_TYPE_S: latest_cancel_watch_index_dict[cancel_type] = (time_, "S撤囊括", desc, indexes_data) elif type == trade_record_log_util.TYPE_FORBIDDEN_BUY: records_new_data.append((time_, "加入黑名单", f"原因:{data['msg']}", [])) elif type == trade_record_log_util.TYPE_CANT_PLACE_ORDER: records_new_data.append((time_, "不能下单", f"原因:{data['msg']}", [])) elif type == trade_record_log_util.TYPE_CANCEL: load_cancel_watch_index(latest_cancel_watch_index_dict) records_new_data.append((time_, "撤单", f"原因:{data['msg']}", [])) elif type == trade_record_log_util.TYPE_ACTION: records_new_data.append((time_, data['type'], f"{data['msg']}", [])) load_cancel_watch_index(latest_cancel_watch_index_dict) records_new_data.sort(key=lambda x: x[0]) if records_new_data: for d in records_new_data: records_new.append(f"【{d[0]}】" + "{:<10}".format(f'【{d[1]}】') + d[2]) except Exception as e: logging.exception(e) records_new.reverse() records_new_data.reverse() return break_time, records_new, records_new_data # 返回内容[(类型,buy_single_index,indexes)] def load_trade_record_cancel_watch_indexes(code, cancel_type=None, date=tool.get_now_date_str()): fresults = [] records = [] try: records = log_export.load_trade_recod(code, date=date) except: pass if records: for record in records: time_ = record[0] type = record[1] data = record[2] if type == trade_record_log_util.TYPE_CANCEL_WATCH_INDEXES: indexes = data['watch_indexes'] if indexes: if cancel_type and cancel_type != data.get("cancel_type"): continue fresults.append((data.get("cancel_type"), data.get('buy_single_index'), indexes)) return fresults if __name__ == '__main__': code = '603616' records = load_trade_record_cancel_watch_indexes(code, trade_record_log_util.CancelWatchIndexesInfo.CANCEL_TYPE_H)