Administrator
2023-09-19 ca186ff9fe0be665ba1153b7244a671bdade6f23
BUG修复
11个文件已修改
129 ■■■■■ 已修改文件
huaxin_client/command_manager.py 17 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/trade_client.py 37 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/cancel_buy_strategy.py 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_data_manager_new.py 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.py 11 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/l2_trade_test.py 5 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/huaxin_trade_api.py 22 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/trade_server.py 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/l2_trade_factor.py 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/trade_manager.py 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/trade_result_manager.py 22 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/command_manager.py
@@ -5,6 +5,7 @@
import concurrent.futures
import json
import logging
import multiprocessing
import threading
from huaxin_client import socket_util
@@ -73,10 +74,11 @@
        return cls._instance
    @classmethod
    def init(cls, trade_action_callback: TradeActionCallback, pipe_l2, pipe_strategy):
    def init(cls, trade_action_callback: TradeActionCallback, pipe_l2, pipe_strategy, queue_strategy_trade):
        cls.action_callback = trade_action_callback
        cls.pipe_strategy = pipe_strategy
        cls.pipe_l2 = pipe_l2
        cls.queue_strategy_trade = queue_strategy_trade
    @classmethod
    def process_command(cls, _type, client_id, result_json, sk=None):
@@ -120,18 +122,17 @@
            logging.error(result_json)
    @classmethod
    def run_process_command(cls, pipe_strategy):
        if pipe_strategy is None:
    def run_process_command(cls, queue_strategy_trade: multiprocessing.Queue):
        if queue_strategy_trade is None:
            return
        # 本地命令接收
        try:
            while True:
                try:
                    val = pipe_strategy.recv()
                    val = queue_strategy_trade.get()
                    if val:
                        val = json.loads(val)
                        _type = val["type"]
                        cls.process_command_thread_pool.submit(lambda:  cls.process_command(_type, None, val))
                        cls.process_command_thread_pool.submit(lambda: cls.process_command(_type, None, val))
                except Exception as e:
                    async_log_util.exception(logger_local_huaxin_trade_debug, e)
                    logging.exception(e)
@@ -141,10 +142,10 @@
    # 维护连接数的稳定
    def run(self, blocking=True):
        if blocking:
            self.run_process_command(self.pipe_strategy)
            self.run_process_command(self.queue_strategy_trade)
        else:
            # 接受命令
            t1 = threading.Thread(target=lambda: self.run_process_command(self.pipe_strategy), daemon=True)
            t1 = threading.Thread(target=lambda: self.run_process_command(self.queue_strategy_trade), daemon=True)
            t1.start()
huaxin_client/trade_client.py
@@ -152,11 +152,12 @@
        其它字段置空
        '''
        # 给L2发送消息
        if l2pipe is not None:
            l2pipe.send(json.dumps({"type": "listen_volume", "data": {"code": code, "volume": count}}).encode('utf-8'))
        ret = api.ReqOrderInsert(req_field, self.req_id)
        if ret != 0:
            raise Exception('ReqOrderInsert fail, ret[%d]' % ret)
        if l2pipe is not None:
            l2pipe.send(json.dumps({"type": "listen_volume", "data": {"code": code, "volume": count}}).encode('utf-8'))
        async_log_util.info(logger_trade, f"{code}华鑫本地真实下单结束")
        return
@@ -630,7 +631,8 @@
                              "limitPrice": pOrderField.LimitPrice, "accountID": pOrderField.AccountID,
                              "orderRef": pOrderField.OrderRef, "turnover": pOrderField.Turnover,
                              "volume": pOrderField.VolumeTotalOriginal, "volumeTraded": pOrderField.VolumeTraded,
                              "orderStatus": pOrderField.OrderStatus, "orderSubmitStatus": pOrderField.OrderSubmitStatus,
                              "orderStatus": pOrderField.OrderStatus,
                              "orderSubmitStatus": pOrderField.OrderSubmitStatus,
                              "statusMsg": pOrderField.StatusMsg}
                self.call_back_thread_pool.submit(self.__data_callback, TYPE_ORDER, 0, order_data)
        except Exception as e:
@@ -1047,7 +1049,8 @@
        else:
            async_log_util.info(logger_local_huaxin_trade_debug, "非API回调 req_id-{}", req_id)
            if trade_response:
                trade_response.OnTradeCallback({"type": "trade_callback", "data": {"code": 0, "data": data, "type": type}})
                trade_response.OnTradeCallback(
                    {"type": "trade_callback", "data": {"code": 0, "data": data, "type": type}})
            # # 非API回调
            else:
                send_response(
@@ -1066,27 +1069,7 @@
addr, port = constant.SERVER_IP, constant.SERVER_PORT
def process_cmd(tradeRequest: TradeRequest):
    tradeCommandManager.process_command(tradeRequest.type_, None, tradeRequest.data)
def __test():
    # 测试撤单
    for i in range(0, 10):
        code = "600190"
        orderSysID = "0190000229"
        sinfo = f"test_cancel_{i}"
        data = {"type": "trade", "trade_type": 2,
                "direction": 0,
                "code": code,
                "localOrderID": "",
                "orderSysID": orderSysID, "sinfo": sinfo}
        process_cmd(TradeRequest("trade", {"type": "trade", "data": data, "request_id": f"test-{i}"}, f"test-{i}"))
        time.sleep(2)
def run(trade_response_: TradeResponse=None, pipe_l2=None, pipe_strategy=None):
def run(trade_response_: TradeResponse = None, pipe_l2=None, pipe_strategy=None, queue_strategy_trade=None):
    try:
        logger_system.info("交易进程ID:{}", os.getpid())
        logger_system.info(f"trade 线程ID:{tool.get_thread_id()}")
@@ -1101,11 +1084,11 @@
        trade_response = trade_response_
        # 运行日志同步
        threading.Thread( target=lambda:async_log_util.run_sync(),daemon=True).start()
        threading.Thread(target=lambda: async_log_util.run_sync(), daemon=True).start()
        global tradeCommandManager
        tradeCommandManager = command_manager.TradeCommandManager()
        tradeCommandManager.init(MyTradeActionCallback(), l2pipe, pipe_strategy)
        tradeCommandManager.init(MyTradeActionCallback(), l2pipe, pipe_strategy, queue_strategy_trade)
        logger_system.info("华鑫交易服务启动")
        tradeCommandManager.run()
    except Exception as e:
l2/cancel_buy_strategy.py
@@ -383,7 +383,6 @@
    __db = 0
    __redis_manager = redis_manager.RedisManager(0)
    __tradeBuyQueue = TradeBuyQueue()
    __buyL2SafeCountManager = BuyL2SafeCountManager()
    __hCancelParamsManager = l2_trade_factor.HCancelParamsManager()
    __SecondCancelBigNumComputer = SecondCancelBigNumComputer()
@@ -993,7 +992,7 @@
    # 开始撤单
    def start_cancel(self, code, buy_no, total_datas, buy_order_no_map, local_operate_map, m_val_num):
        thresh_num = int(m_val_num * 1.8)
        thresh_num = int(m_val_num * 1)
        place_order_index = self.__SecondCancelBigNumComputer.get_real_place_order_index_cache(code)
        if place_order_index is None:
            raise Exception("未获取到下单真实位置")
l2/l2_data_manager_new.py
@@ -210,7 +210,6 @@
    __codeActualPriceProcessor = CodeActualPriceProcessor()
    __ths_l2_trade_queue_manager = trade_queue_manager.thsl2tradequeuemanager()
    __thsBuy1VolumnManager = trade_queue_manager.THSBuy1VolumnManager()
    __buyL2SafeCountManager = safe_count_manager.BuyL2SafeCountManager()
    __l2PlaceOrderParamsManagerDict = {}
    __last_buy_single_dict = {}
    __TradeBuyQueue = transaction_progress.TradeBuyQueue()
@@ -942,9 +941,6 @@
        _start_time = tool.get_now_timestamp()
        total_datas = local_today_datas[code]
        # 处理安全笔数
        # cls.__buyL2SafeCountManager.compute_left_rate(code, compute_start_index, compute_end_index, total_datas,
        #                                               local_today_num_operate_map.get(code))
        # 获取买入信号计算起始位置
        buy_single_index, buy_exec_index, buy_compute_index, num, count, max_num_set, buy_volume_rate = cls.__get_order_begin_pos(
main.py
@@ -23,7 +23,7 @@
# from huaxin_api import trade_client, l2_client, l1_client
def createTradeServer(pipe_server, pipe_trade, pipe_l1, pipe_l2, ptl2_l2, psl2_l2, ptl2_trade, pst_trade):
def createTradeServer(pipe_server, pipe_trade, pipe_l1, pipe_l2, ptl2_l2, psl2_l2, queue_strategy_trade):
    logger_system.info("策略进程ID:{}", os.getpid())
    log.close_print()
    # 初始化参数
@@ -49,7 +49,7 @@
    t1.start()
    #
    # 启动华鑫交易服务
    trade_server.run(pipe_trade, pipe_l1, pipe_l2, huaxin_client.trade_client.process_cmd)
    trade_server.run(pipe_trade, pipe_l1, pipe_l2, queue_strategy_trade)
# 主服务
@@ -93,6 +93,8 @@
        # l1与策略间的通信
        pl1t_l1, pl1t_strategy = multiprocessing.Pipe()
        queue_strategy_trade = multiprocessing.Queue()
        # 托管环境下不创建
        # serverProcess = multiprocessing.Process(target=createServer, args=(pss_server,))
        # serverProcess.start()
@@ -104,12 +106,11 @@
        # 交易进程
        tradeProcess = multiprocessing.Process(
            target=lambda: huaxin_client.trade_client.run(None, ptl2_trade, pst_trade))
            target=lambda: huaxin_client.trade_client.run(None, ptl2_trade, pst_trade, queue_strategy_trade))
        tradeProcess.start()
        # 主进程
        createTradeServer(pss_strategy, pst_strategy, pl1t_strategy, psl2_strategy, ptl2_l2, psl2_l2, ptl2_trade,
                          pst_trade)
        createTradeServer(pss_strategy, pst_strategy, pl1t_strategy, psl2_strategy, ptl2_l2, psl2_l2, queue_strategy_trade)
        # 将tradeServer作为主进程
        l1Process.join()
test/l2_trade_test.py
@@ -17,10 +17,9 @@
from utils import tool
from db import redis_manager_delegate as redis_manager
from l2 import l2_log, l2_data_manager, transaction_progress
from l2.safe_count_manager import BuyL2SafeCountManager
from l2.transaction_progress import TradeBuyQueue
from third_data import kpl_util, kpl_data_manager
from third_data.code_plate_key_manager import RealTimeKplMarketData, LimitUpCodesPlateKeyManager
from third_data.code_plate_key_manager import  LimitUpCodesPlateKeyManager
from third_data.kpl_data_manager import KPLDataManager
from trade import trade_data_manager, current_price_process_manager, l2_trade_util
from trade.trade_queue_manager import THSBuy1VolumnManager
@@ -46,7 +45,6 @@
            continue
        RedisUtils.delete(redis_info, k, auto_free=False)
    RedisUtils.realse(redis_info)
    BuyL2SafeCountManager().clear_data(code)
    transaction_progress.TradeBuyQueue().set_traded_index(code, 0)
    l2_trade_util.remove_from_forbidden_trade_codes(code)
@@ -121,7 +119,6 @@
        l2.l2_data_util.local_today_num_operate_map[code].clear()
        print("id:", id(l2.l2_data_util.local_today_datas))
        # safe_count_manager.BuyL2SafeCountManager.get_safe_count = mock.Mock(return_value=16)
        # l2_trade_factor.L2TradeFactorUtil.compute_m_value = mock.Mock(return_value=(14699952, ""))
        # pos_list.insert(41,(225,306))
        # pos_list.insert(63, (345, 423))
trade/huaxin/huaxin_trade_api.py
@@ -64,12 +64,11 @@
# 设置交易通信队列
def run_pipe_trade(pipe_trade_, trade_cmd_callback_):
def run_pipe_trade(pipe_trade_, queue_strategy_trade_):
    global pipe_trade
    pipe_trade = pipe_trade_
    global trade_cmd_callback
    trade_cmd_callback = trade_cmd_callback_
    global queue_strategy_trade
    queue_strategy_trade = queue_strategy_trade_
    t1 = threading.Thread(target=lambda: __run_recv_pipe_trade(), daemon=True)
    t1.start()
@@ -251,15 +250,12 @@
                     "data": data,
                     "request_id": request_id}
        root_data = socket_util.encryp_client_params_sign(root_data)
        if not is_pipe:
            trade_cmd_callback(TradeRequest(_type, root_data, request_id))
        else:
            start_time = time.time()
            pipe_trade.send(json.dumps(root_data).encode("utf-8"))
            use_time = int((time.time() - start_time)*1000)
            if use_time > 10:
                async_log_util.info(hx_logger_trade_loop, f"发送耗时:request_id-{request_id} 耗时时间:{use_time}")
        start_time = time.time()
        queue_strategy_trade.put_nowait(root_data)
        # pipe_trade.send(json.dumps(root_data).encode("utf-8"))
        use_time = int((time.time() - start_time)*1000)
        if use_time > 10:
            async_log_util.info(hx_logger_trade_loop, f"发送耗时:request_id-{request_id} 耗时时间:{use_time}")
        if log_enable:
            async_log_util.info(hx_logger_trade_loop, "请求发送成功:request_id-{}", request_id)
    except BrokenPipeError as e:
trade/huaxin/trade_server.py
@@ -6,6 +6,7 @@
import json
import logging
import mmap
import multiprocessing
import queue
import random
import socket
@@ -828,7 +829,7 @@
my_trade_response = MyTradeResponse()
def run(pipe_trade, pipe_l1, pipe_l2, trade_cmd_callback):
def run(pipe_trade, pipe_l1, pipe_l2, queue_strategy_trade: multiprocessing.Queue):
    logger_system.info(f"trade_server 线程ID:{tool.get_thread_id()}")
    try:
        # 执行一些初始化数据
@@ -842,7 +843,7 @@
        manager.run(blocking=False)
        # 启动交易服务
        huaxin_trade_api.run_pipe_trade(pipe_trade, trade_cmd_callback)
        huaxin_trade_api.run_pipe_trade(pipe_trade, queue_strategy_trade)
        # 监听l1那边传过来的代码
        t1 = threading.Thread(target=lambda: __recv_pipe_l1(pipe_l1), daemon=True)
trade/l2_trade_factor.py
@@ -129,7 +129,6 @@
        self.score_index = score_index
    # 获取信号连续买笔数
    def get_begin_continue_buy_count(self):
        counts = [3, 3, 3, 2, 2, 2, 2]
        volume_rate_index = self.volume_rate_index
trade/trade_manager.py
@@ -518,7 +518,9 @@
        __cancel_success(code)
        # 再次撤单
        if constant.TRADE_WAY == constant.TRADE_WAY_HUAXIN:
            async_log_util.info(logger_trade, "{} trade_manager.start_cancel_buy 再次撤单开始".format(code))
            __cancel_order_thread_pool.submit(lambda: trade_huaxin.cancel_order(code, msg="再次撤单"))
            async_log_util.info(logger_trade, "{} trade_manager.start_cancel_buy 再次撤单结束".format(code))
        # 不需要再次撤单了
        # try:
        #     cancel_buy_again(code)
trade/trade_result_manager.py
@@ -2,15 +2,13 @@
import logging
from l2 import l2_data_manager
from l2.cancel_buy_strategy import HourCancelBigNumComputer, SecondCancelBigNumComputer , \
from l2.cancel_buy_strategy import HourCancelBigNumComputer, SecondCancelBigNumComputer, \
    LCancelBigNumComputer, DCancelBigNumComputer
from l2.l2_data_util import local_today_datas, local_today_num_operate_map
from l2.safe_count_manager import BuyL2SafeCountManager
from log_module.log import logger_l2_error
from trade.trade_queue_manager import THSBuy1VolumnManager
__thsBuy1VolumnManager = THSBuy1VolumnManager()
__buyL2SafeCountManager = BuyL2SafeCountManager()
def virtual_buy_success(code):
@@ -26,9 +24,6 @@
def virtual_cancel_success(code, buy_single_index, buy_exec_index, total_datas):
    l2_data_manager.TradePointManager().delete_buy_point(code)
    l2_data_manager.TradePointManager().delete_buy_cancel_point(code)
    # 安全笔数计算
    __buyL2SafeCountManager.save_place_order_info(code, buy_single_index, buy_exec_index,
                                                  total_datas[-1]["index"])
    SecondCancelBigNumComputer().cancel_success(code)
    DCancelBigNumComputer().cancel_success(code)
    LCancelBigNumComputer().cancel_success(code)
@@ -37,20 +32,10 @@
# 真实买成功
def real_buy_success(code, tradePointManager):
    # @dask.delayed
    def clear_max_buy1_volume(code):
        # 下单成功,需要删除最大买1
        __thsBuy1VolumnManager.clear_max_buy1_volume(code)
    # @dask.delayed
    def safe_count(code, buy_single_index, buy_exec_index):
        try:
            __buyL2SafeCountManager.save_place_order_info(code, buy_single_index, buy_exec_index, None)
        except Exception as e:
            logging.exception(e)
            logger_l2_error.exception(e)
    # @dask.delayed
    def h_cancel(code, buy_single_index, buy_exec_index):
        try:
            HourCancelBigNumComputer().place_order_success(code, buy_single_index, buy_exec_index,
@@ -60,7 +45,6 @@
            logging.exception(e)
            logger_l2_error.exception(e)
    # @dask.delayed
    def l_cancel(code):
        try:
            LCancelBigNumComputer().place_order_success(code)
@@ -79,7 +63,6 @@
        code)
    clear_max_buy1_volume(code)
    safe_count(code, buy_single_index, buy_exec_index)
    s_cancel(code)
    # H撤暂时不生效
    h_cancel(code, buy_single_index, buy_exec_index)
@@ -89,9 +72,6 @@
# 真实撤成功
def real_cancel_success(code, buy_single_index, buy_exec_index, total_datas):
    # 安全笔数计算
    __buyL2SafeCountManager.save_place_order_info(code, buy_single_index, buy_exec_index,
                                                  total_datas[-1]["index"])
    # 取消买入标识
    l2_data_manager.TradePointManager().delete_buy_point(code)
    l2_data_manager.TradePointManager().delete_buy_cancel_point(code)