# -*- coding: utf-8 -*-
|
"""
|
Trading API
|
"""
|
import json
|
import logging
|
import random
|
import threading
|
import time
|
|
import crypt
|
from log_module.log import printlog
|
from utils import socket_util
|
|
|
class ClientSocketManager:
    """Class-level registry of connected client sockets, grouped by client type.

    All state lives in class attributes, so every caller shares one registry:

    * ``socket_client_dict`` maps a client type to its connection(s). The
      ``trade`` type holds a list of ``(rid, sk)`` tuples; every other type
      holds a single ``(rid, sk)`` tuple.
    * ``socket_client_lock_dict`` maps ``rid`` to the ``threading.Lock`` that
      marks the connection as in use.
    * ``active_client_dict`` maps ``rid`` to the time of its last heartbeat.
    """

    # Client types
    CLIENT_TYPE_TRADE = "trade"
    CLIENT_TYPE_DELEGATE_LIST = "delegate_list"
    CLIENT_TYPE_DEAL_LIST = "deal_list"
    CLIENT_TYPE_POSITION_LIST = "position_list"
    CLIENT_TYPE_MONEY = "money"
    CLIENT_TYPE_DEAL = "deal"
    CLIENT_TYPE_CMD_L2 = "l2_cmd"

    socket_client_dict = {}
    socket_client_lock_dict = {}
    active_client_dict = {}

    @classmethod
    def add_client(cls, _type, rid, sk):
        """Register connection ``sk`` under ``_type`` with id ``rid``.

        Trade clients are appended to a list (several may coexist); any other
        type keeps only the most recently registered connection. A fresh lock
        is created for ``rid`` in both cases.
        """
        if _type == cls.CLIENT_TYPE_TRADE:
            # Trade clients are kept as a list
            cls.socket_client_dict.setdefault(_type, []).append((rid, sk))
        else:
            cls.socket_client_dict[_type] = (rid, sk)
        cls.socket_client_lock_dict[rid] = threading.Lock()

    @classmethod
    def acquire_client(cls, _type):
        """Return a free ``(rid, sk)`` client of ``_type`` and lock it.

        For trade clients, candidates are tried from the most recent heartbeat
        to the oldest (clients with no heartbeat sort last). The lock is taken
        without blocking, so a busy client is skipped.

        Returns:
            The ``(rid, sk)`` tuple whose lock was acquired, or ``None`` when
            no client of that type is registered or all of them are busy.
        """
        registered = cls.socket_client_dict.get(_type)
        if not registered:
            return None
        if _type == cls.CLIENT_TYPE_TRADE:
            # Most recently active client first
            candidates = sorted(
                registered,
                key=lambda item: cls.active_client_dict.get(item[0], 0),
                reverse=True,
            )
        else:
            candidates = [registered]
        for candidate in candidates:
            # .get() tolerates a lock removed by del_client in the meantime
            lock = cls.socket_client_lock_dict.get(candidate[0])
            if lock is not None and lock.acquire(blocking=False):
                return candidate
        return None

    @classmethod
    def release_client(cls, rid):
        """Release the lock of client ``rid`` so it can be acquired again.

        Unknown ids are ignored. Releasing a lock that is not currently held
        is also ignored: both the timeout path and a late response may call
        this for the same request, and ``Lock.release`` raises
        ``RuntimeError`` on an unlocked lock.
        """
        lock = cls.socket_client_lock_dict.get(rid)
        if lock is None:
            return
        try:
            # Release the lock
            lock.release()
        except RuntimeError:
            # Already released by another code path; nothing left to do.
            pass

    @classmethod
    def del_client(cls, rid):
        """Remove every trace of client ``rid`` from the registry.

        Drops its lock, its heartbeat timestamp and any ``(rid, sk)`` entry
        stored under any client type.
        """
        # Remove the thread lock
        cls.socket_client_lock_dict.pop(rid, None)
        # Remove the heartbeat record so it does not outlive the client
        cls.active_client_dict.pop(rid, None)
        # Remove the socket; iterate over a snapshot because entries may be popped
        for client_type, entry in list(cls.socket_client_dict.items()):
            if isinstance(entry, list):
                # In-place update keeps the same list object registered
                entry[:] = [item for item in entry if item[0] != rid]
            elif isinstance(entry, tuple) and entry[0] == rid:
                cls.socket_client_dict.pop(client_type, None)

    # Heartbeat
    @classmethod
    def heart(cls, rid):
        """Record the current time as the last heartbeat of client ``rid``."""
        cls.active_client_dict[rid] = time.time()
|
|
|
# Trade directions
TRADE_DIRECTION_BUY = 1
TRADE_DIRECTION_SELL = 2

# Timeout: 2 seconds
TIMEOUT = 2.0

# Responses keyed by request_id: written by set_response and consumed by
# __read_response while it waits for the reply to a request.
__request_response_dict = {}
|
|
|
def __get_request_id(type):
    """Build a request id of the form ``<type>_<timestamp>_<random>``.

    The timestamp is the current time in units of 0.1 ms, rounded to an
    integer; the random part is an integer between 0 and 100000 inclusive.
    """
    timestamp = round(time.time() * 10000)
    nonce = random.randint(0, 100000)
    return f"{type}_{timestamp}_{nonce}"
|
|
|
# Network request
def __request(_type, data):
    """Send a signed command to a free client of ``_type`` and read its ack.

    The message is ``{"type": "cmd", "data": data, "request_id": ...}`` plus a
    ``sign`` field. The signature is ``crypt.md5_encrypt`` over the sorted
    ``key=value`` pairs (dict values JSON-encoded), followed by the salt, all
    joined with ``&``. The JSON-encoded message is passed through
    ``socket_util.load_header`` before sending.

    Args:
        _type: client type used to pick a connection from ClientSocketManager.
        data: command payload placed under the ``data`` key.

    Returns:
        ``(request_id, client)`` where ``client`` is the ``(rid, sk)`` tuple
        that was acquired. On success the client stays locked; it is released
        when the response arrives or the read times out.

    Raises:
        Exception: when no client of ``_type`` can be acquired.
        BrokenPipeError: when the socket is broken; the client is removed
            from the registry first.
        Exception: any other failure is logged and re-raised after the client
            has been released.
    """
    client = ClientSocketManager.acquire_client(_type)
    if not client:
        raise Exception("无可用的交易client")
    try:
        request_id = __get_request_id(_type)
        root_data = {"type": "cmd",
                     "data": data,
                     "request_id": request_id}
        str_list = sorted(
            f"{key}={json.dumps(value)}" if isinstance(value, dict) else f"{key}={value}"
            for key, value in root_data.items()
        )
        # NOTE(review): the signing salt is hard-coded in source; it must stay
        # byte-identical to what the peer expects, but consider loading it
        # from configuration instead.
        str_list.append("%Yeshi2014@#.")
        root_data["sign"] = crypt.md5_encrypt("&".join(str_list))
        printlog("请求前对象", root_data)
        # Add the request header
        client[1].sendall(socket_util.load_header(json.dumps(root_data).encode(encoding='utf-8')))
        result = client[1].recv(1024)
        printlog("请求发送成功", result.decode(encoding='utf-8'))
    except BrokenPipeError:
        # The connection is dead: drop the client (and its lock) entirely.
        ClientSocketManager.del_client(client[0])
        raise
    except Exception as e:
        logging.exception(e)
        # Give the client back; otherwise its lock stays held forever and the
        # connection can never be acquired again.
        ClientSocketManager.release_client(client[0])
        raise
    return request_id, client
|
|
|
def __read_response(client, request_id, blocking, timeout=TIMEOUT):
    """Wait for the response to ``request_id`` and return it.

    Args:
        client: the ``(rid, sk)`` tuple the request was sent on.
        request_id: id whose response is expected in the response dict.
        blocking: when false, return ``None`` immediately without waiting.
        timeout: maximum number of seconds to wait.

    Returns:
        The stored response, or ``None`` when ``blocking`` is false.

    Raises:
        Exception: when no response shows up within ``timeout`` seconds; the
            client is released before raising.
    """
    if not blocking:
        return None
    # Monotonic clock: the wait must not be stretched or cut short by
    # wall-clock adjustments.
    started = time.monotonic()
    missing = object()
    while True:
        time.sleep(0.01)
        # Single pop with a sentinel, so a stored response of None is still
        # returned as a response.
        result = __request_response_dict.pop(request_id, missing)
        if result is not missing:
            # Got the response content
            return result
        if time.monotonic() - started > timeout:
            ClientSocketManager.release_client(client[0])
            raise Exception(f"读取内容超时: request_id={request_id}")
|
|
|
def set_response(client_id, request_id, response):
    """Store the response of a request and free the client that served it.

    When both ``client_id`` and ``request_id`` are truthy, the client is
    released first and the response is then stored under ``request_id``.
    Otherwise the call does nothing.
    """
    if not (client_id and request_id):
        # Passively triggered: nothing to record
        return
    # Actively triggered
    ClientSocketManager.release_client(client_id)
    __request_response_dict[request_id] = response
|
|
|
def order(direction, code, volume, price, price_type=2, blocking=True):
    """Place an order through a trade client.

    Args:
        direction: 1 = buy, 2 = sell.
        code: security code.
        volume: trade volume.
        price: price. Per the original note, when selling without a price the
            order is sold at the 5th-level price (handled by the peer, not
            visible here).
        price_type: forwarded unchanged in the request payload.
        blocking: whether to block until the response arrives.

    Returns:
        The response when ``blocking`` is true, otherwise ``None``.
    """
    printlog("客户端", ClientSocketManager.socket_client_dict)
    trade_type = ClientSocketManager.CLIENT_TYPE_TRADE
    payload = {
        "type": trade_type,
        "trade_type": 1,
        "direction": direction,
        "code": code,
        "volume": volume,
        "price_type": price_type,
        "price": price,
        "sinfo": f"b_{code}_{round(time.time() * 1000)}",
    }
    request_id, client = __request(trade_type, payload)
    return __read_response(client, request_id, blocking)
|
|
|
def cancel_order(direction, code, orderSysID, blocking=False):
    """Send a cancel request (``trade_type`` 2) through a trade client.

    Args:
        direction: forwarded as the ``direction`` field.
        code: security code.
        orderSysID: forwarded as the ``orderSysID`` field.
        blocking: whether to block until the response arrives.

    Returns:
        The response when ``blocking`` is true, otherwise ``None``.
    """
    trade_type = ClientSocketManager.CLIENT_TYPE_TRADE
    payload = {
        "type": trade_type,
        "trade_type": 2,
        "direction": direction,
        "code": code,
        "orderSysID": orderSysID,
        "sinfo": f"cb_{code}_{round(time.time() * 1000)}",
    }
    request_id, client = __request(trade_type, payload)
    return __read_response(client, request_id, blocking)
|
|
|
# CLIENT_TYPE_DELEGATE_LIST = "delegate_list"
|
# CLIENT_TYPE_DEAL_LIST = "deal_list"
|
# CLIENT_TYPE_POSITION_LIST = "position_list"
|
# CLIENT_TYPE_MONEY = "money"
|
# CLIENT_TYPE_DEAL = "deal"
|
|
def get_delegate_list(can_cancel=True, blocking=True):
    """Fetch the delegate (order) list.

    Args:
        can_cancel: whether the orders can be cancelled; sent as 1 or 0 in
            the ``can_cancel`` field.
        blocking: whether to block until the response arrives.

    Returns:
        The response when ``blocking`` is true, otherwise ``None``.
    """
    client_type = ClientSocketManager.CLIENT_TYPE_DELEGATE_LIST
    cancel_flag = 1 if can_cancel else 0
    payload = {"type": client_type, "can_cancel": cancel_flag}
    request_id, client = __request(client_type, payload)
    return __read_response(client, request_id, blocking)
|
|
|
def get_deal_list(blocking=True):
    """Fetch the deal list.

    Returns the response when ``blocking`` is true, otherwise ``None``.
    """
    client_type = ClientSocketManager.CLIENT_TYPE_DEAL_LIST
    payload = {"type": client_type}
    request_id, client = __request(client_type, payload)
    return __read_response(client, request_id, blocking)
|
|
|
def get_position_list(blocking=True):
    """Fetch the position list.

    Returns the response when ``blocking`` is true, otherwise ``None``.
    """
    client_type = ClientSocketManager.CLIENT_TYPE_POSITION_LIST
    payload = {"type": client_type}
    request_id, client = __request(client_type, payload)
    return __read_response(client, request_id, blocking)
|
|
|
def get_money(blocking=True):
    """Fetch the account's funds status.

    Returns the response when ``blocking`` is true, otherwise ``None``.
    """
    client_type = ClientSocketManager.CLIENT_TYPE_MONEY
    payload = {"type": client_type}
    request_id, client = __request(client_type, payload)
    return __read_response(client, request_id, blocking)
|
|
|
def set_l2_codes_data(codes_data, blocking=True):
    """Set the L2 subscription data.

    ``codes_data`` is forwarded unchanged under the ``data`` key of the
    command payload. Returns the response when ``blocking`` is true,
    otherwise ``None``.
    """
    client_type = ClientSocketManager.CLIENT_TYPE_CMD_L2
    payload = {"type": client_type, "data": codes_data}
    request_id, client = __request(client_type, payload)
    return __read_response(client, request_id, blocking)
|
|
|
if __name__ == "__main__":
    # Manual smoke check: pop a key from a sample dict and log the value.
    sample = {"id": "123123"}
    popped = sample.pop("id")
    printlog(popped)
|