Administrator
2025-06-09 1e16e3fdc6fafc66c4a0ae168d1b4e46b61e5a70
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
"""
自定义板块流入金额
"""
import copy
import itertools
import os
 
import constant
import l2_data_util
from db import mysql_data_delegate as mysql_data
from huaxin_client import l1_subscript_codes_manager
from third_data.kpl_data_constant import LimitUpCodesBlockRecordManager
from third_data.third_blocks_manager import BlockMapManager
from utils import tool, global_util
 
 
@tool.singleton
class CodeInMoneyManager:
    def __init__(self):
        # 总的净流入
        self.__code_money_dict = {}
        # 总买单信息:{"code":[金额, 数量]}
        self.__code_buy_money_dict = {}
        # 总卖单信息:{"code":[金额, 数量]}
        self.__code_sell_money_dict = {}
 
        # 净流入大单买金额
        self.__code_big_buy_money_list_dict = {}
        # 净流出大单卖金额
        self.__code_big_sell_money_list_dict = {}
        self.__latest_price = {}
        self.__load_data()
 
    def __load_data(self):
        _path = f"{constant.get_path_prefix()}/logs/huaxin_local/l2/transaction_big_order.{tool.get_now_date_str()}.log"
        if os.path.exists(_path):
            with open(_path) as f:
                lines = f.readlines()
                for line in lines:
                    line = line.split(" - ")[1]
                    item = eval(line)
                    self.add_data(item)
 
    def add_data(self, item):
        """
        添加数据
        @param item: (代码,类型, 订单数据)  订单数据:(订单号, 量, 金额, 时间, 最新成交价格)
        @return:
        """
        code = item[0]
        if code not in self.__code_money_dict:
            self.__code_money_dict[code] = 0
 
        if code not in self.__code_buy_money_dict:
            self.__code_buy_money_dict[code] = [0, 0]
 
        if code not in self.__code_sell_money_dict:
            self.__code_sell_money_dict[code] = [0, 0]
 
        if not tool.is_ge_code(code):
            big_money = l2_data_util.get_big_money_val(item[2][4])
            if item[2][2] < big_money:
                # 不是常规定义的大单就返回
                return
        if tool.is_ge_code(code) and item[2][2] < 299e4 and item[2][1] < 290000:
            return
        if item[1] == 0:
            # item[2]的数据结构:  (买单号, 量, 金额, 时间, 最新成交价格)
            self.__code_money_dict[code] += item[2][2]
            self.__code_buy_money_dict[code][0] += item[2][2]
            self.__code_buy_money_dict[code][1] += 1
 
            if code not in self.__code_big_buy_money_list_dict:
                self.__code_big_buy_money_list_dict[code] = []
            # 大买单信息:(金额, 最新价格, 订单号)
            if len(item[2]) >= 5:
                self.__code_big_buy_money_list_dict[code].append((item[2][2], item[2][4], item[2][0]))
        else:
            self.__code_money_dict[code] -= item[2][2]
            self.__code_sell_money_dict[code][0] += item[2][2]
            self.__code_sell_money_dict[code][1] += 1
            # 大卖单信息
            if code not in self.__code_big_sell_money_list_dict:
                self.__code_big_sell_money_list_dict[code] = []
            if len(item[2]) >= 5:
                # 大卖单信息:(金额, 最新价格, 订单号)
                self.__code_big_sell_money_list_dict[code].append((item[2][2], item[2][4], item[2][0]))
 
        self.__latest_price[code] = item[2][4]
 
    def get_code_money_dict(self):
        return self.__code_money_dict
 
    def get_money(self, code):
        if code in self.__code_money_dict:
            return self.__code_money_dict.get(code)
        return 0
 
    def get_money_info(self, code):
        """
        获取代码流入信息
        @param code: 代码信息
        @return: 净流入金额,[大单买金额, 大单买数量],[大单卖金额,大单卖数量]
        """
        return self.__code_money_dict.get(code), self.__code_buy_money_dict.get(code), self.__code_sell_money_dict.get(
            code)
 
    def set_money(self, code, money):
        self.__code_money_dict[code] = money
 
    def get_big_buy_money_list(self, code):
        """
        获取代码的大买单列表
        @param code:
        @return:[(金额, 价格, 订单号)]
        """
        return self.__code_big_buy_money_list_dict.get(code)
 
    def get_big_sell_money_list(self, code):
        """
        获取代码的大买单列表
        @param code:
        @return:[(金额, 价格, 订单号)]
        """
        return self.__code_big_sell_money_list_dict.get(code)
 
    def get_latest_price(self, code):
        return self.__latest_price.get(code)
 
 
@tool.singleton
class BlockInMoneyRankManager:
    """
    板块流入流出管理
    """
    __mysql_db = mysql_data.Mysqldb()
    __code_blocks = {}
 
    __in_list = []
    __out_list = []
    # 最近这段时间的代码涨停次数
    __history_code_limit_up_count = {}
    # 被排除的代码
    __exclude_codes = set()
 
    def __load_codes(self):
        codes = []
        codes_sh, codes_sz = l1_subscript_codes_manager.get_codes(False)
        for b in codes_sh:
            codes.append(b.decode())
        for b in codes_sz:
            codes.append(b.decode())
        return codes
 
    def __load_blocks(self):
        if self.codes:
            for code in self.codes:
                before_fblocks = LimitUpCodesBlockRecordManager().get_radical_buy_blocks(code)
                if not before_fblocks:
                    before_fblocks = set()
                fblocks = BlockMapManager().filter_blocks(before_fblocks)
                if fblocks:
                    fblocks -= constant.KPL_INVALID_BLOCKS
                self.__code_blocks[code] = fblocks
 
    def __load_exclude_codes(self):
        """
        获取之前4个交易日的数据
        @return:
        """
        max_day = tool.get_now_date_str()
        min_day = tool.date_sub(max_day, 30)
        sql = f"select * from (select distinct(r.`_day`) as 'day' from `kpl_limit_up_record` r where r.`_day`<'{max_day}' and r.`_day`>'{min_day}') a order by a.day desc limit 4"
        results = self.__mysql_db.select_all(sql)
        dates = [r[0] for r in results]
        # 获取之前4天涨停次数>=2次的代码
        day_codes = {}
        for day in dates:
            sql = f"select distinct(r._code) from kpl_limit_up_record r where r.`_day`='{day}'"
            results = self.__mysql_db.select_all(sql)
            day_codes[day] = set([x[0] for x in results])
        codes_list = [day_codes[k] for k in day_codes]
        # 统计代码的涨停天数
        code_limit_up_count_dict = {}
        for codes in codes_list:
            for c in codes:
                if c not in code_limit_up_count_dict:
                    code_limit_up_count_dict[c] = 0
                code_limit_up_count_dict[c] += 1
        self.__history_code_limit_up_count = code_limit_up_count_dict
        self.load_today_limit_up_codes()
 
    def load_today_limit_up_codes(self):
        # 加载今日涨停代码
        day = tool.get_now_date_str()
        sql = f"select distinct(r._code) from kpl_limit_up_record r where r.`_day`='{day}'"
        results = self.__mysql_db.select_all(sql)
        codes = set([x[0] for x in results])
        # 计算需要排除的代码
        temp_limit_up_count = {}
        # 统计总共涨停天数
        for c in self.__history_code_limit_up_count:
            if c in codes:
                temp_limit_up_count[c] = self.__history_code_limit_up_count[c] + 1
            else:
                temp_limit_up_count[c] = self.__history_code_limit_up_count[c]
        exclude_codes = set()
        for c in temp_limit_up_count:
            if temp_limit_up_count[c] < 3:
                continue
            exclude_codes.add(c)
        self.__exclude_codes = exclude_codes
 
    def __init__(self):
        self.codes = self.__load_codes()
        self.__load_blocks()
        self.__load_exclude_codes()
        print("排除的代码", self.__exclude_codes)
 
    def get_codes(self):
        return self.codes
 
    def compute(self):
        codes = self.codes
        block_money = {}
        for code in codes:
            if code in self.__exclude_codes:
                continue
            money = CodeInMoneyManager().get_money(code)
            if money is None:
                continue
            # 大自由流通市值的流出不算
            if money < 0:
                price = CodeInMoneyManager().get_latest_price(code)
                zylt_volume = global_util.zylt_volume_map.get(code)
                if price and zylt_volume and zylt_volume * price > 200e8:
                    continue
 
            before_fblocks = LimitUpCodesBlockRecordManager().get_radical_buy_blocks(code)
            if not before_fblocks:
                before_fblocks = set()
            fblocks = BlockMapManager().filter_blocks(before_fblocks)
            if fblocks:
                fblocks -= constant.KPL_INVALID_BLOCKS
            for b in fblocks:
                if b not in block_money:
                    block_money[b] = 0
                block_money[b] += money
        temp_list = [(x, block_money[x]) for x in block_money]
        temp_list.sort(key=lambda x: x[1], reverse=True)
        self.__in_list = temp_list
        temp_list = copy.deepcopy(temp_list)
        temp_list.sort(key=lambda x: x[1])
        self.__out_list = temp_list
 
    def get_block_codes_money(self, block):
        """
        获取板块中代码的流入流出
        @param block:
        @return:(代码, 金额, 是否被排除)
        """
        codes = self.codes
        fdatas = []
        for code in codes:
            if code in self.__exclude_codes:
                continue
            money = CodeInMoneyManager().get_money(code)
            if money is None:
                continue
            # 大自由流通市值的流出不算
            if money < 0:
                price = CodeInMoneyManager().get_latest_price(code)
                zylt_volume = global_util.zylt_volume_map.get(code)
                if price and zylt_volume and zylt_volume * price > 200e8:
                    continue
            before_fblocks = LimitUpCodesBlockRecordManager().get_radical_buy_blocks(code)
            if not before_fblocks:
                before_fblocks = set()
            fblocks = BlockMapManager().filter_blocks(before_fblocks)
            if block not in fblocks:
                continue
            fdatas.append((code, money, code in self.__exclude_codes))
        return fdatas
 
    def get_in_list(self):
        return self.__in_list
 
    def get_out_list(self):
        return self.__out_list
 
 
if __name__ == '__main__':
    code = "600839"
    before_fblocks = LimitUpCodesBlockRecordManager().get_radical_buy_blocks(code)
    print(before_fblocks)
 
    # print(CodeInMoneyManager().get_money("300264"))
    # BlockInMoneyRankManager().compute()
    # print(BlockInMoneyRankManager().get_in_list()[:20])
    # print(BlockInMoneyRankManager().get_out_list()[:20])