"""
Current price processor.
"""

# Handling of received current prices.
import decimal
import logging
import operator

from l2.huaxin import huaxin_target_codes_manager
from log_module import async_log_util
from log_module.log import logger_l2_codes_subscript
from ths import client_manager
import constant
from code_attribute import gpcode_manager
from utils import tool, import_util
from ths.l2_code_operate import L2CodeOperate
from trade import trade_manager, l2_trade_util
from trade.trade_data_manager import CodeActualPriceProcessor

# GUI trading module, loaded through import_util.import_lib.
# accept_prices checks it against None before using it.
trade_gui = import_util.import_lib("trade.trade_gui")

# Shared processor used by accept_prices to store prices and change rates.
__actualPriceProcessor = CodeActualPriceProcessor()

# Code set handed to the HuaXin L2 subscription by the latest accept_prices call.
latest_add_codes = set()


def accept_prices(prices, request_id=None):
    """
    Process a batch of current prices and refresh the L2 listening codes.

    For every quote the change rate against the cached previous close is
    computed (halved for first-board codes), the price and rate are stored,
    and the codes are ranked with want-buy codes first and higher rates
    first. The top-ranked codes are then handed to the L2 backend selected
    by ``constant.L2_SOURCE_TYPE``.

    @param prices: sized collection of dicts; each item is read through
        ``d["code"]`` and ``d["price"]`` (the price is converted with
        ``float`` and ``decimal.Decimal``).
    @param request_id: optional identifier written to the subscription log
        and forwarded to the HuaXin subscription push.
    @return: None
    """
    global latest_add_codes

    # First-board codes.
    first_codes = gpcode_manager.FirstGPCodesManager().get_first_gp_codes_cache()

    print("总价格代码数量:", len(prices))
    if constant.L2_SOURCE_TYPE == constant.L2_SOURCE_TYPE_THS:
        __actualPriceProcessor.save_current_price_codes_count(len(prices))
        # NOTE: a former check that returned early when too few codes were
        # collected was disabled by the author ("not needed for now").
    now_str = tool.get_now_time_str()
    # Codes on the want-buy list.
    want_codes = gpcode_manager.WantBuyCodesManager().list_code_cache()

    candidates = []  # (rate, code, want_flag) for codes that may be listened to
    rejected = []  # (rate, code, 0) for falling or score-forbidden codes
    temp_prices = []
    temp_rates = []
    for d in prices:
        code, price = d["code"], float(d["price"])
        temp_prices.append((code, price))
        # Previous close price.
        pricePre = gpcode_manager.CodePrePriceManager.get_price_pre_cache(code)
        if pricePre is None:
            # NOTE(review): the rate / price bookkeeping below reads values
            # that only exist when a previous close is known, so it is kept
            # under this guard - confirm against the original layout.
            continue

        rate = round((price - pricePre) * 100 / pricePre, 2)
        if first_codes and code in first_codes:
            rate = rate / 2
        if rate >= 0 and not trade_manager.ForbiddenBuyCodeByScoreManager().is_in_cache(code):
            # Non-negative rate and not forbidden by score.
            candidates.append((rate, code, 1 if code in want_codes else 0))
        else:
            # Negative rate (or forbidden by score).
            rejected.append((rate, code, 0))
        temp_rates.append((code, rate))

        try:
            # Third argument: whether the current price equals the limit-up
            # price derived from the previous close.
            is_limit_up = gpcode_manager.get_limit_up_price_by_preprice(pricePre) == tool.to_price(
                decimal.Decimal(d["price"]))
            __actualPriceProcessor.save_current_price(code, price, is_limit_up)
        except Exception as e:
            # Best effort: a failure for one code must not stop the batch.
            logging.exception(e)
            logger_l2_codes_subscript.exception(e)

    gpcode_manager.set_prices(temp_prices)
    __actualPriceProcessor.process_rates(temp_rates, now_str)

    # ------------------------- trade position allocation -------------------------
    # Want-buy flag first, then rate, both descending.
    new_code_list = sorted(candidates, key=operator.itemgetter(2, 0), reverse=True)
    if constant.L2_SOURCE_TYPE == constant.L2_SOURCE_TYPE_THS:
        # Pre-fill the buy-window codes: ranked candidates, then rejected codes.
        _buy_win_codes = [entry[1] for entry in new_code_list]
        _buy_win_codes.extend(entry[1] for entry in rejected)
        try:
            if not constant.API_TRADE_ENABLE and trade_gui is not None:
                trade_gui.THSBuyWinManagerNew.fill_codes(_buy_win_codes)
        except Exception as e:
            logging.exception(e)

    # ------------------------------- L2 listening -------------------------------
    if constant.L2_SOURCE_TYPE == constant.L2_SOURCE_TYPE_THS:
        client_ids = client_manager.getValidL2Clients()
        # Maximum number of codes that can be filled.
        max_count = len(client_ids) * constant.L2_CODE_COUNT_PER_DEVICE
        if max_count == 0:
            max_count = constant.L2_CODE_COUNT_PER_DEVICE
    else:
        max_count = constant.HUAXIN_L2_MAX_CODES_COUNT

    kept = []
    dropped = []
    for item in new_code_list:
        rate, code = item[0], item[1]
        # Forbidden-trade (blacklisted) code or negative rate ...
        blocked = l2_trade_util.is_in_forbidden_trade_codes(code) or rate < 0
        # ... is dropped only when the code has not been bought successfully.
        if blocked and trade_manager.CodesTradeStateManager().get_trade_state_cache(
                code) != trade_manager.TRADE_STATE_BUY_SUCCESS:
            dropped.append(item)
        else:
            kept.append(item)

    # The first max_count codes are listened to; everything after is removed.
    add_list = kept[:max_count]
    dropped.extend(kept[max_count:])

    add_code_list = [item[1] for item in add_list]
    del_code_list = [item[1] for item in dropped]

    if constant.L2_SOURCE_TYPE == constant.L2_SOURCE_TYPE_HUAXIN:
        # HuaXin L2: push the codes together with their limit-up prices.
        # The push happens on every call; the former "only when the code set
        # changed" comparison was disabled by the author.
        try:
            add_code_set = set(add_code_list)
            print("设置L2代码数量:", len(add_code_set))
            async_log_util.info(logger_l2_codes_subscript,
                                f"({request_id})预处理新增订阅代码:{add_code_set - latest_add_codes}")
            latest_add_codes = add_code_set
            add_datas = []
            for d in add_code_list:
                limit_up_price = gpcode_manager.get_limit_up_price(d)
                limit_up_price = round(float(limit_up_price), 2)
                # presumably a 500,000 amount converted to a volume at the
                # limit-up price - TODO confirm the meaning of 50 * 10000
                min_volume = int(round(50 * 10000 / limit_up_price))
                # The price-cage price (tool.get_shadow_price) is passed as well.
                add_datas.append(
                    (d, min_volume, limit_up_price, round(tool.get_shadow_price(limit_up_price), 2),
                     tool.get_buy_volume(limit_up_price)))
            huaxin_target_codes_manager.HuaXinL2SubscriptCodesManager.push(add_datas, request_id)
        except Exception as e:
            logging.exception(e)
    else:
        # First remove the codes that should no longer be listened to.
        for code in del_code_list:
            if gpcode_manager.is_listen_old(code):
                cid, pid = gpcode_manager.get_listen_code_pos(code)
                # Force the position to be cleared.
                if cid and pid:
                    gpcode_manager.set_listen_code_by_pos(cid, pid, "")
                L2CodeOperate.get_instance().add_operate(0, code, "现价变化")
        # Then add the codes that should be listened to.
        for code in add_code_list:
            if not gpcode_manager.is_listen_old(code):
                L2CodeOperate.get_instance().add_operate(1, code, "现价变化")

        # Number of free listening positions.
        free_count = gpcode_manager.get_free_listen_pos_count()
        if free_count < 2:
            # Not enough free positions: release codes that left the pool.
            listen_codes = gpcode_manager.get_listen_codes()
            for code in listen_codes:
                if not gpcode_manager.is_in_gp_pool(code):
                    client_id, pos = gpcode_manager.get_listen_code_pos(code)
                    gpcode_manager.set_listen_code_by_pos(client_id, pos, "")
                    free_count += 1
                    if free_count > 2:
                        break

    # NOTE(review): nesting level of this debug print could not be recovered
    # from the source; it is placed at function level - confirm.
    print(add_code_list, del_code_list)


# Latest trade price per code: {code: price}
__trade_price_dict = {}

# Latest trade that was NOT at the limit-up price, per code: {code: (price, time)}
__trade_price_not_limit_up_info_dict = {}


def set_trade_price(code, price, time_str, limit_up_price):
    """
    Store the trade price of a code.

    @param code: code the trade belongs to
    @param price: trade price
    @param time_str: time of the trade, stored as given
    @param limit_up_price: limit-up price of the code; when falsy, no
        non-limit-up record is written
    @return: None
    """
    __trade_price_dict[code] = price
    # The most recent trade away from the limit-up price must be remembered;
    # a difference of more than 0.001 counts as "not at limit-up".
    if limit_up_price and abs(limit_up_price - price) > 0.001:
        __trade_price_not_limit_up_info_dict[code] = (price, time_str)


def get_trade_price(code):
    """
    Get the stored trade price of a code.

    @param code: code to look up
    @return: the price stored by set_trade_price, or None when the code has
        no stored price
    """
    return __trade_price_dict.get(code)


def get_trade_not_limit_up_info(code):
    """
    Get the most recent trade that was not executed at the limit-up price.

    @param code: code to look up
    @return: (price, time), or None when nothing is stored for the code
    """
    return __trade_price_not_limit_up_info_dict.get(code)