# -*- coding: utf-8 -*-
"""
Trading API.

Sends signed command requests over registered client sockets and hands back
the responses that are delivered asynchronously through ``set_response``.
"""
import json
import logging
import random
import threading
import time
# NOTE(review): ``md5_encrypt`` is not part of the stdlib ``crypt`` module, so
# this presumably resolves to a project-local module - confirm.
import crypt

from huaxin_client import socket_util


class ClientSocketManager:
    """Class-level registry of client sockets, grouped by client type.

    Each registered client id (``rid``) owns a ``threading.Lock``. A client is
    "acquired" by taking its lock without blocking and "released" by
    releasing that lock.
    """

    # Client types
    CLIENT_TYPE_TRADE = "trade"
    CLIENT_TYPE_DELEGATE_LIST = "delegate_list"
    CLIENT_TYPE_DEAL_LIST = "deal_list"
    CLIENT_TYPE_POSITION_LIST = "position_list"
    CLIENT_TYPE_MONEY = "money"
    CLIENT_TYPE_DEAL = "deal"
    CLIENT_TYPE_CMD_L2 = "l2_cmd"

    # client type -> list of (rid, sk) for the trade type,
    #                a single (rid, sk) tuple for every other type
    socket_client_dict = {}
    # rid -> threading.Lock guarding that client
    socket_client_lock_dict = {}
    # rid -> time.time() of the last heartbeat
    active_client_dict = {}

    @classmethod
    def add_client(cls, _type, rid, sk):
        """Register socket ``sk`` under id ``rid`` for client type ``_type``.

        Trade clients are appended to a list; any other type keeps only the
        most recently added client. A fresh lock is created for ``rid``.
        """
        if _type == cls.CLIENT_TYPE_TRADE:
            # Trade clients are kept as a list
            cls.socket_client_dict.setdefault(_type, []).append((rid, sk))
        else:
            cls.socket_client_dict[_type] = (rid, sk)
        cls.socket_client_lock_dict[rid] = threading.Lock()

    @classmethod
    def acquire_client(cls, _type):
        """Try to lock a client of ``_type`` without blocking.

        Returns:
            The ``(rid, sk)`` tuple whose lock was taken, or ``None`` when no
            client of that type is registered or all of them are busy.
        """
        if _type not in cls.socket_client_dict:
            return None
        if _type == cls.CLIENT_TYPE_TRADE:
            # Most recently active clients first; clients that never sent a
            # heartbeat sort as 0.
            candidates = sorted(
                cls.socket_client_dict[_type],
                key=lambda c: cls.active_client_dict.get(c[0], 0),
                reverse=True,
            )
        else:
            candidates = [cls.socket_client_dict[_type]]
        for candidate in candidates:
            lock = cls.socket_client_lock_dict.get(candidate[0])
            # acquire(blocking=False) returns False instead of raising, so no
            # exception handler is needed here.
            if lock is not None and lock.acquire(blocking=False):
                return candidate
        return None

    @classmethod
    def release_client(cls, rid):
        """Release the lock of client ``rid``; unknown ids are ignored.

        Releasing a lock that is not held is tolerated and logged: a request
        that timed out has already released its client, and a late response
        arriving through ``set_response`` releases it a second time.
        """
        lock = cls.socket_client_lock_dict.get(rid)
        if lock is None:
            return
        try:
            lock.release()
        except RuntimeError:
            logging.warning("release_client: lock of client %s was not held", rid)

    @classmethod
    def del_client(cls, rid):
        """Forget client ``rid``: drop its lock and its registered socket."""
        # Remove the thread lock
        cls.socket_client_lock_dict.pop(rid, None)
        # Remove the socket entry
        for t in cls.socket_client_dict:
            entry = cls.socket_client_dict[t]
            if isinstance(entry, list):
                for d in entry:
                    if d[0] == rid:
                        entry.remove(d)
                        break
            elif isinstance(entry, tuple):
                if entry[0] == rid:
                    cls.socket_client_dict.pop(t)
                    # Stop right away: the dict was just mutated mid-iteration.
                    break

    # Heartbeat
    @classmethod
    def heart(cls, rid):
        """Record the current time as the last heartbeat of client ``rid``."""
        cls.active_client_dict[rid] = time.time()


TRADE_DIRECTION_BUY = 1
TRADE_DIRECTION_SELL = 2

# Timeout: 2s
TIMEOUT = 2.0
# request_id -> response, filled by set_response and consumed by
# __read_response.
# NOTE(review): responses to non-blocking requests, and responses arriving
# after a timeout, are stored here but never removed.
__request_response_dict = {}


def __get_request_id(type):
    """Build a request id from the client type, a timestamp and a random int."""
    return f"{type}_{round(time.time() * 10000)}_{random.randint(0, 100000)}"


# Network request
def __request(_type, data):
    """Send a signed ``cmd`` request carrying ``data`` to a client of ``_type``.

    Returns:
        ``(request_id, client)`` where ``client`` is the acquired
        ``(rid, sk)`` tuple. Its lock stays held until the response is
        delivered via ``set_response`` or ``__read_response`` times out.

    Raises:
        Exception: no client of ``_type`` could be acquired.
        BrokenPipeError: the socket is broken; the client is deleted first.
        Exception: any other failure while building or sending the request;
            the client lock is released before re-raising.
    """
    client = ClientSocketManager.acquire_client(_type)
    if not client:
        raise Exception("无可用的交易client")
    try:
        request_id = __get_request_id(_type)
        root_data = {"type": "cmd", "data": data, "request_id": request_id}
        # Signature: sorted "key=value" pairs (dict values JSON-encoded),
        # followed by the salt, joined with "&" and hashed.
        str_list = []
        for k, v in root_data.items():
            if isinstance(v, dict):
                str_list.append(f"{k}={json.dumps(v)}")
            else:
                str_list.append(f"{k}={v}")
        str_list.sort()
        # NOTE(review): hard-coded signing salt - consider moving it to config.
        str_list.append("%Yeshi2014@#.")
        root_data["sign"] = crypt.md5_encrypt("&".join(str_list))
        # Wrap the payload with the request header before sending
        client[1].sendall(socket_util.load_header(json.dumps(root_data).encode(encoding='utf-8')))
        # Read up to 1024 bytes back; the content is not used here
        # (presumably an acknowledgement - confirm).
        client[1].recv(1024)
    except BrokenPipeError:
        ClientSocketManager.del_client(client[0])
        raise
    except Exception as e:
        # Give the client back, otherwise its lock would stay held forever.
        ClientSocketManager.release_client(client[0])
        logging.exception(e)
        raise
    return request_id, client


def __read_response(client, request_id, blocking, timeout=TIMEOUT):
    """Wait for the response to ``request_id``.

    Args:
        client: the ``(rid, sk)`` tuple returned by ``__request``.
        request_id: id of the request to wait for.
        blocking: when false, return ``None`` immediately without waiting.
        timeout: maximum number of seconds to wait in blocking mode.

    Returns:
        The stored response in blocking mode, otherwise ``None``.

    Raises:
        Exception: no response arrived within ``timeout``; the client is
            released before raising.
    """
    if blocking:
        # Monotonic clock: the timeout must not be affected by wall-clock jumps.
        start_time = time.monotonic()
        while True:
            if request_id in __request_response_dict:
                # The response has arrived
                return __request_response_dict.pop(request_id)
            if time.monotonic() - start_time > timeout:
                ClientSocketManager.release_client(client[0])
                raise Exception(f"读取内容超时: request_id={request_id}")
            time.sleep(0.01)
    return None


def set_response(client_id, request_id, response):
    """Deliver ``response`` for ``request_id`` and release client ``client_id``.

    Nothing happens when either ``client_id`` or ``request_id`` is falsy.
    """
    if client_id and request_id:
        # Actively triggered: a reply to a request sent by this module
        ClientSocketManager.release_client(client_id)
        __request_response_dict[request_id] = response
    else:
        # Passively triggered
        pass


# Place an order
# direction: 1 = buy, 2 = sell
# code: security code
# volume: trade volume
# price: price (when selling without a price, sell at the level-5 quote)
# blocking: whether to block the caller until the response arrives
def order(direction, code, volume, price, price_type=2, blocking=True):
    """Send an order request (``trade_type`` 1) through a trade client.

    Returns whatever ``__read_response`` returns: the response when
    ``blocking`` is true, otherwise ``None``.
    """
    client_type = ClientSocketManager.CLIENT_TYPE_TRADE
    payload = {
        "type": client_type,
        "trade_type": 1,
        "direction": direction,
        "code": code,
        "volume": volume,
        "price_type": price_type,
        "price": price,
        "sinfo": f"ba_{code}_{round(time.time() * 1000)}",
    }
    req_id, sock_client = __request(client_type, payload)
    return __read_response(sock_client, req_id, blocking)


def cancel_order(direction, code, orderSysID, blocking=False):
    """Send a cancel request (``trade_type`` 2) for order ``orderSysID``.

    Non-blocking by default, in which case ``None`` is returned.
    """
    client_type = ClientSocketManager.CLIENT_TYPE_TRADE
    payload = {
        "type": client_type,
        "trade_type": 2,
        "direction": direction,
        "code": code,
        "orderSysID": orderSysID,
        "sinfo": f"cb_{code}_{round(time.time() * 1000)}",
    }
    req_id, sock_client = __request(client_type, payload)
    return __read_response(sock_client, req_id, blocking)


# Get the delegate list
# can_cancel: whether the entries can be cancelled
def get_delegate_list(can_cancel=True, blocking=True):
    """Request the delegate list; ``can_cancel`` is sent as 1 or 0."""
    client_type = ClientSocketManager.CLIENT_TYPE_DELEGATE_LIST
    payload = {"type": client_type, "can_cancel": int(bool(can_cancel))}
    req_id, sock_client = __request(client_type, payload)
    return __read_response(sock_client, req_id, blocking)


# Get the deal list
def get_deal_list(blocking=True):
    """Request the deal list."""
    client_type = ClientSocketManager.CLIENT_TYPE_DEAL_LIST
    payload = {"type": client_type}
    req_id, sock_client = __request(client_type, payload)
    return __read_response(sock_client, req_id, blocking)


# Get the position list
def get_position_list(blocking=True):
    """Request the position list."""
    client_type = ClientSocketManager.CLIENT_TYPE_POSITION_LIST
    payload = {"type": client_type}
    req_id, sock_client = __request(client_type, payload)
    return __read_response(sock_client, req_id, blocking)


# Get the account funds status
def get_money(blocking=True):
    """Request the account funds status."""
    client_type = ClientSocketManager.CLIENT_TYPE_MONEY
    payload = {"type": client_type}
    req_id, sock_client = __request(client_type, payload)
    return __read_response(sock_client, req_id, blocking)


# Set the L2 subscription data
def set_l2_codes_data(codes_data, blocking=True):
    """Send ``codes_data`` to the L2 command client."""
    client_type = ClientSocketManager.CLIENT_TYPE_CMD_L2
    payload = {"type": client_type, "data": codes_data}
    req_id, sock_client = __request(client_type, payload)
    return __read_response(sock_client, req_id, blocking)


if __name__ == "__main__":
    sample = {"id": "123123"}
    print(sample.pop("id"))