"""
|
自定义板块流入金额
|
"""
|
import copy
|
|
import constant
|
from huaxin_client import l1_subscript_codes_manager
|
from third_data.kpl_data_constant import LimitUpCodesBlockRecordManager
|
from third_data.third_blocks_manager import BlockMapManager
|
from utils import tool
|
|
|
class CodeInMoneyManager:
|
"""
|
单个票流入的金额计算
|
"""
|
__code_money_dict = {}
|
|
@classmethod
|
def set_market_info(cls, code, price, pre_price, current_amount):
|
"""
|
设置市场行情信息
|
@param code:
|
@param price:
|
@param pre_price:
|
@param current_amount:
|
@return:
|
"""
|
money = (price - pre_price) * current_amount / pre_price
|
cls.__code_money_dict[code] = int(money)
|
|
@classmethod
|
def get_money(cls, code):
|
return cls.__code_money_dict.get(code)
|
|
|
@tool.singleton
|
class BlockInMoneyRankManager:
|
"""
|
板块流入流出管理
|
"""
|
|
__code_blocks = {}
|
|
__in_list = []
|
__out_list = []
|
|
def __load_codes(self):
|
codes = []
|
codes_sh, codes_sz = l1_subscript_codes_manager.get_codes(False)
|
for b in codes_sh:
|
codes.append(b.decode())
|
for b in codes_sz:
|
codes.append(b.decode())
|
return codes
|
|
def __load_blocks(self):
|
if self.codes:
|
for code in self.codes:
|
before_fblocks = LimitUpCodesBlockRecordManager().get_radical_buy_blocks(code)
|
if not before_fblocks:
|
before_fblocks = set()
|
fblocks = BlockMapManager().filter_blocks(before_fblocks)
|
if fblocks:
|
fblocks -= constant.KPL_INVALID_BLOCKS
|
self.__code_blocks[code] = fblocks
|
|
def __init__(self):
|
self.codes = self.__load_codes()
|
self.__load_blocks()
|
|
def get_codes(self):
|
return self.codes
|
|
def compute(self):
|
codes = self.codes
|
block_money = {}
|
for code in codes:
|
money = CodeInMoneyManager.get_money(code)
|
if money is None:
|
continue
|
before_fblocks = LimitUpCodesBlockRecordManager().get_radical_buy_blocks(code)
|
if not before_fblocks:
|
before_fblocks = set()
|
fblocks = BlockMapManager().filter_blocks(before_fblocks)
|
if fblocks:
|
fblocks -= constant.KPL_INVALID_BLOCKS
|
for b in fblocks:
|
if b not in block_money:
|
block_money[b] = 0
|
block_money[b] += money
|
temp_list = [(x, block_money[x]) for x in block_money]
|
temp_list.sort(key=lambda x: x[1], reverse=True)
|
self.__in_list = temp_list
|
temp_list = copy.deepcopy(temp_list)
|
temp_list.sort(key=lambda x: x[1])
|
self.__out_list = temp_list
|
|
def get_in_list(self):
|
return self.__in_list
|
|
def get_out_list(self):
|
return self.__out_list
|