"""
|
下单信号管理
|
"""
|
import time
|
|
from l2 import l2_log
|
from l2.huaxin import l2_huaxin_util
|
from log_module import async_log_util
|
from log_module.log import logger_l2_trade_buy, logger_debug
|
from utils import tool
|
|
|
class L2TradeSingleDataProcessor:
|
"""
|
L2逐笔成交数据信号管理
|
"""
|
__latest_sell_data = {}
|
|
# 最近的涨停卖委托列表
|
__latest_limit_up_sell_list_dict = {}
|
|
__latest_limit_up_sell_order_no_set_dict = {}
|
|
# 主动卖订单号集合
|
__active_sell_order_no_set_dict = {}
|
|
# 主动买单数据,格式:{"code":[买单号,当前成交手数, 当前成交金额, 开始时间, 结束时间]}
|
__latest_active_buy_order_data_dict = {}
|
# 所有的买单数据,格式:{code:[[买单数据],...]}
|
__active_buy_order_datas_dict = {}
|
|
@classmethod
|
def clear_passive_sell_data(cls, code):
|
"""
|
清除被动卖相关的数据
|
@param code:
|
@return:
|
"""
|
if code in cls.__latest_active_buy_order_data_dict:
|
cls.__latest_active_buy_order_data_dict.pop(code)
|
if code in cls.__active_buy_order_datas_dict:
|
cls.__active_buy_order_datas_dict.pop(code)
|
if code in cls.__latest_limit_up_sell_list_dict:
|
cls.__latest_limit_up_sell_list_dict.pop(code)
|
if code in cls.__latest_limit_up_sell_order_no_set_dict:
|
cls.__latest_limit_up_sell_order_no_set_dict.pop(code)
|
|
@classmethod
|
def add_l2_delegate_limit_up_sell(cls, code, data):
|
"""
|
添加涨停卖单数据,当前不是涨停成交时才加
|
@param code:
|
@param data:
|
@return:
|
"""
|
if code not in cls.__latest_limit_up_sell_list_dict:
|
cls.__latest_limit_up_sell_list_dict[code] = []
|
cls.__latest_limit_up_sell_list_dict[code].append(data)
|
# async_log_util.info(logger_debug, f"添加涨停卖数据:{code}-{data}")
|
if code not in cls.__latest_limit_up_sell_order_no_set_dict:
|
cls.__latest_limit_up_sell_order_no_set_dict[code] = set()
|
cls.__latest_limit_up_sell_order_no_set_dict[code].add(data['val']['orderNo'])
|
# 只保留前20的数据
|
if len(cls.__latest_limit_up_sell_list_dict[code]) > 20:
|
delete_datas = cls.__latest_limit_up_sell_list_dict[code][:-20]
|
cls.__latest_limit_up_sell_list_dict[code] = cls.__latest_limit_up_sell_list_dict[code][-20:]
|
# 删除之前的map
|
for d in delete_datas:
|
cls.__latest_limit_up_sell_order_no_set_dict[code].discard(d["val"]["orderNo"])
|
|
@classmethod
|
def add_l2_delegate_limit_up_sell_cancel(cls, code, order_no):
|
"""
|
涨停卖撤
|
@param code:
|
@param order_no:
|
@return:
|
"""
|
if code not in cls.__latest_limit_up_sell_order_no_set_dict:
|
return
|
order_no_set = cls.__latest_limit_up_sell_order_no_set_dict[code]
|
if order_no not in order_no_set:
|
return
|
cls.__latest_limit_up_sell_order_no_set_dict[code].discard(order_no)
|
|
for i in range(0, len(cls.__latest_limit_up_sell_list_dict[code])):
|
if cls.__latest_limit_up_sell_list_dict[code][i]['val']['orderNo'] == order_no:
|
cls.__latest_limit_up_sell_list_dict[code].pop(i)
|
break
|
|
@classmethod
|
def get_latest_limit_up_sell_order_count(cls, code):
|
sell_list = cls.__latest_limit_up_sell_list_dict.get(code)
|
if not sell_list:
|
return 0
|
return len(sell_list)
|
|
@classmethod
|
def process_passive_limit_up_sell_data(cls, code, fdatas):
|
"""
|
添加涨停被动卖成交数据
|
@param fdata: 数据格式:(data['SecurityID'], data['TradePrice'], data['TradeVolume'],
|
# data['OrderTime'], data['MainSeq'], data['SubSeq'], data['BuyNo'],
|
# data['SellNo'], data['ExecType'])
|
|
[(数据本身, 是否主动买, 是否涨停, 总成交额, 不含ms时间,含ms时间)]
|
@return: 涨停卖是否已经吃完
|
"""
|
try:
|
start_time = time.time()
|
sell_list = cls.__latest_limit_up_sell_list_dict.get(code)
|
if not sell_list:
|
return False
|
last_sell_info = sell_list[-1]
|
for data in fdatas:
|
# 过滤被动买
|
if not data[1]:
|
# 出现被动买需要将历史大单清空
|
if cls.__active_buy_order_datas_dict.get(code):
|
cls.__active_buy_order_datas_dict[code].clear()
|
continue
|
money = data[3]
|
# 统计买单数据
|
if code not in cls.__latest_active_buy_order_data_dict:
|
# [买单号,当前成交股数, 当前成交金额, 开始时间, 结束时间]
|
cls.__latest_active_buy_order_data_dict[code] = [data[0][6], data[0][2], money, data[0][3],
|
data[0][3]]
|
else:
|
if data[0][6] == cls.__latest_active_buy_order_data_dict[code][0]:
|
# 同一买单号
|
cls.__latest_active_buy_order_data_dict[code][1] += data[0][2]
|
cls.__latest_active_buy_order_data_dict[code][2] += money
|
cls.__latest_active_buy_order_data_dict[code][4] = data[0][3]
|
else:
|
# 不同买单号
|
if cls.__latest_active_buy_order_data_dict[code][2] >= 2990000:
|
# 加入大单列表
|
if code not in cls.__active_buy_order_datas_dict:
|
cls.__active_buy_order_datas_dict[code] = []
|
cls.__active_buy_order_datas_dict[code].append(
|
cls.__latest_active_buy_order_data_dict[code])
|
|
cls.__latest_active_buy_order_data_dict[code] = [data[0][6], data[0][2], money, data[0][3],
|
data[0][3]]
|
|
if not data[2]:
|
# 排除主动卖/非涨停卖
|
continue
|
sell_no = data[0][7]
|
if sell_no != last_sell_info['val']['orderNo']:
|
continue
|
# 需要判断当前单是否已经成交完成
|
if code not in cls.__latest_sell_data:
|
cls.__latest_sell_data[code] = [sell_no, data[0][2]]
|
else:
|
if cls.__latest_sell_data[code][0] == sell_no:
|
cls.__latest_sell_data[code][1] += data[0][2]
|
else:
|
cls.__latest_sell_data[code] = [sell_no, data[0][2]]
|
sell_info_num = last_sell_info['val']['num']
|
deal_num = cls.__latest_sell_data[code][1] // 100
|
if sell_info_num == deal_num:
|
use_time = round((time.time() - start_time) * 1000, 3)
|
l2_log.info(code, logger_l2_trade_buy,
|
f"找到最近的被动涨停卖单数据:{last_sell_info['val']['orderNo']}, 成交数据:{data} 计算耗时:{use_time}ms, 可以触发下单")
|
|
# 将历史大单列表与最近的大单加入列表
|
big_buy_order_datas = []
|
if code in cls.__active_buy_order_datas_dict:
|
big_buy_order_datas.extend(cls.__active_buy_order_datas_dict[code])
|
if cls.__latest_active_buy_order_data_dict[code][2] >= 2990000:
|
big_buy_order_datas.append(cls.__latest_active_buy_order_data_dict[code])
|
# 成交完成
|
L2TradeSingleDataManager.set_latest_sell_data(code, data, big_buy_order_datas)
|
l2_log.info(code, logger_l2_trade_buy, "被动卖数据处理完毕")
|
if tool.is_sz_code(code):
|
# 涨停主动卖已经被吃完,可以清除
|
return True
|
break
|
except Exception as e:
|
logger_debug.exception(e)
|
return False
|
|
@classmethod
|
def filter_last_limit_up_sell_data(cls, code, fdatas):
|
"""
|
筛选出最后一条涨停卖成交数据
|
@param code:
|
@param fdatas:
|
@return: (成交数据, 卖单数据)
|
"""
|
|
def compute_last_sell():
|
# 最大买单号
|
max_buy_order_no = fdatas[-1][0][6]
|
for i in range(len(sell_list) - 1, -1, -1):
|
if sell_list[i]['val']['orderNo'] > max_buy_order_no:
|
continue
|
return sell_list[i]
|
return None
|
|
if not fdatas[-1][2]:
|
# 最后一条数据不是涨停成交数据
|
return None
|
|
sell_list = cls.__latest_limit_up_sell_list_dict.get(code)
|
if not sell_list:
|
return None
|
last_sell_info = compute_last_sell()
|
if not last_sell_info:
|
return None
|
for data in fdatas:
|
if not data[2]:
|
# 排除主动卖/非涨停卖
|
continue
|
sell_no = data[0][7]
|
if sell_no != last_sell_info['val']['orderNo']:
|
continue
|
# 需要判断当前单是否已经成交完成
|
if code not in cls.__latest_sell_data:
|
cls.__latest_sell_data[code] = [sell_no, data[0][2]]
|
else:
|
if cls.__latest_sell_data[code][0] == sell_no:
|
cls.__latest_sell_data[code][1] += data[0][2]
|
else:
|
cls.__latest_sell_data[code] = [sell_no, data[0][2]]
|
sell_info_num = last_sell_info['val']['num']
|
deal_num = cls.__latest_sell_data[code][1] // 100
|
if sell_info_num == deal_num:
|
# 最后一笔涨停卖已经成交完成
|
l2_log.info(code, logger_l2_trade_buy,
|
f"找到最近的被动涨停卖单数据:{last_sell_info}, 成交数据:{data} 可以触发下单")
|
return data, last_sell_info
|
return None
|
|
@classmethod
|
def add_active_limit_up_sell_data(cls, data):
|
"""
|
添加主动卖数据
|
@param data:
|
@return:
|
"""
|
try:
|
code = data[0]
|
sell_no = data[7]
|
if code not in cls.__active_sell_order_no_set_dict:
|
cls.__active_sell_order_no_set_dict[code] = set()
|
cls.__active_sell_order_no_set_dict[code].add(sell_no)
|
|
except Exception as e:
|
logger_debug.exception(e)
|
|
|
class L2TradeSingleCallback:
|
"""
|
交易信号回调
|
"""
|
|
def OnTradeSingle(self, code, big_buy_order_count, _type, data):
|
"""
|
交易数据信号回调
|
@param code:
|
@param big_buy_order_count: 大买单数量
|
@param _type: 类型:0-TYPE_PASSIVE 1-TYPE_ACTIVE
|
@param data: (逐笔成交数据,生效时间)
|
@return:
|
"""
|
|
def OnLimitUpActiveBuy(self, code, transaction_data, no_left_limit_up_sell):
|
"""
|
涨停主动买触发
|
@param code:
|
@param transaction_data: 成交数据
|
@param no_left_limit_up_sell: 是否还剩余涨停卖
|
@return:
|
"""
|
|
def OnLastLimitUpSellDeal(self, code, data):
|
"""
|
最后一笔涨停卖成交
|
@param code: 代码
|
@param data: 成交的数据
|
@return:
|
"""
|
|
|
class L2TradeSingleDataManager:
|
__callback = None
|
|
TYPE_PASSIVE = 0
|
TYPE_ACTIVE = 1
|
|
"""
|
买入信号管理
|
"""
|
# 最近的涨停卖被动成交数据
|
__latest_sell_data_dict = {} # 格式为:{code:(逐笔成交数据,生效时间(带ms))}
|
|
# 由被动向主动卖成交转变的数据
|
__latest_sell_active_deal_data_dict = {} # 格式为:{code:(逐笔成交数据,生效时间(带ms))}
|
|
@classmethod
|
def set_callback(cls, callback: L2TradeSingleCallback):
|
cls.__callback = callback
|
|
@classmethod
|
def set_latest_sell_data(cls, code, fdata, big_active_buy_order_datas):
|
|
"""
|
设置最近成交的涨停卖被动成交数据
|
@param code: 代码
|
@param data: L2逐笔成交数据 数据格式:(data['SecurityID'], data['TradePrice'], data['TradeVolume'],
|
# data['OrderTime'], data['MainSeq'], data['SubSeq'], data['BuyNo'],
|
# data['SellNo'], data['ExecType'])
|
@param big_active_buy_order_datas: 大主动买单数据:[[买单号,当前成交股数, 当前成交金额, 开始时间, 结束时间],....]
|
@return:
|
"""
|
deal_time = fdata[5]
|
# 生效时间在1s以内
|
cls.__latest_sell_data_dict[code] = (fdata[0], tool.trade_time_add_millionsecond(deal_time, 1000))
|
if cls.__callback:
|
big_buy_order_count = 0
|
if big_active_buy_order_datas:
|
for b in big_active_buy_order_datas:
|
if b[0] > fdata[0][7]:
|
# 买单在卖单之后
|
big_buy_order_count += 1
|
|
cls.__callback.OnTradeSingle(code, big_buy_order_count, cls.TYPE_PASSIVE, cls.__latest_sell_data_dict[code])
|
|
@classmethod
|
def set_sell_passive_to_active_datas(cls, code, passive_data, active_data):
|
"""
|
设置被动卖成交向主动卖成交的转变数据
|
@param code: 代码
|
@param passive_data: 被动卖成交逐笔
|
@param active_data: 主动卖成交逐笔
|
@return:
|
"""
|
# 暂时不需要生效
|
l2_log.info(code, logger_l2_trade_buy, f"被动卖变主动卖:{passive_data} => {active_data}")
|
# deal_time = l2_huaxin_util.convert_time(passive_data[3], True)
|
# # 生效时间在1s以内
|
# cls.__latest_sell_active_deal_data_dict[code] = (
|
# active_data, tool.trade_time_add_millionsecond(deal_time, 1000))
|
# if cls.__callback:
|
# cls.__callback.OnTradeSingle(code, 0, cls.TYPE_ACTIVE, cls.__latest_sell_active_deal_data_dict[code])
|
|
@classmethod
|
def set_limit_up_active_buy(cls, code, transaction_datas, no_left_limit_up_sell):
|
if transaction_datas:
|
cls.__callback.OnLimitUpActiveBuy(code, transaction_datas, no_left_limit_up_sell)
|
|
@classmethod
|
def get_valid_trade_single(cls, code, latest_time_with_ms):
|
"""
|
获取有效的成交下单信号
|
@param code:
|
@param latest_time_with_ms:
|
@return: (逐笔成交数据, 类型)
|
"""
|
# 如果有最近卖涨停成交完就用最近的数据
|
data = cls.__latest_sell_data_dict.get(code)
|
if data and tool.trade_time_sub_with_ms(latest_time_with_ms, data[1]) <= 0:
|
return data, cls.TYPE_PASSIVE
|
data = cls.__latest_sell_active_deal_data_dict.get(code)
|
if data and tool.trade_time_sub_with_ms(latest_time_with_ms, data[1]) <= 0:
|
return data, cls.TYPE_ACTIVE
|
return None
|
|
@classmethod
|
def is_can_place_order(cls, code, buy_data):
|
"""
|
是否可以下单
|
@param code: 代码
|
@param buy_data: L2逐笔委托大于50w的买入数据
|
@return: (是否可以下单, 原因)
|
"""
|
buy_data_time_with_ms = tool.to_time_with_ms(buy_data['val']['time'], buy_data['val']['tms'])
|
single = cls.get_valid_trade_single(code, buy_data_time_with_ms)
|
if not single:
|
return False, "没有找到可用信号"
|
if single[0][0][6] < buy_data['val']['orderNo']:
|
# 信号位置的成交买单号要小于买入数据的买单号才行
|
return True, f"找到信号:{single}"
|
else:
|
return False, f"买单没有在信号位之后 信号:{single}"
|
|
@classmethod
|
def clear_data(cls, code):
|
if code in cls.__latest_sell_data_dict:
|
cls.__latest_sell_data_dict.pop(code)
|
|
if code in cls.__latest_sell_active_deal_data_dict:
|
cls.__latest_sell_active_deal_data_dict.pop(code)
|
|
L2TradeSingleDataProcessor.clear_passive_sell_data(code)
|