| | |
| | | import concurrent.futures |
| | | import json |
| | | import logging |
| | | import multiprocessing |
| | | import threading |
| | | |
| | | from huaxin_client import socket_util |
| | |
| | | return cls._instance |
| | | |
| | | @classmethod |
| | | def init(cls, trade_action_callback: TradeActionCallback, pipe_l2, pipe_strategy): |
| | | def init(cls, trade_action_callback: TradeActionCallback, pipe_l2, pipe_strategy, queue_strategy_trade): |
| | | cls.action_callback = trade_action_callback |
| | | cls.pipe_strategy = pipe_strategy |
| | | cls.pipe_l2 = pipe_l2 |
| | | cls.queue_strategy_trade = queue_strategy_trade |
| | | |
| | | @classmethod |
| | | def process_command(cls, _type, client_id, result_json, sk=None): |
| | |
| | | logging.error(result_json) |
| | | |
| | | @classmethod |
| | | def run_process_command(cls, pipe_strategy): |
| | | if pipe_strategy is None: |
| | | def run_process_command(cls, queue_strategy_trade: multiprocessing.Queue): |
| | | if queue_strategy_trade is None: |
| | | return |
| | | # 本地命令接收 |
| | | try: |
| | | while True: |
| | | try: |
| | | val = pipe_strategy.recv() |
| | | val = queue_strategy_trade.get() |
| | | if val: |
| | | val = json.loads(val) |
| | | _type = val["type"] |
| | | cls.process_command_thread_pool.submit(lambda: cls.process_command(_type, None, val)) |
| | | cls.process_command_thread_pool.submit(lambda: cls.process_command(_type, None, val)) |
| | | except Exception as e: |
| | | async_log_util.exception(logger_local_huaxin_trade_debug, e) |
| | | logging.exception(e) |
| | |
| | | # 维护连接数的稳定 |
| | | def run(self, blocking=True): |
| | | if blocking: |
| | | self.run_process_command(self.pipe_strategy) |
| | | self.run_process_command(self.queue_strategy_trade) |
| | | else: |
| | | # 接受命令 |
| | | t1 = threading.Thread(target=lambda: self.run_process_command(self.pipe_strategy), daemon=True) |
| | | t1 = threading.Thread(target=lambda: self.run_process_command(self.queue_strategy_trade), daemon=True) |
| | | t1.start() |
| | | |
| | | |
| | |
| | | 其它字段置空 |
| | | ''' |
| | | # 给L2发送消息 |
| | | if l2pipe is not None: |
| | | l2pipe.send(json.dumps({"type": "listen_volume", "data": {"code": code, "volume": count}}).encode('utf-8')) |
| | | |
| | | ret = api.ReqOrderInsert(req_field, self.req_id) |
| | | if ret != 0: |
| | | raise Exception('ReqOrderInsert fail, ret[%d]' % ret) |
| | | if l2pipe is not None: |
| | | l2pipe.send(json.dumps({"type": "listen_volume", "data": {"code": code, "volume": count}}).encode('utf-8')) |
| | | async_log_util.info(logger_trade, f"{code}华鑫本地真实下单结束") |
| | | return |
| | | |
| | |
| | | "limitPrice": pOrderField.LimitPrice, "accountID": pOrderField.AccountID, |
| | | "orderRef": pOrderField.OrderRef, "turnover": pOrderField.Turnover, |
| | | "volume": pOrderField.VolumeTotalOriginal, "volumeTraded": pOrderField.VolumeTraded, |
| | | "orderStatus": pOrderField.OrderStatus, "orderSubmitStatus": pOrderField.OrderSubmitStatus, |
| | | "orderStatus": pOrderField.OrderStatus, |
| | | "orderSubmitStatus": pOrderField.OrderSubmitStatus, |
| | | "statusMsg": pOrderField.StatusMsg} |
| | | self.call_back_thread_pool.submit(self.__data_callback, TYPE_ORDER, 0, order_data) |
| | | except Exception as e: |
| | |
| | | else: |
| | | async_log_util.info(logger_local_huaxin_trade_debug, "非API回调 req_id-{}", req_id) |
| | | if trade_response: |
| | | trade_response.OnTradeCallback({"type": "trade_callback", "data": {"code": 0, "data": data, "type": type}}) |
| | | trade_response.OnTradeCallback( |
| | | {"type": "trade_callback", "data": {"code": 0, "data": data, "type": type}}) |
| | | # # 非API回调 |
| | | else: |
| | | send_response( |
| | |
| | | |
| | | addr, port = constant.SERVER_IP, constant.SERVER_PORT |
| | | |
| | | |
| | | def process_cmd(tradeRequest: TradeRequest): |
| | | tradeCommandManager.process_command(tradeRequest.type_, None, tradeRequest.data) |
| | | |
| | | |
| | | def __test(): |
| | | # 测试撤单 |
| | | for i in range(0, 10): |
| | | code = "600190" |
| | | orderSysID = "0190000229" |
| | | sinfo = f"test_cancel_{i}" |
| | | data = {"type": "trade", "trade_type": 2, |
| | | "direction": 0, |
| | | "code": code, |
| | | "localOrderID": "", |
| | | "orderSysID": orderSysID, "sinfo": sinfo} |
| | | process_cmd(TradeRequest("trade", {"type": "trade", "data": data, "request_id": f"test-{i}"}, f"test-{i}")) |
| | | time.sleep(2) |
| | | |
| | | |
| | | def run(trade_response_: TradeResponse=None, pipe_l2=None, pipe_strategy=None): |
| | | def run(trade_response_: TradeResponse = None, pipe_l2=None, pipe_strategy=None, queue_strategy_trade=None): |
| | | try: |
| | | logger_system.info("交易进程ID:{}", os.getpid()) |
| | | logger_system.info(f"trade 线程ID:{tool.get_thread_id()}") |
| | |
| | | trade_response = trade_response_ |
| | | |
| | | # 运行日志同步 |
| | | threading.Thread( target=lambda:async_log_util.run_sync(),daemon=True).start() |
| | | threading.Thread(target=lambda: async_log_util.run_sync(), daemon=True).start() |
| | | |
| | | global tradeCommandManager |
| | | tradeCommandManager = command_manager.TradeCommandManager() |
| | | tradeCommandManager.init(MyTradeActionCallback(), l2pipe, pipe_strategy) |
| | | tradeCommandManager.init(MyTradeActionCallback(), l2pipe, pipe_strategy, queue_strategy_trade) |
| | | logger_system.info("华鑫交易服务启动") |
| | | tradeCommandManager.run() |
| | | except Exception as e: |
| | |
| | | __db = 0 |
| | | __redis_manager = redis_manager.RedisManager(0) |
| | | __tradeBuyQueue = TradeBuyQueue() |
| | | __buyL2SafeCountManager = BuyL2SafeCountManager() |
| | | __hCancelParamsManager = l2_trade_factor.HCancelParamsManager() |
| | | __SecondCancelBigNumComputer = SecondCancelBigNumComputer() |
| | | |
| | |
| | | |
| | | # 开始撤单 |
| | | def start_cancel(self, code, buy_no, total_datas, buy_order_no_map, local_operate_map, m_val_num): |
| | | thresh_num = int(m_val_num * 1.8) |
| | | thresh_num = int(m_val_num * 1) |
| | | place_order_index = self.__SecondCancelBigNumComputer.get_real_place_order_index_cache(code) |
| | | if place_order_index is None: |
| | | raise Exception("未获取到下单真实位置") |
| | |
| | | __codeActualPriceProcessor = CodeActualPriceProcessor() |
| | | __ths_l2_trade_queue_manager = trade_queue_manager.thsl2tradequeuemanager() |
| | | __thsBuy1VolumnManager = trade_queue_manager.THSBuy1VolumnManager() |
| | | __buyL2SafeCountManager = safe_count_manager.BuyL2SafeCountManager() |
| | | __l2PlaceOrderParamsManagerDict = {} |
| | | __last_buy_single_dict = {} |
| | | __TradeBuyQueue = transaction_progress.TradeBuyQueue() |
| | |
| | | |
| | | _start_time = tool.get_now_timestamp() |
| | | total_datas = local_today_datas[code] |
| | | # 处理安全笔数 |
| | | # cls.__buyL2SafeCountManager.compute_left_rate(code, compute_start_index, compute_end_index, total_datas, |
| | | # local_today_num_operate_map.get(code)) |
| | | |
| | | # 获取买入信号计算起始位置 |
| | | buy_single_index, buy_exec_index, buy_compute_index, num, count, max_num_set, buy_volume_rate = cls.__get_order_begin_pos( |
| | |
| | | # from huaxin_api import trade_client, l2_client, l1_client |
| | | |
| | | |
| | | def createTradeServer(pipe_server, pipe_trade, pipe_l1, pipe_l2, ptl2_l2, psl2_l2, ptl2_trade, pst_trade): |
| | | def createTradeServer(pipe_server, pipe_trade, pipe_l1, pipe_l2, ptl2_l2, psl2_l2, queue_strategy_trade): |
| | | logger_system.info("策略进程ID:{}", os.getpid()) |
| | | log.close_print() |
| | | # 初始化参数 |
| | |
| | | t1.start() |
| | | # |
| | | # 启动华鑫交易服务 |
| | | trade_server.run(pipe_trade, pipe_l1, pipe_l2, huaxin_client.trade_client.process_cmd) |
| | | trade_server.run(pipe_trade, pipe_l1, pipe_l2, queue_strategy_trade) |
| | | |
| | | |
| | | # 主服务 |
| | |
| | | # l1与策略间的通信 |
| | | pl1t_l1, pl1t_strategy = multiprocessing.Pipe() |
| | | |
| | | queue_strategy_trade = multiprocessing.Queue() |
| | | |
| | | # 托管环境下不创建 |
| | | # serverProcess = multiprocessing.Process(target=createServer, args=(pss_server,)) |
| | | # serverProcess.start() |
| | |
| | | |
| | | # 交易进程 |
| | | tradeProcess = multiprocessing.Process( |
| | | target=lambda: huaxin_client.trade_client.run(None, ptl2_trade, pst_trade)) |
| | | target=lambda: huaxin_client.trade_client.run(None, ptl2_trade, pst_trade, queue_strategy_trade)) |
| | | tradeProcess.start() |
| | | |
| | | # 主进程 |
| | | createTradeServer(pss_strategy, pst_strategy, pl1t_strategy, psl2_strategy, ptl2_l2, psl2_l2, ptl2_trade, |
| | | pst_trade) |
| | | createTradeServer(pss_strategy, pst_strategy, pl1t_strategy, psl2_strategy, ptl2_l2, psl2_l2, queue_strategy_trade) |
| | | |
| | | # 将tradeServer作为主进程 |
| | | l1Process.join() |
| | |
| | | from utils import tool |
| | | from db import redis_manager_delegate as redis_manager |
| | | from l2 import l2_log, l2_data_manager, transaction_progress |
| | | from l2.safe_count_manager import BuyL2SafeCountManager |
| | | from l2.transaction_progress import TradeBuyQueue |
| | | from third_data import kpl_util, kpl_data_manager |
| | | from third_data.code_plate_key_manager import RealTimeKplMarketData, LimitUpCodesPlateKeyManager |
| | | from third_data.code_plate_key_manager import LimitUpCodesPlateKeyManager |
| | | from third_data.kpl_data_manager import KPLDataManager |
| | | from trade import trade_data_manager, current_price_process_manager, l2_trade_util |
| | | from trade.trade_queue_manager import THSBuy1VolumnManager |
| | |
| | | continue |
| | | RedisUtils.delete(redis_info, k, auto_free=False) |
| | | RedisUtils.realse(redis_info) |
| | | BuyL2SafeCountManager().clear_data(code) |
| | | |
| | | transaction_progress.TradeBuyQueue().set_traded_index(code, 0) |
| | | l2_trade_util.remove_from_forbidden_trade_codes(code) |
| | |
| | | l2.l2_data_util.local_today_num_operate_map[code].clear() |
| | | |
| | | print("id:", id(l2.l2_data_util.local_today_datas)) |
| | | # safe_count_manager.BuyL2SafeCountManager.get_safe_count = mock.Mock(return_value=16) |
| | | # l2_trade_factor.L2TradeFactorUtil.compute_m_value = mock.Mock(return_value=(14699952, "")) |
| | | # pos_list.insert(41,(225,306)) |
| | | # pos_list.insert(63, (345, 423)) |
| | |
| | | |
| | | |
| | | # 设置交易通信队列 |
| | | def run_pipe_trade(pipe_trade_, trade_cmd_callback_): |
| | | def run_pipe_trade(pipe_trade_, queue_strategy_trade_): |
| | | global pipe_trade |
| | | pipe_trade = pipe_trade_ |
| | | global trade_cmd_callback |
| | | trade_cmd_callback = trade_cmd_callback_ |
| | | |
| | | global queue_strategy_trade |
| | | queue_strategy_trade = queue_strategy_trade_ |
| | | t1 = threading.Thread(target=lambda: __run_recv_pipe_trade(), daemon=True) |
| | | t1.start() |
| | | |
| | |
| | | "data": data, |
| | | "request_id": request_id} |
| | | root_data = socket_util.encryp_client_params_sign(root_data) |
| | | |
| | | if not is_pipe: |
| | | trade_cmd_callback(TradeRequest(_type, root_data, request_id)) |
| | | else: |
| | | start_time = time.time() |
| | | pipe_trade.send(json.dumps(root_data).encode("utf-8")) |
| | | use_time = int((time.time() - start_time)*1000) |
| | | if use_time > 10: |
| | | async_log_util.info(hx_logger_trade_loop, f"发送耗时:request_id-{request_id} 耗时时间:{use_time}") |
| | | start_time = time.time() |
| | | queue_strategy_trade.put_nowait(root_data) |
| | | # pipe_trade.send(json.dumps(root_data).encode("utf-8")) |
| | | use_time = int((time.time() - start_time)*1000) |
| | | if use_time > 10: |
| | | async_log_util.info(hx_logger_trade_loop, f"发送耗时:request_id-{request_id} 耗时时间:{use_time}") |
| | | if log_enable: |
| | | async_log_util.info(hx_logger_trade_loop, "请求发送成功:request_id-{}", request_id) |
| | | except BrokenPipeError as e: |
| | |
| | | import json |
| | | import logging |
| | | import mmap |
| | | import multiprocessing |
| | | import queue |
| | | import random |
| | | import socket |
| | |
| | | my_trade_response = MyTradeResponse() |
| | | |
| | | |
| | | def run(pipe_trade, pipe_l1, pipe_l2, trade_cmd_callback): |
| | | def run(pipe_trade, pipe_l1, pipe_l2, queue_strategy_trade: multiprocessing.Queue): |
| | | logger_system.info(f"trade_server 线程ID:{tool.get_thread_id()}") |
| | | try: |
| | | # 执行一些初始化数据 |
| | |
| | | manager.run(blocking=False) |
| | | |
| | | # 启动交易服务 |
| | | huaxin_trade_api.run_pipe_trade(pipe_trade, trade_cmd_callback) |
| | | huaxin_trade_api.run_pipe_trade(pipe_trade, queue_strategy_trade) |
| | | |
| | | # 监听l1那边传过来的代码 |
| | | t1 = threading.Thread(target=lambda: __recv_pipe_l1(pipe_l1), daemon=True) |
| | |
| | | self.score_index = score_index |
| | | |
| | | # 获取信号连续买笔数 |
| | | |
| | | def get_begin_continue_buy_count(self): |
| | | counts = [3, 3, 3, 2, 2, 2, 2] |
| | | volume_rate_index = self.volume_rate_index |
| | |
| | | __cancel_success(code) |
| | | # 再次撤单 |
| | | if constant.TRADE_WAY == constant.TRADE_WAY_HUAXIN: |
| | | async_log_util.info(logger_trade, "{} trade_manager.start_cancel_buy 再次撤单开始".format(code)) |
| | | __cancel_order_thread_pool.submit(lambda: trade_huaxin.cancel_order(code, msg="再次撤单")) |
| | | async_log_util.info(logger_trade, "{} trade_manager.start_cancel_buy 再次撤单结束".format(code)) |
| | | # 不需要再次撤单了 |
| | | # try: |
| | | # cancel_buy_again(code) |
| | |
| | | import logging |
| | | |
| | | from l2 import l2_data_manager |
| | | from l2.cancel_buy_strategy import HourCancelBigNumComputer, SecondCancelBigNumComputer , \ |
| | | from l2.cancel_buy_strategy import HourCancelBigNumComputer, SecondCancelBigNumComputer, \ |
| | | LCancelBigNumComputer, DCancelBigNumComputer |
| | | from l2.l2_data_util import local_today_datas, local_today_num_operate_map |
| | | from l2.safe_count_manager import BuyL2SafeCountManager |
| | | from log_module.log import logger_l2_error |
| | | from trade.trade_queue_manager import THSBuy1VolumnManager |
| | | |
| | | __thsBuy1VolumnManager = THSBuy1VolumnManager() |
| | | __buyL2SafeCountManager = BuyL2SafeCountManager() |
| | | |
| | | |
| | | def virtual_buy_success(code): |
| | |
| | | def virtual_cancel_success(code, buy_single_index, buy_exec_index, total_datas): |
| | | l2_data_manager.TradePointManager().delete_buy_point(code) |
| | | l2_data_manager.TradePointManager().delete_buy_cancel_point(code) |
| | | # 安全笔数计算 |
| | | __buyL2SafeCountManager.save_place_order_info(code, buy_single_index, buy_exec_index, |
| | | total_datas[-1]["index"]) |
| | | SecondCancelBigNumComputer().cancel_success(code) |
| | | DCancelBigNumComputer().cancel_success(code) |
| | | LCancelBigNumComputer().cancel_success(code) |
| | |
| | | |
| | | # 真实买成功 |
| | | def real_buy_success(code, tradePointManager): |
| | | # @dask.delayed |
| | | def clear_max_buy1_volume(code): |
| | | # 下单成功,需要删除最大买1 |
| | | __thsBuy1VolumnManager.clear_max_buy1_volume(code) |
| | | |
| | | # @dask.delayed |
| | | def safe_count(code, buy_single_index, buy_exec_index): |
| | | try: |
| | | __buyL2SafeCountManager.save_place_order_info(code, buy_single_index, buy_exec_index, None) |
| | | except Exception as e: |
| | | logging.exception(e) |
| | | logger_l2_error.exception(e) |
| | | |
| | | # @dask.delayed |
| | | def h_cancel(code, buy_single_index, buy_exec_index): |
| | | try: |
| | | HourCancelBigNumComputer().place_order_success(code, buy_single_index, buy_exec_index, |
| | |
| | | logging.exception(e) |
| | | logger_l2_error.exception(e) |
| | | |
| | | # @dask.delayed |
| | | def l_cancel(code): |
| | | try: |
| | | LCancelBigNumComputer().place_order_success(code) |
| | |
| | | code) |
| | | |
| | | clear_max_buy1_volume(code) |
| | | safe_count(code, buy_single_index, buy_exec_index) |
| | | s_cancel(code) |
| | | # H撤暂时不生效 |
| | | h_cancel(code, buy_single_index, buy_exec_index) |
| | |
| | | |
| | | # 真实撤成功 |
| | | def real_cancel_success(code, buy_single_index, buy_exec_index, total_datas): |
| | | # 安全笔数计算 |
| | | __buyL2SafeCountManager.save_place_order_info(code, buy_single_index, buy_exec_index, |
| | | total_datas[-1]["index"]) |
| | | # 取消买入标识 |
| | | l2_data_manager.TradePointManager().delete_buy_point(code) |
| | | l2_data_manager.TradePointManager().delete_buy_cancel_point(code) |