Administrator
2025-06-09 8b7972581d0324e3f634b5b5a57a9ed7db1addaf
servers/data_server.py
@@ -9,21 +9,25 @@
import requests
import constant
from code_attribute.gpcode_manager import BlackListCodeManager
import inited_data
from api import low_suction_data_pusher
from code_attribute.gpcode_manager import BlackListCodeManager, HumanRemoveForbiddenManager
from l2.huaxin import huaxin_target_codes_manager
from l2.l2_transaction_data_manager import HuaXinBuyOrderManager
from log_module.log import logger_system, logger_debug, logger_kpl_limit_up, logger_request_api
from log_module.log import logger_system, logger_debug, logger_kpl_limit_up, logger_request_api, \
    logger_kpl_market_strong, logger_kpl_new_blocks
from third_data.custom_block_in_money_manager import CodeInMoneyManager
from third_data.kpl_data_constant import LimitUpCodesBlockRecordManager, LimitUpDataConstant, \
    ContainsLimitupCodesBlocksManager
from third_data.kpl_limit_up_data_manager import LatestLimitUpBlockManager, CodeLimitUpSequenceManager
from third_data.third_blocks_manager import BlockMapManager
from trade.buy_radical import radical_buy_data_manager
from trade.buy_radical import radical_buy_data_manager, new_block_processor
from trade.buy_radical.block_special_codes_manager import BlockSpecialCodesManager
from trade.buy_radical.new_block_processor import BeforeBlocksComputer
from trade.buy_strategy import OpenLimitUpGoodBlocksBuyStrategy
from trade.buy_radical.radical_buy_data_manager import RadicalBuyBlockManager, BeforeSubDealBigOrderManager
from utils import global_util, tool, data_export_util
from code_attribute import gpcode_manager
from utils import global_util, tool, data_export_util, init_data_util
from code_attribute import gpcode_manager, code_nature_analyse
from log_module import log_analyse, log_export, async_log_util
from l2 import code_price_manager, l2_data_util, transaction_progress
from cancel_strategy.s_l_h_cancel_strategy import HourCancelBigNumComputer, LCancelRateManager
@@ -62,6 +66,8 @@
    __industry_cache_dict = {}
    __latest_limit_up_codes_set = set()
    __data_process_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=10)
    # 新题材请求
    __new_blocks_codes_request_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=5)
    # 代码的涨幅
    __code_limit_rate_dict = {}
@@ -805,8 +811,9 @@
                deals_month = trade_data_manager.AccountMoneyManager().get_deal_count_info(start_date, end_date)
                cost_month = sum([round(0.1 * x[1], 2) for x in delegates_month])
                make_month = 0
                make_month += max(1 * deals_month[0][1], deals_month[0][2] * 1.854 / 10000) + 1 * deals_month[1][
                    1] + 0 * deals_month[2][1]
                make_month += max(1 * deals_month[0][1] if deals_month[0][1] else 0,
                                  deals_month[0][2] * 1.854 / 10000 if deals_month[0][2] else 0) + 1 * deals_month[1][
                                  1] + 0 * deals_month[2][1]
                fdata["month_commission"] = round(make_month - cost_month, 2)
                # 计算当日手续费详情
                delegates = trade_data_manager.AccountMoneyManager().get_delegated_count_info()
@@ -942,8 +949,39 @@
                                th_sell = BeforeSubDealBigOrderManager().get_big_sell_order_threshold(code)
                            except:
                                th_sell = 0
                            # (缺少的资金, 净成交金额, 要求的大单金额, 计算得到的大单阈值金额, 人为设置的大单)
                            deal_big_money_info = radical_buy_data_manager.get_total_deal_big_order_info(
                                code, gpcode_manager.get_limit_up_price_as_num(code))
                            if deal_big_money_info[1] == 0 and len(codes) == 1:
                                # 总成交金额为0
                                deal_big_money_info = list(deal_big_money_info)
                                # 没有订阅L2会出现没有值的情况,如果涨停过就拉取之前的涨停买/卖大单
                                deal_big_orders_result = radical_buy_data_manager.request_deal_big_orders(code)
                                if deal_big_orders_result:
                                    buy_datas, sell_datas = deal_big_orders_result[0], deal_big_orders_result[1]
                                    limit_up_price = gpcode_manager.get_limit_up_price_as_num(code)
                                    limit_up_price_money_list = [x[0] for x in buy_datas if x[1] == limit_up_price]
                                    if limit_up_price_money_list:
                                        threshold_money = BeforeSubDealBigOrderManager().compute_re_limit_up_big_money_threshold(
                                            limit_up_price_money_list)
                                    else:
                                        threshold_money = 299e4
                                    logger_debug.info(f"{code}-临时回封均大单:{threshold_money}")
                                    # 设置买单阈值
                                    th_buy = threshold_money
                                    buy_money = sum(limit_up_price_money_list)
                                    sell_money = sum([x[0] for x in sell_datas if x[1] == limit_up_price])
                                    # 涨停买净成交金额净买额
                                    deal_big_money_info[1] = buy_money - sell_money
                                    # 要求的大单累计金额
                                    deal_big_money_info[
                                        2] = radical_buy_data_manager.compute_total_deal_big_order_threshold_money(code,
                                                                                                                   limit_up_price,
                                                                                                                   threshold_money)
                                    # logger_debug.info(f"{code}-累计大单阈值:{deal_big_money_info[2]}")
                                    # logger_debug.info(f"{code}非订阅大单, buy_money-{buy_money}, sell_money-{sell_money}")
                            big_money_rate = radical_buy_data_manager.TotalDealBigOrderInfoManager.get_big_order_rate(
                                code)
                            if not big_money_rate:
@@ -955,24 +993,36 @@
                                 output_util.money_desc(th_sell),  # 卖单阈值
                                 big_money_rate * 100  # 大单成交比
                                 ),
                                # 涨停大单净买入
                                output_util.money_desc(deal_big_money_info[1]),
                                output_util.money_desc(deal_big_money_info[2])]
                                # 累计总大单阈值
                                output_util.money_desc(deal_big_money_info[2]),
                                # 原累计大单阈值(非人为设置)
                                output_util.money_desc(deal_big_money_info[3]),
                                # 人为设置的大单
                                output_util.money_desc(deal_big_money_info[4]) if deal_big_money_info[4] else '',
                            ]
                            if len(codes) == 1:
                                # 加载大单详情
                                deal_big_order_detail_info = radical_buy_data_manager.get_l2_big_order_deal_info(code)
                                # 加载涨停大单详情
                                # 买大单, 上板前买大单, 卖大单, 上板前卖大单
                                limit_up_big_order_detail = radical_buy_data_manager.get_total_detal_big_order_details(
                                    code)
                                deal_big_order_info.append(
                                    output_util.money_desc(limit_up_big_order_detail[0] + limit_up_big_order_detail[1]))
                                deal_big_order_info.append(
                                    output_util.money_desc(limit_up_big_order_detail[2] + limit_up_big_order_detail[3]))
                                # 累计涨停买金额
                                buy_money = output_util.money_desc(limit_up_big_order_detail[0] + limit_up_big_order_detail[1])
                                sell_money = output_util.money_desc(limit_up_big_order_detail[2] + limit_up_big_order_detail[3])
                                deal_big_order_info.append(buy_money)
                                # 累计涨停卖金额
                                deal_big_order_info.append(sell_money)
                                deal_big_order_info.append(
                                    radical_buy_data_manager.TotalDealBigOrderInfoManager().get_big_order_rate(code))
                        except Exception as e:
                            logger_debug.error(f"可能没有获取到涨停价:{code}")
                            # logger_debug.exception(e)
                            if not gpcode_manager.get_limit_up_price(code):
                                init_data_util.re_set_price_pre(code)
                            logger_debug.exception(e)
                            deal_big_order_info = None
                        code_name = gpcode_manager.get_code_name(code)
                        fresults.append((code, code_name, deal_big_order_info, deal_big_order_detail_info))
@@ -983,6 +1033,13 @@
        elif url.path == "/get_all_special_codes":
            # 获取所有辨识度的代码
            code_blocks_dict = BlockSpecialCodesManager().get_code_blocks_dict()
            fdata = {}
            for k in code_blocks_dict:
                fdata[k] = list(code_blocks_dict[k])
            response_data = json.dumps({"code": 0, "data": fdata})
        elif url.path == "/get_new_blocks_special_codes":
            # 获取所有辨识度的代码
            code_blocks_dict = BlockSpecialCodesManager().get_temp_code_blocks_dict()
            fdata = {}
            for k in code_blocks_dict:
                fdata[k] = list(code_blocks_dict[k])
@@ -1017,7 +1074,42 @@
            self.__send_response(result_str)
    def __process_kpl_data(self, data_origin):
        def do_limit_up(result_list_):
            def request_new_blocks_codes(blocks_info, all_new_blocks):
                """
                请求新板块的代码
                @param blocks_info:[(板块名称,板块代码)]
                @return:
                """
                yesterday_codes = kpl_data_manager.get_yesterday_limit_up_codes()
                blocks = set()
                for bi in blocks_info:
                    if bi[0] in blocks:
                        continue
                    blocks.add(bi[0])
                    result = kpl_api.getCodesByPlate(bi[1])
                    result = json.loads(result)
                    code_info_list = []
                    for d in result["list"]:
                        if d[0] in yesterday_codes:
                            continue
                        # 涨幅要大于5%
                        rate = d[6] / int(round((tool.get_limit_up_rate(d[0]) - 1) * 10))
                        if rate / ((tool.get_limit_up_rate(d[0]) - 1) * 10) < 5:
                            continue
                        # 格式:(代码,涨幅)
                        code_info_list.append((d[0], d[6]))
                    # 保存新题材
                    datas = [(d[0], d[6]) for d in result["list"]]
                    async_log_util.info(logger_kpl_new_blocks, f"{(tool.get_thread_id(), bi, datas)}")
                    if code_info_list:
                        # 将代码加入新题材
                        new_block_processor.process_new_block_by_component_codes(bi[0],
                                                                                 set([x[0] for x in code_info_list]),
                                                                                 all_new_blocks)
            try:
                if result_list_:
                    # 保存涨停时间
@@ -1091,30 +1183,38 @@
                        pass
                    try:
                        records = KPLLimitUpDataRecordManager.total_datas
                        # 计算今日新增的题材概念
                        block_codes = {}
                        for x in records:
                            bs = {kpl_util.filter_block(x[2])}
                            if x[6]:
                                bs |= set(x[6].split("、"))
                            for b in bs:
                                if b not in block_codes:
                                    block_codes[b] = set()
                                block_codes[b].add(x[3])
                        reasons = set(block_codes.keys())
                        reasons -= constant.KPL_INVALID_BLOCKS
                        reasons -= LimitUpCodesBlockRecordManager().get_total_before_blocks()
                        if reasons:
                            for r in reasons:
                                for c in block_codes[r]:
                                    add_result = LimitUpCodesBlockRecordManager().add_new_blocks(c, r)
                                    if add_result:
                                        # 增加新题材是否成功, 临时将票加入辨识度
                                        BlockSpecialCodesManager().add_code_block_for_temp(c, r)
                        # 新题材
                        new_block_processor.process_limit_up_list({x[0]: x[5] for x in result_list_})
                        new_block_codes = new_block_processor.screen_new_blocks_with_limit_up_datas(
                            [(x[0], x[5]) for x in result_list_])
                        if new_block_codes:
                            # 统计板块的代码
                            records = KPLLimitUpDataRecordManager.total_datas
                            block_plate_code_dict = {}
                            for x in records:
                                block_plate_code_dict[kpl_util.filter_block(x[2])] = x[15]
                            # 新板块
                            update_new_block_plates = []
                            for b in new_block_codes:
                                for c in new_block_codes[b]:
                                    new_block_processor.process_new_block_by_limit_up_list(c, b)
                            for r in new_block_codes:
                                if r in block_plate_code_dict:
                                    update_new_block_plates.append((r, block_plate_code_dict[r]))
                            if update_new_block_plates:
                                # 需要获取板块下的代码
                                self.__new_blocks_codes_request_thread_pool.submit(
                                    lambda: request_new_blocks_codes(update_new_block_plates, new_block_codes.keys()))
                    except Exception as e:
                        logger_debug.exception(e)
                    # 将数据推送至其他项目
                    try:
                        low_suction_data_pusher.push_limit_up_list(result_list_)
                    except:
                        pass
                    self.__kplDataManager.save_data(type_, result_list_)
            except Exception as e:
                logger_debug.exception(e)
@@ -1194,11 +1294,10 @@
                RealTimeKplMarketData.set_market_jingxuan_out_blocks(result_list)
        elif type_ == KPLDataType.MARKET_STRONG.value:
            strong = data["data"]
            logger_debug.debug("开盘啦市场强度:{}", strong)
            logger_kpl_market_strong.info(strong)
            # 保存市场热度
            if strong is not None:
                RealTimeKplMarketData.set_market_strong(strong)
        return json.dumps({"code": 0})
    def __send_response(self, data):