"""
|
交易API
|
"""
|
import json
|
import logging
|
import random
|
import time
|
|
from socket_manager import ClientSocketManager
|
from utils import socket_util
|
|
# Trade directions
TRADE_DIRECTION_BUY = 1
TRADE_DIRECTION_SELL = 2

# Data operations
OPERRATE_SET = 1  # set
OPERRATE_DELETE = 2  # delete
OPERRATE_GET = 3  # get

# Correctly spelled aliases; the misspelled names above are kept so existing
# callers keep working.
OPERATE_SET = OPERRATE_SET
OPERATE_DELETE = OPERRATE_DELETE
OPERATE_GET = OPERRATE_GET

# Code list types
CODE_LIST_WHITE = "white"
CODE_LIST_BLACK = "black"
CODE_LIST_WANT = "want"
CODE_LIST_PAUSE_BUY = "pause_buy"

# Request types
API_TYPE_TRADE = "trade"  # trade
API_TYPE_TRADE_STATE = "trade_state"  # trade state
API_TYPE_TRADE_MODE = "trade_mode"  # trade mode
API_TYPE_CODE_LIST = "code_list"  # code list
API_TYPE_EXPORT_L2 = "export_l2"  # export L2 data
API_TYPE_INIT = "init"  # initialization
API_TYPE_REFRESH_TRADE_DATA = "refresh_trade_data"  # refresh trade data
API_TYPE_CODE_ATRRIBUTE = "code_attribute"  # code attribute
API_TYPE_CODE_ATTRIBUTE = API_TYPE_CODE_ATRRIBUTE  # correctly spelled alias
API_TYPE_CODE_TRADE_STATE = "code_trade_state"  # per-code trade state
API_TYPE_GET_ENV = "get_env"  # environment information

# Default time, in seconds, to wait for a response before giving up
TIMEOUT = 5.0

# Responses keyed by request_id, stored until the waiting caller picks them up
__request_response_dict = {}
|
|
|
def __get_request_id(type):
    """Build a request id of the form ``r_<type>_<timestamp>_<random>``.

    The timestamp is the current epoch time scaled by 10000 and rounded; the
    last part is a random integer in [0, 100000].
    """
    timestamp = round(time.time() * 10000)
    nonce = random.randint(0, 100000)
    return "r_{}_{}_{}".format(type, timestamp, nonce)
|
|
|
def __request(client_type, data):
    """Send a command over an acquired client socket.

    Args:
        client_type: Client category handed to
            ``ClientSocketManager.acquire_client``.
        data: Command payload; its ``"type"`` entry is used to build the
            request id.

    Returns:
        A ``(request_id, client)`` tuple. ``client`` is the object returned by
        ``acquire_client``; ``client[0]`` is the id passed to
        ``del_client``/``release_client`` and ``client[1]`` is the socket-like
        object used for sending.

    Raises:
        Exception: If no client is available, or re-raised from any failure
            while building or sending the request.
    """
    client = ClientSocketManager.acquire_client(client_type)
    if not client:
        raise Exception("无可用的交易client")
    try:
        # Built inside the try so a malformed payload also gives the client back.
        request_id = __get_request_id(data["type"])
        root_data = {"type": "cmd",
                     "data": data,
                     "request_id": request_id}
        root_data = socket_util.encryp_client_params_sign(root_data)
        body = json.dumps(root_data).encode(encoding='utf-8')
        # Prepend the request header before sending.
        client[1].sendall(socket_util.load_header(body))
        # Read the immediate reply; its content is not used here
        # (presumably an acknowledgement -- the actual response is
        # delivered through set_response).
        client[1].recv(1024)
    except BrokenPipeError:
        # The connection is dead: drop the client entirely.
        ClientSocketManager.del_client(client[0])
        raise
    except Exception as e:
        logging.exception(e)
        # No response will be awaited for this request, so hand the client
        # back instead of leaving it acquired forever.
        ClientSocketManager.release_client(client[0])
        raise
    return request_id, client
|
|
|
def __read_response(client, request_id, blocking, timeout=TIMEOUT):
    """Wait for the response registered under ``request_id``.

    Args:
        client: The client returned by ``__request``; ``client[0]`` is the id
            released on timeout.
        request_id: Id of the request whose response is awaited.
        blocking: When false, return ``None`` immediately without waiting.
        timeout: Maximum number of seconds to wait.

    Returns:
        The response stored by ``set_response``, or ``None`` when
        ``blocking`` is false.

    Raises:
        Exception: If no response arrives within ``timeout`` seconds.
    """
    if not blocking:
        return None

    missing = object()
    # A monotonic clock keeps the timeout correct across system clock changes.
    start_time = time.monotonic()
    while True:
        # pop() with a sentinel checks and removes in a single step, and a
        # response that is already present is returned without any sleep.
        result = __request_response_dict.pop(request_id, missing)
        if result is not missing:
            return result
        if time.monotonic() - start_time > timeout:
            # The client is only released here on timeout; on success it is
            # released by set_response.
            ClientSocketManager.release_client(client[0])
            raise Exception(f"读取内容超时: request_id={request_id}")
        time.sleep(0.005)
|
|
|
def set_response(client_id, request_id, response):
    """Record a response for a pending request and release its client.

    Does nothing unless both ``client_id`` and ``request_id`` are truthy
    (the original code labels that case "passively triggered").
    """
    if not (client_id and request_id):
        # Passively triggered: nothing to record.
        return
    # Actively triggered: publish the response, then free the client.
    __request_response_dict[request_id] = response
    ClientSocketManager.release_client(client_id)
|
|
|
def parseResponse(data_str):
    """Unwrap a response envelope and return its inner payload.

    Args:
        data_str: The response, either as JSON text (``str``, ``bytes`` or
            ``bytearray``) or as an already-decoded mapping. It must have the
            shape ``{"data": {"code": ..., "msg": ..., "data": ...}}``.

    Returns:
        The value of the inner ``"data"`` entry.

    Raises:
        Exception: If ``data_str`` is empty, or if the inner ``"code"`` is not
            0 (the message is the inner ``"msg"``).
    """
    if not data_str:
        raise Exception("反馈内容为空")
    res = data_str
    # Decode raw text; bytes are accepted too since json.loads handles them.
    if isinstance(res, (str, bytes, bytearray)):
        res = json.loads(res)
    res = res['data']
    if res['code'] != 0:
        raise Exception(res['msg'])
    return res['data']
|
|
|
def trade_order(direction, code, volume, price, price_type=2, blocking=True):
    """Place an order (``trade_type`` 1).

    Args:
        direction: 1 for buy, 2 for sell.
        code: Security code.
        volume: Order volume.
        price: Order price. NOTE(review): the original comment says a sell
            without a price is placed at the 5th-level price; that handling
            is not visible in this module.
        price_type: Forwarded unchanged under the ``"price_type"`` key.
        blocking: Whether to wait for the response.

    Returns:
        The response for this request, or ``None`` when ``blocking`` is false.
    """
    # Was a bare print() on every order; keep the information but route it
    # through logging so it can be filtered.
    logging.debug("客户端 %s", ClientSocketManager.socket_client_dict)
    request_id, client = __request(ClientSocketManager.CLIENT_TYPE_TRADE,
                                   {"type": API_TYPE_TRADE, "trade_type": 1,
                                    "direction": direction,
                                    "code": code,
                                    "volume": volume,
                                    "price_type": price_type,
                                    "price": price, "sinfo": f"order_{code}_{round(time.time() * 1000)}"})
    return __read_response(client, request_id, blocking)
|
|
|
def trade_cancel_order(direction, code, accountID, orderSysID, blocking=True):
    """Cancel an order (``trade_type`` 2).

    ``direction``, ``code``, ``accountID`` and ``orderSysID`` are forwarded
    unchanged in the request. Returns the response for this request, or
    ``None`` when ``blocking`` is false.
    """
    client_type = ClientSocketManager.CLIENT_TYPE_TRADE
    payload = {
        "type": API_TYPE_TRADE,
        "trade_type": 2,
        "direction": direction,
        "code": code,
        "accountID": accountID,
        "orderSysID": orderSysID,
        "sinfo": f"cancel_order_{code}_{round(time.time() * 1000)}",
    }
    request_id, client = __request(client_type, payload)
    return __read_response(client, request_id, blocking)
|
|
|
def set_trade_state(state, blocking=True):
    """Set the trade state.

    ``state`` is forwarded unchanged. Returns the response for this request,
    or ``None`` when ``blocking`` is false.
    """
    client_type = ClientSocketManager.CLIENT_TYPE_TRADE
    payload = {
        "type": API_TYPE_TRADE_STATE,
        "operate": OPERRATE_SET,
        "state": state,
        "sinfo": f"cb_{API_TYPE_TRADE_STATE}_{round(time.time() * 1000)}",
    }
    request_id, client = __request(client_type, payload)
    return __read_response(client, request_id, blocking)
|
|
|
def get_trade_state(blocking=True):
    """Query the trade state.

    Returns the response for this request, or ``None`` when ``blocking`` is
    false.
    """
    client_type = ClientSocketManager.CLIENT_TYPE_TRADE
    payload = {
        "type": API_TYPE_TRADE_STATE,
        "operate": OPERRATE_GET,
        "sinfo": f"cb_{API_TYPE_TRADE_STATE}_{round(time.time() * 1000)}",
    }
    request_id, client = __request(client_type, payload)
    return __read_response(client, request_id, blocking)
|
|
|
def set_trade_mode(mode, blocking=True):
    """Set the trade mode.

    ``mode`` is forwarded unchanged. Returns the response for this request,
    or ``None`` when ``blocking`` is false.
    """
    client_type = ClientSocketManager.CLIENT_TYPE_TRADE
    payload = {
        "type": API_TYPE_TRADE_MODE,
        "operate": OPERRATE_SET,
        "mode": mode,
        "sinfo": f"cb_{API_TYPE_TRADE_MODE}_{round(time.time() * 1000)}",
    }
    request_id, client = __request(client_type, payload)
    return __read_response(client, request_id, blocking)
|
|
|
def get_trade_mode(blocking=True):
    """Query the trade mode.

    Returns the response for this request, or ``None`` when ``blocking`` is
    false.
    """
    client_type = ClientSocketManager.CLIENT_TYPE_TRADE
    payload = {
        "type": API_TYPE_TRADE_MODE,
        "operate": OPERRATE_GET,
        "sinfo": f"cb_{API_TYPE_TRADE_MODE}_{round(time.time() * 1000)}",
    }
    request_id, client = __request(client_type, payload)
    return __read_response(client, request_id, blocking)
|
|
|
# -----代码名单操作----
|
def add_code_list(code, code_list_type, blocking=True):
    """Add ``code`` to the code list named by ``code_list_type``.

    ``code_list_type`` is forwarded unchanged (the module defines the
    ``CODE_LIST_*`` values for it). Returns the response for this request,
    or ``None`` when ``blocking`` is false.
    """
    client_type = ClientSocketManager.CLIENT_TYPE_TRADE
    payload = {
        "type": API_TYPE_CODE_LIST,
        "code_list_type": code_list_type,
        "code": code,
        "operate": OPERRATE_SET,
        "sinfo": f"cb_{API_TYPE_CODE_LIST}_{round(time.time() * 1000)}",
    }
    request_id, client = __request(client_type, payload)
    return __read_response(client, request_id, blocking)
|
|
|
def remove_code_list(code, code_list_type, blocking=True):
    """Remove ``code`` from the code list named by ``code_list_type``.

    Returns the response for this request, or ``None`` when ``blocking`` is
    false.
    """
    client_type = ClientSocketManager.CLIENT_TYPE_TRADE
    payload = {
        "type": API_TYPE_CODE_LIST,
        "code_list_type": code_list_type,
        "code": code,
        "operate": OPERRATE_DELETE,
        "sinfo": f"cb_{API_TYPE_CODE_LIST}_{round(time.time() * 1000)}",
    }
    request_id, client = __request(client_type, payload)
    return __read_response(client, request_id, blocking)
|
|
|
def get_code_list(code_list_type, blocking=True):
    """Fetch the code list named by ``code_list_type``.

    Returns the response for this request, or ``None`` when ``blocking`` is
    false.
    """
    client_type = ClientSocketManager.CLIENT_TYPE_TRADE
    payload = {
        "type": API_TYPE_CODE_LIST,
        "code_list_type": code_list_type,
        "operate": OPERRATE_GET,
        "sinfo": f"cb_{API_TYPE_CODE_LIST}_{round(time.time() * 1000)}",
    }
    request_id, client = __request(client_type, payload)
    return __read_response(client, request_id, blocking)
|
|
|
# ----- Export L2 data -----
def export_l2_data(code, blocking=True):
    """Request an export of the L2 data for ``code``.

    Returns the response for this request, or ``None`` when ``blocking`` is
    false.
    """
    request_id, client = __request(ClientSocketManager.CLIENT_TYPE_TRADE,
                                   {"type": API_TYPE_EXPORT_L2, "code": code,
                                    # Tag with this request's own type; it was
                                    # previously copy-pasted as the code-list type.
                                    "sinfo": f"cb_{API_TYPE_EXPORT_L2}_{round(time.time() * 1000)}"})
    return __read_response(client, request_id, blocking)
|
|
|
# ----- Daily initialization -----
def everyday_init(blocking=True):
    """Send the daily initialization command.

    Returns the response for this request, or ``None`` when ``blocking`` is
    false.
    """
    request_id, client = __request(ClientSocketManager.CLIENT_TYPE_TRADE,
                                   {"type": API_TYPE_INIT,
                                    # Tag with this request's own type; it was
                                    # previously copy-pasted as the code-list type.
                                    "sinfo": f"cb_{API_TYPE_INIT}_{round(time.time() * 1000)}"})
    return __read_response(client, request_id, blocking)
|
|
|
def refresh_trade_data(type, blocking=True):
    """Ask for the trade data to be refreshed.

    ``type`` is forwarded unchanged under the ``"ctype"`` key. Returns the
    response for this request, or ``None`` when ``blocking`` is false.
    """
    client_type = ClientSocketManager.CLIENT_TYPE_TRADE
    payload = {
        "type": API_TYPE_REFRESH_TRADE_DATA,
        "ctype": type,
        "sinfo": f"update_trade_data_{round(time.time() * 1000)}",
    }
    request_id, client = __request(client_type, payload)
    return __read_response(client, request_id, blocking)
|
|
|
def get_code_attribute(code, blocking=True):
    """Fetch the attributes of ``code``.

    Returns the response for this request, or ``None`` when ``blocking`` is
    false.
    """
    request_id, client = __request(ClientSocketManager.CLIENT_TYPE_TRADE,
                                   {"type": API_TYPE_CODE_ATRRIBUTE, "code": code,
                                    # Tag follows the cb_<type>_ convention; it was
                                    # previously copy-pasted from refresh_trade_data.
                                    "sinfo": f"cb_{API_TYPE_CODE_ATRRIBUTE}_{round(time.time() * 1000)}"})
    return __read_response(client, request_id, blocking)
|
|
|
def get_code_trade_state(code, blocking=True):
    """Fetch the trade state of ``code``.

    Returns the response for this request, or ``None`` when ``blocking`` is
    false.
    """
    request_id, client = __request(ClientSocketManager.CLIENT_TYPE_TRADE,
                                   {"type": API_TYPE_CODE_TRADE_STATE, "code": code,
                                    # Tag follows the cb_<type>_ convention; it was
                                    # previously copy-pasted from refresh_trade_data.
                                    "sinfo": f"cb_{API_TYPE_CODE_TRADE_STATE}_{round(time.time() * 1000)}"})
    return __read_response(client, request_id, blocking)
|
|
|
def get_env_info(blocking=True):
    """Fetch environment information.

    Returns the response for this request, or ``None`` when ``blocking`` is
    false.
    """
    request_id, client = __request(ClientSocketManager.CLIENT_TYPE_TRADE,
                                   {"type": API_TYPE_GET_ENV,
                                    # Tag follows the cb_<type>_ convention; it was
                                    # previously copy-pasted from refresh_trade_data.
                                    "sinfo": f"cb_{API_TYPE_GET_ENV}_{round(time.time() * 1000)}"})
    return __read_response(client, request_id, blocking)
|
|
|
if __name__ == "__main__":
    # Ad-hoc check: dict.pop returns the value it removes.
    sample = {"id": "123123"}
    removed = sample.pop("id")
    print(removed)
|