7个文件已修改
178 ■■■■■ 已修改文件
data_parser/transaction_big_order_parser.py 116 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_data_parser.py 16 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_test.py 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
servers/data_server.py 12 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
servers/huaxin_trade_server.py 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/buy_radical/radical_buy_data_manager.py 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/buy_radical/radical_buy_strategy.py 22 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
data_parser/transaction_big_order_parser.py
@@ -10,6 +10,10 @@
from utils import tool
def print_log(*args):
    print(f"[{tool.get_now_time_str()}]", *args)
class BigOrderDealParser:
    @classmethod
@@ -55,7 +59,7 @@
    child_path = os.path.join(children_dir, f"{index}.csv")
    if os.path.exists(child_path):
        return
    print(f"处理Transaction第{index}批次", os.getpid())
    print_log(f"处理Transaction第{index}批次", os.getpid())
    df["TradePrice"] = df["TradePrice"].apply(BigOrderDealParser.str_to_float)
    df["SecurityID"] = df["SecurityID"].apply(BigOrderDealParser.code_format)
    df = df[df["SecurityID"].str.startswith(("30", "00", "60"), na=False)]
@@ -63,7 +67,7 @@
    df['TradeAmount'] = df['TradePrice'] * df['TradeVolume']
    df = df[df["TradeAmount"] > 0]
    # 判断是否为空
    # print(df.empty)
    # print_log(df.empty)
    # 按SecurityID和BuyNo分组
    grouped = df.groupby(['SecurityID', 'BuyNo'])
    # 应用聚合函数
@@ -106,7 +110,7 @@
    child_path = os.path.join(children_dir, f"{index}.csv")
    if os.path.exists(child_path):
        return
    print(f"处理NGTSTick第{index}批次")
    log(f"处理NGTSTick第{index}批次")
    df = df[df["TickType"] == 'T']
    df["Price"] = df["Price"].apply(BigOrderDealParser.str_to_float)
    df["SecurityID"] = df["SecurityID"].apply(BigOrderDealParser.code_format)
@@ -142,7 +146,7 @@
    """
    combined_path = os.path.join(dir_path, 'combined.csv')
    if os.path.exists(combined_path):
        print("合并的目标文件已存在")
        print_log("合并的目标文件已存在")
        return
    file_list = os.listdir(dir_path)
    file_list.sort(key=lambda x: int(re.findall(r'\d+', x)[0]))
@@ -153,14 +157,14 @@
            continue
        df["SecurityID"] = df["SecurityID"].apply(BigOrderDealParser.code_format)
        df_list.append(df)
    print("准备合并的文件数量:", len(df_list))
    print_log("准备合并的文件数量:", len(df_list))
    combined_df = pd.concat(df_list, ignore_index=True)
    print("合并完成,准备写入文件!")
    print_log("合并完成,准备写入文件!")
    # 保存结果
    combined_df.to_csv(combined_path, index=False)
    print("写入文件完成!")
    print_log("写入文件完成!")
def concat_pre_transactions(dir_path):
@@ -179,7 +183,7 @@
    """
    combined_path = os.path.join(dir_path, 'combined.csv')
    if not os.path.exists(combined_path):
        print("拼接数据不存在")
        print_log("拼接数据不存在")
        return
    df = pd.read_csv(combined_path)
    df_copy = df.copy()
@@ -187,21 +191,45 @@
    # 应用聚合函数
    chunk_result = grouped.apply(pd.Series({}))
    # chunk_result["SecurityID"] = chunk_result["SecurityID"].apply(BigOrderDealParser.code_format)
    print(chunk_result.to_string(
    print_log(chunk_result.to_string(
        index=False,  # 不显示索引
        justify='left',  # 左对齐
        float_format='%.3f'  # 浮点数格式
    ))
def extract_big_order_of_code(dir_path, code=None):
    """
    提取代码的大单
    @param dir_path: 数据目录
    @param code: 为空表示导出全部
    @return:
    """
__combined_df_cache = {}
def extract_big_order_of_all(dir_path, process_count=4):
    combined_path = os.path.join(dir_path, 'combined.csv')
    if not os.path.exists(combined_path):
        print_log("拼接数据不存在")
        return
    codes = extract_big_order_codes(dir_path)
    print_log("总代码数量:", len(codes))
    for code in codes:
        extract_big_order_of_code(dir_path, code)
    combined_path = os.path.join(dir_path, 'combined.csv')
    if not os.path.exists(combined_path):
        print_log("拼接数据不存在")
        return
    output_path = os.path.join(dir_path, f"big_buy_{code}.csv")
    if os.path.exists(output_path):
        print_log("路径已存在:", output_path)
        return
    df = __combined_df_cache.get(combined_path, None)
    if df is None:
        df = pd.read_csv(combined_path)
        __combined_df_cache[combined_path] = df
    args = [(code, df) for code in codes]
    # 新写法
    with Pool(processes=process_count) as pool:
        pool.map(__extract_big_order_of_code, args)
def __extract_big_order_of_code(args):
    def first_last(group):
        """
            获取第一条数据与最后一条
@@ -219,29 +247,46 @@
            'StartPrice': group['StartPrice'].iloc[0]
        })
    combined_path = os.path.join(dir_path, 'combined.csv')
    if not os.path.exists(combined_path):
        print("拼接数据不存在")
    dir_path, code, df = args[0], args[1], args[2]
    output_path = os.path.join(dir_path, f"big_buy_{code}.csv")
    if os.path.exists(output_path):
        print_log("路径已存在:", output_path)
        return
    df = pd.read_csv(combined_path)
    df_copy = df.copy()
    if code:
        df_copy = df_copy[df_copy["SecurityID"] == int(code)]
    if df_copy.empty:
        print("目标代码对应成交数据为空")
        print_log("目标代码对应成交数据为空")
        return
    df_copy["SecurityID"] = df_copy["SecurityID"].apply(BigOrderDealParser.code_format)
    # 按SecurityID和BuyNo分组
    grouped = df_copy.groupby(['SecurityID', 'BuyNo'])
    grouped_result = grouped.apply(first_last)
    grouped_result = grouped_result[grouped_result["TotalAmount"] > 500000]
    # print(grouped_result)
    # print_log(grouped_result)
    # 遍历内容
    if code:
        grouped_result.to_csv(os.path.join(dir_path, f"big_buy_{code}.csv"), index=False)
    else:
        grouped_result.to_csv(os.path.join(dir_path, f"big_buy.csv"), index=False)
    print("保存成功")
    grouped_result.to_csv(output_path, index=False)
    print_log(f"[{tool.get_now_time_str()}]保存成功,路径:{output_path}")
def extract_big_order_of_code(dir_path, code):
    """
    提取代码的大单
    @param dir_path: 数据目录
    @param code: 为空表示导出全部
    @return:
    """
    combined_path = os.path.join(dir_path, 'combined.csv')
    if not os.path.exists(combined_path):
        print_log("拼接数据不存在")
        return
    df = __combined_df_cache.get(combined_path, None)
    if df is None:
        df = pd.read_csv(combined_path)
        __combined_df_cache[combined_path] = df
    __extract_big_order_of_code((dir_path, code, df))
def extract_big_order_codes(dir_path):
@@ -251,24 +296,14 @@
    @param code:
    @return:
    """
    def first_last(group):
        """
            获取第一条数据与最后一条
            @param group:
            @return:
            """
        return pd.Series({
        })
    combined_path = os.path.join(dir_path, 'combined.csv')
    if not os.path.exists(combined_path):
        print("拼接数据不存在")
        print_log("拼接数据不存在")
        return
    df = pd.read_csv(combined_path)
    df_copy = df.copy()
    if df_copy.empty:
        print("目标代码对应成交数据为空")
        print_log("目标代码对应成交数据为空")
        return
    df_copy["SecurityID"] = df_copy["SecurityID"].apply(BigOrderDealParser.code_format)
    # 按SecurityID和BuyNo分组
@@ -277,8 +312,9 @@
if __name__ == "__main__":
    print_log(1, 2, 3)
    # pre_process_transactions("E:/测试数据/Transaction_Test.csv")
    # pre_process_ngtsticks("E:/测试数据/NGTSTick_Test.csv")
    # concat_pre_transactions("E:/测试数据/Transaction_Test")
    # extract_big_order_codes("E:/测试数据/Transaction_Test")
    extract_big_order_of_code("E:/测试数据/Transaction_Test")
    extract_big_order_of_all("E:/测试数据/Transaction_Test")
l2_data_parser.py
@@ -405,13 +405,11 @@
            transaction_big_order_parser.concat_pre_ngtsticks(f"/home/userzjj/ftp/{day}/NGTSTick")
        elif _type == 'ExtractDealBigOrder':
            # 命令模式  /home/userzjj/app/gp-server/l2_data_parser ExtractDealBigOrder 2025-05-09
            # 根据code提取大单
            if not code:
                transaction_big_order_parser.extract_big_order_of_code(f"/home/userzjj/ftp/{day}/NGTSTick")
                transaction_big_order_parser.extract_big_order_of_code(f"/home/userzjj/ftp/{day}/Transaction")
            if len(params) > 2:
                process_count = int(params[2].strip())
            else:
                if tool.is_sh_code(code):
                    transaction_big_order_parser.extract_big_order_of_code(f"/home/userzjj/ftp/{day}/NGTSTick", code)
                else:
                    transaction_big_order_parser.extract_big_order_of_code(f"/home/userzjj/ftp/{day}/Transaction", code)
                process_count = 10
            transaction_big_order_parser.extract_big_order_of_all(f"/home/userzjj/ftp/{day}/NGTSTick",
                                                                  process_count=process_count)
            transaction_big_order_parser.extract_big_order_of_all(f"/home/userzjj/ftp/{day}/Transaction",
                                                                  process_count=process_count)
l2_test.py
@@ -124,10 +124,10 @@
            volume = zylt_volume_map.get(code)
            # 今日涨停价要突破昨日最高价
            k_bars = HistoryKDataManager().get_history_bars(code, last_trade_day)
            if k_bars and 30e8 <= k_bars[0]["close"] * volume * tool.get_limit_up_rate(code) <= 300e8:
                # 自由流通市值在30亿-300亿以上
            if k_bars and 10e8 <= k_bars[0]["close"] * volume * tool.get_limit_up_rate(code) <= 300e8:
                # 自由流通市值在10亿-300亿以上
                limit_up_price = round(tool.get_limit_up_rate(code) * k_bars[0]["close"], 2)
                if limit_up_price > k_bars[0]["high"]:
                if limit_up_price > k_bars[0]["high"] or True:
                    # 今日涨停价要突破昨日最高价
                    codes.add(code)
        # 获取辨识度的票
servers/data_server.py
@@ -9,6 +9,7 @@
import requests
import constant
import inited_data
from code_attribute.gpcode_manager import BlackListCodeManager, HumanRemoveForbiddenManager
from l2.huaxin import huaxin_target_codes_manager
from l2.l2_transaction_data_manager import HuaXinBuyOrderManager
@@ -24,7 +25,7 @@
from trade.buy_radical.new_block_processor import BeforeBlocksComputer
from trade.buy_strategy import OpenLimitUpGoodBlocksBuyStrategy
from trade.buy_radical.radical_buy_data_manager import RadicalBuyBlockManager, BeforeSubDealBigOrderManager
from utils import global_util, tool, data_export_util
from utils import global_util, tool, data_export_util, init_data_util
from code_attribute import gpcode_manager, code_nature_analyse
from log_module import log_analyse, log_export, async_log_util
from l2 import code_price_manager, l2_data_util, transaction_progress
@@ -970,6 +971,11 @@
                                # 加载涨停大单详情
                                limit_up_big_order_detail = radical_buy_data_manager.get_total_detal_big_order_details(
                                    code)
                                if max(limit_up_big_order_detail) == 0:
                                    # 没有数据,从网络加载
                                    limit_up_big_order_detail = list(limit_up_big_order_detail)
                                    limit_up_big_order_detail[1] = deal_big_order_detail_info[1][0]
                                    limit_up_big_order_detail[3] = deal_big_order_detail_info[2][0]
                                deal_big_order_info.append(
                                    output_util.money_desc(limit_up_big_order_detail[0] + limit_up_big_order_detail[1]))
                                deal_big_order_info.append(
@@ -979,6 +985,8 @@
                        except Exception as e:
                            logger_debug.error(f"可能没有获取到涨停价:{code}")
                            if not gpcode_manager.get_limit_up_price(code):
                                init_data_util.re_set_price_pre(code)
                            # logger_debug.exception(e)
                            deal_big_order_info = None
                        code_name = gpcode_manager.get_code_name(code)
@@ -1060,7 +1068,7 @@
                        code_info_list.append((d[0], d[6]))
                    # 保存新题材
                    datas = [(d[0], d[6]) for d in result["list"]]
                    async_log_util.info(logger_kpl_new_blocks, f"{(tool.get_thread_id() ,bi, datas)}")
                    async_log_util.info(logger_kpl_new_blocks, f"{(tool.get_thread_id(), bi, datas)}")
                    if code_info_list:
                        # 将代码加入新题材
                        new_block_processor.process_new_block_by_component_codes(bi[0],
servers/huaxin_trade_server.py
@@ -828,7 +828,7 @@
                        radical_buy_data_manager.ExcludeIndexComputeCodesManager.remove_code(code)
                        if result_by_volume[0] == radical_buy_strategy.BUY_MODE_DIRECT and not tool.is_sh_code(code):
                        if result_by_volume[0] == radical_buy_strategy.BUY_MODE_DIRECT:
                            # 上证不能根据成交买入
                            latest_deal_time = l2_huaxin_util.convert_time(transaction_datas[-1][3])
                            refer_sell_data = L2MarketSellManager().get_refer_sell_data(code, latest_deal_time)
@@ -841,7 +841,8 @@
                            threshold_money = 0
                            every_deal_orders = EveryLimitupBigDealOrderManager.list_big_buy_deal_orders(code)
                            if every_deal_orders:
                                min_order_no = min(every_deal_orders, lambda x: x[0])[0]
                                min_order_no_info = min(every_deal_orders, key=lambda x: x[0])
                                min_order_no = min_order_no_info[0]
                            else:
                                min_order_no = transaction_datas[-1][6]
trade/buy_radical/radical_buy_data_manager.py
@@ -2180,7 +2180,6 @@
        return datas
    return None
def list_l2_big_order_deal_info(codes):
    """
    获取成交大单信息
trade/buy_radical/radical_buy_strategy.py
@@ -167,9 +167,22 @@
    # 每次上板的大单与金额
    big_order_count = radical_buy_data_manager.EveryLimitupBigDealOrderManager.get_big_buy_deal_order_count(code)
    big_order_money = radical_buy_data_manager.EveryLimitupBigDealOrderManager.get_big_buy_deal_order_money(code)
    total_lack_money_info = radical_buy_data_manager.get_total_deal_big_order_info(code,
                                                                                   gpcode_manager.get_limit_up_price_as_num(
                                                                                       code),
                                                                                   is_for_buy=True)
    if total_lack_money_info and total_lack_money_info[2] > 1e8 and total_lack_money_info[0] <= 0:
        # 要求的大单够了 以后,回封买,只要有两个大单成交了,立即下单
        THRESHOLD_MONEY, is_temp_threshold_money = radical_buy_data_manager.BeforeSubDealBigOrderManager().get_big_order_threshold_info(
            code)
        if big_order_money > THRESHOLD_MONEY * 2:
            return BUY_MODE_DIRECT, f"有两个以上大单瞬时成交({big_order_money}/{THRESHOLD_MONEY * 2}), 大单足够"
    if big_order_count >= 2:
        # 判断大单是否满足
        average_big_order_money = int(big_order_money / big_order_count)
        # 如果均价涨幅小于7%均大单等于299w
        # 如果均价涨幅小于7%,均大单等于299w
        average_rate = Buy1PriceManager().get_average_rate(code)
        if average_rate is not None and average_rate < 0.07:
            average_big_order_money = 299 * 10000
@@ -200,12 +213,7 @@
            if tool.is_sz_code(code) and refer_sell_money >= 5e7:
                return BUY_MODE_DIRECT, f"剩余涨停总卖额-{left_limit_up_sell_money},均大单-{average_big_order_money}, 剩余阈值-{threshold_left_sell_money}, 总抛压大({refer_sell_money})"
            else:
                # 判断大单是否满足
                total_lack_money_info = radical_buy_data_manager.get_total_deal_big_order_info(code,
                                                                                               gpcode_manager.get_limit_up_price_as_num(
                                                                                                   code),
                                                                                               is_for_buy=True)
                if total_lack_money_info[0] <= 0:
                if total_lack_money_info[0] <= 0 and tool.is_sz_code(code):
                    return BUY_MODE_DIRECT, f"剩余涨停总卖额-{left_limit_up_sell_money},均大单-{average_big_order_money}, 大单足够"
    else:
        average_big_order_money = 0