"""
|
交易API
|
"""
|
import json
|
import logging
|
import multiprocessing
|
import queue
|
import random
|
import threading
|
import time
|
|
import constant
|
from log_module import async_log_util
|
from log_module.log import hx_logger_trade_debug, hx_logger_trade_loop, hx_logger_trade_callback, logger_trade, \
|
logger_system
|
from trade import huaxin_trade_data_update, middle_api_protocol
|
from utils import socket_util, huaxin_util, tool
|
import concurrent.futures
|
|
|
class TradeCallback:
|
"""
|
交易回调
|
"""
|
|
def on_order(self, order_info):
|
pass
|
|
|
__response_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=15)
|
__save_data_queue = queue.Queue()
|
|
|
def __run_recv_queue_trade(queue: multiprocessing.Queue):
|
# 设置结果
|
def __set_response(data_json):
|
if 'request_id' not in data_json:
|
return
|
# 处理数据
|
async_log_util.info(hx_logger_trade_callback, f"response:request_id-{data_json['request_id']}")
|
# 设置响应内容
|
set_response(data_json["request_id"], data_json['data'])
|
|
logger_system.info(f"huaxin_trade_api __run_recv_pipe_trade 线程ID:{tool.get_thread_id()}")
|
|
if queue is not None:
|
while True:
|
try:
|
val = queue.get()
|
if val:
|
data_json = json.loads(val)
|
# 处理数据
|
type_ = data_json["type"]
|
if type_ == "response":
|
# 主动触发的响应
|
request_id = data_json['request_id']
|
async_log_util.info(hx_logger_trade_callback,
|
f"response:request_id-{request_id}")
|
__response_thread_pool.submit(__set_response, data_json)
|
elif type_ == "trade_callback":
|
try:
|
# 交易回调
|
data_json = data_json["data"]
|
ctype = data_json["type"]
|
# 记录交易反馈日志
|
async_log_util.info(hx_logger_trade_callback, f"{data_json}")
|
if ctype == 0:
|
# 订单回调
|
if trade_callback:
|
trade_callback.on_order(data_json.get("data"))
|
finally:
|
pass
|
except:
|
pass
|
|
|
def __test_order():
|
time.sleep(60)
|
for i in range(20):
|
time.sleep(30)
|
order_ref = huaxin_util.create_order_ref()
|
order(1, "000333", 100, 1.00, price_type=2, blocking=False, order_ref=order_ref, shadow_price=0.99)
|
time.sleep(30)
|
cancel_order(1, "000333", '123123', orderRef=order_ref, blocking=False)
|
|
|
# 设置交易通信队列
|
# 暂时不会使用该方法
|
def run_trade(queue_strategy_r_trade_w_, trade_callback_: TradeCallback, queue_strategy_w_trade_r_,
|
queue_strategy_w_trade_for_query_r_):
|
"""
|
:param queue_strategy_w_trade_for_query_r_: 策略写交易读(用于数据查询)
|
:param queue_strategy_w_trade_r_: 策略写交易读
|
:param trade_callback_: 订单回调
|
:param queue_strategy_r_trade_w_: 接收交易结果数据队列
|
:return:
|
"""
|
global queue_strategy_r_trade_w, queue_strategy_w_trade_r, queue_strategy_w_trade_for_query_r, trade_callback
|
queue_strategy_r_trade_w = queue_strategy_r_trade_w_
|
queue_strategy_w_trade_r = queue_strategy_w_trade_r_
|
queue_strategy_w_trade_for_query_r = queue_strategy_w_trade_for_query_r_
|
trade_callback = trade_callback_
|
# 读取交易结果
|
threading.Thread(target=lambda: __run_recv_queue_trade(queue_strategy_r_trade_w_), daemon=True).start()
|
|
|
def add_trade_callback_data(data_str: str):
|
"""
|
设置交易响应数据
|
:param data_str:
|
:return:
|
"""
|
queue_strategy_r_trade_w.put_nowait(data_str)
|
|
|
# 交易通道的错误次数
|
trade_pipe_channel_error_count = 0
|
|
|
# 测试交易通道
|
def test_trade_channel():
|
global trade_pipe_channel_error_count
|
sid = random.randint(0, 1000000)
|
result = __test_trade_channel(sid)
|
if result["code"] == 0 and result["data"]["data"]["sid"] == sid:
|
trade_pipe_channel_error_count = 0
|
return True
|
trade_pipe_channel_error_count += 1
|
if trade_pipe_channel_error_count > 100:
|
trade_pipe_channel_error_count = 100
|
return False
|
|
|
class ClientSocketManager:
|
# 客户端类型
|
CLIENT_TYPE_TRADE = "trade"
|
CLIENT_TYPE_DELEGATE_LIST = "delegate_list"
|
CLIENT_TYPE_DEAL_LIST = "deal_list"
|
CLIENT_TYPE_POSITION_LIST = "position_list"
|
CLIENT_TYPE_MONEY = "money"
|
CLIENT_TYPE_DEAL = "deal"
|
CLIENT_TYPE_CMD_L2 = "l2_cmd"
|
socket_client_dict = {}
|
socket_client_lock_dict = {}
|
active_client_dict = {}
|
|
@classmethod
|
def list_client(cls, _type):
|
if _type == cls.CLIENT_TYPE_TRADE:
|
if _type in cls.socket_client_dict:
|
return cls.socket_client_dict.get(_type)
|
else:
|
if _type in cls.socket_client_dict:
|
return [cls.socket_client_dict.get(_type)]
|
return []
|
|
@classmethod
|
def add_client(cls, _type, rid, sk):
|
if _type == cls.CLIENT_TYPE_TRADE:
|
# 交易列表
|
if _type not in cls.socket_client_dict:
|
cls.socket_client_dict[_type] = []
|
cls.socket_client_dict[_type].append((rid, sk))
|
cls.socket_client_lock_dict[rid] = threading.Lock()
|
hx_logger_trade_debug.info(f"add_client:{rid}")
|
else:
|
cls.socket_client_dict[_type] = (rid, sk)
|
cls.socket_client_lock_dict[rid] = threading.Lock()
|
|
# 是否已经被锁住
|
@classmethod
|
def is_client_locked(cls, rid):
|
if rid in cls.socket_client_lock_dict:
|
return cls.socket_client_lock_dict[rid].locked()
|
return None
|
|
@classmethod
|
def acquire_client(cls, _type):
|
if _type == cls.CLIENT_TYPE_TRADE:
|
if _type in cls.socket_client_dict:
|
# 根据排序活跃时间排序
|
client_list = sorted(cls.socket_client_dict[_type], key=lambda x: cls.active_client_dict.get(x[0]) if x[
|
0] in cls.active_client_dict else 0,
|
reverse=True)
|
hx_logger_trade_debug.info(f"acquire_client client_list数量:{len(client_list)}")
|
hx_logger_trade_debug.info(
|
f"acquire_client socket_client_lock_dict数量:{len(cls.socket_client_lock_dict.keys())}")
|
for d in client_list:
|
if d[0] in cls.socket_client_lock_dict:
|
try:
|
if cls.socket_client_lock_dict[d[0]].acquire(blocking=False):
|
hx_logger_trade_debug.info(f"acquire_client success:{d[0]}")
|
return d
|
except threading.TimeoutError:
|
hx_logger_trade_debug.error("acquire_client TimeoutError")
|
else:
|
if _type in cls.socket_client_dict:
|
try:
|
d = cls.socket_client_dict[_type]
|
if d[0] in cls.socket_client_lock_dict:
|
if cls.socket_client_lock_dict[d[0]].acquire(blocking=False):
|
return d
|
except threading.TimeoutError:
|
pass
|
return None
|
|
@classmethod
|
def release_client(cls, client_id):
|
sucess = False
|
if client_id in cls.socket_client_lock_dict:
|
sucess = True
|
# 释放锁
|
if cls.socket_client_lock_dict[client_id].locked():
|
cls.socket_client_lock_dict[client_id].release()
|
if sucess:
|
hx_logger_trade_debug.info(f"release_client success:{client_id}")
|
else:
|
hx_logger_trade_debug.info(f"release_client fail:{client_id}")
|
|
@classmethod
|
def del_client(cls, rid):
|
# 删除线程锁
|
if rid in cls.socket_client_lock_dict:
|
cls.socket_client_lock_dict.pop(rid)
|
# 删除sk
|
for t in cls.socket_client_dict:
|
if type(cls.socket_client_dict[t]) == list:
|
for d in cls.socket_client_dict[t]:
|
if d[0] == rid:
|
try:
|
# 关闭socket
|
d[1].close()
|
except:
|
pass
|
cls.socket_client_dict[t].remove(d)
|
break
|
|
elif type(cls.socket_client_dict[t]) == tuple:
|
if cls.socket_client_dict[t][0] == rid:
|
try:
|
# 关闭socket
|
cls.socket_client_dict[t][1].close()
|
except:
|
pass
|
cls.socket_client_dict.pop(t)
|
break
|
|
# 心跳信息
|
@classmethod
|
def heart(cls, rid):
|
cls.active_client_dict[rid] = time.time()
|
|
@classmethod
|
def del_invalid_clients(cls):
|
# 清除长时间无心跳的客户端通道
|
for k in cls.active_client_dict.keys():
|
if time.time() - cls.active_client_dict[k] > 20:
|
# 心跳时间间隔20s以上视为无效
|
cls.del_client(k)
|
|
|
TRADE_DIRECTION_BUY = 1
|
TRADE_DIRECTION_SELL = 2
|
|
# 超时时间2s
|
TIMEOUT = 2.0
|
# 交易代理
|
TRADE_DELEGATED = True
|
|
# 等待响应的request_id
|
__request_response_dict = {}
|
|
|
def __get_request_id(type):
|
return f"r_{type}_{round(time.time() * 10000)}_{random.randint(0, 100000)}"
|
|
|
def __request_delegate(request_id, type, data):
|
"""
|
请求仿真交易
|
:param request_id:
|
:param type:
|
:param data:
|
:return:
|
"""
|
fdata = middle_api_protocol.load_simulation_trade(type, data)
|
try:
|
result = middle_api_protocol.request(fdata, port=10020)
|
set_response(request_id, result)
|
except Exception as e:
|
pass
|
|
|
# 网络请求
|
def __request(_type, data, request_id=None, log_enable=True, is_trade=False):
|
"""
|
请求,将交易(包含下单/撤单)与查询(包含查持仓/账户可用金额/委托列表/成交列表)队列分离
|
@param _type:
|
@param data:
|
@param request_id:
|
@param log_enable:
|
@param is_trade:
|
@return:
|
"""
|
if not request_id:
|
request_id = __get_request_id(_type)
|
try:
|
if log_enable:
|
async_log_util.info(hx_logger_trade_loop, "请求发送开始:client_id-{} request_id-{}", 0, request_id)
|
root_data = {"type": _type,
|
"data": data,
|
"request_id": request_id,
|
"time": time.time()
|
}
|
root_data = socket_util.encryp_client_params_sign(root_data)
|
start_time = time.time()
|
if constant.IS_SIMULATED_TRADE:
|
# =========模拟盘交易代理请求==========
|
threading.Thread(target=__request_delegate, args=(request_id, _type, data,), daemon=True).start()
|
else:
|
# ===========真实盘交易===============
|
if is_trade:
|
queue_strategy_w_trade_r.put_nowait(root_data)
|
else:
|
queue_strategy_w_trade_for_query_r.put_nowait(root_data)
|
|
use_time = int((time.time() - start_time) * 1000)
|
if use_time > 10:
|
async_log_util.info(hx_logger_trade_loop, f"发送耗时:request_id-{request_id} 耗时时间:{use_time}")
|
if log_enable:
|
async_log_util.info(hx_logger_trade_loop, "请求发送成功:request_id-{}", request_id)
|
except BrokenPipeError as e:
|
async_log_util.info(hx_logger_trade_loop, "请求发送异常:request_id-{} error-{}", request_id, str(e))
|
raise e
|
except Exception as e:
|
async_log_util.info(hx_logger_trade_loop, "请求发送异常: request_id-{} error-{}", request_id, str(e))
|
logging.exception(e)
|
raise e
|
return request_id
|
|
|
def __read_response(request_id, blocking, timeout=TIMEOUT, log_enable=True):
|
if blocking:
|
start_time = time.time()
|
try:
|
while True:
|
time.sleep(0.005)
|
if request_id in __request_response_dict:
|
# 获取到了响应内容
|
result = __request_response_dict.pop(request_id)
|
if log_enable:
|
async_log_util.info(hx_logger_trade_loop, "请求读取成功: request_id-{}", request_id)
|
return result
|
if time.time() - start_time > timeout:
|
if log_enable:
|
async_log_util.info(hx_logger_trade_loop, "请求读取超时: request_id-{}", request_id)
|
# 读取内容超时才会释放
|
raise Exception(f"读取内容超时: request_id={request_id}")
|
finally:
|
pass
|
|
return None
|
|
|
def set_response(request_id, response):
|
if request_id:
|
async_log_util.info(hx_logger_trade_loop, f"请求响应: request_id-{request_id} 内容-{response}")
|
# 主动触发
|
__request_response_dict[request_id] = response
|
else:
|
# 被动触发
|
pass
|
|
|
def order(direction, code, volume, price, price_type=2, blocking=False, sinfo=None, request_id=None,
|
order_ref=None, shadow_price=None):
|
"""
|
下单委托
|
@param direction: 1-买 2-卖
|
@param code:
|
@param volume:交易量
|
@param price:价格(如果是卖时不传价格就按照5挡价卖)
|
@param price_type:
|
@param blocking:是否阻塞进程
|
@param sinfo:
|
@param request_id:
|
@param order_ref:
|
@param shadow_price:
|
@return:
|
"""
|
timestamp = round(time.time() * 1000)
|
if not sinfo:
|
sinfo = f"b_{code}_{timestamp}"
|
if not order_ref:
|
order_ref = huaxin_util.create_order_ref()
|
if not request_id:
|
request_id = __get_request_id(ClientSocketManager.CLIENT_TYPE_TRADE)
|
for i in range(1):
|
request_id = __request(ClientSocketManager.CLIENT_TYPE_TRADE,
|
{"type": ClientSocketManager.CLIENT_TYPE_TRADE, "trade_type": 1,
|
"direction": direction,
|
"code": code,
|
"order_ref": order_ref,
|
"volume": volume,
|
"price_type": price_type,
|
"price": price, "shadow_price": shadow_price, "sinfo": sinfo, "blocking": blocking},
|
request_id=request_id,
|
is_trade=True)
|
try:
|
if blocking:
|
res = __read_response(request_id, blocking)
|
else:
|
res = {"order_ref": order_ref}
|
async_log_util.info(hx_logger_trade_debug, f"下单结果:{res}")
|
return res
|
finally:
|
# huaxin_trade_data_update.add_delegate_list("下单", delay=0.2)
|
huaxin_trade_data_update.add_money_list()
|
|
|
__canceling_order_dict = {}
|
|
|
def cancel_order(direction, code, orderSysID, orderRef=None, blocking=False, sinfo=None, request_id=None,
|
recancel=False):
|
"""
|
撤单
|
:param direction: 1-买入 2-卖出
|
:param code:
|
:param orderSysID:
|
:param orderRef:
|
:param blocking:
|
:param sinfo:
|
:param request_id:
|
:param recancel:
|
:return:
|
"""
|
if tool.trade_time_sub(tool.get_now_time_str(), "14:57:00") >= 0 and tool.trade_time_sub(tool.get_now_time_str(),
|
"15:00:01") <= 0:
|
# 集合竞价不撤单
|
return
|
|
if not sinfo:
|
sinfo = f"cb_{code}_{round(time.time() * 1000)}_{random.randint(0, 10000)}"
|
order_action_ref = huaxin_util.create_order_ref()
|
if not request_id:
|
request_id = __get_request_id(ClientSocketManager.CLIENT_TYPE_TRADE)
|
# 加入撤单记录,用于校验最后的撤单是否成功
|
if code not in __canceling_order_dict:
|
__canceling_order_dict[code] = set()
|
__canceling_order_dict[code].add(json.dumps((orderRef, orderSysID)))
|
# 执行2次撤单,防止没有撤到
|
for i in range(2):
|
request_id = __request(ClientSocketManager.CLIENT_TYPE_TRADE,
|
{"type": ClientSocketManager.CLIENT_TYPE_TRADE, "trade_type": 2,
|
"direction": direction,
|
"code": code,
|
"orderRef": orderRef,
|
"orderActionRef": order_action_ref,
|
"orderSysID": orderSysID, "sinfo": sinfo}, request_id=request_id, is_trade=True)
|
try:
|
res = __read_response(request_id, blocking)
|
async_log_util.info(hx_logger_trade_debug, f"撤单结果:{res}")
|
return res
|
finally:
|
# huaxin_trade_data_update.add_delegate_list("撤单")
|
huaxin_trade_data_update.add_money_list()
|
|
|
# CLIENT_TYPE_DELEGATE_LIST = "delegate_list"
|
# CLIENT_TYPE_DEAL_LIST = "deal_list"
|
# CLIENT_TYPE_POSITION_LIST = "position_list"
|
# CLIENT_TYPE_MONEY = "money"
|
# CLIENT_TYPE_DEAL = "deal"
|
|
# 获取委托列表
|
# can_cancel:是否可以撤
|
def get_delegate_list(can_cancel=True, blocking=True, timeout=TIMEOUT):
|
"""
|
委托列表
|
:param can_cancel:
|
:param blocking:
|
:param timeout:
|
:return: 例如:{'code': 0, 'data': [{'securityID': '002400', 'orderLocalID': 'P900046788', 'direction': '0', 'orderSysID': '12002P900046788', 'insertTime': '13:11:40', 'insertDate': '20250109', 'acceptTime': '13:11:22', 'cancelTime': '', 'limitPrice': 8.23, 'accountID': '00044396', 'turnover': 0.0, 'orderRef': 131161, 'volume': 1000, 'volumeTraded': 0, 'orderStatus': '2', 'orderSubmitStatus': '1', 'statusMsg': ''}]}
|
"""
|
request_id = __request(ClientSocketManager.CLIENT_TYPE_DELEGATE_LIST,
|
{"type": ClientSocketManager.CLIENT_TYPE_DELEGATE_LIST,
|
"can_cancel": 1 if can_cancel else 0})
|
|
res = __read_response(request_id, blocking, timeout=timeout)
|
|
async_log_util.info(hx_logger_trade_debug, f"获取委托列表:{res}")
|
|
return res
|
|
|
# 获取成交列表
|
def get_deal_list(blocking=True, timeout=TIMEOUT):
|
"""
|
获取成交列表
|
:param blocking:
|
:param timeout:
|
:return: {'code': 0, 'data': [{'tradeID': '17211275', 'securityID': '002184', 'orderLocalID': 'P900046786', 'direction': '0', 'orderSysID': '12002P900046786', 'price': 20.31, 'tradeTime': '13:11:40', 'volume': 1000, 'tradeDate': '20250109', 'tradingDay': '20250109', 'pbuID': '232600', 'accountID': '00044396', 'orderRef': 131146}]}
|
"""
|
request_id = __request(ClientSocketManager.CLIENT_TYPE_DEAL_LIST,
|
{"type": ClientSocketManager.CLIENT_TYPE_DEAL_LIST})
|
res = __read_response(request_id, blocking, timeout=timeout)
|
async_log_util.info(hx_logger_trade_debug, f"获取成交列表:{res}")
|
return res
|
|
|
# 获取华鑫持仓列表 后续即可实现仓位管理
|
# 获取持仓列表
|
def get_position_list(blocking=True):
|
"""
|
获取持仓列表
|
:param blocking:
|
:return: {'code': 0, 'data': [{'investorID': '00044396', 'tradingDay': '20250109', 'securityName': '海得控制', 'securityID': '002184', 'historyPos': 0, 'historyPosFrozen': 0, 'todayBSPos': 1000, 'todayBSPosFrozen': 0, 'historyPosPrice': 0.0, 'totalPosCost': 20316.176271, 'prePosition': 0, 'availablePosition': 0, 'currentPosition': 1000, 'openPosCost': 20310.0, 'todayCommission': 6.176271, 'todayTotalBuyAmount': 20310.0, 'todayTotalSellAmount': 0.0}, {'investorID': '00044396', 'tradingDay': '20250109', 'securityName': '省广集团', 'securityID': '002400', 'historyPos': 0, 'historyPosFrozen': 0, 'todayBSPos': 0, 'todayBSPosFrozen': 0, 'historyPosPrice': 0.0, 'totalPosCost': 0.0, 'prePosition': 0, 'availablePosition': 0, 'currentPosition': 0, 'openPosCost': 0.0, 'todayCommission': 0.0, 'todayTotalBuyAmount': 8230.0, 'todayTotalSellAmount': 0.0}]}
|
"""
|
request_id = __request(ClientSocketManager.CLIENT_TYPE_POSITION_LIST,
|
{"type": ClientSocketManager.CLIENT_TYPE_POSITION_LIST})
|
res = __read_response(request_id, blocking)
|
async_log_util.info(hx_logger_trade_debug, f"获取持仓结果:{res}")
|
return res
|
|
|
# 获取华鑫账户资金 后续即可实现账户管理
|
# 获取账户资金状况
|
def get_money(blocking=True):
|
"""
|
获取账户资金状况
|
:param blocking:
|
:return: 示例: {'code': 0, 'data': [{'departmentID': '0001', 'investorID': '00032047', 'accountID': '00032047', 'currencyID': '1', 'usefulMoney': 39305420.68, 'frozenCash': 0.0, 'fetchLimit': 39305420.68, 'preDeposit': 39305420.68}]}
|
"""
|
request_id = __request(ClientSocketManager.CLIENT_TYPE_MONEY,
|
{"type": ClientSocketManager.CLIENT_TYPE_MONEY})
|
res = __read_response(request_id, blocking)
|
async_log_util.info(hx_logger_trade_debug, f"获取账户结果:{res}")
|
return res
|
|
|
# money = get_money()
|
# print(f"money=={money}")
|
|
|
# 设置L2订阅数据
|
def __test_trade_channel(sid):
|
request_id = __request("test",
|
{"type": "test", "data": {"sid": sid}}, log_enable=False)
|
return __read_response(request_id, True, log_enable=False)
|
|
|
if __name__ == "__main__":
|
print(get_position_list())
|