Administrator
2024-05-27 ec1d64c71f86ff2564e709a75fda9f487acc2610
third_data/kpl_data_manager.py
@@ -9,12 +9,13 @@
import constant
from db.redis_manager_delegate import RedisUtils
from log_module import async_log_util
from log_module import async_log_util, log
from utils import tool
# 开盘啦历史涨停数据管理
from db import mysql_data_delegate as mysql_data, redis_manager_delegate as redis_manager
from log_module.log import logger_kpl_limit_up_reason_change, logger_debug, logger_kpl_limit_up
from log_module.log import logger_kpl_limit_up_reason_change, logger_debug, logger_kpl_limit_up, \
    logger_kpl_open_limit_up
from third_data import kpl_util, kpl_api
from third_data.code_plate_key_manager import LimitUpCodesPlateKeyManager, CodesHisReasonAndBlocksManager
@@ -48,8 +49,12 @@
    # Shared helper instances used by the classmethods below.
    __LimitUpCodesPlateKeyManager = LimitUpCodesPlateKeyManager()
    __CodesPlateKeysManager = CodesHisReasonAndBlocksManager()
    # Replaced wholesale by save_record on every call.
    # NOTE(review): presumably maps code -> reasons; the code that fills it is not fully visible here.
    __current_code_reasons_dict = {}
    # Codes grouped by reason, where "reason" covers the current limit-up reason plus the recommended reasons
    __current_reason_codes_dict = {}
    # Codes grouped by the current limit-up reason only
    __current_limit_up_reason_codes_dict = {}
    # NOTE(review): usage of this cache is not visible in this excerpt.
    __records_cache = {}
    # Latest record seen for each code (record[0] -> record), filled by save_record
    record_code_dict = {}
    @classmethod
    def __load_hist_and_blocks(cls, code):
@@ -67,9 +72,24 @@
    @classmethod
    def save_record(cls, day, records):
        # 统计炸板
        try:
            last_codes = set()
            if cls.latest_origin_datas:
                last_codes = set([x[0] for x in cls.latest_origin_datas])
            now_codes = set()
            if records:
                now_codes = set([x[0] for x in records])
            open_limit_up_codes = last_codes - now_codes
            if open_limit_up_codes:
                logger_kpl_open_limit_up.info(f"炸板代码:{open_limit_up_codes}")
        except Exception as e:
            pass
        # 统计代码所属板块
        code_block_dict = {}
        for data in records:
            cls.record_code_dict[data[0]] = data
            blocks = set(data[5].split("、"))
            code = data[0]
            for b in blocks:
@@ -77,13 +97,20 @@
                    code_block_dict[code] = set()
                code_block_dict[code].add(b)
                # 设置涨停数据
        if records:
            cls.latest_origin_datas = records
            cls.__LimitUpCodesPlateKeyManager.set_today_limit_up([(r[0], r[5]) for r in records])
            cls.__LimitUpCodesPlateKeyManager.set_today_limit_up(
                [(r[0], r[5], r[6].split('、') if r[6] else []) for r in records])
        code_reasons_dict = {}
        reason_codes_dict = {}
        limit_up_reason_codes_dict = {}
        for d in records:
            if d[5] not in limit_up_reason_codes_dict:
                limit_up_reason_codes_dict[d[5]] = set()
            limit_up_reason_codes_dict[d[5]].add(d[0])
            # 涨停原因 + 推荐原因
            bs = {d[5]}
            if d[6]:
@@ -95,6 +122,7 @@
                reason_codes_dict[b].add(d[0])
        cls.__current_code_reasons_dict = code_reasons_dict
        cls.__current_reason_codes_dict = reason_codes_dict
        cls.__current_limit_up_reason_codes_dict = limit_up_reason_codes_dict
        # 涨停数据记录
        mysqldb = mysql_data.Mysqldb()
@@ -143,7 +171,8 @@
    @classmethod
    def load_total_datas(cls):
        """
        Load the records for the current date into ``cls.total_datas``.

        The rows come from ``KPLLimitUpDataRecordManager.list_all`` called with
        ``tool.get_now_date_str()``. Tuples built from those rows are handed to
        ``set_today_total_limit_up``, and ``__load_hist_and_blocks`` is then
        called with field 3 of every row.
        """
        cls.total_datas = KPLLimitUpDataRecordManager.list_all(tool.get_now_date_str())
        # NOTE(review): the two calls below pass the same rows, first as 2-tuples and then as
        # 3-tuples with field 6 split on "、". They look like the old and new form of a single
        # statement - confirm whether the first call is still meant to run.
        cls.__LimitUpCodesPlateKeyManager.set_today_total_limit_up([(r[3], r[2]) for r in cls.total_datas])
        cls.__LimitUpCodesPlateKeyManager.set_today_total_limit_up(
            [(r[3], r[2], r[6].split("、") if r[6] else []) for r in cls.total_datas])
        # Field 3 is passed as the `code` argument of __load_hist_and_blocks.
        for d in cls.total_datas:
            cls.__load_hist_and_blocks(d[3])
@@ -228,6 +257,10 @@
    @classmethod
    def get_current_reason_codes_dict(cls):
        return copy.deepcopy(cls.__current_reason_codes_dict)
    @classmethod
    def get_current_limit_up_reason_codes_dict(cls):
        return copy.deepcopy(cls.__current_limit_up_reason_codes_dict)
    @classmethod
    def get_current_reasons(cls):
@@ -390,65 +423,96 @@
    return __latest_current_limit_up_records.get(day)
# 运行拉取任务
def run_pull_task():
    def __upload_data(type, datas):
class PullTask:
    # 最近更新时间
    __latest_update_time_dict = {}
    @classmethod
    def __upload_data(cls, type, datas):
        root_data = {
            "type": type,
            "data": datas
        }
        requests.post("http://127.0.0.1:9004/upload_kpl_data", json.dumps(root_data))
    def get_limit_up():
    @classmethod
    def repaire_pull_task(cls):
        """
        修复拉取任务
        @return:
        """
        # 修复涨停
        logger_debug.info("任务修复-开盘啦:启动修复")
        key = "limit_up"
        if key not in cls.__latest_update_time_dict or time.time() - cls.__latest_update_time_dict[key] > 20:
            logger_debug.info("任务修复-开盘啦:涨停列表")
            # 大于20s就需要更新
            threading.Thread(target=cls.run_limit_up_task, daemon=True).start()
    @classmethod
    def run_limit_up_task(cls):
        # 关闭log
        log.close_print()
        while True:
            if tool.is_trade_time() or True:
                try:
            try:
                if (tool.is_trade_time() and int(tool.get_now_time_str().replace(':', '')) > int("092530")):
                    results = kpl_api.getLimitUpInfoNew()
                    result = json.loads(results)
                    start_time = time.time()
                    __upload_data("limit_up", result)
                except Exception as e:
                    cls.__upload_data("limit_up", result)
            except Exception as e:
                try:
                    logging.exception(e)
            time.sleep(3)
    def get_bidding_money():
        # 竞价数据上传
        while True:
            if int("092600") < int(tool.get_now_time_str().replace(":", "")) < int("092700"):
                try:
                    results = kpl_api.daBanList(kpl_api.DABAN_TYPE_BIDDING)
                    result = json.loads(results)
                    __upload_data("biddings", result)
                except Exception as e:
                    pass
            time.sleep(3)
    def get_market_industry():
        while True:
            if tool.is_trade_time():
                try:
                    results = kpl_api.getMarketIndustryRealRankingInfo()
                    result = json.loads(results)
                    __upload_data("industry_rank", result)
                    logger_debug.exception(e)
                except:
                    pass
            time.sleep(3)
            except:
                pass
            finally:
                cls.__latest_update_time_dict["limit_up"] = time.time()
                time.sleep(3)
    def get_market_jingxuan():
        while True:
            if tool.is_trade_time():
                try:
                    results = kpl_api.getMarketJingXuanRealRankingInfo()
                    result = json.loads(results)
                    __upload_data("jingxuan_rank", result)
                except:
                    pass
            time.sleep(3)
    @classmethod
    # 运行拉取任务
    def run_pull_task(cls):
        def get_bidding_money():
            # 竞价数据上传
            while True:
                if int("092600") < int(tool.get_now_time_str().replace(":", "")) < int("092700"):
                    try:
                        results = kpl_api.daBanList(kpl_api.DABAN_TYPE_BIDDING)
                        result = json.loads(results)
                        cls.__upload_data("biddings", result)
                    except Exception as e:
                        pass
                time.sleep(3)
    threading.Thread(target=get_limit_up, daemon=True).start()
    # threading.Thread(target=get_bidding_money, daemon=True).start()
    # threading.Thread(target=get_market_industry, daemon=True).start()
    # threading.Thread(target=get_market_jingxuan, daemon=True).start()
        def get_market_industry():
            while True:
                if tool.is_trade_time():
                    try:
                        results = kpl_api.getMarketIndustryRealRankingInfo()
                        result = json.loads(results)
                        cls.__upload_data("industry_rank", result)
                    except:
                        pass
                time.sleep(3)
        def get_market_jingxuan():
            while True:
                if tool.is_trade_time():
                    try:
                        results = kpl_api.getMarketJingXuanRealRankingInfo()
                        result = json.loads(results)
                        cls.__upload_data("jingxuan_rank", result)
                    except:
                        pass
                time.sleep(3)
        threading.Thread(target=cls.run_limit_up_task, daemon=True).start()
        # threading.Thread(target=get_bidding_money, daemon=True).start()
        # threading.Thread(target=get_market_industry, daemon=True).start()
        # threading.Thread(target=get_market_jingxuan, daemon=True).start()
if __name__ == "__main__":