"""
|
交易API
|
"""
|
import copy
|
import json
|
import logging
|
import multiprocessing
|
import queue
|
import random
|
import threading
|
import time
|
import concurrent.futures
|
|
import zmq
|
|
from code_attribute import gpcode_manager
|
from huaxin_client import constant as huaxin_client_constant, trade_client
|
from log_module import async_log_util
|
from log_module.log import hx_logger_trade_debug, hx_logger_trade_loop, hx_logger_trade_callback, \
|
logger_system
|
from trade.huaxin import huaxin_trade_data_update, huaxin_trade_record_manager
|
from trade.huaxin.huaxin_trade_record_manager import TradeOrderIdManager
|
from trade.huaxin.huaxin_trade_order_processor import CancelOrderManager, HuaxinOrderEntity, TradeResultProcessor
|
from utils import socket_util, huaxin_util, tool
|
|
__response_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=15)
|
__save_data_queue = queue.Queue(maxsize=1000)
|
|
|
def __run_save_data():
|
while True:
|
try:
|
data = __save_data_queue.get()
|
huaxin_trade_record_manager.DelegateRecordManager.add_one(data)
|
except:
|
pass
|
finally:
|
time.sleep(0.1)
|
|
|
def __run_recv_queue_trade(queue: multiprocessing.Queue):
|
def __cancel_order(code, order_ref):
|
# 2s没成交就撤单
|
time.sleep(2)
|
order_: HuaxinOrderEntity = TradeResultProcessor.get_huaxin_order_by_order_ref(order_ref)
|
if order_ is not None:
|
if huaxin_util.is_can_cancel(order_.orderStatus):
|
cancel_order(TRADE_DIRECTION_SELL, code, order_.orderSysID)
|
|
def __process_order(data):
|
# 更新委托队列
|
push_msg_manager.push_delegate_queue_update()
|
code = data["securityID"]
|
accountID = data["accountID"]
|
orderStatus = data["orderStatus"]
|
orderRef = data["orderRef"]
|
orderSysID = data["orderSysID"]
|
insertTime = data.get("insertTime")
|
acceptTime = data.get("acceptTime")
|
insertDate = data.get("insertDate")
|
direction = data.get("direction")
|
limitPrice = data.get("limitPrice")
|
volume = data.get("volume")
|
is_shadow_order = False
|
# 获取涨停价
|
limit_up_price = gpcode_manager.get_limit_up_price(code)
|
if limit_up_price and volume == huaxin_client_constant.SHADOW_ORDER_VOLUME:
|
if abs(float(limitPrice) - float(limit_up_price)) >= 0.001:
|
is_shadow_order = True
|
|
order = HuaxinOrderEntity(code, orderStatus, orderRef, accountID, orderSysID,
|
insertTime=insertTime, acceptTime=acceptTime,
|
insertDate=insertDate, direction=direction,
|
is_shadow_order=is_shadow_order)
|
try:
|
if str(order.direction) == str(huaxin_util.TORA_TSTP_D_Sell):
|
# 刷新持仓列表
|
huaxin_trade_data_update.add_position_list()
|
if huaxin_util.is_deal(order.orderStatus):
|
# 如果成交了需要刷新委托列表
|
huaxin_trade_data_update.add_delegate_list("卖成交")
|
else:
|
if huaxin_util.is_deal(order.orderStatus):
|
# 如果成交了需要刷新委托列表
|
huaxin_trade_data_update.add_delegate_list("买成交")
|
need_cancel = TradeResultProcessor.process_buy_order(order)
|
# if need_cancel:
|
# # 需要撤买单
|
# threading.Thread(target=lambda: cancel_order(TRADE_DIRECTION_SELL, order.code, order.orderSysID),
|
# daemon=True).start()
|
need_watch_cancel = TradeResultProcessor.process_sell_order(order)
|
# if need_watch_cancel:
|
# # 需要撤卖单
|
# threading.Thread(target=lambda: __cancel_order(order.code, order.orderRef), daemon=True).start()
|
finally:
|
try:
|
# 加入2次,增大加入成功率
|
__save_data_queue.put_nowait(data)
|
except Exception as e:
|
hx_logger_trade_debug.exception(e)
|
|
if not is_shadow_order:
|
# 订单相关回调
|
# 重新请求委托列表与资金
|
# huaxin_trade_data_update.add_delegate_list("来自交易管道")
|
huaxin_trade_data_update.add_deal_list()
|
huaxin_trade_data_update.add_money_list()
|
|
# 设置结果
|
def __set_response(data_json):
|
if 'request_id' not in data_json:
|
return
|
# 处理数据
|
async_log_util.info(hx_logger_trade_callback, f"response:request_id-{data_json['request_id']}")
|
# 设置响应内容
|
set_response(data_json["request_id"], data_json['data'])
|
|
logger_system.info(f"huaxin_trade_api __run_recv_pipe_trade 线程ID:{tool.get_thread_id()}")
|
|
if queue is not None:
|
while True:
|
try:
|
val = queue.get()
|
if val:
|
data_json = json.loads(val)
|
# 处理数据
|
type_ = data_json["type"]
|
if type_ == "response":
|
# 主动触发的响应
|
request_id = data_json['request_id']
|
async_log_util.info(hx_logger_trade_callback,
|
f"response:request_id-{request_id}")
|
__response_thread_pool.submit(__set_response, data_json)
|
if type(data_json.get("data")) == dict:
|
data = data_json["data"].get("data")
|
if type(data) == dict and "orderRef" in data:
|
__response_thread_pool.submit(__process_order, data)
|
elif type_ == "trade_callback":
|
try:
|
# 交易回调
|
data_json = data_json["data"]
|
ctype = data_json["type"]
|
# 记录交易反馈日志
|
async_log_util.info(hx_logger_trade_callback, f"{data_json}")
|
if ctype == 0:
|
data = data_json.get("data")
|
# 获取订单状态
|
__process_order(data)
|
finally:
|
pass
|
except:
|
pass
|
|
|
def __create_trade_ipc_context(trade_ipc_addr):
|
"""
|
创建IPC发送端口
|
@param trade_ipc_addr:(下单地址,撤单地址)
|
@return:
|
"""
|
context = zmq.Context()
|
global order_socket, cancel_order_socket
|
order_socket = context.socket(zmq.REQ)
|
order_socket.connect(trade_ipc_addr[0])
|
cancel_order_socket = context.socket(zmq.REQ)
|
cancel_order_socket.connect(trade_ipc_addr[1])
|
# while True:
|
# try:
|
# # datas = ('000990', 7.52, 400, 93000030, 2012, 380422, 380421, 375477, '1') * 150
|
# # L2SharedMemoryDataUtil.set_data(datas, shared_memory)
|
# socket.send_json({'data': [], "time": time.time()})
|
# response = socket.recv_string()
|
# except Exception as e:
|
# logging.exception(e)
|
|
|
# 下单ZMQ通信锁
|
__order_zmq_lock = threading.Lock()
|
|
|
def __order_by_zmq(data_json):
|
"""
|
通过zmq发送下单信息
|
@param data_json:
|
@return:
|
"""
|
with __order_zmq_lock:
|
order_socket.send_json(data_json)
|
response = order_socket.recv_string()
|
|
|
def __cancel_order_by_zmq(data_json):
|
cancel_order_socket.send_json(data_json)
|
response = cancel_order_socket.recv_string()
|
|
|
def __test_order():
|
time.sleep(60)
|
for i in range(20):
|
time.sleep(30)
|
order_ref = huaxin_util.create_order_ref()
|
order(1, "000333", 100, 1.00, price_type=2, blocking=False, order_ref=order_ref, shadow_price=0.99)
|
time.sleep(30)
|
cancel_order(1, "000333", '123123', orderRef=order_ref, blocking=False)
|
|
|
queue_strategy_w_trade_r_for_read = None
|
queue_strategy_w_trade_r = None
|
|
|
def run():
|
global queue_strategy_w_trade_r_for_read, queue_strategy_w_trade_r
|
queue_strategy_r_trade_w, queue_strategy_w_trade_r, queue_strategy_w_trade_r_for_read = multiprocessing.Queue(), multiprocessing.Queue(), multiprocessing.Queue()
|
order_ipc_addr, cancel_order_ipc_addr = "ipc://trade_order_ls.ipc", "ipc://trade_order_ls_cancel.ipc"
|
trade_process = multiprocessing.Process(
|
target=trade_client.run,
|
args=(order_ipc_addr, cancel_order_ipc_addr, queue_strategy_r_trade_w, queue_strategy_w_trade_r,
|
queue_strategy_w_trade_r_for_read))
|
trade_process.start()
|
|
t1 = threading.Thread(target=lambda: __run_recv_queue_trade(queue_strategy_r_trade_w), daemon=True)
|
t1.start()
|
t1 = threading.Thread(target=lambda: __run_save_data(), daemon=True)
|
t1.start()
|
t1 = threading.Thread(target=lambda: CancelOrderManager().run(cancel_order), daemon=True)
|
t1.start()
|
# 创建IPC发送端口
|
__create_trade_ipc_context((order_ipc_addr, cancel_order_ipc_addr))
|
|
|
# 交易通道的错误次数
|
trade_pipe_channel_error_count = 0
|
|
|
# pipe的交易通道是否正常
|
def is_pipe_channel_normal():
|
return True
|
|
|
# 测试交易通道
|
def test_trade_channel():
|
global trade_pipe_channel_error_count
|
sid = random.randint(0, 1000000)
|
result = __test_trade_channel(sid)
|
if result["code"] == 0 and result["data"]["data"]["sid"] == sid:
|
trade_pipe_channel_error_count = 0
|
return True
|
trade_pipe_channel_error_count += 1
|
if trade_pipe_channel_error_count > 100:
|
trade_pipe_channel_error_count = 100
|
return False
|
|
|
class ClientSocketManager:
|
# 客户端类型
|
CLIENT_TYPE_TRADE = "trade"
|
CLIENT_TYPE_DELEGATE_LIST = "delegate_list"
|
CLIENT_TYPE_DEAL_LIST = "deal_list"
|
CLIENT_TYPE_POSITION_LIST = "position_list"
|
CLIENT_TYPE_MONEY = "money"
|
CLIENT_TYPE_DEAL = "deal"
|
CLIENT_TYPE_CMD_L2 = "l2_cmd"
|
socket_client_dict = {}
|
socket_client_lock_dict = {}
|
active_client_dict = {}
|
|
@classmethod
|
def list_client(cls, _type):
|
if _type == cls.CLIENT_TYPE_TRADE:
|
if _type in cls.socket_client_dict:
|
return cls.socket_client_dict.get(_type)
|
else:
|
if _type in cls.socket_client_dict:
|
return [cls.socket_client_dict.get(_type)]
|
return []
|
|
@classmethod
|
def add_client(cls, _type, rid, sk):
|
if _type == cls.CLIENT_TYPE_TRADE:
|
# 交易列表
|
if _type not in cls.socket_client_dict:
|
cls.socket_client_dict[_type] = []
|
cls.socket_client_dict[_type].append((rid, sk))
|
cls.socket_client_lock_dict[rid] = threading.Lock()
|
hx_logger_trade_debug.info(f"add_client:{rid}")
|
else:
|
cls.socket_client_dict[_type] = (rid, sk)
|
cls.socket_client_lock_dict[rid] = threading.Lock()
|
|
# 是否已经被锁住
|
@classmethod
|
def is_client_locked(cls, rid):
|
if rid in cls.socket_client_lock_dict:
|
return cls.socket_client_lock_dict[rid].locked()
|
return None
|
|
@classmethod
|
def acquire_client(cls, _type):
|
if _type == cls.CLIENT_TYPE_TRADE:
|
if _type in cls.socket_client_dict:
|
# 根据排序活跃时间排序
|
client_list = sorted(cls.socket_client_dict[_type], key=lambda x: cls.active_client_dict.get(x[0]) if x[
|
0] in cls.active_client_dict else 0,
|
reverse=True)
|
hx_logger_trade_debug.info(f"acquire_client client_list数量:{len(client_list)}")
|
hx_logger_trade_debug.info(
|
f"acquire_client socket_client_lock_dict数量:{len(cls.socket_client_lock_dict.keys())}")
|
for d in client_list:
|
if d[0] in cls.socket_client_lock_dict:
|
try:
|
if cls.socket_client_lock_dict[d[0]].acquire(blocking=False):
|
hx_logger_trade_debug.info(f"acquire_client success:{d[0]}")
|
return d
|
except threading.TimeoutError:
|
hx_logger_trade_debug.error("acquire_client TimeoutError")
|
else:
|
if _type in cls.socket_client_dict:
|
try:
|
d = cls.socket_client_dict[_type]
|
if d[0] in cls.socket_client_lock_dict:
|
if cls.socket_client_lock_dict[d[0]].acquire(blocking=False):
|
return d
|
except threading.TimeoutError:
|
pass
|
return None
|
|
@classmethod
|
def release_client(cls, client_id):
|
sucess = False
|
if client_id in cls.socket_client_lock_dict:
|
sucess = True
|
# 释放锁
|
if cls.socket_client_lock_dict[client_id].locked():
|
cls.socket_client_lock_dict[client_id].release()
|
if sucess:
|
hx_logger_trade_debug.info(f"release_client success:{client_id}")
|
else:
|
hx_logger_trade_debug.info(f"release_client fail:{client_id}")
|
|
@classmethod
|
def del_client(cls, rid):
|
# 删除线程锁
|
if rid in cls.socket_client_lock_dict:
|
cls.socket_client_lock_dict.pop(rid)
|
# 删除sk
|
for t in cls.socket_client_dict:
|
if type(cls.socket_client_dict[t]) == list:
|
for d in cls.socket_client_dict[t]:
|
if d[0] == rid:
|
try:
|
# 关闭socket
|
d[1].close()
|
except:
|
pass
|
cls.socket_client_dict[t].remove(d)
|
break
|
|
elif type(cls.socket_client_dict[t]) == tuple:
|
if cls.socket_client_dict[t][0] == rid:
|
try:
|
# 关闭socket
|
cls.socket_client_dict[t][1].close()
|
except:
|
pass
|
cls.socket_client_dict.pop(t)
|
break
|
|
# 心跳信息
|
@classmethod
|
def heart(cls, rid):
|
cls.active_client_dict[rid] = time.time()
|
|
@classmethod
|
def del_invalid_clients(cls):
|
# 清除长时间无心跳的客户端通道
|
for k in cls.active_client_dict.keys():
|
if time.time() - cls.active_client_dict[k] > 20:
|
# 心跳时间间隔20s以上视为无效
|
cls.del_client(k)
|
|
|
TRADE_DIRECTION_BUY = 1
|
TRADE_DIRECTION_SELL = 2
|
|
# 超时时间2s
|
TIMEOUT = 2.0
|
|
# 等待响应的request_id
|
__request_response_dict = {}
|
|
|
def __get_request_id(type):
|
return f"r_{type}_{round(time.time() * 10000)}_{random.randint(0, 100000)}"
|
|
|
# 网络请求
|
def __request(_type, data, request_id=None, log_enable=True, is_trade=False):
|
"""
|
请求,将交易(包含下单/撤单)与查询(包含查持仓/账户可用金额/委托列表/成交列表)队列分离
|
@param _type:
|
@param data:
|
@param request_id:
|
@param log_enable:
|
@param is_trade:
|
@return:
|
"""
|
if not request_id:
|
request_id = __get_request_id(_type)
|
try:
|
if log_enable:
|
async_log_util.info(hx_logger_trade_loop, "请求发送开始:client_id-{} request_id-{}", 0, request_id)
|
root_data = {"type": _type,
|
"data": data,
|
"request_id": request_id,
|
"time": time.time()
|
}
|
root_data = socket_util.encryp_client_params_sign(root_data)
|
start_time = time.time()
|
if is_trade:
|
# queue_strategy_w_trade_r.put_nowait(root_data)
|
# 采用zmq通信
|
if data['trade_type'] == 1:
|
__order_by_zmq(root_data)
|
elif data['trade_type'] == 2:
|
__cancel_order_by_zmq(root_data)
|
else:
|
queue_strategy_w_trade_r.put_nowait(root_data)
|
else:
|
queue_strategy_w_trade_r_for_read.put_nowait(root_data)
|
|
use_time = int((time.time() - start_time) * 1000)
|
if use_time > 10:
|
async_log_util.info(hx_logger_trade_loop, f"发送耗时:request_id-{request_id} 耗时时间:{use_time}")
|
if log_enable:
|
async_log_util.info(hx_logger_trade_loop, "请求发送成功:request_id-{}", request_id)
|
except BrokenPipeError as e:
|
async_log_util.info(hx_logger_trade_loop, "请求发送异常:request_id-{} error-{}", request_id, str(e))
|
raise e
|
except Exception as e:
|
async_log_util.info(hx_logger_trade_loop, "请求发送异常: request_id-{} error-{}", request_id, str(e))
|
logging.exception(e)
|
raise e
|
return request_id
|
|
|
def __read_response(request_id, blocking, timeout=TIMEOUT, log_enable=True):
|
if blocking:
|
start_time = time.time()
|
try:
|
while True:
|
time.sleep(0.005)
|
if request_id in __request_response_dict:
|
# 获取到了响应内容
|
result = __request_response_dict.pop(request_id)
|
if log_enable:
|
async_log_util.info(hx_logger_trade_loop, "请求读取成功: request_id-{}", request_id)
|
return result
|
if time.time() - start_time > timeout:
|
if log_enable:
|
async_log_util.info(hx_logger_trade_loop, "请求读取超时: request_id-{}", request_id)
|
# 读取内容超时才会释放
|
raise Exception(f"读取内容超时: request_id={request_id}")
|
finally:
|
pass
|
|
return None
|
|
|
__TradeOrderIdManager = TradeOrderIdManager()
|
|
|
def set_response(request_id, response):
|
if request_id:
|
async_log_util.info(hx_logger_trade_loop, f"请求响应: request_id-{request_id} 内容-{response}")
|
# 主动触发
|
__request_response_dict[request_id] = response
|
else:
|
# 被动触发
|
pass
|
|
|
def order(direction, code, volume, price, price_type=2, blocking=False, sinfo=None, request_id=None,
|
order_ref=None, shadow_price=None, shadow_volume=100):
|
"""
|
下单委托
|
@param shadow_volume: 影子单的量
|
@param direction: 1-买 2-卖
|
@param code:
|
@param volume:交易量
|
@param price:价格(如果是卖时不传价格就按照5挡价卖)
|
@param price_type:
|
@param blocking:是否阻塞进程
|
@param sinfo:
|
@param request_id:
|
@param order_ref:
|
@param shadow_price:
|
@return:
|
"""
|
timestamp = round(time.time() * 1000)
|
if not sinfo:
|
sinfo = f"ba_{code}_{timestamp}"
|
if not order_ref:
|
order_ref = huaxin_util.create_order_ref()
|
if not request_id:
|
request_id = __get_request_id(ClientSocketManager.CLIENT_TYPE_TRADE)
|
for i in range(1):
|
cancel_shadow = True
|
if int(tool.get_now_time_str().replace(":", "")) < int("091500"):
|
# 预埋单不能撤影子单
|
cancel_shadow = False
|
request_id = __request(ClientSocketManager.CLIENT_TYPE_TRADE,
|
{"type": ClientSocketManager.CLIENT_TYPE_TRADE, "trade_type": 1,
|
"direction": direction,
|
"code": code,
|
"order_ref": order_ref,
|
"volume": volume,
|
"price_type": price_type,
|
"price": price,
|
"shadow_price": shadow_price,
|
"shadow_volume": shadow_volume,
|
"sinfo": sinfo,
|
"blocking": blocking,
|
"cancel_shadow": cancel_shadow},
|
request_id=request_id,
|
is_trade=True)
|
try:
|
if blocking:
|
return __read_response(request_id, blocking)
|
else:
|
return {"order_ref": order_ref}
|
finally:
|
# huaxin_trade_data_update.add_delegate_list("下单", delay=0.2)
|
huaxin_trade_data_update.add_money_list()
|
|
|
def order_new(direction, code, order_info_list, price_type=2, blocking=False, sinfo=None, request_id=None):
|
"""
|
下单委托
|
@param direction:
|
@param code:
|
@param order_info_list: 下单信息:[(量,价, order_ref),(量,价, order_ref)]
|
@param price_type:
|
@param blocking: 是否阻塞进程
|
@param sinfo:
|
@param request_id:
|
@return:
|
"""
|
timestamp = round(time.time() * 1000)
|
if not sinfo:
|
sinfo = f"ba_{code}_{timestamp}"
|
if not request_id:
|
request_id = __get_request_id(ClientSocketManager.CLIENT_TYPE_TRADE)
|
for i in range(1):
|
request_id = __request(ClientSocketManager.CLIENT_TYPE_TRADE,
|
{"type": ClientSocketManager.CLIENT_TYPE_TRADE, "trade_type": 1,
|
"direction": direction,
|
"code": code,
|
"order_info_list": order_info_list,
|
"price_type": price_type,
|
"sinfo": sinfo,
|
"blocking": blocking,
|
"cancel_shadow": False},
|
request_id=request_id,
|
is_trade=True)
|
try:
|
if blocking:
|
return __read_response(request_id, blocking)
|
else:
|
return {"order_ref_list": [x[2] for x in order_info_list]}
|
finally:
|
# huaxin_trade_data_update.add_delegate_list("下单", delay=0.2)
|
huaxin_trade_data_update.add_money_list()
|
|
|
__canceling_order_dict = {}
|
|
|
def cancel_order(direction, code, orderSysID, orderRef=None, blocking=False, sinfo=None, request_id=None,
|
recancel=False):
|
"""
|
测单
|
@param direction: 1-买 2-卖
|
@param code:
|
@param orderSysID:
|
@param orderRef:
|
@param blocking:
|
@param sinfo:
|
@param request_id:
|
@param recancel:
|
@return:
|
"""
|
if tool.trade_time_sub(tool.get_now_time_str(), "14:57:00") >= 0 and tool.trade_time_sub(tool.get_now_time_str(),
|
"15:00:01") <= 0:
|
# 集合竞价不撤单
|
return
|
|
if not recancel:
|
CancelOrderManager().start_cancel(code, orderRef, orderSysID)
|
if not sinfo:
|
sinfo = f"cb_{code}_{round(time.time() * 1000)}_{random.randint(0, 10000)}"
|
order_action_ref = huaxin_util.create_order_ref()
|
if not request_id:
|
request_id = __get_request_id(ClientSocketManager.CLIENT_TYPE_TRADE)
|
# 加入撤单记录,用于校验最后的撤单是否成功
|
if code not in __canceling_order_dict:
|
__canceling_order_dict[code] = set()
|
__canceling_order_dict[code].add(json.dumps((orderRef, orderSysID)))
|
# 执行2次撤单,防止没有撤到
|
for i in range(2):
|
request_id = __request(ClientSocketManager.CLIENT_TYPE_TRADE,
|
{"type": ClientSocketManager.CLIENT_TYPE_TRADE, "trade_type": 2,
|
"direction": direction,
|
"code": code,
|
"orderRef": orderRef,
|
"orderActionRef": order_action_ref,
|
"orderSysID": orderSysID, "sinfo": sinfo}, request_id=request_id, is_trade=True)
|
try:
|
return __read_response(request_id, blocking)
|
finally:
|
huaxin_trade_data_update.add_money_list()
|
|
|
def batch_cancel_order(direction, code, orderInfos: list, blocking=False,
|
request_id=None,
|
recancel=False):
|
"""
|
测单
|
@param direction: 1-买 2-卖
|
@param code:
|
@param orderInfos:[(orderRef, orderSysID)]
|
@param blocking:
|
@param request_id:
|
@param recancel:
|
@return:
|
"""
|
if tool.trade_time_sub(tool.get_now_time_str(), "14:57:00") >= 0 and tool.trade_time_sub(tool.get_now_time_str(),
|
"15:00:01") <= 0:
|
# 集合竞价不撤单
|
return
|
|
if not recancel:
|
for orderInfo in orderInfos:
|
CancelOrderManager().start_cancel(code, orderInfo[0], orderInfo[1])
|
sinfos = []
|
for i in range(len(orderInfos)):
|
sinfo = f"cb_{code}_{round(time.time() * 1000)}_{random.randint(0, 10000)}_{i}"
|
sinfos.append(sinfo)
|
order_action_refs = []
|
for i in range(len(orderInfos)):
|
order_action_ref = huaxin_util.create_order_ref()
|
order_action_refs.append(order_action_ref)
|
if not request_id:
|
request_id = __get_request_id(ClientSocketManager.CLIENT_TYPE_TRADE)
|
# 加入撤单记录,用于校验最后的撤单是否成功
|
if code not in __canceling_order_dict:
|
__canceling_order_dict[code] = set()
|
for orderInfo in orderInfos:
|
__canceling_order_dict[code].add(json.dumps((orderInfo[0], orderInfo[1])))
|
# 执行2次撤单,防止没有撤到
|
for i in range(2):
|
request_id = __request(ClientSocketManager.CLIENT_TYPE_TRADE,
|
{"type": ClientSocketManager.CLIENT_TYPE_TRADE, "trade_type": 2,
|
"direction": direction,
|
"code": code,
|
"orderInfos": orderInfos,
|
"orderActionRefs": order_action_refs,
|
"sinfos": sinfos}, request_id=request_id, is_trade=True)
|
try:
|
return __read_response(request_id, blocking)
|
finally:
|
huaxin_trade_data_update.add_money_list()
|
|
|
# CLIENT_TYPE_DELEGATE_LIST = "delegate_list"
|
# CLIENT_TYPE_DEAL_LIST = "deal_list"
|
# CLIENT_TYPE_POSITION_LIST = "position_list"
|
# CLIENT_TYPE_MONEY = "money"
|
# CLIENT_TYPE_DEAL = "deal"
|
|
# 获取委托列表
|
# can_cancel:是否可以撤
|
def get_delegate_list(can_cancel=True, blocking=True, timeout=TIMEOUT):
|
request_id = __request(ClientSocketManager.CLIENT_TYPE_DELEGATE_LIST,
|
{"type": ClientSocketManager.CLIENT_TYPE_DELEGATE_LIST,
|
"can_cancel": 1 if can_cancel else 0})
|
|
return __read_response(request_id, blocking, timeout=timeout)
|
|
|
# 获取成交列表
|
def get_deal_list(blocking=True, timeout=TIMEOUT):
|
request_id = __request(ClientSocketManager.CLIENT_TYPE_DEAL_LIST,
|
{"type": ClientSocketManager.CLIENT_TYPE_DEAL_LIST})
|
return __read_response(request_id, blocking, timeout=timeout)
|
|
|
# 获取持仓列表
|
def get_position_list(blocking=True):
|
request_id = __request(ClientSocketManager.CLIENT_TYPE_POSITION_LIST,
|
{"type": ClientSocketManager.CLIENT_TYPE_POSITION_LIST})
|
return __read_response(request_id, blocking)
|
|
|
# 获取账户资金状况
|
def get_money(blocking=True):
|
request_id = __request(ClientSocketManager.CLIENT_TYPE_MONEY,
|
{"type": ClientSocketManager.CLIENT_TYPE_MONEY})
|
return __read_response(request_id, blocking)
|
|
|
# 设置L2订阅数据
|
def set_l2_codes_data(codes_data, blocking=True):
|
request_id = __request(ClientSocketManager.CLIENT_TYPE_CMD_L2,
|
{"type": ClientSocketManager.CLIENT_TYPE_CMD_L2, "data": codes_data})
|
return __read_response(request_id, blocking)
|
|
|
# 设置L2订阅数据
|
def __test_trade_channel(sid):
|
request_id = __request("test",
|
{"type": "test", "data": {"sid": sid}}, log_enable=False)
|
return __read_response(request_id, True, log_enable=False)
|
|
|
def parseResponse(data_str):
|
if not data_str:
|
raise Exception("反馈内容为空")
|
res = data_str
|
if type(res) == str:
|
res = json.loads(data_str)
|
res = res['data']
|
if res['code'] != 0:
|
raise Exception(res['msg'])
|
return res['data']
|
|
|
if __name__ == "__main__":
|
pass
|