"""
|
交易API
|
"""
|
import json
|
import logging
|
import multiprocessing
|
import queue
|
import random
|
import threading
|
import time
|
import concurrent.futures
|
|
from code_atrribute import history_k_data_util
|
from l2.l2_data_manager import L2DataProcessor
|
from log_module import async_log_util
|
from log_module.log import hx_logger_trade_debug, hx_logger_trade_loop, hx_logger_trade_callback, \
|
logger_system
|
from trade import huaxin_trade_data_update
|
from trade.huaxin_trade_order_processor import CancelOrderManager, HuaxinOrderEntity, TradeResultProcessor
|
from trade.huaxin_trade_record_manager import TradeOrderIdManager, DelegateSellOrderManager
|
|
from utils import socket_util, tool, huaxin_util, cb_data_util
|
|
# Worker pool onto which the receive loop submits response handling and order processing.
__response_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=15)
|
# Receives every order payload after it has been processed (see the put_nowait call in
# __process_order); no consumer of this queue is visible in this part of the file.
__save_data_queue = queue.Queue()
|
|
|
def __run_recv_queue_trade(queue: multiprocessing.Queue):
    """Consume messages coming from the trade side and dispatch them, forever.

    Every non-empty queue item is parsed as JSON and routed on its ``type`` field:

    * ``"response"`` - reply to a request issued by this module. The reply is handed
      to ``set_response`` on the response thread pool; when ``data["data"]`` is a dict
      containing ``orderRef`` it is additionally processed as an order update.
    * ``"trade_callback"`` - message pushed by the trade side. Callbacks whose inner
      ``type`` is ``0`` are processed synchronously as order updates.

    A failure while handling one message is logged and the loop moves on to the next
    message. While ``queue`` is not None this function never returns, so it is meant
    to run in its own thread.

    :param queue: queue to read from; when None only the start-up log line is written.
    """

    def __cancel_order(code, order_ref):
        # Give the order 3s (SH codes) / 2s (other codes) to fill, then cancel it
        # if its status still allows cancelling.
        if tool.is_sh_code(code):
            time.sleep(3)
        else:
            time.sleep(2)
        order_: HuaxinOrderEntity = TradeResultProcessor.get_huaxin_order_by_order_ref(order_ref)
        if order_ is not None:
            if huaxin_util.is_can_cancel(order_.orderStatus):
                cancel_order(2, code, order_.orderSysID)

    def __sell_cb_code(code, volume):
        # Determine the current price: prefer the L2 deal price, fall back to the
        # quote returned by the history K-data utility.
        current_price = None
        deal_price_info = L2DataProcessor.get_deal_price(code)
        if deal_price_info:
            current_price = deal_price_info[0]
        if not current_price:
            results = history_k_data_util.HistoryKDatasUtils.get_gp_current_info([code])
            if results:
                current_price = results[0]["price"]
        if current_price is None:
            raise Exception("获取到的现价为空")
        price = round(tool.get_buy_min_price(current_price, True), 3)
        results = order(2, code, volume, price)
        async_log_util.info(hx_logger_trade_debug, f"自动卖出可转债结果:{results}")

    def __re_sell(code, volume, price):
        # Place a fresh sell order (direction 2) with the given volume and price.
        order(2, code, volume, price)

    def __process_order(data):
        # Handle a single order update payload (dict) coming from the trade side.
        code = data["securityID"]
        accountID = data["accountID"]
        orderStatus = data["orderStatus"]
        orderRef = data["orderRef"]
        orderSysID = data["orderSysID"]
        insertTime = data.get("insertTime")
        acceptTime = data.get("acceptTime")
        insertDate = data.get("insertDate")
        direction = data.get("direction")
        limitPrice = data.get("limitPrice")
        volume = data.get("volume")
        sinfo = data.get("sinfo")
        cancelTime = data.get("cancelTime")

        orderEntity = HuaxinOrderEntity(code, orderStatus, orderRef, accountID, orderSysID,
                                        insertTime=insertTime, acceptTime=acceptTime,
                                        insertDate=insertDate, direction=direction)
        try:
            if str(orderEntity.direction) == str(huaxin_util.TORA_TSTP_D_Sell):
                # Sell order: keep the delegated-sell bookkeeping in sync.
                try:
                    if huaxin_util.is_can_cancel(orderEntity.orderStatus):
                        DelegateSellOrderManager.add_delegate_sell_order(
                            (orderEntity.orderSysID, orderEntity.code, orderEntity.insertTime, limitPrice))
                    elif huaxin_util.is_canceled(orderEntity.orderStatus):
                        # An order placed before 09:30 and cancelled at/after 09:30
                        # is placed again.
                        if cancelTime and int(cancelTime.replace(":", "")) >= int("093000"):
                            delegate_data = DelegateSellOrderManager.get_delegate_sell_order_by_order_sys_id(
                                orderSysID)
                            if delegate_data and int(delegate_data[2].replace(":", "")) < int("093000"):
                                deal_price = L2DataProcessor.get_deal_price(code)
                                if deal_price:
                                    # NOTE(review): __sell_cb_code treats element [0] of
                                    # get_deal_price() as the price, whereas the whole return
                                    # value is passed on here - confirm which is intended.
                                    sell_price = tool.get_buy_min_price(deal_price)
                                    __re_sell(code, volume, sell_price)

                        DelegateSellOrderManager.cancel_delegate_sell_order(orderEntity.orderSysID)
                    elif orderEntity.orderStatus == huaxin_util.TORA_TSTP_OST_AllTraded:
                        DelegateSellOrderManager.deal_delegate_sell_order(orderEntity.orderSysID)
                except Exception as e:
                    hx_logger_trade_debug.exception(e)
                # Refresh the position list.
                huaxin_trade_data_update.add_position_list()
                if huaxin_util.is_deal(orderEntity.orderStatus):
                    # A fill also requires the delegate list to be refreshed.
                    huaxin_trade_data_update.add_delegate_list("卖成交")
            else:
                # Buy order.
                if tool.is_cb_code(code):
                    async_log_util.info(hx_logger_trade_debug,
                                        f"可转债买入:代码-{code} sinfo-{sinfo} 订单状态-{orderStatus} 需要卖的sinfo-{cb_data_util.need_sell_sinfos}")
                    if sinfo in cb_data_util.need_sell_sinfos and str(
                            orderStatus) == huaxin_util.TORA_TSTP_OST_AllTraded:
                        # Fully traded convertible-bond buy that is flagged for selling:
                        # sell it right away.
                        __sell_cb_code(code, volume)
                # A buy requires the position list to be refreshed.
                huaxin_trade_data_update.add_position_list()
            need_watch_cancel = TradeResultProcessor.process_sell_order(orderEntity)
            if need_watch_cancel:
                # Watch the order in the background and cancel it if it does not fill.
                threading.Thread(target=lambda: __cancel_order(orderEntity.code, orderEntity.orderRef),
                                 daemon=True).start()
        finally:
            try:
                # Hand the raw payload to the save queue (the original comment spoke of
                # enqueuing twice, but a single put is what the code performs).
                __save_data_queue.put_nowait(data)
            except Exception as e:
                hx_logger_trade_debug.exception(e)

    def __set_response(data_json):
        # Publish the reply of an actively issued request so the waiting caller can pick it up.
        if 'request_id' not in data_json:
            return
        async_log_util.info(hx_logger_trade_callback, f"response:request_id-{data_json['request_id']}")
        set_response(data_json["request_id"], data_json['data'])

    logger_system.info(f"huaxin_trade_api __run_recv_pipe_trade 线程ID:{tool.get_thread_id()}")

    if queue is None:
        return

    while True:
        try:
            val = queue.get()
            if not val:
                continue
            data_json = json.loads(val)
            type_ = data_json["type"]
            if type_ == "response":
                # Reply to a request issued by this module.
                request_id = data_json['request_id']
                async_log_util.info(hx_logger_trade_callback,
                                    f"response:request_id-{request_id}")
                __response_thread_pool.submit(__set_response, data_json)
                if isinstance(data_json.get("data"), dict):
                    data = data_json["data"].get("data")
                    if isinstance(data, dict) and "orderRef" in data:
                        __response_thread_pool.submit(__process_order, data)
            elif type_ == "trade_callback":
                # Callback pushed by the trade side.
                data_json = data_json["data"]
                ctype = data_json["type"]
                # Record the trade feedback.
                async_log_util.info(hx_logger_trade_callback, f"{data_json}")
                if ctype == 0:
                    # Order status update.
                    __process_order(data_json.get("data"))
        except Exception as e:
            # Keep the loop alive, but do not hide the failure: a swallowed error
            # here would mean a lost response or order update with no trace.
            hx_logger_trade_debug.exception(e)
|
|
|
# Set up the trade communication queues.
# (Original note: this function is not used for the time being.)
def run_pipe_trade(queue_strategy_r_trade_w_, queue_strategy_w_trade_r_):
    """Register the outgoing trade queue and start the two background workers.

    :param queue_strategy_r_trade_w_: queue read by this side; it is passed to
        ``__run_recv_queue_trade`` in a daemon thread.
    :param queue_strategy_w_trade_r_: queue written by this side; it is stored in the
        module-level ``queue_strategy_w_trade_r``.

    A second daemon thread runs ``CancelOrderManager().run(cancel_order)``.
    """
    # Only queue_strategy_w_trade_r is assigned here; the other two names are
    # declared global without being touched.
    global queue_strategy_w_trade_r, queue_strategy_w_trade_r_for_read, queue_l1_trade_r_strategy_w
    queue_strategy_w_trade_r = queue_strategy_w_trade_r_

    def _receive_worker():
        __run_recv_queue_trade(queue_strategy_r_trade_w_)

    def _cancel_worker():
        CancelOrderManager().run(cancel_order)

    for worker in (_receive_worker, _cancel_worker):
        threading.Thread(target=worker, daemon=True).start()
|
|
|
# 交易通道的错误次数
|
# Running count of failed channel probes: test_trade_channel() resets it to 0 on
# success, increments it on failure and caps it at 100.
trade_pipe_channel_error_count = 0
|
|
|
def is_pipe_channel_normal():
    """Report whether the pipe trade channel is considered healthy.

    The answer is currently hard-wired: this always returns True.
    """
    return True
|
|
|
def test_trade_channel():
    """Probe the trade channel with a random ``sid`` and record the outcome.

    The probe succeeds when the reply has ``code == 0`` and echoes the same ``sid``
    under ``data.data.sid``. On success the module-level error counter is reset to 0;
    otherwise it is incremented, never exceeding 100.

    :return: True when the probe succeeded, False otherwise.
    """
    global trade_pipe_channel_error_count
    sid = random.randint(0, 1000000)
    result = __test_trade_channel(sid)
    echoed = result["code"] == 0 and result["data"]["data"]["sid"] == sid
    if echoed:
        trade_pipe_channel_error_count = 0
        return True
    # Count the failure, keeping the counter capped at 100.
    trade_pipe_channel_error_count = min(trade_pipe_channel_error_count + 1, 100)
    return False
|
|
|
class ClientSocketManager:
    """Class-level registry of connected client sockets.

    Clients are registered per channel type. The trade channel keeps a list of
    ``(rid, socket)`` tuples; every other channel keeps one ``(rid, socket)`` tuple.
    Each registered ``rid`` gets a ``threading.Lock`` that marks the client as in use
    between ``acquire_client`` and ``release_client``.
    """

    # Client (channel) types
    CLIENT_TYPE_TRADE = "trade"
    CLIENT_TYPE_DELEGATE_LIST = "delegate_list"
    CLIENT_TYPE_DEAL_LIST = "deal_list"
    CLIENT_TYPE_POSITION_LIST = "position_list"
    CLIENT_TYPE_MONEY = "money"
    CLIENT_TYPE_DEAL = "deal"
    CLIENT_TYPE_CMD_L2 = "l2_cmd"
    # channel type -> list of (rid, sk) for the trade channel, one (rid, sk) tuple otherwise
    socket_client_dict = {}
    # rid -> threading.Lock held while the client is acquired
    socket_client_lock_dict = {}
    # rid -> time.time() of the latest heartbeat
    active_client_dict = {}

    @classmethod
    def list_client(cls, _type):
        """Return the clients registered for ``_type`` as a list (empty if none).

        For the trade channel the stored list itself is returned, not a copy.
        """
        if _type not in cls.socket_client_dict:
            return []
        if _type == cls.CLIENT_TYPE_TRADE:
            return cls.socket_client_dict.get(_type)
        return [cls.socket_client_dict.get(_type)]

    @classmethod
    def add_client(cls, _type, rid, sk):
        """Register socket ``sk`` under id ``rid`` for channel ``_type``.

        Trade clients are appended to the channel's list; for any other channel the
        new client replaces the previous one. A fresh lock is created for ``rid``.
        """
        if _type == cls.CLIENT_TYPE_TRADE:
            # The trade channel holds several clients.
            if _type not in cls.socket_client_dict:
                cls.socket_client_dict[_type] = []
            cls.socket_client_dict[_type].append((rid, sk))
            cls.socket_client_lock_dict[rid] = threading.Lock()
            hx_logger_trade_debug.info(f"add_client:{rid}")
        else:
            cls.socket_client_dict[_type] = (rid, sk)
            cls.socket_client_lock_dict[rid] = threading.Lock()

    @classmethod
    def is_client_locked(cls, rid):
        """Return whether the lock of ``rid`` is held, or None when ``rid`` is unknown."""
        if rid in cls.socket_client_lock_dict:
            return cls.socket_client_lock_dict[rid].locked()
        return None

    @classmethod
    def acquire_client(cls, _type):
        """Try to reserve a free client of channel ``_type`` without blocking.

        For the trade channel the candidates are tried in order of most recent
        heartbeat first (clients without a heartbeat sort as 0).

        :return: the ``(rid, sk)`` tuple whose lock was acquired, or None when the
            channel has no client or no client lock could be taken.
        """
        if _type not in cls.socket_client_dict:
            return None
        if _type == cls.CLIENT_TYPE_TRADE:
            client_list = sorted(cls.socket_client_dict[_type],
                                 key=lambda x: cls.active_client_dict.get(x[0], 0),
                                 reverse=True)
            hx_logger_trade_debug.info(f"acquire_client client_list数量:{len(client_list)}")
            hx_logger_trade_debug.info(
                f"acquire_client socket_client_lock_dict数量:{len(cls.socket_client_lock_dict.keys())}")
            for d in client_list:
                lock = cls.socket_client_lock_dict.get(d[0])
                # A non-blocking acquire returns False instead of raising on contention,
                # so no timeout handling is needed here.
                if lock is not None and lock.acquire(blocking=False):
                    hx_logger_trade_debug.info(f"acquire_client success:{d[0]}")
                    return d
            return None
        d = cls.socket_client_dict[_type]
        lock = cls.socket_client_lock_dict.get(d[0])
        if lock is not None and lock.acquire(blocking=False):
            return d
        return None

    @classmethod
    def release_client(cls, client_id):
        """Release the lock of ``client_id`` if it is held; log whether the id was known."""
        success = False
        if client_id in cls.socket_client_lock_dict:
            success = True
            # Only release a lock that is actually held.
            if cls.socket_client_lock_dict[client_id].locked():
                cls.socket_client_lock_dict[client_id].release()
        if success:
            hx_logger_trade_debug.info(f"release_client success:{client_id}")
        else:
            hx_logger_trade_debug.info(f"release_client fail:{client_id}")

    @classmethod
    def del_client(cls, rid):
        """Forget client ``rid``: drop its lock, close its socket and unregister it.

        Within a trade-style list only the first entry matching ``rid`` is removed.
        The scan stops after a single-client (tuple) channel has been removed.
        """
        # Drop the per-client lock.
        cls.socket_client_lock_dict.pop(rid, None)
        # Close and unregister the socket.
        for t in cls.socket_client_dict:
            clients = cls.socket_client_dict[t]
            if isinstance(clients, list):
                for d in clients:
                    if d[0] == rid:
                        try:
                            d[1].close()
                        except Exception:
                            # Best effort: the socket may already be closed.
                            pass
                        clients.remove(d)
                        break
            elif isinstance(clients, tuple):
                if clients[0] == rid:
                    try:
                        clients[1].close()
                    except Exception:
                        # Best effort: the socket may already be closed.
                        pass
                    cls.socket_client_dict.pop(t)
                    # Stop right away: the dict being iterated has just changed size.
                    break

    @classmethod
    def heart(cls, rid):
        """Record a heartbeat for ``rid`` at the current time."""
        cls.active_client_dict[rid] = time.time()

    @classmethod
    def del_invalid_clients(cls):
        """Delete every client whose last heartbeat is more than 20 seconds old.

        NOTE(review): entries are never removed from ``active_client_dict`` in the
        visible code, so an expired id is passed to ``del_client`` again on each call.
        """
        for k in cls.active_client_dict.keys():
            if time.time() - cls.active_client_dict[k] > 20:
                cls.del_client(k)
|
|
|
# Direction code for a buy (order() documents direction as 1-buy, 2-sell).
TRADE_DIRECTION_BUY = 1
|
# Direction code for a sell (order() documents direction as 1-buy, 2-sell).
TRADE_DIRECTION_SELL = 2
|
|
# 超时时间2s
|
# Default number of seconds __read_response() waits for a reply before raising.
TIMEOUT = 2.0
|
|
# 等待响应的request_id
|
# Maps request_id -> response payload; written by set_response() and popped by
# __read_response() once the waiting caller picks the reply up.
__request_response_dict = {}
|
|
|
def __get_request_id(type):
    """Build a request id of the form ``r_<type>_<time stamp>_<random number>``.

    The time stamp is ``time.time()`` scaled by 10000 and rounded; the random part is
    an integer between 0 and 100000 inclusive.
    """
    stamp = round(time.time() * 10000)
    nonce = random.randint(0, 100000)
    return "r_{}_{}_{}".format(type, stamp, nonce)
|
|
|
def __request(_type, data, request_id=None, blocking=False, is_pipe=True, log_enable=True, is_trade=False):
    """Sign a request and, for trade requests, enqueue it for the trade side.

    :param _type: request type placed in the envelope's ``type`` field.
    :param data: request payload placed in the envelope's ``data`` field.
    :param request_id: id to use; a new one is generated when falsy.
    :param blocking: accepted for interface compatibility; not used in this function.
    :param is_pipe: only echoed in the start log line.
    :param log_enable: whether the start/success log lines are written.
    :param is_trade: when True the signed envelope is put on ``queue_strategy_w_trade_r``.
    :return: the request id that was used.
    :raises Exception: whatever signing or enqueuing raised, after it has been logged.

    NOTE(review): when ``is_trade`` is False nothing is sent anywhere, although the
    success line is still logged - confirm this is intended.
    """
    request_id = request_id or __get_request_id(_type)
    try:
        if log_enable:
            async_log_util.info(hx_logger_trade_loop, "请求发送开始:client_id-{} request_id-{} is_pipe-{}", 0, request_id,
                                is_pipe)
        envelope = socket_util.encryp_client_params_sign({"type": _type,
                                                          "data": data,
                                                          "request_id": request_id})
        sent_at = time.time()
        if is_trade:
            queue_strategy_w_trade_r.put_nowait(envelope)

        # Report sends that took longer than 10 ms.
        use_time = int((time.time() - sent_at) * 1000)
        if use_time > 10:
            async_log_util.info(hx_logger_trade_loop, f"发送耗时:request_id-{request_id} 耗时时间:{use_time}")
        if log_enable:
            async_log_util.info(hx_logger_trade_loop, "请求发送成功:request_id-{}", request_id)
    except BrokenPipeError as e:
        async_log_util.info(hx_logger_trade_loop, "请求发送异常:request_id-{} error-{}", request_id, str(e))
        raise e
    except Exception as e:
        async_log_util.info(hx_logger_trade_loop, "请求发送异常: request_id-{} error-{}", request_id, str(e))
        logging.exception(e)
        raise e
    return request_id
|
|
|
def __read_response(request_id, blocking, timeout=TIMEOUT, log_enable=True):
    """Wait for the response registered under ``request_id``.

    The response table is polled every 5 ms; the first poll happens after one sleep.

    :param request_id: id returned by ``__request``.
    :param blocking: when falsy nothing is awaited and None is returned at once.
    :param timeout: maximum number of seconds to wait.
    :param log_enable: whether success/timeout log lines are written.
    :return: the stored response (it is removed from the table), or None when not blocking.
    :raises Exception: when no response arrived within ``timeout`` seconds.
    """
    if not blocking:
        return None
    missing = object()
    # Use the monotonic clock so that a wall-clock adjustment cannot shorten or
    # stretch the wait.
    started = time.monotonic()
    while True:
        time.sleep(0.005)
        result = __request_response_dict.pop(request_id, missing)
        if result is not missing:
            # The response has arrived.
            if log_enable:
                async_log_util.info(hx_logger_trade_loop, "请求读取成功: request_id-{}", request_id)
            return result
        if time.monotonic() - started > timeout:
            if log_enable:
                async_log_util.info(hx_logger_trade_loop, "请求读取超时: request_id-{}", request_id)
            raise Exception(f"读取内容超时: request_id={request_id}")
|
|
|
# Module-level TradeOrderIdManager instance; it is not referenced anywhere else in
# the visible part of this file.
__TradeOrderIdManager = TradeOrderIdManager()
|
|
|
def set_response(request_id, response):
    """Store ``response`` for the caller waiting on ``request_id``.

    A falsy ``request_id`` denotes a passively received message; nothing is logged or
    stored in that case.
    """
    if not request_id:
        # Passively triggered: nothing to record.
        return
    # Actively triggered: log and publish the response.
    async_log_util.info(hx_logger_trade_loop, f"请求响应: request_id-{request_id} 内容-{response}")
    __request_response_dict[request_id] = response
|
|
|
def order(direction, code, volume, price, price_type=2, blocking=False, sinfo=None, request_id=None,
          order_ref=None, shadow_price=None):
    """Place an order through the trade channel.

    :param direction: 1 - buy, 2 - sell.
    :param code: security code.
    :param volume: order volume.
    :param price: order price (original note: when selling without a price, the
        fifth-level quote is used).
    :param price_type: forwarded unchanged in the request.
    :param blocking: whether to wait for the trade side's response.
    :param sinfo: caller tag; defaults to ``b_<code>_<millisecond timestamp>``.
    :param request_id: request id; generated when falsy.
    :param order_ref: order reference; created via ``huaxin_util`` when falsy.
    :param shadow_price: forwarded unchanged in the request.
    :return: the trade side's response when ``blocking``; otherwise
        ``{"order_ref": order_ref}``.

    The money list refresh is scheduled in every case, including when waiting for
    the response fails.
    """
    timestamp = round(time.time() * 1000)
    sinfo = sinfo or f"b_{code}_{timestamp}"
    order_ref = order_ref or huaxin_util.create_order_ref()
    request_id = request_id or __get_request_id(ClientSocketManager.CLIENT_TYPE_TRADE)

    payload = {"type": ClientSocketManager.CLIENT_TYPE_TRADE, "trade_type": 1,
               "direction": direction,
               "code": code,
               "order_ref": order_ref,
               "volume": volume,
               "price_type": price_type,
               "price": price, "shadow_price": shadow_price, "sinfo": sinfo, "blocking": blocking}
    request_id = __request(ClientSocketManager.CLIENT_TYPE_TRADE, payload,
                           request_id=request_id,
                           blocking=blocking,
                           is_pipe=is_pipe_channel_normal(), is_trade=True)
    try:
        if not blocking:
            return {"order_ref": order_ref}
        return __read_response(request_id, blocking)
    finally:
        huaxin_trade_data_update.add_money_list()
|
|
|
# Maps code -> set of JSON-encoded (orderRef, orderSysID) pairs recorded by cancel_order().
__canceling_order_dict = {}
|
|
# orderSysIDs that cancel_order() has already handled; used to skip duplicate cancels.
__canceled_order_ids = set()
|
|
|
def cancel_order(direction, code, orderSysID, orderRef=None, blocking=False, sinfo=None, request_id=None,
                 recancel=False):
    """Send a cancel request for an order; the request is sent twice.

    :param direction: forwarded unchanged in the request.
    :param code: security code.
    :param orderSysID: system id of the order to cancel.
    :param orderRef: order reference of the order to cancel.
    :param blocking: whether to wait for the trade side's response.
    :param sinfo: caller tag; defaults to ``cb_<code>_<ms timestamp>_<random>``.
    :param request_id: request id; generated when falsy.
    :param recancel: when True the cancel is not registered with ``CancelOrderManager``.
    :return: the response when ``blocking``, otherwise None. None is also returned
        when the call is skipped (from 14:57:00 on, or for an already handled
        ``orderSysID``).

    NOTE(review): the duplicate guard is checked before ``recancel`` is looked at, so
    a second call for the same ``orderSysID`` returns early even with
    ``recancel=True`` - confirm this is intended.
    """
    # No cancelling during the closing call auction.
    if tool.trade_time_sub(tool.get_now_time_str(), "14:57:00") >= 0:
        return

    # Guard against cancelling the same order more than once.
    if orderSysID:
        if orderSysID in __canceled_order_ids:
            return
        __canceled_order_ids.add(orderSysID)

    if not recancel:
        CancelOrderManager().start_cancel(code, orderRef, orderSysID)
    sinfo = sinfo or f"cb_{code}_{round(time.time() * 1000)}_{random.randint(0, 10000)}"
    order_action_ref = huaxin_util.create_order_ref()
    request_id = request_id or __get_request_id(ClientSocketManager.CLIENT_TYPE_TRADE)

    # Remember the cancel so its final outcome can be verified later.
    __canceling_order_dict.setdefault(code, set()).add(json.dumps((orderRef, orderSysID)))

    def _send_cancel(rid):
        # Build a fresh payload for every attempt and send it on the trade channel.
        return __request(ClientSocketManager.CLIENT_TYPE_TRADE,
                         {"type": ClientSocketManager.CLIENT_TYPE_TRADE, "trade_type": 2,
                          "direction": direction,
                          "code": code,
                          "orderRef": orderRef,
                          "orderActionRef": order_action_ref,
                          "orderSysID": orderSysID, "sinfo": sinfo}, request_id=rid, blocking=blocking,
                         is_pipe=is_pipe_channel_normal(), is_trade=True)

    # Send the cancel twice in case the first one does not go through.
    for _ in range(2):
        request_id = _send_cancel(request_id)
    try:
        return __read_response(request_id, blocking)
    finally:
        huaxin_trade_data_update.add_delegate_list("撤单", delay=0.1)
        huaxin_trade_data_update.add_money_list()
|
|
|
# CLIENT_TYPE_DELEGATE_LIST = "delegate_list"
|
# CLIENT_TYPE_DEAL_LIST = "deal_list"
|
# CLIENT_TYPE_POSITION_LIST = "position_list"
|
# CLIENT_TYPE_MONEY = "money"
|
# CLIENT_TYPE_DEAL = "deal"
|
|
def get_delegate_list(can_cancel=True, blocking=True, timeout=TIMEOUT):
    """Request the delegate (order) list.

    :param can_cancel: sent as ``can_cancel`` = 1 when truthy, 0 otherwise.
    :param blocking: whether to wait for the response.
    :param timeout: seconds to wait for the response.
    :return: the response when ``blocking``, otherwise None.
    """
    channel = ClientSocketManager.CLIENT_TYPE_DELEGATE_LIST
    cancel_flag = 1 if can_cancel else 0
    request_id = __request(channel, {"type": channel, "can_cancel": cancel_flag},
                           blocking=blocking, is_pipe=is_pipe_channel_normal(), is_trade=True)
    return __read_response(request_id, blocking, timeout=timeout)
|
|
|
def get_deal_list(blocking=True, timeout=TIMEOUT):
    """Request the deal list.

    :param blocking: whether to wait for the response.
    :param timeout: seconds to wait for the response.
    :return: the response when ``blocking``, otherwise None.
    """
    channel = ClientSocketManager.CLIENT_TYPE_DEAL_LIST
    request_id = __request(channel, {"type": channel}, blocking=blocking,
                           is_pipe=is_pipe_channel_normal(), is_trade=True)
    return __read_response(request_id, blocking, timeout=timeout)
|
|
|
def get_position_list(blocking=True):
    """Request the position list.

    :param blocking: whether to wait for the response.
    :return: the response when ``blocking``, otherwise None.
    """
    channel = ClientSocketManager.CLIENT_TYPE_POSITION_LIST
    request_id = __request(channel, {"type": channel}, blocking=blocking,
                           is_pipe=is_pipe_channel_normal(), is_trade=True)
    return __read_response(request_id, blocking)
|
|
|
def get_money(blocking=True):
    """Request the account's money status.

    :param blocking: whether to wait for the response.
    :return: the response when ``blocking``, otherwise None.
    """
    channel = ClientSocketManager.CLIENT_TYPE_MONEY
    request_id = __request(channel, {"type": channel}, blocking=blocking,
                           is_pipe=is_pipe_channel_normal(), is_trade=True)
    return __read_response(request_id, blocking)
|
|
|
def set_l2_codes_data(codes_data, blocking=True):
    """Issue an L2 command request carrying the L2 subscription data.

    :param codes_data: payload sent under the ``data`` key.
    :param blocking: whether to wait for the response.
    :return: the response when ``blocking``, otherwise None.

    NOTE(review): ``is_trade`` is not passed to ``__request`` here, so with the
    ``__request`` shown in this file the payload is never enqueued - confirm.
    """
    channel = ClientSocketManager.CLIENT_TYPE_CMD_L2
    payload = {"type": channel, "data": codes_data}
    request_id = __request(channel, payload, blocking=blocking,
                           is_pipe=is_pipe_channel_normal())
    return __read_response(request_id, blocking)
|
|
|
def __test_trade_channel(sid):
    """Issue a ``test`` request carrying ``sid`` and wait for its response.

    Logging is disabled for both the request and the read.

    NOTE(review): ``is_trade`` is not passed to ``__request`` here, so with the
    ``__request`` shown in this file the probe is never enqueued - confirm.
    """
    probe = {"type": "test", "data": {"sid": sid}}
    request_id = __request("test", probe, blocking=True, log_enable=False)
    return __read_response(request_id, True, log_enable=False)
|
|
|
def parseResponse(data_str):
    """Extract the business payload from a response envelope.

    The envelope has the shape ``{"data": {"code": ..., "msg": ..., "data": ...}}``.

    :param data_str: the envelope, either already parsed or as JSON text
        (``str``, ``bytes`` or ``bytearray``).
    :return: the inner ``data`` value.
    :raises Exception: when ``data_str`` is empty, or when the inner ``code`` is not 0
        (the exception message is then the inner ``msg``).
    """
    if not data_str:
        raise Exception("反馈内容为空")
    res = data_str
    # json.loads accepts text as well as UTF-8 encoded bytes.
    if isinstance(res, (str, bytes, bytearray)):
        res = json.loads(res)
    res = res['data']
    if res['code'] != 0:
        raise Exception(res['msg'])
    return res['data']
|
|
|
if __name__ == "__main__":
    # Nothing is executed when this module is run directly.
    pass
|