"""
Current price processor.

Receives snapshots of current prices, records prices and change rates, and
decides which codes are handed to the L2 subscription / listening layer.
"""
import decimal
import logging

from l2.huaxin import huaxin_target_codes_manager
from log_module import async_log_util
from log_module.log import logger_l2_codes_subscript
from ths import client_manager
import constant
from code_attribute import gpcode_manager
from utils import tool, import_util
from ths.l2_code_operate import L2CodeOperate
from trade import trade_manager, l2_trade_util, trade_constant
from trade.trade_data_manager import CodeActualPriceProcessor

trade_gui = import_util.import_lib("trade.trade_gui")

__actualPriceProcessor = CodeActualPriceProcessor()

# Set of codes handed to the most recent Huaxin L2 subscription push.
latest_add_codes = set()


def _record_prices(prices, first_codes, want_codes):
    """
    Record each quoted price and split the codes by their change rate.

    Side effect: saves the current price (and whether it equals the limit-up
    price) through the module's CodeActualPriceProcessor for every code that
    has a usable pre-close price.

    @param prices: iterable of dicts carrying "code" and "price"
    @param first_codes: first-board codes; their rate is halved
    @param want_codes: want-buy codes; flagged with 1 in the rising entries
    @return: (rising, falling, code_prices, code_rates) where rising/falling
             hold (rate, code, want_flag) tuples, code_prices holds
             (code, price) and code_rates holds (code, rate)
    """
    rising = []
    falling = []
    code_prices = []
    code_rates = []
    for quote in prices:
        code = quote["code"]
        price = float(quote["price"])
        code_prices.append((code, price))
        # Pre-close price used as the base of the change rate.
        pre_price = gpcode_manager.CodePrePriceManager.get_price_pre_cache(code)
        if not pre_price:
            # No pre-close available. A pre-close of 0 is treated the same way
            # because it would otherwise raise ZeroDivisionError below and
            # abort the processing of the whole snapshot.
            continue
        rate = round((price - pre_price) * 100 / pre_price, 2)
        if first_codes and code in first_codes:
            rate = rate / 2
        if rate >= 0 and not trade_manager.ForbiddenBuyCodeByScoreManager().is_in_cache(code):
            # Non-negative change rate and not forbidden by score.
            rising.append((rate, code, 1 if code in want_codes else 0))
        else:
            # Negative change rate, or forbidden by score.
            falling.append((rate, code, 0))
        code_rates.append((code, rate))
        try:
            is_limit_up = gpcode_manager.get_limit_up_price_by_preprice(code, pre_price) == tool.to_price(
                decimal.Decimal(quote["price"]))
            __actualPriceProcessor.save_current_price(code, price, is_limit_up)
        except Exception as e:
            # Best effort: a failure for one code must not stop the others.
            logging.exception(e)
            logger_l2_codes_subscript.exception(e)
    return rising, falling, code_prices, code_rates


def _push_huaxin_subscriptions(add_code_list, request_id):
    """
    Build the subscription rows for the Huaxin L2 source and push them.

    The push is unconditional: it is sent even when the code set is the same
    as the previous one. Any exception is logged and swallowed, in which case
    nothing is pushed for this call.

    @param add_code_list: ordered list of codes to subscribe
    @param request_id: identifier written to the log and forwarded to the push
    """
    global latest_add_codes
    try:
        add_code_set = set(add_code_list)
        print("设置L2代码数量:", len(add_code_set))
        async_log_util.info(logger_l2_codes_subscript,
                            f"({request_id})预处理新增订阅代码:{add_code_set - latest_add_codes}")
        latest_add_codes = add_code_set
        add_datas = []
        for code in add_code_list:
            limit_up_price = round(float(gpcode_manager.get_limit_up_price(code)), 2)
            # NOTE(review): presumably the volume worth 500,000 at the
            # limit-up price - confirm the unit with the subscriber side.
            min_volume = int(round(50 * 10000 / limit_up_price))
            # The fourth field carries the shadow ("cage") price.
            add_datas.append((code, min_volume, limit_up_price,
                              round(tool.get_shadow_price(limit_up_price), 2),
                              tool.get_buy_volume(limit_up_price)))
        huaxin_target_codes_manager.HuaXinL2SubscriptCodesManager.push(add_datas, request_id)
    except Exception as e:
        logging.exception(e)


def _sync_listen_codes(add_code_list, del_code_list):
    """
    Apply the add/remove decisions to the position-based L2 listening slots.

    @param add_code_list: codes that should be listened to
    @param del_code_list: codes that should no longer be listened to
    """
    # Remove first so that slots are released before new codes are queued.
    for code in del_code_list:
        if not gpcode_manager.is_listen_old(code):
            continue
        cid, pid = gpcode_manager.get_listen_code_pos(code)
        if cid and pid:
            # Force the slot empty right away.
            gpcode_manager.set_listen_code_by_pos(cid, pid, "")
        L2CodeOperate.get_instance().add_operate(0, code, "现价变化")
    # Queue the codes that are not listened to yet.
    for code in add_code_list:
        if not gpcode_manager.is_listen_old(code):
            L2CodeOperate.get_instance().add_operate(1, code, "现价变化")

    # Too few free slots: release slots held by codes outside the pool.
    free_count = gpcode_manager.get_free_listen_pos_count()
    if free_count < 2:
        for code in gpcode_manager.get_listen_codes():
            if gpcode_manager.is_in_gp_pool(code):
                continue
            client_id, pos = gpcode_manager.get_listen_code_pos(code)
            gpcode_manager.set_listen_code_by_pos(client_id, pos, "")
            free_count += 1
            if free_count > 2:
                break


def accept_prices(prices, request_id=None):
    """
    Process one snapshot of current prices.

    Records prices and change rates, ranks the codes with a non-negative rate
    (want-buy codes first, then by rate, descending) and hands the top ones to
    the L2 layer of the configured source type.

    @param prices: sized collection of dicts carrying "code" and "price"
    @param request_id: optional identifier used in logs and forwarded to the
                       Huaxin subscription push
    """
    # First-board codes.
    first_codes = gpcode_manager.FirstGPCodesManager().get_first_gp_codes_cache()
    print("总价格代码数量:", len(prices))
    is_ths = constant.L2_SOURCE_TYPE == constant.L2_SOURCE_TYPE_THS
    if is_ths:
        __actualPriceProcessor.save_current_price_codes_count(len(prices))
    now_str = tool.get_now_time_str()
    # Want-buy codes.
    want_codes = gpcode_manager.WantBuyCodesManager().list_code_cache()

    rising, falling, code_prices, code_rates = _record_prices(prices, first_codes, want_codes)
    gpcode_manager.set_prices(code_prices)
    __actualPriceProcessor.process_rates(code_rates, now_str)

    # ------------------------- trade position allocation -------------------------
    ranked = sorted(rising, key=lambda entry: (entry[2], entry[0]), reverse=True)
    if is_ths:
        # Pre-fill the buy windows: ranked codes first, then the falling ones.
        buy_win_codes = [entry[1] for entry in ranked]
        buy_win_codes.extend(entry[1] for entry in falling)
        try:
            if not constant.API_TRADE_ENABLE and trade_gui is not None:
                trade_gui.THSBuyWinManagerNew.fill_codes(buy_win_codes)
        except Exception as e:
            logging.exception(e)

    # ------------------------------ L2 listening ------------------------------
    # Maximum number of codes that may be handed to the L2 layer.
    if is_ths:
        max_count = len(client_manager.getValidL2Clients()) * constant.L2_CODE_COUNT_PER_DEVICE
        if max_count == 0:
            max_count = constant.L2_CODE_COUNT_PER_DEVICE
    else:
        max_count = constant.HUAXIN_L2_MAX_CODES_COUNT

    # Drop codes that are forbidden to trade (or have a negative rate), unless
    # they were already bought successfully.
    kept = []
    dropped = []
    for entry in ranked:
        rate, code = entry[0], entry[1]
        if (l2_trade_util.is_in_forbidden_trade_codes(code) or rate < 0) \
                and trade_manager.CodesTradeStateManager().get_trade_state_cache(
                    code) != trade_constant.TRADE_STATE_BUY_SUCCESS:
            dropped.append(entry)
        else:
            kept.append(entry)

    # The first max_count codes are added; everything after them is removed.
    add_code_list = [entry[1] for entry in kept[:max_count]]
    dropped.extend(kept[max_count:])
    del_code_list = [entry[1] for entry in dropped]

    if constant.L2_SOURCE_TYPE == constant.L2_SOURCE_TYPE_HUAXIN:
        _push_huaxin_subscriptions(add_code_list, request_id)
    else:
        _sync_listen_codes(add_code_list, del_code_list)
        print(add_code_list, del_code_list)


# Latest trade price per code: {code: price}
__trade_price_dict = {}
# Latest trade that was not at the limit-up price: {code: (price, time)}
__trade_price_not_limit_up_info_dict = {}


def set_trade_price(code, price, time_str, limit_up_price):
    """
    Store the latest trade price of a code.

    When a limit-up price is given and the trade price differs from it by more
    than 0.001, the trade is also remembered as the latest non-limit-up trade.

    @param code: code the trade belongs to
    @param price: trade price
    @param time_str: time of the trade
    @param limit_up_price: limit-up price of the code; a falsy value disables
                           the non-limit-up bookkeeping
    """
    __trade_price_dict[code] = price
    if limit_up_price and abs(limit_up_price - price) > 0.001:
        __trade_price_not_limit_up_info_dict[code] = (price, time_str)


def get_trade_price(code):
    """
    Get the latest trade price of a code.

    @param code:
    @return: the stored price, or None when no trade has been recorded
    """
    return __trade_price_dict.get(code)


def get_trade_not_limit_up_info(code):
    """
    Get the latest trade of a code that was not at the limit-up price.

    @param code:
    @return: (price, time), or None when no such trade has been recorded
    """
    return __trade_price_not_limit_up_info_dict.get(code)