Administrator
2024-03-20 7d026af4248dfedebf99e5134e3270c361f14f0c
S撤调整
5个文件已修改
201 ■■■■■ 已修改文件
l2/cancel_buy_strategy.py 172 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_data_manager_new.py 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_data_util.py 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/l2_trade_test.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
utils/tool.py 17 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/cancel_buy_strategy.py
@@ -36,7 +36,7 @@
def set_real_place_position(code, index, buy_single_index=None, is_default=True):
    """Forward the real order-placement position of *code* to the cancel computers.

    The position is handed to the S, L and Hour cancel computers so that each
    of them works from the same order index.

    :param code: security code the position belongs to
    :param index: index of the real order-placement row
    :param buy_single_index: index of the buy-signal row, passed on to the
        L and Hour computers
    :param is_default: flag passed on to the S and L computers together with
        the index (presumably marks a fallback position -- confirm with caller)
    """
    # DCancelBigNumComputer().set_real_order_index(code, index)
    # The S computer's setter takes ``is_default`` as a required argument, so
    # it must always be forwarded here.
    SCancelBigNumComputer().set_real_place_order_index(code, index, is_default)
    LCancelBigNumComputer().set_real_place_order_index(code, index, buy_single_index=buy_single_index,
                                                       is_default=is_default)
    HourCancelBigNumComputer().set_real_place_order_index(code, index, buy_single_index)
@@ -71,34 +71,27 @@
            for k in keys:
                code = k.split("-")[-1]
                val = RedisUtils.get(__redis, k)
                val = int(val)
                val = json.loads(val)
                tool.CodeDataCacheUtil.set_cache(cls.__s_cancel_real_place_order_index_cache, code, val)
        finally:
            RedisUtils.realse(__redis)
    # 设置真实下单位置
    def __save_real_place_order_index(self, code, index):
        CodeDataCacheUtil.set_cache(self.__s_cancel_real_place_order_index_cache, code, index)
    def __save_real_place_order_index(self, code, index, is_default):
        CodeDataCacheUtil.set_cache(self.__s_cancel_real_place_order_index_cache, code, (index, is_default))
        key = "s_cancel_real_place_order_index-{}".format(code)
        RedisUtils.setex_async(self.__db, key, tool.get_expire(), index)
        RedisUtils.setex_async(self.__db, key, tool.get_expire(), json.dumps((index, is_default)))
    def __get_real_place_order_index(self, code):
        key = "s_cancel_real_place_order_index-{}".format(code)
        val = RedisUtils.get(self.__get_redis(), key)
        if val is None:
            return None
        return int(val)
    def __get_real_place_order_index_cache(self, code):
    def __get_real_place_order_index_info_cache(self, code):
        cache_result = CodeDataCacheUtil.get_cache(self.__s_cancel_real_place_order_index_cache, code)
        if cache_result[0]:
            return cache_result[1]
        return None
    def get_real_place_order_index_cache(self, code):
        cache_result = CodeDataCacheUtil.get_cache(self.__s_cancel_real_place_order_index_cache, code)
        if cache_result[0]:
            return cache_result[1]
        cache_result = self.__get_real_place_order_index_info_cache(code)
        if cache_result:
            return cache_result[0]
        return None
    def __clear_data(self, code):
@@ -110,8 +103,8 @@
            RedisUtils.delete_async(self.__db, key)
    # 设置真实下单位置
    def set_real_place_order_index(self, code, index):
        self.__save_real_place_order_index(code, index)
    def set_real_place_order_index(self, code, index, is_default):
        self.__save_real_place_order_index(code, index, is_default)
    def clear_data(self):
        ks = ["s_big_num_cancel_compute_data-*", "s_cancel_real_place_order_index-*"]
@@ -131,11 +124,15 @@
    # S前撤实现
    def __need_cancel_for_up(self, code, big_sell_order_info, total_datas):
        # 查询是否是真的真实下单位置
        trade_index, is_default = TradeBuyQueue().get_traded_index(code)
        real_order_index = self.__get_real_place_order_index_cache(code)
        if real_order_index is None and big_sell_order_info[0] < 500000:
            return True, "没找到真实下单位置就有大卖单"
        real_order_index_info = self.__get_real_place_order_index_info_cache(code)
        if real_order_index_info is None or real_order_index_info[1]:
            return False, "没找到真实下单位"
        real_order_index = real_order_index_info[0]
        total_deal_money = sum([x[1] * x[2] for x in big_sell_order_info[1]])
        start_order_no = big_sell_order_info[1][0][3][1]
        total_num = 0
        for i in range(trade_index, real_order_index):
            data = total_datas[i]
@@ -150,16 +147,43 @@
                                                                                                         code))
            if left_count > 0:
                total_num += val["num"]
        total_money = total_num * 100 * big_sell_order_info[1][-1][2]
        total_deal_money = sum([x[1] * x[2] for x in big_sell_order_info[1]])
        deal_rate = round(total_deal_money / total_money, 4)
        if deal_rate >= 0.3:
            return True, f"成交比例:{deal_rate}({total_deal_money}/{total_money})"
        # 大于300w
        if total_deal_money >= 300 * 10000:
            total_money = total_num * 100 * big_sell_order_info[1][-1][2]
            deal_rate = round(total_deal_money / total_money, 4)
            if deal_rate >= 0.3:
                return True, f"有300w大卖单成交比例:{deal_rate}({total_deal_money}/{total_money})"
        if total_deal_money >= 100 * 10000:
            start_order_no = big_sell_order_info[1][-1][4][1]
            latest_deal_time_ms = l2_huaxin_util.convert_time(big_sell_order_info[1][-1][4][0], with_ms=True)
            real_trade_index = None
            for i in range(trade_index, real_order_index):
                data = total_datas[i]
                val = data['val']
                if not L2DataUtil.is_limit_up_price_buy(val):
                    continue
                if int(val['orderNo']) < start_order_no:
                    continue
                real_trade_index = i
            if real_trade_index is None:
                l2_log.s_cancel_debug(code, f"没找到真实的成交进度:start_order_no-{start_order_no} 卖单-{big_sell_order_info}")
                return False, ""
            # 重新囊括L后
            # 撤单时间比早成交时间大就需要计算在里面
            LCancelBigNumComputer().re_compute_l_down_watch_indexes(code, big_sell_info=(
                real_trade_index, latest_deal_time_ms))
        return False, ""
    # 计算S后撤监听的数据范围
    def __compute_down_cancel_watch_index(self, code, big_sell_order_info, total_datas):
        trade_index, is_default = TradeBuyQueue().get_traded_index(code)
        real_order_index = self.__get_real_place_order_index_cache(code)
        real_order_index_info = self.__get_real_place_order_index_info_cache(code)
        if real_order_index_info is None or real_order_index_info[1]:
            return
        real_order_index = real_order_index_info[0]
        start_order_no = big_sell_order_info[1][-1][4][1]
        latest_deal_time = l2_huaxin_util.convert_time(big_sell_order_info[1][-1][4][0], with_ms=True)
        if code in self.__s_down_watch_indexes_dict:
@@ -191,21 +215,23 @@
            self.__s_down_watch_indexes_dict[code] = (latest_deal_time, watch_indexes)
    #  big_sell_order_info格式:[总共的卖单金额, 大卖单详情列表]
    def set_big_sell_order_info_for_cancel(self, code,  big_sell_order_info,order_begin_pos: OrderBeginPosInfo):
    def set_big_sell_order_info_for_cancel(self, code, big_sell_order_info, order_begin_pos: OrderBeginPosInfo):
        if order_begin_pos is None or order_begin_pos.buy_exec_index is None or order_begin_pos.buy_exec_index < 0:
            return False, "还未下单"
        if big_sell_order_info[0] < 500000 or not big_sell_order_info[1]:
            return False, "无大单卖"
        # 获取最新的成交时间
        total_datas = local_today_datas.get(code)
        need_cancel, cancel_msg = self.__need_cancel_for_up(code, big_sell_order_info, total_datas)
        if need_cancel:
            return need_cancel, cancel_msg
        # 获取最新的成交时间
        latest_trade_time = l2_huaxin_util.convert_time(big_sell_order_info[1][-1][4][0])
        if tool.trade_time_sub(latest_trade_time, total_datas[order_begin_pos.buy_single_index]['val']['time']) < 180:
            return self.__need_cancel_for_up(code, big_sell_order_info, total_datas)
        else:
        if tool.trade_time_sub(latest_trade_time, total_datas[order_begin_pos.buy_single_index]['val']['time']) >= 180:
            self.__compute_down_cancel_watch_index(code, big_sell_order_info, total_datas)
            return False, "超过S前守护范围"
        return False, "不满足撤单条件"
    def need_cancel_for_down(self, code, start_index, end_index):
        watch_index_info = self.__s_down_watch_indexes_dict.get(code)
@@ -859,22 +885,32 @@
        # 重新计算L上
    def re_compute_l_down_watch_indexes(self, code):
    # big_sell_info卖单信息,格式:[最近成交索引,最近成交时间(带毫秒)]
    def re_compute_l_down_watch_indexes(self, code, big_sell_info=None):
        watch_index_info = self.__cancel_watch_index_info_cache.get(code)
        if not watch_index_info or watch_index_info[1] > 0:
            return
        # 获取成交进度位与真实下单位置
        real_place_order_index_info = self.__real_place_order_index_dict.get(code)
        last_trade_progress_index = self.__last_trade_progress_dict.get(code)
        if big_sell_info is not None:
            last_trade_progress_index = big_sell_info[0]
        if not real_place_order_index_info or not last_trade_progress_index:
            return
        min_cancel_time_with_ms = None
        if big_sell_info:
            min_cancel_time_with_ms = big_sell_info[1]
        self.compute_watch_index(code, watch_index_info[0], last_trade_progress_index + 1,
                                 real_place_order_index_info[0],
                                 re_compute=1)
                                 re_compute=1, min_cancel_time_with_ms=min_cancel_time_with_ms,
                                 msg=f"大单卖: 成交进度-{big_sell_info}" if big_sell_info is not None else '')
    # 计算观察索引,倒序计算
    # re_compute:是否是重新计算的
    def compute_watch_index(self, code, buy_single_index, start_index, end_index, re_compute=0):
    # min_cancel_time_with_ms:最小撤单时间,大于等于此时间的撤单需要囊括进去
    def compute_watch_index(self, code, buy_single_index, start_index, end_index, re_compute=0,
                            min_cancel_time_with_ms=None, msg=""):
        try:
            l2_log.l_cancel_debug(code, f"计算L后囊括范围:{start_index}-{end_index}")
            total_datas = local_today_datas.get(code)
@@ -900,14 +936,18 @@
                    if val["num"] < min_num:
                        continue
                    left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2(code,
                                                                                                             j,
                                                                                                             total_datas,
                                                                                                             local_today_canceled_buyno_map.get(
                                                                                                                 code))
                    if left_count > 0:
                    cancel_data = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_canceled_data_v2(code,
                                                                                                          j,
                                                                                                          total_datas,
                                                                                                          local_today_canceled_buyno_map.get(
                                                                                                              code))
                    if not cancel_data:
                        not_cancel_indexes_with_num.append((j, val["num"]))
                    elif min_cancel_time_with_ms and tool.compare_time_with_ms(min_cancel_time_with_ms,
                                                                               L2DataUtil.get_time_with_ms(
                                                                                   cancel_data['val'])) <= 0:
                        not_cancel_indexes_with_num.append((j, val["num"]))
                min_num = 0
                if not_cancel_indexes_with_num:
                    temp_count = len(not_cancel_indexes_with_num)
@@ -937,13 +977,19 @@
                            if val['num'] < min_num:
                                continue
                            left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2(
                                code,
                                i,
                                total_datas,
                                local_today_canceled_buyno_map.get(
                                    code))
                            if left_count > 0:
                            cancel_data = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_canceled_data_v2(code,
                                                                                                                  i,
                                                                                                                  total_datas,
                                                                                                                  local_today_canceled_buyno_map.get(
                                                                                                                      code))
                            if not cancel_data:
                                watch_indexes.add(i)
                                if len(watch_indexes) >= MAX_COUNT:
                                    break
                            elif min_cancel_time_with_ms and tool.compare_time_with_ms(min_cancel_time_with_ms,
                                                                                       L2DataUtil.get_time_with_ms(
                                                                                           cancel_data['val'])) <= 0:
                                watch_indexes.add(i)
                                if len(watch_indexes) >= MAX_COUNT:
                                    break
@@ -973,18 +1019,24 @@
                            # 小金额过滤
                            if float(val['price']) * val['num'] < 100 * 100:
                                continue
                            left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2(
                                code,
                                i,
                                total_datas,
                                local_today_canceled_buyno_map.get(
                                    code))
                            if left_count > 0:
                            cancel_data = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_canceled_data_v2(code,
                                                                                                                  i,
                                                                                                                  total_datas,
                                                                                                                  local_today_canceled_buyno_map.get(
                                                                                                                      code))
                            if not cancel_data:
                                watch_indexes.add(i)
                                break
                            elif min_cancel_time_with_ms and tool.compare_time_with_ms(min_cancel_time_with_ms,
                                                                                       L2DataUtil.get_time_with_ms(
                                                                                           cancel_data['val'])) <= 0:
                                watch_indexes.add(i)
                                break
                    self.__set_watch_indexes(code, buy_single_index, re_compute, watch_indexes)
                    l2_log.l_cancel_debug(code,
                                          f"设置监听范围{'(重新计算)' if re_compute else ''}, 数据范围:{re_start_index}-{end_index} 监听范围-{watch_indexes}")
                                          f"设置监听范围({msg}){'(重新计算)' if re_compute else ''}, 数据范围:{re_start_index}-{end_index} 监听范围-{watch_indexes}")
        except Exception as e:
            l2_log.l_cancel_debug(code, f"计算L后囊括范围出错:{str(e)}")
            async_log_util.exception(logger_l2_l_cancel, e)
@@ -1277,6 +1329,10 @@
    def __compute_near_by_trade_progress_need_cancel(self, code, buy_exec_index, start_index, end_index, total_data,
                                                     is_first_code):
        # L前不生效
        if True:
            return False, None
        watch_indexes = self.__get_near_by_trade_progress_indexes_cache(code)
        if not watch_indexes:
            return False, None
l2/l2_data_manager_new.py
@@ -497,8 +497,8 @@
                b_need_cancel, b_cancel_msg = cls.__SCancelBigNumComputer.need_cancel_for_down(code, start_index,end_index)
                if b_need_cancel:
                    async_log_util.error(logger_debug, f"{code} S后撤单:{b_cancel_msg}")
                    # return total_data[end_index], f"S后撤({b_cancel_msg})"
                    return None, ""
                    return total_data[end_index], f"S后撤({b_cancel_msg})"
                    # return None, ""
            except Exception as e:
                logging.exception(e)
                async_log_util.error(logger_l2_error,
l2/l2_data_util.py
@@ -467,6 +467,10 @@
        fs = sub_s * 1000 + sub_ms
        return fs
    @classmethod
    def get_time_with_ms(cls, val):
        return val["time"] + "." + "{:0>3}".format(int(val["tms"]))
class L2TradeQueueUtils(object):
    # 买入数据是否已撤
@@ -581,4 +585,4 @@
if __name__ == "__main__":
    # Ad-hoc manual checks; they only run when this module is executed directly.
    print(L2DataUtil.time_sub_as_ms({"time": "13:00:00", "time_ms": "980"}, {"time": "11:29:59", "time_ms": "590"}))
    print(L2DataUtil.get_time_with_ms({"time": "10:00:00", "tms": 490}))
test/l2_trade_test.py
@@ -206,7 +206,7 @@
        code = "603363"
        l2.l2_data_util.load_l2_data(code)
        l2.l2_data_util.local_today_datas[code] = l2.l2_data_util.local_today_datas[code][:222]
        l2.cancel_buy_strategy.SCancelBigNumComputer().set_real_place_order_index(code, 153)
        l2.cancel_buy_strategy.SCancelBigNumComputer().set_real_place_order_index(code, 153,False)
        TradeBuyQueue.get_traded_index = mock.Mock(return_value=(117, False))
        order_begin_pos = l2_data_manager.OrderBeginPosInfo(buy_single_index=117, buy_exec_index=122)
utils/tool.py
@@ -157,9 +157,9 @@
def get_time_as_millionsecond(time_str):
    """Convert an ``HH:MM:SS.mmm`` string into milliseconds since midnight.

    :param time_str: time text with exactly one ``.`` separating the
        ``HH:MM:SS`` part from the millisecond part
    :return: total milliseconds as ``int``
    :raises ValueError: if the text does not split into two parts on ``.``
        or a field is not an integer
    """
    s_str, ms_str = time_str.split(".")
    ts = s_str.split(":")
    # Sum hours, minutes and seconds as seconds first, then scale the whole
    # sum to milliseconds; scaling only the seconds field would add hours and
    # minutes in the wrong unit.
    total_seconds = int(ts[0]) * 3600 + int(ts[1]) * 60 + int(ts[2])
    return total_seconds * 1000 + int(ms_str)
# 将秒数格式化为时间
@@ -192,20 +192,25 @@
    return time_1 - time_2
def trade_time_sub_with_ms(time_str_1, time_str_2):
    """Return ``time_str_1 - time_str_2`` in milliseconds, skipping the midday break.

    When the two times lie on opposite sides of 11:30:00, the second time is
    shifted by 90 minutes so that the break does not count towards the
    difference.

    :param time_str_1: minuend, in the format accepted by
        ``get_time_as_millionsecond``
    :param time_str_2: subtrahend, same format
    :return: signed difference in milliseconds
    """
    split_time = get_time_as_second("11:30:00") * 1000
    time_1 = get_time_as_millionsecond(time_str_1)
    time_2 = get_time_as_millionsecond(time_str_2)
    # The 90-minute shift must be applied exactly once per call.
    if time_1 < split_time < time_2:
        time_2 = time_2 - 90 * 60 * 1000
    elif time_2 < split_time < time_1:
        time_2 = time_2 + 90 * 60 * 1000
    return time_1 - time_2
def compare_time(time_1: str, time_2: str):
    """Compare two ``:``-separated time strings by their digits.

    Each string has its colons removed and is read as one integer. The result
    is negative, zero or positive when ``time_1`` is earlier than, equal to
    or later than ``time_2``; its magnitude is a digit difference, not a
    duration.
    """
    first_digits = int("".join(time_1.split(":")))
    second_digits = int("".join(time_2.split(":")))
    return first_digits - second_digits
def compare_time_with_ms(time_1: str, time_2: str):
    return int(time_1.replace(":", "").replace(".", "")) - int(time_2.replace(":", "").replace(".", ""))
# 交易时间加几s
@@ -324,5 +329,5 @@
if __name__ == "__main__":
    # Ad-hoc manual checks; they only run when this module is executed directly.
    print(trade_time_add_millionsecond("10:36:00.123", 1000))
    print(compare_time_with_ms("10:00:00.123","10:01:01.123"))
    print(trade_time_add_millionsecond("11:29:59.123", 1888))