"""
|
自定义板块流入金额
|
"""
|
import copy
|
|
import constant
|
from huaxin_client import l1_subscript_codes_manager
|
from third_data.kpl_data_constant import LimitUpCodesBlockRecordManager
|
from third_data.third_blocks_manager import BlockMapManager
|
from utils import tool
|
|
|
@tool.singleton
class CodeInMoneyManager:
    """
    Keeps a signed running money total for every stock code.

    The totals are seeded in the constructor by replaying the upload log whose
    file name carries ``tool.get_now_date_str()``; afterwards they can be
    changed through ``add_data`` and ``set_money``.
    """

    def __init__(self):
        # code -> accumulated signed amount
        self.__code_money_dict = {}
        self.__load_data()

    def __load_data(self):
        """
        Replay every record of the dated upload log through ``add_data``.

        Each line is expected to look like ``<prefix> - <python literal>``;
        only the text between the first and second `` - `` separator is used.

        :raises OSError: if the log file cannot be opened.
        :raises IndexError: if a line contains no `` - `` separator.
        """
        log_path = f"{constant.get_path_prefix()}\\logs\\huaxin_local\\l2\\upload.{tool.get_now_date_str()}.log"
        with open(log_path) as log_file:
            records = log_file.readlines()
        for record in records:
            payload = record.split(" - ")[1]
            # NOTE(review): eval() executes whatever expression the log line
            # holds. If the payload is always a plain literal, something like
            # ast.literal_eval would be a safer parser - confirm the log
            # format before switching.
            self.add_data(eval(payload))

    def add_data(self, item):
        """
        Apply one record to the running total of its code.

        :param item: indexable record where ``item[0]`` is the code,
            ``item[1]`` is the direction flag and ``item[2][2]`` the amount.
            A flag equal to ``0`` adds the amount, any other flag subtracts it.
        """
        code = item[0]
        totals = self.__code_money_dict
        # Make sure the code has an entry before the amount is applied.
        totals.setdefault(code, 0)
        adds_money = item[1] == 0
        amount = item[2][2]
        totals[code] = totals[code] + amount if adds_money else totals[code] - amount

    def get_code_money_dict(self):
        """
        :return: the internal ``code -> total`` dict itself (not a copy).
        """
        return self.__code_money_dict

    def get_money(self, code):
        """
        :param code: stock code to look up.
        :return: the stored total for ``code``, or ``0`` when it is unknown.
        """
        return self.__code_money_dict.get(code, 0)

    def set_money(self, code, money):
        """
        Overwrite the stored total of ``code`` with ``money``.
        """
        self.__code_money_dict[code] = money
|
|
|
@tool.singleton
class BlockInMoneyRankManager:
    """
    Block (sector) money inflow / outflow ranking.

    For every subscribed code the manager looks up the code's blocks, sums
    the per-code money reported by ``CodeInMoneyManager`` into each block and
    keeps two rankings of ``(block, money)`` tuples: one sorted by money
    descending (``get_in_list``) and one ascending (``get_out_list``).
    Both rankings are empty until ``compute`` has been called.
    """

    def __init__(self):
        # All mutable state lives on the instance. Declaring these containers
        # at class level would share one dict/list object between the class
        # and every instance that mutates it through ``self``.
        self.__code_blocks = {}  # code -> set of valid blocks, filled at start-up
        self.__in_list = []  # (block, money), largest money first
        self.__out_list = []  # (block, money), smallest money first
        self.codes = self.__load_codes()
        self.__load_blocks()

    def __load_codes(self):
        """
        Fetch the subscribed codes and decode them to ``str``.

        :return: list with the decoded ``codes_sh`` entries followed by the
            decoded ``codes_sz`` entries.
        """
        codes_sh, codes_sz = l1_subscript_codes_manager.get_codes(False)
        return [raw.decode() for raw in codes_sh] + [raw.decode() for raw in codes_sz]

    @staticmethod
    def __get_valid_blocks(code):
        """
        Return the filtered blocks of ``code`` without the invalid ones.

        The result is always a new set: the object handed back by
        ``filter_blocks`` is never modified in place, and a falsy result
        (``None`` or empty) is normalised to an empty set so callers can
        iterate it unconditionally.
        """
        radical_blocks = LimitUpCodesBlockRecordManager().get_radical_buy_blocks(code) or set()
        filtered = BlockMapManager().filter_blocks(radical_blocks)
        if not filtered:
            return set()
        return filtered - constant.KPL_INVALID_BLOCKS

    def __load_blocks(self):
        """Cache the valid blocks of every loaded code in ``__code_blocks``."""
        for code in self.codes or ():
            self.__code_blocks[code] = self.__get_valid_blocks(code)

    def get_codes(self):
        """
        :return: the list of codes loaded at construction time.
        """
        return self.codes

    def compute(self):
        """
        Recompute both rankings from the current per-code money.

        Blocks are looked up afresh for every code on each call (the start-up
        cache is not consulted). Codes whose money is ``None`` are skipped; a
        money of ``0`` still registers the code's blocks in the rankings.
        """
        block_money = {}
        for code in self.codes:
            money = CodeInMoneyManager().get_money(code)
            if money is None:
                continue
            for block in self.__get_valid_blocks(code):
                block_money[block] = block_money.get(block, 0) + money
        # sorted() is stable, so blocks with equal money keep their insertion
        # order in both rankings. The tuples are immutable, which makes a deep
        # copy unnecessary for the second list.
        ranked = sorted(block_money.items(), key=lambda pair: pair[1], reverse=True)
        self.__in_list = ranked
        self.__out_list = sorted(ranked, key=lambda pair: pair[1])

    def get_in_list(self):
        """
        :return: ``(block, money)`` tuples ordered by money, largest first.
        """
        return self.__in_list

    def get_out_list(self):
        """
        :return: ``(block, money)`` tuples ordered by money, smallest first.
        """
        return self.__out_list
|
|
|
if __name__ == '__main__':
    # Manual smoke check: print the accumulated money of one sample code.
    sample_code = "300264"
    print(CodeInMoneyManager().get_money(sample_code))
    # The block rankings can be inspected the same way:
    # BlockInMoneyRankManager().compute()
    # print(BlockInMoneyRankManager().get_in_list()[:20])
    # print(BlockInMoneyRankManager().get_out_list()[:20])
|