Administrator
2023-05-15 045a5aa6434da6e83c3d850b17e7e58cd7b55ef5
third_data/data_server.py
@@ -5,20 +5,229 @@
import time
from http.server import BaseHTTPRequestHandler
import cv2
import dask
import global_util
import gpcode_manager
import log
import tool
from l2 import code_price_manager
from third_data import kpl_util, kpl_data_manager
from third_data.kpl_data_manager import KPLDataManager
from output.limit_up_data_filter import IgnoreCodeManager
from third_data import kpl_util, kpl_data_manager, kpl_api
from third_data.code_plate_key_manager import RealTimeKplMarketData
from third_data.kpl_data_manager import KPLDataManager, KPLLimitUpDataRecordManager, KPLPlatManager, \
    KPLCodeLimitUpReasonManager
from third_data.kpl_util import KPLDataType
import urllib.parse as urlparse
from urllib.parse import parse_qs
from output import code_info_output, limit_up_data_filter, output_util
from trade import bidding_money_manager
from trade import bidding_money_manager, trade_manager
from trade.l2_trade_util import BlackListCodeManager
class DataServer(BaseHTTPRequestHandler):
    # Class-level state: every attribute below lives on the class, so it is
    # shared by all handler instances created for incoming requests.
    ocr_temp_data = {}
    # Accessors for stored KPL data, ignored codes, plate lookups and
    # per-code limit-up reasons.
    __kplDataManager = KPLDataManager()
    __IgnoreCodeManager = IgnoreCodeManager()
    __KPLPlatManager = KPLPlatManager()
    __KPLCodeLimitUpReasonManager = KPLCodeLimitUpReasonManager()
    # Historical plates: code -> (time.time() when cached, "code_records"),
    # filled by the "/get_score_info" branch of do_GET.
    __history_plates_dict = {}
    # Plates: code -> (time.time() when cached, "plate"),
    # filled by the "/get_score_info" branch of do_GET.
    __blocks_dict = {}
    def __get_limit_up_list(self):
        # 统计目前为止的代码涨停数量(分涨停原因)
        now_limit_up_codes_info = self.__kplDataManager.get_data(KPLDataType.LIMIT_UP)
        limit_up_reason_dict = {}
        for d in now_limit_up_codes_info:
            if d[5] not in limit_up_reason_dict:
                limit_up_reason_dict[d[5]] = [0, 0]
            limit_up_reason_dict[d[5]][0] += 1
        # 获取想买原因想买单的代码数量
        reason_map = self.__KPLCodeLimitUpReasonManager.list_all()
        want_codes = gpcode_manager.WantBuyCodesManager.list_code()
        # 其他想买单
        other_count = 0
        for k in reason_map:
            reson = reason_map[k]
            if k in want_codes and reson in limit_up_reason_dict:
                limit_up_reason_dict[reson][1] += 1
            elif k in want_codes:
                other_count += 1
        limit_up_reason_statistic_info = [(k, limit_up_reason_dict[k][0], limit_up_reason_dict[k][1]) for k in
                                          limit_up_reason_dict]
        limit_up_reason_statistic_info.sort(key=lambda x: x[1])
        limit_up_reason_statistic_info.reverse()
        if other_count > 0:
            limit_up_reason_statistic_info.insert(0, ('其他', other_count, other_count))
        total_datas = KPLLimitUpDataRecordManager.total_datas
        if not total_datas:
            KPLLimitUpDataRecordManager.load_total_datas()
            total_datas = KPLLimitUpDataRecordManager.total_datas
        # 通过涨停时间排序
        total_datas = list(total_datas)
        codes_set = set([d[3] for d in total_datas])
        # 判断是龙几,判断是否涨停,判断是否炸板,加载分数
        rank_dict = limit_up_data_filter.get_limit_up_time_rank_dict(total_datas)
        limit_up_dict, limit_up_codes, open_limit_up_codes = limit_up_data_filter.get_limit_up_info(codes_set)
        score_dict = limit_up_data_filter.get_codes_scores_dict(codes_set)
        fresult = []
        ignore_codes = self.__IgnoreCodeManager.list_ignore_codes("1")
        total_datas.sort(key=lambda x: int(x[5]))
        total_datas.reverse()
        # 获取涨停原因变化记录
        reason_changes = log.load_kpl_reason_changes()
        reason_changes.reverse()
        reason_changes_dict = {}
        for r in reason_changes:
            if r[0] not in reason_changes_dict:
                reason_changes_dict[r[0]] = r[1]
        for d in total_datas:
            code = d[3]
            # (代码, 名称, 涨停状态(0 - 无状态 1-涨停 2-炸板), 龙几, 首板, 分值, 涨停时间, 原因, 相同原因代码数量, 自由流通, 涨停原因是否变化)
            limit_up_state = 0
            if code in limit_up_dict:
                if limit_up_dict[code][0]:
                    limit_up_state = 1
                elif limit_up_dict[code][1]:
                    limit_up_state = 2
            score = ""
            if code in score_dict:
                score = score_dict[code]
            if code in ignore_codes:
                continue
            fresult.append((code, d[4], limit_up_state, f"龙{rank_dict.get(code)}", d[12], score,
                            output_util.time_format(int(d[5])), d[2], d[10], output_util.money_desc(d[13]),
                            reason_changes_dict.get(code)))
        response_data = json.dumps({"code": 0, "data": {"limit_up_count": len(limit_up_codes),
                                                        "open_limit_up_count": len(open_limit_up_codes),
                                                        "limit_up_reason_statistic": limit_up_reason_statistic_info,
                                                        "limit_up_codes": fresult}})
        return response_data
    def __get_plate_info(self, ps_dict):
        @dask.delayed
        def kpl_getStockIDPlate(code_):
            temp_data = kpl_api.getStockIDPlate(code_)
            return temp_data
        @dask.delayed
        def kpl_getSonPlate(plate_code_):
            if not plate_code:
                return None
            temp_data = kpl_api.getSonPlate(plate_code_)
            return temp_data
        @dask.delayed
        def kpl_getCodesByPlate(plate_code_):
            if not plate_code:
                return None
            temp_data = kpl_api.getCodesByPlate(plate_code_)
            return temp_data
        @dask.delayed
        def request_data(f1_, f2_):
            temp_data = f1_, f2_
            return temp_data
        # 获取板块的代码
        fresult = {}
        code = ps_dict["code"]
        code_info = KPLLimitUpDataRecordManager.list_by_code(code, tool.get_now_date_str())[0]
        hot_block_name = code_info[2]
        plate_code = self.__KPLPlatManager.get_plat(hot_block_name)
        f1 = kpl_getStockIDPlate(code)
        # f2 = kpl_getSonPlate(plate_code)
        f3 = kpl_getCodesByPlate(plate_code)
        dask_result = request_data(f1, f3)
        plate_info, codes_by_plate_info = dask_result.compute()
        if plate_info:
            plate_info.sort(key=lambda x: x[2])
            plate_info.reverse()
            fresult["plate"] = plate_info
        # 获取代码的历史涨停数据,(涨停原因,日期,板块)
        fresult["code_records"] = KPLLimitUpDataRecordManager.get_latest_infos(code, 4, False)[:2]
        # 获取今日数据
        fresult["today"] = (code_info[2], code_info[1], code_info[6])
        fresult["industry"] = global_util.code_industry_map.get(code)
        if plate_code:
            # 获取强度
            # datas = son_plate_info
            # # (代码,名称,强度)
            # temp = kpl_util.parseSonPlat(datas)
            # temp.sort(key=lambda x: x[2])
            # temp.reverse()
            # fresult["plat_strength"] = temp
            # 获取涨停原因下面的列表
            datas = codes_by_plate_info
            # (代码,名称,现价,涨幅,自由流通,几板,龙几,主力净额,300w净额,机构增仓)
            temps = kpl_util.parsePlateCodes(datas)
            # --数据准备开始--
            codes_set = set([d[0] for d in temps])
            limit_up_dict, limit_up_codes, open_limit_up_codes = limit_up_data_filter.get_limit_up_info(codes_set)
            score_dict = limit_up_data_filter.get_codes_scores_dict(codes_set)
            want_codes = gpcode_manager.WantBuyCodesManager.list_code()
            black_codes = BlackListCodeManager.list_codes()
            total_datas = KPLLimitUpDataRecordManager.total_datas
            code_info_dict = {}
            for val in total_datas:
                code_info_dict[val[3]] = val
            # --数据准备结束--
            ignore_codes = self.__IgnoreCodeManager.list_ignore_codes("2")
            # 最终结果:(代码,名称,涨停状态(0-无状态 1-涨停 2-炸板),龙几,首板,分值,涨停时间,原因,相同原因代码数量,自由流通,涨停原因是否变化,涨幅,现价,黑名单,想买单,主力净值,300w,)
            codes_info_list = []
            for t in temps:
                code = t[0]
                limit_up_state = 0
                if code in limit_up_dict:
                    if limit_up_dict[code][0]:
                        limit_up_state = 1
                    elif limit_up_dict[code][1]:
                        limit_up_state = 2
                score = ""
                if code in score_dict:
                    score = score_dict[code]
                limit_up_time = ''
                if code in code_info_dict:
                    limit_up_time = output_util.time_format(code_info_dict[code][5])
                final_code_info = {"code_info": (
                    t[0], t[1], limit_up_state, t[6], t[5], score, limit_up_time,
                    code_info[2], code_info[10], output_util.money_desc(t[4]), 0, t[3], t[2],
                    "黑名单" if code in black_codes else "", "想买单" if code in want_codes else "",
                    output_util.money_desc(t[7]), output_util.money_desc(t[8]), output_util.money_desc(t[9]))}
                if code in code_info_dict:
                    final_code_info["today"] = (
                        code_info_dict[code][2], code_info_dict[code][1], code_info_dict[code][6])
                # 加载历史
                if code in self.__history_plates_dict:
                    final_code_info["code_records"] = self.__history_plates_dict[code][1]
                # 加载板块
                if code in self.__blocks_dict:
                    final_code_info["plate"] = self.__blocks_dict[code][1]
                # 获取二级行业
                final_code_info["industry"] = global_util.code_industry_map.get(code)
                if code not in ignore_codes:
                    codes_info_list.append(final_code_info)
                fresult["code_list_info"] = codes_info_list
        response_data = json.dumps({"code": 0, "data": fresult})
        return response_data
    def do_GET(self):
        path = self.path
@@ -26,7 +235,7 @@
        response_data = ""
        if url.path == "/get_kpl_data":
            best_feng_kou = self.__kplDataManager.get_data(kpl_util.KPLDataType.BEST_FENG_KOU)
            best_feng_kou=best_feng_kou[:22]
            best_feng_kou = best_feng_kou[:22]
            feng_kou = self.__kplDataManager.get_data(kpl_util.KPLDataType.FENG_KOU)
            feng_kou = feng_kou[:22]
            industry_rank = self.__kplDataManager.get_data(kpl_util.KPLDataType.INDUSTRY_RANK)
@@ -35,6 +244,58 @@
            feng_xiang = feng_xiang[:22]
            response_data = json.dumps({"code": 0, "data": {"best_feng_kou": best_feng_kou, "feng_kou": feng_kou,
                                                            "industry_rank": industry_rank, "feng_xiang": feng_xiang}})
        elif url.path == "/get_score_info":
            ps_dict = dict([(k, v[0]) for k, v in parse_qs(url.query).items()])
            code = ps_dict['code']
            name = ps_dict.get('name')
            data = code_info_output.get_output_params(code)
            if data["code_name"].find("None") > -1 and name:
                data["code_name"] = f"{name} {code}"
            self.__history_plates_dict[code] = (time.time(), data["kpl_code_info"]["code_records"])
            self.__blocks_dict[code] = (time.time(), data["kpl_code_info"]["plate"])
            response_data = json.dumps({"code": 0, "data": data})
            # 获取评分信息
            pass
        elif url.path == "/kpl/get_limit_up_list":
            response_data = self.__get_limit_up_list()
        elif url.path == "/kpl/get_plate_info":
            ps_dict = dict([(k, v[0]) for k, v in parse_qs(url.query).items()])
            response_data = self.__get_plate_info(ps_dict)
        elif url.path == "/kpl/get_market_data":
            # 获取板块信息
            ps_dict = dict([(k, v[0]) for k, v in parse_qs(url.query).items()])
            type_ = int(ps_dict['type'])
            result = []
            if type_ == 0:
                # 行业,主力净额倒序
                result = kpl_api.getMarketIndustryRealRankingInfo(True)
                result = kpl_util.parseMarketIndustry(result)
            elif type_ == 1:
                # 行业,主力净额顺序
                result = kpl_api.getMarketIndustryRealRankingInfo(False)
                result = kpl_util.parseMarketIndustry(result)
            elif type_ == 2:
                # 精选,主力净额倒序
                result = kpl_api.getMarketJingXuanRealRankingInfo(True)
                result = kpl_util.parseMarketJingXuan(result)
            elif type_ == 3:
                # 精选,主力净额顺序
                result = kpl_api.getMarketJingXuanRealRankingInfo(False)
                result = kpl_util.parseMarketJingXuan(result)
            response_data = json.dumps({"code": 0, "data": result})
        elif url.path == "/kpl/add_ignore_code":
            ps_dict = dict([(k, v[0]) for k, v in parse_qs(url.query).items()])
            code = ps_dict['code']
            type_ = ps_dict['type']
            self.__IgnoreCodeManager.ignore_code(type_, code)
            response_data = json.dumps({"code": 0})
        self.send_response(200)
        # 发给请求客户端的响应数据
        self.send_header('Content-type', 'application/json')
@@ -54,7 +315,7 @@
        type_ = data["type"]
        print("开盘啦type:", type_)
        if type_ == KPLDataType.BIDDING.value:
            result_list = kpl_util.parseDaBanData(data["data"], kpl_util.DABAN_TYPE_LIMIT_UP)
            result_list = kpl_util.parseDaBanData(data["data"], kpl_util.DABAN_TYPE_BIDDING)
            # 竞价取前20
            if result_list:
                result_list.sort(key=lambda x: x[2])
@@ -107,9 +368,16 @@
                self.__kplDataManager.save_data(type_, result_list)
        elif type_ == KPLDataType.INDUSTRY_RANK.value:
            result_list = kpl_util.parseIndustryRank(data["data"])
            # 保存风向数据
            # 保存行业数据
            if result_list:
                self.__kplDataManager.save_data(type_, result_list)
                RealTimeKplMarketData.set_top_5_industry(result_list)
        elif type_ == KPLDataType.JINGXUAN_RANK.value:
            result_list = kpl_util.parseMarketJingXuan(data["data"])
            # 保存精选数据
            if result_list:
                self.__kplDataManager.save_data(type_, result_list)
                RealTimeKplMarketData.set_top_5_reasons(result_list)
        return json.dumps({"code": 0})
    def __send_response(self, data):