"""
|
交易API
|
"""
|
import json
|
import logging
|
import multiprocessing
|
import queue
|
import random
|
import threading
|
import time
|
import concurrent.futures
|
|
import requests
|
|
from utils import socket_util, tool
|
|
__response_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=15)
|
__save_data_queue = queue.Queue()
|
|
|
def __run_recv_queue_trade(queue: multiprocessing.Queue):
|
# 设置结果
|
def __set_response(data_json):
|
if 'request_id' not in data_json:
|
return
|
# 设置响应内容
|
set_response(data_json["request_id"], data_json['data'])
|
|
def __send_to_trade_server(data_str):
|
requests.post("http://183.234.94.164:12881/trade_callback", data_str)
|
|
if queue is not None:
|
while True:
|
try:
|
val = queue.get()
|
if val:
|
data_json = json.loads(val)
|
# 处理数据
|
type_ = data_json["type"]
|
if type_ == "response":
|
# 主动触发的响应
|
request_id = data_json['request_id']
|
|
__response_thread_pool.submit(__set_response, data_json)
|
elif type_ == "trade_callback":
|
try:
|
# 交易回调
|
data_json = data_json["data"]
|
# 将交易回调传递给交易服务器
|
__response_thread_pool.submit(__send_to_trade_server, val)
|
# 记录交易反馈日志
|
finally:
|
pass
|
except:
|
pass
|
|
|
# 设置交易通信队列
|
# 暂时不会使用该方法
|
def run_trade(queue_strategy_r_trade_w_, queue_strategy_w_trade_r_, queue_strategy_w_trade_r_for_query_):
|
"""
|
|
:param queue_strategy_r_trade_w_: 接收交易结果数据队列
|
:param queue_strategy_w_trade_r_: 发送交易指令队列
|
:param queue_strategy_w_trade_r_for_query_:发送查询的交易指令队列
|
:param trade_ipc_addr:
|
:return:
|
"""
|
global queue_strategy_w_trade_r, queue_strategy_w_trade_r_for_query
|
queue_strategy_w_trade_r = queue_strategy_w_trade_r_
|
queue_strategy_w_trade_r_for_query = queue_strategy_w_trade_r_for_query_
|
|
# 读取交易结果
|
threading.Thread(target=lambda: __run_recv_queue_trade(queue_strategy_r_trade_w_), daemon=True).start()
|
|
|
# 交易通道的错误次数
|
trade_pipe_channel_error_count = 0
|
|
|
# pipe的交易通道是否正常
|
def is_pipe_channel_normal():
|
return True
|
|
|
# 测试交易通道
|
def test_trade_channel():
|
global trade_pipe_channel_error_count
|
sid = random.randint(0, 1000000)
|
result = __test_trade_channel(sid)
|
if result["code"] == 0 and result["data"]["data"]["sid"] == sid:
|
trade_pipe_channel_error_count = 0
|
return True
|
trade_pipe_channel_error_count += 1
|
if trade_pipe_channel_error_count > 100:
|
trade_pipe_channel_error_count = 100
|
return False
|
|
|
class ClientSocketManager:
|
# 客户端类型
|
CLIENT_TYPE_TRADE = "trade"
|
CLIENT_TYPE_DELEGATE_LIST = "delegate_list"
|
CLIENT_TYPE_DEAL_LIST = "deal_list"
|
CLIENT_TYPE_POSITION_LIST = "position_list"
|
CLIENT_TYPE_MONEY = "money"
|
CLIENT_TYPE_DEAL = "deal"
|
CLIENT_TYPE_CMD_L2 = "l2_cmd"
|
socket_client_dict = {}
|
socket_client_lock_dict = {}
|
active_client_dict = {}
|
|
@classmethod
|
def list_client(cls, _type):
|
if _type == cls.CLIENT_TYPE_TRADE:
|
if _type in cls.socket_client_dict:
|
return cls.socket_client_dict.get(_type)
|
else:
|
if _type in cls.socket_client_dict:
|
return [cls.socket_client_dict.get(_type)]
|
return []
|
|
@classmethod
|
def add_client(cls, _type, rid, sk):
|
if _type == cls.CLIENT_TYPE_TRADE:
|
# 交易列表
|
if _type not in cls.socket_client_dict:
|
cls.socket_client_dict[_type] = []
|
cls.socket_client_dict[_type].append((rid, sk))
|
cls.socket_client_lock_dict[rid] = threading.Lock()
|
else:
|
cls.socket_client_dict[_type] = (rid, sk)
|
cls.socket_client_lock_dict[rid] = threading.Lock()
|
|
# 是否已经被锁住
|
@classmethod
|
def is_client_locked(cls, rid):
|
if rid in cls.socket_client_lock_dict:
|
return cls.socket_client_lock_dict[rid].locked()
|
return None
|
|
@classmethod
|
def acquire_client(cls, _type):
|
if _type == cls.CLIENT_TYPE_TRADE:
|
if _type in cls.socket_client_dict:
|
# 根据排序活跃时间排序
|
client_list = sorted(cls.socket_client_dict[_type], key=lambda x: cls.active_client_dict.get(x[0]) if x[
|
0] in cls.active_client_dict else 0,
|
reverse=True)
|
|
for d in client_list:
|
if d[0] in cls.socket_client_lock_dict:
|
try:
|
if cls.socket_client_lock_dict[d[0]].acquire(blocking=False):
|
|
return d
|
except threading.TimeoutError:
|
pass
|
else:
|
if _type in cls.socket_client_dict:
|
try:
|
d = cls.socket_client_dict[_type]
|
if d[0] in cls.socket_client_lock_dict:
|
if cls.socket_client_lock_dict[d[0]].acquire(blocking=False):
|
return d
|
except threading.TimeoutError:
|
pass
|
return None
|
|
@classmethod
|
def release_client(cls, client_id):
|
sucess = False
|
if client_id in cls.socket_client_lock_dict:
|
sucess = True
|
# 释放锁
|
if cls.socket_client_lock_dict[client_id].locked():
|
cls.socket_client_lock_dict[client_id].release()
|
|
@classmethod
|
def del_client(cls, rid):
|
# 删除线程锁
|
if rid in cls.socket_client_lock_dict:
|
cls.socket_client_lock_dict.pop(rid)
|
# 删除sk
|
for t in cls.socket_client_dict:
|
if type(cls.socket_client_dict[t]) == list:
|
for d in cls.socket_client_dict[t]:
|
if d[0] == rid:
|
try:
|
# 关闭socket
|
d[1].close()
|
except:
|
pass
|
cls.socket_client_dict[t].remove(d)
|
break
|
|
elif type(cls.socket_client_dict[t]) == tuple:
|
if cls.socket_client_dict[t][0] == rid:
|
try:
|
# 关闭socket
|
cls.socket_client_dict[t][1].close()
|
except:
|
pass
|
cls.socket_client_dict.pop(t)
|
break
|
|
# 心跳信息
|
@classmethod
|
def heart(cls, rid):
|
cls.active_client_dict[rid] = time.time()
|
|
@classmethod
|
def del_invalid_clients(cls):
|
# 清除长时间无心跳的客户端通道
|
for k in cls.active_client_dict.keys():
|
if time.time() - cls.active_client_dict[k] > 20:
|
# 心跳时间间隔20s以上视为无效
|
cls.del_client(k)
|
|
|
TRADE_DIRECTION_BUY = 1
|
TRADE_DIRECTION_SELL = 2
|
|
# 超时时间2s
|
TIMEOUT = 2.0
|
# 交易代理
|
TRADE_DELEGATED = True
|
|
# 等待响应的request_id
|
__request_response_dict = {}
|
|
|
def __get_request_id(type):
|
return f"r_{type}_{round(time.time() * 10000)}_{random.randint(0, 100000)}"
|
|
|
# 网络请求
|
def __request(_type, data, request_id=None, log_enable=True, is_trade=False):
|
"""
|
请求,将交易(包含下单/撤单)与查询(包含查持仓/账户可用金额/委托列表/成交列表)队列分离
|
@param _type:
|
@param data:
|
@param request_id:
|
@param log_enable:
|
@param is_trade:
|
@return:
|
"""
|
if not request_id:
|
request_id = __get_request_id(_type)
|
try:
|
root_data = {"type": _type,
|
"data": data,
|
"request_id": request_id,
|
"time": time.time()
|
}
|
root_data = socket_util.encryp_client_params_sign(root_data)
|
start_time = time.time()
|
if is_trade:
|
queue_strategy_w_trade_r.put_nowait(root_data)
|
else:
|
queue_strategy_w_trade_r_for_query.put_nowait(root_data)
|
|
use_time = int((time.time() - start_time) * 1000)
|
except BrokenPipeError as e:
|
|
raise e
|
except Exception as e:
|
|
logging.exception(e)
|
raise e
|
return request_id
|
|
|
def __read_response(request_id, blocking, timeout=TIMEOUT, log_enable=True):
|
if blocking:
|
start_time = time.time()
|
try:
|
while True:
|
time.sleep(0.005)
|
if request_id in __request_response_dict:
|
# 获取到了响应内容
|
result = __request_response_dict.pop(request_id)
|
return result
|
if time.time() - start_time > timeout:
|
# 读取内容超时才会释放
|
raise Exception(f"读取内容超时: request_id={request_id}")
|
finally:
|
pass
|
|
return None
|
|
|
|
def set_response(request_id, response):
|
if request_id:
|
# 主动触发
|
__request_response_dict[request_id] = response
|
else:
|
# 被动触发
|
pass
|
|
|
def request(type_, data):
|
request_id = __request(type_,
|
data)
|
return __read_response(request_id, blocking=True)
|
|
|
# 设置L2订阅数据
|
def __test_trade_channel(sid):
|
request_id = __request("test",
|
{"type": "test", "data": {"sid": sid}}, log_enable=False)
|
return __read_response(request_id, True, log_enable=False)
|
|
|
def parseResponse(data_str):
|
if not data_str:
|
raise Exception("反馈内容为空")
|
res = data_str
|
if type(res) == str:
|
res = json.loads(data_str)
|
res = res['data']
|
if res['code'] != 0:
|
raise Exception(res['msg'])
|
return res['data']
|
|
|
if __name__ == "__main__":
|
requests.post("http://192.168.3.241:12881/trade_callback", json.dumps({"code":"123123"}))
|