Administrator
2025-05-25 326e0b138c00fe14ab860441b2e05f8c3c37576f
大单足够下单方式改变
3个文件已修改
178 ■■■■■ 已修改文件
l2/l2_data_manager_new.py 147 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_data_parser.py 30 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/history_k_data_util.py 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_data_manager_new.py
@@ -219,6 +219,67 @@
        #                                      round(t.time() * 1000) - start_time))
class HumanRadicalBuySingleManager:
    """
    人为买入管理
    """
    # 人为下单标记:{"代码":(信号时间, 间隔时间,信号截至时间, radical_result)}
    __human_radical_buy_mark_info = {}
    @classmethod
    def add_single(cls, code, latest_data, radical_result):
        """
        添加买入信号
        @param code:
        @param latest_data:
        @param radical_result:
        @return:
        """
        start_time_with_ms = L2DataUtil.get_time_with_ms(latest_data["val"])
        if tool.is_sh_code(code):
            cls.__human_radical_buy_mark_info[code] = (
                start_time_with_ms, 400, tool.trade_time_add_millionsecond(start_time_with_ms, 3000), radical_result)
        else:
            cls.__human_radical_buy_mark_info[code] = (
                start_time_with_ms, 30, tool.trade_time_add_millionsecond(start_time_with_ms, 2000), radical_result)
    @classmethod
    def remove_single(cls, code):
        """
        移除信号
        @param code:
        @return:
        """
        if code in cls.__human_radical_buy_mark_info:
            cls.__human_radical_buy_mark_info.pop(code)
    @classmethod
    def has_single(cls, code):
        """
        是否有信号
        @param code:
        @return:
        """
        if code in cls.__human_radical_buy_mark_info:
            return True
    @classmethod
    def is_valid(cls, code, data):
        """
        信号是否有效
        @param code:
        @return: 是否有效,无效消息/有效对象
        """
        if code not in cls.__human_radical_buy_mark_info:
            return False, "没有人买入信号"
        single_time_ms, space_time_ms, expire_time_ms, _ = cls.__human_radical_buy_mark_info[code]
        if tool.trade_time_sub_with_ms(L2DataUtil.get_time_with_ms(data["val"]),
                                       expire_time_ms) > 0:
            cls.__human_radical_buy_mark_info.pop(code)
            return False, "超过信号生效时间"
        return True, cls.__human_radical_buy_mark_info[code]
class L2TradeDataProcessor:
    unreal_buy_dict = {}
    volume_rate_info = {}
@@ -1458,10 +1519,32 @@
        _start_time = tool.get_now_timestamp()
        total_datas = local_today_datas[code]
        # ---------计算激进买入的信号---------
        radical_result = cls.__compute_radical_order_begin_pos(code, compute_start_index, compute_end_index)
        if not HumanRadicalBuySingleManager.has_single(code):
            # ---------计算激进买入的信号---------
            radical_result = cls.__compute_radical_order_begin_pos(code, compute_start_index, compute_end_index)
        else:
            human_radical_result = cls.__compute_human_radical_order_begin_pos(code, compute_start_index,
                                                                               compute_end_index)
            if human_radical_result[0]:
                radical_result = list(human_radical_result[2])
                # 改变执行位置
                radical_result[1] = human_radical_result[1]["index"]
            else:
                radical_result = None
        if radical_result and radical_result[0]:
        if radical_result[0]:
            if not HumanRadicalBuySingleManager.has_single(code):
                big_order_deal_enough_result = radical_buy_data_manager.is_big_order_deal_enough(code,
                                                                                                 code_volumn_manager.CodeVolumeManager().get_volume_rate_refer_in_5days(
                                                                                                     code), 0)
                if big_order_deal_enough_result[6] <= 0:
                    HumanRadicalBuySingleManager.add_single(code, total_datas[-1], radical_result)
                    async_log_util.info(logger_l2_not_buy_reasons, f"{code}#大单足够,需要根据人为下单")
                    return
            # 下单前一步,移除人为下单信号
            is_human_radical_buy = HumanRadicalBuySingleManager.has_single(code)
            HumanRadicalBuySingleManager.remove_single(code)
            buy_single_index, buy_exec_index = radical_result[1], radical_result[1]
            buy_volume_rate = cls.volume_rate_info[code][0]
            refer_sell_data = cls.__L2MarketSellManager.get_refer_sell_data(code, total_datas[buy_single_index]["val"][
@@ -1479,7 +1562,7 @@
                                                     max_num_set=set(),
                                                     buy_volume_rate=buy_volume_rate,
                                                     mode=OrderBeginPosInfo.MODE_RADICAL,
                                                     mode_desc=f"大单不足扫入:{radical_result[2]}",
                                                     mode_desc=f"大单不足扫入:{radical_result[2]} 是否跟人买入-{is_human_radical_buy}",
                                                     sell_info=sell_info,
                                                     threshold_money=threshold_money)
            order_begin_pos_info.at_limit_up = cls.__is_at_limit_up_buy(code)
@@ -2089,6 +2172,62 @@
            async_log_util.info(logger_l2_not_buy_reasons, f"{code}#{result[2]}")
        return result
    @classmethod
    def __compute_human_radical_order_begin_pos(cls, code, start_index, end_index):
        """
        处理跟人买
        @param code:
        @param start_index:
        @param end_index:
        @return:
        """
        total_datas = local_today_datas.get(code)
        result = HumanRadicalBuySingleManager.is_valid(code, total_datas[start_index])
        if not result[0]:
            return False, None, result[1]
        result = result[1]
        single_time_ms, space_time_ms, expire_time_ms, radical_result = result[0], result[1], result[2], result[3]
        bigger_num = l2_data_util.get_big_money_val(gpcode_manager.get_limit_up_price_as_num(code),
                                                    tool.is_ge_code(code)) // (
                             gpcode_manager.get_limit_up_price_as_num(code) * 100)
        canceled_buyno_map = local_today_buyno_map.get(code)
        for i in range(start_index, end_index + 1):
            data = total_datas[i]
            val = data["val"]
            if not L2DataUtil.is_limit_up_price_buy(val):
                continue
            if val["num"] < bigger_num:
                continue
            left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2(code, i,
                                                                                                     total_datas,
                                                                                                     canceled_buyno_map)
            if left_count == 0:
                continue
            # 判断是否超过生效时间
            if tool.trade_time_sub_with_ms(L2DataUtil.get_time_with_ms(total_datas[i]["val"]),
                                           expire_time_ms) > 0:
                HumanRadicalBuySingleManager.remove_single(code)
                return False, None, "超过信号生效时间"
            is_valid = False
            # 判断距离上个50w买单的时间是否超过了space_time_ms
            for ii in range(i - 1, -1, -1):
                data_child = total_datas[ii]
                val_child = data_child["val"]
                if not L2DataUtil.is_limit_up_price_buy(val_child):
                    continue
                if val_child["num"] * float(val_child["price"]) < 5000:
                    continue
                if tool.trade_time_sub_with_ms(L2DataUtil.get_time_with_ms(val),
                                               L2DataUtil.get_time_with_ms(val_child)) > space_time_ms:
                    is_valid = True
                    break
            if is_valid:
                return True, data, radical_result
        return False, None, "没有有效信号"
    # 总卖额参考时间使用记录
    __refer_sell_used_times = {}
l2_data_parser.py
@@ -5,6 +5,8 @@
import os
import sys
import time
from multiprocessing import Pool
import pandas as pd
from db import mysql_data_delegate as mysql_data
@@ -224,6 +226,9 @@
                # 将文件写入到文本
                writer.writerow(row)
def test1(args):
    index, df = args
    print(index)
def pre_process_transactions(csv_path="E:/测试数据/Transaction_Test.csv"):
    def str_to_float(s):
@@ -260,16 +265,24 @@
            'EndPrice': group['TradePrice'].iloc[-1]
        })
    dtype = {
        'SecurityID': 'category',  # 低基数分类数据
    }
    chunk_size = 100000
    chunk_size = 10000
    # 创建DataFrame
    chunks = pd.read_csv(csv_path, chunksize=chunk_size)
    indexed_data = list(enumerate(chunks))
    # 新写法
    with Pool(processes=4) as pool:
        pool.map(test1, indexed_data)
    result_list = []
    index = 0
    for df in chunks:
        index += 1
    for chunk_index, chunk in enumerate(chunks):
        df = chunk.copy()
        index = chunk_index + 1
        child_path = csv_path.replace(".csv", f"_{index}.csv")
        if os.path.exists(child_path):
            continue
@@ -279,7 +292,6 @@
        df = df[df["SecurityID"].str.startswith(("30", "00", "60"), na=False)]
        # 计算成交金额
        df['TradeAmount'] = df['TradePrice'] * df['TradeVolume']
        # 按SecurityID和BuyNo分组
        grouped = df.groupby(['SecurityID', 'BuyNo'])
@@ -319,7 +331,7 @@
    # [ExchangeID,SecurityID,MainSeq,SubSeq,TickTime,TickType,BuyNo,SellNo,Price,Volume,TradeMoney,Side,TradeBSFlag,MDSecurityStat,Info1,Info2,Info3,LocalTimeStamp]
    chunk_size = 200000
    chunk_size = 10000
    # 创建DataFrame
    chunks = pd.read_csv(csv_path, chunksize=chunk_size)
    result_list = []
@@ -346,13 +358,13 @@
    print(f"处理完毕,总共{index}批")
if __name__ == '__main__1':
if __name__ == '__main__':
    # df = pd.read_csv(f"E:/测试数据/Transaction_Test.csv")
    pre_process_ngtstick()
    pre_process_transactions()
# 命令模式  /home/userzjj/app/gp-server/l2_data_parser Transaction  2025-05-08
# 解析大单: /home/userzjj/app/gp-server/l2_data_parser ExtractDealBigOrder 2025-05-09 /home/userzjj/最终成交数据20250509.txt 000555
if __name__ == '__main__':
if __name__ == '__main__1':
    if len(sys.argv) > 1:
        params = sys.argv[1:]
        print("接收的参数", params)
third_data/history_k_data_util.py
@@ -185,7 +185,6 @@
                                                     fields=fields)
    @classmethod
    @classmethod
    def get_history_instruments(cls, symbols, start_date, end_date, fields=None):
        if constant.JUEJIN_LOCAL_API:
            account_id, s_id, token = cls.getJueJinAccountInfo()