Administrator
2025-05-29 ca869a91c5acc28c1fa9e34be658ba355572c380
servers/data_server.py
@@ -3,21 +3,30 @@
import logging
import socketserver
import time
import urllib
from http.server import BaseHTTPRequestHandler
import dask
import requests
from code_attribute.gpcode_manager import BlackListCodeManager
import constant
import inited_data
from code_attribute.gpcode_manager import BlackListCodeManager, HumanRemoveForbiddenManager
from l2.huaxin import huaxin_target_codes_manager
from l2.l2_transaction_data_manager import HuaXinBuyOrderManager
from log_module.log import logger_system, logger_debug, logger_kpl_limit_up, logger_request_api
from log_module.log import logger_system, logger_debug, logger_kpl_limit_up, logger_request_api, \
    logger_kpl_market_strong, logger_kpl_new_blocks
from third_data.custom_block_in_money_manager import CodeInMoneyManager
from third_data.kpl_data_constant import LimitUpCodesBlockRecordManager, LimitUpDataConstant
from third_data.kpl_data_constant import LimitUpCodesBlockRecordManager, LimitUpDataConstant, \
    ContainsLimitupCodesBlocksManager
from third_data.kpl_limit_up_data_manager import LatestLimitUpBlockManager, CodeLimitUpSequenceManager
from third_data.third_blocks_manager import BlockMapManager
from trade.buy_radical import radical_buy_data_manager, new_block_processor
from trade.buy_radical.block_special_codes_manager import BlockSpecialCodesManager
from trade.buy_radical.new_block_processor import BeforeBlocksComputer
from trade.buy_strategy import OpenLimitUpGoodBlocksBuyStrategy
from trade.buy_radical.radical_buy_data_manager import RadicalBuyBlockManager
from utils import global_util, tool, data_export_util
from code_attribute import gpcode_manager
from trade.buy_radical.radical_buy_data_manager import RadicalBuyBlockManager, BeforeSubDealBigOrderManager
from utils import global_util, tool, data_export_util, init_data_util
from code_attribute import gpcode_manager, code_nature_analyse
from log_module import log_analyse, log_export, async_log_util
from l2 import code_price_manager, l2_data_util, transaction_progress
from cancel_strategy.s_l_h_cancel_strategy import HourCancelBigNumComputer, LCancelRateManager
@@ -33,7 +42,7 @@
from output import code_info_output, limit_up_data_filter, output_util, kp_client_msg_manager
from trade import bidding_money_manager, trade_manager, l2_trade_util, trade_record_log_util, trade_constant, \
    trade_data_manager
    trade_data_manager, current_price_process_manager
import concurrent.futures
# 禁用http.server的日志输出
@@ -47,7 +56,6 @@
    __IgnoreCodeManager = IgnoreCodeManager()
    __KPLPlatManager = KPLPlatManager()
    __KPLCodeLimitUpReasonManager = KPLCodeLimitUpReasonManager()
    __KPLPlateForbiddenManager = KPLPlateForbiddenManager()
    # 历史板块
    __history_plates_dict = {}
    # 板块
@@ -57,6 +65,8 @@
    __industry_cache_dict = {}
    __latest_limit_up_codes_set = set()
    __data_process_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=10)
    # 新题材请求
    __new_blocks_codes_request_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=5)
    # 代码的涨幅
    __code_limit_rate_dict = {}
@@ -70,6 +80,8 @@
        records = LimitUpDataConstant.history_limit_up_datas
        if not currents:
            currents = self.__kplDataManager.get_data(KPLDataType.LIMIT_UP)
        if currents is None:
            currents = []
        # 获取历史涨停
        if not records:
            KPLLimitUpDataRecordManager.load_total_datas()
@@ -97,10 +109,19 @@
                if b not in current_reason_dict:
                    current_reason_dict[b] = []
                current_reason_dict[b].append(_code)
        # (板块名称,涨停代码数量,炸板数量,涨停时间)
        limit_up_reason_statistic_info = [(k, len(record_reason_dict[k]), len(record_reason_dict[k]) - len(
        # (板块名称,涨停代码数量,炸板数量,涨停时间, 是否有辨识度的票)
        limit_up_reason_statistic_info = [[k, len(record_reason_dict[k]), len(record_reason_dict[k]) - len(
            current_reason_dict.get(k) if k in current_reason_dict else []),
                                           0) for k in record_reason_dict]
                                           0, 0] for k in record_reason_dict]
        try:
            for b in limit_up_reason_statistic_info:
                codes_ = BlockSpecialCodesManager().get_block_codes(b[0])
                if not codes_:
                    codes_ = set()
                b[4] = len(set(record_reason_dict[b[0]]) & set(codes_))
        except:
            pass
        limit_up_reason_statistic_info.sort(key=lambda x: x[1] - x[2])
        limit_up_reason_statistic_info.reverse()
@@ -422,7 +443,7 @@
                # 精选,主力净额顺序
                result = kpl_api.getMarketJingXuanRealRankingInfo(False)
                result = kpl_util.parseMarketJingXuan(result)
            forbidden_plates = self.__KPLPlateForbiddenManager.list_all()
            forbidden_plates = KPLPlateForbiddenManager().list_all()
            fresult = []
            for d in result:
                if type_ == 2 or type_ == 3:
@@ -440,11 +461,30 @@
            self.__IgnoreCodeManager.ignore_code(type_, code)
            response_data = json.dumps({"code": 0})
        elif url.path == "/kpl/forbidden_plate":
            # 添加不能买的板块
            ps_dict = dict([(k, v[0]) for k, v in parse_qs(url.query).items()])
            plate = ps_dict["plate"]
            # 加入禁止
            self.__KPLPlateForbiddenManager.save_plate(plate)
            KPLPlateForbiddenManager().save_plate(plate)
            response_data = json.dumps({"code": 0})
        elif url.path == "/kpl/del_forbidden_plate":
            # 删除不能买的板块
            ps_dict = dict([(k, v[0]) for k, v in parse_qs(url.query).items()])
            plate = ps_dict["plate"]
            # 加入禁止
            KPLPlateForbiddenManager().delete_plate(plate)
            response_data = json.dumps({"code": 0})
        elif url.path == "/kpl/list_forbidden_plate":
            # 不能买的板块列表
            results = KPLPlateForbiddenManager().list_all_cache()
            response_data = json.dumps({"code": 0, "data": list(results)})
        elif url.path == "/kpl/list_deleted_forbidden_plate":
            # 获取已经删除的板块
            results = KPLPlateForbiddenManager().list_all_deleted_cache()
            if results:
                results -= KPLPlateForbiddenManager().list_all_cache()
            response_data = json.dumps({"code": 0, "data": list(results)})
        elif url.path == "/kpl/get_plate_codes":
            # 获取涨停原因下面的代码
            ps_dict = dict([(k, v[0]) for k, v in parse_qs(url.query).items()])
@@ -533,21 +573,42 @@
                    continue
                if not tool.is_can_buy_code(d[3]):
                    continue
                # 代码,名称,涨停时间,是否炸板,是否想买,是否已经下过单,涨停时间,自由流通市值,是否在黑名单里面, 是否有辨识度, 大单净额
                # 代码,名称,涨停时间,是否炸板,是否加绿,是否已经下过单,涨停时间,自由流通市值,是否在黑名单里面, 是否有辨识度, 大单净额, 是否加想
                codes_info.append(
                    [d[3], d[4], tool.to_time_str(int(d[5])), 1 if d[3] not in now_limit_up_codes else 0, 0, 0, d[12],
                     output_util.money_desc(d[13]), 1, 1 if l2_trade_util.is_in_forbidden_trade_codes(d[3]) else 0, 1 if d[3] in special_codes else 0, CodeInMoneyManager().get_money(d[3])])
                    [d[3],
                     d[4],
                     tool.to_time_str(int(d[5])),
                     1 if d[3] not in now_limit_up_codes else 0,
                     0,
                     0,
                     d[12],
                     output_util.money_desc(d[13]),
                     1,
                     1 if l2_trade_util.is_in_forbidden_trade_codes(d[3]) else 0,
                     1 if d[3] in special_codes else 0,
                     CodeInMoneyManager().get_money(d[3]),
                     0])
            codes_info.sort(key=lambda x: x[2])
            # 查询是否为想买单
            green_codes = gpcode_manager.GreenListCodeManager().list_codes_cache()
            want_codes = gpcode_manager.WantBuyCodesManager().list_code_cache()
            for code_info in codes_info:
                code_info[4] = 1 if code_info[0] in green_codes else 0
                code_info[12] = 1 if code_info[0] in want_codes else 0
                # 获取代码状态
                if trade_manager.CodesTradeStateManager().get_trade_state_cache(
                        code_info[0]) != trade_constant.TRADE_STATE_NOT_TRADE:
                    code_info[5] = 1
            response_data = json.dumps({"code": 0, "data": codes_info})
            # 涨停数据
            fdatas = {"limit_up_list": codes_info}
            # 辨识度票
            fdatas["speical_codes"] = [(x, gpcode_manager.get_code_name(x)) for x in special_codes]
            forbidden_refer_codes = KPLPlateForbiddenManager().get_watch_high_codes_by_block(plate)
            if forbidden_refer_codes is None:
                forbidden_refer_codes = set()
            fdatas["forbidden_refer_codes"] = [(x, gpcode_manager.get_code_name(x)) for x in forbidden_refer_codes]
            response_data = json.dumps({"code": 0, "data": fdatas})
        elif url.path == "/kpl/get_open_limit_up_count_rank":
            # 获取炸板次数排行
            ps_dict = dict([(k, v[0]) for k, v in parse_qs(url.query).items()])
@@ -749,7 +810,9 @@
                deals_month = trade_data_manager.AccountMoneyManager().get_deal_count_info(start_date, end_date)
                cost_month = sum([round(0.1 * x[1], 2) for x in delegates_month])
                make_month = 0
                make_month += 5 * deals_month[0][1] + 1 * deals_month[1][1] + 0 * deals_month[2][1]
                make_month += max(1 * deals_month[0][1] if deals_month[0][1] else 0,
                                  deals_month[0][2] * 1.854 / 10000 if deals_month[0][2] else 0) + 1 * deals_month[1][
                                  1] + 0 * deals_month[2][1]
                fdata["month_commission"] = round(make_month - cost_month, 2)
                # 计算当日手续费详情
                delegates = trade_data_manager.AccountMoneyManager().get_delegated_count_info()
@@ -760,13 +823,213 @@
                fdata["delegates"]["sell"] = delegates[3]
                deals = trade_data_manager.AccountMoneyManager().get_deal_count_info()
                fdata["deals"] = {}
                fdata["deals"]["stock"] = {"count": deals[0][1], "price": 5, "money": round(5 * deals[0][1], 2)}
                fdata["deals"]["stock"] = {"count": deals[0][1], "price": 1, "money": round(1 * deals[0][1], 2)}
                fdata["deals"]["sh_cb"] = {"count": deals[1][1], "price": 1, "money": round(1 * deals[1][1], 2)}
                fdata["deals"]["sz_cb"] = {"count": deals[2][1], "price": 0, "money": round(0 * deals[2][1], 2)}
                fdata["commission"] = trade_data_manager.AccountMoneyManager().get_commission_cache()
                response_data = json.dumps({"code": 0, "data": fdata})
            except Exception as e:
                logger_debug.exception(e)
        elif url.path == "/get_place_order_records":
            # 获取下单记录
            ps_dict = dict([(k, v[0]) for k, v in parse_qs(url.query).items()])
            try:
                day = ps_dict.get("day")
                if not day:
                    day = tool.get_now_date_str()
                records = log_export.load_trade_recod_by_type("place_order", date=day)
                fdata = []
                for record in records:
                    print(record)
                    # (下单时间, 代码, 名称, 下单模式, 板块信息)
                    fdata.append((record[0], record[1], gpcode_manager.get_code_name(record[1]), record[3]["mode_desc"],
                                  record[3].get("block_info")))
                response_data = json.dumps({"code": 0, "data": fdata})
            except:
                pass
        elif url.path == "/get_blocks_in_money_info":
            # 获取板块资金流入状况
            ps_dict = dict([(k, v[0]) for k, v in parse_qs(url.query).items()])
            type_ = int(ps_dict.get("type"))
            try:
                fdatas = []
                if type_ == 0:
                    in_blocks = RealTimeKplMarketData().get_top_market_jingxuan_blocks()
                    if not in_blocks:
                        in_blocks = set()
                    fdatas = RealTimeKplMarketData.top_in_list_cache
                    if not fdatas:
                        datas = self.__kplDataManager.get_data(KPLDataType.JINGXUAN_RANK)
                        fdatas = datas
                    # 返回是否在流入前几
                    temp_datas = []
                    for d in fdatas:
                        temp = list(d)
                        if d[1] in in_blocks:
                            temp.append(1)
                        else:
                            temp.append(0)
                        if in_blocks and d[1] == in_blocks[-1]:
                            temp.append(RealTimeKplMarketData.get_market_strong())
                        else:
                            temp.append(0)
                        temp_datas.append(temp)
                    fdatas = temp_datas
                elif type_ == 1:
                    out_blocks = RealTimeKplMarketData().get_top_market_jingxuan_out_blocks()
                    if not out_blocks:
                        out_blocks = set()
                    fdatas = RealTimeKplMarketData.top_out_list_cache
                    if not fdatas:
                        datas = self.__kplDataManager.get_data(KPLDataType.JINGXUAN_RANK_OUT)
                        fdatas = datas
                    # 返回是否在流入前几
                    temp_datas = []
                    for d in fdatas:
                        temp = list(d)
                        if d[1] in out_blocks:
                            temp.append(1)
                        else:
                            temp.append(0)
                        temp_datas.append(temp)
                    fdatas = temp_datas
                response_data = json.dumps({"code": 0, "data": fdatas})
            except:
                pass
        elif url.path == "/get_block_codes_with_money":
            # 获取板块资金流入状况
            ps_dict = dict([(k, v[0]) for k, v in parse_qs(url.query).items()])
            block = ps_dict.get("block")
            # 是否倒序排
            desc = int(ps_dict.get("desc"))
            try:
                response_data = requests.get(
                    "http://127.0.0.1:9005/get_block_codes_money?block=" + urllib.parse.quote(block))
                r_str = response_data.text
                response_data = json.loads(r_str)
                if response_data["code"] == 0:
                    datas = response_data["data"]
                    fdatas = []
                    for d in datas:
                        # (代码, 名称, 流入金额, 是否被排除成分股)
                        fdatas.append((d[0], gpcode_manager.get_code_name(d[0]), d[1], d[2]))
                    if desc:
                        fdatas.sort(key=lambda x: x[2], reverse=True)
                    else:
                        fdatas.sort(key=lambda x: x[2])
                    fdatas = fdatas[:50]
                    response_data = json.dumps({"code": 0, "data": fdatas})
                else:
                    response_data = r_str
            except  Exception as e:
                response_data = json.dumps({"code": 1, "data": str(1)})
        elif url.path == "/get_l2_subscript_codes":
            # 获取L2订阅的代码
            ps_dict = dict([(k, v[0]) for k, v in parse_qs(url.query).items()])
            code = ps_dict.get('code')
            fresults = []
            try:
                if code:
                    codes = [code]
                else:
                    codes = huaxin_target_codes_manager.HuaXinL2SubscriptCodesManager.get_subscript_codes()
                if codes:
                    for code in codes:
                        deal_big_order_detail_info = None
                        try:
                            # 获取成交大单:(参考大单金额,已成交大单金额,大单要求金额, 已成交涨停买金额, 已成交涨停卖金额)
                            th_temp_buy_info = BeforeSubDealBigOrderManager().get_temp_deal_big_order_threshold_info(
                                code)
                            th_buy, th_buy_default = BeforeSubDealBigOrderManager().get_big_order_threshold(code)
                            try:
                                th_sell = BeforeSubDealBigOrderManager().get_big_sell_order_threshold(code)
                            except:
                                th_sell = 0
                            deal_big_money_info = radical_buy_data_manager.get_total_deal_big_order_info(
                                code, gpcode_manager.get_limit_up_price_as_num(code))
                            if deal_big_money_info[1] == 0 and len(codes) == 1:
                                deal_big_money_info = list(deal_big_money_info)
                                # 没有订阅L2会出现没有值的情况,如果涨停过就拉取之前的涨停买/卖大单
                                deal_big_orders_result = radical_buy_data_manager.request_deal_big_orders(code)
                                if deal_big_orders_result:
                                    buy_datas, sell_datas = deal_big_orders_result[0], deal_big_orders_result[1]
                                    limit_up_price = gpcode_manager.get_limit_up_price_as_num(code)
                                    limit_up_price_money_list = [x[0] for x in buy_datas if x[1] == limit_up_price]
                                    threshold_money = BeforeSubDealBigOrderManager.compute_re_limit_up_big_money_threshold(
                                        limit_up_price_money_list)
                                    # 设置买单阈值
                                    th_buy = threshold_money
                                    buy_money = sum(limit_up_price_money_list)
                                    sell_money = sum([x[0] for x in sell_datas if x[1] == limit_up_price])
                                    # 涨停大单净买入
                                    deal_big_money_info[1] = buy_money - sell_money
                                    deal_big_money_info[
                                        2] = radical_buy_data_manager.compute_total_deal_big_order_threshold_money(code,
                                                                                                                   limit_up_price,
                                                                                                                   threshold_money)
                            big_money_rate = radical_buy_data_manager.TotalDealBigOrderInfoManager.get_big_order_rate(
                                code)
                            if not big_money_rate:
                                big_money_rate = 0
                            # 大单成交信息
                            deal_big_order_info = [
                                (output_util.money_desc(th_temp_buy_info[0] if th_temp_buy_info else 0),  # 临时买大单阈值
                                 output_util.money_desc(th_buy),  # 买大单阈值
                                 output_util.money_desc(th_sell),  # 卖单阈值
                                 big_money_rate * 100  # 大单成交比
                                 ),
                                output_util.money_desc(deal_big_money_info[1]),
                                output_util.money_desc(deal_big_money_info[2]),
                                output_util.money_desc(deal_big_money_info[3]),
                            ]
                            if len(codes) == 1:
                                # 加载大单详情
                                deal_big_order_detail_info = radical_buy_data_manager.get_l2_big_order_deal_info(code)
                                # 加载涨停大单详情
                                limit_up_big_order_detail = radical_buy_data_manager.get_total_detal_big_order_details(
                                    code)
                                if max(limit_up_big_order_detail) == 0:
                                    # 没有数据,从网络加载
                                    limit_up_big_order_detail = list(limit_up_big_order_detail)
                                    limit_up_big_order_detail[1] = deal_big_order_detail_info[1][0]
                                    limit_up_big_order_detail[3] = deal_big_order_detail_info[2][0]
                                deal_big_order_info.append(
                                    output_util.money_desc(limit_up_big_order_detail[0] + limit_up_big_order_detail[1]))
                                deal_big_order_info.append(
                                    output_util.money_desc(limit_up_big_order_detail[2] + limit_up_big_order_detail[3]))
                                deal_big_order_info.append(
                                    radical_buy_data_manager.TotalDealBigOrderInfoManager().get_big_order_rate(code))
                        except Exception as e:
                            logger_debug.error(f"可能没有获取到涨停价:{code}")
                            if not gpcode_manager.get_limit_up_price(code):
                                init_data_util.re_set_price_pre(code)
                            logger_debug.exception(e)
                            deal_big_order_info = None
                        code_name = gpcode_manager.get_code_name(code)
                        fresults.append((code, code_name, deal_big_order_info, deal_big_order_detail_info))
                response_data = json.dumps({"code": 0, "data": fresults})
            except Exception as e:
                response_data = json.dumps({"code": 1, "data": str(1)})
        elif url.path == "/get_all_special_codes":
            # 获取所有辨识度的代码
            code_blocks_dict = BlockSpecialCodesManager().get_code_blocks_dict()
            fdata = {}
            for k in code_blocks_dict:
                fdata[k] = list(code_blocks_dict[k])
            response_data = json.dumps({"code": 0, "data": fdata})
        elif url.path == "/get_new_blocks_special_codes":
            # 获取所有辨识度的代码
            code_blocks_dict = BlockSpecialCodesManager().get_temp_code_blocks_dict()
            fdata = {}
            for k in code_blocks_dict:
                fdata[k] = list(code_blocks_dict[k])
            response_data = json.dumps({"code": 0, "data": fdata})
        async_log_util.info(logger_request_api, f"结束请求{tool.get_thread_id()}-{url}")
        self.send_response(200)
@@ -793,11 +1056,46 @@
                    CodeInMoneyManager().set_money(code, d[code])
            except Exception as e:
                logging.exception(e)
            result_str =json.dumps({"code": 0})
            result_str = json.dumps({"code": 0})
            self.__send_response(result_str)
    def __process_kpl_data(self, data_origin):
        def do_limit_up(result_list_):
            def request_new_blocks_codes(blocks_info, all_new_blocks):
                """
                请求新板块的代码
                @param blocks_info:[(板块名称,板块代码)]
                @return:
                """
                yesterday_codes = kpl_data_manager.get_yesterday_limit_up_codes()
                blocks = set()
                for bi in blocks_info:
                    if bi[0] in blocks:
                        continue
                    blocks.add(bi[0])
                    result = kpl_api.getCodesByPlate(bi[1])
                    result = json.loads(result)
                    code_info_list = []
                    for d in result["list"]:
                        if d[0] in yesterday_codes:
                            continue
                        # 涨幅要大于5%
                        rate = d[6] / int(round((tool.get_limit_up_rate(d[0]) - 1) * 10))
                        if rate / ((tool.get_limit_up_rate(d[0]) - 1) * 10) < 5:
                            continue
                        # 格式:(代码,涨幅)
                        code_info_list.append((d[0], d[6]))
                    # 保存新题材
                    datas = [(d[0], d[6]) for d in result["list"]]
                    async_log_util.info(logger_kpl_new_blocks, f"{(tool.get_thread_id(), bi, datas)}")
                    if code_info_list:
                        # 将代码加入新题材
                        new_block_processor.process_new_block_by_component_codes(bi[0],
                                                                                 set([x[0] for x in code_info_list]),
                                                                                 all_new_blocks)
            try:
                if result_list_:
                    # 保存涨停时间
@@ -861,6 +1159,7 @@
                        pass
                    try:
                        CodeLimitUpSequenceManager().set_current_limit_up_datas(result_list_)
                        ContainsLimitupCodesBlocksManager().set_current_limit_up_datas(result_list_)
                    except:
                        pass
                    try:
@@ -869,6 +1168,32 @@
                    except:
                        pass
                    try:
                        # 新题材
                        new_block_processor.process_limit_up_list({x[0]: x[5] for x in result_list_})
                        new_block_codes = new_block_processor.screen_new_blocks_with_limit_up_datas(
                            [(x[0], x[5]) for x in result_list_])
                        if new_block_codes:
                            # 统计板块的代码
                            records = KPLLimitUpDataRecordManager.total_datas
                            block_plate_code_dict = {}
                            for x in records:
                                block_plate_code_dict[kpl_util.filter_block(x[2])] = x[15]
                            # 新板块
                            update_new_block_plates = []
                            for b in new_block_codes:
                                for c in new_block_codes[b]:
                                    new_block_processor.process_new_block_by_limit_up_list(c, b)
                            for r in new_block_codes:
                                if r in block_plate_code_dict:
                                    update_new_block_plates.append((r, block_plate_code_dict[r]))
                            if update_new_block_plates:
                                # 需要获取板块下的代码
                                self.__new_blocks_codes_request_thread_pool.submit(
                                    lambda: request_new_blocks_codes(update_new_block_plates, new_block_codes.keys()))
                    except Exception as e:
                        logger_debug.exception(e)
                    self.__kplDataManager.save_data(type_, result_list_)
            except Exception as e:
                logger_debug.exception(e)
@@ -946,6 +1271,12 @@
            if result_list:
                self.__kplDataManager.save_data(type_, result_list)
                RealTimeKplMarketData.set_market_jingxuan_out_blocks(result_list)
        elif type_ == KPLDataType.MARKET_STRONG.value:
            strong = data["data"]
            logger_kpl_market_strong.info(strong)
            # 保存市场热度
            if strong is not None:
                RealTimeKplMarketData.set_market_strong(strong)
        return json.dumps({"code": 0})
    def __send_response(self, data):