""" 交易API """ import json import logging import random import time from socket_manager import ClientSocketManager from utils import socket_util TRADE_DIRECTION_BUY = 1 TRADE_DIRECTION_SELL = 2 # 数据操作 OPERRATE_SET = 1 # 设置 OPERRATE_DELETE = 2 # 删除 OPERRATE_GET = 3 # 获取 OPERRATE_ADD = 4 # 添加 # 代码名单类型 CODE_LIST_WHITE = "white" CODE_LIST_BLACK = "black" CODE_LIST_WANT = "want" CODE_LIST_PAUSE_BUY = "pause_buy" CODE_LIST_MUST_BUY = "must_buy" # 类型 API_TYPE_TRADE = "trade" # 交易 API_TYPE_TRADE_STATE = "trade_state" # 交易状态 API_TYPE_TRADE_MODE = "trade_mode" # 交易模式 API_TYPE_SELL_RULE = "sell_rule" # 卖出规则 API_TYPE_CODE_LIST = "code_list" # 代码名单 API_TYPE_EXPORT_L2 = "export_l2" # 导出L2数据 API_TYPE_INIT = "init" # 初始化 API_TYPE_REFRESH_TRADE_DATA = "refresh_trade_data" # 交易数据刷新 API_TYPE_CODE_ATRRIBUTE = "code_attribute" # 代码属性 API_TYPE_CODE_TRADE_STATE = "code_trade_state" # 代码交易状态 API_TYPE_GET_ENV = "get_env" # 获取环境信息 API_TYPE_SYNC_L1_TARGET_CODES = "sync_l1_subscript_codes" # 同步L1需要订阅的代码 API_TYPE_SYSTEM_LOG = "system_log" # 系统日志 API_TYPE_GET_FROM_DATA_SERVER = "get_from_data_server" # 从数据服务器拉取数据 API_TYPE_CODE_TRADE_INFO = "code_trade_info" # 代码交易信息 API_TYPE_CODE_L2_LISTEN_ACTIVE_COUNT = "l2_listen_active_count" # L2有效监听数量 API_TYPE_SAVE_RUNNING_DATA = "save_running_data" # 保存运行时数据 API_TYPE_GET_CODE_POSITION_INFO = "get_code_position_info" # 获取代码持仓信息 API_TYPE_COMMON_REQUEST = "common_request" # 通用请求 # 超时时间2s TIMEOUT = 5.0 # 等待响应的request_id __request_response_dict = {} def __get_request_id(type): return f"r_{type}_{round(time.time() * 10000)}_{random.randint(0, 100000)}" # 网络请求 def __request(client_type, data): client = ClientSocketManager.acquire_client(client_type) if not client: raise Exception("无可用的交易client") request_id = __get_request_id(data["type"]) try: root_data = {"type": "cmd", "data": data, "request_id": request_id} root_data = socket_util.encryp_client_params_sign(root_data) # print("请求前对象", root_data) # 添加请求头 client[1].sendall(socket_util.load_header(json.dumps(root_data).encode(encoding='utf-8'))) result = client[1].recv(1024) # print("请求发送成功", result.decode(encoding='utf-8')) except BrokenPipeError as e: ClientSocketManager.del_client(client[0]) raise e except Exception as e: logging.exception(e) raise e return request_id, client def __read_response(client, request_id, blocking, timeout=TIMEOUT): if blocking: start_time = time.time() try: while True: time.sleep(0.005) if request_id in __request_response_dict: # 获取到了响应内容 result = __request_response_dict.pop(request_id) return result if time.time() - start_time > timeout: # 读取内容超时才会释放 ClientSocketManager.release_client(client[0]) raise Exception(f"读取内容超时: request_id={request_id}") finally: pass return None def set_response(client_id, request_id, response): if client_id and request_id: # 主动触发 __request_response_dict[request_id] = response ClientSocketManager.release_client(client_id) else: # 被动触发 pass def parseResponse(data_str): if not data_str: raise Exception("反馈内容为空") res = data_str if type(res) == str: res = json.loads(data_str) res = res['data'] if res['code'] != 0: raise Exception(res['msg']) return res['data'] # 下单委托 # direction 1-买 2-卖 # code:代码 # volume:交易量 # price:价格(如果是卖时不传价格就按照5挡价卖) # blocking是否阻塞进程 def trade_order(direction, code, volume, price, price_type=2, blocking=True): print("客户端", ClientSocketManager.socket_client_dict) request_id, client = __request( ClientSocketManager.CLIENT_TYPE_TRADE if direction == 1 else ClientSocketManager.CLIENT_TYPE_TRADE_SELL, {"type": API_TYPE_TRADE, "trade_type": 1, "direction": direction, "code": code, "volume": volume, "price_type": price_type, "price": price, "sinfo": f"order_{code}_{round(time.time() * 1000)}"}) return __read_response(client, request_id, blocking) # 取消下单 def trade_cancel_order(direction, code, accountID, orderSysID, blocking=True): request_id, client = __request(ClientSocketManager.CLIENT_TYPE_TRADE, {"type": API_TYPE_TRADE, "trade_type": 2, "direction": direction, "code": code, "accountID": accountID, "orderSysID": orderSysID, "sinfo": f"cb_{code}_{round(time.time() * 1000)}"}) return __read_response(client, request_id, blocking) # 设置交易状态 def set_trade_state(state, blocking=True): request_id, client = __request(ClientSocketManager.CLIENT_TYPE_COMMON, {"type": API_TYPE_TRADE_STATE, "operate": OPERRATE_SET, "state": state, "sinfo": f"cb_{API_TYPE_TRADE_STATE}_{round(time.time() * 1000)}"}) return __read_response(client, request_id, blocking) # 获取交易状态 def get_trade_state(blocking=True): request_id, client = __request(ClientSocketManager.CLIENT_TYPE_COMMON, {"type": API_TYPE_TRADE_STATE, "operate": OPERRATE_GET, "sinfo": f"cb_{API_TYPE_TRADE_STATE}_{round(time.time() * 1000)}"}) return __read_response(client, request_id, blocking) # 设置交易模式 def set_trade_mode(mode, blocking=True): request_id, client = __request(ClientSocketManager.CLIENT_TYPE_COMMON, {"type": API_TYPE_TRADE_MODE, "operate": OPERRATE_SET, "mode": mode, "sinfo": f"cb_{API_TYPE_TRADE_MODE}_{round(time.time() * 1000)}"}) return __read_response(client, request_id, blocking) # 获取交易模式 def get_trade_mode(blocking=True): request_id, client = __request(ClientSocketManager.CLIENT_TYPE_COMMON, {"type": API_TYPE_TRADE_MODE, "operate": OPERRATE_GET, "sinfo": f"cb_{API_TYPE_TRADE_MODE}_{round(time.time() * 1000)}"}) return __read_response(client, request_id, blocking) # -----代码名单操作---- def add_code_list(code, code_list_type, blocking=True): request_id, client = __request(ClientSocketManager.CLIENT_TYPE_COMMON, {"type": API_TYPE_CODE_LIST, "code_list_type": code_list_type, "code": code, "operate": OPERRATE_SET, "sinfo": f"cb_{API_TYPE_CODE_LIST}_{round(time.time() * 1000)}"}) return __read_response(client, request_id, blocking) def remove_code_list(code, code_list_type, blocking=True): request_id, client = __request(ClientSocketManager.CLIENT_TYPE_COMMON, {"type": API_TYPE_CODE_LIST, "code_list_type": code_list_type, "code": code, "operate": OPERRATE_DELETE, "sinfo": f"cb_{API_TYPE_CODE_LIST}_{round(time.time() * 1000)}"}) return __read_response(client, request_id, blocking) def get_code_list(code_list_type, blocking=True): request_id, client = __request(ClientSocketManager.CLIENT_TYPE_COMMON, {"type": API_TYPE_CODE_LIST, "code_list_type": code_list_type, "operate": OPERRATE_GET, "sinfo": f"cb_{API_TYPE_CODE_LIST}_{round(time.time() * 1000)}"}) return __read_response(client, request_id, blocking) # -----导出L2数据---- def export_l2_data(code, blocking=True): request_id, client = __request(ClientSocketManager.CLIENT_TYPE_COMMON, {"type": API_TYPE_EXPORT_L2, "code": code, "sinfo": f"cb_{API_TYPE_EXPORT_L2}_{round(time.time() * 1000)}"}) return __read_response(client, request_id, blocking) # -----每日初始化---- def everyday_init(blocking=True): request_id, client = __request(ClientSocketManager.CLIENT_TYPE_COMMON, {"type": API_TYPE_INIT, "sinfo": f"cb_{API_TYPE_INIT}_{round(time.time() * 1000)}"}) return __read_response(client, request_id, blocking) # 刷新交易数据 def refresh_trade_data(type, blocking=True): request_id, client = __request(ClientSocketManager.CLIENT_TYPE_COMMON, {"type": API_TYPE_REFRESH_TRADE_DATA, "ctype": type, "sinfo": f"cb_{API_TYPE_REFRESH_TRADE_DATA}_{round(time.time() * 1000)}"}) return __read_response(client, request_id, blocking) # 获取代码属性 def get_code_attribute(code, blocking=True): request_id, client = __request(ClientSocketManager.CLIENT_TYPE_COMMON, {"type": API_TYPE_CODE_ATRRIBUTE, "code": code, "sinfo": f"cb_{API_TYPE_CODE_ATRRIBUTE}_{round(time.time() * 1000)}"}) return __read_response(client, request_id, blocking) # 获取代码交易状态 def get_code_trade_state(code, blocking=True): request_id, client = __request(ClientSocketManager.CLIENT_TYPE_COMMON, {"type": API_TYPE_CODE_TRADE_STATE, "code": code, "sinfo": f"{API_TYPE_CODE_TRADE_STATE}_{round(time.time() * 1000)}"}) return __read_response(client, request_id, blocking) # 获取环境信息 def get_env_info(blocking=True): request_id, client = __request(ClientSocketManager.CLIENT_TYPE_COMMON, {"type": API_TYPE_GET_ENV, "sinfo": f"cb_{API_TYPE_GET_ENV}_{round(time.time() * 1000)}"}) return __read_response(client, request_id, blocking) # 获取环境信息 def sync_l1_subscript_codes(blocking=True): request_id, client = __request(ClientSocketManager.CLIENT_TYPE_COMMON, {"type": API_TYPE_SYNC_L1_TARGET_CODES, "sinfo": f"cb_{API_TYPE_SYNC_L1_TARGET_CODES}_{round(time.time() * 1000)}"}) return __read_response(client, request_id, blocking) # 获取系统日志 def get_system_logs(start_index, count, blocking=True): request_id, client = __request(ClientSocketManager.CLIENT_TYPE_COMMON, {"type": API_TYPE_SYSTEM_LOG, "start_index": start_index, "count": count, "sinfo": f"cb_{API_TYPE_SYSTEM_LOG}_{round(time.time() * 1000)}"}) return __read_response(client, request_id, blocking) # 拉取data_server的内容 def get_from_data_server(path, params, blocking=True): request_id, client = __request(ClientSocketManager.CLIENT_TYPE_COMMON, {"type": API_TYPE_GET_FROM_DATA_SERVER, "path": path, "params": params, "sinfo": f"cb_{API_TYPE_GET_FROM_DATA_SERVER}_{round(time.time() * 1000)}"}) return __read_response(client, request_id, blocking, timeout=30) # 获取代码的交易信息 def get_code_trade_info(code, blocking=True): request_id, client = __request(ClientSocketManager.CLIENT_TYPE_COMMON, {"type": API_TYPE_CODE_TRADE_INFO, "code": code, "sinfo": f"cb_{API_TYPE_CODE_TRADE_INFO}_{round(time.time() * 1000)}"}) return __read_response(client, request_id, blocking, timeout=30) # L2有效监听数量 def get_l2_listen_active_count(blocking=True): request_id, client = __request(ClientSocketManager.CLIENT_TYPE_COMMON, {"type": API_TYPE_CODE_L2_LISTEN_ACTIVE_COUNT, "sinfo": f"cb_{API_TYPE_CODE_L2_LISTEN_ACTIVE_COUNT}_{round(time.time() * 1000)}"}) return __read_response(client, request_id, blocking, timeout=30) # 保存正在运行的数据 def save_running_data(blocking=True): request_id, client = __request(ClientSocketManager.CLIENT_TYPE_COMMON, {"type": API_TYPE_SAVE_RUNNING_DATA, "sinfo": f"cb_{API_TYPE_SAVE_RUNNING_DATA}_{round(time.time() * 1000)}"}) return __read_response(client, request_id, blocking, timeout=30) # 保存正在运行的数据 def sell_rule(operate, data={}, blocking=True): request_id, client = __request(ClientSocketManager.CLIENT_TYPE_TRADE_SELL, {"type": API_TYPE_SELL_RULE, "operate": operate, "data": data, "sinfo": f"cb_{API_TYPE_SELL_RULE}_{round(time.time() * 1000)}"}) return __read_response(client, request_id, blocking, timeout=10) # 获取代码持仓信息 def get_code_position_info(code, blocking=True): request_id, client = __request(ClientSocketManager.CLIENT_TYPE_TRADE_SELL, {"type": API_TYPE_GET_CODE_POSITION_INFO, "code": code, "sinfo": f"cb_{API_TYPE_GET_CODE_POSITION_INFO}_{round(time.time() * 1000)}"}) return __read_response(client, request_id, blocking) def common_request(params={}, blocking=True): data = {"type": API_TYPE_COMMON_REQUEST, "sinfo": f"cb_{API_TYPE_COMMON_REQUEST}_{round(time.time() * 1000)}"} if params: for k in params: data[k] = params[k] request_id, client = __request(ClientSocketManager.CLIENT_TYPE_COMMON, data) return __read_response(client, request_id, blocking, timeout=10) if __name__ == "__main__": d = {"id": "123123"} print(d.pop("id"))