Administrator
2025-06-10 efe62c0c92bee36da5179f34bb73e8ee4db6f814
third_data/custom_block_in_money_manager.py
@@ -2,36 +2,91 @@
自定义板块流入金额
"""
import copy
import itertools
import os
import constant
import l2_data_util
from db import mysql_data_delegate as mysql_data
from huaxin_client import l1_subscript_codes_manager
from third_data.kpl_data_constant import LimitUpCodesBlockRecordManager
from third_data.third_blocks_manager import BlockMapManager
from utils import tool
from utils import tool, global_util
@tool.singleton
class CodeInMoneyManager:
    def __init__(self):
        # 总的净流入
        self.__code_money_dict = {}
        # 总买单信息:{"code":[金额, 数量]}
        self.__code_buy_money_dict = {}
        # 总卖单信息:{"code":[金额, 数量]}
        self.__code_sell_money_dict = {}
        # 净流入大单买金额
        self.__code_big_buy_money_list_dict = {}
        # 净流出大单卖金额
        self.__code_big_sell_money_list_dict = {}
        self.__latest_price = {}
        self.__load_data()
    def __load_data(self):
        with open(f"{constant.get_path_prefix()}\\logs\\huaxin_local\\l2\\upload.{tool.get_now_date_str()}.log") as f:
            lines = f.readlines()
            for line in lines:
                line = line.split(" - ")[1]
                item = eval(line)
                self.add_data(item)
        _path = f"{constant.get_path_prefix()}/logs/huaxin_local/l2/transaction_big_order.{tool.get_now_date_str()}.log"
        if os.path.exists(_path):
            with open(_path) as f:
                lines = f.readlines()
                for line in lines:
                    line = line.split(" - ")[1]
                    item = eval(line)
                    self.add_data(item)
    def add_data(self, item):
        """
        添加数据
        @param item: (代码,类型, 订单数据)  订单数据:(订单号, 量, 金额, 时间, 最新成交价格)
        @return:
        """
        code = item[0]
        if code not in self.__code_money_dict:
            self.__code_money_dict[code] = 0
        if code not in self.__code_buy_money_dict:
            self.__code_buy_money_dict[code] = [0, 0]
        if code not in self.__code_sell_money_dict:
            self.__code_sell_money_dict[code] = [0, 0]
        if not tool.is_ge_code(code):
            big_money = l2_data_util.get_big_money_val(item[2][4])
            if item[2][2] < big_money:
                # 不是常规定义的大单就返回
                return
        if tool.is_ge_code(code) and item[2][2] < 299e4 and item[2][1] < 290000:
            return
        if item[1] == 0:
            # item[2]的数据结构:  (买单号, 量, 金额, 时间, 最新成交价格)
            self.__code_money_dict[code] += item[2][2]
            self.__code_buy_money_dict[code][0] += item[2][2]
            self.__code_buy_money_dict[code][1] += 1
            if code not in self.__code_big_buy_money_list_dict:
                self.__code_big_buy_money_list_dict[code] = []
            # 大买单信息:(金额, 最新价格, 订单号)
            if len(item[2]) >= 5:
                self.__code_big_buy_money_list_dict[code].append((item[2][2], item[2][4], item[2][0]))
        else:
            self.__code_money_dict[code] -= item[2][2]
            self.__code_sell_money_dict[code][0] += item[2][2]
            self.__code_sell_money_dict[code][1] += 1
            # 大卖单信息
            if code not in self.__code_big_sell_money_list_dict:
                self.__code_big_sell_money_list_dict[code] = []
            if len(item[2]) >= 5:
                # 大卖单信息:(金额, 最新价格, 订单号)
                self.__code_big_sell_money_list_dict[code].append((item[2][2], item[2][4], item[2][0]))
        self.__latest_price[code] = item[2][4]
    def get_code_money_dict(self):
        return self.__code_money_dict
@@ -41,8 +96,36 @@
            return self.__code_money_dict.get(code)
        return 0
    def get_money_info(self, code):
        """
        获取代码流入信息
        @param code: 代码信息
        @return: 净流入金额,[大单买金额, 大单买数量],[大单卖金额,大单卖数量]
        """
        return self.__code_money_dict.get(code), self.__code_buy_money_dict.get(code), self.__code_sell_money_dict.get(
            code)
    def set_money(self, code, money):
        self.__code_money_dict[code] = money
    def get_big_buy_money_list(self, code):
        """
        获取代码的大买单列表
        @param code:
        @return:[(金额, 价格, 订单号)]
        """
        return self.__code_big_buy_money_list_dict.get(code)
    def get_big_sell_money_list(self, code):
        """
        获取代码的大买单列表
        @param code:
        @return:[(金额, 价格, 订单号)]
        """
        return self.__code_big_sell_money_list_dict.get(code)
    def get_latest_price(self, code):
        return self.__latest_price.get(code)
@tool.singleton
@@ -50,11 +133,15 @@
    """
    板块流入流出管理
    """
    __mysql_db = mysql_data.Mysqldb()
    __code_blocks = {}
    __in_list = []
    __out_list = []
    # 最近这段时间的代码涨停次数
    __history_code_limit_up_count = {}
    # 被排除的代码
    __exclude_codes = set()
    def __load_codes(self):
        codes = []
@@ -76,9 +163,59 @@
                    fblocks -= constant.KPL_INVALID_BLOCKS
                self.__code_blocks[code] = fblocks
    def __load_exclude_codes(self):
        """
        获取之前4个交易日的数据
        @return:
        """
        max_day = tool.get_now_date_str()
        min_day = tool.date_sub(max_day, 30)
        sql = f"select * from (select distinct(r.`_day`) as 'day' from `kpl_limit_up_record` r where r.`_day`<'{max_day}' and r.`_day`>'{min_day}') a order by a.day desc limit 4"
        results = self.__mysql_db.select_all(sql)
        dates = [r[0] for r in results]
        # 获取之前4天涨停次数>=2次的代码
        day_codes = {}
        for day in dates:
            sql = f"select distinct(r._code) from kpl_limit_up_record r where r.`_day`='{day}'"
            results = self.__mysql_db.select_all(sql)
            day_codes[day] = set([x[0] for x in results])
        codes_list = [day_codes[k] for k in day_codes]
        # 统计代码的涨停天数
        code_limit_up_count_dict = {}
        for codes in codes_list:
            for c in codes:
                if c not in code_limit_up_count_dict:
                    code_limit_up_count_dict[c] = 0
                code_limit_up_count_dict[c] += 1
        self.__history_code_limit_up_count = code_limit_up_count_dict
        self.load_today_limit_up_codes()
    def load_today_limit_up_codes(self):
        # 加载今日涨停代码
        day = tool.get_now_date_str()
        sql = f"select distinct(r._code) from kpl_limit_up_record r where r.`_day`='{day}'"
        results = self.__mysql_db.select_all(sql)
        codes = set([x[0] for x in results])
        # 计算需要排除的代码
        temp_limit_up_count = {}
        # 统计总共涨停天数
        for c in self.__history_code_limit_up_count:
            if c in codes:
                temp_limit_up_count[c] = self.__history_code_limit_up_count[c] + 1
            else:
                temp_limit_up_count[c] = self.__history_code_limit_up_count[c]
        exclude_codes = set()
        for c in temp_limit_up_count:
            if temp_limit_up_count[c] < 3:
                continue
            exclude_codes.add(c)
        self.__exclude_codes = exclude_codes
    def __init__(self):
        self.codes = self.__load_codes()
        self.__load_blocks()
        self.__load_exclude_codes()
        print("排除的代码", self.__exclude_codes)
    def get_codes(self):
        return self.codes
@@ -87,9 +224,18 @@
        codes = self.codes
        block_money = {}
        for code in codes:
            if code in self.__exclude_codes:
                continue
            money = CodeInMoneyManager().get_money(code)
            if money is None:
                continue
            # 大自由流通市值的流出不算
            if money < 0:
                price = CodeInMoneyManager().get_latest_price(code)
                zylt_volume = global_util.zylt_volume_map.get(code)
                if price and zylt_volume and zylt_volume * price > 200e8:
                    continue
            before_fblocks = LimitUpCodesBlockRecordManager().get_radical_buy_blocks(code)
            if not before_fblocks:
                before_fblocks = set()
@@ -107,6 +253,35 @@
        temp_list.sort(key=lambda x: x[1])
        self.__out_list = temp_list
    def get_block_codes_money(self, block):
        """
        获取板块中代码的流入流出
        @param block:
        @return:(代码, 金额, 是否被排除)
        """
        codes = self.codes
        fdatas = []
        for code in codes:
            if code in self.__exclude_codes:
                continue
            money = CodeInMoneyManager().get_money(code)
            if money is None:
                continue
            # 大自由流通市值的流出不算
            if money < 0:
                price = CodeInMoneyManager().get_latest_price(code)
                zylt_volume = global_util.zylt_volume_map.get(code)
                if price and zylt_volume and zylt_volume * price > 200e8:
                    continue
            before_fblocks = LimitUpCodesBlockRecordManager().get_radical_buy_blocks(code)
            if not before_fblocks:
                before_fblocks = set()
            fblocks = BlockMapManager().filter_blocks(before_fblocks)
            if block not in fblocks:
                continue
            fdatas.append((code, money, code in self.__exclude_codes))
        return fdatas
    def get_in_list(self):
        return self.__in_list
@@ -115,7 +290,11 @@
if __name__ == '__main__':
    print(CodeInMoneyManager().get_money("300264"))
    code = "600839"
    before_fblocks = LimitUpCodesBlockRecordManager().get_radical_buy_blocks(code)
    print(before_fblocks)
    # print(CodeInMoneyManager().get_money("300264"))
    # BlockInMoneyRankManager().compute()
    # print(BlockInMoneyRankManager().get_in_list()[:20])
    # print(BlockInMoneyRankManager().get_out_list()[:20])