"""
|
自定义板块流入金额
|
"""
|
import copy
|
import itertools
|
import os
|
|
import constant
|
import l2_data_util
|
from db import mysql_data_delegate as mysql_data
|
from huaxin_client import l1_subscript_codes_manager
|
from third_data.kpl_data_constant import LimitUpCodesBlockRecordManager
|
from third_data.third_blocks_manager import BlockMapManager
|
from utils import tool, global_util
|
|
|
@tool.singleton
class CodeInMoneyManager:
    """
    Per-code accumulator of big-order money flow.

    For every code it keeps the net inflow, the buy/sell totals and the
    individual big buy/sell orders. On construction it replays today's
    ``transaction_big_order`` log file when that file exists.
    """

    def __init__(self):
        # Net inflow per code (buy amounts are added, sell amounts subtracted)
        self.__code_money_dict = {}
        # Buy totals: {"code": [amount, order count]}
        self.__code_buy_money_dict = {}
        # Sell totals: {"code": [amount, order count]}
        self.__code_sell_money_dict = {}

        # Big buy orders: {"code": [(amount, latest price, order no), ...]}
        self.__code_big_buy_money_list_dict = {}
        # Big sell orders: {"code": [(amount, latest price, order no), ...]}
        self.__code_big_sell_money_list_dict = {}
        # Latest traded price carried by the most recent accepted order
        self.__latest_price = {}
        self.__load_data()

    def __load_data(self):
        """
        Replay today's big-order log through add_data().

        Each usable line is expected to look like "<prefix> - <payload>";
        the payload is evaluated into the tuple accepted by add_data().
        Lines without the " - " separator (for example blank lines) are
        skipped instead of aborting the whole load with an IndexError.
        """
        _path = f"{constant.get_path_prefix()}/logs/huaxin_local/l2/transaction_big_order.{tool.get_now_date_str()}.log"
        if not os.path.exists(_path):
            return
        with open(_path) as f:
            # Stream the file instead of materialising every line first
            for line in f:
                parts = line.split(" - ")
                if len(parts) < 2:
                    # Blank or malformed line: nothing to replay
                    continue
                # NOTE(review): eval() executes whatever the log line contains.
                # This is only acceptable while the log file is written
                # exclusively by trusted code; consider ast.literal_eval if
                # the payload is always a plain literal - TODO confirm.
                self.add_data(eval(parts[1]))

    def add_data(self, item):
        """
        Add one order record.

        @param item: (code, type, order data); order data is
                     (order no, volume, amount, time, latest traded price).
                     Type 0 is handled as a buy order, any other value as a
                     sell order.
        @return: None. Records below the big-order threshold are ignored.
        """
        code = item[0]
        self.__code_money_dict.setdefault(code, 0)
        self.__code_buy_money_dict.setdefault(code, [0, 0])
        self.__code_sell_money_dict.setdefault(code, [0, 0])

        order = item[2]
        money = order[2]
        if tool.is_ge_code(code):
            # Codes accepted by tool.is_ge_code() use fixed thresholds: the
            # order is dropped only when both amount and volume are small
            if money < 299e4 and order[1] < 290000:
                return
        elif money < l2_data_util.get_big_money_val(order[4]):
            # Not a big order by the regular, price-based definition
            return

        if item[1] == 0:
            self.__code_money_dict[code] += money
            totals = self.__code_buy_money_dict[code]
            big_orders = self.__code_big_buy_money_list_dict.setdefault(code, [])
        else:
            self.__code_money_dict[code] -= money
            totals = self.__code_sell_money_dict[code]
            big_orders = self.__code_big_sell_money_list_dict.setdefault(code, [])
        totals[0] += money
        totals[1] += 1

        if len(order) >= 5:
            # Only records that carry a price can be listed and can refresh
            # the latest price; the price update lives under the same guard
            # so a short record no longer raises after the totals changed
            big_orders.append((money, order[4], order[0]))
            self.__latest_price[code] = order[4]

    def get_code_money_dict(self):
        """
        @return: the internal {code: net inflow} dict (not a copy)
        """
        return self.__code_money_dict

    def get_money(self, code):
        """
        @param code: code to look up
        @return: net inflow recorded for the code, 0 when the code is unknown
        """
        return self.__code_money_dict.get(code, 0)

    def get_money_info(self, code):
        """
        Get the inflow information of a code.

        @param code: code to look up
        @return: net inflow, [big buy amount, big buy count],
                 [big sell amount, big sell count]; each element is None
                 when the code is unknown
        """
        net_money = self.__code_money_dict.get(code)
        buy_info = self.__code_buy_money_dict.get(code)
        sell_info = self.__code_sell_money_dict.get(code)
        return net_money, buy_info, sell_info

    def set_money(self, code, money):
        """
        Overwrite the net inflow stored for a code.

        @param code: code to update
        @param money: new net inflow value
        """
        self.__code_money_dict[code] = money

    def get_big_buy_money_list(self, code):
        """
        Get the big buy orders of a code.

        @param code: code to look up
        @return: [(amount, price, order no)], or None when none were recorded
        """
        return self.__code_big_buy_money_list_dict.get(code)

    def get_big_sell_money_list(self, code):
        """
        Get the big sell orders of a code.

        @param code: code to look up
        @return: [(amount, price, order no)], or None when none were recorded
        """
        return self.__code_big_sell_money_list_dict.get(code)

    def get_latest_price(self, code):
        """
        @param code: code to look up
        @return: latest price stored by add_data(), or None when unknown
        """
        return self.__latest_price.get(code)
|
|
|
@tool.singleton
class BlockInMoneyRankManager:
    """
    Block (sector) money inflow / outflow ranking.

    Sums the per-code net inflow kept by CodeInMoneyManager into per-block
    totals and exposes them sorted by inflow and by outflow.
    """
    __mysql_db = mysql_data.Mysqldb()
    __code_blocks = {}

    __in_list = []
    __out_list = []
    # Number of recent days on which each code has a limit-up record
    __history_code_limit_up_count = {}
    # Codes left out of the block totals
    __exclude_codes = set()

    def __init__(self):
        self.codes = self.__load_codes()
        self.__load_blocks()
        self.__load_exclude_codes()
        print("排除的代码", self.__exclude_codes)

    def __load_codes(self):
        """
        Build the code list from l1_subscript_codes_manager.get_codes().

        @return: list of str; the two byte-string sequences returned by
                 get_codes(False) are decoded and concatenated in order
        """
        codes_sh, codes_sz = l1_subscript_codes_manager.get_codes(False)
        return [raw.decode() for group in (codes_sh, codes_sz) for raw in group]

    @staticmethod
    def __filtered_blocks(code, drop_invalid=True):
        """
        Resolve the blocks a code belongs to.

        @param code: code to look up
        @param drop_invalid: when True, remove constant.KPL_INVALID_BLOCKS
        @return: a collection of block names; an empty set when nothing is
                 known, so callers never have to deal with None
        """
        raw_blocks = LimitUpCodesBlockRecordManager().get_radical_buy_blocks(code) or set()
        blocks = BlockMapManager().filter_blocks(raw_blocks)
        if not blocks:
            return set()
        if drop_invalid:
            # Build a new set rather than using "-=" so the object returned
            # by filter_blocks() is never modified in place
            return blocks - constant.KPL_INVALID_BLOCKS
        return blocks

    def __countable_money(self, code):
        """
        Net inflow of a code as it should enter the block totals.

        @param code: code to look up
        @return: the net inflow, or None when the code must be skipped
                 (excluded code, no money value, or a net outflow of a code
                 whose zylt_volume * latest price exceeds 200e8)
        """
        if code in self.__exclude_codes:
            return None
        money = CodeInMoneyManager().get_money(code)
        if money is None:
            return None
        if money < 0:
            # Outflow of codes with a large free-float market value is ignored
            price = CodeInMoneyManager().get_latest_price(code)
            zylt_volume = global_util.zylt_volume_map.get(code)
            if price and zylt_volume and zylt_volume * price > 200e8:
                return None
        return money

    def __load_blocks(self):
        """
        Cache the valid blocks of every loaded code in __code_blocks.
        """
        for code in self.codes or ():
            self.__code_blocks[code] = self.__filtered_blocks(code)

    def __load_exclude_codes(self):
        """
        Count limit-up days over the previous (at most 4) recorded days.

        The days are the most recent distinct `_day` values of
        kpl_limit_up_record that are earlier than today and later than
        tool.date_sub(today, 30). The per-code day counts are stored and
        load_today_limit_up_codes() is then called to derive the excluded
        codes.

        @return: None
        """
        max_day = tool.get_now_date_str()
        min_day = tool.date_sub(max_day, 30)
        sql = f"select * from (select distinct(r.`_day`) as 'day' from `kpl_limit_up_record` r where r.`_day`<'{max_day}' and r.`_day`>'{min_day}') a order by a.day desc limit 4"
        dates = [row[0] for row in self.__mysql_db.select_all(sql)]

        # Number of days on which each code has a limit-up record
        code_limit_up_count_dict = {}
        for day in dates:
            sql = f"select distinct(r._code) from kpl_limit_up_record r where r.`_day`='{day}'"
            rows = self.__mysql_db.select_all(sql)
            # A set keeps each code to one count per day
            for c in {row[0] for row in rows}:
                code_limit_up_count_dict[c] = code_limit_up_count_dict.get(c, 0) + 1
        self.__history_code_limit_up_count = code_limit_up_count_dict
        self.load_today_limit_up_codes()

    def load_today_limit_up_codes(self):
        """
        Refresh the excluded codes using today's limit-up records.

        A code from the history counts is excluded when its history count,
        plus one if it also has a record today, is at least 3. Codes that
        appear only today are not part of the history counts and are never
        excluded here.

        @return: None
        """
        day = tool.get_now_date_str()
        sql = f"select distinct(r._code) from kpl_limit_up_record r where r.`_day`='{day}'"
        today_codes = {row[0] for row in self.__mysql_db.select_all(sql)}

        exclude_codes = set()
        for c, history_count in self.__history_code_limit_up_count.items():
            total_count = history_count + 1 if c in today_codes else history_count
            if total_count >= 3:
                exclude_codes.add(c)
        self.__exclude_codes = exclude_codes

    def get_codes(self):
        """
        @return: the list of codes loaded at construction
        """
        return self.codes

    def compute(self):
        """
        Recompute the block totals and both rankings.

        Every code's countable net inflow is added to each of its valid
        blocks. The result is stored as a list of (block, money) sorted by
        money descending (in list) and ascending (out list).

        @return: None
        """
        block_money = {}
        for code in self.codes:
            money = self.__countable_money(code)
            if money is None:
                continue
            for block in self.__filtered_blocks(code):
                block_money[block] = block_money.get(block, 0) + money

        ranked = sorted(block_money.items(), key=lambda x: x[1], reverse=True)
        self.__in_list = ranked
        # The entries are immutable tuples, so sorting into a new list is
        # enough; a deep copy is not needed
        self.__out_list = sorted(ranked, key=lambda x: x[1])

    def get_block_codes_money(self, block):
        """
        Get the inflow / outflow of the codes inside a block.

        @param block: block name; matched against the filtered blocks of
                      each code (constant.KPL_INVALID_BLOCKS is not removed
                      here)
        @return: [(code, money, excluded flag)]
        """
        fdatas = []
        for code in self.codes:
            money = self.__countable_money(code)
            if money is None:
                continue
            if block not in self.__filtered_blocks(code, drop_invalid=False):
                continue
            # NOTE(review): excluded codes are already skipped above, so this
            # flag is always False - confirm whether excluded codes were
            # meant to be returned with the flag set instead
            fdatas.append((code, money, code in self.__exclude_codes))
        return fdatas

    def get_in_list(self):
        """
        @return: [(block, money)] sorted by money descending, as of the last
                 compute() call
        """
        return self.__in_list

    def get_out_list(self):
        """
        @return: [(block, money)] sorted by money ascending, as of the last
                 compute() call
        """
        return self.__out_list
|
|
|
if __name__ == '__main__':
    # Manual check: print the blocks recorded for one sample code
    print(LimitUpCodesBlockRecordManager().get_radical_buy_blocks("600839"))

    # Further manual checks, kept disabled:
    # print(CodeInMoneyManager().get_money("300264"))
    # BlockInMoneyRankManager().compute()
    # print(BlockInMoneyRankManager().get_in_list()[:20])
    # print(BlockInMoneyRankManager().get_out_list()[:20])
|