7个文件已修改
215 ■■■■ 已修改文件
api/outside_api_command_callback.py 21 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
data_parser/transaction_big_order_parser.py 83 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_data_manager_new.py 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_data_parser.py 40 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
output/output_util.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
servers/data_server.py 4 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/buy_radical/radical_buy_data_manager.py 62 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
api/outside_api_command_callback.py
@@ -54,7 +54,8 @@
from trade.huaxin import huaxin_trade_api, huaxin_trade_data_update, \
    huaxin_trade_record_manager, huaxin_trade_order_processor, huaxin_sell_util
from trade.huaxin.huaxin_trade_record_manager import PositionManager, DealRecordManager, DelegateRecordManager
from trade.buy_radical.radical_buy_data_manager import RadicalBuyBlockManager, BeforeSubDealBigOrderManager
from trade.buy_radical.radical_buy_data_manager import RadicalBuyBlockManager, BeforeSubDealBigOrderManager, \
    TotalDealBigOrderThresholdMoneyManager
from trade.sell import sell_manager
from trade.sell.sell_rule_manager import TradeRuleManager, SellRule
from trade.trade_data_manager import RadicalBuyDealCodesManager
@@ -1319,7 +1320,9 @@
                    if volumes_data:
                        is_new_top = code_nature_analyse.is_new_top(code,
                                                                    gpcode_manager.get_limit_up_price_by_preprice(code,
                                                                                                                  volumes_data[0]["close"]),
                                                                                                                  volumes_data[
                                                                                                                      0][
                                                                                                                      "close"]),
                                                                    volumes_data)
                data = {
@@ -1476,8 +1479,18 @@
                                   client_id,
                                   request_id)
            elif ctype == "set_total_deal_big_order_threshold_money":
                code = data.get("code")
                money = data.get("money")
                if not code or not money:
                    self.send_response({"code": 1, "data": {}, "msg": "code/money为空"},
                                       client_id,
                                       request_id)
                    return
                TotalDealBigOrderThresholdMoneyManager().set_money(code, int(money))
                self.send_response({"code": 0, "data": {}},
                                   client_id,
                                   request_id)
        except Exception as e:
            logging.exception(e)
            logger_debug.exception(e)
data_parser/transaction_big_order_parser.py
@@ -194,10 +194,91 @@
    ))
def extract_big_order_of_code(dir_path, code=None):
    """
    提取代码的大单
    @param dir_path: 数据目录
    @param code: 为空表示导出全部
    @return:
    """
    def first_last(group):
        """
            获取第一条数据与最后一条
            @param group:
            @return:
            """
        return pd.Series({
            'SecurityID': group['SecurityID'].iloc[0],
            'BuyNo': group['BuyNo'].iloc[0],
            'TotalVolume': group['TotalVolume'].sum(),
            'TotalAmount': group['TotalAmount'].sum(),
            'EndTime': group['EndTime'].iloc[-1],
            'EndPrice': group['EndPrice'].iloc[-1],
            'StartTime': group['StartTime'].iloc[0],
            'StartPrice': group['StartPrice'].iloc[0]
        })
    combined_path = os.path.join(dir_path, 'combined.csv')
    if not os.path.exists(combined_path):
        print("拼接数据不存在")
        return
    df = pd.read_csv(combined_path)
    df_copy = df.copy()
    if code:
        df_copy = df_copy[df_copy["SecurityID"] == int(code)]
    if df_copy.empty:
        print("目标代码对应成交数据为空")
        return
    df_copy["SecurityID"] = df_copy["SecurityID"].apply(BigOrderDealParser.code_format)
    # 按SecurityID和BuyNo分组
    grouped = df_copy.groupby(['SecurityID', 'BuyNo'])
    grouped_result = grouped.apply(first_last)
    grouped_result = grouped_result[grouped_result["TotalAmount"] > 500000]
    # print(grouped_result)
    # 遍历内容
    if code:
        grouped_result.to_csv(os.path.join(dir_path, f"big_buy_{code}.csv"), index=False)
    else:
        grouped_result.to_csv(os.path.join(dir_path, f"big_buy.csv"), index=False)
    print("保存成功")
def extract_big_order_codes(dir_path):
    """
    导出大单代码
    @param dir_path: 数据目录
    @param code:
    @return:
    """
    def first_last(group):
        """
            获取第一条数据与最后一条
            @param group:
            @return:
            """
        return pd.Series({
        })
    combined_path = os.path.join(dir_path, 'combined.csv')
    if not os.path.exists(combined_path):
        print("拼接数据不存在")
        return
    df = pd.read_csv(combined_path)
    df_copy = df.copy()
    if df_copy.empty:
        print("目标代码对应成交数据为空")
        return
    df_copy["SecurityID"] = df_copy["SecurityID"].apply(BigOrderDealParser.code_format)
    # 按SecurityID和BuyNo分组
    grouped = df_copy.groupby(['SecurityID'])
    return set(grouped.groups.keys())
if __name__ == "__main__":
    # pre_process_transactions("E:/测试数据/Transaction_Test.csv")
    # pre_process_ngtsticks("E:/测试数据/NGTSTick_Test.csv")
    # concat_pre_transactions("E:/测试数据/Transaction_Test")
    process_combined_transaction("E:/测试数据/Transaction_Test")
    # extract_big_order_codes("E:/测试数据/Transaction_Test")
    extract_big_order_of_code("E:/测试数据/Transaction_Test")
l2/l2_data_manager_new.py
@@ -1041,7 +1041,6 @@
            else:
                try:
                    # 判断是否为首封下单
                    order_begin_pos.first_limit_up_buy = radical_buy_data_manager.is_first_limit_up_buy(code)
@@ -1054,7 +1053,7 @@
                                                                 order_begin_pos.buy_exec_index,
                                                                 local_today_datas.get(code))
                        return False
                    l2_log.debug(code, "可以下单,原因:{}, 下单模式:{}", reason, order_begin_pos.mode)
                    l2_log.debug(code, "可以下单,原因:{}, 下单模式:{} 最小订单号:{}", reason, order_begin_pos.mode, order_begin_pos.min_order_no)
                    l2_log.debug(code, "开始执行买入")
                    trade_manager.start_buy(code, capture_timestamp, last_data,
                                            last_data_index, order_begin_pos.mode, order_begin_pos.buy_exec_index)
l2_data_parser.py
@@ -227,9 +227,11 @@
                # 将文件写入到文本
                writer.writerow(row)
def test1(args):
    index, df = args
    print(index)
def pre_process_transactions(csv_path="E:/测试数据/Transaction_Test.csv"):
    def str_to_float(s):
@@ -265,9 +267,6 @@
            'EndTime': group['TradeTime'].iloc[-1],
            'EndPrice': group['TradePrice'].iloc[-1]
        })
    dtype = {
        'SecurityID': 'category',  # 低基数分类数据
@@ -384,26 +383,35 @@
        elif _type == 'MarketData':
            parse_market_data(day)
        elif _type == 'Transaction_New':
            transaction_big_order_parser.pre_process_transactions(f"/home/userzjj/ftp/{day}/Transaction.csv")
            if len(params) > 2:
                process_count = int(params[2].strip())
            else:
                process_count = 4
            transaction_big_order_parser.pre_process_transactions(f"/home/userzjj/ftp/{day}/Transaction.csv",
                                                                  process_count=process_count)
            transaction_big_order_parser.concat_pre_transactions(f"/home/userzjj/ftp/{day}/Transaction")
        elif _type == 'NGTSTick_New':
            transaction_big_order_parser.pre_process_ngtsticks(f"/home/userzjj/ftp/{day}/NGTSTick.csv")
            if len(params) > 2:
                process_count = int(params[2].strip())
            else:
                process_count = 4
            transaction_big_order_parser.pre_process_ngtsticks(f"/home/userzjj/ftp/{day}/NGTSTick.csv",
                                                               process_count=process_count)
            transaction_big_order_parser.concat_pre_ngtsticks(f"/home/userzjj/ftp/{day}/NGTSTick")
        elif _type == 'Transaction_Concat':
            transaction_big_order_parser.concat_pre_transactions(f"/home/userzjj/ftp/{day}/Transaction")
        elif _type == 'NGTSTick_Concat':
            transaction_big_order_parser.concat_pre_ngtsticks(f"/home/userzjj/ftp/{day}/NGTSTick")
        elif _type == 'ExtractDealBigOrder':
            # 提取所有成交的大单
            if len(params) > 2:
                save_path = params[2].strip()
            # 命令模式  /home/userzjj/app/gp-server/l2_data_parser ExtractDealBigOrder 2025-05-09
            # 根据code提取大单
            if not code:
                transaction_big_order_parser.extract_big_order_of_code(f"/home/userzjj/ftp/{day}/NGTSTick")
                transaction_big_order_parser.extract_big_order_of_code(f"/home/userzjj/ftp/{day}/Transaction")
            else:
                save_path = None
                if tool.is_sh_code(code):
                    transaction_big_order_parser.extract_big_order_of_code(f"/home/userzjj/ftp/{day}/NGTSTick", code)
                else:
                    transaction_big_order_parser.extract_big_order_of_code(f"/home/userzjj/ftp/{day}/Transaction", code)
            if len(params) > 3:
                target_code = params[3].strip()
            else:
                target_code = None
            parse_deal_big_orders(day, save_path, target_code)
output/output_util.py
@@ -2,7 +2,7 @@
from l2 import l2_data_util
def money_desc(money):
    if abs(money) > 100000000:
    if abs(money) >= 100000000:
        return f"{round(money / 100000000, 2)}亿"
    else:
        return f"{round(money / 10000, 2)}万"
servers/data_server.py
@@ -961,7 +961,9 @@
                                 big_money_rate * 100  # 大单成交比
                                 ),
                                output_util.money_desc(deal_big_money_info[1]),
                                output_util.money_desc(deal_big_money_info[2])]
                                output_util.money_desc(deal_big_money_info[2]),
                                output_util.money_desc(deal_big_money_info[3]),
                            ]
                            if len(codes) == 1:
                                # 加载大单详情
                                deal_big_order_detail_info = radical_buy_data_manager.get_l2_big_order_deal_info(code)
trade/buy_radical/radical_buy_data_manager.py
@@ -314,6 +314,49 @@
@tool.singleton
class TotalDealBigOrderThresholdMoneyManager:
    """
    累计成交大单阈值管理(人为设置)
    """
    __db = 3
    __redis_manager = redis_manager.RedisManager(3)
    __total_big_order_threshold = {}
    def __init__(self):
        self.__load_data()
    def __get_redis(self):
        return self.__redis_manager.getRedis()
    def __load_data(self):
        keys = redis_manager.RedisUtils.keys(self.__get_redis(), "total_radical_big_order_threshold-*")
        for k in keys:
            code = k.split("-")[1]
            val = redis_manager.RedisUtils.get(self.__get_redis(), k)
            val = int(val)
            self.__total_big_order_threshold[code] = val
    def set_money(self, code, money):
        """
        设置金额
        @param code:
        @param money:
        @return:
        """
        self.__total_big_order_threshold[code] = money
        redis_manager.RedisUtils.setex_async(self.__db, f"total_radical_big_order_threshold-{code}", tool.get_expire(),
                                             money)
    def get_money_cache(self, code):
        """
        获取缓存
        @param code:
        @return:
        """
        return self.__total_big_order_threshold.get(code)
@tool.singleton
class RadicalCodeMarketInfoManager:
    """
    激进买的票行情数据管理
@@ -617,7 +660,12 @@
                        code)
                    if deal_money >= 2 * THRESHOLD_MONEY:
                        gpcode_manager.WhiteListCodeManager().add_code(code)
                        trade_record_log_util.add_common_msg(code, "加白",  f"{code}大单成交足够加白, 本批次成交金额-{deal_money}/{THRESHOLD_MONEY * 2}  累计大单金额:{total_deal_big_order_result[1]}/{total_deal_big_order_result[2]}")
                        trade_record_log_util.add_common_msg(code, "加白",
                                                             f"{code}大单成交足够加白, 本批次成交金额-{deal_money}/{THRESHOLD_MONEY * 2}  累计大单金额:{total_deal_big_order_result[1]}/{total_deal_big_order_result[2]}")
                    else:
                        async_log_util.info(logger_debug,
                                            f"{code}-成交大单少({deal_money}/{2 * THRESHOLD_MONEY},最小订单号-{order_begin_pos.min_order_no}),不能加白")
        except Exception as e:
            logger_debug.exception(e)
            async_log_util.info(logger_debug, f"处理成交大单足够加白的问题:{str(e)}")
@@ -1717,7 +1765,7 @@
    """
    总成交大单啊是否足够
    @param code:
    @return:(缺少的资金,总成交金额, 要求的大单金额)
    @return:(缺少的资金, 总成交金额, 要求的大单金额, 计算得到的大单阈值金额)
    """
    THRESHOLD_MONEY, is_temp_threshold_money = BeforeSubDealBigOrderManager().get_big_order_threshold_info(code)
@@ -1729,7 +1777,13 @@
    if tool.is_ge_code(code):
        TOTAL_BIG_DEAL_MONEY_THRESHOLD_COUNT *= 3.3
    TOTAL_BIG_DEAL_MONEY_THRESHOLD_MONEY = TOTAL_BIG_DEAL_MONEY_THRESHOLD_COUNT * THRESHOLD_MONEY
    TOTAL_BIG_DEAL_MONEY_THRESHOLD_MONEY_WITH_COMPUTE = TOTAL_BIG_DEAL_MONEY_THRESHOLD_COUNT * THRESHOLD_MONEY
    human_setting_money = TotalDealBigOrderThresholdMoneyManager().get_money_cache(code)
    if human_setting_money is not None:
        TOTAL_BIG_DEAL_MONEY_THRESHOLD_MONEY = int(human_setting_money)
    else:
        TOTAL_BIG_DEAL_MONEY_THRESHOLD_MONEY = TOTAL_BIG_DEAL_MONEY_THRESHOLD_MONEY_WITH_COMPUTE
    # if is_for_buy and is_temp_threshold_money:
    #     # 首次上板买入,大单阈值打3折
@@ -1749,7 +1803,7 @@
    except Exception as e:
        async_log_util.info(logger_l2_radical_buy, f"计算正在成交大单出错:{str(e)}")
    total_lack_money = max(0, int(TOTAL_BIG_DEAL_MONEY_THRESHOLD_MONEY - deal_big_order_money))
    return total_lack_money, deal_big_order_money, TOTAL_BIG_DEAL_MONEY_THRESHOLD_MONEY
    return total_lack_money, deal_big_order_money, TOTAL_BIG_DEAL_MONEY_THRESHOLD_MONEY, TOTAL_BIG_DEAL_MONEY_THRESHOLD_MONEY_WITH_COMPUTE
def is_big_order_deal_enough(code, volume_rate, refer_total_sell_money, for_buy=False, is_almost_open_limit_up=False):