""" 现价处理器 """ # 获取到现价 import decimal import logging from l2.huaxin import huaxin_target_codes_manager from log_module import async_log_util from log_module.log import logger_l2_codes_subscript, logger_debug import constant from code_attribute import gpcode_manager from third_data import kpl_data_constant from third_data.code_plate_key_manager import KPLPlateForbiddenManager from third_data.kpl_data_constant import LimitUpCodesBlockRecordManager, LimitUpDataConstant from trade.buy_money_count_setting import BuyMoneyUtil from trade.buy_radical import radical_buy_data_manager, new_block_processor from trade.buy_radical.block_special_codes_manager import BlockSpecialCodesManager from trade.buy_radical.radical_buy_data_manager import RadicalBuyBlockManager, RadicalBuyDataManager from trade.order_statistic import DealAndDelegateWithBuyModeDataManager from utils import tool, import_util from trade import trade_manager, l2_trade_util, trade_constant from trade.trade_data_manager import CodeActualPriceProcessor, RadicalBuyDealCodesManager import concurrent.futures trade_gui = import_util.import_lib("trade.trade_gui") __actualPriceProcessor = CodeActualPriceProcessor() __pre_big_order_deal_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=10) latest_add_codes = set() def compute_code_order(code, top_in_blocks=None, yesterday_limit_up_codes=None, today_history_limit_up_codes=None, top_out_blocks=None): """ 计算代码的排序 @param code: @param top_in_blocks: 净流入前几 @return: 排序值,若为负值不订阅 """ # 想买单/隔夜单排序位 if yesterday_limit_up_codes is None: yesterday_limit_up_codes = set() if top_in_blocks is None: top_in_blocks = [] if top_out_blocks is None: top_out_blocks = [] if today_history_limit_up_codes is None: today_history_limit_up_codes = set() # 高位板 if code in yesterday_limit_up_codes: return -1 # 黑名单 trade_state = trade_manager.CodesTradeStateManager().get_trade_state_cache(code) # 处于委托状态的必须订阅 if trade_state == trade_constant.TRADE_STATE_BUY_DELEGATED or trade_state == trade_constant.TRADE_STATE_BUY_PLACE_ORDER: return 0 if l2_trade_util.is_in_forbidden_trade_codes(code): # 没有成交 if trade_state == trade_constant.TRADE_STATE_BUY_SUCCESS: # 成交的票 return 998 else: return -1 deal_codes = RadicalBuyDealCodesManager().get_deal_codes() result = RadicalBuyDataManager().is_code_can_buy(code, deal_codes) if not result[0]: if len(result) > 2 and result[2]: # 可以拉黑 if new_block_processor.is_can_forbidden(code): l2_trade_util.forbidden_trade(code, msg=result[1]) return -1 # 想买单 if gpcode_manager.WantBuyCodesManager().is_in_cache(code): return 0 # 隔夜单排一 if gpcode_manager.BuyOpenLimitUpCodeManager().is_in_cache(code): return 0 # 如果当前清单处于委托状态就不能移除 if trade_state == trade_constant.TRADE_STATE_BUY_PLACE_ORDER or trade_state == trade_constant.TRADE_STATE_BUY_DELEGATED: return 0 # 板块 blocks = LimitUpCodesBlockRecordManager().get_radical_buy_blocks(code) if not blocks: return -1 if code in today_history_limit_up_codes: # 涨停过的代码才会计算板块身位 for b in blocks: index = None # 只订阅有辨识度的票和新板块前3 special_codes = BlockSpecialCodesManager().get_block_codes(b) if special_codes and code in special_codes: # 有辨识度在净流入中则订阅 if b in top_in_blocks: index = top_in_blocks.index(b) return index + 1 else: # 辨识度的票没在净流入中,只要不在净流出中就订阅 if b not in top_out_blocks: return 200 else: # 没有辨识度,新板块订阅前3 new_blocks = kpl_data_constant.get_new_blocks(code) if new_blocks and b in new_blocks: info = RadicalBuyBlockManager().get_history_index(code, b, yesterday_limit_up_codes) if info[0] > 0: info = RadicalBuyBlockManager().filter_before_codes(code, b, info[0], info[1], yesterday_limit_up_codes) if info[0] < 3: if b in top_in_blocks: index = top_in_blocks.index(b) return index + 1 else: return 200 else: # 尚未涨停过的代码,订阅板块有辨识度的前6 for b in blocks: if b not in top_in_blocks and tool.get_now_time_as_int() >= 100000: # 10点之后才考虑净流入 continue if b in top_in_blocks: index = top_in_blocks.index(b) else: index = 1000 special_codes = BlockSpecialCodesManager().get_block_codes(b) if not special_codes or code not in special_codes: continue # 净流入 + 辨识度 return index + 1 # 判断今日辨识度 try: for b in blocks: if radical_buy_data_manager.RadicalBuyBlockManager.is_today_block_special_codes(code, b, yesterday_limit_up_codes): if b in top_in_blocks: index = top_in_blocks.index(b) return index + 1 except Exception as e: logger_debug.exception(e) return 10000 def accept_prices(prices, request_id=None, top_in_blocks=None, yesterday_limit_up_codes=None, top_out_blocks=None): """ 接收价格,处理订阅 @param yesterday_limit_up_codes: 昨日涨停数据 @param history_limit_up_datas: 历史涨停数据 @param prices: @param request_id: @param top_in_blocks: 净流入前几的代码 @return: """ # logger_debug.debug(f"接收L1数据测试:流入前20-{top_in_blocks}") if True: today_history_limit_up_codes = set([d[3] for d in LimitUpDataConstant.history_limit_up_datas]) _code_list = [] _delete_list = [] temp_prices = [] now_time_int = tool.get_now_time_as_int() for d in prices: code, price = d["code"], float(d["price"]) temp_prices.append((code, price)) # 获取收盘价 pricePre = gpcode_manager.CodePrePriceManager.get_price_pre_cache(code) if pricePre is not None: # 是否是想买单 order_index = compute_code_order(code, top_in_blocks, yesterday_limit_up_codes, today_history_limit_up_codes, top_out_blocks=top_out_blocks) rate = round((price - pricePre) * 100 / pricePre, 2) if tool.is_ge_code(code): # 创业板的涨幅需要打折 rate = rate / 2 if order_index >= 0: if order_index < 1000 and rate >= 5: # 涨幅大于3%的才能订阅 _code_list.append((rate, code, order_index)) else: _delete_list.append((rate, code, 0)) else: # 暂存涨幅为负的代码 _delete_list.append((rate, code, 0)) try: __actualPriceProcessor.save_current_price(code, price, gpcode_manager.get_limit_up_price_by_preprice(code, pricePre) == tool.to_price( decimal.Decimal(d["price"]))) except Exception as e: logging.exception(e) logger_l2_codes_subscript.exception(e) gpcode_manager.set_prices(temp_prices) # -------------------------------处理交易位置分配--------------------------------- # 排序 new_code_list = sorted(_code_list, key=lambda e: (e[2], -e[0])) # logger_debug.debug(f"接收L1数据测试:排序过后的代码-{new_code_list[:75]}") # -------------------------------处理L2监听--------------------------------- max_count = constant.HUAXIN_L2_MAX_CODES_COUNT _delete_list = [] for item in new_code_list: trade_state = trade_manager.CodesTradeStateManager().get_trade_state_cache(item[1]) if l2_trade_util.is_in_forbidden_trade_codes( item[1]) and trade_state != trade_constant.TRADE_STATE_BUY_SUCCESS: # 拉黑的尚未成交的代码 _delete_list.append(item) elif item[0] < 0: # 在(黑名单)/(涨幅小于)的数据 # if trade_manager.CodesTradeStateManager().get_trade_state_cache( # item[1]) != trade_constant.TRADE_STATE_BUY_SUCCESS: # 没成交才会加入删除 _delete_list.append(item) for item in _delete_list: new_code_list.remove(item) # 截取前几个代码填充 add_list = new_code_list[:max_count] async_log_util.info(logger_l2_codes_subscript, f"({request_id})需要订阅的代码:{add_list}") # 后面的代码全部删除 _delete_list.extend(new_code_list[max_count:]) add_code_list = [] del_code_list = [] for d in add_list: add_code_list.append(d[1]) for d in _delete_list: del_code_list.append(d[1]) if del_code_list: async_log_util.info(logger_l2_codes_subscript, f"({request_id})需要删除订阅的代码:{del_code_list}") if constant.L2_SOURCE_TYPE == constant.L2_SOURCE_TYPE_HUAXIN: # 华鑫L2,获取加入代码的涨停价 # 是否和上次一样 try: add_code_set = set(add_code_list) global latest_add_codes if not latest_add_codes: latest_add_codes = set() # # 判断设置的代码是否相同 # dif1 = latest_add_codes - add_code_set dif2 = add_code_set - latest_add_codes if dif2: # 新增加的订阅需要拉取之前的大单 __pre_big_order_deal_thread_pool.submit(radical_buy_data_manager.pull_pre_deal_big_orders_by_codes, dif2) if True: async_log_util.info(logger_l2_codes_subscript, f"({request_id})预处理新增订阅代码:{dif2}") latest_add_codes = add_code_set add_datas = [] for d in add_code_list: limit_up_price = gpcode_manager.get_limit_up_price_as_num(d) min_volume = int(round(50 * 10000 / limit_up_price)) # 需要订阅的特殊的量 special_volumes = BuyMoneyUtil.get_possible_buy_volumes(limit_up_price) special_volumes |= set([tool.get_buy_volume_by_money(limit_up_price, x) for x in constant.AVAILABLE_BUY_MONEYS]) add_datas.append( # (代码, 最小量, 涨停价,影子订单价格,买量, 特殊价格) (d, min_volume, limit_up_price, round(tool.get_shadow_price(limit_up_price), 2), tool.get_buy_volume(limit_up_price), list(special_volumes))) huaxin_target_codes_manager.HuaXinL2SubscriptCodesManager.push(add_datas, request_id) except Exception as e: logger_debug.exception(e) else: pass __trade_price_dict = {} # 最近的非涨停价成交的信息,数据结构:{code:(价格,时间)} __trade_price_not_limit_up_info_dict = {} # 设置成交价 def set_trade_price(code, price): __trade_price_dict[code] = price def set_latest_not_limit_up_time(code, time_str_with_ms): """ 记录最近的一次上板时间(最近的一笔主动买就是上板时间) @param code: @param time_str: @return: """ __trade_price_not_limit_up_info_dict[code] = time_str_with_ms # 获取成交价 def get_trade_price(code): return __trade_price_dict.get(code) def get_trade_not_limit_up_time_with_ms(code): """ 获取最近的非板上成交的时间 @param code: @return:(价格, 时间) """ return __trade_price_not_limit_up_info_dict.get(code)