""" 交易API """ import json import logging import random import time from socket_manager import ClientSocketManager from utils import socket_util TRADE_DIRECTION_BUY = 1 TRADE_DIRECTION_SELL = 2 # 数据操作 OPERRATE_SET = 1 # 设置 OPERRATE_DELETE = 2 # 删除 OPERRATE_GET = 3 # 获取 # 代码名单类型 CODE_LIST_WHITE = "white" CODE_LIST_BLACK = "black" CODE_LIST_WANT = "want" CODE_LIST_PAUSE_BUY = "pause_buy" # 类型 API_TYPE_TRADE = "trade" # 交易 API_TYPE_TRADE_STATE = "trade_state" # 交易状态 API_TYPE_TRADE_MODE = "trade_mode" # 交易模式 API_TYPE_CODE_LIST = "code_list" # 代码名单 API_TYPE_EXPORT_L2 = "export_l2" # 导出L2数据 API_TYPE_INIT = "init" # 初始化 API_TYPE_REFRESH_TRADE_DATA = "refresh_trade_data" # 交易数据刷新 API_TYPE_CODE_ATRRIBUTE = "code_attribute" # 代码属性 API_TYPE_CODE_TRADE_STATE = "code_trade_state" # 代码交易状态 API_TYPE_GET_ENV = "get_env" # 获取环境信息 # 超时时间2s TIMEOUT = 5.0 # 等待响应的request_id __request_response_dict = {} def __get_request_id(type): return f"r_{type}_{round(time.time() * 10000)}_{random.randint(0, 100000)}" # 网络请求 def __request(client_type, data): client = ClientSocketManager.acquire_client(client_type) if not client: raise Exception("无可用的交易client") request_id = __get_request_id(data["type"]) try: root_data = {"type": "cmd", "data": data, "request_id": request_id} root_data = socket_util.encryp_client_params_sign(root_data) # print("请求前对象", root_data) # 添加请求头 client[1].sendall(socket_util.load_header(json.dumps(root_data).encode(encoding='utf-8'))) result = client[1].recv(1024) # print("请求发送成功", result.decode(encoding='utf-8')) except BrokenPipeError as e: ClientSocketManager.del_client(client[0]) raise e except Exception as e: logging.exception(e) raise e return request_id, client def __read_response(client, request_id, blocking, timeout=TIMEOUT): if blocking: start_time = time.time() try: while True: time.sleep(0.005) if request_id in __request_response_dict: # 获取到了响应内容 result = __request_response_dict.pop(request_id) return result if time.time() - start_time > timeout: # 读取内容超时才会释放 ClientSocketManager.release_client(client[0]) raise Exception(f"读取内容超时: request_id={request_id}") finally: pass return None def set_response(client_id, request_id, response): if client_id and request_id: # 主动触发 __request_response_dict[request_id] = response ClientSocketManager.release_client(client_id) else: # 被动触发 pass def parseResponse(data_str): if not data_str: raise Exception("反馈内容为空") res = data_str if type(res) == str: res = json.loads(data_str) res = res['data'] if res['code'] != 0: raise Exception(res['msg']) return res['data'] # 下单委托 # direction 1-买 2-卖 # code:代码 # volume:交易量 # price:价格(如果是卖时不传价格就按照5挡价卖) # blocking是否阻塞进程 def trade_order(direction, code, volume, price, price_type=2, blocking=True): print("客户端", ClientSocketManager.socket_client_dict) request_id, client = __request(ClientSocketManager.CLIENT_TYPE_TRADE, {"type": API_TYPE_TRADE, "trade_type": 1, "direction": direction, "code": code, "volume": volume, "price_type": price_type, "price": price, "sinfo": f"order_{code}_{round(time.time() * 1000)}"}) return __read_response(client, request_id, blocking) # 取消下单 def trade_cancel_order(direction, code, accountID, orderSysID, blocking=True): request_id, client = __request(ClientSocketManager.CLIENT_TYPE_TRADE, {"type": API_TYPE_TRADE, "trade_type": 2, "direction": direction, "code": code, "accountID": accountID, "orderSysID": orderSysID, "sinfo": f"cancel_order_{code}_{round(time.time() * 1000)}"}) return __read_response(client, request_id, blocking) # 设置交易状态 def set_trade_state(state, blocking=True): request_id, client = __request(ClientSocketManager.CLIENT_TYPE_TRADE, {"type": API_TYPE_TRADE_STATE, "operate": OPERRATE_SET, "state": state, "sinfo": f"cb_{API_TYPE_TRADE_STATE}_{round(time.time() * 1000)}"}) return __read_response(client, request_id, blocking) # 获取交易状态 def get_trade_state(blocking=True): request_id, client = __request(ClientSocketManager.CLIENT_TYPE_TRADE, {"type": API_TYPE_TRADE_STATE, "operate": OPERRATE_GET, "sinfo": f"cb_{API_TYPE_TRADE_STATE}_{round(time.time() * 1000)}"}) return __read_response(client, request_id, blocking) # 设置交易模式 def set_trade_mode(mode, blocking=True): request_id, client = __request(ClientSocketManager.CLIENT_TYPE_TRADE, {"type": API_TYPE_TRADE_MODE, "operate": OPERRATE_SET, "mode": mode, "sinfo": f"cb_{API_TYPE_TRADE_MODE}_{round(time.time() * 1000)}"}) return __read_response(client, request_id, blocking) # 获取交易模式 def get_trade_mode(blocking=True): request_id, client = __request(ClientSocketManager.CLIENT_TYPE_TRADE, {"type": API_TYPE_TRADE_MODE, "operate": OPERRATE_GET, "sinfo": f"cb_{API_TYPE_TRADE_MODE}_{round(time.time() * 1000)}"}) return __read_response(client, request_id, blocking) # -----代码名单操作---- def add_code_list(code, code_list_type, blocking=True): request_id, client = __request(ClientSocketManager.CLIENT_TYPE_TRADE, {"type": API_TYPE_CODE_LIST, "code_list_type": code_list_type, "code": code, "operate": OPERRATE_SET, "sinfo": f"cb_{API_TYPE_CODE_LIST}_{round(time.time() * 1000)}"}) return __read_response(client, request_id, blocking) def remove_code_list(code, code_list_type, blocking=True): request_id, client = __request(ClientSocketManager.CLIENT_TYPE_TRADE, {"type": API_TYPE_CODE_LIST, "code_list_type": code_list_type, "code": code, "operate": OPERRATE_DELETE, "sinfo": f"cb_{API_TYPE_CODE_LIST}_{round(time.time() * 1000)}"}) return __read_response(client, request_id, blocking) def get_code_list(code_list_type, blocking=True): request_id, client = __request(ClientSocketManager.CLIENT_TYPE_TRADE, {"type": API_TYPE_CODE_LIST, "code_list_type": code_list_type, "operate": OPERRATE_GET, "sinfo": f"cb_{API_TYPE_CODE_LIST}_{round(time.time() * 1000)}"}) return __read_response(client, request_id, blocking) # -----导出L2数据---- def export_l2_data(code, blocking=True): request_id, client = __request(ClientSocketManager.CLIENT_TYPE_TRADE, {"type": API_TYPE_EXPORT_L2, "code": code, "sinfo": f"cb_{API_TYPE_CODE_LIST}_{round(time.time() * 1000)}"}) return __read_response(client, request_id, blocking) # -----每日初始化---- def everyday_init(blocking=True): request_id, client = __request(ClientSocketManager.CLIENT_TYPE_TRADE, {"type": API_TYPE_INIT, "sinfo": f"cb_{API_TYPE_CODE_LIST}_{round(time.time() * 1000)}"}) return __read_response(client, request_id, blocking) # 刷新交易数据 def refresh_trade_data(type, blocking=True): request_id, client = __request(ClientSocketManager.CLIENT_TYPE_TRADE, {"type": API_TYPE_REFRESH_TRADE_DATA, "ctype": type, "sinfo": f"update_trade_data_{round(time.time() * 1000)}"}) return __read_response(client, request_id, blocking) # 获取代码属性 def get_code_attribute(code, blocking=True): request_id, client = __request(ClientSocketManager.CLIENT_TYPE_TRADE, {"type": API_TYPE_CODE_ATRRIBUTE, "code": code, "sinfo": f"update_trade_data_{round(time.time() * 1000)}"}) return __read_response(client, request_id, blocking) # 获取代码交易状态 def get_code_trade_state(code, blocking=True): request_id, client = __request(ClientSocketManager.CLIENT_TYPE_TRADE, {"type": API_TYPE_CODE_TRADE_STATE, "code": code, "sinfo": f"update_trade_data_{round(time.time() * 1000)}"}) return __read_response(client, request_id, blocking) # 获取环境信息 def get_env_info(blocking=True): request_id, client = __request(ClientSocketManager.CLIENT_TYPE_TRADE, {"type": API_TYPE_GET_ENV, "sinfo": f"update_trade_data_{round(time.time() * 1000)}"}) return __read_response(client, request_id, blocking) if __name__ == "__main__": d = {"id": "123123"} print(d.pop("id"))