admin
2025-06-04 287c506725b2d970f721f80169f83c2418cb0991
data_server.py
@@ -1,24 +1,21 @@
import http
import json
import random
import socketserver
import time
from http.server import BaseHTTPRequestHandler
import dask
from code_attribute import gpcode_manager
from log_module import log_analyse, log_export
from output import limit_up_data_filter, output_util, code_info_output
from log_module import log_analyse, log_export, log, request_log_util
from output import limit_up_data_filter, output_util
from output.limit_up_data_filter import IgnoreCodeManager
from third_data import kpl_util, kpl_data_manager, kpl_api
from third_data.code_plate_key_manager import KPLPlateForbiddenManager
from third_data.kpl_data_manager import KPLLimitUpDataRecordManager, KPLDataManager, KPLCodeLimitUpReasonManager
from third_data.kpl_util import KPLPlatManager, KPLDataType
from trade import trade_manager
from trade.l2_trade_util import BlackListCodeManager
from utils import tool, global_util, kp_client_msg_manager, hosting_api_util
from utils.history_k_data_util import HistoryKDatasUtils
from utils import tool, global_util, hosting_api_util
import urllib.parse as urlparse
from urllib.parse import parse_qs
@@ -45,6 +42,17 @@
        if not total_datas:
            KPLLimitUpDataRecordManager.load_total_datas()
            total_datas = KPLLimitUpDataRecordManager.total_datas
        current_datas_results = hosting_api_util.common_request({"ctype": "get_kpl_limit_up_datas"})
        if type(current_datas_results) == str:
            current_datas_results = json.loads(current_datas_results)
        current_datas = current_datas_results.get("data")  # KPLLimitUpDataRecordManager.latest_origin_datas
        current_block_codes = {}
        for c in current_datas:
            if c[5] not in current_block_codes:
                current_block_codes[c[5]] = set()
            current_block_codes[c[5]].add(c[0])
        # 通过涨停时间排序
        total_datas = list(total_datas)
@@ -64,12 +72,16 @@
                limit_up_reason_want_count_dict[d[2]] = 0
            if d[3] in want_codes:
                limit_up_reason_want_count_dict[d[2]] += 1
        # (板块名称,涨停代码数量,想买单数量,涨停时间)
        # (板块名称,涨停代码数量,炸板数量,想买单数量,涨停时间)
        limit_up_reason_statistic_info = [
            (k, len(limit_up_reason_dict[k]), limit_up_reason_want_count_dict.get(k), limit_up_reason_dict[k][0][5]) for
            (k, len(limit_up_reason_dict[k]),
             len(limit_up_reason_dict[k]) - (len(current_block_codes[k]) if k in current_block_codes else 0),
             limit_up_reason_want_count_dict.get(k), limit_up_reason_dict[k][0][5]) for
            k in
            limit_up_reason_dict]
        limit_up_reason_statistic_info.sort(key=lambda x: int(x[3]))
        limit_up_reason_statistic_info.sort(
            key=lambda x: len(current_block_codes.get(x[0])) if x[0] in current_block_codes else 0)
        limit_up_reason_statistic_info.reverse()
        codes_set = set([d[3] for d in total_datas])
        # 判断是龙几,判断是否涨停,判断是否炸板,加载分数
@@ -259,28 +271,37 @@
    def do_GET(self):
        path = self.path
        url = urlparse.urlparse(path)
        if url.path == "/kpl/get_limit_up_list":
            response_data = self.__get_limit_up_list()
            self.send_response(200)
            # 发给请求客户端的响应数据
            self.send_header('Content-type', 'application/json')
            self.end_headers()
            self.wfile.write(response_data.encode())
        else:
            ps_dict = dict([(k, v[0]) for k, v in parse_qs(url.query).items()])
            result = hosting_api_util.get_from_data_server(url.path, ps_dict)
            self.__send_response(result)
        request_log_util.request_info("DATA_SERVER_GET", f"GET 请求开始:{url.path}")
        try:
            if url.path == "/kpl/get_limit_up_list":
                response_data = self.__get_limit_up_list()
                self.send_response(200)
                # 发给请求客户端的响应数据
                self.send_header('Content-type', 'application/json')
                self.end_headers()
                self.wfile.write(response_data.encode())
            else:
                ps_dict = dict([(k, v[0]) for k, v in parse_qs(url.query).items()])
                result = hosting_api_util.get_from_data_server(url.path, ps_dict)
                self.__send_response(result)
        finally:
            request_log_util.request_info("DATA_SERVER_GET", f"GET 请求结束")
    def do_POST(self):
        path = self.path
        url = urlparse.urlparse(path)
        if url.path == "/upload_kpl_data":
            # 接受开盘啦数据
            params = self.__parse_request()
            result_str = self.__process_kpl_data(params)
            self.__send_response(result_str)
        request_log_util.request_info("DATA_SERVER_POST", f"POST 请求开始:{url.path}")
        try:
            if url.path == "/upload_kpl_data":
                # 接受开盘啦数据
                params = self.__parse_request()
                result_str = self.__process_kpl_data(params)
                self.__send_response(result_str)
        finally:
            request_log_util.request_info("DATA_SERVER_POST", f"POST 请求结束")
    def __process_kpl_data(self, data):
        data = json.loads(json.dumps(data).replace("概念", ""))
        type_ = data["type"]
        print("开盘啦type:", type_)
        if type_ == KPLDataType.BIDDING.value: