Administrator
2024-11-15 b53b0f632cca75df8f39a17fab3d26caeecb2caf
自定义板块净流入
6个文件已修改
294 ■■■■■ 已修改文件
huaxin_client/l1_client.py 43 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
log_module/log.py 6 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.py 13 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
servers/huaxin_trade_server.py 23 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/code_plate_key_manager.py 50 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/buy_radical/block_special_codes_manager.py 159 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l1_client.py
@@ -10,6 +10,9 @@
import xmdapi
from huaxin_client import tool, constant
from log_module.log import logger_system, logger_local_huaxin_l1, logger_l2_codes_subscript, logger_debug
from third_data import custom_block_in_money_manager
from third_data.custom_block_in_money_manager import BlockInMoneyRankManager, CodeInMoneyManager
from third_data.history_k_data_util import HistoryKDatasUtils, JueJinApi
from utils import tool as out_tool
################B类##################
@@ -132,7 +135,13 @@
        # (代码, 现价, 涨幅, 量, 当前时间, 买1价, 买1量, 买2价, 买2量, 更新时间)
        level1_data_dict[pMarketDataField.SecurityID] = (
            pMarketDataField.SecurityID, pMarketDataField.LastPrice, rate, pMarketDataField.Volume, time.time(),
            pMarketDataField.BidPrice1, pMarketDataField.BidVolume1, pMarketDataField.BidPrice2, pMarketDataField.BidVolume2, pMarketDataField.UpdateTime)
            pMarketDataField.BidPrice1, pMarketDataField.BidVolume1, pMarketDataField.BidPrice2,
            pMarketDataField.BidVolume2, pMarketDataField.UpdateTime)
        try:
            custom_block_in_money_manager.CodeInMoneyManager.set_market_info(pMarketDataField.SecurityID, lastPrice,
                                                                             close_price, pMarketDataField.Turnover)
        except:
            pass
__latest_subscript_codes = set()
@@ -208,12 +217,36 @@
        finally:
            time.sleep(3)
def __test_block_in_money():
    """
    Temporary test helper: feed CodeInMoneyManager with a market snapshot of the tracked codes.

    The codes returned by BlockInMoneyRankManager().get_codes() are processed in pages of 200.
    For every page the previous close is read through HistoryKDatasUtils.get_gp_latest_info and the
    price / cum_amount through JueJinApi.get_gp_current_info; every code present in both results is
    passed to CodeInMoneyManager.set_market_info(code, price, pre_close, cum_amount).
    @return:
    """
    codes = BlockInMoneyRankManager().get_codes()
    page_size = 200
    # Step by page start so that no empty trailing page is requested
    # (the former "len(codes) // page_size + 1" page count produced one whenever
    # len(codes) was a multiple of page_size, including zero codes)
    for start in range(0, len(codes), page_size):
        temp_codes = codes[start: start + page_size]
        print(temp_codes)
        # Latest static info: previous close per code
        latest_infos = HistoryKDatasUtils.get_gp_latest_info(temp_codes, "sec_id,pre_close")
        if not latest_infos:
            # The API may return None (other callers guard against it); skip this page
            continue
        pre_close_dict = {x["sec_id"]: x["pre_close"] for x in latest_infos}
        current_infos = JueJinApi.get_gp_current_info(temp_codes, "symbol,price,cum_amount")
        if not current_infos:
            # NOTE(review): assumed to be able to return None/empty like get_gp_latest_info - confirm
            continue
        # The symbol is "<prefix>.<code>"; keep only the code part as key
        current_infos = {x["symbol"].split(".")[1]: (x["price"], x["cum_amount"]) for x in current_infos}
        for code, (price, cum_amount) in current_infos.items():
            if code not in pre_close_dict:
                continue
            CodeInMoneyManager.set_market_info(code, price, pre_close_dict[code], cum_amount)
def run(queue_l1_w_strategy_r, queue_l1_r_strategy_w, queue_custom_block_in_money, fixed_codes=None):
    """
    运行l1订阅任务
    @param queue_l1_w_strategy_r: L1方写,策略方读
    @param queue_l1_r_strategy_w: L1方读,策略方写
    @param queue_custom_block_in_money: 板块流入流出计算结果
    @param fixed_codes: 固定要返回数据的代码
    @return:
    """
@@ -272,6 +305,9 @@
    threading.Thread(target=__run_subscript_task, args=(spi,), daemon=True).start()
    # TODO 测试
    __test_block_in_money()
    # 等待程序结束
    while True:
        print("数量", len(level1_data_dict))
@@ -307,6 +343,9 @@
            if len(datas) > 0:
                logger_l2_codes_subscript.info("开始#华鑫L1上传代码:数量-{}", len(datas))
                __upload_codes_info(queue_l1_w_strategy_r, datas)
            # 计算流入流出并上传
            custom_block_in_money_manager.BlockInMoneyRankManager().compute()
            queue_custom_block_in_money.put_nowait(custom_block_in_money_manager.BlockInMoneyRankManager().get_in_list(),  custom_block_in_money_manager.BlockInMoneyRankManager().get_out_list())
        except Exception as e:
            logging.exception(e)
            logger_debug.exception(e)
log_module/log.py
@@ -225,6 +225,10 @@
                   filter=lambda record: record["extra"].get("name") == "kpl_jx_out",
                   rotation="00:00", compression="zip", enqueue=True)
        logger.add(self.get_path("kpl", "kpl_jx_in"),
                   filter=lambda record: record["extra"].get("name") == "kpl_jx_in",
                   rotation="00:00", compression="zip", enqueue=True)
        # 看盘日志
        logger.add(self.get_path("kp", "kp_msg"),
                   filter=lambda record: record["extra"].get("name") == "kp_msg",
@@ -438,6 +442,8 @@
logger_kpl_jx_out = __mylogger.get_logger("kpl_jx_out")
logger_kpl_jx_in = __mylogger.get_logger("kpl_jx_in")
logger_kp_msg = __mylogger.get_logger("kp_msg")
main.py
@@ -27,9 +27,10 @@
                      queue_l1_w_strategy_r_: multiprocessing.Queue,
                      queue_strategy_w_trade_r_: multiprocessing.Queue,
                      queue_strategy_w_trade_r_for_read_: multiprocessing.Queue, queue_l1_trade_r_strategy_w_,
                      queue_l1_trade_w_strategy_r_, trade_ipc_addr):
                      queue_l1_trade_w_strategy_r_, trade_ipc_addr, queue_custom_block_in_money):
    """
    策略进程
    @param pipe_server:
    @param queue_strategy_r_trade_w_:
    @param queue_l1_w_strategy_r_:
@@ -38,6 +39,7 @@
    @param queue_l1_trade_r_strategy_w_:
    @param queue_l1_trade_w_strategy_r_:
    @param trade_ipc_addr: 交易ipc地址(下单地址, 撤单地址)
    @param queue_custom_block_in_money: 接收板块流入流出
    @return:
    """
    logger_system.info("策略进程ID:{}", os.getpid())
@@ -58,11 +60,14 @@
    # redis后台服务
    t1 = threading.Thread(target=redis_manager.RedisUtils.run_loop, name="redis", daemon=True)
    t1.start()
    #
    # 启动华鑫交易服务
    huaxin_trade_server.run(queue_strategy_r_trade_w_, queue_l1_w_strategy_r_, queue_strategy_w_trade_r_,
                            queue_strategy_w_trade_r_for_read_,
                            queue_l1_trade_w_strategy_r_, trade_ipc_addr)
                            queue_l1_trade_w_strategy_r_, trade_ipc_addr, queue_custom_block_in_money)
# 主服务
@@ -94,6 +99,7 @@
        # l1
        queue_l1_w_strategy_r = multiprocessing.Queue()
        queue_l1_r_strategy_w = multiprocessing.Queue()
        queue_custom_block_in_money = multiprocessing.Queue()  # 接收板块流入流出
        # l1交易
        queue_l1_trade_w_strategy_r = multiprocessing.Queue()
        queue_l1_trade_r_strategy_w = multiprocessing.Queue()
@@ -115,6 +121,7 @@
        # L1订阅数据
        l1Process = multiprocessing.Process(target=huaxin_client.l1_client.run,
                                            args=(queue_l1_w_strategy_r, queue_l1_r_strategy_w,
                                                  queue_custom_block_in_money,
                                                  gpcode_manager.BuyOpenLimitUpCodeManager().get_codes(),))
        l1Process.start()
@@ -141,7 +148,7 @@
        # 主进程
        createTradeServer(pss_strategy, queue_strategy_r_trade_w, queue_l1_w_strategy_r, queue_strategy_w_trade_r,
                          queue_strategy_w_trade_r_for_read, queue_l1_trade_r_strategy_w,
                          queue_l1_trade_w_strategy_r, (order_ipc_addr, cancel_order_ipc_addr))
                          queue_l1_trade_w_strategy_r, (order_ipc_addr, cancel_order_ipc_addr), queue_custom_block_in_money)
        # 将tradeServer作为主进程
        l1Process.join()
servers/huaxin_trade_server.py
@@ -39,7 +39,7 @@
    hx_logger_l2_orderdetail, hx_logger_l2_market_data, logger_l2_g_cancel, logger_debug, \
    logger_system, logger_trade, logger_local_huaxin_l1_trade_info, logger_l2_codes_subscript, logger_l2_radical_buy
from third_data import block_info, kpl_data_manager, history_k_data_manager, huaxin_l1_data_manager
from third_data.code_plate_key_manager import KPLCodeJXBlockManager, CodePlateKeyBuyManager
from third_data.code_plate_key_manager import KPLCodeJXBlockManager, CodePlateKeyBuyManager, RealTimeKplMarketData
from third_data.history_k_data_util import JueJinApi
from trade import l2_trade_util, \
    trade_data_manager, trade_constant, buy_open_limit_up_strategy
@@ -577,6 +577,20 @@
                logger_debug.exception(e)
def __recv_pipe_block_in_money(queue_custom_block_in_money: multiprocessing.Queue):
    """
    Forward block money-flow results from the queue to RealTimeKplMarketData.

    Logs the thread id, then loops forever on a blocking queue.get(). Every truthy item is read as
    (in_list, out_list) by index and handed to the two *_from_custom setters. Any exception raised
    while handling an item is logged and the loop keeps running.
    @param queue_custom_block_in_money: queue carrying the results; if None the function returns after logging
    @return:
    """
    logger_system.info(f"trade_server __recv_pipe_block_in_money 线程ID:{tool.get_thread_id()}")
    if queue_custom_block_in_money is None:
        return
    while True:
        try:
            item = queue_custom_block_in_money.get()
            if not item:
                # Nothing usable in this message
                continue
            inflow_list, outflow_list = item[0], item[1]
            RealTimeKplMarketData.set_market_jingxuan_blocks_from_custom(inflow_list)
            RealTimeKplMarketData.set_market_jingxuan_out_blocks_from_custom(outflow_list)
        except Exception as e:
            logger_debug.exception(e)
# 排得太远撤单
def __cancel_buy_for_too_far():
    while True:
@@ -1019,7 +1033,7 @@
def run(queue_strategy_r_trade_w, queue_l1_w_strategy_r, queue_strategy_w_trade_r, queue_strategy_w_trade_r_for_read,
        queue_l1_trade_w_strategy_r, trade_ipc_addr):
        queue_l1_trade_w_strategy_r, trade_ipc_addr, queue_custom_block_in_money):
    """
    @param queue_strategy_r_trade_w:
@@ -1028,6 +1042,7 @@
    @param queue_strategy_w_trade_r_for_read:
    @param queue_l1_trade_w_strategy_r:
    @param trade_ipc_addr: 交易IPC地址:(下单ipc地址,撤单ipc地址)
    @param queue_custom_block_in_money: 接收板块流入流出队列
    @return:
    """
    logger_system.info(f"trade_server 线程ID:{tool.get_thread_id()}")
@@ -1068,6 +1083,10 @@
        t1 = threading.Thread(target=lambda: async_log_util.l2_data_log.run_sync(), daemon=True)
        t1.start()
        # 读取板块资金流入
        t1 = threading.Thread(target=lambda: __recv_pipe_block_in_money(queue_custom_block_in_money), daemon=True)
        t1.start()
        logger_system.info("create TradeServer")
        t1 = threading.Thread(target=lambda: clear_invalid_client(), daemon=True)
        t1.start()
third_data/code_plate_key_manager.py
@@ -18,7 +18,7 @@
from log_module import async_log_util
from db import redis_manager_delegate as redis_manager
from log_module.log import logger_kpl_block_can_buy, logger_kpl_jx_out
from log_module.log import logger_kpl_block_can_buy, logger_kpl_jx_out, logger_kpl_jx_in
from third_data.kpl_util import KPLPlatManager
from trade import trade_manager, l2_trade_util, trade_constant
@@ -360,6 +360,8 @@
        #         break
        #     blocks.add(data[1])
        # cls.__top_jx_blocks = blocks
        if True:
            return
        blocks = set()
        for data in datas:
            if data[1] in constant.KPL_INVALID_BLOCKS:
@@ -371,6 +373,28 @@
                break
            blocks.add(kpl_util.filter_block(data[1]))
        # 记录精选流出日志
        async_log_util.info(logger_kpl_jx_in, f"原数据:{datas[:20]} 板块:{blocks}")
        cls.__top_jx_blocks = BlockMapManager().filter_blocks(blocks)
    @classmethod
    def set_market_jingxuan_blocks_from_custom(cls, datas):
        """
        设置自定义精选流入数据
        @param datas:[(板块,流入金额)]
        @return:
        """
        blocks = set()
        for data in datas:
            if data[0] in constant.KPL_INVALID_BLOCKS:
                continue
            if data[1] < 5e7:
                continue
            blocks.add(data[0])
            if len(blocks) >= 10:
                break
            blocks.add(kpl_util.filter_block(data[0]))
        # 记录精选流出日志
        async_log_util.info(logger_kpl_jx_in, f"原数据:{datas[:20]} 板块:{blocks}")
        cls.__top_jx_blocks = BlockMapManager().filter_blocks(blocks)
    @classmethod
@@ -380,6 +404,8 @@
        @param datas:
        @return:
        """
        if True:
            return
        blocks = set()
        for data in datas:
            if data[1] in constant.KPL_INVALID_BLOCKS:
@@ -396,6 +422,28 @@
        cls.__top_jx_out_blocks = BlockMapManager().filter_blocks(blocks)
    @classmethod
    def set_market_jingxuan_out_blocks_from_custom(cls, datas):
        """
        设置自定义精选流出数据
        @param datas:[(板块,流入金额)]
        @return:
        """
        blocks = set()
        for data in datas:
            if data[0] in constant.KPL_INVALID_BLOCKS:
                continue
            if data[1] > -5e7:
                # 过滤5千万以上的
                break
            blocks.add(data[0])
            if len(blocks) >= 10:
                break
            blocks.add(kpl_util.filter_block(data[0]))
        # 记录精选流出日志
        async_log_util.info(logger_kpl_jx_out, f"原数据:{datas[:10]} 板块:{blocks}")
        cls.__top_jx_out_blocks = BlockMapManager().filter_blocks(blocks)
    @classmethod
    def get_top_market_jingxuan_blocks(cls):
        """
        Return the top inflow blocks currently stored on the class.
        @return: the current value of cls.__top_jx_blocks
        """
        return cls.__top_jx_blocks
trade/buy_radical/block_special_codes_manager.py
@@ -1,7 +1,9 @@
"""
板块辨识度票管理
"""
import constant
from db import mysql_data_delegate as mysql_data
from third_data.history_k_data_util import HistoryKDatasUtils
from utils import tool
@@ -37,6 +39,159 @@
            self.mysql.execute(sql)
class AnalysisBlockSpecialCodesManager:
    """
    Analyse the kpl_limit_up_record table to pick, for each block, the codes that stand out in it.
    """
    __mysql = mysql_data.Mysqldb()

    def __get_code_blocks(self, min_day, max_day):
        """
        Count the limit-up records of every (block, code) pair inside the open interval
        (min_day, max_day). Codes starting with 68 are excluded.
        @param min_day: exclusive lower bound for `_day`
        @param max_day: exclusive upper bound for `_day`
        @return: rows of (block name, code, record count)
        """
        # NOTE(review): the statement is assembled with an f-string; only pass trusted values
        return self.__mysql.select_all(
            f"SELECT r.`_hot_block_name`, r.`_code`, COUNT(*) FROM kpl_limit_up_record  r WHERE r.`_day`>'{min_day}' and r.`_day`<'{max_day}' and r._code not like '68%' group by  r.`_hot_block_name`, r.`_code`")

    def __get_limit_up_info(self, min_day):
        """
        Collect per-code limit-up information for all records later than min_day.
        @param min_day: exclusive lower bound for `_day`
        @return: {code: (limit-up count, code name, free-float market value / 1e8, or 1 when it is null)}
        """
        # NOTE(review): the statement is assembled with an f-string; only pass trusted values
        sql = f"SELECT r.`_code`, COUNT(r.`_code`),r.`_code_name`,IF( r.`_zylt_val` is null, 1, r.`_zylt_val`/100000000 )  FROM (SELECT * FROM kpl_limit_up_record  r ORDER BY r.`_create_time` DESC)  r WHERE r.`_day`>'{min_day}' GROUP BY r.`_code`"
        rows = self.__mysql.select_all(sql)
        return {row[0]: (row[1], row[2], row[3]) for row in rows}

    def __get_top(self, block, min_day, max_day):
        """
        Codes of a block that rank at the top both by limit-up count and by free-float market value.

        The top size is min(10, number of codes // 2); the result is the intersection of both tops.
        @param block: block name (`_hot_block_name`)
        @param min_day: exclusive lower bound for `_day`
        @param max_day: exclusive upper bound for `_day`
        @return: set of codes
        """
        # NOTE(review): `block` is interpolated into the SQL text; only pass trusted block names
        sql = f"SELECT r.`_code`,r.`_code_name`, COUNT(*), IF( r.`_zylt_val` is null, 1, r.`_zylt_val`/100000000 )  FROM (SELECT * FROM kpl_limit_up_record  r ORDER BY r.`_create_time` DESC)  r WHERE r.`_hot_block_name`='{block}' AND r.`_day`>'{min_day}' and r.`_day`<'{max_day}' and r._code not like '68%' GROUP BY r.`_code`"
        rows = list(self.__mysql.select_all(sql))
        max_count = min(10, len(rows) // 2)
        by_count = sorted(rows, key=lambda x: x[2], reverse=True)
        top_by_count = {x[0] for x in by_count[:max_count]}
        # Sort the count-ordered rows again so that ties keep the limit-up-count order (stable sort)
        by_value = sorted(by_count, key=lambda x: x[3], reverse=True)
        top_by_value = {x[0] for x in by_value[:max_count]}
        return top_by_count & top_by_value

    def __get_block_map(self):
        """
        Load the block mapping file ("板块对应.txt" under constant.get_path_prefix()).

        Every non-empty line is either "child1,child2,..." or "parent1,parent2|child1,child2,...".
        Without an explicit parent part the first child acts as the parent. Full-width commas and
        the "丨" character are accepted as separators.
        @return: {block name: set of parent block names}; when a line names more than one parent,
                 every parent is additionally mapped to itself
        """
        block_map = {}
        with open(f"{constant.get_path_prefix()}/板块对应.txt", encoding="utf-8") as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                # Normalise the separators: full-width comma -> ",", "丨" -> "|"
                line = line.replace("\uff0c", ",").replace("丨", "|")
                parent_blocks = set()
                if line.find("|") >= 0:
                    sts = line.split("|")
                    parent_blocks |= {p.strip() for p in sts[0].split(",")}
                    line = sts[1]
                # Names are stripped on both sides so parents and children compare equal
                blocks = [b.strip() for b in line.split(",")]
                if not parent_blocks:
                    parent_blocks.add(blocks[0])
                for b in blocks:
                    block_map.setdefault(b, set()).update(parent_blocks)
                if len(parent_blocks) > 1:
                    # Several parents: each parent also maps to itself
                    for b in parent_blocks:
                        block_map.setdefault(b, set()).add(b)
        return block_map

    def test_block_map(self):
        """
        Print the parsed block mapping (debug helper).
        """
        print(self.__get_block_map())

    def get_block_special_codes(self):
        """
        Compute the distinctive codes of every block.

        The analysis window ends at the last of the 15 latest trading dates and starts 365 days
        before it (both bounds exclusive). Limit-up counts are accumulated per code under the mapped
        parent blocks. For every valid block the first third of its codes (in collection order, no
        ranking is applied) is checked and at most 10 codes are kept that
          - have more than 3 limit-ups in the block,
          - are known to HistoryKDatasUtils.get_gp_latest_info with sec_level == 1,
          - have an upper_limit of at least 3.
        @return: [(block, code name, code, limit-up count in the block, int(free-float market value / 1e8))]
        """
        trading_dates = HistoryKDatasUtils.get_latest_trading_date(15)
        max_day = trading_dates[-1]
        min_day = tool.date_sub(max_day, 365)
        block_map = self.__get_block_map()

        # {code: {block: limit-up count}}
        code_block_dict = {}
        # rows: (block name, code, limit-up count within that block)
        for row in self.__get_code_blocks(min_day, max_day):
            block_name, code, count = row[0], row[1], row[2]
            counts = code_block_dict.setdefault(code, {})
            # Count under the mapped parent blocks, or under the block itself when it is unmapped
            for mapped_block in block_map.get(block_name) or {block_name}:
                counts[mapped_block] = counts.get(mapped_block, 0) + count

        # {block: [(code, limit-up count)]}
        block_codes_dict = {}
        for code, counts in code_block_dict.items():
            for block_name, count in counts.items():
                block_codes_dict.setdefault(block_name, []).append((code, count))

        limit_up_info_map = self.__get_limit_up_info(min_day)
        fdatas = []
        for block_name, code_info_list in block_codes_dict.items():
            if block_name in constant.KPL_INVALID_BLOCKS:
                continue
            # Only the first third of the block's codes is considered
            max_count = len(code_info_list) // 3
            candidates = [x[0] for x in code_info_list[:max_count]]
            if not candidates:
                continue
            # Security level and limit-up price are needed for the filters below
            juejin_results = HistoryKDatasUtils.get_gp_latest_info(candidates, fields="sec_id,sec_level,upper_limit")
            if juejin_results is None:
                continue
            juejin_result_dict = {x['sec_id']: (x['sec_id'], x['sec_level'], x['upper_limit']) for x in juejin_results}
            picked = 0
            for code in candidates:
                count = code_block_dict[code][block_name]
                if count <= 3:
                    # Not more than 3 limit-ups in this block
                    continue
                info = juejin_result_dict.get(code)
                if info is None:
                    # No information returned for this code
                    continue
                if info[1] != 1:
                    # sec_level other than 1: not a normal security
                    continue
                if info[2] < 3:
                    # upper_limit below 3
                    continue
                limit_up_info = limit_up_info_map[code]
                fdatas.append((block_name, limit_up_info[1], code, count, int(float(limit_up_info[2]))))
                picked += 1
                if picked >= 10:
                    break
        return fdatas
if __name__ == "__main__":
    # Show the stored codes of one block
    print(BlockSpecialCodesManager().get_block_codes("证券"))
    # Recompute the distinctive codes, print them row by row and then as a whole list
    datas = AnalysisBlockSpecialCodesManager().get_block_special_codes()
    for row in datas:
        print(row)
    # BlockSpecialCodesManager().set_block_codes_list(datas)
    print(datas)