Administrator
2024-02-20 4cd1733f93d0e905a8b06d284eddef064e4d1ca6
客户端推送消息修改/量参考日期规则修改
5个文件已修改
1个文件已添加
440 ■■■■■ 已修改文件
l2/l2_transaction_data_processor.py 28 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
msg/buy_order_msg_manager.py 72 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
msg/push_msg_manager.py 32 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/test_code_attribute.py 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/huaxin_trade_server.py 273 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utils/init_data_util.py 30 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_transaction_data_processor.py
@@ -10,7 +10,7 @@
from l2.l2_transaction_data_manager import HuaXinTransactionDataManager
from log_module import async_log_util
from log_module.log import hx_logger_l2_debug, logger_l2_trade_buy_queue, logger_debug, hx_logger_l2_upload
from msg import push_msg_manager
from msg import push_msg_manager, buy_order_msg_manager
from trade import current_price_process_manager, trade_manager
from trade.deal_big_money_manager import DealOrderNoManager
from utils import tool
@@ -128,24 +128,14 @@
                        # 交易进度变化,判断到真实下单位置的距离
                        real_order_index = SecondCancelBigNumComputer().get_real_place_order_index_cache(code)
                        if real_order_index and real_order_index >= buy_progress_index:
                            total_left_count = 0
                            for i in range(buy_progress_index + 1, real_order_index):
                                val = total_datas[i]["val"]
                                if not L2DataUtil.is_limit_up_price_buy(val):
                                    continue
                                if val["num"] * float(val["price"]) < 5000:
                                    continue
                                left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2(
                                    code, i,
                                    total_datas,
                                    local_today_canceled_buyno_map.get(
                                        code))
                                if left_count > 0:
                                    total_left_count += 1
                            if total_left_count <= 3:
                                # 当成交进度距离真实下单位置不足3笔时推送即将成交的消息
                                push_msg_manager.push_order_almost_deal(code, gpcode_manager.get_code_name(code))
                            # 发送下单消息
                            try:
                                buy_order_msg_manager.almost_deal(code, real_order_index, buy_progress_index,
                                                                  total_datas)
                                buy_order_msg_manager.follow_not_enough(code, order_begin_pos.buy_exec_index,
                                                                        real_order_index, total_datas)
                            except Exception as e:
                                logger_debug.exception(e)
            else:
                pass
            if order_begin_pos and order_begin_pos.buy_exec_index and order_begin_pos.buy_exec_index > -1:
msg/buy_order_msg_manager.py
New file
@@ -0,0 +1,72 @@
# 即将成交
from code_attribute import gpcode_manager
from l2 import l2_data_source_util, l2_data_util
from l2.l2_data_util import L2DataUtil
from msg import push_msg_manager
from utils import output_util, tool
# 即将成交
def almost_deal(code, real_order_index, trade_index, total_datas):
    """Push an "order almost dealt" message when little is queued ahead of our order.

    Scans the L2 records strictly between ``trade_index`` and
    ``real_order_index``. Records that are not limit-up-price buys, or whose
    ``num * price`` is below 5000, are ignored. For the rest, the value
    returned by ``get_limit_up_buy_no_canceled_count_v2`` (presumably the
    number of not-yet-cancelled orders -- confirm against that helper) is
    accumulated as a count and as money.

    At most one message is pushed:

    * remaining count <= 10 -> a ``ctype="count"`` message;
    * otherwise, remaining money < 1500 * 10000 -> a ``ctype="money"`` message.

    :param code: security code
    :param real_order_index: index in ``total_datas`` of the real placed order
    :param trade_index: index in ``total_datas`` of the current trade progress
    :param total_datas: list of L2 records, each with a ``"val"`` dict holding
        at least ``"num"`` and ``"price"``
    :return: None
    """
    remaining_count = 0
    remaining_money = 0
    for idx in range(trade_index + 1, real_order_index):
        order = total_datas[idx]["val"]
        if not L2DataUtil.is_limit_up_price_buy(order):
            continue
        num = order["num"]
        price = float(order["price"])
        if num * price < 5000:
            # Too small to matter for the queue ahead of us.
            continue
        canceled_map = l2_data_util.local_today_canceled_buyno_map.get(code)
        alive = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2(
            code, idx, total_datas, canceled_map)
        if alive > 0:
            remaining_count += alive
            remaining_money += num * alive * price * 100

    name = gpcode_manager.get_code_name(code)
    if remaining_count <= 10:
        # The count-based alert takes priority over the money-based one.
        push_msg_manager.push_order_almost_deal(
            code, name, real_order_index, f"剩余:{remaining_count}笔", ctype="count")
        return
    if remaining_money < 1500 * 10000:
        push_msg_manager.push_order_almost_deal(
            code, name, real_order_index,
            f"剩余:{output_util.money_desc(remaining_money)}", ctype="money")
# 真实下单位后面跟单不足
def follow_not_enough(code, buy_exec_index, real_order_index, total_datas):
    """Push "delegated order in danger" messages when few big orders follow ours.

    Nothing is done until the time of the latest L2 record is more than 2
    (seconds, per the original comment) after the record at
    ``buy_exec_index``. After that, the records in
    ``[real_order_index, total_datas[-1]["index"])`` are scanned; limit-up
    buys that ``l2_data_util.is_big_money`` accepts and for which no cancel
    record is found are counted and their money summed.

    Two independent alerts may be pushed (both can fire in the same call):

    * count <= 10 -> a ``ctype="count"`` message;
    * money <= 1500 * 10000 -> a ``ctype="money"`` message.

    :param code: security code
    :param buy_exec_index: index in ``total_datas`` of the buy-execution record
    :param real_order_index: index in ``total_datas`` of the real placed order
    :param total_datas: list of L2 records, each with ``"index"`` and a
        ``"val"`` dict holding at least ``"time"``, ``"num"`` and ``"price"``
    :return: None
    """
    elapsed = tool.trade_time_sub(total_datas[-1]["val"]["time"],
                                  total_datas[buy_exec_index]["val"]["time"])
    if elapsed <= 2:
        # Too soon after placing the order; do not alert yet.
        return

    follow_count = 0
    follow_money = 0
    for idx in range(real_order_index, total_datas[-1]["index"]):
        order = total_datas[idx]["val"]
        # Only big limit-up buy orders are of interest.
        if not (L2DataUtil.is_limit_up_price_buy(order) and l2_data_util.is_big_money(order)):
            continue
        canceled_map = l2_data_util.local_today_canceled_buyno_map.get(code)
        cancel_record = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_canceled_data_v2(
            code, idx, total_datas, canceled_map)
        if cancel_record:
            continue
        follow_count += 1
        follow_money += order["num"] * float(order["price"]) * 100

    name = gpcode_manager.get_code_name(code)
    if follow_count <= 10:
        push_msg_manager.push_delegate_order_danger(
            code, name, buy_exec_index, f"剩余:{follow_count}笔", ctype="count")
    if follow_money <= 1500 * 10000:
        push_msg_manager.push_delegate_order_danger(
            code, name, buy_exec_index,
            f"剩余:{output_util.money_desc(follow_money)}", ctype="money")
msg/push_msg_manager.py
@@ -9,12 +9,37 @@
TYPE_DELEGATE_QUEUE_CHANGE = "delegate_queue_change"  # 委托队列变化
TYPE_DELEGATE_ORDER_DANGER = "delegate_order_danger"  # 委托的订单危险
thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=5)
__order_almost_deal_filter_dict = {}
__delegate_order_danger_filter_dict = {}
# 推送订单快成交信息
def push_order_almost_deal(code, code_name):
    """Push an "order almost dealt" message carrying the code and its name."""
    payload = {"code": code, "code_name": code_name}
    __push_msg(TYPE_ORDER_ALMOST_DEAL, data=payload)
def push_order_almost_deal(code, code_name, buy_single_index, msg, ctype='', force=False):
    """Push an "order almost dealt" message, at most once per (index, ctype).

    A key built from ``buy_single_index`` and ``ctype`` is remembered per
    code in a module-level dict; a repeated key is dropped unless ``force``
    is true.

    :param code: security code
    :param code_name: display name of the security
    :param buy_single_index: index identifying the current order; part of the
        de-duplication key
    :param msg: message text to send
    :param ctype: alert category; part of the de-duplication key
    :param force: push even if this key was already pushed
    :return: None
    """
    pushed_keys = __order_almost_deal_filter_dict.setdefault(code, set())
    key = f"{buy_single_index}_{ctype}"
    if not force and key in pushed_keys:
        # Already notified for this order and category.
        return
    pushed_keys.add(key)
    payload = {"code": code, "code_name": code_name, "msg": msg}
    __push_msg(TYPE_ORDER_ALMOST_DEAL, data=payload)
# 推送委托的订单危险信息
def push_delegate_order_danger(code, code_name, buy_single_index, msg, ctype='', force=False):
    """Push a "delegated order in danger" message, at most once per (index, ctype).

    A key built from ``buy_single_index`` and ``ctype`` is remembered per
    code in a module-level dict; a repeated key is dropped unless ``force``
    is true.

    :param code: security code
    :param code_name: display name of the security
    :param buy_single_index: index identifying the current order; part of the
        de-duplication key
    :param msg: message text to send
    :param ctype: alert category; part of the de-duplication key
    :param force: push even if this key was already pushed
    :return: None
    """
    pushed_keys = __delegate_order_danger_filter_dict.setdefault(code, set())
    key = f"{buy_single_index}_{ctype}"
    if not force and key in pushed_keys:
        # Already notified for this order and category.
        return
    pushed_keys.add(key)
    payload = {"code": code, "code_name": code_name, "msg": msg}
    __push_msg(TYPE_DELEGATE_ORDER_DANGER, data=payload)
# 推送委托队列变化消息
@@ -29,4 +54,5 @@
        if data:
            fdata["data"] = data
        middle_api_protocol.request(middle_api_protocol.load_push_msg(fdata))
    thread_pool.submit(push)
    thread_pool.submit(push)
test/test_code_attribute.py
@@ -10,9 +10,10 @@
def __get_refer_volume_info():
    code = "001239"
    limit_up_price = 22.80
    code = "600053"
    limit_up_price = 14.91
    volumes_data = init_data_util.get_volumns_by_code(code, 150)
    volumes_data = volumes_data[1:]
    volumes = init_data_util.parse_max_volume(volumes_data[:90],
                                              code_nature_analyse.is_new_top(
                                                  limit_up_price,
trade/huaxin/huaxin_trade_server.py
@@ -1319,149 +1319,154 @@
                # 获取委托中的代码
                current_delegates = huaxin_trade_record_manager.DelegateRecordManager().list_current_delegates()
                fdatas = []
                for c in current_delegates:
                    if int(c["direction"]) != huaxin_util.TORA_TSTP_D_Buy:
                        continue
                    code = c["securityID"]
                    code_name = gpcode_manager.get_code_name(code)
                    # 获取下单位置信息
                    order_begin_pos = TradePointManager().get_buy_compute_start_data_cache(code)
                    if order_begin_pos is None or order_begin_pos.buy_single_index is None:
                        continue
                    l2_data_util.load_l2_data(code)
                    total_datas = l2_data_util.local_today_datas.get(code)
                    trade_index, is_default = transaction_progress.TradeBuyQueue().get_traded_index(code)
                    # 下单位置
                    place_order_index = SecondCancelBigNumComputer().get_real_place_order_index_cache(code)
                    # 计算信号位置到真实下单位置的总买(不管是否已撤)
                    total_nums = 0
                    for i in range(order_begin_pos.buy_single_index, place_order_index):
                        data = total_datas[i]
                        val = data["val"]
                        if not L2DataUtil.is_limit_up_price_buy(val):
                            continue
                        total_nums += val["num"]
                    # 计算已成交/已撤单的数量
                    deal_or_cancel_num = 0
                    for i in range(order_begin_pos.buy_single_index, trade_index + 1):
                        data = total_datas[i]
                        val = data["val"]
                        if not L2DataUtil.is_limit_up_price_buy(val):
                            continue
                        deal_or_cancel_num += val["num"]
                    # 获取剩下的笔数
                    total_left_count = 0
                    total_left_num = 0
                    for i in range(trade_index + 1, place_order_index):
                        data = total_datas[i]
                        val = data["val"]
                        if not L2DataUtil.is_limit_up_price_buy(val):
                            continue
                        if val["num"] * float(val["price"]) < 5000:
                            continue
                        left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2(code,
                                                                                                                 i,
                                                                                                                 total_datas,
                                                                                                                 l2_data_util.local_today_canceled_buyno_map.get(
                                                                                                                     code))
                        if left_count > 0:
                            total_left_count += left_count
                            total_left_num += val["num"] * left_count
                    # 获取正在成交
                    dealing_info = HuaXinTransactionDataManager.get_dealing_order_info(code)
                    if dealing_info:
                        if str(total_datas[trade_index]["val"]["orderNo"]) == str(dealing_info[0]):
                            total_left_num += (total_datas[trade_index]["val"]["num"] - dealing_info[1] // 100)
                    limit_up_price = gpcode_manager.get_limit_up_price(code)
                    buy1_money = Buy1PriceManager().get_latest_buy1_money(code)
                    if buy1_money is None:
                        buy1_money = 0
                    # 获取已经成交的大单数量
                    total_big_num = 0
                    total_big_count = 0
                if current_delegates:
                    for c in current_delegates:
                        try:
                            if int(c["direction"]) != huaxin_util.TORA_TSTP_D_Buy:
                                continue
                            code = c["securityID"]
                            code_name = gpcode_manager.get_code_name(code)
                            # 获取下单位置信息
                            order_begin_pos = TradePointManager().get_buy_compute_start_data_cache(code)
                            if order_begin_pos is None or order_begin_pos.buy_single_index is None:
                                continue
                            l2_data_util.load_l2_data(code)
                            total_datas = l2_data_util.local_today_datas.get(code)
                            trade_index, is_default = transaction_progress.TradeBuyQueue().get_traded_index(code)
                            # 下单位置
                            place_order_index = SecondCancelBigNumComputer().get_real_place_order_index_cache(code)
                            # 计算信号位置到真实下单位置的总买(不管是否已撤)
                            total_nums = 0
                            for i in range(order_begin_pos.buy_single_index, place_order_index):
                                data = total_datas[i]
                                val = data["val"]
                                if not L2DataUtil.is_limit_up_price_buy(val):
                                    continue
                                total_nums += val["num"]
                            # 计算已成交/已撤单的数量
                            deal_or_cancel_num = 0
                            for i in range(order_begin_pos.buy_single_index, trade_index + 1):
                                data = total_datas[i]
                                val = data["val"]
                                if not L2DataUtil.is_limit_up_price_buy(val):
                                    continue
                                deal_or_cancel_num += val["num"]
                            # 获取剩下的笔数
                            total_left_count = 0
                            total_left_num = 0
                            for i in range(trade_index + 1, place_order_index):
                                data = total_datas[i]
                                val = data["val"]
                                if not L2DataUtil.is_limit_up_price_buy(val):
                                    continue
                                if val["num"] * float(val["price"]) < 5000:
                                    continue
                                left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2(code,
                                                                                                                         i,
                                                                                                                         total_datas,
                                                                                                                         l2_data_util.local_today_canceled_buyno_map.get(
                                                                                                                             code))
                                if left_count > 0:
                                    total_left_count += left_count
                                    total_left_num += val["num"] * left_count
                            # 获取正在成交
                            dealing_info = HuaXinTransactionDataManager.get_dealing_order_info(code)
                            if dealing_info:
                                if str(total_datas[trade_index]["val"]["orderNo"]) == str(dealing_info[0]):
                                    total_left_num += (total_datas[trade_index]["val"]["num"] - dealing_info[1] // 100)
                            limit_up_price = gpcode_manager.get_limit_up_price(code)
                            buy1_money = Buy1PriceManager().get_latest_buy1_money(code)
                            if buy1_money is None:
                                buy1_money = 0
                            # 获取已经成交的大单数量
                            total_big_num = 0
                            total_big_count = 0
                    for i in range(0, trade_index):
                        val = total_datas[i]["val"]
                        if not L2DataUtil.is_limit_up_price_buy(val):
                            continue
                        # 是不是大单
                        if not l2_data_util.is_big_money(val):
                            continue
                            for i in range(0, trade_index):
                                val = total_datas[i]["val"]
                                if not L2DataUtil.is_limit_up_price_buy(val):
                                    continue
                                # 是不是大单
                                if not l2_data_util.is_big_money(val):
                                    continue
                        canceled_data = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_canceled_data_v2(code,
                                                                                                                i,
                                                                                                                total_datas,
                                                                                                                l2_data_util.local_today_canceled_buyno_map.get(
                                                                                                                    code))
                        if not canceled_data:
                            total_big_count += 1
                        else:
                            total_big_num -= canceled_data["val"]["num"]
                        total_big_num += val["num"]
                                canceled_data = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_canceled_data_v2(code,
                                                                                                                        i,
                                                                                                                        total_datas,
                                                                                                                        l2_data_util.local_today_canceled_buyno_map.get(
                                                                                                                            code))
                                if not canceled_data:
                                    total_big_count += 1
                                else:
                                    total_big_num -= canceled_data["val"]["num"]
                                total_big_num += val["num"]
                    not_deal_total_big_num = 0
                    not_deal_total_big_count = 0
                    for i in range(trade_index, total_datas[-1]["index"] + 1):
                        val = total_datas[i]["val"]
                        if not L2DataUtil.is_limit_up_price_buy(val):
                            continue
                        # 是不是大单
                        if not l2_data_util.is_big_money(val):
                            continue
                            not_deal_total_big_num = 0
                            not_deal_total_big_count = 0
                            for i in range(trade_index, total_datas[-1]["index"] + 1):
                                val = total_datas[i]["val"]
                                if not L2DataUtil.is_limit_up_price_buy(val):
                                    continue
                                # 是不是大单
                                if not l2_data_util.is_big_money(val):
                                    continue
                        canceled_data = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_canceled_data_v2(code,
                                                                                                                i,
                                                                                                                total_datas,
                                                                                                                l2_data_util.local_today_canceled_buyno_map.get(
                                                                                                                    code))
                        if not canceled_data:
                            not_deal_total_big_count += 1
                        else:
                            not_deal_total_big_num -= canceled_data["val"]["num"]
                        not_deal_total_big_num += val["num"]
                                canceled_data = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_canceled_data_v2(code,
                                                                                                                        i,
                                                                                                                        total_datas,
                                                                                                                        l2_data_util.local_today_canceled_buyno_map.get(
                                                                                                                            code))
                                if not canceled_data:
                                    not_deal_total_big_count += 1
                                else:
                                    not_deal_total_big_num -= canceled_data["val"]["num"]
                                not_deal_total_big_num += val["num"]
                    real_place_order_after_count = 0
                    real_place_order_after_num = 0
                            real_place_order_after_count = 0
                            real_place_order_after_num = 0
                    # 统计真实下单位置后面未撤的金额
                    for i in range(place_order_index, total_datas[-1]["index"]):
                        val = total_datas[i]["val"]
                        if not L2DataUtil.is_limit_up_price_buy(val):
                            continue
                        # 是不是大单
                        if not l2_data_util.is_big_money(val):
                            continue
                            # 统计真实下单位置后面未撤的金额
                            for i in range(place_order_index, total_datas[-1]["index"]):
                                val = total_datas[i]["val"]
                                if not L2DataUtil.is_limit_up_price_buy(val):
                                    continue
                                # 是不是大单
                                if not l2_data_util.is_big_money(val):
                                    continue
                        canceled_data = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_canceled_data_v2(code,
                                                                                                                i,
                                                                                                                total_datas,
                                                                                                                l2_data_util.local_today_canceled_buyno_map.get(
                                                                                                                    code))
                        if not canceled_data:
                            real_place_order_after_count += 1
                            real_place_order_after_num += val["num"]
                                canceled_data = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_canceled_data_v2(code,
                                                                                                                        i,
                                                                                                                        total_datas,
                                                                                                                        l2_data_util.local_today_canceled_buyno_map.get(
                                                                                                                            code))
                                if not canceled_data:
                                    real_place_order_after_count += 1
                                    real_place_order_after_num += val["num"]
                    # 获取当日的量比
                    volume_rate = code_volumn_manager.get_volume_rate(code)
                            # 获取当日的量比
                            volume_rate = code_volumn_manager.get_volume_rate(code)
                    # 是否需要注意
                    need_pay_attention = (total_left_count <= 10 or total_left_num * float(
                        limit_up_price) * 100 < 1000 * 10000) and (
                                                 real_place_order_after_count <= 10 or real_place_order_after_num * float(
                                             limit_up_price) * 100 < 1000 * 10000)
                    fdata = {"code_info": (code, code_name), "total_num": total_nums, "finish_num": deal_or_cancel_num,
                             "buy1_money": output_util.money_desc(buy1_money),
                             "big_num_count": total_big_count,
                             "big_num_money": output_util.money_desc(total_big_num * float(limit_up_price) * 100),
                             "not_deal_big_num_count": not_deal_total_big_count,
                             "not_deal_big_num_money": output_util.money_desc(
                                 not_deal_total_big_num * float(limit_up_price) * 100),
                             "left_count": total_left_count,
                             "volume_rate": volume_rate,
                             "left_money": output_util.money_desc(total_left_num * float(limit_up_price) * 100),
                             "pay_attention": need_pay_attention
                             }
                    fdatas.append(fdata)
                            # 是否需要注意
                            need_pay_attention = (total_left_count <= 10 or total_left_num * float(
                                limit_up_price) * 100 < 1500 * 10000) and (
                                                         real_place_order_after_count <= 10 or real_place_order_after_num * float(
                                                     limit_up_price) * 100 < 1500 * 10000)
                            fdata = {"code_info": (code, code_name), "total_num": total_nums, "finish_num": deal_or_cancel_num,
                                     "buy1_money": output_util.money_desc(buy1_money),
                                     "big_num_count": total_big_count,
                                     "big_num_money": output_util.money_desc(total_big_num * float(limit_up_price) * 100),
                                     "not_deal_big_num_count": not_deal_total_big_count,
                                     "not_deal_big_num_money": output_util.money_desc(
                                         not_deal_total_big_num * float(limit_up_price) * 100),
                                     "left_count": total_left_count,
                                     "volume_rate": volume_rate,
                                     "left_money": output_util.money_desc(total_left_num * float(limit_up_price) * 100),
                                     "pay_attention": need_pay_attention
                                     }
                            fdatas.append(fdata)
                        except Exception as e:
                            logger_debug.exception(e)
                result = {"code": 0, "data": fdatas}
                self.send_response(result, client_id, request_id)
            elif ctype == "set_real_place_order_index":
utils/init_data_util.py
@@ -29,8 +29,36 @@
    return datas
# 返回:(60天最大量,昨日量,量参考日期,参考量距今交易日数)
def parse_max_volume(datas, is_new_or_near_top=False):
    """Return the reference-volume tuple, re-anchored after a deep price drop.

    The base result comes from ``__parse_max_volume``; its 4th element is the
    index in ``datas`` of the reference bar. If the lowest ``"low"`` among
    bars ``0..refer_index`` is less than 40% below the reference bar's
    ``"high"``, the base result is returned unchanged.

    Otherwise (the "oversold" case) the reference is moved to the bar in
    ``0..refer_index-1`` with the smallest ``(low - pre_close) / pre_close``.
    If the bar at the preceding index has a larger volume, that bar is used
    instead. (The original comments suggest lower indexes are more recent
    days -- confirm against the data source.)

    :param datas: list of daily bars; each is a dict with ``"high"``,
        ``"low"``, ``"pre_close"``, ``"volume"`` and a ``"bob"`` value that
        supports ``strftime``
    :param is_new_or_near_top: forwarded unchanged to ``__parse_max_volume``
    :return: 4-tuple ``(volume, volume, date string "%Y-%m-%d", index)``
    """
    result = __parse_max_volume(datas, is_new_or_near_top)
    refer_index = result[3]

    # Lowest low from the newest bar up to and including the reference bar.
    refer_price = datas[refer_index]["high"]
    min_price = float(refer_price)
    for i in range(refer_index + 1):
        if min_price > datas[i]["low"]:
            min_price = datas[i]["low"]
    if (refer_price - min_price) / refer_price < 0.4:
        return result

    # Oversold: rank the bars before the reference by how far the low fell
    # below the previous close.
    candidates = []
    for i in range(refer_index):
        item = datas[i]
        rate = (item["low"] - item["pre_close"]) / item["pre_close"]
        candidates.append((i, rate))
    if not candidates:
        # refer_index == 0: there is no other bar to re-anchor on. The
        # original code raised IndexError here; keep the base result instead.
        return result

    # The first bar with the smallest rate wins (same choice a stable sort
    # followed by taking element 0 would make).
    refer_index = min(candidates, key=lambda x: x[1])[0]

    # Prefer the adjacent bar at the lower index when its volume is larger.
    if refer_index > 0 and datas[refer_index - 1]["volume"] > datas[refer_index]["volume"]:
        refer_index -= 1

    refer_bar = datas[refer_index]
    # NOTE(review): the header comment documents the 2nd slot as "yesterday's
    # volume", but this branch fills it with the reference-bar volume --
    # confirm this is intended.
    return refer_bar["volume"], refer_bar["volume"], refer_bar['bob'].strftime("%Y-%m-%d"), refer_index
# 返回:(60天最大量,昨日量,量参考日期,参考量距今交易日数)
def __parse_max_volume(datas, is_new_or_near_top=False):
    max_volume = 0
    max_volume_date = None
    max_volume_index = None