""" 自定义板块流入金额 """ import copy import itertools import os import constant import l2_data_util from db import mysql_data_delegate as mysql_data from huaxin_client import l1_subscript_codes_manager from third_data.kpl_data_constant import LimitUpCodesBlockRecordManager from third_data.third_blocks_manager import BlockMapManager from utils import tool, global_util @tool.singleton class CodeInMoneyManager: def __init__(self): # 总的净流入 self.__code_money_dict = {} # 总买单信息:{"code":[金额, 数量]} self.__code_buy_money_dict = {} # 总卖单信息:{"code":[金额, 数量]} self.__code_sell_money_dict = {} # 净流入大单买金额 self.__code_big_buy_money_list_dict = {} # 净流出大单卖金额 self.__code_big_sell_money_list_dict = {} self.__latest_price = {} self.__load_data() def __load_data(self): _path = f"{constant.get_path_prefix()}/logs/huaxin_local/l2/transaction_big_order.{tool.get_now_date_str()}.log" if os.path.exists(_path): with open(_path) as f: lines = f.readlines() for line in lines: line = line.split(" - ")[1] item = eval(line) self.add_data(item) def add_data(self, item): """ 添加数据 @param item: (代码,类型, 订单数据) 订单数据:(订单号, 量, 金额, 时间, 最新成交价格) @return: """ code = item[0] if code not in self.__code_money_dict: self.__code_money_dict[code] = 0 if code not in self.__code_buy_money_dict: self.__code_buy_money_dict[code] = [0, 0] if code not in self.__code_sell_money_dict: self.__code_sell_money_dict[code] = [0, 0] if not tool.is_ge_code(code): big_money = l2_data_util.get_big_money_val(item[2][4]) if item[2][2] < big_money: # 不是常规定义的大单就返回 return if tool.is_ge_code(code) and item[2][2] < 299e4 and item[2][1] < 290000: return if item[1] == 0: # item[2]的数据结构: (买单号, 量, 金额, 时间, 最新成交价格) self.__code_money_dict[code] += item[2][2] self.__code_buy_money_dict[code][0] += item[2][2] self.__code_buy_money_dict[code][1] += 1 if code not in self.__code_big_buy_money_list_dict: self.__code_big_buy_money_list_dict[code] = [] # 大买单信息:(金额, 最新价格, 订单号) if len(item[2]) >= 5: self.__code_big_buy_money_list_dict[code].append((item[2][2], item[2][4], item[2][0])) else: self.__code_money_dict[code] -= item[2][2] self.__code_sell_money_dict[code][0] += item[2][2] self.__code_sell_money_dict[code][1] += 1 # 大卖单信息 if code not in self.__code_big_sell_money_list_dict: self.__code_big_sell_money_list_dict[code] = [] if len(item[2]) >= 5: # 大卖单信息:(金额, 最新价格, 订单号) self.__code_big_sell_money_list_dict[code].append((item[2][2], item[2][4], item[2][0])) self.__latest_price[code] = item[2][4] def get_code_money_dict(self): return self.__code_money_dict def get_money(self, code): if code in self.__code_money_dict: return self.__code_money_dict.get(code) return 0 def get_money_info(self, code): """ 获取代码流入信息 @param code: 代码信息 @return: 净流入金额,[大单买金额, 大单买数量],[大单卖金额,大单卖数量] """ return self.__code_money_dict.get(code), self.__code_buy_money_dict.get(code), self.__code_sell_money_dict.get( code) def set_money(self, code, money): self.__code_money_dict[code] = money def get_big_buy_money_list(self, code): """ 获取代码的大买单列表 @param code: @return:[(金额, 价格, 订单号)] """ return self.__code_big_buy_money_list_dict.get(code) def get_big_sell_money_list(self, code): """ 获取代码的大买单列表 @param code: @return:[(金额, 价格, 订单号)] """ return self.__code_big_sell_money_list_dict.get(code) def get_latest_price(self, code): return self.__latest_price.get(code) @tool.singleton class BlockInMoneyRankManager: """ 板块流入流出管理 """ __mysql_db = mysql_data.Mysqldb() __code_blocks = {} __in_list = [] __out_list = [] # 最近这段时间的代码涨停次数 __history_code_limit_up_count = {} # 被排除的代码 __exclude_codes = set() def __load_codes(self): codes = [] codes_sh, codes_sz = l1_subscript_codes_manager.get_codes(False) for b in codes_sh: codes.append(b.decode()) for b in codes_sz: codes.append(b.decode()) return codes def __load_blocks(self): if self.codes: for code in self.codes: before_fblocks = LimitUpCodesBlockRecordManager().get_radical_buy_blocks(code) if not before_fblocks: before_fblocks = set() fblocks = BlockMapManager().filter_blocks(before_fblocks) if fblocks: fblocks -= constant.KPL_INVALID_BLOCKS self.__code_blocks[code] = fblocks def __load_exclude_codes(self): """ 获取之前4个交易日的数据 @return: """ max_day = tool.get_now_date_str() min_day = tool.date_sub(max_day, 30) sql = f"select * from (select distinct(r.`_day`) as 'day' from `kpl_limit_up_record` r where r.`_day`<'{max_day}' and r.`_day`>'{min_day}') a order by a.day desc limit 4" results = self.__mysql_db.select_all(sql) dates = [r[0] for r in results] # 获取之前4天涨停次数>=2次的代码 day_codes = {} for day in dates: sql = f"select distinct(r._code) from kpl_limit_up_record r where r.`_day`='{day}'" results = self.__mysql_db.select_all(sql) day_codes[day] = set([x[0] for x in results]) codes_list = [day_codes[k] for k in day_codes] # 统计代码的涨停天数 code_limit_up_count_dict = {} for codes in codes_list: for c in codes: if c not in code_limit_up_count_dict: code_limit_up_count_dict[c] = 0 code_limit_up_count_dict[c] += 1 self.__history_code_limit_up_count = code_limit_up_count_dict self.load_today_limit_up_codes() def load_today_limit_up_codes(self): # 加载今日涨停代码 day = tool.get_now_date_str() sql = f"select distinct(r._code) from kpl_limit_up_record r where r.`_day`='{day}'" results = self.__mysql_db.select_all(sql) codes = set([x[0] for x in results]) # 计算需要排除的代码 temp_limit_up_count = {} # 统计总共涨停天数 for c in self.__history_code_limit_up_count: if c in codes: temp_limit_up_count[c] = self.__history_code_limit_up_count[c] + 1 else: temp_limit_up_count[c] = self.__history_code_limit_up_count[c] exclude_codes = set() for c in temp_limit_up_count: if temp_limit_up_count[c] < 3: continue exclude_codes.add(c) self.__exclude_codes = exclude_codes def __init__(self): self.codes = self.__load_codes() self.__load_blocks() self.__load_exclude_codes() print("排除的代码", self.__exclude_codes) def get_codes(self): return self.codes def compute(self): codes = self.codes block_money = {} for code in codes: if code in self.__exclude_codes: continue money = CodeInMoneyManager().get_money(code) if money is None: continue # 大自由流通市值的流出不算 if money < 0: price = CodeInMoneyManager().get_latest_price(code) zylt_volume = global_util.zylt_volume_map.get(code) if price and zylt_volume and zylt_volume * price > 200e8: continue before_fblocks = LimitUpCodesBlockRecordManager().get_radical_buy_blocks(code) if not before_fblocks: before_fblocks = set() fblocks = BlockMapManager().filter_blocks(before_fblocks) if fblocks: fblocks -= constant.KPL_INVALID_BLOCKS for b in fblocks: if b not in block_money: block_money[b] = 0 block_money[b] += money temp_list = [(x, block_money[x]) for x in block_money] temp_list.sort(key=lambda x: x[1], reverse=True) self.__in_list = temp_list temp_list = copy.deepcopy(temp_list) temp_list.sort(key=lambda x: x[1]) self.__out_list = temp_list def get_block_codes_money(self, block): """ 获取板块中代码的流入流出 @param block: @return:(代码, 金额, 是否被排除) """ codes = self.codes fdatas = [] for code in codes: if code in self.__exclude_codes: continue money = CodeInMoneyManager().get_money(code) if money is None: continue # 大自由流通市值的流出不算 if money < 0: price = CodeInMoneyManager().get_latest_price(code) zylt_volume = global_util.zylt_volume_map.get(code) if price and zylt_volume and zylt_volume * price > 200e8: continue before_fblocks = LimitUpCodesBlockRecordManager().get_radical_buy_blocks(code) if not before_fblocks: before_fblocks = set() fblocks = BlockMapManager().filter_blocks(before_fblocks) if block not in fblocks: continue fdatas.append((code, money, code in self.__exclude_codes)) return fdatas def get_in_list(self): return self.__in_list def get_out_list(self): return self.__out_list if __name__ == '__main__': code = "600839" before_fblocks = LimitUpCodesBlockRecordManager().get_radical_buy_blocks(code) print(before_fblocks) # print(CodeInMoneyManager().get_money("300264")) # BlockInMoneyRankManager().compute() # print(BlockInMoneyRankManager().get_in_list()[:20]) # print(BlockInMoneyRankManager().get_out_list()[:20])