Administrator
2024-09-25 fd635fe4ec7e52226f700d79d58ca7b36c875927
代码板块匹配
4个文件已修改
2个文件已添加
360 ■■■■ 已修改文件
api/outside_api_command_callback.py 38 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_data_manager_new.py 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
servers/huaxin_trade_server.py 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/code_plate_key_manager.py 100 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/third_blocks_manager.py 198 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utils/trade_util.py 12 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
api/outside_api_command_callback.py
@@ -1,4 +1,5 @@
import concurrent.futures
import copy
import json
import logging
import threading
@@ -34,12 +35,13 @@
    logger_real_place_order_position, logger_device
from output import l2_output_util
from third_data import kpl_data_manager, kpl_util, history_k_data_manager, huaxin_l1_data_manager
from third_data.code_plate_key_manager import CodePlateKeyBuyManager
from third_data.code_plate_key_manager import CodePlateKeyBuyManager, KPLCodeJXBlockManager
from third_data.history_k_data_manager import HistoryKDataManager
from third_data.history_k_data_util import JueJinApi, HistoryKDatasUtils
from third_data.kpl_data_manager import KPLDataManager
from third_data.kpl_limit_up_data_manager import CodeLimitUpSequenceManager
from third_data.kpl_util import KPLDataType
from third_data.third_blocks_manager import CodeThirdBlocksManager, SOURCE_TYPE_KPL, BlockMapManager
from trade import trade_manager, l2_trade_util, trade_data_manager, trade_constant
import l2_data_util as l2_data_util_old
@@ -180,7 +182,8 @@
                        msg_list.append(f"撤卖单数量:{sell_count}")
                    except Exception as e:
                        logger_debug.exception(e)
                    can_cancel = l2_data_manager_new.L2TradeDataProcessor.cancel_buy(code, "手动撤单", cancel_type=trade_constant.CANCEL_TYPE_HUMAN)
                    can_cancel = l2_data_manager_new.L2TradeDataProcessor.cancel_buy(code, "手动撤单",
                                                                                     cancel_type=trade_constant.CANCEL_TYPE_HUMAN)
                    if not can_cancel:
                        msg_list.append(f"无法撤买单")
                    else:
@@ -298,7 +301,8 @@
                if operate == outside_api_command_manager.OPERRATE_SET:
                    # 先手动撤单
                    try:
                        l2_data_manager_new.L2TradeDataProcessor.cancel_buy(code, "手动拉黑", cancel_type=trade_constant.CANCEL_TYPE_HUMAN)
                        l2_data_manager_new.L2TradeDataProcessor.cancel_buy(code, "手动拉黑",
                                                                            cancel_type=trade_constant.CANCEL_TYPE_HUMAN)
                    except Exception as e:
                        logger_debug.exception(e)
                    l2_trade_util.forbidden_trade(code, msg="手动加入 trade_server")
@@ -1057,7 +1061,8 @@
                            try:
                                limit_up_sequence = CodeLimitUpSequenceManager.get_current_limit_up_sequence(code)
                                if limit_up_sequence:
                                    fdata['block'] = f"{limit_up_sequence[0]}-{limit_up_sequence[1]}({limit_up_sequence[2]}&{limit_up_sequence[2] - limit_up_sequence[3]})"
                                    fdata[
                                        'block'] = f"{limit_up_sequence[0]}-{limit_up_sequence[1]}({limit_up_sequence[2]}&{limit_up_sequence[2] - limit_up_sequence[3]})"
                            except:
                                pass
                            # 获取涨停时间
@@ -1210,9 +1215,30 @@
                self.send_response({"code": 0, "data": {}, "msg": f""},
                                   client_id,
                                   request_id)
            elif ctype == "get_code_third_blocks":
                # 获取第三方板块数据
                code = data["code"]
                source_dict = copy.deepcopy(CodeThirdBlocksManager().get_source_blocks(code))
                source_origin_dict = copy.deepcopy(CodeThirdBlocksManager().get_source_blocks_origin(code))
                kpl_blocks = KPLCodeJXBlockManager().get_jx_blocks_radical(code)
                if kpl_blocks is None:
                    kpl_blocks = set()
                match_blocks, info = CodeThirdBlocksManager().get_intersection_blocks_info(code, kpl_blocks)
                source_origin_dict[SOURCE_TYPE_KPL] = kpl_blocks
                source_dict[SOURCE_TYPE_KPL] = BlockMapManager().filter_blocks(kpl_blocks)
                data = {
                    "blocks": {},
                    "origin_blocks": {},
                    "match_blocks": list(match_blocks)
                }
                for s in source_origin_dict:
                    data["origin_blocks"][s] = list(source_origin_dict[s])
                for s in source_dict:
                    data["blocks"][s] = list(source_dict[s])
                self.send_response({"code": 0, "data": data, "msg": f""},
                                   client_id,
                                   request_id)
        except Exception as e:
            logging.exception(e)
            self.send_response({"code": 1, "msg": f"数据处理出错:{e}"}, client_id, request_id)
l2/l2_data_manager_new.py
@@ -17,7 +17,7 @@
from l2.place_order_single_data_manager import L2TradeSingleDataProcessor
from log_module import async_log_util, log_export
from third_data import kpl_data_manager, block_info
from utils import global_util, ths_industry_util, tool, buy_condition_util, buy_strategy_util
from utils import global_util, ths_industry_util, tool, buy_condition_util, buy_strategy_util, trade_util
import l2_data_util
from db import redis_manager_delegate as redis_manager
from third_data.code_plate_key_manager import CodePlateKeyBuyManager, KPLCodeJXBlockManager
@@ -735,7 +735,7 @@
        with cls.__buy_lock_dict[code]:
            # 判断是否可以下单,不处于可下单状态需要返回
            state = cls.__CodesTradeStateManager.get_trade_state_cache(code)
            if state == trade_constant.TRADE_STATE_BUY_DELEGATED or state == trade_constant.TRADE_STATE_BUY_PLACE_ORDER or state == trade_constant.TRADE_STATE_BUY_SUCCESS:
            if not trade_util.is_can_order_by_state(state):
                # 不处于可下单状态
                return False
            __start_time = tool.get_now_timestamp()
servers/huaxin_trade_server.py
@@ -53,7 +53,7 @@
from trade.sell.sell_rule_manager import TradeRuleManager
from trade.trade_data_manager import RadicalBuyDealCodesManager
from trade.trade_manager import CodesTradeStateManager
from utils import socket_util, middle_api_protocol, tool, huaxin_util, global_util
from utils import socket_util, middle_api_protocol, tool, huaxin_util, global_util, trade_util
trade_data_request_queue = queue.Queue()
@@ -768,6 +768,12 @@
    def OnLimitUpActiveBuy(self, code, transaction_datas):
        try:
            # 判断是否处于可下单状态
            state = CodesTradeStateManager().get_trade_state_cache(code)
            if not trade_util.is_can_order_by_state(state):
                # 不处于可下单状态
                return
            # 判断最近60个交易日有无涨停
            # 判断昨日是否涨停过
            async_log_util.info(logger_l2_radical_buy, f"涨停主动买:{code}-{transaction_datas[-1]}")
third_data/code_plate_key_manager.py
@@ -15,6 +15,7 @@
from third_data.history_k_data_manager import HistoryKDataManager
from third_data.history_k_data_util import HistoryKDatasUtils
from third_data.kpl_data_constant import LimitUpDataConstant
from third_data.third_blocks_manager import BlockMapManager, CodeThirdBlocksManager
from trade.radical_buy_data_manager import RedicalBuyDataManager
from utils import global_util, tool, buy_condition_util, init_data_util
from log_module import log, async_log_util
@@ -34,16 +35,41 @@
    __code_blocks = {}
    # 备用
    __code_by_blocks = {}
    # 激进买的代码板块
    __code_blocks_for_radical_buy = {}
    __instance = None
    def __new__(cls, *args, **kwargs):
        if not cls.__instance:
            cls.__instance = super(KPLCodeJXBlockManager, cls).__new__(cls, *args, **kwargs)
            cls.__load_data()
        return cls.__instance
    def __get_redis(self):
        return self.__redisManager.getRedis()
    @classmethod
    def __load_data(cls):
        keys = RedisUtils.keys(cls.__get_redis(), "kpl_jx_blocks_by-*")
        if keys:
            for k in keys:
                val = RedisUtils.get(cls.__get_redis(), k)
                val = json.loads(val)
                cls.__code_by_blocks[k.split("-")[1]] = (val, time.time())
        keys = RedisUtils.keys(cls.__get_redis(), "kpl_jx_blocks-*")
        if keys:
            for k in keys:
                val = RedisUtils.get(cls.__get_redis(), k)
                val = json.loads(val)
                cls.__code_blocks[k.split("-")[1]] = (val, time.time())
        keys = RedisUtils.keys(cls.__get_redis(), "kpl_jx_blocks_radical-*")
        if keys:
            for k in keys:
                val = RedisUtils.get(cls.__get_redis(), k)
                val = json.loads(val)
                cls.__code_blocks_for_radical_buy[k.split("-")[1]] = (val, time.time())
    @classmethod
    def __get_redis(cls):
        return cls.__redisManager.getRedis()
    def save_jx_blocks(self, code, blocks: list, current_limit_up_blocks: set, by=False):
        if not blocks:
@@ -64,28 +90,18 @@
            RedisUtils.setex_async(self.__db, f"kpl_jx_blocks-{code}", tool.get_expire(), json.dumps(final_blocks))
            self.__code_blocks[code] = (final_blocks, time.time())
    # 获取精选板块
    def get_jx_blocks(self, code, by=False):
        if by:
            if code in self.__code_by_blocks:
                return self.__code_by_blocks[code]
            val = RedisUtils.get(self.__get_redis(), f"kpl_jx_blocks_by-{code}")
            if val is None:
                return None
            else:
                val = json.loads(val)
                self.__code_by_blocks[code] = val
            return self.__code_by_blocks[code]
        else:
            if code in self.__code_blocks:
                return self.__code_blocks[code]
            val = RedisUtils.get(self.__get_redis(), f"kpl_jx_blocks-{code}")
            if val is None:
                return None
            else:
                val = json.loads(val)
                self.__code_blocks[code] = val
            return self.__code_blocks[code]
    def save_jx_blocks_for_radical_buy(self, code, blocks: list):
        if not blocks:
            return
        RedisUtils.setex_async(self.__db, f"kpl_jx_blocks_radical-{code}", tool.get_expire(), json.dumps(blocks))
        self.__code_blocks_for_radical_buy[code] = (blocks, time.time())
    # 获取精选板块(激进买)
    def get_jx_blocks_radical(self, code):
        blocks_info = self.__code_blocks_for_radical_buy.get(code)
        if blocks_info:
            return set(blocks_info[0])
        return None
    def get_jx_blocks_cache(self, code, by=False):
        if by:
@@ -130,8 +146,6 @@
                                async_log_util.info(logger_kpl_block_can_buy,
                                                    f"{code}:获取到精选板块(更新)-{blocks}  耗时:{int(time.time() - start_time)}s")
                                self.save_jx_blocks(code, blocks, current_limit_up_blocks)
                elif price_rate > 0.03:
                    # 添加备用板块
                    if not self.get_jx_blocks_cache(code, by=True):
@@ -140,6 +154,16 @@
                        self.save_jx_blocks(code, blocks, current_limit_up_blocks, by=True)
                        async_log_util.info(logger_kpl_block_can_buy,
                                            f"{code}:获取到精选板块(备用)-{blocks}  耗时:{int(time.time() - start_time)}s")
                if price_rate > 0.03:
                    if not self.__code_blocks_for_radical_buy.get(code):
                        start_time = time.time()
                        blocks = kpl_api.getCodeJingXuanBlocks(code, jx=False)
                        blocks = set([b[1] for b in blocks])
                        # fblocks = BlockMapManager().filter_blocks(blocks)
                        async_log_util.info(logger_kpl_block_can_buy,
                                            f"{code}:获取到板块(激进买) 过滤前-{blocks} 耗时:{int(time.time() - start_time)}s")
                        self.save_jx_blocks_for_radical_buy(code, list(blocks))
        except Exception as e:
            logger_kpl_block_can_buy.error(f"{code} 获取板块出错")
            logger_kpl_block_can_buy.exception(e)
@@ -558,6 +582,13 @@
            keys |= k4
        keys = keys - set(constant.KPL_INVALID_BLOCKS)
        return keys, k1, k11, k2, k3, k4
    def get_plate_keys_for_radical_buy(self, code):
        """
        激进买入的板块
        @param code:
        @return:
        """
class CodePlateKeyBuyManager:
@@ -1128,6 +1159,8 @@
            if d[6]:
                blocks |= set(d[6].split("、"))
            blocks -= constant.KPL_INVALID_BLOCKS
            # 过滤
            blocks = BlockMapManager().filter_blocks(blocks)
            temp_dict[code] = (kpl_util.get_high_level_count(d[4]), blocks)
        kpl_data_constant.open_limit_up_code_dict_for_radical_buy = temp_dict
@@ -1153,6 +1186,7 @@
            blocks = LimitUpDataConstant.get_blocks_with_history(_code)
            if not blocks:
                blocks = set()
            blocks = BlockMapManager().filter_blocks(blocks)
            if _code == code:
                # 获取当前代码涨停时间
                limit_up_time = int(k[2])
@@ -1205,6 +1239,7 @@
            if _code in exclude_codes:
                continue
            blocks = LimitUpDataConstant.get_blocks_with_history(_code)
            blocks = BlockMapManager().filter_blocks(blocks)
            if _code == code:
                # 获取当前代码涨停时间
                limit_up_time = int(k[5])
@@ -1378,9 +1413,18 @@
            for c in open_limit_up_code_dict:
                open_limit_up_blocks |= open_limit_up_code_dict[c][1]
        # 获取代码的板块
        keys_, k1_, k11_, k2_, k3_, k4_ = cls.__TargetCodePlateKeyManager.get_plate_keys(code, contains_today=False)
        match_blocks = open_limit_up_blocks & keys_
        keys_ = KPLCodeJXBlockManager().get_jx_blocks_radical(code)
        if not keys_:
            return set(), "没获取到板块"
        keys_ = BlockMapManager().filter_blocks(keys_)
        if not keys_:
            return set(), "过滤后没获取到板块"
        # 获取交集
        keys_ = CodeThirdBlocksManager().get_intersection_blocks_info(code, keys_)
        if not keys_:
            return set(), "没获取到板块交集"
        match_blocks = open_limit_up_blocks & keys_
        can_buy_blocks = set()
        fmsges = []
        msges = []
third_data/third_blocks_manager.py
New file
@@ -0,0 +1,198 @@
"""
三方板块管理
"""
import json
from itertools import combinations
from db.mysql_data_delegate import Mysqldb
from third_data import kpl_api
# Identifiers of the data sources a code's blocks can come from.
# NOTE(review): the original comment on SOURCE_TYPE_KPL said "Eastmoney", the same as
# SOURCE_TYPE_EASTMONEY below; presumably it should name the KPL (Kaipanla) feed - TODO confirm.
SOURCE_TYPE_KPL = 1  # KPL
SOURCE_TYPE_TDX = 2  # TDX (Tongdaxin)
SOURCE_TYPE_THS = 3  # THS (Tonghuashun)
SOURCE_TYPE_EASTMONEY = 4  # Eastmoney
class CodeThirdBlocksManager:
    """
    Singleton cache of the third-party blocks attached to each stock code.

    The ``code_third_blocks`` table is read once, when the first instance is created.
    Two class-level dicts are kept side by side: the block names exactly as stored, and
    the same names after they went through ``BlockMapManager().filter_blocks``.
    """
    __instance = None
    __mysql = Mysqldb()
    # Filtered blocks: {code: {source_type: {"b1", "b2"}, ...}}
    __code_source_blocks_dict = {}
    # Same layout, holding the block names as stored (unfiltered)
    __code_source_blocks_dict_origin = {}

    def __new__(cls, *args, **kwargs):
        if not cls.__instance:
            cls.__instance = super(CodeThirdBlocksManager, cls).__new__(cls, *args, **kwargs)
            cls.__load_data()
        return cls.__instance

    @classmethod
    def __load_data(cls):
        """
        Rebuild both caches from the code_third_blocks table.
        """
        results = cls.__mysql.select_all("select _code, _source, _blocks from code_third_blocks")
        # Reset both caches together so they can never describe different snapshots
        cls.__code_source_blocks_dict.clear()
        cls.__code_source_blocks_dict_origin.clear()
        for result in results or ():
            code, source_type = result[0], result[1]
            # An empty/NULL column would otherwise produce the bogus block name ''
            blocks = {b for b in (result[2] or "").split("、") if b}
            cls.__code_source_blocks_dict_origin.setdefault(code, {})[source_type] = blocks
            cls.__code_source_blocks_dict.setdefault(code, {})[source_type] = BlockMapManager().filter_blocks(
                blocks)

    def get_source_blocks(self, code):
        """
        Get the filtered blocks of a code, grouped by source.
        @param code:
        @return: {source_type: set of block names}, or None when the code is not cached
        """
        return self.__code_source_blocks_dict.get(code)

    def get_source_blocks_origin(self, code):
        """
        Get the unfiltered blocks of a code, grouped by source.
        @param code:
        @return: {source_type: set of block names}, or None when the code is not cached
        """
        return self.__code_source_blocks_dict_origin.get(code)

    def get_intersection_blocks_info(self, code, blocks):
        """
        Find the blocks that at least two sources agree on.

        ``blocks`` is filtered through BlockMapManager and treated as one source; every
        non-empty cached source set of ``code`` is another one.
        @param code:
        @param blocks: extra block names to take part in the comparison
        @return: a 2-tuple (matched, candidates): ``matched`` is the union of the pairwise
                 intersections of the candidate sets (empty when fewer than two candidate
                 sets exist), ``candidates`` is the list of sets that were compared.
                 Callers must unpack the tuple.
        """
        bs = []
        b1 = BlockMapManager().filter_blocks(blocks)
        if b1:
            bs.append(b1)
        sb_dict = self.__code_source_blocks_dict.get(code)
        if sb_dict:
            bs.extend(source_blocks for source_blocks in sb_dict.values() if source_blocks)
        if len(bs) < 2:
            return set(), bs
        fblocks = set()
        # A block counts as matched as soon as any two sources both contain it
        for left, right in combinations(bs, 2):
            fblocks |= left & right
        return fblocks, bs

    def set_blocks(self, code, blocks, source_type):
        """
        Store the blocks of a code for one source, in the caches and in the database.

        When ``blocks`` is empty the cached sets are left untouched, but the database row
        is still written with an empty block string.
        @param code:
        @param blocks: iterable of block names
        @param source_type: one of the SOURCE_TYPE_* values
        @return:
        """
        # Materialise once: the names are iterated several times below
        blocks = tuple(blocks)
        # Make sure BOTH caches have an entry for the code; previously only the filtered
        # cache was prepared, so a code unseen at load time raised KeyError below
        filtered_by_source = self.__code_source_blocks_dict.setdefault(code, {})
        origin_by_source = self.__code_source_blocks_dict_origin.setdefault(code, {})
        if blocks:
            filtered_by_source[source_type] = BlockMapManager().filter_blocks(set(blocks))
            origin_by_source[source_type] = set(blocks)
        _id = f"{code}_{source_type}"
        blocks = "、".join(blocks)
        # NOTE(review): the statements below are assembled by string formatting; a block
        # name containing a quote breaks them. Switch to bound parameters if the Mysqldb
        # wrapper supports them.
        results = self.__mysql.select_one(f"select * from code_third_blocks where _id ='{_id}'")
        if results:
            # Row exists: update it
            self.__mysql.execute(
                f"update code_third_blocks  set _blocks = '{blocks}', _update_time = now() where _id ='{_id}'")
        else:
            self.__mysql.execute(
                "insert into code_third_blocks(_id, _code, _source, _blocks,_create_time) values('{}','{}',{},'{}',now())".format(
                    _id, code, source_type, blocks))

    def list(self, code=None, source_type=None, max_update_time=None):
        """
        Query rows of code_third_blocks with optional filters.
        @param code: when truthy, only rows of this code
        @param source_type: when not None, only rows of this source
        @param max_update_time: when truthy, only rows never updated or updated before it
        @return: whatever Mysqldb.select_all returns for the query
        """
        sql = "select * from code_third_blocks where 1=1 "
        if code:
            sql += f" and _code = '{code}'"
        if source_type is not None:
            sql += f" and _source={source_type}"
        if max_update_time:
            sql += f" and (_update_time is null or _update_time<'{max_update_time}')"
        return self.__mysql.select_all(sql)

    def list_all_blocks(self, source_type):
        """
        Query the _blocks column of every row of one source.
        @param source_type:
        @return: whatever Mysqldb.select_all returns for the query
        """
        sql = f"select _blocks from code_third_blocks where _source = '{source_type}'"
        return self.__mysql.select_all(sql)
class BlockMapManager:
    """
    Singleton that maps an original block name to the set of block names it stands for.

    The ``block_map`` table is read once, when the first instance is created; lookups
    are answered from the class-level cache afterwards.
    """
    __mysql = Mysqldb()
    __instance = None
    __block_map = {}

    def __new__(cls, *args, **kwargs):
        if cls.__instance:
            return cls.__instance
        cls.__instance = super(BlockMapManager, cls).__new__(cls, *args, **kwargs)
        cls.__load_data()
        return cls.__instance

    @classmethod
    def __load_data(cls):
        """
        Refill the cache from the block_map table.
        """
        rows = cls.__mysql.select_all("select origin_block,blocks from block_map")
        cache = cls.__block_map
        cache.clear()
        for row in rows:
            cache[row[0]] = set(row[1].split("、"))

    def set_block_map(self, origin_block, blocks):
        """
        Insert or update the mapping row of one original block.

        Only the database is written here; the in-memory cache is not modified.
        @param origin_block: the original block name
        @param blocks: the names it maps to; when empty it maps to itself
        @return:
        """
        targets = blocks if blocks else {origin_block}
        blocks_str = "、".join(targets)
        existing = self.__mysql.select_one(f"select * from block_map where origin_block='{origin_block}'")
        if existing:
            # Row exists: update it
            statement = f"update block_map set blocks='{blocks_str}', update_time=now() where origin_block='{origin_block}'"
        else:
            statement = f"insert into block_map(origin_block, blocks, create_time) values('{origin_block}','{blocks_str}', now())"
        self.__mysql.execute(statement)

    def get_map_blocks_cache(self, block):
        """
        Look up the cached mapping of one block.
        @param block:
        @return: the mapped set of block names, or None when the block is not cached
        """
        return self.__block_map.get(block)

    def filter_blocks(self, blocks):
        """
        Translate a batch of block names through the cached mapping.

        A trailing "概念" is stripped from a name before the lookup; names without a
        cached mapping are dropped.
        @param blocks: collection of block names, may be None
        @return: the union of the mapped sets
        """
        mapped = set()
        if blocks is None or not len(blocks):
            return mapped
        for name in blocks:
            key = name[:-2] if name.endswith("概念") else name
            targets = self.get_map_blocks_cache(key)
            if targets:
                mapped |= targets
        return mapped

    def get_all_blocks(self):
        """
        @return: a keys view of all original block names in the cache
        """
        return self.__block_map.keys()
if __name__ == '__main__':
    # Manual check: fetch the blocks of one code from the KPL API and print what
    # get_intersection_blocks_info returns for them
    code = "000761"
    blocks = {item[1] for item in kpl_api.getCodeJingXuanBlocks(code, jx=False)}
    print(CodeThirdBlocksManager().get_intersection_blocks_info(code, blocks))
utils/trade_util.py
New file
@@ -0,0 +1,12 @@
from trade import trade_constant
def is_can_order_by_state(state):
    """
    Tell from the trade state whether an order may be placed.
    @param state: trade state of the code
    @return: False when the state equals TRADE_STATE_BUY_DELEGATED,
             TRADE_STATE_BUY_PLACE_ORDER or TRADE_STATE_BUY_SUCCESS, otherwise True
    """
    blocking_states = (
        trade_constant.TRADE_STATE_BUY_DELEGATED,
        trade_constant.TRADE_STATE_BUY_PLACE_ORDER,
        trade_constant.TRADE_STATE_BUY_SUCCESS,
    )
    return not any(state == blocked for blocked in blocking_states)