Administrator
2023-08-30 dacb3898100a1b982d0538f013f9318cd5e9a51c
L撤阻断机制
4个文件已修改
89 ■■■■ 已修改文件
huaxin_client/l2_client.py 9 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/cancel_buy_strategy.py 62 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/trade_server.py 16 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/trade_result_manager.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l2_client.py
@@ -322,10 +322,11 @@
                        "SubSeq": pTransaction['SubSeq'], "BuyNo": pTransaction['BuyNo'],
                        "SellNo": pTransaction['SellNo'],
                        "ExecType": pTransaction['ExecType'].decode()}
                key = f"{item['SecurityID']}_{item['TradePrice']}_{item['BuyNo']}"
                if self.__last_transaction_keys_dict.get(code) == key:
                    return
                self.__last_transaction_keys_dict[code] = key
                # 暂时注释掉同1单号至多上传1次
                # key = f"{item['SecurityID']}_{item['TradePrice']}_{item['BuyNo']}"
                # if self.__last_transaction_keys_dict.get(code) == key:
                #     return
                # self.__last_transaction_keys_dict[code] = key
                # print("逐笔成交", item)
                l2_data_manager.add_transaction_detail(item)
l2/cancel_buy_strategy.py
@@ -10,10 +10,11 @@
import time
import constant
from code_attribute import big_money_num_manager, gpcode_manager
from code_attribute import big_money_num_manager, gpcode_manager, code_volumn_manager
import l2_data_util
from db import redis_manager_delegate as redis_manager
from db.redis_manager_delegate import RedisUtils
from trade.l2_trade_factor import L2PlaceOrderParamsManager
from utils import tool
from l2.safe_count_manager import BuyL2SafeCountManager
from l2.transaction_progress import TradeBuyQueue
@@ -998,6 +999,7 @@
    __redis_manager = redis_manager.RedisManager(0)
    __last_trade_progress_dict = {}
    __cancel_watch_index_cache = {}
    __SecondCancelBigNumComputer = SecondCancelBigNumComputer()
    __instance = None
@@ -1057,6 +1059,8 @@
    def clear(self, code=None):
        if code:
            self.del_watch_index(code)
            if code in self.__last_trade_progress_dict:
                self.__last_trade_progress_dict.pop(code)
        else:
            keys = RedisUtils.keys(self.__get_redis(), f"l_cancel_watch_index-*")
            for k in keys:
@@ -1064,13 +1068,13 @@
                self.del_watch_index(code)
    # 设置成交位置,成交位置变化之后相应的监听数据也会发生变化
    def set_trade_progress(self, code, index, total_data):
        old_watch_indexes = self.__get_watch_indexes_cache(code)
        if self.__last_trade_progress_dict.get(code) == index and len(
                old_watch_indexes) >= constant.L_CANCEL_MAX_WATCH_COUNT:
            # 成交进度尚未发生变化且已经监听到了足够的数据
            return
        self.__last_trade_progress_dict[code] = index
        watch_indexes = set()
        # 小金额
@@ -1115,13 +1119,8 @@
        self.__add_watch_indexes(code, add_indexes)
        self.__del_watch_indexes(code, delete_indexes)
    def need_cancel(self, code, buy_exec_index, start_index, end_index, total_data, local_today_num_operate_map,
                    is_first_code):
        time_space = tool.trade_time_sub(total_data[start_index]["val"]["time"],
                                         total_data[buy_exec_index]["val"]["time"])
        # 守护S撤以外的数据
        if time_space <= constant.L_CANCEL_START_TIME or int(tool.get_now_time_str().replace(":", "")) > int("145000"):
            return False, None
    def __compute_need_cancel(self, code, buy_exec_index, start_index, end_index, total_data,
                              local_today_num_operate_map, is_first_code):
        watch_indexes = self.__get_watch_indexes_cache(code)
        if not watch_indexes:
            return False, None
@@ -1158,6 +1157,51 @@
        return False, None
    def need_cancel(self, code, buy_exec_index, start_index, end_index, total_data, local_today_num_operate_map,
                    is_first_code):
        time_space = tool.trade_time_sub(total_data[start_index]["val"]["time"],
                                         total_data[buy_exec_index]["val"]["time"])
        # 守护S撤以外的数据
        if time_space <= constant.L_CANCEL_START_TIME or int(tool.get_now_time_str().replace(":", "")) > int("145000"):
            return False, None
        can_cancel, cancel_data = self.__compute_need_cancel(code, buy_exec_index, start_index, end_index, total_data,
                                                             local_today_num_operate_map, is_first_code)
        if can_cancel:
            # 判断成交进度位置到当前位置的净买入
            try:
                place_order_index = self.__SecondCancelBigNumComputer.get_real_place_order_index_cache(code)
                if place_order_index is None:
                    raise Exception("未获取到下单真实位置")
                # 获取到真实成交位置
                transaction_index = self.__last_trade_progress_dict.get(code)
                if transaction_index is None:
                    raise Exception("尚未获取到真实成交位置")
                # 获取m值
                volume_rate = code_volumn_manager.get_volume_rate(code)
                volume_rate_index = code_volumn_manager.get_volume_rate_index(volume_rate)
                m_val = L2PlaceOrderParamsManager(code, True, volume_rate, volume_rate_index, None).get_m_val()[0]
                limit_up_price = gpcode_manager.get_limit_up_price(code)
                m_val_num = int(m_val / (float(limit_up_price) * 100))
                threshold_num = int(m_val_num * 1.2)
                buy_nums = 0
                for index in range(transaction_index + 1, place_order_index):
                    data = total_data[index]
                    if L2DataUtil.is_limit_up_price_buy(data["val"]):
                        # 获取是否在买入执行信号周围2s
                        left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count(code,
                                                                                                              index,
                                                                                                              total_data,
                                                                                                              local_today_num_operate_map)
                        if left_count > 0:
                            buy_nums += left_count * data["val"]["num"]
                            if buy_nums > threshold_num:
                                return False, "LX阻断L撤撤单"
                return can_cancel, cancel_data
            except Exception as e:
                l2_log.l_cancel_debug(code, f"LX撤单计算异常:{str(e)}")
                return can_cancel, cancel_data
        return can_cancel, cancel_data
    def place_order_success(self, code):
        self.clear(code)
trade/huaxin/trade_server.py
@@ -249,7 +249,7 @@
                            data = data_json["data"]
                            code = data["code"]
                            order_no = data["data"]
                            TradeServerProcessor.trading_order_canceled(code,order_no)
                            TradeServerProcessor.trading_order_canceled(code, order_no)
                        finally:
                            sk.sendall(socket_util.load_header(json.dumps({"code": 0}).encode(encoding='utf-8')))
@@ -407,12 +407,12 @@
        try:
            need_cancel, msg = cls.__GCancelBigNumComputer.start_cancel(code, f"{order_no}",
                                                                         l2_data_util.local_today_datas.get(
                                                                             code),
                                                                         l2_data_util.local_today_buyno_map.get(
                                                                             code),
                                                                         l2_data_util.local_today_num_operate_map.get(
                                                                             code), m_val_num)
                                                                        l2_data_util.local_today_datas.get(
                                                                            code),
                                                                        l2_data_util.local_today_buyno_map.get(
                                                                            code),
                                                                        l2_data_util.local_today_num_operate_map.get(
                                                                            code), m_val_num)
            if need_cancel:
                # 需要撤单
                l2_data_manager_new.L2TradeDataProcessor.cancel_buy(code, "G撤撤单", "G撤")
@@ -421,6 +421,7 @@
                logger_l2_g_cancel.info(f"{code}-不需要撤单:{msg}")
        except Exception as e:
            logger_l2_g_cancel.error(f"{code}-撤单异常:{str(e)}")
def clear_invalid_client():
    while True:
@@ -771,7 +772,6 @@
    def OnTradingOrderCancel(self, code, buy_no):
        TradeServerProcessor.trading_order_canceled(code, buy_no)
class MyTradeResponse(TradeResponse):
trade/trade_result_manager.py
@@ -63,7 +63,7 @@
    # @dask.delayed
    def l_cancel(code):
        try:
            LCancelBigNumComputer().del_watch_index(code)
            LCancelBigNumComputer().place_order_success(code)
        except Exception as e:
            logging.exception(e)
            logger_l2_error.exception(e)