| | |
| | | import logging |
| | | import multiprocessing |
| | | import threading |
| | | import time |
| | | |
| | | import zmq |
| | | |
| | | from huaxin_client import socket_util |
| | | from huaxin_client.client_network import SendResponseSkManager |
| | |
| | | return cls._instance |
| | | |
| | | @classmethod |
| | | def init(cls, trade_action_callback: TradeActionCallback, queue_strategy_trade_read_for_trade: multiprocessing.Queue,queue_strategy_trade_read_for_read: multiprocessing.Queue): |
| | | def init(cls, trade_action_callback: TradeActionCallback, |
| | | queue_strategy_trade_read_for_trade: multiprocessing.Queue, |
| | | queue_strategy_trade_read_for_read: multiprocessing.Queue): |
| | | cls.action_callback = trade_action_callback |
| | | cls.queue_strategy_trade_read = queue_strategy_trade_read_for_trade |
| | | cls.queue_strategy_trade_read_trade_read = queue_strategy_trade_read_for_read |
| | |
| | | except Exception as e: |
| | | async_log_util.exception(logger_local_huaxin_trade_debug, e) |
| | | |
| | | ###############ZEROMQ协议接收命令################# |
@classmethod
def __create_order_command_reciever(cls, ipc_addr):
    """
    Receive order-placement commands over a ZeroMQ REP socket, forever.

    Binds a REP socket to ``ipc_addr``. For every JSON request it forwards the
    request's ``request_id`` and ``data`` to ``cls.action_callback.OnTrade``
    and logs the transport latency computed from the request's ``time`` field.
    Every request is answered with the string "SUCCESS", whether or not the
    handling succeeded.

    @param ipc_addr: ipc address to bind
    @return: does not return; the loop only ends if the socket itself fails
    """
    context = zmq.Context()
    socket = context.socket(zmq.REP)
    socket.bind(ipc_addr)
    while True:
        try:
            # Receive/decode inside the try: a payload that is not valid JSON
            # is logged and still answered, instead of killing this thread and
            # leaving the REQ peer waiting for a reply that never comes.
            message = socket.recv_json()
            request_id = message.get('request_id')
            use_time = time.time() - message.get('time')
            cls.action_callback.OnTrade(None, request_id, None, 2, message.get('data'))
            async_log_util.info(logger_local_huaxin_trade_debug, f"下单通信耗时: {use_time}s")
        except Exception as e:
            logger_local_huaxin_trade_debug.exception(e)
        finally:
            # REP must reply before it may receive again. If nothing was
            # received (socket-level failure) this send raises and ends the loop.
            socket.send_string("SUCCESS")
| | | |
@classmethod
def __create_cancel_order_command_reciever(cls, ipc_addr):
    """
    Receive cancel-order commands over a ZeroMQ REP socket, forever.

    Binds a REP socket to ``ipc_addr``. For every JSON request it forwards the
    request's ``request_id`` and ``data`` to ``cls.action_callback.OnTrade``
    and logs the transport latency computed from the request's ``time`` field.
    Every request is answered with the string "SUCCESS", whether or not the
    handling succeeded.

    @param ipc_addr: ipc address to bind
    @return: does not return; the loop only ends if the socket itself fails
    """
    context = zmq.Context()
    socket = context.socket(zmq.REP)
    socket.bind(ipc_addr)
    while True:
        try:
            # Receive/decode inside the try: a payload that is not valid JSON
            # is logged and still answered, instead of killing this thread and
            # leaving the REQ peer waiting for a reply that never comes.
            message = socket.recv_json()
            request_id = message.get('request_id')
            use_time = time.time() - message.get('time')
            cls.action_callback.OnTrade(None, request_id, None, 2, message.get('data'))
            async_log_util.info(logger_local_huaxin_trade_debug, f"撤单通信耗时: {use_time}s")
        except Exception as e:
            logger_local_huaxin_trade_debug.exception(e)
        finally:
            # REP must reply before it may receive again. If nothing was
            # received (socket-level failure) this send raises and ends the loop.
            socket.send_string("SUCCESS")
| | | |
| | | # 维护连接数的稳定 |
def run(self, order_ipc_addr, cancel_order_ipc_addr, blocking=True):
    """
    Start the command-processing workers.

    Three daemon threads are always started: the read-command processor
    (fed by ``self.queue_strategy_trade_read_trade_read``), the ZeroMQ
    order-command receiver and the ZeroMQ cancel-order-command receiver.
    The trade-command processor (fed by ``self.queue_strategy_trade_read``)
    runs on the calling thread when ``blocking`` is true, otherwise on a
    fourth daemon thread.

    @param order_ipc_addr: ipc address handed to the order-command receiver
    @param cancel_order_ipc_addr: ipc address handed to the cancel-order-command receiver
    @param blocking: run the trade-command processor on the calling thread
    @return: None
    """
    workers = [
        (self.run_process_read_command, self.queue_strategy_trade_read_trade_read),
        (self.__create_order_command_reciever, order_ipc_addr),
        (self.__create_cancel_order_command_reciever, cancel_order_ipc_addr),
    ]
    if not blocking:
        # Non-blocking mode: the trade-command processor is started first, on its own thread.
        workers.insert(0, (self.run_process_command, self.queue_strategy_trade_read))
    for target, argument in workers:
        threading.Thread(target=target, args=(argument,), daemon=True).start()
    if blocking:
        # Blocking mode: the caller's thread becomes the trade-command processor.
        self.run_process_command(self.queue_strategy_trade_read)
| | | |
| | | |
| | |
| | | import threading |
| | | import time |
| | | |
| | | import zmq |
| | | |
| | | from huaxin_client import command_manager |
| | | from huaxin_client import constant |
| | | from huaxin_client import socket_util |
| | |
| | | # send_response( |
| | | # json.dumps({"type": "response", "data": {"code": 0, "data": data}, "client_id": client_id, |
| | | # "request_id": request_id}), type, client_id, request_id, temp_params[2]) |
| | | if trade_response: |
| | | trade_response.OnTradeResponse( |
| | | {"type": "response", "data": {"code": 0, "data": data}, "client_id": client_id, |
| | | "request_id": request_id}) |
| | | else: |
| | | send_response( |
| | | json.dumps({"type": "response", "data": {"code": 0, "data": data}, "client_id": client_id, |
| | | "request_id": request_id}), type, client_id, request_id) |
| | |
| | | async_log_util.info(logger_local_huaxin_trade_debug, "API回调结束 req_id-{} request_id-{}", req_id, request_id) |
| | | else: |
| | | async_log_util.info(logger_local_huaxin_trade_debug, "非API回调 req_id-{}", req_id) |
| | | if trade_response: |
| | | trade_response.OnTradeCallback( |
| | | {"type": "trade_callback", "data": {"code": 0, "data": data, "type": type}}) |
| | | # # 非API回调 |
| | | else: |
| | | send_response( |
| | | json.dumps({"type": "trade_callback", "data": {"code": 0, "data": data, "type": type}}), |
| | | type, |
| | |
| | | addr, port = constant.SERVER_IP, constant.SERVER_PORT |
| | | |
| | | |
| | | def run(trade_response_: TradeResponse = None, queue_other_w_l2_r_: multiprocessing.Queue = None, |
| | | queue_strategy_trade_write_=None, |
| | | def run(ipc_order_addr, ipc_cancel_order_addr, queue_strategy_trade_write_=None, |
| | | queue_strategy_trade_read=None, queue_strategy_trade_read_for_read=None): |
| | | """ |
| | | 运行 |
| | | @param ipc_order_addr: zmq下单命令ipc地址 |
| | | @param ipc_cancel_order_addr: zmq撤单命令ipc地址 |
| | | @param queue_strategy_trade_write_: |
| | | @param queue_strategy_trade_read: |
| | | @param queue_strategy_trade_read_for_read: |
| | | @return: |
| | | """ |
| | | try: |
| | | logger_system.info("交易进程ID:{}", os.getpid()) |
| | | logger_system.info(f"trade 线程ID:{tool.get_thread_id()}") |
| | |
| | | global queue_strategy_trade_write |
| | | queue_strategy_trade_write = queue_strategy_trade_write_ |
| | | |
| | | global trade_response |
| | | trade_response = trade_response_ |
| | | |
| | | # 运行日志同步 |
| | | threading.Thread(target=lambda: async_log_util.run_sync(), daemon=True).start() |
| | | |
| | |
| | | tradeCommandManager = command_manager.TradeCommandManager() |
| | | tradeCommandManager.init(MyTradeActionCallback(), queue_strategy_trade_read, queue_strategy_trade_read_for_read) |
| | | logger_system.info("华鑫交易服务启动") |
| | | tradeCommandManager.run() |
| | | tradeCommandManager.run(ipc_order_addr, ipc_cancel_order_addr) |
| | | except Exception as e: |
| | | logger_system.exception(e) |
| | | # 不需要运行命令解析 |
| | |
| | | if not must_buy: |
| | | temp_thresh_hold_rate = round((total_num - max_num) * 0.9 / total_num, 2) |
| | | thresh_hold_rate = min(thresh_hold_rate, temp_thresh_hold_rate) |
| | | l2_log.l_cancel_debug(code, f"计算范围:{start_index}-{end_index},已撤单比例:{rate}/{thresh_hold_rate}") |
| | | l2_log.l_cancel_debug(code, f"L后计算范围:{start_index}-{end_index},已撤单比例:{rate}/{thresh_hold_rate}") |
| | | if rate >= thresh_hold_rate: |
| | | canceled_indexes.sort() |
| | | l2_log.l_cancel_debug(code, f"L下撤单,撤单位置:{canceled_indexes[-1]}") |
| | | l2_log.l_cancel_debug(code, f"L后撤单,撤单位置:{canceled_indexes[-1]}") |
| | | return True, total_data[canceled_indexes[-1]] |
| | | |
| | | return False, None |
| | |
def cancel_success(self, code):
    """
    Handle a successful cancel by clearing this object's state for ``code``.

    @param code: value passed straight through to ``self.clear``
    @return: None
    """
    # A successful cancel needs nothing beyond the regular clear.
    self.clear(code)
| | | |
| | | |
| | | # ---------------------------------新G撤------------------------------- |
| | | class NewGCancelBigNumComputer: |
| | | __db = 0 |
| | | __redis_manager = redis_manager.RedisManager(0) |
| | | __real_place_order_index_dict = {} |
| | | __trade_progress_index_dict = {} |
| | | __watch_indexes_dict = {} |
| | |
def __new__(cls, *args, **kwargs):
    """
    Return the shared instance, creating it and loading its data on first use.

    @param args: accepted for call compatibility; not used
    @param kwargs: accepted for call compatibility; not used
    @return: the single shared instance of this class
    """
    if not cls.__instance:
        # The class has no explicit base, so this resolves to object.__new__,
        # which rejects extra arguments when __new__ is overridden. Forwarding
        # *args/**kwargs would raise TypeError whenever a caller supplied any.
        cls.__instance = super(NewGCancelBigNumComputer, cls).__new__(cls)
        cls.__load_datas()
    return cls.__instance
| | | |
| | | @classmethod |
| | | def __load_datas(cls): |
| | | """ |
| | | Load persisted watch-index sets from Redis into the class-level cache. |
| | | Reads every key matching "g_cancel_watch_index_info-*", uses the text after the last "-" as the code, |
| | | and stores the value in cls.__watch_indexes_dict through CodeDataCacheUtil.set_cache. |
| | | The Redis handle is released in the finally clause whether or not loading succeeds. |
| | | @return: |
| | | """ |
| | | __redis = cls.__get_redis() |
| | | try: |
| | | keys = RedisUtils.keys(__redis, "g_cancel_watch_index_info-*") |
| | | for k in keys: |
| | | # The key suffix after the last "-" is the code |
| | | code = k.split("-")[-1] |
| | | val = RedisUtils.get(__redis, k) |
| | | if val: |
| | | # JSON-decode the stored value and convert it to a set |
| | | val = json.loads(val) |
| | | val = set(val) |
| | | # NOTE(review): indentation is lost in this view, so it is unclear whether the cache write below sits inside `if val:` - confirm |
| | | CodeDataCacheUtil.set_cache(cls.__watch_indexes_dict, code, val) |
| | | finally: |
| | | RedisUtils.realse(__redis) |
| | | |
@classmethod
def __get_redis(cls):
    """
    Obtain a Redis handle from the class-level redis manager.

    @return: whatever ``getRedis()`` of the class's redis manager returns
    """
    redis_handle = cls.__redis_manager.getRedis()
    return redis_handle
| | | |
def __set_watch_index(self, code, indexes):
    """
    Cache the watch indexes for ``code`` and queue them for storage in Redis.

    The in-memory cache is updated first; the same indexes are then written
    asynchronously under "g_cancel_watch_index_info-<code>" as a JSON list,
    with the expiry returned by ``tool.get_expire()``.

    @param code: code the indexes belong to
    @param indexes: iterable of indexes to watch
    @return: None
    """
    CodeDataCacheUtil.set_cache(self.__watch_indexes_dict, code, indexes)
    redis_key = f"g_cancel_watch_index_info-{code}"
    expire = tool.get_expire()
    # list() because the value may be a set, which JSON cannot encode directly.
    payload = json.dumps(list(indexes))
    RedisUtils.setex_async(self.__db, redis_key, expire, payload)
| | | |
| | | def set_real_place_order_index(self, code, index, buy_single_index, is_default): |
| | | """ |
| | | Record the real place-order index for a code and trigger a watch-index computation. |
| | | @param code: key used for the per-code dictionaries |
| | | @param index: real place-order index; stored together with is_default |
| | | @param buy_single_index: initial value of the computation start index |
| | | @param is_default: flag stored with index; when falsy the start index is taken from TradeBuyQueue |
| | | @return: |
| | | """ |
| | | self.__real_place_order_index_dict[code] = (index, is_default) |
| | | # Start from buy_single_index unless a trade-progress index is already recorded for this code |
| | | start_index = buy_single_index |
| | | if code in self.__trade_progress_index_dict: |
| | | start_index = self.__trade_progress_index_dict.get(code) |
| | | # NOTE(review): indentation is lost in this view, so the nesting of the statements below cannot be confirmed |
| | | if not is_default: |
| | | # NOTE(review): this unpacking rebinds the is_default parameter, so the tuple passed on below may differ from the one stored above - confirm intended |
| | | trade_index, is_default = TradeBuyQueue().get_traded_index(code) |
| | | if trade_index is None: |
| | | start_index = 0 |
| | | else: |
| | | start_index = trade_index |
| | | self.__commpute_watch_indexes(code, start_index, (index, is_default), from_real_order_index_changed=True) |
| | | |
| | | def clear(self, code=None): |
| | |
| | | self.__real_place_order_index_dict.pop(code) |
| | | if code in self.__watch_indexes_dict: |
| | | self.__watch_indexes_dict.pop(code) |
| | | RedisUtils.delete_async(self.__db, f"g_cancel_watch_index_info-{code}") |
| | | if code in self.__trade_progress_index_dict: |
| | | self.__trade_progress_index_dict.pop(code) |
| | | else: |
| | | self.__real_place_order_index_dict.clear() |
| | | self.__watch_indexes_dict.clear() |
| | | self.__trade_progress_index_dict.clear() |
| | | keys = RedisUtils.keys(self.__get_redis(), f"g_cancel_watch_index_info-*") |
| | | for k in keys: |
| | | RedisUtils.delete(self.__get_redis(), k) |
| | | |
| | | # recompute:是否重新计算 |
| | | def __commpute_watch_indexes(self, code, traded_index, real_order_index_info, from_real_order_index_changed=False, |
| | |
| | | if from_real_order_index_changed or recompute: |
| | | # 真实下单位改变后才会更新 |
| | | final_watch_indexes = origin_watch_index | watch_indexes |
| | | self.__watch_indexes_dict[code] = final_watch_indexes |
| | | self.__set_watch_index(code, final_watch_indexes) |
| | | l2_log.g_cancel_debug(code, f"大单监听:{final_watch_indexes} 是否重新计算:{recompute}") |
| | | |
| | | def set_trade_progress(self, code, buy_single_index, index): |
| | | """ |
| | | Record the trade-progress index for a code. |
| | | @param code: key into the trade-progress dictionary |
| | | @param buy_single_index: not used by the visible body |
| | | @param index: trade-progress index to store |
| | | @return: |
| | | """ |
| | | # NOTE(review): the condition and the recompute call below each appear both active and commented out in this view; confirm which revision is current |
| | | if self.__trade_progress_index_dict.get(code) != index: |
| | | # if self.__trade_progress_index_dict.get(code) != index: |
| | | self.__trade_progress_index_dict[code] = index |
| | | self.__commpute_watch_indexes(code, index, self.__real_place_order_index_dict.get(code)) |
| | | # self.__commpute_watch_indexes(code, index, self.__real_place_order_index_dict.get(code)) |
| | | |
| | | def need_cancel(self, code, buy_exec_index, start_index, end_index): |
| | | if code not in self.__real_place_order_index_dict: |
| | |
| | | return True, total_datas[cancel_half_index], f"B撤:撤单索引-{cancel_half_index}" |
| | | return False, None, "还有大单没撤单" |
| | | |
| | | |
def test(self):
    """
    Manual test helper: store the fixed watch-index set 1..7 for code "000333".

    @return: None
    """
    sample_indexes = set(range(1, 8))
    self.__set_watch_index("000333", sample_indexes)
| | | |
| | | |
| | | # ---------------------------------独苗撤------------------------------- |
| | |
| | | def createTradeServer(pipe_server, queue_strategy_r_trade_w_: multiprocessing.Queue, |
| | | queue_l1_w_strategy_r_: multiprocessing.Queue, |
| | | queue_strategy_w_trade_r_: multiprocessing.Queue, |
| | | queue_strategy_w_trade_r_for_read_: multiprocessing.Queue, queue_l1_trade_r_strategy_w_, queue_l1_trade_w_strategy_r_): |
| | | queue_strategy_w_trade_r_for_read_: multiprocessing.Queue, queue_l1_trade_r_strategy_w_, |
| | | queue_l1_trade_w_strategy_r_, trade_ipc_addr): |
| | | """ |
| | | 策略进程 |
| | | @param pipe_server: |
| | | @param queue_strategy_r_trade_w_: |
| | | @param queue_l1_w_strategy_r_: |
| | | @param queue_strategy_w_trade_r_: |
| | | @param queue_strategy_w_trade_r_for_read_: |
| | | @param queue_l1_trade_r_strategy_w_: |
| | | @param queue_l1_trade_w_strategy_r_: |
| | | @param trade_ipc_addr: 交易ipc地址(下单地址, 撤单地址) |
| | | @return: |
| | | """ |
| | | logger_system.info("策略进程ID:{}", os.getpid()) |
| | | log.close_print() |
| | | # 初始化参数 |
| | |
| | | # 启动华鑫交易服务 |
| | | huaxin_trade_server.run(queue_strategy_r_trade_w_, queue_l1_w_strategy_r_, queue_strategy_w_trade_r_, |
| | | queue_strategy_w_trade_r_for_read_, |
| | | queue_l1_trade_w_strategy_r_) |
| | | queue_l1_trade_w_strategy_r_, trade_ipc_addr) |
| | | |
| | | |
| | | # 主服务 |
| | |
| | | # 策略读交易写 |
| | | queue_strategy_r_trade_w = multiprocessing.Queue() |
| | | |
| | | # 下单,撤单ipc地址 |
| | | order_ipc_addr, cancel_order_ipc_addr = "ipc://trade_order.ipc", "ipc://trade_cancel_order.ipc" |
| | | |
| | | # 托管环境下不创建 |
| | | # serverProcess = multiprocessing.Process(target=createServer, args=(pss_server,)) |
| | | # serverProcess.start() |
| | |
| | | # 交易进程 |
| | | tradeProcess = multiprocessing.Process( |
| | | target=huaxin_client.trade_client.run, |
| | | args=(None, queue_other_w_l2_r, queue_strategy_r_trade_w, queue_strategy_w_trade_r, |
| | | args=(order_ipc_addr, cancel_order_ipc_addr, queue_strategy_r_trade_w, queue_strategy_w_trade_r, |
| | | queue_strategy_w_trade_r_for_read)) |
| | | tradeProcess.start() |
| | | # 此处将L2的进程与策略进程合并 |
| | |
| | | # 主进程 |
| | | createTradeServer(pss_strategy, queue_strategy_r_trade_w, queue_l1_w_strategy_r, queue_strategy_w_trade_r, |
| | | queue_strategy_w_trade_r_for_read, queue_l1_trade_r_strategy_w, |
| | | queue_l1_trade_w_strategy_r) |
| | | queue_l1_trade_w_strategy_r,trade_ipc_addr) |
| | | |
| | | # 将tradeServer作为主进程 |
| | | l1Process.join() |
New file |
| | |
| | | from db.redis_manager_delegate import RedisUtils |
| | | from l2.cancel_buy_strategy import NewGCancelBigNumComputer |
| | | |
| | | |
def test_g_cancel():
    """Invoke the ``test`` method of ``NewGCancelBigNumComputer``."""
    computer = NewGCancelBigNumComputer()
    computer.test()
| | | |
| | | |
if __name__ == '__main__':
    # Script entry point: run the G-cancel check, then hand control to
    # RedisUtils.run_loop().
    # NOTE(review): presumably run_loop() drains the async Redis writes queued
    # by the check - confirm against RedisUtils.
    test_g_cancel()
    RedisUtils.run_loop()
| | |
| | | import time |
| | | import concurrent.futures |
| | | |
| | | import zmq |
| | | |
| | | from code_attribute import gpcode_manager |
| | | from huaxin_client import constant as huaxin_client_constant |
| | | from log_module import async_log_util |
| | |
| | | pass |
| | | |
| | | |
def __create_trade_ipc_context(trade_ipc_addr):
    """
    Create the two IPC sending sockets used for trade commands.

    One ZeroMQ context is created, and a REQ socket is connected to each
    address. The sockets are published through the module globals
    ``order_socket`` and ``cancel_order_socket``.

    @param trade_ipc_addr: (order address, cancel-order address)
    @return: None
    """
    global order_socket, cancel_order_socket
    zmq_context = zmq.Context()
    # Order channel -> first address.
    order_socket = zmq_context.socket(zmq.REQ)
    order_socket.connect(trade_ipc_addr[0])
    # Cancel-order channel -> second address.
    cancel_order_socket = zmq_context.socket(zmq.REQ)
    cancel_order_socket.connect(trade_ipc_addr[1])
| | | |
| | | |
# ZeroMQ sockets are not thread-safe and a REQ socket must strictly alternate
# send/recv, so each round-trip on the shared order socket is serialized.
__order_socket_lock = threading.Lock()


def __order_by_zmq(data_json):
    """
    Send an order request over the zmq order socket and wait for its reply.

    @param data_json: JSON-serializable request to send
    @return: None; the reply is only consumed, not inspected
    """
    with __order_socket_lock:
        order_socket.send_json(data_json)
        # The reply must be read before the REQ socket accepts another send.
        order_socket.recv_string()
| | | |
| | | |
# ZeroMQ sockets are not thread-safe and a REQ socket must strictly alternate
# send/recv, so each round-trip on the shared cancel socket is serialized.
__cancel_order_socket_lock = threading.Lock()


def __cancel_order_by_zmq(data_json):
    """
    Send a cancel-order request over the zmq cancel socket and wait for its reply.

    @param data_json: JSON-serializable request to send
    @return: None; the reply is only consumed, not inspected
    """
    with __cancel_order_socket_lock:
        cancel_order_socket.send_json(data_json)
        # The reply must be read before the REQ socket accepts another send.
        cancel_order_socket.recv_string()
| | | |
| | | |
def __test_order():
    """
    Manual round-trip check: place one order, then cancel it.

    Waits 60 seconds, submits a non-blocking order for "000333" with a fresh
    order reference, waits another 60 seconds and submits a non-blocking
    cancel for that reference.

    NOTE(review): this goes through the regular ``order`` / ``cancel_order``
    entry points - confirm it is never enabled against a live account.

    @return: None
    """
    wait_seconds = 60
    test_code = "000333"

    time.sleep(wait_seconds)
    order_ref = huaxin_util.create_order_ref()
    order(1, test_code, 100, 1.00, price_type=2, blocking=False, order_ref=order_ref, shadow_price=0.99)

    time.sleep(wait_seconds)
    cancel_order(1, test_code, '', orderRef=order_ref, blocking=False)
| | | |
| | | |
| | | # 设置交易通信队列 |
| | | # 暂时不会使用该方法 |
| | | def run_pipe_trade(queue_strategy_r_trade_w_, queue_strategy_w_trade_r_, queue_strategy_w_trade_r_for_read_): |
| | | def run_pipe_trade(queue_strategy_r_trade_w_, queue_strategy_w_trade_r_, queue_strategy_w_trade_r_for_read_, |
| | | trade_ipc_addr): |
| | | global queue_strategy_w_trade_r, queue_strategy_w_trade_r_for_read, queue_l1_trade_r_strategy_w |
| | | queue_strategy_w_trade_r = queue_strategy_w_trade_r_ |
| | | queue_strategy_w_trade_r_for_read = queue_strategy_w_trade_r_for_read_ |
| | |
| | | t1.start() |
| | | t1 = threading.Thread(target=lambda: CancelOrderManager().run(cancel_order), daemon=True) |
| | | t1.start() |
| | | # 创建IPC发送端口 |
| | | __create_trade_ipc_context(trade_ipc_addr) |
| | | |
| | | # 测试下单 |
| | | threading.Thread(target=lambda: __test_order(), daemon=True).start() |
| | | |
| | | |
| | | # 交易通道的错误次数 |
| | |
| | | |
| | | |
| | | # 网络请求 |
| | | def __request(_type, data, request_id=None, blocking=False, is_pipe=True, log_enable=True, is_trade=False): |
| | | def __request(_type, data, request_id=None, log_enable=True, is_trade=False): |
| | | """ |
| | | 请求,将交易(包含下单/撤单)与查询(包含查持仓/账户可用金额/委托列表/成交列表)队列分离 |
| | | @param _type: |
| | | @param data: |
| | | @param request_id: |
| | | @param blocking: |
| | | @param is_pipe: |
| | | @param log_enable: |
| | | @param is_trade: |
| | | @return: |
| | |
| | | request_id = __get_request_id(_type) |
| | | try: |
| | | if log_enable: |
| | | async_log_util.info(hx_logger_trade_loop, "请求发送开始:client_id-{} request_id-{} is_pipe-{}", 0, request_id, |
| | | is_pipe) |
| | | async_log_util.info(hx_logger_trade_loop, "请求发送开始:client_id-{} request_id-{}", 0, request_id) |
| | | root_data = {"type": _type, |
| | | "data": data, |
| | | "request_id": request_id} |
| | | "request_id": request_id, |
| | | "time": time.time() |
| | | } |
| | | root_data = socket_util.encryp_client_params_sign(root_data) |
| | | start_time = time.time() |
| | | if is_trade: |
| | | # queue_strategy_w_trade_r.put_nowait(root_data) |
| | | # 采用zmq通信 |
| | | if data['trade_type'] == 1: |
| | | __order_by_zmq(root_data) |
| | | elif data['trade_type'] == 2: |
| | | __cancel_order_by_zmq(root_data) |
| | | else: |
| | | queue_strategy_w_trade_r.put_nowait(root_data) |
| | | else: |
| | | queue_strategy_w_trade_r_for_read.put_nowait(root_data) |
| | |
| | | pass |
| | | |
| | | |
| | | # 下单委托 |
| | | # direction 1-买 2-卖 |
| | | # code:代码 |
| | | # volume:交易量 |
| | | # price:价格(如果是卖时不传价格就按照5挡价卖) |
| | | # blocking是否阻塞进程 |
| | | def order(direction, code, volume, price, price_type=2, blocking=False, sinfo=None, request_id=None, |
| | | order_ref=None, shadow_price=None): |
| | | """ |
| | | 下单委托 |
| | | @param direction: 1-买 2-卖 |
| | | @param code: |
| | | @param volume:交易量 |
| | | @param price:价格(如果是卖时不传价格就按照5挡价卖) |
| | | @param price_type: |
| | | @param blocking:是否阻塞进程 |
| | | @param sinfo: |
| | | @param request_id: |
| | | @param order_ref: |
| | | @param shadow_price: |
| | | @return: |
| | | """ |
| | | timestamp = round(time.time() * 1000) |
| | | if not sinfo: |
| | | sinfo = f"b_{code}_{timestamp}" |
| | |
| | | "price_type": price_type, |
| | | "price": price, "shadow_price": shadow_price, "sinfo": sinfo, "blocking": blocking}, |
| | | request_id=request_id, |
| | | blocking=blocking, |
| | | is_pipe=is_pipe_channel_normal(), is_trade=True) |
| | | is_trade=True) |
| | | try: |
| | | if blocking: |
| | | return __read_response(request_id, blocking) |
| | |
| | | "code": code, |
| | | "orderRef": orderRef, |
| | | "orderActionRef": order_action_ref, |
| | | "orderSysID": orderSysID, "sinfo": sinfo}, request_id=request_id, blocking=blocking, |
| | | is_pipe=is_pipe_channel_normal(), is_trade=True) |
| | | "orderSysID": orderSysID, "sinfo": sinfo}, request_id=request_id, is_trade=True) |
| | | try: |
| | | return __read_response(request_id, blocking) |
| | | finally: |
| | |
def get_delegate_list(can_cancel=True, blocking=True, timeout=TIMEOUT):
    """
    Request the delegate list and read the response.

    @param can_cancel: sent as ``can_cancel`` = 1 when truthy, otherwise 0
    @param blocking: forwarded to ``__read_response``
    @param timeout: forwarded to ``__read_response``
    @return: whatever ``__read_response`` returns for this request
    """
    # ``__request`` takes no ``blocking`` / ``is_pipe`` arguments; only the
    # request type and payload are passed.
    payload = {"type": ClientSocketManager.CLIENT_TYPE_DELEGATE_LIST,
               "can_cancel": 1 if can_cancel else 0}
    request_id = __request(ClientSocketManager.CLIENT_TYPE_DELEGATE_LIST, payload)
    return __read_response(request_id, blocking, timeout=timeout)
| | | |
| | |
| | | # 获取成交列表 |
def get_deal_list(blocking=True, timeout=TIMEOUT):
    """
    Request the deal list and read the response.

    @param blocking: forwarded to ``__read_response``
    @param timeout: forwarded to ``__read_response``
    @return: whatever ``__read_response`` returns for this request
    """
    # ``__request`` takes no ``blocking`` / ``is_pipe`` arguments; only the
    # request type and payload are passed.
    payload = {"type": ClientSocketManager.CLIENT_TYPE_DEAL_LIST}
    request_id = __request(ClientSocketManager.CLIENT_TYPE_DEAL_LIST, payload)
    return __read_response(request_id, blocking, timeout=timeout)
| | | |
| | | |
| | | # 获取持仓列表 |
def get_position_list(blocking=True):
    """
    Request the position list and read the response.

    @param blocking: forwarded to ``__read_response``
    @return: whatever ``__read_response`` returns for this request
    """
    # ``__request`` takes no ``blocking`` / ``is_pipe`` arguments; only the
    # request type and payload are passed.
    payload = {"type": ClientSocketManager.CLIENT_TYPE_POSITION_LIST}
    request_id = __request(ClientSocketManager.CLIENT_TYPE_POSITION_LIST, payload)
    return __read_response(request_id, blocking)
| | | |
| | | |
| | | # 获取账户资金状况 |
def get_money(blocking=True):
    """
    Request the account money status and read the response.

    @param blocking: forwarded to ``__read_response``
    @return: whatever ``__read_response`` returns for this request
    """
    # ``__request`` takes no ``blocking`` / ``is_pipe`` arguments; only the
    # request type and payload are passed.
    payload = {"type": ClientSocketManager.CLIENT_TYPE_MONEY}
    request_id = __request(ClientSocketManager.CLIENT_TYPE_MONEY, payload)
    return __read_response(request_id, blocking)
| | | |
| | | |
| | | # 设置L2订阅数据 |
def set_l2_codes_data(codes_data, blocking=True):
    """
    Send the L2 subscription data and read the response.

    @param codes_data: sent as the ``data`` field of the request payload
    @param blocking: forwarded to ``__read_response``
    @return: whatever ``__read_response`` returns for this request
    """
    # ``__request`` takes no ``blocking`` / ``is_pipe`` arguments; only the
    # request type and payload are passed.
    payload = {"type": ClientSocketManager.CLIENT_TYPE_CMD_L2, "data": codes_data}
    request_id = __request(ClientSocketManager.CLIENT_TYPE_CMD_L2, payload)
    return __read_response(request_id, blocking)
| | | |
| | | |
| | | # Send a "test" request over the trade channel and read its response |
def __test_trade_channel(sid):
    """
    Send a "test" request carrying ``sid`` and read its response, without request logging.

    @param sid: value sent as ``data.sid`` in the request payload
    @return: whatever ``__read_response`` returns for this request
    """
    # ``__request`` takes no ``blocking`` argument; the response is still read
    # in blocking mode below.
    payload = {"type": "test", "data": {"sid": sid}}
    request_id = __request("test", payload, log_enable=False)
    return __read_response(request_id, True, log_enable=False)
| | | |
| | | |
| | |
| | | |
| | | |
| | | def run(queue_strategy_r_trade_w, queue_l1_w_strategy_r, queue_strategy_w_trade_r, queue_strategy_w_trade_r_for_read, |
| | | queue_l1_trade_w_strategy_r): |
| | | queue_l1_trade_w_strategy_r, trade_ipc_addr): |
| | | """ |
| | | |
| | | @param queue_strategy_r_trade_w: |
| | | @param queue_l1_w_strategy_r: |
| | | @param queue_strategy_w_trade_r: |
| | | @param queue_strategy_w_trade_r_for_read: |
| | | @param queue_l1_trade_w_strategy_r: |
| | | @param trade_ipc_addr: 交易IPC地址:(下单ipc地址,撤单ipc地址) |
| | | @return: |
| | | """ |
| | | logger_system.info(f"trade_server 线程ID:{tool.get_thread_id()}") |
| | | try: |
| | | # 执行一些初始化数据 |
| | |
| | | |
| | | # 启动交易服务 |
| | | huaxin_trade_api.run_pipe_trade(queue_strategy_r_trade_w, queue_strategy_w_trade_r, |
| | | queue_strategy_w_trade_r_for_read) |
| | | queue_strategy_w_trade_r_for_read, trade_ipc_addr) |
| | | |
| | | # 监听l1那边传过来的代码 |
| | | t1 = threading.Thread(target=lambda: __recv_pipe_l1(queue_l1_w_strategy_r), daemon=True) |