Administrator
2025-01-03 f11cde17628781e516371cc039e98fa884b808e2
新版真实下单位寻找
5个文件已修改
350 ■■■■ 已修改文件
l2/huaxin/huaxin_delegate_postion_manager.py 228 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
servers/huaxin_trade_server.py 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/code_plate_key_manager.py 5 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/kpl_data_constant.py 49 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/kpl_limit_up_data_manager.py 62 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/huaxin/huaxin_delegate_postion_manager.py
@@ -28,20 +28,6 @@
        price, volume, latest_data, time.time(), order_ref, shadow_price, exec_index, shadow_volume)
def place_order_new(code, oredr_info_list, exec_index, latest_data):
    """
    设置的下单信息
    @param code:
    @param oredr_info_list: 例如-[(量,价格, order_ref)], 先下单的在前
    @param exec_index:
    @param latest_data:
    @return:
    """
    async_log_util.info(logger_real_place_order_position,
                        f"新版下单:code-{code} oredr_info_list-{oredr_info_list}  exec-index-{exec_index}")
    _place_order_info_dict_new[code] = (oredr_info_list, latest_data, time.time(), exec_index)
# 获取下单信息
def get_order_info(code):
    info = _place_order_info_dict.get(code)
@@ -60,7 +46,219 @@
RELIABILITY_TYPE_ESTIMATE = 3  # 估算下单位
# 计算预估下单位
class RealDelegateOrderPositionManager:
    """
    真实下单位管理
    """
    __place_order_info_dict = {}
    # 获取下单信息
    @classmethod
    def get_order_info(cls, code):
        info = cls.__place_order_info_dict.get(code)
        TIME_SPACE_THRESHHOD = 3 if tool.is_sz_code(code) else 20
        if info and time.time() - info[2] > TIME_SPACE_THRESHHOD:
            async_log_util.info(logger_real_place_order_position, "get_order_info 间隔{}s以上:code-{}",
                                TIME_SPACE_THRESHHOD,
                                code)
            # 间隔3s以上就无效了
            info = None
            cls.__place_order_info_dict.pop(code)
        return info
    @classmethod
    def place_order(cls, code, oredr_info_list, exec_index, latest_data):
        """
        设置的下单信息
        @param code:
        @param oredr_info_list: 例如-[(量,价格, order_ref)], 先下单的在前
        @param exec_index:
        @param latest_data:
        @return:
        """
        async_log_util.info(logger_real_place_order_position,
                            f"新版下单:code-{code} oredr_info_list-{oredr_info_list}  exec-index-{exec_index}")
        cls.__place_order_info_dict[code] = (oredr_info_list, latest_data, time.time(), exec_index)
    @classmethod
    def __compute_estimate_order_position(cls, code, exec_buy_index, shadow_price, shadow_volume):
        total_datas = l2_data_util.local_today_datas.get(code)
        try:
            if tool.is_sh_code(code):
                # 通过影子单买撤数据的订单号确认大致位置
                shadow_place_order_cancel_index = None
                for i in range(exec_buy_index, total_datas[-1]['index'] + 1):
                    d = total_datas[i]
                    val = d['val']
                    # 判断影子订单位置
                    if val["num"] != shadow_volume // 100:
                        continue
                    if abs(shadow_price - float(val["price"])) >= 0.001:
                        continue
                    if not L2DataUtil.is_buy_cancel(val):
                        continue
                    shadow_place_order_cancel_index = d["index"]
                if shadow_place_order_cancel_index:
                    async_log_util.info(logger_debug,
                                        f"{code} 执行位:{exec_buy_index} 获取到影子单的买撤:{shadow_place_order_cancel_index}")
                    order_no = int(total_datas[shadow_place_order_cancel_index])
                    for i in range(shadow_place_order_cancel_index, 0, -1):
                        d = total_datas[i]
                        val = d['val']
                        if not L2DataUtil.is_buy(val):
                            continue
                        if order_no > int(val['orderNo']) and abs(tool.trade_time_sub(val['time'], total_datas[
                            shadow_place_order_cancel_index]['val']["time"])) <= 1:
                            real_place_order_index = min(i + 1, total_datas[-1]['index'])
                            async_log_util.info(logger_debug,
                                                f"{code} 执行位:{exec_buy_index} 根据影子单的买撤获取真实下单位置:{real_place_order_index}")
                            return real_place_order_index
        except Exception as e:
            async_log_util.error(logger_debug, f"真实下单位置(影子单撤单)出错:{code} - {str(e)}")
        exec_data = total_datas[exec_buy_index]
        THRESH_MS = 20 if tool.is_sz_code(code) else 100
        for i in range(exec_buy_index, total_datas[-1]["index"]):
            if L2DataUtil.time_sub_as_ms(total_datas[i]['val'], exec_data["val"]) >= THRESH_MS:
                return i
        return None
    @classmethod
    def __compute_real_place_order_position(cls, code, exec_data, oredr_info_list, add_datas):
        THRESHOLD_MS = 20 if tool.is_sz_code(code) else 100
        # 获取下单的量
        target_volumes = [x[0] // 100 for x in oredr_info_list]
        volumes_info_list = []
        current_delegates = huaxin_trade_record_manager.DelegateRecordManager().list_current_delegates(code)
        # 下单量对应的委托时间
        current_delegate_place_order_time_dict = {}
        if current_delegates:
            # 下单时间不能早于执行位置的时间
            if tool.trade_time_sub(current_delegates[0]["acceptTime"], exec_data["val"]["time"]) >= 0:
                current_delegate_place_order_time_dict = {x["volume"] // 100: x["acceptTime"] for x in
                                                          current_delegates}
        for data in add_datas:
            val = data["val"]
            if not L2DataUtil.is_limit_up_price_buy(val):
                continue
            if val["num"] not in target_volumes:
                continue
            if val['num'] in current_delegate_place_order_time_dict:
                if current_delegate_place_order_time_dict[val['num']] != val['time']:
                    # 与下单时间明确不符合
                    continue
            volumes_info_list.append((data["index"], val["num"], data))
        if not volumes_info_list:
            return None
        volumes_list = [x[1] for x in volumes_info_list]
        lack_volumes = set(target_volumes) - set(volumes_list)
        if lack_volumes:
            # 量不够, 往前找一定的量
            total_datas = l2_data_util.local_today_datas.get(code)
            end_data = add_datas[0]
            # 获取还差的量, 最多往前找THRESHOLD_MS 的毫秒
            for i in range(end_data["index"] - 1, -1, -1):
                data = total_datas[i]
                val = data["val"]
                if val['num'] not in lack_volumes:
                    continue
                if not L2DataUtil.is_limit_up_price_buy(val):
                    continue
                if tool.trade_time_sub_with_ms(L2DataUtil.get_time_with_ms(end_data['val']),
                                               L2DataUtil.get_time_with_ms(val)) > THRESHOLD_MS:
                    break
                if val['num'] in current_delegate_place_order_time_dict:
                    if current_delegate_place_order_time_dict[val['num']] != val['time']:
                        # 与下单时间明确不符合
                        continue
                left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2(code,
                                                                                                         i,
                                                                                                         total_datas,
                                                                                                         l2_data_util.local_today_canceled_buyno_map.get(
                                                                                                             code))
                if left_count < 1:
                    continue
                lack_volumes.discard(val['num'])
                volumes_info_list.insert(0, (data["index"], val["num"], data))
            volumes_list = [x[1] for x in volumes_info_list]
            lack_volumes = set(target_volumes) - set(volumes_list)
            if lack_volumes:
                return None
        match_list = []
        for i in range(0, len(volumes_list) - len(target_volumes) + 1):
            # 量与委托量不相等
            if set(target_volumes) != set(volumes_list[i:i + len(target_volumes)]):
                continue
            # 委托间隔时间不能相差100ms以上
            temp_volumes_info_list = volumes_info_list[i:i + len(target_volumes)]
            sub_time_list = []
            for j in range(0, len(temp_volumes_info_list) - 1):
                sub_ms = tool.trade_time_sub_with_ms(
                    L2DataUtil.get_time_with_ms(temp_volumes_info_list[j + 1][2]["val"]),
                    L2DataUtil.get_time_with_ms(temp_volumes_info_list[j][2]["val"]))
                sub_time_list.append(abs(sub_ms))
            max_sub_time = max(sub_time_list)
            if max_sub_time > THRESHOLD_MS:
                continue
            # 最大的时间差
            match_list.append((max_sub_time, temp_volumes_info_list[0][2]))
        if not match_list:
            # 没有找到真实下单位
            return None
        # 获取时间差最小的数据
        real_place_order_info = match_list[0]
        for x in match_list:
            if x[0] < real_place_order_info[0]:
                real_place_order_info = x
        return real_place_order_info[1]
    @classmethod
    def compute_l2_place_order_position(cls, code, add_datas):
        """
        计算真实下单位置
        @param code: 代码
        @param add_datas: 本批次数据
        @return:
        """
        order_info = get_order_info(code)
        if not order_info:
            # 暂无下单信息
            return None, order_info, None
        order_info_list = order_info[0]  # [(量,价格, order_ref)]
        exec_data = order_info[1]
        order_time = order_info[2]
        estimate_time_space = 1 if tool.is_sz_code(code) else 2.5
        place_order_data = cls.__compute_real_place_order_position(code, order_info_list, add_datas)
        if place_order_data:
            return place_order_data["index"], order_info, RELIABILITY_TYPE_REAL
        elif tool.trade_time_sub(add_datas[-1]['val']['time'],
                                 exec_data['val']['time']) >= estimate_time_space and time.time() - order_time > 5:
            estimate_index = cls.__compute_estimate_order_position(code, exec_data["index"], order_info_list[-1][1],
                                                                   order_info_list[-1][0])
            if estimate_index:
                return estimate_index, order_info, RELIABILITY_TYPE_ESTIMATE
        return None, order_info, None
    @classmethod
    def recompute_for_slow_time(cls, code, order_info, real_place_index, compute_type):
        """
        因为L2数据延迟问题而重新计算真实下单位
        @param code:
        @param order_info:
        @param real_place_index:
        @param compute_type:
        @return:
        """
        # TODO 重新计算, 根据实际订单来计算
        pass
    # 计算预估下单位
def __compute_estimate_order_position(code, exec_buy_index, shadow_price, shadow_volume):
    total_datas = l2_data_util.local_today_datas.get(code)
    try:
servers/huaxin_trade_server.py
@@ -1039,9 +1039,9 @@
            # 没有添加过的时候需要重新添加
            datas_ = LatestLimitUpBlockManager().statistics_limit_up_block_infos()
            if datas_:
                plates = [d[0] for d in datas_[:3]]
                for p in plates:
                    KPLPlateForbiddenManager().save_plate(p)
                for data_ in datas_:
                    if data_[2] >= 3:
                        KPLPlateForbiddenManager().save_plate(data_[0])
    except:
        pass
third_data/code_plate_key_manager.py
@@ -429,7 +429,10 @@
                    strong = cls.get_market_strong()
                    if strong is None:
                        strong = 60
                    THRESHOLD_MONEY = int((1 - strong / 200) * data[3])
                    if data[3] > 3000e4:
                        THRESHOLD_MONEY = int((1 - strong / 200) * data[3])
                    else:
                        THRESHOLD_MONEY = data[3]
            # if count >= MAX_COUNT:
            #     break
        # 记录精选流出日志
third_data/kpl_data_constant.py
@@ -101,6 +101,7 @@
        # 加载为扫入买匹配的代码板块
        kpl_results = KPLLimitUpDataUtil.get_latest_block_infos(min_day=tool.date_sub(cls.__day, 365),
                                                                max_day=cls.__day)
        # {"代码":[(板块, 日期), (板块, 日期)]}
        kpl_block_dict = {}
        for r in kpl_results:
            # 当日炸板的不计算原因
@@ -109,25 +110,36 @@
            code = r[0]
            if code not in kpl_block_dict:
                kpl_block_dict[code] = []
            kpl_block_dict[code].append((r[2], r[1]))  # (板块, 时间)
            kpl_block_dict[code].append((r[2], r[1]))  # (板块, 日期)
        for code in kpl_block_dict:
            block_infos = kpl_block_dict.get(code)
            block_infos.sort(key=lambda x: x[1], reverse=True)
            temp_dict = {}  # {"板块":[出现次数, 最近出现时间]}
            for b in block_infos:
                if b[0] not in temp_dict:
                    temp_dict[b[0]] = [0, b[1]]
                temp_dict[b[0]][0] += 1
            temp_list = [(k, temp_dict[k][0], temp_dict[k][1]) for k in temp_dict]
            # 按照涨停次数与最近涨停时间排序
            temp_list.sort(key=lambda x: (x[1], x[2]), reverse=True)
            cls.__radical_buy_reasons_origin_data_dict[code] = temp_list
            blocks = {temp_list[0][0]}
            if len(temp_list) > 1:
                if temp_list[1][1] >= 2:
                    blocks.add(temp_list[1][0])
            blocks -= constant.KPL_INVALID_BLOCKS
            cls.__radical_buy_reasons_dict[code] = blocks
            cls.__radical_buy_reasons_dict[code] = cls.__compute_limit_up_reasons(code, block_infos)
    @classmethod
    def __compute_limit_up_reasons(cls, code, block_infos):
        """
        计算涨停原因
        @param code:
        @param block_infos:
        @return:
        """
        # [(板块, 日期)]
        block_infos.sort(key=lambda x: x[1], reverse=True)
        # {"板块":[(出现次数, 最近出现日期)]}
        temp_dict = {}
        for b in block_infos:
            if b[0] not in temp_dict:
                temp_dict[b[0]] = [0, b[1]]
            temp_dict[b[0]][0] += 1
        temp_list = [(k, temp_dict[k][0], temp_dict[k][1]) for k in temp_dict]
        # 按照涨停次数与最近涨停时间排序
        temp_list.sort(key=lambda x: (x[1], x[2]), reverse=True)
        cls.__radical_buy_reasons_origin_data_dict[code] = temp_list
        blocks = {temp_list[0][0]}
        # 取涨停次数最多的和最近涨停的
        blocks.add(block_infos[0][0])
        blocks -= constant.KPL_INVALID_BLOCKS
        return blocks
    def get_limit_up_reasons(self, code):
        """
@@ -239,9 +251,6 @@
                        block_codes[b] = set()
                    block_codes[b].add(code)
        cls.__current_limit_up_block_codes = block_codes
    @classmethod
    def set_history_limit_up_datas(cls, history_limit_up_datas_):
third_data/kpl_limit_up_data_manager.py
@@ -1,6 +1,8 @@
"""
开盘啦涨停数据管理
"""
import copy
from third_data import kpl_util, kpl_data_manager
from third_data.history_k_data_manager import HistoryKDataManager
from third_data.history_k_data_util import HistoryKDatasUtils
@@ -34,8 +36,6 @@
    @return:
    """
    return kpl_data_manager.KPLLimitUpDataRecordManager.total_datas
class CodeLimitUpSequenceManager:
@@ -126,8 +126,8 @@
    """
    最近涨停的板块管理
    """
    # 看最近7天
    __LATEST_DAY_COUNT = 7
    # 看最近3天,不包含今天
    __LATEST_DAY_COUNT = 3
    __days = []
    # 目前涨停
@@ -156,16 +156,15 @@
    @classmethod
    def __load_datas(cls):
        # 加载最近7天的数据
        __days = HistoryKDatasUtils.get_latest_trading_date_cache(cls.__LATEST_DAY_COUNT - 1)
        now_day = tool.get_now_date_str()
        if __days[0] != now_day:
            __days.insert(0, now_day)
        # 加载最近几天的数据
        __days = HistoryKDatasUtils.get_latest_trading_date_cache(cls.__LATEST_DAY_COUNT)
        __days = copy.deepcopy(__days)
        # 不能包含今天
        if __days[0] == tool.get_now_date_str():
            __days.pop(0)
        cls.__days = __days
        # 加载之前6天的涨停,曾涨停,曾涨停代码的最近6天的K线
        for day in __days:
            if day == now_day:
                continue
            limit_up_records = get_history_limit_up_datas(day)
            cls.__history_limit_up_day_datas[day] = limit_up_records
            current_limit_up_datas = get_current_limit_up_datas(day)
@@ -179,8 +178,6 @@
            cls.__get_bars(code)
        # 统计前6天的板块信息
        for day in __days:
            if day == now_day:
                continue
            cls.__block_day_datas[day] = cls.__statistics_limit_up_block_infos_by_day(day)
    def set_current_limit_up_data(self, day, datas):
@@ -196,17 +193,21 @@
    def __statistics_limit_up_block_infos_by_day(cls, day):
        """
        统计涨停代码信息
        @return:
        @return:{"板块":(涨停数, 炸板数, {包含的代码})}
        """
        # 统计板块的
        current_code_dict = {d[0]: d for d in cls.__current_limit_up_day_datas[day]}
        # history_code_dict = {d[3]: d for d in self.__history_limit_up_day_datas[day]}
        block_codes_dict = {}
        for h in cls.__history_limit_up_day_datas[day]:
            cls.__code_name_dict[h[3]] = h[4]
            if h[2] not in block_codes_dict:
                block_codes_dict[h[2]] = set()
            block_codes_dict[h[2]].add(h[3])
            # 将板块所包含的代码归类
            code = h[3]
            cls.__code_name_dict[code] = h[4]
            blocks = LimitUpCodesBlockRecordManager().get_radical_buy_blocks(code)
            if blocks:
                for b in blocks:
                    if b not in block_codes_dict:
                        block_codes_dict[b] = set()
                    block_codes_dict[b].add(code)
        fdata = {}
        for b in block_codes_dict:
            limit_up_count = 0
@@ -225,19 +226,16 @@
        统计涨停板块数据
        @return:
        """
        # 只统计今天的数据
        now_day = tool.get_now_date_str()
        try:
            block_dict = self.__statistics_limit_up_block_infos_by_day(now_day)
            self.__block_day_datas[now_day] = block_dict
        except:
            pass
        # 板块出现的天数
        block_count_dict = {}
        # 板块出现的代码
        block_codes_dict = {}
        for day in self.__block_day_datas:
            for b in self.__block_day_datas[day]:
                total_limit_up_count = self.__block_day_datas[day][b][0] + self.__block_day_datas[day][b][1]
                if total_limit_up_count < 3:
                    continue
                # 板块涨停个数
                if b not in block_count_dict:
                    block_count_dict[b] = set()
                if b not in block_codes_dict:
@@ -250,19 +248,15 @@
        block_count_list = block_count_list[:50]
        # [(涨停原因,累计涨停次数,连续次数)]
        fdatas = []
        if self.__history_limit_up_day_datas.get(now_day):
            today_records_code_dict = {d[3]: d for d in self.__history_limit_up_day_datas.get(now_day)}
        else:
            today_records_code_dict = {}
        for d in block_count_list:
            b = d[0]
            fdata = [d[0], len(d[1])]
            fdata = [b, len(d[1])]
            temp = []
            max_continue_count = 0
            for day in self.__days:
                if day not in self.__block_day_datas:
                    continue
                if d[0] in self.__block_day_datas[day]:
                if b in self.__block_day_datas[day]:
                    temp.append(day)
                else:
                    c = len(temp)
@@ -286,9 +280,6 @@
            # 统计今天这个板块中大于二板的代码数量
            limit_up_counts = 0
            for code in block_codes_dict[d[0]]:
                if code in today_records_code_dict and today_records_code_dict[code][12] != '首板':
                    limit_up_counts += 1
            fdata.append(limit_up_counts)
            # 获取每天的数量
            days_datas = []
@@ -331,4 +322,3 @@
        rate = int((volumes_data[0]["close"] - min_price) * 100 / min_price)
        cls.__k_max_rate[code] = rate
        return cls.__k_datas.get(code)