import json
|
import logging
|
import multiprocessing
|
import threading
|
import time
|
|
import schedule
|
|
import constant
|
from code_atrribute import gpcode_manager, history_k_data_util
|
from code_atrribute.gpcode_manager import CodesNameManager
|
from code_atrribute.history_k_data_util import HistoryKDatasUtils
|
from code_atrribute.position_code_data_manager import PositionCodeDataManager
|
from huaxin_client.client_network import SendResponseSkManager
|
from huaxin_client.l2_data_transform_protocol import L2DataCallBack
|
from l2 import l2_data_util
|
from l2.huaxin import l2_huaxin_util
|
from l2.l2_data_manager import L2DataProcessor
|
from l2.l2_data_util import local_today_datas, local_today_num_operate_map, local_today_buyno_map, \
|
local_today_canceled_buyno_map, L2DataUtil
|
from log_module import async_log_util
|
from log_module.log import logger_trade, logger_debug, logger_system, logger_local_huaxin_l1_trade_info, \
|
logger_trade_position_api_request, logger_l2_error, hx_logger_l2_transaction, printlog
|
from trade import huaxin_trade_data_update, huaxin_sell_util, huaxin_trade_api
|
from trade.huaxin_trade_record_manager import PositionManager, DelegateSellOrderManager
|
from trade.l2_data_manager import L2TransactionDataManager
|
from trade.sell_rule_manager import TradeRuleManager, SellRule, AutoCancelSellModeManager
|
from utils import outside_api_command_manager, middle_api_protocol, tool, huaxin_util, socket_util, cb_data_util, \
|
kpl_data_manager
|
from utils.outside_api_command_manager import ActionCallback
|
import concurrent.futures
|
|
|
class OutsideApiCommandCallback(ActionCallback):
|
__cancel_sell_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=8)
|
|
def __init__(self, queue_strategy_w_l2_r):
|
self.queue_strategy_w_l2_r = queue_strategy_w_l2_r
|
|
@classmethod
|
def __send_response(cls, data_bytes):
|
sk = SendResponseSkManager.create_send_response_sk(addr=middle_api_protocol.SERVER_HOST,
|
port=middle_api_protocol.SERVER_PORT)
|
try:
|
data_bytes = socket_util.load_header(data_bytes)
|
sk.sendall(data_bytes)
|
result, header_str = socket_util.recv_data(sk)
|
result = json.loads(result)
|
if result["code"] != 0:
|
raise Exception(result['msg'])
|
finally:
|
sk.close()
|
|
def send_response(self, data, _client_id, _request_id):
|
data_bytes = json.dumps({"type": "response", "data": data, "client_id": _client_id,
|
"request_id": _request_id}).encode('utf-8')
|
for i in range(3):
|
try:
|
self.__send_response(data_bytes)
|
printlog("发送数据成功")
|
break
|
except Exception as e1:
|
logging.exception(e1)
|
|
# 撤长期没有成交的单
|
def __cancel_not_deal_order(self, code, order_ref, timeout=3):
|
time.sleep(timeout)
|
# 撤买单
|
huaxin_trade_api.cancel_order(1, code, "", orderRef=order_ref)
|
|
def OnTrade(self, client_id, request_id, data):
|
try:
|
trade_type = data["trade_type"]
|
if trade_type == outside_api_command_manager.TRADE_TYPE_ORDER:
|
code = data["code"]
|
direction = data["direction"]
|
volume = data["volume"]
|
price_type = data["price_type"]
|
price = data["price"]
|
sinfo = data["sinfo"]
|
if direction == 2:
|
# price_type: 0-价格笼子 1-跌停价 2-涨停价 3-现价 4-买5价
|
async_log_util.info(logger_trade, f"API卖: 接收数据-{data}")
|
current_price = PositionCodeDataManager.get_l1_current_price(code)
|
if current_price is None or current_price <= 0.0:
|
results = history_k_data_util.HistoryKDatasUtils.get_gp_current_info([code])
|
if results:
|
current_price = results[0]["price"]
|
|
limit_down_price = gpcode_manager.get_limit_down_price(code)
|
limit_up_price = gpcode_manager.get_limit_up_price(code)
|
order_ref = huaxin_util.create_order_ref()
|
try:
|
# 如果卖量比持仓还大,就全部卖完
|
position_volume = PositionManager().get_code_volume_cache(code)
|
if position_volume and volume >= position_volume:
|
# 表示全部卖完
|
volume = -1
|
result = huaxin_sell_util.start_sell(code, volume, price_type, limit_up_price, limit_down_price,
|
current_price, blocking=False, request_id=request_id,
|
order_ref=order_ref)
|
async_log_util.info(logger_trade, f"API卖结果: {result}")
|
self.send_response(result, client_id, request_id)
|
except Exception as e:
|
if str(e).find("超时") >= 0:
|
self.send_response({"code": 0, "data": {"orderRef": order_ref}}, client_id, request_id)
|
else:
|
raise e
|
else:
|
if not price:
|
if tool.trade_time_sub(tool.get_now_time_str(), "09:30:00") < 0:
|
# 开盘之前
|
limit_down_price = gpcode_manager.get_limit_down_price(code)
|
if not limit_down_price:
|
raise Exception("尚未获取跌停价")
|
# 比跌停价高1分
|
price = round(float(limit_down_price) + 0.01, 2)
|
else:
|
# 开盘之后
|
# 没有传入价格,以最新价买入
|
current_price = PositionCodeDataManager.get_l1_current_price(code)
|
if not current_price:
|
raise Exception("尚未获取到现价")
|
# 获取买1金额
|
price = round(float(current_price), 2)
|
buy1_info = PositionCodeDataManager.get_l1_current_buy1_data(code)
|
if buy1_info and buy1_info[0] * buy1_info[1] > 50 * 10000:
|
# 如果买1在50w以上就加一档
|
price += 0.01
|
limit_up_price = gpcode_manager.get_limit_up_price(code)
|
if limit_up_price and price > float(limit_up_price):
|
price = round(float(limit_up_price), 2)
|
order_ref = huaxin_util.create_order_ref()
|
result = huaxin_trade_api.order(direction, code, volume, price, price_type=price_type,
|
sinfo=sinfo, order_ref=order_ref,
|
blocking=True, request_id=request_id)
|
# 2s内没成交就撤单
|
self.__cancel_sell_thread_pool.submit(self.__cancel_not_deal_order, code, order_ref)
|
else:
|
result = huaxin_trade_api.order(direction, code, volume, price, price_type=price_type,
|
sinfo=sinfo,
|
blocking=True, request_id=request_id)
|
self.send_response({"code": 0, "data": result}, client_id, request_id)
|
except Exception as e:
|
logger_debug.exception(e)
|
self.send_response({"code": 1, "msg": str(e)}, client_id, request_id)
|
|
def OnSellRule(self, client_id, request_id, data):
|
try:
|
operate = data["operate"]
|
if operate == outside_api_command_manager.OPERRATE_ADD:
|
data = data["data"]
|
code = data["code"]
|
type = data["type"]
|
buy1_price = data.get("buy1_price")
|
if not buy1_price:
|
buy1_price = gpcode_manager.get_limit_up_price(code)
|
if not buy1_price:
|
raise Exception("尚未获取到涨停价")
|
rule = SellRule(code=data["code"], buy1_volume=data["buy1_volume"], buy1_price=buy1_price,
|
sell_volume=data.get("sell_volume"), sell_price_type=data.get("sell_price_type"),
|
end_time=data["end_time"], type=type)
|
TradeRuleManager().add_rule(rule)
|
self.send_response({"code": 0, "data": {}}, client_id, request_id)
|
elif operate == outside_api_command_manager.OPERRATE_SET:
|
data = data["data"]
|
code = data["code"]
|
buy1_price = data.get("buy1_price")
|
if not buy1_price:
|
buy1_price = gpcode_manager.get_limit_up_price(code)
|
if not buy1_price:
|
gpcode_manager.request_price_pre([code])
|
buy1_price = gpcode_manager.get_limit_up_price(code)
|
if not buy1_price:
|
raise Exception("尚未获取到涨停价")
|
rule = SellRule(id_=data["id"], code=data["code"], buy1_volume=data["buy1_volume"],
|
buy1_price=buy1_price,
|
sell_volume=data.get("sell_volume"), sell_price_type=data.get("sell_price_type"),
|
end_time=data["end_time"])
|
TradeRuleManager().update_rule(rule)
|
self.send_response({"code": 0, "data": {}}, client_id, request_id)
|
elif operate == outside_api_command_manager.OPERRATE_DELETE:
|
data = data["data"]
|
TradeRuleManager().del_rule(data["id"])
|
self.send_response({"code": 0, "data": {}}, client_id, request_id)
|
elif operate == outside_api_command_manager.OPERRATE_GET:
|
rules = TradeRuleManager().list_rules()
|
fresults = []
|
for rule in rules:
|
fresults.append(rule.to_dict())
|
self.send_response({"code": 0, "data": fresults}, client_id, request_id)
|
except Exception as e:
|
self.send_response({"code": 1, "msg": str(e)}, client_id, request_id)
|
|
def OnGetCodePositionInfo(self, client_id, request_id, data):
|
code = data.get("code")
|
__start_time = time.time()
|
try:
|
if not tool.is_can_sell_code(code):
|
raise Exception("非主板代码")
|
# 获取持仓
|
positions = PositionManager.latest_positions
|
trade_rules_count = len(TradeRuleManager().list_can_excut_rules_cache())
|
|
fdata = {"code": code, "total": 0, "available": 0, "sell_orders": [], "sell_rules_count": trade_rules_count,
|
"cost_price": 0, "code_info": (code, ''), "desc": "", "cost_price_rate": 0}
|
if positions:
|
for d in positions:
|
if d["securityID"] != code:
|
continue
|
code_name = d["securityName"]
|
fdata["code_info"] = (code, code_name)
|
if d["prePosition"] <= 0 and d["todayBSPos"] <= 0:
|
continue
|
fdata["total"] = d["prePosition"] + d["todayBSPos"]
|
fdata["available"] = d["availablePosition"]
|
fdata["cost_price"] = round(float(d["historyPosPrice"]), 2)
|
break
|
if fdata['total'] <= 0:
|
# 没有持仓,需要获取代码名称
|
name = CodesNameManager.get_code_name(code)
|
if not name:
|
threading.Thread(target=lambda: CodesNameManager.request_code_name(code), daemon=True).start()
|
if name:
|
fdata["code_info"] = (code, name)
|
|
# 有现价就获取现价
|
current_price = PositionCodeDataManager.get_l1_current_price(code)
|
pre_close_price = gpcode_manager.get_price_pre_cache(code)
|
if current_price:
|
fdata["cost_price"] = current_price
|
pre_close_price = gpcode_manager.get_price_pre_cache(code)
|
if current_price and pre_close_price:
|
rate = round((float(current_price) - float(pre_close_price)) / float(pre_close_price), 4)
|
fdata["cost_price_rate"] = rate
|
elif pre_close_price:
|
fdata["cost_price"] = pre_close_price
|
fdata["cost_price_rate"] = 0
|
# 获取涨幅
|
async_log_util.info(logger_trade_position_api_request, f"{fdata}")
|
result = {"code": 0, "data": fdata}
|
self.send_response(result, client_id, request_id)
|
except Exception as e:
|
logging.exception(e)
|
self.send_response({"code": 1, "msg": f"数据处理出错:{e}"}, client_id, request_id)
|
finally:
|
use_time = time.time() - __start_time
|
if use_time > 0.01:
|
# 耗时10ms以上才记录日志
|
async_log_util.info(logger_trade_position_api_request, f"{code}请求持仓耗时:{use_time * 1000}ms")
|
|
def OnRefreshTradeData(self, client_id, request_id, data):
|
logger_debug.info(f"交易数据刷新:{data}")
|
try:
|
sync_type = data["ctype"]
|
if sync_type == "position_list":
|
huaxin_trade_data_update.add_position_list()
|
self.send_response({"code": 0, "data": {}, "msg": ""}, client_id, request_id)
|
except Exception as e:
|
self.send_response({"code": 1, "msg": str(e)}, client_id, request_id)
|
|
def OnCommonRequest(self, client_id, request_id, data):
|
ctype = data["ctype"]
|
if ctype == "get_positions":
|
# 获取所有持仓信息
|
positions = PositionManager.latest_positions
|
fdatas = []
|
if positions:
|
for d in positions:
|
code_ = d["securityID"]
|
code_name = d["securityName"]
|
if d["prePosition"] <= 0 and d["todayBSPos"] <= 0:
|
continue
|
# 获取现价
|
fdata = {"code": code_, "code_name": code_name, "total": d["prePosition"]+d["todayBSPos"],
|
"available": d["availablePosition"], "cost_price": d["historyPosPrice"], "volume_rate": 0}
|
|
current_volume, pre_volume = PositionCodeDataManager.get_l1_current_volume(
|
code_), PositionCodeDataManager.get_pre_volume(code_)
|
if current_volume and pre_volume and pre_volume > 0:
|
fdata["volume_rate"] = round(current_volume / pre_volume, 2)
|
current_price = PositionCodeDataManager.get_l1_current_price(code_)
|
if current_price:
|
fdata["cost_price"] = current_price
|
fdatas.append(fdata)
|
result = {"code": 0, "data": fdatas}
|
self.send_response(result, client_id, request_id)
|
elif ctype == "get_buy1_info":
|
# 获取代码的买1信息
|
code = data["code"]
|
results = HistoryKDatasUtils.get_gp_current_info([code])
|
item = results[0]["quotes"][0]
|
result = {"code": 0, "data": {"price": item["bid_p"], "volume": item["bid_v"]}}
|
self.send_response(result, client_id, request_id)
|
elif ctype == "get_current_l1_codes":
|
codes = L1DataProcessor.get_latest_update_codes()
|
result = {"code": 0, "data": list(codes)}
|
self.send_response(result, client_id, request_id)
|
elif ctype == "get_l2_deal_price":
|
# 根据L2数据获取成交价
|
code = data["code"]
|
price_info = L2DataProcessor.get_deal_price(code)
|
if price_info:
|
fdata = {"price": price_info[0], "time": price_info[1]}
|
pre_price = gpcode_manager.get_price_pre_cache(code)
|
if pre_price:
|
fdata["rate"] = round((price_info[0] - pre_price) / pre_price, 4)
|
result = {"code": 0, "data": fdata}
|
else:
|
result = {"code": 1, "msg": "尚未获取到价格"}
|
self.send_response(result, client_id, request_id)
|
elif ctype == "get_l2_datas":
|
# 获取L2数据
|
code = data["code"]
|
max_time = data["max_time"]
|
min_money = data["min_money"]
|
if not local_today_datas:
|
l2_data_util.load_l2_data_all(True)
|
total_datas = l2_data_util.local_today_datas.get(code)
|
# 只获取买与卖
|
for d in total_datas:
|
val = d['val']
|
if tool.trade_time_sub(d['val']['time'], max_time) > 0:
|
break
|
if val['num'] * float(val['price']) * 100 < min_money:
|
continue
|
if L2DataUtil.is_buy(val):
|
pass
|
if L2DataUtil.is_sell(val):
|
pass
|
|
codes = L1DataProcessor.get_latest_update_codes()
|
result = {"code": 0, "data": list(codes)}
|
self.send_response(result, client_id, request_id)
|
elif ctype == "buy_cb_for_commission":
|
# 用可转债填交易费漏洞
|
code = data["code"]
|
volume = data["volume"]
|
current_price = None
|
results = history_k_data_util.HistoryKDatasUtils.get_gp_current_info([code])
|
if results:
|
current_price = results[0]["price"]
|
order_ref = huaxin_util.create_order_ref()
|
try:
|
if not current_price:
|
raise Exception("尚未获取到现价")
|
price = round(tool.get_buy_max_price(current_price), 3)
|
sinfo = f"b_{code}_{int(time.time() * 1000)}"
|
# 需要立即卖的数据
|
# 下单之后需要立即卖出
|
cb_data_util.need_sell_sinfos.add(sinfo)
|
result = huaxin_trade_api.order(1, code, volume, price, blocking=True, order_ref=order_ref, sinfo=sinfo)
|
if result:
|
if result["code"] == 0:
|
sinfo = result["data"]["sinfo"]
|
pass
|
# 记录当前的sinfo
|
async_log_util.info(logger_trade, f"API可转债买结果: {result}")
|
self.send_response(result, client_id, request_id)
|
# 订阅L2,用于卖
|
constant.SUBSCRIPT_L2_CODES.add(code)
|
self.queue_strategy_w_l2_r.put_nowait(json.dumps(
|
{"type": "l2_cmd", "data": list(constant.SUBSCRIPT_L2_CODES)}))
|
except Exception as e:
|
if str(e).find("超时") >= 0:
|
self.send_response({"code": 0, "data": {"orderRef": order_ref}}, client_id, request_id)
|
else:
|
raise e
|
elif ctype == "sell_cb_for_commission":
|
# 用可转债填交易费漏洞
|
code = data["code"]
|
volume = data["volume"]
|
current_price = None
|
deal_price_info = L2DataProcessor.get_deal_price(code)
|
if deal_price_info:
|
current_price = deal_price_info[0]
|
else:
|
results = history_k_data_util.HistoryKDatasUtils.get_gp_current_info([code])
|
if results:
|
current_price = results[0]["price"]
|
order_ref = huaxin_util.create_order_ref()
|
try:
|
if not current_price:
|
raise Exception("尚未获取到现价")
|
price = round(tool.get_buy_min_price(current_price) + 0.001, 3)
|
result = huaxin_trade_api.order(2, code, volume, price, order_ref=order_ref, blocking=True)
|
async_log_util.info(logger_trade, f"API可转债卖结果: {result},成交价信息:{deal_price_info}")
|
self.send_response(result, client_id, request_id)
|
except Exception as e:
|
if str(e).find("超时") >= 0:
|
self.send_response({"code": 0, "data": {"orderRef": order_ref}}, client_id, request_id)
|
else:
|
raise e
|
|
elif ctype == "get_deal_queue":
|
# 获取成交队列
|
code = data["code"]
|
try:
|
fdata = {}
|
# 获取已经成交的大单
|
buy_order_info_list = L2TransactionDataManager().get_big_buy_orders(code)
|
if buy_order_info_list is None:
|
buy_order_info_list = []
|
MAX_COUNT = 50
|
buy_order_info_list = buy_order_info_list[0 - MAX_COUNT:]
|
# (类型,订单号,时间,量, 金额, 价格, 成交比例百分数)
|
fdata["deal_list"] = [[0, x[0], l2_huaxin_util.convert_time(x[3]), x[1], x[2], str(x[4]), 100] for x in
|
buy_order_info_list]
|
buyno_map = local_today_buyno_map.get(code)
|
# 设置成交的进度
|
for x in fdata["deal_list"]:
|
data = buyno_map.get(f"{x[1]}")
|
if data:
|
x[6] = int(round(x[3] / data["val"]["num"]))
|
x[3] = data["val"]["num"] * 100
|
|
dealing_buy_order_info = L2TransactionDataManager().get_dealing_buy_order(code)
|
if dealing_buy_order_info:
|
|
data = buyno_map.get(f"{dealing_buy_order_info[0]}")
|
if data:
|
# (类型, 订单号, 时间, 量, 金额, 价格, 成交比例百分数)
|
fdata["dealing"] = (0, dealing_buy_order_info[0], data["val"]["time"], data["val"]["num"] * 100,
|
int(data["val"]["num"] * float(data["val"]["price"]) * 100),
|
data["val"]["price"],
|
int(round(dealing_buy_order_info[1] / data["val"]["num"])))
|
self.send_response({"code": 0, "data": fdata}, client_id, request_id)
|
except Exception as e:
|
logger_debug.exception(e)
|
raise e
|
elif ctype == "auto_cancel_sell_mode":
|
try:
|
operate = data["operate"]
|
code = data.get("code")
|
if operate == outside_api_command_manager.OPERRATE_SET:
|
mode = data["mode"]
|
AutoCancelSellModeManager().set_mode(code, mode)
|
self.send_response({"code": 0, "data": {"mode": mode}}, client_id, request_id)
|
elif operate == outside_api_command_manager.OPERRATE_GET:
|
sell_mode = AutoCancelSellModeManager().get_mode_cache(code)
|
self.send_response({"code": 0, "data": {"mode": sell_mode}}, client_id, request_id)
|
except Exception as e:
|
self.send_response({"code": 1, "msg": str(e)}, client_id, request_id)
|
|
|
class L1DataProcessor:
|
# L1数据更新时间
|
__l1_data_update_time_dict = {}
|
|
# 获取最近更新L1的代码
|
@classmethod
|
def get_latest_update_codes(cls):
|
now_time = time.time()
|
codes = set()
|
for code in cls.__l1_data_update_time_dict:
|
if now_time - cls.__l1_data_update_time_dict[code] < 10:
|
codes.add(code)
|
return codes
|
|
@classmethod
|
def set_l1_trade_codes_info(cls, data_json):
|
data = data_json["data"]
|
request_id = data_json["request_id"]
|
datas = data["data"]
|
PositionCodeDataManager.set_current_l1_datas(datas)
|
cls.process_for_sell(datas)
|
|
@classmethod
|
def excute_sell_rule(cls, code, buy1_volume, buy1_price, source="L1"):
|
"""
|
执行卖出规则
|
:param code:
|
:param buy1_volume:
|
:param buy1_price:
|
:param source:
|
:return:
|
"""
|
rules = TradeRuleManager().list_can_excut_rules_cache(types=[TradeRuleManager.TYPE_SELL])
|
if not rules:
|
return
|
excuted_rule_ids = set()
|
if buy1_volume is not None:
|
for r in rules:
|
if r.code != code:
|
continue
|
# --------判断是否可以执行--------
|
can_excute = False
|
if round(float(buy1_price), 2) <= round(float(r.buy1_price), 2):
|
# 价格已经触发
|
if r.buy1_volume:
|
if r.buy1_volume >= buy1_volume:
|
# 量价触发
|
can_excute = True
|
async_log_util.info(logger_trade,
|
f"触发卖规则({code}-{(buy1_price, buy1_volume, source)}):量触发{buy1_volume}/{r.buy1_volume}")
|
else:
|
can_excute = True
|
async_log_util.info(logger_trade,
|
f"触发卖规则({code}-{(buy1_price, buy1_volume, source)}):价格触发{buy1_price}/{r.buy1_price}")
|
# 价格触发
|
# 获取价格类型
|
if not can_excute:
|
continue
|
|
# 请求卖出锁
|
TradeRuleManager().require_sell_lock(r.id_)
|
try:
|
if r.id_ in excuted_rule_ids:
|
continue
|
excuted_rule_ids.add(r.id_)
|
# 获取最新的执行状况
|
r = TradeRuleManager().get_by_id(r.id_)
|
if r.excuted:
|
continue
|
# 提交卖
|
limit_down_price = gpcode_manager.get_limit_down_price(code)
|
limit_up_price = gpcode_manager.get_limit_up_price(code)
|
huaxin_sell_util.start_sell(code, r.sell_volume, r.sell_price_type, limit_up_price,
|
limit_down_price,
|
buy1_price)
|
TradeRuleManager().excuted(r.id_)
|
except Exception as e:
|
logger_debug.exception(e)
|
finally:
|
TradeRuleManager().release_sell_lock(r.id_)
|
|
@classmethod
|
def process_for_sell(cls, datas):
|
rules = TradeRuleManager().list_can_excut_rules_cache(types=[TradeRuleManager.TYPE_SELL])
|
excuted_rule_ids = set()
|
if rules:
|
for d in datas:
|
code = d[0]
|
cls.__l1_data_update_time_dict[code] = time.time()
|
# 格式 (代码,现价,涨幅,量,更新时间,买1价格,买1量)
|
buy1_volume = d[6]
|
buy1_price = d[5]
|
cls.excute_sell_rule(code, buy1_volume, buy1_price)
|
|
|
# 读取L1的数据
|
def __recv_pipe_l1_trade(queue_l1_trade_w_strategy_r: multiprocessing.Queue):
|
logger_system.info(f"trade_server __recv_pipe_l1_trade 线程ID:{tool.get_thread_id()}")
|
if queue_l1_trade_w_strategy_r is not None:
|
while True:
|
try:
|
val = queue_l1_trade_w_strategy_r.get()
|
if val:
|
async_log_util.info(logger_local_huaxin_l1_trade_info, f"客户端接收:{val}")
|
val = json.loads(val)
|
printlog("收到来自L1的数据:", val["type"])
|
# 处理数据
|
type_ = val["type"]
|
if type_ == "upload_l1_trade_datas":
|
# 处理专为交易提供的L1数据
|
L1DataProcessor.set_l1_trade_codes_info(val)
|
async_log_util.info(logger_local_huaxin_l1_trade_info, f"{val}")
|
except Exception as e:
|
logger_local_huaxin_l1_trade_info.exception(e)
|
logging.exception(e)
|
|
|
class MyL2DataCallback(L2DataCallBack):
|
|
def OnL2Order(self, code, origin_datas, timestamp):
|
if tool.is_cb_code(code):
|
return
|
# 保存L2数据
|
datas = None
|
try:
|
# 转换数据格式
|
_start_index = 0
|
total_datas = local_today_datas.get(code)
|
if code not in local_today_datas:
|
local_today_datas[code] = []
|
if total_datas:
|
_start_index = total_datas[-1]["index"] + 1
|
datas = l2_huaxin_util.get_format_l2_datas(code, origin_datas,
|
gpcode_manager.get_limit_up_price(code), _start_index)
|
if len(datas) > 0:
|
local_today_datas[code].extend(datas)
|
l2_data_util.load_num_operate_map(local_today_num_operate_map, code, datas)
|
l2_data_util.load_buy_no_map(local_today_buyno_map, code, datas)
|
l2_data_util.load_canceled_buy_no_map(local_today_canceled_buyno_map, code, datas)
|
except Exception as e:
|
async_log_util.error(logger_l2_error, f"code:{code}")
|
logger_l2_error.exception(e)
|
finally:
|
if datas:
|
l2_data_util.save_l2_data(code, None, datas)
|
origin_datas.clear()
|
|
def OnL2Transaction(self, code, datas):
|
# async_log_util.info(hx_logger_l2_transaction, f"{code}#{datas}")
|
if datas:
|
# 获取最近的成交价
|
price, time_str = datas[-1][1], l2_huaxin_util.convert_time(datas[-1][3])
|
L2DataProcessor.set_deal_price(code, price, time_str)
|
# 获取当前票是否有09:30之前的委托卖
|
try:
|
for data in datas:
|
L2TransactionDataManager().add_transaction_data(code, data)
|
except Exception as e:
|
logger_debug.exception(e)
|
try:
|
delegate_sell_orders = DelegateSellOrderManager.get_delegate_sell_orders(code)
|
if delegate_sell_orders:
|
for d in delegate_sell_orders:
|
# [(orderSysID, securityID, insertTime, limitPrice)]
|
if int(d[2].replace(":", "")) >= int("093000"):
|
continue
|
if round(d[3], 2) > price:
|
async_log_util.info(logger_trade, f"成交价格低于委托价格撤单:{price}-{d}")
|
# 已经不能成交,要撤单
|
huaxin_trade_api.cancel_order(2, d[1], d[0])
|
except Exception as e:
|
logger_debug.exception(e)
|
logger_debug.error(f"集合竞价卖撤处理出错:{code} - {str(e)}")
|
|
def OnMarketData(self, code, datas):
|
# logger_debug.info(f"收到L2Market数据:{datas}")
|
for d in datas:
|
code = d["securityID"]
|
buy1 = d["buy"][0]
|
L1DataProcessor.excute_sell_rule(code, buy1[1], buy1[0], "L2")
|
|
def OnRealTimeBuy1Info(self, code, buy1_info):
|
# buy1_info: [买1时间,买1价格, 原始买1量, 实时买1量]
|
rules = TradeRuleManager().list_can_excut_rules_cache(types=[TradeRuleManager.TYPE_SELL])
|
if not rules:
|
rules = []
|
codes = [rule.code for rule in rules]
|
if code in codes:
|
async_log_util.info(logger_debug, f"OnRealTimeBuy1Info:{code}-{buy1_info}")
|
L1DataProcessor.excute_sell_rule(code, buy1_info[3], buy1_info[1], "L2-real")
|
|
|
def __sell_for_last_day_break_limit_up():
|
"""
|
卖前一天的炸板
|
:return:
|
"""
|
try:
|
# 获取所有当前持仓
|
codes = PositionManager.get_position_codes()
|
|
yesterday_limit_up_codes = kpl_data_manager.KPLLimitUpDataManager().get_yesterday_limit_up_codes()
|
for code in codes:
|
if code in yesterday_limit_up_codes:
|
# 昨日涨停
|
continue
|
# 获取当前价格
|
price_info = L2DataProcessor.get_deal_price(code)
|
if not price_info:
|
continue
|
price = price_info[0]
|
pre_close_price = gpcode_manager.get_price_pre_cache(code)
|
pre_close_price = round(float(pre_close_price), 2)
|
rate = round((price - pre_close_price) * 100 / pre_close_price, 2)
|
# 获取现有持仓
|
avaiable_volume = PositionManager.get_code_volume_cache(code)
|
|
orders = []
|
big_orders = L2TransactionDataManager().get_big_buy_orders(code)
|
if big_orders:
|
for o in big_orders:
|
if o[2] >= 299e4:
|
orders.append(o)
|
logger_debug.info("昨日炸板,今日开盘信息:代码-{},开盘价-【{}】,涨幅-【{}】,持仓-【{}】,成交大单数量-【{}】", code, price, rate,
|
avaiable_volume,
|
len(orders))
|
if rate <= -2:
|
sell_volume_rate = 1
|
elif rate <= 0:
|
sell_volume_rate = 0.8
|
elif rate <= 2:
|
if len(orders) >= 1:
|
sell_volume_rate = 0.5
|
else:
|
sell_volume_rate = 0.6
|
else:
|
if len(orders) >= 1:
|
sell_volume_rate = 0.4
|
else:
|
sell_volume_rate = 0.6
|
sell_volume = int(round((avaiable_volume // 100) * sell_volume_rate))
|
logger_debug.info("昨日炸板,卖出信息:code-{},卖出比例-【{}】,卖出量-【{}】", sell_volume_rate, sell_volume)
|
except Exception as e:
|
logger_debug.exception(e)
|
|
|
# 做一些初始化的操作
|
def __init():
|
def run_pending():
|
while True:
|
schedule.run_pending()
|
time.sleep(1)
|
# 9点半后终止运行
|
if tool.trade_time_sub(tool.get_now_time_str(), "09:30:00") > 0:
|
break
|
|
# 请求持仓
|
def request_position():
|
for i in range(3):
|
time.sleep(10)
|
huaxin_trade_data_update.add_position_list()
|
|
l2_data_util.load_l2_data_all()
|
|
# 定时持仓刷新
|
schedule.every().day.at("09:00:00").do(huaxin_trade_data_update.add_position_list)
|
schedule.every().day.at("09:10:00").do(huaxin_trade_data_update.add_position_list)
|
schedule.every().day.at("09:15:00").do(huaxin_trade_data_update.add_position_list)
|
schedule.every().day.at("09:26:00").do(__sell_for_last_day_break_limit_up)
|
threading.Thread(target=run_pending, daemon=True).start()
|
threading.Thread(target=request_position, daemon=True).start()
|
|
|
l2_data_callbacks = []
|
|
|
def init_l2_data_callbacks():
|
for i in range(constant.MAX_L2_CHANNEL_COUNT):
|
l2_data_callbacks.append(MyL2DataCallback())
|
|
|
def run(queue_l1_trade_w_strategy_r, queue_l1_trade_r_strategy_w, queue_strategy_r_trade_w, queue_strategy_w_trade_r,
|
queue_strategy_w_l2_r):
|
try:
|
# 初始化
|
__init()
|
|
# 监听l1交易那边传过来的代码
|
t1 = threading.Thread(target=lambda: __recv_pipe_l1_trade(queue_l1_trade_w_strategy_r), daemon=True)
|
t1.start()
|
|
threading.Thread(target=lambda: async_log_util.run_sync(), daemon=True).start()
|
|
threading.Thread(target=lambda: async_log_util.l2_data_log.run_sync(), daemon=True).start()
|
|
huaxin_trade_data_update.run(queue_l1_trade_r_strategy_w, queue_strategy_w_l2_r)
|
|
huaxin_trade_api.run_pipe_trade(queue_strategy_r_trade_w, queue_strategy_w_trade_r)
|
|
manager = outside_api_command_manager.ApiCommandManager()
|
manager.init(middle_api_protocol.SERVER_HOST,
|
middle_api_protocol.SERVER_PORT,
|
OutsideApiCommandCallback(queue_strategy_w_l2_r))
|
manager.run(blocking=True)
|
except Exception as e:
|
logger_system.exception(e)
|