Administrator
2024-10-17 9e5b50a69a64aad3d396e1808344645ad2c25efc
引入精选流出到买入策略
9个文件已修改
139 ■■■■ 已修改文件
api/outside_api_command_callback.py 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
code_attribute/first_target_code_data_processor.py 12 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
log_module/log.py 8 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
servers/data_server.py 6 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/code_plate_key_manager.py 47 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/kpl_api.py 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/kpl_data_manager.py 37 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/kpl_util.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/huaxin_trade_data_update.py 20 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
api/outside_api_command_callback.py
@@ -1191,6 +1191,8 @@
                kpl_data_manager.PullTask.repaire_pull_task()
                # 修复数据服务
                server_util.repaire_data_server()
                # 任务修复
                huaxin_trade_data_update.repaire_task()
                self.send_response({"code": 0, "data": {}}, client_id, request_id)
            elif ctype == "get_trade_queue":
                code = data["code"]
code_attribute/first_target_code_data_processor.py
@@ -141,12 +141,12 @@
                            l2_trade_util.forbidden_trade(code,
                                                          f"无辨识度,涨停价({limit_up_price})>50")
                            continue
                    # if code_nature_analyse.is_price_too_high_in_days(code, volumes_data, limit_up_price)[
                    #     0] and code.find("30") != 0:
                    #     # 判断是否太高
                    #     l2_trade_util.forbidden_trade(code, "6天内股价长得太高")
                    #     continue
                    #     pass
                    if code_nature_analyse.is_price_too_high_in_days(code, volumes_data, limit_up_price)[
                        0] and code.find("30") != 0:
                        # 判断是否太高
                        l2_trade_util.forbidden_trade(code, "6天内股价长得太高")
                        continue
                        pass
                    if code_nature_analyse.is_continue_limit_up_not_enough_fall_dwon(code, volumes_data):
                        # 判断是否太高
log_module/log.py
@@ -217,6 +217,10 @@
                   filter=lambda record: record["extra"].get("name") == "kpl_open_limit_up",
                   rotation="00:00", compression="zip", enqueue=True)
        logger.add(self.get_path("kpl", "kpl_jx_out"),
                   filter=lambda record: record["extra"].get("name") == "kpl_jx_out",
                   rotation="00:00", compression="zip", enqueue=True)
        # 看盘日志
        logger.add(self.get_path("kp", "kp_msg"),
                   filter=lambda record: record["extra"].get("name") == "kp_msg",
@@ -428,6 +432,10 @@
logger_kpl_open_limit_up = __mylogger.get_logger("kpl_open_limit_up")
logger_kpl_jx_out = __mylogger.get_logger("kpl_jx_out")
logger_kp_msg = __mylogger.get_logger("kp_msg")
logger_redis_debug = __mylogger.get_logger("redis_debug")
servers/data_server.py
@@ -908,6 +908,12 @@
            if result_list:
                self.__kplDataManager.save_data(type_, result_list)
                RealTimeKplMarketData.set_market_jingxuan_blocks(result_list)
        elif type_ == KPLDataType.JINGXUAN_RANK_OUT.value:
            result_list = kpl_util.parseMarketJingXuan(data["data"])
            # 保存精选数据
            if result_list:
                self.__kplDataManager.save_data(type_, result_list)
                RealTimeKplMarketData.set_market_jingxuan_out_blocks(result_list)
        return json.dumps({"code": 0})
    def __send_response(self, data):
third_data/code_plate_key_manager.py
@@ -21,7 +21,7 @@
from log_module import log, async_log_util
from db import redis_manager_delegate as redis_manager
from log_module.log import logger_kpl_block_can_buy, logger_debug
from log_module.log import logger_kpl_block_can_buy, logger_debug, logger_kpl_jx_out
from third_data.kpl_util import KPLPlatManager
from trade import trade_manager, l2_trade_util, trade_constant
@@ -350,11 +350,18 @@
    __KPLPlateForbiddenManager = KPLPlateForbiddenManager()
    __LimitUpCodesPlateKeyManager = LimitUpCodesPlateKeyManager()
    __KPLPlatManager = KPLPlatManager()
    # 精选前几
    # 精选流入前几
    __top_jx_blocks = set()
    # 精选流出前几
    __top_jx_out_blocks = set()
    @classmethod
    def set_market_jingxuan_blocks(cls, datas):
        """
        设置精选流入数据
        @param datas:
        @return:
        """
        blocks = set()
        for data in datas:
            if data[3] <= 0:
@@ -363,8 +370,33 @@
        cls.__top_jx_blocks = blocks
    @classmethod
    def set_market_jingxuan_out_blocks(cls, datas):
        """
        设置精选流出数据
        @param datas:
        @return:
        """
        blocks = set()
        for i in range(0, len(datas)):
            if i >= 10:
                break
            data = datas[i]
            if data[3] > 0 - 5e7:
                # 过滤5千万以上的
                break
            blocks.add(kpl_util.filter_block(data[1]))
        # 记录精选流出日志
        async_log_util.info(logger_kpl_jx_out, f"原数据:{datas[:10]} 板块:{blocks}")
        cls.__top_jx_out_blocks = blocks
    @classmethod
    def get_top_market_jingxuan_blocks(cls):
        return cls.__top_jx_blocks
    @classmethod
    def get_top_market_jingxuan_out_blocks(cls):
        return cls.__top_jx_out_blocks
    @classmethod
    def set_top_5_industry(cls, datas):
@@ -418,12 +450,12 @@
    __blocks_dict = {}
    __instance = None
    def __new__(cls, *args, **kwargs):
        if not cls.__instance:
            cls.__instance = super(CodesHisReasonAndBlocksManager, cls).__new__(cls, *args, **kwargs)
        return cls.__instance
    def __get_redis(self):
        return self.__redisManager.getRedis()
@@ -903,6 +935,11 @@
        fresults = []
        if not keys:
            return fresults, set()
        # 获取精选流出板块
        jx_out_blocks = RealTimeKplMarketData.get_top_market_jingxuan_out_blocks()
        if jx_out_blocks:
            keys -= jx_out_blocks
        code_limit_up_reasons_dict = {}
        load_code_block()
        for block in keys:
@@ -1507,6 +1544,10 @@
        kpl_history_blocks = CodesHisReasonAndBlocksManager().get_history_blocks_cache(code)
        if kpl_history_blocks:
            fblocks |= BlockMapManager().filter_blocks(kpl_history_blocks)
        jx_out_blocks = RealTimeKplMarketData.get_top_market_jingxuan_out_blocks()
        if jx_out_blocks:
            fblocks-=jx_out_blocks
        return fblocks, match_blocks
    @classmethod
third_data/kpl_api.py
@@ -269,10 +269,11 @@
if __name__ == "__main__":
    print(getLimitUpInfoNew())
    # print(getLimitUpInfoNew())
    # __getConceptBK("300564")
    # data = (getMarketJingXuanRealRankingInfo())
    data = (getMarketJingXuanRealRankingInfo(False))
    print(data)
    # data=json.loads(data)
    # print(len(data["list"]))
    # data = json.loads(getCodesByPlate("801235"))
third_data/kpl_data_manager.py
@@ -480,9 +480,15 @@
            threading.Thread(target=cls.run_limit_up_task, daemon=True).start()
        key = "jingxuan_rank"
        if key not in cls.__latest_update_time_dict or time.time() - cls.__latest_update_time_dict[key] > 20:
            logger_debug.info("任务修复-开盘啦:精选列表")
            logger_debug.info("任务修复-开盘啦:精选流入列表")
            # 大于20s就需要更新
            threading.Thread(target=cls.run_market_jingxuan, daemon=True).start()
            threading.Thread(target=cls.run_market_jingxuan_in, daemon=True).start()
        key = "jingxuan_rank_out"
        if key not in cls.__latest_update_time_dict or time.time() - cls.__latest_update_time_dict[key] > 20:
            logger_debug.info("任务修复-开盘啦:精选流出列表")
            # 大于20s就需要更新
            threading.Thread(target=cls.run_market_jingxuan_out, daemon=True).start()
    @classmethod
    def run_limit_up_task(cls):
@@ -508,7 +514,11 @@
                time.sleep(3)
    @classmethod
    def run_market_jingxuan(cls):
    def run_market_jingxuan_in(cls):
        """
        精选流入
        @return:
        """
        while True:
            try:
                if tool.is_trade_time():
@@ -519,6 +529,24 @@
                pass
            finally:
                cls.__latest_update_time_dict["jingxuan_rank"] = time.time()
                time.sleep(3)
    @classmethod
    def run_market_jingxuan_out(cls):
        """
        精选流出
        @return:
        """
        while True:
            try:
                if tool.is_trade_time():
                    results = kpl_api.getMarketJingXuanRealRankingInfo(False)
                    result = json.loads(results)
                    cls.__upload_data("jingxuan_rank_out", result)
            except:
                pass
            finally:
                cls.__latest_update_time_dict["jingxuan_rank_out"] = time.time()
                time.sleep(3)
    @classmethod
@@ -565,7 +593,8 @@
        threading.Thread(target=cls.run_limit_up_task, daemon=True).start()
        # threading.Thread(target=get_bidding_money, daemon=True).start()
        # threading.Thread(target=get_market_industry, daemon=True).start()
        threading.Thread(target=cls.run_market_jingxuan, daemon=True).start()
        threading.Thread(target=cls.run_market_jingxuan_in, daemon=True).start()
        threading.Thread(target=cls.run_market_jingxuan_out, daemon=True).start()
if __name__ == "__main__":
third_data/kpl_util.py
@@ -65,7 +65,7 @@
    FENG_XIANG = "feng_xiang"
    INDUSTRY_RANK = "industry_rank"
    JINGXUAN_RANK = "jingxuan_rank"
    JINGXUAN_RANK_OUT = "jingxuan_rank_out"
def __parseDaBanItemData(data, type):
    if type == DABAN_TYPE_BIDDING:
trade/huaxin/huaxin_trade_data_update.py
@@ -143,10 +143,30 @@
            time.sleep(0.01)
def get_request_queue_size():
    """
    获取请求队列的大小
    @return:
    """
    return trade_data_request_queue.qsize()
def repaire_task():
    """
    任务修复
    @return:
    """
    queue_size = get_request_queue_size()
    if queue_size<2:
        return
    threading.Thread(target=lambda: __read_update_task_queue(), daemon=True).start()
def __add_data(data):
    trade_data_request_queue.put_nowait(data)
def add_delegate_list(source, delay=0):
    __add_data({"type": "delegate_list", "delay": delay})
    async_log_util.info(hx_logger_trade_debug, f"请求委托列表,来源:{source}")