"""
|
现价处理器
|
"""
|
# 获取到现价
|
import decimal
|
import logging
|
|
from l2.huaxin import huaxin_target_codes_manager
|
from log_module import async_log_util
|
from log_module.log import logger_l2_codes_subscript, logger_debug
|
import constant
|
from code_attribute import gpcode_manager
|
from third_data.code_plate_key_manager import KPLPlateForbiddenManager
|
from third_data.kpl_data_constant import LimitUpCodesBlockRecordManager, LimitUpDataConstant
|
from trade.buy_money_count_setting import BuyMoneyUtil
|
from trade.buy_radical import radical_buy_data_manager
|
from trade.buy_radical.block_special_codes_manager import BlockSpecialCodesManager
|
from trade.buy_radical.radical_buy_data_manager import RadicalBuyBlockManager, RadicalBuyDataManager
|
from utils import tool, import_util
|
from trade import trade_manager, l2_trade_util, trade_constant
|
from trade.trade_data_manager import CodeActualPriceProcessor, RadicalBuyDealCodesManager
|
import concurrent.futures
|
|
# GUI trade module resolved by dotted name through import_util.import_lib;
# it is not referenced anywhere else in the visible part of this file.
trade_gui = import_util.import_lib("trade.trade_gui")
|
|
# Shared processor; accept_prices calls save_current_price() on it for each code.
__actualPriceProcessor = CodeActualPriceProcessor()
|
|
# Worker pool (max 10 threads) to which accept_prices submits
# radical_buy_data_manager.pull_pre_deal_big_orders for newly subscribed codes.
__pre_big_order_deal_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=10)
|
|
# Set of codes selected for subscription by the previous accept_prices call;
# accept_prices diffs against it to find newly added codes.
latest_add_codes = set()
|
|
|
def _rank_in_top_blocks(block, top_in_blocks):
    """Return the 1-based position of block in top_in_blocks, or None if absent."""
    if block in top_in_blocks:
        return top_in_blocks.index(block) + 1
    return None


def compute_code_order(code, top_in_blocks=None, yesterday_limit_up_codes=None, today_history_limit_up_codes=None):
    """
    Compute the sort value that decides whether, and how early, a code is subscribed.

    Values returned by this function:
      -1     do not subscribe (limit-up yesterday, forbidden and not bought,
             not buyable, or no radical-buy blocks)
      0      want-buy code, buy-at-open-limit-up code, or an order is being
             placed / is delegated
      998    forbidden code that has already been bought
      n >= 1 1-based position of one of the code's blocks in top_in_blocks
      200    top-3 code of a new block that is not in top_in_blocks
      10000  nothing matched

    @param code: code to rank
    @param top_in_blocks: blocks ordered by net inflow (best first); None means empty
    @param yesterday_limit_up_codes: codes that were limit-up yesterday; None means empty
    @param today_history_limit_up_codes: codes that have been limit-up today; None means empty
    @return: sort value; a negative value means "do not subscribe"
    """
    if yesterday_limit_up_codes is None:
        yesterday_limit_up_codes = set()
    if top_in_blocks is None:
        top_in_blocks = []
    if today_history_limit_up_codes is None:
        today_history_limit_up_codes = set()

    # Codes that were already limit-up yesterday are never subscribed.
    if code in yesterday_limit_up_codes:
        return -1

    trade_state = trade_manager.CodesTradeStateManager().get_trade_state_cache(code)

    # Forbidden (blacklisted) codes stay subscribed only when already bought.
    if l2_trade_util.is_in_forbidden_trade_codes(code):
        if trade_state == trade_constant.TRADE_STATE_BUY_SUCCESS:
            return 998
        return -1

    deal_codes = RadicalBuyDealCodesManager().get_deal_codes()
    result = RadicalBuyDataManager().is_code_can_buy(code, deal_codes)
    if not result[0]:
        # An optional third element flags that the code may be blacklisted,
        # with result[1] passed along as the message.
        if len(result) > 2 and result[2]:
            l2_trade_util.forbidden_trade(code, result[1])
        return -1

    # Want-buy codes sort first.
    if gpcode_manager.WantBuyCodesManager().is_in_cache(code):
        return 0
    # Buy-at-open-limit-up (overnight) codes sort first as well.
    if gpcode_manager.BuyOpenLimitUpCodeManager().is_in_cache(code):
        return 0

    # A code with an order being placed or delegated must not be dropped.
    if trade_state in (trade_constant.TRADE_STATE_BUY_PLACE_ORDER, trade_constant.TRADE_STATE_BUY_DELEGATED):
        return 0

    blocks = LimitUpCodesBlockRecordManager().get_radical_buy_blocks(code)
    if not blocks:
        return -1

    if code in today_history_limit_up_codes:
        # Only codes that have been limit-up today get a position inside their block.
        for b in blocks:
            special_codes = BlockSpecialCodesManager().get_block_codes(b)
            if special_codes and code in special_codes:
                # Special code of the block: subscribe when the block is in the inflow ranking.
                rank = _rank_in_top_blocks(b, top_in_blocks)
                if rank is not None:
                    return rank
                continue

            # Not a special code: subscribe only the first 3 codes of a new block.
            new_blocks = LimitUpCodesBlockRecordManager().get_new_blocks()
            if not new_blocks or b not in new_blocks:
                continue
            info = RadicalBuyBlockManager().get_history_index(code, b, yesterday_limit_up_codes)
            if info[0] > 0:
                info = RadicalBuyBlockManager().filter_before_codes(code, b, info[0], info[1],
                                                                    yesterday_limit_up_codes)
            if info[0] < 3:
                rank = _rank_in_top_blocks(b, top_in_blocks)
                return 200 if rank is None else rank
    else:
        # Not limit-up yet today: subscribe only special codes of blocks in the inflow ranking.
        for b in blocks:
            rank = _rank_in_top_blocks(b, top_in_blocks)
            if rank is None:
                continue
            special_codes = BlockSpecialCodesManager().get_block_codes(b)
            if not special_codes or code not in special_codes:
                continue
            return rank

    return 10000
|
|
|
def _build_l2_subscript_item(code):
    """
    Build the Huaxin L2 subscription tuple for one code.

    @param code: code to subscribe
    @return: (code, min volume, limit-up price, shadow order price, buy volume, special volumes)
    """
    limit_up_price = gpcode_manager.get_limit_up_price_as_num(code)
    # 50 * 10000 divided by the limit-up price, rounded to a whole volume.
    min_volume = int(round(50 * 10000 / limit_up_price))
    # Special volumes that must also be subscribed.
    special_volumes = BuyMoneyUtil.get_possible_buy_volumes(limit_up_price)
    special_volumes |= {tool.get_buy_volume_by_money(limit_up_price, x) for x in constant.AVAILABLE_BUY_MONEYS}
    return (code, min_volume, limit_up_price, round(tool.get_shadow_price(limit_up_price), 2),
            tool.get_buy_volume(limit_up_price), list(special_volumes))


def accept_prices(prices, request_id=None, top_in_blocks=None, yesterday_limit_up_codes=None):
    """
    Receive current prices, store them and refresh the L2 subscription list.

    For each price item the change rate against the previous close is computed
    (halved when tool.is_ge_code(code) is true). Codes whose compute_code_order
    value lies in [0, 1000) and whose rate is at least 3 become candidates,
    sorted by order value and then by descending rate. Candidates that are
    forbidden and not yet bought are dropped, and the first
    constant.HUAXIN_L2_MAX_CODES_COUNT are pushed to the Huaxin L2 subscription
    manager when the Huaxin L2 source is configured.

    @param prices: iterable of dicts with "code" and "price" keys
    @param request_id: identifier echoed in log lines and passed to the push call
    @param top_in_blocks: blocks ordered by net inflow, forwarded to compute_code_order
    @param yesterday_limit_up_codes: yesterday's limit-up codes, forwarded to compute_code_order
    @return: None
    """
    global latest_add_codes

    today_history_limit_up_codes = {d[3] for d in LimitUpDataConstant.history_limit_up_datas}
    candidates = []
    temp_prices = []
    for d in prices:
        code, price = d["code"], float(d["price"])
        temp_prices.append((code, price))
        # Previous close price.
        pricePre = gpcode_manager.CodePrePriceManager.get_price_pre_cache(code)
        if not pricePre:
            # None: no previous close cached. 0: the rate below would divide by
            # zero and abort the whole batch, so such a code is skipped instead.
            continue
        order_index = compute_code_order(code, top_in_blocks, yesterday_limit_up_codes,
                                         today_history_limit_up_codes)
        rate = round((price - pricePre) * 100 / pricePre, 2)
        if tool.is_ge_code(code):
            # Per the original note, ChiNext gains are discounted by half.
            rate = rate / 2
        # Only codes with a valid order value and a gain of at least 3% are subscribed.
        if 0 <= order_index < 1000 and rate >= 3:
            candidates.append((rate, code, order_index))
        try:
            __actualPriceProcessor.save_current_price(
                code, price,
                gpcode_manager.get_limit_up_price_by_preprice(code, pricePre) == tool.to_price(
                    decimal.Decimal(d["price"])))
        except Exception as e:
            logging.exception(e)
            logger_l2_codes_subscript.exception(e)
    gpcode_manager.set_prices(temp_prices)

    # ------------------------------- L2 subscription ---------------------------------
    # Smaller order value first; within the same order value, larger gain first.
    new_code_list = sorted(candidates, key=lambda e: (e[2], -e[0]))
    max_count = constant.HUAXIN_L2_MAX_CODES_COUNT

    # Re-check the blacklist: forbidden codes that are not yet bought are dropped.
    kept = []
    for item in new_code_list:
        trade_state = trade_manager.CodesTradeStateManager().get_trade_state_cache(item[1])
        if l2_trade_util.is_in_forbidden_trade_codes(item[1]) \
                and trade_state != trade_constant.TRADE_STATE_BUY_SUCCESS:
            continue
        kept.append(item)

    # Keep only as many codes as the subscription limit allows.
    add_list = kept[:max_count]
    async_log_util.info(logger_l2_codes_subscript,
                        f"({request_id})需要订阅的代码:{add_list}")
    add_code_list = [item[1] for item in add_list]

    if constant.L2_SOURCE_TYPE != constant.L2_SOURCE_TYPE_HUAXIN:
        return

    try:
        add_code_set = set(add_code_list)
        previous_codes = latest_add_codes or set()
        new_codes = add_code_set - previous_codes
        # Newly subscribed codes need their earlier big orders pulled.
        for c in new_codes:
            __pre_big_order_deal_thread_pool.submit(radical_buy_data_manager.pull_pre_deal_big_orders, c)
        async_log_util.info(logger_l2_codes_subscript,
                            f"({request_id})预处理新增订阅代码:{new_codes}")
        latest_add_codes = add_code_set
        add_datas = [_build_l2_subscript_item(c) for c in add_code_list]
        huaxin_target_codes_manager.HuaXinL2SubscriptCodesManager.push(add_datas, request_id)
    except Exception as e:
        logging.exception(e)
|
|
|
# Latest trade price per code: {code: price}
__trade_price_dict = {}

# Time of the latest trade that was not at the limit-up price: {code: time string with ms}
__trade_price_not_limit_up_info_dict = {}


def set_trade_price(code, price):
    """
    Store the trade price of a code.

    @param code: code the price belongs to
    @param price: trade price to remember
    @return: None
    """
    __trade_price_dict[code] = price


def set_latest_not_limit_up_time(code, time_str_with_ms):
    """
    Store the time of the latest trade of a code that was not at the limit-up price.

    Per the original author's note, the latest active buy marks the moment the
    code most recently reached limit-up.

    @param code: code the time belongs to
    @param time_str_with_ms: time value to remember (stored as given)
    @return: None
    """
    __trade_price_not_limit_up_info_dict[code] = time_str_with_ms


def get_trade_price(code):
    """
    Return the stored trade price of a code.

    @param code: code to look up
    @return: the price passed to set_trade_price, or None if none was stored
    """
    return __trade_price_dict.get(code)


def get_trade_not_limit_up_time_with_ms(code):
    """
    Return the stored time of the latest non-limit-up trade of a code.

    @param code: code to look up
    @return: the value passed to set_latest_not_limit_up_time, or None if none was stored
    """
    return __trade_price_not_limit_up_info_dict.get(code)
|