import json
|
import logging
|
import multiprocessing
|
import threading
|
import time
|
|
import schedule
|
|
from code_atrribute import gpcode_manager
|
from code_atrribute.history_k_data_util import HistoryKDatasUtils
|
from huaxin_client.client_network import SendResponseSkManager
|
from log_module import async_log_util
|
from log_module.log import logger_trade, logger_debug, logger_system, logger_local_huaxin_l1_trade_info, \
|
logger_trade_position_api_request
|
from trade import huaxin_trade_data_update, huaxin_sell_util, huaxin_trade_api
|
from trade.huaxin_trade_record_manager import PositionManager
|
from trade.sell_rule_manager import TradeRuleManager, SellRule
|
from utils import outside_api_command_manager, middle_api_protocol, tool, huaxin_util, socket_util
|
from utils.outside_api_command_manager import ActionCallback
|
import concurrent.futures
|
|
|
class OutsideApiCommandCallback(ActionCallback):
|
__cancel_sell_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=8)
|
|
@classmethod
|
def __send_response(cls, data_bytes):
|
sk = SendResponseSkManager.create_send_response_sk(addr=middle_api_protocol.SERVER_HOST,
|
port=middle_api_protocol.SERVER_PORT)
|
try:
|
data_bytes = socket_util.load_header(data_bytes)
|
sk.sendall(data_bytes)
|
result, header_str = socket_util.recv_data(sk)
|
result = json.loads(result)
|
if result["code"] != 0:
|
raise Exception(result['msg'])
|
finally:
|
sk.close()
|
|
def send_response(self, data, _client_id, _request_id):
|
data_bytes = json.dumps({"type": "response", "data": data, "client_id": _client_id,
|
"request_id": _request_id}).encode('utf-8')
|
for i in range(3):
|
try:
|
self.__send_response(data_bytes)
|
print("发送数据成功")
|
break
|
except Exception as e1:
|
logging.exception(e1)
|
|
# 撤长期没有成交的单
|
def __cancel_not_deal_order(self, code, order_ref, timeout=3):
|
time.sleep(timeout)
|
# 撤买单
|
huaxin_trade_api.cancel_order(1, code, "", orderRef=order_ref)
|
|
def OnTrade(self, client_id, request_id, data):
|
try:
|
trade_type = data["trade_type"]
|
if trade_type == outside_api_command_manager.TRADE_TYPE_ORDER:
|
code = data["code"]
|
direction = data["direction"]
|
volume = data["volume"]
|
price_type = data["price_type"]
|
price = data["price"]
|
sinfo = data["sinfo"]
|
if direction == 2:
|
# price_type: 0-价格笼子 1-跌停价 2-涨停价 3-现价 4-买5价
|
async_log_util.info(logger_trade, f"API卖: 接收数据-{data}")
|
current_price = L1DataProcessor.get_l1_current_price(code)
|
limit_down_price = gpcode_manager.get_limit_down_price(code)
|
limit_up_price = gpcode_manager.get_limit_up_price(code)
|
order_ref = huaxin_util.create_order_ref()
|
try:
|
result = huaxin_sell_util.start_sell(code, volume, price_type, limit_up_price, limit_down_price,
|
current_price, blocking=True, request_id=request_id,
|
order_ref=order_ref)
|
async_log_util.info(logger_trade, f"API卖结果: {result}")
|
self.send_response(result, client_id, request_id)
|
except Exception as e:
|
if str(e).find("超时") >= 0:
|
self.send_response({"code": 0, "data": {"orderRef": order_ref}}, client_id, request_id)
|
else:
|
raise e
|
else:
|
if not price:
|
if tool.trade_time_sub(tool.get_now_time_str(), "09:30:00") < 0:
|
# 开盘之前
|
limit_down_price = gpcode_manager.get_limit_down_price(code)
|
if not limit_down_price:
|
raise Exception("尚未获取跌停价")
|
# 比跌停价高1分
|
price = round(float(limit_down_price) + 0.01, 2)
|
else:
|
# 开盘之后
|
# 没有传入价格,以最新价买入
|
current_price = L1DataProcessor.get_l1_current_price(code)
|
if not current_price:
|
raise Exception("尚未获取到现价")
|
# 获取买1金额
|
price = round(float(current_price), 2)
|
buy1_info = L1DataProcessor.current_buy1_dict.get(code)
|
if buy1_info and buy1_info[0] * buy1_info[1] > 50 * 10000:
|
# 如果买1在50w以上就加一档
|
price += 0.01
|
limit_up_price = gpcode_manager.get_limit_up_price(code)
|
if limit_up_price and price > float(limit_up_price):
|
price = round(float(limit_up_price), 2)
|
order_ref = huaxin_util.create_order_ref()
|
result = huaxin_trade_api.order(direction, code, volume, price, price_type=price_type,
|
sinfo=sinfo, order_ref=order_ref,
|
blocking=True, request_id=request_id)
|
# 2s内没成交就撤单
|
self.__cancel_sell_thread_pool.submit(self.__cancel_not_deal_order, code, order_ref)
|
else:
|
result = huaxin_trade_api.order(direction, code, volume, price, price_type=price_type,
|
sinfo=sinfo,
|
blocking=True, request_id=request_id)
|
self.send_response({"code": 0, "data": result}, client_id, request_id)
|
except Exception as e:
|
logger_debug.exception(e)
|
self.send_response({"code": 1, "msg": str(e)}, client_id, request_id)
|
|
def OnSellRule(self, client_id, request_id, data):
|
try:
|
operate = data["operate"]
|
if operate == outside_api_command_manager.OPERRATE_ADD:
|
data = data["data"]
|
code = data["code"]
|
type = data["type"]
|
buy1_price = data.get("buy1_price")
|
if not buy1_price:
|
buy1_price = gpcode_manager.get_limit_up_price(code)
|
if not buy1_price:
|
raise Exception("尚未获取到涨停价")
|
rule = SellRule(code=data["code"], buy1_volume=data["buy1_volume"], buy1_price=buy1_price,
|
sell_volume=data.get("sell_volume"), sell_price_type=data.get("sell_price_type"),
|
end_time=data["end_time"], type=type)
|
TradeRuleManager().add_rule(rule)
|
self.send_response({"code": 0, "data": {}}, client_id, request_id)
|
elif operate == outside_api_command_manager.OPERRATE_SET:
|
data = data["data"]
|
code = data["code"]
|
buy1_price = data.get("buy1_price")
|
if not buy1_price:
|
buy1_price = gpcode_manager.get_limit_up_price(code)
|
if not buy1_price:
|
gpcode_manager.request_price_pre([code])
|
buy1_price = gpcode_manager.get_limit_up_price(code)
|
if not buy1_price:
|
raise Exception("尚未获取到涨停价")
|
rule = SellRule(id_=data["id"], code=data["code"], buy1_volume=data["buy1_volume"],
|
buy1_price=buy1_price,
|
sell_volume=data.get("sell_volume"), sell_price_type=data.get("sell_price_type"),
|
end_time=data["end_time"])
|
TradeRuleManager().update_rule(rule)
|
self.send_response({"code": 0, "data": {}}, client_id, request_id)
|
elif operate == outside_api_command_manager.OPERRATE_DELETE:
|
data = data["data"]
|
TradeRuleManager().del_rule(data["id"])
|
self.send_response({"code": 0, "data": {}}, client_id, request_id)
|
elif operate == outside_api_command_manager.OPERRATE_GET:
|
rules = TradeRuleManager().list_rules()
|
fresults = []
|
for rule in rules:
|
fresults.append(rule.to_dict())
|
self.send_response({"code": 0, "data": fresults}, client_id, request_id)
|
except Exception as e:
|
self.send_response({"code": 1, "msg": str(e)}, client_id, request_id)
|
|
def OnGetCodePositionInfo(self, client_id, request_id, data):
|
code = data.get("code")
|
__start_time = time.time()
|
try:
|
if not tool.is_shsz_code(code):
|
raise Exception("非主板代码")
|
# 获取持仓
|
positions = PositionManager.latest_positions
|
trade_rules_count = len(TradeRuleManager().list_can_excut_rules_cache())
|
|
fdata = {"code": code, "total": 0, "available": 0, "sell_orders": [], "sell_rules_count": trade_rules_count,
|
"cost_price": 0, "code_info": (code, ''), "desc": "", "cost_price_rate": 0}
|
if positions:
|
for d in positions:
|
code_name = d["securityName"]
|
fdata["code_info"] = (code, code_name)
|
if d["prePosition"] <= 0:
|
continue
|
if d["securityID"] != code:
|
continue
|
fdata["total"] = d["prePosition"]
|
fdata["available"] = d["availablePosition"]
|
fdata["cost_price"] = round(float(d["historyPosPrice"]), 2)
|
break
|
# 有现价就获取现价
|
current_price = L1DataProcessor.get_l1_current_price(code)
|
pre_close_price = gpcode_manager.get_price_pre_cache(code)
|
if current_price:
|
fdata["cost_price"] = current_price
|
pre_close_price = gpcode_manager.get_price_pre_cache(code)
|
if current_price and pre_close_price:
|
rate = round((float(current_price) - float(pre_close_price)) / float(pre_close_price), 4)
|
fdata["cost_price_rate"] = rate
|
elif pre_close_price:
|
fdata["cost_price"] = pre_close_price
|
fdata["cost_price_rate"] = 0
|
# 获取涨幅
|
async_log_util.info(logger_trade_position_api_request, f"{fdata}")
|
result = {"code": 0, "data": fdata}
|
self.send_response(result, client_id, request_id)
|
except Exception as e:
|
logging.exception(e)
|
self.send_response({"code": 1, "msg": f"数据处理出错:{e}"}, client_id, request_id)
|
finally:
|
use_time = time.time() - __start_time
|
if use_time > 0.01:
|
# 耗时10ms以上才记录日志
|
async_log_util.info(logger_trade_position_api_request, f"{code}请求持仓耗时:{use_time * 1000}ms")
|
|
def OnRefreshTradeData(self, client_id, request_id, data):
|
logger_debug.info(f"交易数据刷新:{data}")
|
try:
|
sync_type = data["ctype"]
|
if sync_type == "position_list":
|
huaxin_trade_data_update.add_position_list()
|
self.send_response({"code": 0, "data": {}, "msg": ""}, client_id, request_id)
|
except Exception as e:
|
self.send_response({"code": 1, "msg": str(e)}, client_id, request_id)
|
|
def OnCommonRequest(self, client_id, request_id, data):
|
ctype = data["ctype"]
|
if ctype == "get_positions":
|
# 获取所有持仓信息
|
positions = PositionManager.latest_positions
|
fdatas = []
|
if positions:
|
for d in positions:
|
code_ = d["securityID"]
|
code_name = d["securityName"]
|
if d["prePosition"] <= 0:
|
continue
|
fdatas.append({"code": code_, "code_name": code_name, "total": d["prePosition"],
|
"available": d["availablePosition"]})
|
result = {"code": 0, "data": fdatas}
|
self.send_response(result, client_id, request_id)
|
elif ctype == "get_buy1_info":
|
# 获取代码的买1信息
|
code = data["code"]
|
results = HistoryKDatasUtils.get_gp_current_info([code])
|
item = results[0]["quotes"][0]
|
result = {"code": 0, "data": {"price": item["bid_p"], "volume": item["bid_v"]}}
|
self.send_response(result, client_id, request_id)
|
elif ctype == "get_current_l1_codes":
|
codes = L1DataProcessor.get_latest_update_codes()
|
result = {"code": 0, "data": list(codes)}
|
self.send_response(result, client_id, request_id)
|
|
|
class L1DataProcessor:
|
__current_price_dict = {}
|
current_buy1_dict = {}
|
# L1数据更新时间
|
__l1_data_update_time_dict = {}
|
|
# 获取最近更新L1的代码
|
@classmethod
|
def get_latest_update_codes(cls):
|
now_time = time.time()
|
codes = set()
|
for code in cls.__l1_data_update_time_dict:
|
if now_time - cls.__l1_data_update_time_dict[code] < 10:
|
codes.add(code)
|
return codes
|
|
@classmethod
|
def set_l1_trade_codes_info(cls, data_json):
|
data = data_json["data"]
|
request_id = data_json["request_id"]
|
datas = data["data"]
|
cls.__save_l1_current_price(datas)
|
cls.process_for_sell(datas)
|
|
|
|
@classmethod
|
def process_for_sell(cls, datas):
|
rules = TradeRuleManager().list_can_excut_rules_cache(types=[TradeRuleManager.TYPE_SELL])
|
excuted_rule_ids = set()
|
if rules:
|
for d in datas:
|
code = d[0]
|
cls.__l1_data_update_time_dict[code] = time.time()
|
# 格式 (代码,现价,涨幅,量,更新时间,买1价格,买1量)
|
buy1_volume = d[6]
|
buy1_price = d[5]
|
if buy1_volume:
|
for r in rules:
|
# 生效时间
|
if r.code == code:
|
# --------判断是否可以执行--------
|
can_excute = False
|
if round(float(buy1_price), 2) <= round(float(r.buy1_price), 2):
|
# 价格已经触发
|
if r.buy1_volume:
|
if r.buy1_volume >= buy1_volume:
|
# 量价触发
|
can_excute = True
|
async_log_util.info(logger_trade, f"触发卖规则:量触发{buy1_volume}/{r.buy1_volume}")
|
else:
|
can_excute = True
|
async_log_util.info(logger_trade, f"触发卖规则:价格触发{buy1_price}/{r.buy1_price}")
|
# 价格触发
|
# 获取价格类型
|
if not can_excute:
|
continue
|
|
# 请求卖出锁
|
TradeRuleManager().require_sell_lock(r.id_)
|
try:
|
if r.id_ in excuted_rule_ids:
|
continue
|
excuted_rule_ids.add(r.id_)
|
# 获取最新的执行状况
|
r = TradeRuleManager().get_by_id(r.id_)
|
if r.excuted:
|
continue
|
# 提交卖
|
limit_down_price = gpcode_manager.get_limit_down_price(code)
|
limit_up_price = gpcode_manager.get_limit_up_price(code)
|
huaxin_sell_util.start_sell(code, r.sell_volume, r.sell_price_type, limit_up_price,
|
limit_down_price,
|
buy1_price)
|
TradeRuleManager().excuted(r.id_)
|
except Exception as e:
|
logger_debug.exception(e)
|
finally:
|
TradeRuleManager().release_sell_lock(r.id_)
|
|
# 获取L1现价
|
@classmethod
|
def get_l1_current_price(cls, code):
|
return cls.__current_price_dict.get(code)
|
|
# 保存现价
|
@classmethod
|
def __save_l1_current_price(cls, datas):
|
for d in datas:
|
code = d[0]
|
# 格式 (代码,现价,涨幅,量,更新时间,买1价格,买1量)
|
price = d[1]
|
cls.__current_price_dict[code] = price
|
cls.current_buy1_dict[code] = (d[5], d[6])
|
|
|
# 读取L1的数据
|
def __recv_pipe_l1_trade(queue_l1_trade_w_strategy_r: multiprocessing.Queue):
|
logger_system.info(f"trade_server __recv_pipe_l1_trade 线程ID:{tool.get_thread_id()}")
|
if queue_l1_trade_w_strategy_r is not None:
|
while True:
|
try:
|
val = queue_l1_trade_w_strategy_r.get()
|
if val:
|
async_log_util.info(logger_local_huaxin_l1_trade_info, f"客户端接收:{val}")
|
val = json.loads(val)
|
print("收到来自L1的数据:", val["type"])
|
# 处理数据
|
type_ = val["type"]
|
if type_ == "upload_l1_trade_datas":
|
# 处理专为交易提供的L1数据
|
L1DataProcessor.set_l1_trade_codes_info(val)
|
async_log_util.info(logger_local_huaxin_l1_trade_info, f"{val}")
|
except Exception as e:
|
logger_local_huaxin_l1_trade_info.exception(e)
|
logging.exception(e)
|
|
|
# 做一些初始化的操作
|
def __init():
|
def run_pending():
|
while True:
|
schedule.run_pending()
|
time.sleep(1)
|
# 9点半后终止运行
|
if tool.trade_time_sub(tool.get_now_time_str(), "09:30:00") > 0:
|
break
|
|
# 请求持仓
|
def request_position():
|
for i in range(3):
|
time.sleep(10)
|
huaxin_trade_data_update.add_position_list()
|
|
# 定时持仓刷新
|
schedule.every().day.at("09:00:00").do(huaxin_trade_data_update.add_position_list)
|
schedule.every().day.at("09:10:00").do(huaxin_trade_data_update.add_position_list)
|
schedule.every().day.at("09:15:00").do(huaxin_trade_data_update.add_position_list)
|
threading.Thread(target=run_pending, daemon=True).start()
|
threading.Thread(target=request_position, daemon=True).start()
|
|
|
def run(queue_l1_trade_w_strategy_r, queue_l1_trade_r_strategy_w, queue_strategy_r_trade_w, queue_strategy_w_trade_r):
|
try:
|
# 初始化
|
__init()
|
|
# 监听l1交易那边传过来的代码
|
t1 = threading.Thread(target=lambda: __recv_pipe_l1_trade(queue_l1_trade_w_strategy_r), daemon=True)
|
t1.start()
|
|
threading.Thread(target=lambda: async_log_util.run_sync(), daemon=True).start()
|
|
huaxin_trade_data_update.run(queue_l1_trade_r_strategy_w)
|
|
huaxin_trade_api.run_pipe_trade(queue_strategy_r_trade_w, queue_strategy_w_trade_r)
|
|
manager = outside_api_command_manager.ApiCommandManager()
|
manager.init(middle_api_protocol.SERVER_HOST,
|
middle_api_protocol.SERVER_PORT,
|
OutsideApiCommandCallback())
|
manager.run(blocking=True)
|
except Exception as e:
|
logger_system.exception(e)
|