""" 自定义板块流入金额 """ import copy import os import constant from huaxin_client import l1_subscript_codes_manager from third_data.kpl_data_constant import LimitUpCodesBlockRecordManager from third_data.third_blocks_manager import BlockMapManager from utils import tool @tool.singleton class CodeInMoneyManager: def __init__(self): self.__code_money_dict = {} self.__load_data() def __load_data(self): _path = f"{constant.get_path_prefix()}/logs/huaxin_local/l2/transaction_big_order.{tool.get_now_date_str()}.log" if os.path.exists(_path): with open(_path) as f: lines = f.readlines() for line in lines: line = line.split(" - ")[1] item = eval(line) self.add_data(item) def add_data(self, item): code = item[0] if code not in self.__code_money_dict: self.__code_money_dict[code] = 0 if item[1] == 0: self.__code_money_dict[code] += item[2][2] else: self.__code_money_dict[code] -= item[2][2] def get_code_money_dict(self): return self.__code_money_dict def get_money(self, code): if code in self.__code_money_dict: return self.__code_money_dict.get(code) return 0 def set_money(self, code, money): self.__code_money_dict[code] = money @tool.singleton class BlockInMoneyRankManager: """ 板块流入流出管理 """ __code_blocks = {} __in_list = [] __out_list = [] def __load_codes(self): codes = [] codes_sh, codes_sz = l1_subscript_codes_manager.get_codes(False) for b in codes_sh: codes.append(b.decode()) for b in codes_sz: codes.append(b.decode()) return codes def __load_blocks(self): if self.codes: for code in self.codes: before_fblocks = LimitUpCodesBlockRecordManager().get_radical_buy_blocks(code) if not before_fblocks: before_fblocks = set() fblocks = BlockMapManager().filter_blocks(before_fblocks) if fblocks: fblocks -= constant.KPL_INVALID_BLOCKS self.__code_blocks[code] = fblocks def __init__(self): self.codes = self.__load_codes() self.__load_blocks() def get_codes(self): return self.codes def compute(self): codes = self.codes block_money = {} for code in codes: money = CodeInMoneyManager().get_money(code) if money is None: continue before_fblocks = LimitUpCodesBlockRecordManager().get_radical_buy_blocks(code) if not before_fblocks: before_fblocks = set() fblocks = BlockMapManager().filter_blocks(before_fblocks) if fblocks: fblocks -= constant.KPL_INVALID_BLOCKS for b in fblocks: if b not in block_money: block_money[b] = 0 block_money[b] += money temp_list = [(x, block_money[x]) for x in block_money] temp_list.sort(key=lambda x: x[1], reverse=True) self.__in_list = temp_list temp_list = copy.deepcopy(temp_list) temp_list.sort(key=lambda x: x[1]) self.__out_list = temp_list def get_in_list(self): return self.__in_list def get_out_list(self): return self.__out_list if __name__ == '__main__': print(CodeInMoneyManager().get_money("300264")) # BlockInMoneyRankManager().compute() # print(BlockInMoneyRankManager().get_in_list()[:20]) # print(BlockInMoneyRankManager().get_out_list()[:20])