""" 交易API """ import copy import json import logging import multiprocessing import queue import random import threading import time import concurrent.futures import zmq from code_attribute import gpcode_manager from huaxin_client import constant as huaxin_client_constant, trade_client from log_module import async_log_util from log_module.log import hx_logger_trade_debug, hx_logger_trade_loop, hx_logger_trade_callback, \ logger_system from trade.huaxin import huaxin_trade_data_update, huaxin_trade_record_manager from trade.huaxin.huaxin_trade_record_manager import TradeOrderIdManager from trade.huaxin.huaxin_trade_order_processor import CancelOrderManager, HuaxinOrderEntity, TradeResultProcessor from utils import socket_util, huaxin_util, tool __response_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=15) __save_data_queue = queue.Queue(maxsize=1000) def __run_save_data(): while True: try: data = __save_data_queue.get() huaxin_trade_record_manager.DelegateRecordManager.add_one(data) except: pass finally: time.sleep(0.1) def __run_recv_queue_trade(queue: multiprocessing.Queue): def __cancel_order(code, order_ref): # 2s没成交就撤单 time.sleep(2) order_: HuaxinOrderEntity = TradeResultProcessor.get_huaxin_order_by_order_ref(order_ref) if order_ is not None: if huaxin_util.is_can_cancel(order_.orderStatus): cancel_order(TRADE_DIRECTION_SELL, code, order_.orderSysID) def __process_order(data): # 更新委托队列 push_msg_manager.push_delegate_queue_update() code = data["securityID"] accountID = data["accountID"] orderStatus = data["orderStatus"] orderRef = data["orderRef"] orderSysID = data["orderSysID"] insertTime = data.get("insertTime") acceptTime = data.get("acceptTime") insertDate = data.get("insertDate") direction = data.get("direction") limitPrice = data.get("limitPrice") volume = data.get("volume") is_shadow_order = False # 获取涨停价 limit_up_price = gpcode_manager.get_limit_up_price(code) if limit_up_price and volume == huaxin_client_constant.SHADOW_ORDER_VOLUME: if abs(float(limitPrice) - float(limit_up_price)) >= 0.001: is_shadow_order = True order = HuaxinOrderEntity(code, orderStatus, orderRef, accountID, orderSysID, insertTime=insertTime, acceptTime=acceptTime, insertDate=insertDate, direction=direction, is_shadow_order=is_shadow_order) try: if str(order.direction) == str(huaxin_util.TORA_TSTP_D_Sell): # 刷新持仓列表 huaxin_trade_data_update.add_position_list() if huaxin_util.is_deal(order.orderStatus): # 如果成交了需要刷新委托列表 huaxin_trade_data_update.add_delegate_list("卖成交") else: if huaxin_util.is_deal(order.orderStatus): # 如果成交了需要刷新委托列表 huaxin_trade_data_update.add_delegate_list("买成交") need_cancel = TradeResultProcessor.process_buy_order(order) # if need_cancel: # # 需要撤买单 # threading.Thread(target=lambda: cancel_order(TRADE_DIRECTION_SELL, order.code, order.orderSysID), # daemon=True).start() need_watch_cancel = TradeResultProcessor.process_sell_order(order) # if need_watch_cancel: # # 需要撤卖单 # threading.Thread(target=lambda: __cancel_order(order.code, order.orderRef), daemon=True).start() finally: try: # 加入2次,增大加入成功率 __save_data_queue.put_nowait(data) except Exception as e: hx_logger_trade_debug.exception(e) if not is_shadow_order: # 订单相关回调 # 重新请求委托列表与资金 # huaxin_trade_data_update.add_delegate_list("来自交易管道") huaxin_trade_data_update.add_deal_list() huaxin_trade_data_update.add_money_list() # 设置结果 def __set_response(data_json): if 'request_id' not in data_json: return # 处理数据 async_log_util.info(hx_logger_trade_callback, f"response:request_id-{data_json['request_id']}") # 设置响应内容 set_response(data_json["request_id"], data_json['data']) logger_system.info(f"huaxin_trade_api __run_recv_pipe_trade 线程ID:{tool.get_thread_id()}") if queue is not None: while True: try: val = queue.get() if val: data_json = json.loads(val) # 处理数据 type_ = data_json["type"] if type_ == "response": # 主动触发的响应 request_id = data_json['request_id'] async_log_util.info(hx_logger_trade_callback, f"response:request_id-{request_id}") __response_thread_pool.submit(__set_response, data_json) if type(data_json.get("data")) == dict: data = data_json["data"].get("data") if type(data) == dict and "orderRef" in data: __response_thread_pool.submit(__process_order, data) elif type_ == "trade_callback": try: # 交易回调 data_json = data_json["data"] ctype = data_json["type"] # 记录交易反馈日志 async_log_util.info(hx_logger_trade_callback, f"{data_json}") if ctype == 0: data = data_json.get("data") # 获取订单状态 __process_order(data) finally: pass except: pass def __create_trade_ipc_context(trade_ipc_addr): """ 创建IPC发送端口 @param trade_ipc_addr:(下单地址,撤单地址) @return: """ context = zmq.Context() global order_socket, cancel_order_socket order_socket = context.socket(zmq.REQ) order_socket.connect(trade_ipc_addr[0]) cancel_order_socket = context.socket(zmq.REQ) cancel_order_socket.connect(trade_ipc_addr[1]) # while True: # try: # # datas = ('000990', 7.52, 400, 93000030, 2012, 380422, 380421, 375477, '1') * 150 # # L2SharedMemoryDataUtil.set_data(datas, shared_memory) # socket.send_json({'data': [], "time": time.time()}) # response = socket.recv_string() # except Exception as e: # logging.exception(e) # 下单ZMQ通信锁 __order_zmq_lock = threading.Lock() def __order_by_zmq(data_json): """ 通过zmq发送下单信息 @param data_json: @return: """ with __order_zmq_lock: order_socket.send_json(data_json) response = order_socket.recv_string() def __cancel_order_by_zmq(data_json): cancel_order_socket.send_json(data_json) response = cancel_order_socket.recv_string() def __test_order(): time.sleep(60) for i in range(20): time.sleep(30) order_ref = huaxin_util.create_order_ref() order(1, "000333", 100, 1.00, price_type=2, blocking=False, order_ref=order_ref, shadow_price=0.99) time.sleep(30) cancel_order(1, "000333", '123123', orderRef=order_ref, blocking=False) queue_strategy_w_trade_r_for_read = None queue_strategy_w_trade_r = None def run(): global queue_strategy_w_trade_r_for_read, queue_strategy_w_trade_r queue_strategy_r_trade_w, queue_strategy_w_trade_r, queue_strategy_w_trade_r_for_read = multiprocessing.Queue(), multiprocessing.Queue(), multiprocessing.Queue() order_ipc_addr, cancel_order_ipc_addr = "ipc://trade_order_ls.ipc", "ipc://trade_order_ls_cancel.ipc" trade_process = multiprocessing.Process( target=trade_client.run, args=(order_ipc_addr, cancel_order_ipc_addr, queue_strategy_r_trade_w, queue_strategy_w_trade_r, queue_strategy_w_trade_r_for_read)) trade_process.start() t1 = threading.Thread(target=lambda: __run_recv_queue_trade(queue_strategy_r_trade_w), daemon=True) t1.start() t1 = threading.Thread(target=lambda: __run_save_data(), daemon=True) t1.start() t1 = threading.Thread(target=lambda: CancelOrderManager().run(cancel_order), daemon=True) t1.start() # 创建IPC发送端口 __create_trade_ipc_context((order_ipc_addr, cancel_order_ipc_addr)) # 交易通道的错误次数 trade_pipe_channel_error_count = 0 # pipe的交易通道是否正常 def is_pipe_channel_normal(): return True # 测试交易通道 def test_trade_channel(): global trade_pipe_channel_error_count sid = random.randint(0, 1000000) result = __test_trade_channel(sid) if result["code"] == 0 and result["data"]["data"]["sid"] == sid: trade_pipe_channel_error_count = 0 return True trade_pipe_channel_error_count += 1 if trade_pipe_channel_error_count > 100: trade_pipe_channel_error_count = 100 return False class ClientSocketManager: # 客户端类型 CLIENT_TYPE_TRADE = "trade" CLIENT_TYPE_DELEGATE_LIST = "delegate_list" CLIENT_TYPE_DEAL_LIST = "deal_list" CLIENT_TYPE_POSITION_LIST = "position_list" CLIENT_TYPE_MONEY = "money" CLIENT_TYPE_DEAL = "deal" CLIENT_TYPE_CMD_L2 = "l2_cmd" socket_client_dict = {} socket_client_lock_dict = {} active_client_dict = {} @classmethod def list_client(cls, _type): if _type == cls.CLIENT_TYPE_TRADE: if _type in cls.socket_client_dict: return cls.socket_client_dict.get(_type) else: if _type in cls.socket_client_dict: return [cls.socket_client_dict.get(_type)] return [] @classmethod def add_client(cls, _type, rid, sk): if _type == cls.CLIENT_TYPE_TRADE: # 交易列表 if _type not in cls.socket_client_dict: cls.socket_client_dict[_type] = [] cls.socket_client_dict[_type].append((rid, sk)) cls.socket_client_lock_dict[rid] = threading.Lock() hx_logger_trade_debug.info(f"add_client:{rid}") else: cls.socket_client_dict[_type] = (rid, sk) cls.socket_client_lock_dict[rid] = threading.Lock() # 是否已经被锁住 @classmethod def is_client_locked(cls, rid): if rid in cls.socket_client_lock_dict: return cls.socket_client_lock_dict[rid].locked() return None @classmethod def acquire_client(cls, _type): if _type == cls.CLIENT_TYPE_TRADE: if _type in cls.socket_client_dict: # 根据排序活跃时间排序 client_list = sorted(cls.socket_client_dict[_type], key=lambda x: cls.active_client_dict.get(x[0]) if x[ 0] in cls.active_client_dict else 0, reverse=True) hx_logger_trade_debug.info(f"acquire_client client_list数量:{len(client_list)}") hx_logger_trade_debug.info( f"acquire_client socket_client_lock_dict数量:{len(cls.socket_client_lock_dict.keys())}") for d in client_list: if d[0] in cls.socket_client_lock_dict: try: if cls.socket_client_lock_dict[d[0]].acquire(blocking=False): hx_logger_trade_debug.info(f"acquire_client success:{d[0]}") return d except threading.TimeoutError: hx_logger_trade_debug.error("acquire_client TimeoutError") else: if _type in cls.socket_client_dict: try: d = cls.socket_client_dict[_type] if d[0] in cls.socket_client_lock_dict: if cls.socket_client_lock_dict[d[0]].acquire(blocking=False): return d except threading.TimeoutError: pass return None @classmethod def release_client(cls, client_id): sucess = False if client_id in cls.socket_client_lock_dict: sucess = True # 释放锁 if cls.socket_client_lock_dict[client_id].locked(): cls.socket_client_lock_dict[client_id].release() if sucess: hx_logger_trade_debug.info(f"release_client success:{client_id}") else: hx_logger_trade_debug.info(f"release_client fail:{client_id}") @classmethod def del_client(cls, rid): # 删除线程锁 if rid in cls.socket_client_lock_dict: cls.socket_client_lock_dict.pop(rid) # 删除sk for t in cls.socket_client_dict: if type(cls.socket_client_dict[t]) == list: for d in cls.socket_client_dict[t]: if d[0] == rid: try: # 关闭socket d[1].close() except: pass cls.socket_client_dict[t].remove(d) break elif type(cls.socket_client_dict[t]) == tuple: if cls.socket_client_dict[t][0] == rid: try: # 关闭socket cls.socket_client_dict[t][1].close() except: pass cls.socket_client_dict.pop(t) break # 心跳信息 @classmethod def heart(cls, rid): cls.active_client_dict[rid] = time.time() @classmethod def del_invalid_clients(cls): # 清除长时间无心跳的客户端通道 for k in cls.active_client_dict.keys(): if time.time() - cls.active_client_dict[k] > 20: # 心跳时间间隔20s以上视为无效 cls.del_client(k) TRADE_DIRECTION_BUY = 1 TRADE_DIRECTION_SELL = 2 # 超时时间2s TIMEOUT = 2.0 # 等待响应的request_id __request_response_dict = {} def __get_request_id(type): return f"r_{type}_{round(time.time() * 10000)}_{random.randint(0, 100000)}" # 网络请求 def __request(_type, data, request_id=None, log_enable=True, is_trade=False): """ 请求,将交易(包含下单/撤单)与查询(包含查持仓/账户可用金额/委托列表/成交列表)队列分离 @param _type: @param data: @param request_id: @param log_enable: @param is_trade: @return: """ if not request_id: request_id = __get_request_id(_type) try: if log_enable: async_log_util.info(hx_logger_trade_loop, "请求发送开始:client_id-{} request_id-{}", 0, request_id) root_data = {"type": _type, "data": data, "request_id": request_id, "time": time.time() } root_data = socket_util.encryp_client_params_sign(root_data) start_time = time.time() if is_trade: # queue_strategy_w_trade_r.put_nowait(root_data) # 采用zmq通信 if data['trade_type'] == 1: __order_by_zmq(root_data) elif data['trade_type'] == 2: __cancel_order_by_zmq(root_data) else: queue_strategy_w_trade_r.put_nowait(root_data) else: queue_strategy_w_trade_r_for_read.put_nowait(root_data) use_time = int((time.time() - start_time) * 1000) if use_time > 10: async_log_util.info(hx_logger_trade_loop, f"发送耗时:request_id-{request_id} 耗时时间:{use_time}") if log_enable: async_log_util.info(hx_logger_trade_loop, "请求发送成功:request_id-{}", request_id) except BrokenPipeError as e: async_log_util.info(hx_logger_trade_loop, "请求发送异常:request_id-{} error-{}", request_id, str(e)) raise e except Exception as e: async_log_util.info(hx_logger_trade_loop, "请求发送异常: request_id-{} error-{}", request_id, str(e)) logging.exception(e) raise e return request_id def __read_response(request_id, blocking, timeout=TIMEOUT, log_enable=True): if blocking: start_time = time.time() try: while True: time.sleep(0.005) if request_id in __request_response_dict: # 获取到了响应内容 result = __request_response_dict.pop(request_id) if log_enable: async_log_util.info(hx_logger_trade_loop, "请求读取成功: request_id-{}", request_id) return result if time.time() - start_time > timeout: if log_enable: async_log_util.info(hx_logger_trade_loop, "请求读取超时: request_id-{}", request_id) # 读取内容超时才会释放 raise Exception(f"读取内容超时: request_id={request_id}") finally: pass return None __TradeOrderIdManager = TradeOrderIdManager() def set_response(request_id, response): if request_id: async_log_util.info(hx_logger_trade_loop, f"请求响应: request_id-{request_id} 内容-{response}") # 主动触发 __request_response_dict[request_id] = response else: # 被动触发 pass def order(direction, code, volume, price, price_type=2, blocking=False, sinfo=None, request_id=None, order_ref=None, shadow_price=None, shadow_volume=100): """ 下单委托 @param shadow_volume: 影子单的量 @param direction: 1-买 2-卖 @param code: @param volume:交易量 @param price:价格(如果是卖时不传价格就按照5挡价卖) @param price_type: @param blocking:是否阻塞进程 @param sinfo: @param request_id: @param order_ref: @param shadow_price: @return: """ timestamp = round(time.time() * 1000) if not sinfo: sinfo = f"ba_{code}_{timestamp}" if not order_ref: order_ref = huaxin_util.create_order_ref() if not request_id: request_id = __get_request_id(ClientSocketManager.CLIENT_TYPE_TRADE) for i in range(1): cancel_shadow = True if int(tool.get_now_time_str().replace(":", "")) < int("091500"): # 预埋单不能撤影子单 cancel_shadow = False request_id = __request(ClientSocketManager.CLIENT_TYPE_TRADE, {"type": ClientSocketManager.CLIENT_TYPE_TRADE, "trade_type": 1, "direction": direction, "code": code, "order_ref": order_ref, "volume": volume, "price_type": price_type, "price": price, "shadow_price": shadow_price, "shadow_volume": shadow_volume, "sinfo": sinfo, "blocking": blocking, "cancel_shadow": cancel_shadow}, request_id=request_id, is_trade=True) try: if blocking: return __read_response(request_id, blocking) else: return {"order_ref": order_ref} finally: # huaxin_trade_data_update.add_delegate_list("下单", delay=0.2) huaxin_trade_data_update.add_money_list() def order_new(direction, code, order_info_list, price_type=2, blocking=False, sinfo=None, request_id=None): """ 下单委托 @param direction: @param code: @param order_info_list: 下单信息:[(量,价, order_ref),(量,价, order_ref)] @param price_type: @param blocking: 是否阻塞进程 @param sinfo: @param request_id: @return: """ timestamp = round(time.time() * 1000) if not sinfo: sinfo = f"ba_{code}_{timestamp}" if not request_id: request_id = __get_request_id(ClientSocketManager.CLIENT_TYPE_TRADE) for i in range(1): request_id = __request(ClientSocketManager.CLIENT_TYPE_TRADE, {"type": ClientSocketManager.CLIENT_TYPE_TRADE, "trade_type": 1, "direction": direction, "code": code, "order_info_list": order_info_list, "price_type": price_type, "sinfo": sinfo, "blocking": blocking, "cancel_shadow": False}, request_id=request_id, is_trade=True) try: if blocking: return __read_response(request_id, blocking) else: return {"order_ref_list": [x[2] for x in order_info_list]} finally: # huaxin_trade_data_update.add_delegate_list("下单", delay=0.2) huaxin_trade_data_update.add_money_list() __canceling_order_dict = {} def cancel_order(direction, code, orderSysID, orderRef=None, blocking=False, sinfo=None, request_id=None, recancel=False): """ 测单 @param direction: 1-买 2-卖 @param code: @param orderSysID: @param orderRef: @param blocking: @param sinfo: @param request_id: @param recancel: @return: """ if tool.trade_time_sub(tool.get_now_time_str(), "14:57:00") >= 0 and tool.trade_time_sub(tool.get_now_time_str(), "15:00:01") <= 0: # 集合竞价不撤单 return if not recancel: CancelOrderManager().start_cancel(code, orderRef, orderSysID) if not sinfo: sinfo = f"cb_{code}_{round(time.time() * 1000)}_{random.randint(0, 10000)}" order_action_ref = huaxin_util.create_order_ref() if not request_id: request_id = __get_request_id(ClientSocketManager.CLIENT_TYPE_TRADE) # 加入撤单记录,用于校验最后的撤单是否成功 if code not in __canceling_order_dict: __canceling_order_dict[code] = set() __canceling_order_dict[code].add(json.dumps((orderRef, orderSysID))) # 执行2次撤单,防止没有撤到 for i in range(2): request_id = __request(ClientSocketManager.CLIENT_TYPE_TRADE, {"type": ClientSocketManager.CLIENT_TYPE_TRADE, "trade_type": 2, "direction": direction, "code": code, "orderRef": orderRef, "orderActionRef": order_action_ref, "orderSysID": orderSysID, "sinfo": sinfo}, request_id=request_id, is_trade=True) try: return __read_response(request_id, blocking) finally: huaxin_trade_data_update.add_money_list() def batch_cancel_order(direction, code, orderInfos: list, blocking=False, request_id=None, recancel=False): """ 测单 @param direction: 1-买 2-卖 @param code: @param orderInfos:[(orderRef, orderSysID)] @param blocking: @param request_id: @param recancel: @return: """ if tool.trade_time_sub(tool.get_now_time_str(), "14:57:00") >= 0 and tool.trade_time_sub(tool.get_now_time_str(), "15:00:01") <= 0: # 集合竞价不撤单 return if not recancel: for orderInfo in orderInfos: CancelOrderManager().start_cancel(code, orderInfo[0], orderInfo[1]) sinfos = [] for i in range(len(orderInfos)): sinfo = f"cb_{code}_{round(time.time() * 1000)}_{random.randint(0, 10000)}_{i}" sinfos.append(sinfo) order_action_refs = [] for i in range(len(orderInfos)): order_action_ref = huaxin_util.create_order_ref() order_action_refs.append(order_action_ref) if not request_id: request_id = __get_request_id(ClientSocketManager.CLIENT_TYPE_TRADE) # 加入撤单记录,用于校验最后的撤单是否成功 if code not in __canceling_order_dict: __canceling_order_dict[code] = set() for orderInfo in orderInfos: __canceling_order_dict[code].add(json.dumps((orderInfo[0], orderInfo[1]))) # 执行2次撤单,防止没有撤到 for i in range(2): request_id = __request(ClientSocketManager.CLIENT_TYPE_TRADE, {"type": ClientSocketManager.CLIENT_TYPE_TRADE, "trade_type": 2, "direction": direction, "code": code, "orderInfos": orderInfos, "orderActionRefs": order_action_refs, "sinfos": sinfos}, request_id=request_id, is_trade=True) try: return __read_response(request_id, blocking) finally: huaxin_trade_data_update.add_money_list() # CLIENT_TYPE_DELEGATE_LIST = "delegate_list" # CLIENT_TYPE_DEAL_LIST = "deal_list" # CLIENT_TYPE_POSITION_LIST = "position_list" # CLIENT_TYPE_MONEY = "money" # CLIENT_TYPE_DEAL = "deal" # 获取委托列表 # can_cancel:是否可以撤 def get_delegate_list(can_cancel=True, blocking=True, timeout=TIMEOUT): request_id = __request(ClientSocketManager.CLIENT_TYPE_DELEGATE_LIST, {"type": ClientSocketManager.CLIENT_TYPE_DELEGATE_LIST, "can_cancel": 1 if can_cancel else 0}) return __read_response(request_id, blocking, timeout=timeout) # 获取成交列表 def get_deal_list(blocking=True, timeout=TIMEOUT): request_id = __request(ClientSocketManager.CLIENT_TYPE_DEAL_LIST, {"type": ClientSocketManager.CLIENT_TYPE_DEAL_LIST}) return __read_response(request_id, blocking, timeout=timeout) # 获取持仓列表 def get_position_list(blocking=True): request_id = __request(ClientSocketManager.CLIENT_TYPE_POSITION_LIST, {"type": ClientSocketManager.CLIENT_TYPE_POSITION_LIST}) return __read_response(request_id, blocking) # 获取账户资金状况 def get_money(blocking=True): request_id = __request(ClientSocketManager.CLIENT_TYPE_MONEY, {"type": ClientSocketManager.CLIENT_TYPE_MONEY}) return __read_response(request_id, blocking) # 设置L2订阅数据 def set_l2_codes_data(codes_data, blocking=True): request_id = __request(ClientSocketManager.CLIENT_TYPE_CMD_L2, {"type": ClientSocketManager.CLIENT_TYPE_CMD_L2, "data": codes_data}) return __read_response(request_id, blocking) # 设置L2订阅数据 def __test_trade_channel(sid): request_id = __request("test", {"type": "test", "data": {"sid": sid}}, log_enable=False) return __read_response(request_id, True, log_enable=False) def parseResponse(data_str): if not data_str: raise Exception("反馈内容为空") res = data_str if type(res) == str: res = json.loads(data_str) res = res['data'] if res['code'] != 0: raise Exception(res['msg']) return res['data'] if __name__ == "__main__": pass