Administrator
2024-05-09 77fea830c2e11f94c364a37b8bda792a51d11eec
J撤
7个文件已修改
302 ■■■■■ 已修改文件
constant.py 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/cancel_buy_strategy.py 227 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_data_manager_new.py 19 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_log.py 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
log_module/log.py 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/test_cancel.py 35 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/trade_result_manager.py 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
constant.py
@@ -86,6 +86,10 @@
H_CANCEL_MIN_BIG_NUM_COUNT = 3
H_CANCEL_START_TIME = 900
# J-cancel: fraction of the baseline volume queued behind our order that must
# have been withdrawn before the J-cancel strategy asks to cancel our order.
J_CANCEL_RATE = 0.5
# Threshold used instead of J_CANCEL_RATE when the code is in the must-buy list.
J_CANCEL_RATE_WITH_MUST_BUY = 0.9
# Minimum amount monitored by L2
L2_MIN_MONEY = 500000
# 每个L2设备的代码数量
l2/cancel_buy_strategy.py
@@ -39,6 +39,58 @@
    HourCancelBigNumComputer().set_real_place_order_index(code, index, buy_single_index)
    NewGCancelBigNumComputer().set_real_place_order_index(code, index, buy_single_index, is_default)
    FCancelBigNumComputer().set_real_order_index(code, index, is_default)
    JCancelBigNumComputer().set_real_place_order_index(code, index, buy_single_index, is_default)
class BaseCancel:
    """Common hook interface for cancel strategies; every hook is a no-op here."""

    def set_real_place_order_index(self, code, index, buy_single_index, is_default):
        """Hook: receive the real place-order index for ``code``. Does nothing."""

    def cancel_success(self, code):
        """Hook: called when a cancel succeeded for ``code``. Does nothing."""

    def place_order_success(self, code):
        """Hook: called when an order was placed for ``code``. Does nothing."""
class L2DataComputeUtil:
    """
    Helper class for computations over L2 data.
    """

    @classmethod
    def compute_left_buy_order(cls, code, start_index, end_index, limit_up_price, min_money=500000):
        """
        Count the limit-up buy orders in an index range that are still not cancelled.

        @param code: security code
        @param start_index: first index of the range (inclusive)
        @param end_index: last index of the range (inclusive)
        @param limit_up_price: limit-up price
        @param min_money: minimum money; orders whose ``num`` is below
                          ``min_money // int(limit_up_price * 100)`` are ignored
        @return: (number of remaining orders, sum of their ``num``)
        """
        # Smallest ``num`` an order must have to be counted.
        volume_floor = min_money // int(limit_up_price * 100)
        all_datas = local_today_datas.get(code)
        canceled_map = local_today_canceled_buyno_map.get(code)
        order_count, num_sum = 0, 0
        for idx in range(start_index, end_index + 1):
            order_val = all_datas[idx]['val']
            if not L2DataUtil.is_limit_up_price_buy(order_val) or order_val['num'] < volume_floor:
                continue
            remaining = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2(
                code, idx, all_datas, canceled_map)
            if remaining > 0:
                # Counted only while the helper reports a positive not-cancelled count.
                order_count += 1
                num_sum += order_val["num"]
        return order_count, num_sum
class SCancelBigNumComputer:
@@ -843,6 +895,9 @@
    __instance = None
    # 下单位之后的封单中的大单
    __follow_big_order_cache = {}
    def __new__(cls, *args, **kwargs):
        if not cls.__instance:
            cls.__instance = super(DCancelBigNumComputer, cls).__new__(cls, *args, **kwargs)
@@ -881,6 +936,18 @@
                    # 量低于
                    return True, r.id_
        return False, None
    def compute_d_cancel_watch_index(self, code):
        # 计算D撤囊括范围
        # real_place_order_index = self.__SCancelBigNumComputer.get_real_place_order_index_cache(code)
        pass
    def __clear_data(self, code):
        if code in self.__follow_big_order_cache:
            self.__follow_big_order_cache.pop(code)
    def place_order_success(self, code, buy_single_index, buy_exec_index):
        self.__clear_data(code)
# ---------------------------------L撤-------------------------------
@@ -1252,7 +1319,8 @@
    def set_real_place_order_index(self, code, index, buy_single_index=None, is_default=False):
        l2_log.l_cancel_debug(code, f"设置真实下单位-{index},buy_single_index-{buy_single_index}")
        self.__real_place_order_index_dict[code] = (index, is_default)
        RedisUtils.setex_async(self.__db, f"l_cancel_real_place_order_index-{code}", tool.get_expire(), json.dumps((index, is_default)))
        RedisUtils.setex_async(self.__db, f"l_cancel_real_place_order_index-{code}", tool.get_expire(),
                               json.dumps((index, is_default)))
        if buy_single_index is not None:
            if code in self.__last_trade_progress_dict:
                # L撤从成交进度位开始计算
@@ -1409,7 +1477,7 @@
        if real_place_order_info and real_place_order_info[0] > index:
            total_datas = local_today_datas.get(code)
            min_num = int(5000 / (float(gpcode_manager.get_limit_up_price(code))))
            for j in range(real_place_order_info[0]-1, index , -1):
            for j in range(real_place_order_info[0] - 1, index, -1):
                data = total_datas[j]
                val = data['val']
                if data["index"] in watch_indexes:
@@ -1773,7 +1841,9 @@
        if tool.trade_time_sub(tool.get_now_time_str(), total_datas[real_order_index]['val']['time']) > 1 * 60:
            return False, "下单超过60s"
        THRESHOLD_MONEY_W, THRESHOLD_COUNT = self.__get_fast_deal_threshold_value(code, total_datas[real_order_index]['val']['time'])
        THRESHOLD_MONEY_W, THRESHOLD_COUNT = self.__get_fast_deal_threshold_value(code,
                                                                                  total_datas[real_order_index]['val'][
                                                                                      'time'])
        total_left_count = 0
        total_left_num = 0
@@ -3058,5 +3128,156 @@
    pass
class JCancelBigNumComputer(BaseCancel):
    """
    J-cancel strategy.

    Intent (translated from the original description): when three limit-up buy
    cancellations behind our order happen within 1000 ms, that counts as one
    signal and the total of all not-yet-cancelled limit-up buys behind us is
    computed. Once that total has dropped by the configured rate (50% by
    default), our order is cancelled. When a new signal arrives more than
    3 minutes after the last computation the total is recomputed; without a
    new signal the previously computed total stays in use. The strategy targets
    orders behind us being withdrawn so that the sealed amount falls sharply.

    NOTE(review): the description also mentions a guard time of 14:50:00; no
    such time check is implemented in this class.
    """
    # code -> [time, real place-order index, baseline num, current num, last processed index]
    __cancel_single_cache = {}
    # code -> (real place-order index, is_default)
    __real_place_order_index_info_dict = {}
    __instance = None

    def __new__(cls, *args, **kwargs):
        # Singleton: every JCancelBigNumComputer() call yields the same object.
        if not cls.__instance:
            cls.__instance = super(JCancelBigNumComputer, cls).__new__(cls, *args, **kwargs)
        return cls.__instance

    def set_real_place_order_index(self, code, index, buy_single_index, is_default):
        """
        Remember the real place-order index for ``code``.

        @param code: security code
        @param index: index of our order in the L2 data
        @param buy_single_index: accepted for interface compatibility; unused here
        @param is_default: True while ``index`` is only a default value
        """
        self.__real_place_order_index_info_dict[code] = (index, is_default)

    def __compute_cancel_single(self, code, start_index, end_index):
        """
        Create or refresh the cached baseline that need_cancel measures against.

        The first baseline is computed as soon as a non-default real place-order
        index is known. A cached baseline is kept for 180 s; after that it is
        recomputed only when the current batch ends a run of three qualifying
        limit-up buy cancellations within 1000 ms.

        @param code: security code
        @param start_index: first index of the current batch
        @param end_index: last index of the current batch
        @return: None; the result is stored in the per-code cache
        """
        real_place_order_index_info = self.__real_place_order_index_info_dict.get(code)
        if not real_place_order_index_info or real_place_order_index_info[1]:
            # Real place-order position not known yet (missing or still a default).
            return
        real_place_order_index = real_place_order_index_info[0]
        total_datas = local_today_datas.get(code)
        if not total_datas:
            # No L2 data for this code: nothing to measure, and indexing
            # total_datas[-1] below would raise.
            return
        latest_data = total_datas[-1]
        # Cached entry: [time, real place-order index, baseline num, current num, last processed index]
        cancel_single_info = self.__cancel_single_cache.get(code)
        if cancel_single_info:
            outoftime = tool.trade_time_sub(latest_data['val']['time'], cancel_single_info[0]) > 180
            if not outoftime:
                # The previous baseline has not expired yet.
                return
        limit_up_price = round(float(gpcode_manager.get_limit_up_price(code)), 2)
        if cancel_single_info:
            # An expired baseline is replaced only when a fresh signal is present.
            # NOTE(review): the very first baseline is computed without a signal;
            # confirm that this is intended.
            # NOTE(review): int(limit_up_price * 100) truncates (10.07 -> 1006);
            # left as is to stay in step with L2DataComputeUtil.compute_left_buy_order.
            min_volume = 50 * 10000 // int(limit_up_price * 100)
            # Latest qualifying cancellation in the current batch: (index, time with ms).
            single_info = None
            for i in range(end_index, start_index - 1, -1):
                val = total_datas[i]['val']
                if not L2DataUtil.is_limit_up_price_buy_cancel(val):
                    continue
                if val['num'] < min_volume:
                    continue
                single_info = (i, L2DataUtil.get_time_with_ms(val))
                break
            if not single_info:
                # No limit-up cancellation in this batch.
                return
            # Walk backwards (not past our own order) collecting qualifying
            # cancellations within 1000 ms of the one found above.
            indexes = [single_info[0]]
            for i in range(single_info[0] - 1, real_place_order_index, -1):
                val = total_datas[i]['val']
                if not L2DataUtil.is_limit_up_price_buy_cancel(val):
                    continue
                if val['num'] < min_volume:
                    continue
                time_ms = L2DataUtil.get_time_with_ms(val)
                if tool.trade_time_sub_with_ms(single_info[1], time_ms) > 1000:
                    break
                indexes.append(i)
                if len(indexes) >= 3:
                    break
            if len(indexes) < 3:
                # Fewer than three cancellations: keep the previous baseline.
                return
        total_count, total_num = L2DataComputeUtil.compute_left_buy_order(code, real_place_order_index + 1,
                                                                          latest_data['index'], limit_up_price)
        self.__cancel_single_cache[code] = [latest_data['val']['time'], real_place_order_index, total_num,
                                            total_num,
                                            latest_data['index']]
        l2_log.j_cancel_debug(code, f"触发囊括:{self.__cancel_single_cache[code]}")

    def need_cancel(self, code, start_index, end_index):
        """
        Decide whether the J-cancel strategy wants to cancel the order for ``code``.

        @param code: security code
        @param start_index: first index of the current batch (inclusive)
        @param end_index: last index of the current batch (inclusive)
        @return: (need_cancel, data at ``end_index`` or None, message)
        """
        # The baseline has to be established / refreshed first.
        self.__compute_cancel_single(code, start_index, end_index)
        # [time, real place-order index, baseline num, current num, last processed index]
        cancel_single_info = self.__cancel_single_cache.get(code)
        if not cancel_single_info:
            return False, None, "没有监听"
        total_datas = local_today_datas.get(code)
        buyno_map = local_today_buyno_map.get(code)
        limit_up_price = round(float(gpcode_manager.get_limit_up_price(code)), 2)
        min_volume = 50 * 10000 // int(limit_up_price * 100)
        # Apply the batch to the current num: qualifying cancellations of buys
        # placed after our order subtract, qualifying new limit-up buys add.
        for i in range(start_index, end_index + 1):
            data = total_datas[i]
            val = data['val']
            if data['index'] <= cancel_single_info[4]:
                # Already accounted for by the baseline or an earlier batch.
                continue
            if val['num'] < min_volume:
                continue
            if L2DataUtil.is_limit_up_price_buy_cancel(val):
                orderNo = val['orderNo']
                buy_data = buyno_map.get(f"{orderNo}")
                if buy_data and buy_data['index'] > cancel_single_info[1]:
                    cancel_single_info[3] -= val['num']
            elif L2DataUtil.is_limit_up_price_buy(val):
                cancel_single_info[3] += val['num']
        cancel_single_info[4] = end_index
        if cancel_single_info[2] <= 0:
            # The baseline held no qualifying volume behind our order, so a
            # withdrawn share cannot be computed (would be a division by zero).
            return False, None, "信号总手数为0,无法计算撤单比例"
        threshold_rate = constant.J_CANCEL_RATE
        if gpcode_manager.MustBuyCodesManager().is_in_cache(code):
            threshold_rate = constant.J_CANCEL_RATE_WITH_MUST_BUY
        cancel_num = cancel_single_info[2] - cancel_single_info[3]
        rate = round(cancel_num / cancel_single_info[2], 2)
        if rate >= threshold_rate:
            return True, total_datas[
                end_index], f"撤单比例达到:{rate}/{threshold_rate} 剩余手数:{cancel_single_info[3]}/{cancel_single_info[2]}"
        return False, None, f"尚未达到撤单比例{rate} , {cancel_num}/{cancel_single_info[2]}"

    def __clear_data(self, code):
        """Forget the cached baseline and the real place-order index for ``code``."""
        self.__cancel_single_cache.pop(code, None)
        self.__real_place_order_index_info_dict.pop(code, None)

    def cancel_success(self, code):
        """Reset the per-code state after a successful cancel."""
        self.__clear_data(code)

    def place_order_success(self, code):
        """Reset the per-code state after an order has been placed."""
        self.__clear_data(code)
if __name__ == "__main__":
    # Nothing is executed when this module is run directly.
    pass
l2/l2_data_manager_new.py
@@ -22,7 +22,8 @@
from l2 import l2_data_manager, l2_log, l2_data_source_util, code_price_manager, \
    transaction_progress, cancel_buy_strategy, place_order_single_data_manager
from l2.cancel_buy_strategy import SCancelBigNumComputer, HourCancelBigNumComputer, DCancelBigNumComputer, \
    LCancelBigNumComputer, LatestCancelIndexManager, LCancelRateManager, GCancelBigNumComputer, NewGCancelBigNumComputer
    LCancelBigNumComputer, LatestCancelIndexManager, LCancelRateManager, GCancelBigNumComputer, \
    NewGCancelBigNumComputer, JCancelBigNumComputer
from l2.l2_data_manager import L2DataException, OrderBeginPosInfo
from l2.l2_data_util import local_today_datas, L2DataUtil, local_today_num_operate_map, local_today_buyno_map, \
    local_latest_datas, local_today_canceled_buyno_map, local_today_sellno_map
@@ -602,6 +603,18 @@
                                     f"B撤出错 参数:buy_single_index-{_buy_single_index} buy_exec_index-{_buy_exec_index} 错误原因:{str(e)}")
            return None, ""
        # J撤
        def j_cancel(_buy_single_index, _buy_exec_index):
            try:
                b_need_cancel, b_cancel_data, extra_msg = JCancelBigNumComputer().need_cancel(code, start_index, end_index)
                if b_need_cancel and b_cancel_data:
                    return b_cancel_data, f"J撤({extra_msg})"
            except Exception as e:
                async_log_util.error(logger_l2_error,
                                     f"J撤出错 参数:buy_single_index-{_buy_single_index} buy_exec_index-{_buy_exec_index} 错误原因:{str(e)}")
            return None, ""
        if start_index < 0:
            start_index = 0
@@ -626,6 +639,10 @@
            cancel_data, cancel_msg = b_cancel(order_begin_pos.buy_single_index, order_begin_pos.buy_exec_index)
        if not cancel_data:
            cancel_data, cancel_msg = h_cancel(order_begin_pos.buy_single_index, order_begin_pos.buy_exec_index)
        # J撤
        if not cancel_data:
            cancel_data, cancel_msg = j_cancel(order_begin_pos.buy_single_index, order_begin_pos.buy_exec_index)
        if cancel_data and not DCancelBigNumComputer().has_auto_cancel_rules(code):
            l2_log.debug(code, "触发撤单,撤单位置:{} ,撤单原因:{}", cancel_data["index"], cancel_msg)
            # 撤单
l2/l2_log.py
@@ -4,7 +4,7 @@
from log_module import async_log_util
from log_module.log import logger_l2_trade_cancel, logger_l2_trade_buy, logger_trade_record, logger_l2_trade, \
    logger_l2_s_cancel, logger_l2_h_cancel, logger_l2_l_cancel, logger_l2_error, logger_l2_d_cancel, logger_l2_f_cancel, \
    logger_l2_g_cancel
    logger_l2_g_cancel, logger_l2_j_cancel
# 日志队列分配管理器
@@ -117,6 +117,10 @@
    __add_async_log(logger_l2_g_cancel, code, content, *args)
def j_cancel_debug(code, content, *args):
    """Forward a J-cancel debug message for ``code`` to the J-cancel logger."""
    __add_async_log(logger_l2_j_cancel, code, content, *args)
# 交易记录
def trade_record(code, type, content, *args):
    if len(args) > 0:
log_module/log.py
@@ -76,6 +76,10 @@
                   filter=lambda record: record["extra"].get("name") == "f_cancel",
                   rotation="00:00", compression="zip", enqueue=True)
        logger.add(self.get_path("l2", "cancel/j_cancel"),
                   filter=lambda record: record["extra"].get("name") == "j_cancel",
                   rotation="00:00", compression="zip", enqueue=True)
        logger.add(self.get_path("l2", "l2_trade_buy"),
                   filter=lambda record: record["extra"].get("name") == "l2_trade_buy",
                   rotation="00:00", compression="zip", enqueue=True)
@@ -352,6 +356,7 @@
# Module-level logger handles, one per log channel name.
# NOTE(review): presumably get_logger binds the given name so that the sink
# filters comparing record["extra"]["name"] route the records — confirm in get_logger.
logger_l2_f_cancel = __mylogger.get_logger("f_cancel")
logger_l2_l_cancel = __mylogger.get_logger("l_cancel")
logger_l2_g_cancel = __mylogger.get_logger("g_cancel")
logger_l2_j_cancel = __mylogger.get_logger("j_cancel")
logger_l2_trade_buy = __mylogger.get_logger("l2_trade_buy")
logger_l2_trade_queue = __mylogger.get_logger("l2_trade_queue")
logger_l2_trade_buy_queue = __mylogger.get_logger("l2_trade_buy_queue")
test/test_cancel.py
@@ -1,11 +1,42 @@
from copy import deepcopy
from db.redis_manager_delegate import RedisUtils
from l2.cancel_buy_strategy import NewGCancelBigNumComputer
from l2.cancel_buy_strategy import NewGCancelBigNumComputer, JCancelBigNumComputer
from log_module import log_export
import l2
def test_g_cancel():
    """Run the self-test exposed by NewGCancelBigNumComputer."""
    NewGCancelBigNumComputer().test()
def test_j():
    """
    Replay recorded L2 batches for one code through the J-cancel strategy.

    The loaded data is copied, the in-memory maps are emptied, and the batches
    are fed back one at a time; every result that asks for a cancel is printed.
    """
    code = '603778'
    stop_at_index = 234  # replay ends at the first batch starting at or after this index
    real_order_index = 71  # real place-order index reported to the strategy
    buy_single_index = 19
    data_util = l2.l2_data_util

    pos_list = log_export.get_l2_process_position(code)
    data_util.load_l2_data(code)
    total_datas = deepcopy(data_util.local_today_datas[code])
    data_util.local_today_num_operate_map.get(code).clear()
    data_util.local_today_buyno_map.get(code).clear()
    data_util.local_today_canceled_buyno_map.get(code).clear()
    data_util.local_today_datas[code] = []

    computer = JCancelBigNumComputer()
    for p in pos_list:
        batch_start = p[0]
        if batch_start >= stop_at_index:
            break
        batch_end = p[1]
        add_datas = total_datas[batch_start:batch_end + 1]
        data_util.local_today_datas[code].extend(add_datas)
        data_util.load_num_operate_map(data_util.local_today_num_operate_map, code, add_datas)
        data_util.load_buy_no_map(data_util.local_today_buyno_map, code, add_datas)
        data_util.load_sell_no_map(data_util.local_today_sellno_map, code, add_datas)
        data_util.load_canceled_buy_no_map(data_util.local_today_canceled_buyno_map, code, add_datas)
        if batch_start > real_order_index:
            computer.set_real_place_order_index(code, real_order_index, buy_single_index, False)
        result = computer.need_cancel(code, batch_start, batch_end)
        # print(add_datas[-1]['val']['time'], result)
        if result and result[0]:
            print(result)
if __name__ == '__main__':
    # Run both manual checks, then hand over to RedisUtils.run_loop().
    # NOTE(review): presumably run_loop processes the queued async Redis
    # operations and blocks — confirm in RedisUtils.
    test_g_cancel()
    test_j()
    RedisUtils.run_loop()
trade/trade_result_manager.py
@@ -4,7 +4,8 @@
from code_attribute.gpcode_manager import MustBuyCodesManager
from l2 import l2_data_manager, place_order_single_data_manager
from l2.cancel_buy_strategy import HourCancelBigNumComputer, SCancelBigNumComputer, \
    LCancelBigNumComputer, DCancelBigNumComputer, GCancelBigNumComputer, FCancelBigNumComputer, NewGCancelBigNumComputer
    LCancelBigNumComputer, DCancelBigNumComputer, GCancelBigNumComputer, FCancelBigNumComputer, \
    NewGCancelBigNumComputer, JCancelBigNumComputer
from l2.l2_data_manager import OrderBeginPosInfo
from l2.l2_data_util import local_today_datas, local_today_num_operate_map, L2DataUtil
from l2.l2_sell_manager import L2MarketSellManager
@@ -31,6 +32,7 @@
    SCancelBigNumComputer().cancel_success(code)
    LCancelBigNumComputer().cancel_success(code)
    NewGCancelBigNumComputer().cancel_success(code)
    JCancelBigNumComputer().cancel_success(code)
    # dask.compute(f1, f2, f5, f6, f7, f8)
@@ -85,6 +87,7 @@
    h_cancel(code, order_begin_pos.buy_single_index, order_begin_pos.buy_exec_index)
    l_cancel(code)
    g_cancel(code)
    JCancelBigNumComputer().place_order_success(code)
    if order_begin_pos.mode == OrderBeginPosInfo.MODE_FAST:
        f_cancel(code)
        # 记录卖盘统计时间被用
@@ -117,6 +120,7 @@
    LCancelBigNumComputer().cancel_success(code)
    FCancelBigNumComputer().cancel_success(code)
    NewGCancelBigNumComputer().cancel_success(code)
    JCancelBigNumComputer().cancel_success(code)
    # 记录最近的撤单时间
    if from_real_cancel:
        __latest_cancel_l2_data_dict[code] = total_datas[-1]