Administrator
2025-06-11 01696a5d8c2c3cf3062aa6a8ccbf123547c2dbf0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
import logging
import time
 
import dask
 
import constant
from cancel_strategy.s_l_h_cancel_strategy import HourCancelBigNumComputer
from cancel_strategy.s_l_h_cancel_strategy import LCancelBigNumComputer, LCancelRateManager
from cancel_strategy.s_l_h_cancel_strategy import SCancelBigNumComputer
from code_attribute import gpcode_manager
from l2 import l2_data_util, l2_data_manager, transaction_progress, l2_log, data_callback
from l2.cancel_buy_strategy import FCancelBigNumComputer, \
    NewGCancelBigNumComputer, \
    NBCancelBigNumComputer
from l2.huaxin import l2_huaxin_util
from l2.l2_data_manager import OrderBeginPosInfo
from l2.l2_data_manager_new import L2TradeDataProcessor
from l2.l2_data_util import L2DataUtil
from l2.l2_limitup_sell_data_manager import L2LimitUpSellDataManager
from l2.l2_transaction_data_manager import HuaXinBuyOrderManager, HuaXinSellOrderStatisticManager, BigOrderDealManager
from l2.place_order_single_data_manager import L2TradeSingleDataProcessor
from log_module import async_log_util
from log_module.log import hx_logger_l2_debug, logger_l2_trade_buy_queue, logger_debug, hx_logger_l2_upload, \
    logger_trade, logger_l2_trade
from trade import current_price_process_manager, trade_constant
import concurrent.futures
 
from trade.buy_radical import radical_buy_strategy
from trade.buy_radical.radical_buy_data_manager import RadicalBuyDataManager, EveryLimitupBigDealOrderManager
from utils import tool
 
 
class HuaXinTransactionDatasProcessor:
    __statistic_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=constant.HUAXIN_L2_MAX_CODES_COUNT + 2)
    __TradeBuyQueue = transaction_progress.TradeBuyQueue()
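    # Per-code time (with ms) of the first non-limit-up deal seen; the entry is dropped once a limit-up deal arrives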
    __not_limit_up_time_dict = {}
 
    # 计算成交进度
    @classmethod
    def __compute_latest_trade_progress(cls, code, fdatas):
        buyno_map = l2_data_util.local_today_buyno_map.get(code)
        if not buyno_map:
            return None
        buy_progress_index = None
        for i in range(len(fdatas) - 1, -1, -1):
            d = fdatas[i]
            buy_no = f"{d[0][6]}"
            if buyno_map and buy_no in buyno_map:
                # 成交进度位必须是涨停买
                if L2DataUtil.is_limit_up_price_buy(buyno_map[buy_no]["val"]):
                    buy_progress_index = buyno_map[buy_no]["index"]
                break
        return buy_progress_index
 
    @classmethod
    def statistic_big_order_infos(cls, code, fdatas, order_begin_pos: OrderBeginPosInfo):
        """
        统计大单成交
        @param code:
        @param fdatas: 格式:[(数据本身, 是否主动买, 是否涨停, 总成交额, 不含ms时间,含ms时间)]
        @return:
        """
 
        def statistic_big_buy_data():
            use_time_list = []
            __start_time = time.time()
            buy_datas, bigger_buy_datas = HuaXinBuyOrderManager.statistic_big_buy_data(code, fdatas, limit_up_price)
            use_time_list.append((time.time() - __start_time, "买单统计"))
            if buy_datas:
                BigOrderDealManager().add_buy_datas(code, buy_datas)
                active_big_buy_orders = []
                if buy_datas:
                    for x in buy_datas:
                        if x[0] > x[6]:
                            # (买单号, 成交金额, 最后成交时间)
                            active_big_buy_orders.append((x[0], x[2], x[4]))
                EveryLimitupBigDealOrderManager.add_big_buy_order_deal(code, active_big_buy_orders)
                use_time_list.append((time.time() - __start_time, "买单统计结果处理"))
            try:
                is_placed_order = l2_data_manager.TradePointManager.is_placed_order(order_begin_pos)
                if is_placed_order:
                    if order_begin_pos and order_begin_pos.mode == OrderBeginPosInfo.MODE_RADICAL:
                        RadicalBuyDataManager.big_order_deal(code)
 
                    if bigger_buy_datas:
                        # 有大于50w的大单成交
                        buyno_map = l2_data_util.local_today_buyno_map.get(code)
                        if buyno_map:
                            for buy_data in bigger_buy_datas:
                                order_no = f"{buy_data[0]}"
                                if order_no in buyno_map:
                                    LCancelBigNumComputer().add_deal_index(code, buyno_map[order_no]["index"],
                                                                           order_begin_pos.buy_single_index)
            except Exception as e:
                logger_debug.exception(e)
            if use_time_list and use_time_list[-1][0] > 0.005:
                l2_log.info(code, hx_logger_l2_upload,
                            f"买单统计+处理耗时:{use_time_list[-1][0]}  详情:{use_time_list}")
 
            return buy_datas
 
        def statistic_big_sell_data():
            use_time_list = []
            __start_time = time.time()
            sell_datas = HuaXinSellOrderStatisticManager.statistic_big_sell_data(code, fdatas)
            if sell_datas:
                BigOrderDealManager().add_sell_datas(code, sell_datas)
            use_time_list.append((time.time() - __start_time, "卖单统计"))
            if use_time_list and use_time_list[-1][0] > 0.005:
                l2_log.info(code, hx_logger_l2_upload,
                            f"卖单统计+处理耗时:{use_time_list[-1][0]}  详情:{use_time_list}")
            return sell_datas
 
        def statistic_big_data(f1_, f2_):
            temp_data = f1_, f2_
            return temp_data
 
        limit_up_price = gpcode_manager.get_limit_up_price_as_num(code)
 
        if False and len(fdatas) > 100:
            # 并行处理买单与卖单
            # 超过100条数据才需要并行处理
            f1 = dask.delayed(statistic_big_buy_data)()
            f2 = dask.delayed(statistic_big_sell_data)()
            dask_result = dask.delayed(statistic_big_data)(f1, f2)
            buy_datas, sell_datas = dask_result.compute()
        else:
            buy_datas = statistic_big_buy_data()
            sell_datas = statistic_big_sell_data()
        # L撤的比例与买卖大单无直接关系了
        # if buy_datas or sell_datas:
        #     buy_money = BigOrderDealManager().get_total_buy_money(code)
        #     sell_money = BigOrderDealManager().get_total_sell_money(code)
        #     LCancelRateManager.set_big_num_deal_info(code, buy_money, sell_money)
 
    @classmethod
    def process_huaxin_transaction_datas(cls, code, o_datas):
        # 整形数据,格式:[(数据本身, 是否主动买, 是否涨停, 总成交额, 不含ms时间,含ms时间)]
        limit_up_price = gpcode_manager.get_limit_up_price_as_num(code)
        # q.append((data['SecurityID'], data['TradePrice'], data['TradeVolume'],
        #                   data['OrderTime'], data['MainSeq'], data['SubSeq'], data['BuyNo'],
        #                   data['SellNo'], data['ExecType']))
        fdatas = [
            [d, d[6] > d[7], limit_up_price == d[1], d[1] * d[2], '', '']
            for d in o_datas]
        temp_time_dict = {}
        for d in fdatas:
            if d[0][3] not in temp_time_dict:
                temp_time_dict[d[0][3]] = l2_huaxin_util.convert_time(d[0][3], with_ms=True)
            d[5] = temp_time_dict.get(d[0][3])
            d[4] = d[5][:8]
        temp_time_dict.clear()
 
        __start_time = time.time()
        # 设置成交价
        try:
            current_price_process_manager.set_trade_price(code, fdatas[-1][0][1])
            if not fdatas[-1][2]:
                if code not in cls.__not_limit_up_time_dict:
                    cls.__not_limit_up_time_dict[code] = fdatas[-1][5]
                last_time = cls.__not_limit_up_time_dict[code]
                # 炸板时间持续500ms以上算炸板
                if tool.trade_time_sub_with_ms(fdatas[-1][5], last_time) > 500:
                    # 没有涨停
                    EveryLimitupBigDealOrderManager.open_limit_up(code, f"最新成交价:{fdatas[-1][0][1]}")
                    radical_buy_strategy.clear_data(code, msg=f"没有涨停:{fdatas[-1][0]}")
            else:
                if code in cls.__not_limit_up_time_dict:
                    cls.__not_limit_up_time_dict.pop(code)
        except Exception as e:
            async_log_util.error(logger_debug, f"L2成交开板计算错误:{str(e)}")
 
        total_datas = l2_data_util.local_today_datas.get(code)
        use_time_list = []
        try:
            buyno_map = l2_data_util.local_today_buyno_map.get(code)
            if buyno_map is None:
                buyno_map = {}
 
            order_begin_pos = l2_data_manager.TradePointManager().get_buy_compute_start_data_cache(code)
            # 是否已经下单
            is_placed_order = l2_data_manager.TradePointManager.is_placed_order(order_begin_pos)
 
            _start_time = time.time()
            # 设置涨停卖成交数据
            L2LimitUpSellDataManager.set_deal_datas(code, fdatas)
            use_time_list.append(("统计涨停卖成交", time.time() - _start_time))
            _start_time = time.time()
            #  大单统计
            # cls.__statistic_thread_pool.submit(cls.statistic_big_order_infos, code, datas, order_begin_pos)
            try:
                cls.statistic_big_order_infos(code, fdatas, order_begin_pos)
            except Exception as e:
                async_log_util.error(hx_logger_l2_debug, f"统计大单出错:{str(e)}")
            use_time_list.append(("统计大单数据", time.time() - _start_time))
            _start_time = time.time()
 
            try:
                last_data = fdatas[-1]
                # 统计上板时间
                if last_data[1] and last_data[2]:
                    current_price_process_manager.set_latest_not_limit_up_time(code, last_data[5])
                if not last_data[1] and last_data[2]:
                    L2LimitUpSellDataManager.clear_data(code)
                big_sell_order_info = None
                # 统计卖单
                big_sell_order_info = HuaXinSellOrderStatisticManager.statistic_continue_limit_up_sell_transaction_datas(
                    code, fdatas,
                    limit_up_price)
 
                use_time_list.append(("处理卖单成交数据", time.time() - _start_time))
                _start_time = time.time()
 
                if is_placed_order:
 
                    LCancelBigNumComputer().set_big_sell_order_info(code, big_sell_order_info)
 
                    # need_cancel, cancel_msg = SCancelBigNumComputer().set_big_sell_order_info_for_cancel(code,
                    #                                                                                      big_sell_order_info,
                    #                                                                                      order_begin_pos)
                    need_cancel, cancel_msg = False, ""
                    cancel_type = None
                    if need_cancel:
                        cancel_msg = f"S撤:{cancel_msg}"
                        cancel_type = trade_constant.CANCEL_TYPE_S
                    if not need_cancel:
                        need_cancel, cancel_msg = FCancelBigNumComputer().need_cancel_for_p(code,
                                                                                            order_begin_pos)
                        cancel_type = trade_constant.CANCEL_TYPE_P
                    # 判断时间是否与本地时间相差5s以上
                    if tool.trade_time_sub(tool.get_now_time_str(), fdatas[-1][4]) > 10:
                        now_seconds = int(tool.get_now_time_str().replace(":", ""))
                        if now_seconds < int("093100"):  # or int("130000") <= now_seconds < int("130200"):
                            need_cancel, cancel_msg = True, f"成交时间与本地时间相差10S以上,{fdatas[-1][4]}"
                            cancel_type = trade_constant.CANCEL_TYPE_L2_DELAY
                    if need_cancel:
                        L2TradeDataProcessor.cancel_buy(code, cancel_msg, cancel_type=cancel_type)
 
                    # GCancelBigNumComputer().set_big_sell_order_info(code, big_sell_order_info)
                    use_time_list.append(("处理卖单相关撤数据", time.time() - _start_time))
                    _start_time = time.time()
                # 统计涨停卖成交
                HuaXinSellOrderStatisticManager.statistic_active_sell_deal_volume(code, fdatas, limit_up_price)
                use_time_list.append(("统计成交量数据", time.time() - _start_time))
            except Exception as e:
                async_log_util.error(logger_debug, f"卖单统计异常:{big_sell_order_info}")
                logger_debug.exception(e)
 
            _start_time = time.time()
            # if big_money_count > 0:
            #     LCancelRateManager.compute_big_num_deal_rate(code)
 
            buy_progress_index = cls.__compute_latest_trade_progress(code, fdatas)
 
            if buy_progress_index is not None:
                buy_progress_index_changed = cls.__TradeBuyQueue.set_traded_index(code, buy_progress_index,
                                                                                  total_datas)
                l2_log.info(code, logger_l2_trade_buy_queue, "获取成交位置成功: code-{} index-{}", code, buy_progress_index)
                if is_placed_order:
                    # NewGCancelBigNumComputer().set_trade_progress(code, order_begin_pos.buy_single_index,
                    #                                               buy_progress_index)
                    LCancelBigNumComputer().set_trade_progress(code, order_begin_pos.buy_single_index,
                                                               buy_progress_index,
                                                               total_datas)
                    cancel_result = FCancelBigNumComputer().need_cancel_for_deal_fast(code, buy_progress_index)
                    if cancel_result[0]:
                        L2TradeDataProcessor.cancel_buy(code, f"F撤:{cancel_result[1]}",
                                                        cancel_type=trade_constant.CANCEL_TYPE_F)
                    if not cancel_result[0]:
                        try:
                            cancel_result = NBCancelBigNumComputer().need_cancel(code, buy_progress_index)
                            if cancel_result[0]:
                                L2TradeDataProcessor.cancel_buy(code, f"大市值无大单撤:{cancel_result[1]}",
                                                                cancel_type=trade_constant.CANCEL_TYPE_NB)
                        except:
                            pass
 
                    if not cancel_result[0] and buy_progress_index_changed:
                        try:
                            cancel_result = FCancelBigNumComputer().need_cancel_for_w(code)
                            if cancel_result[0]:
                                L2TradeDataProcessor.cancel_buy(code, f"W撤:{cancel_result[1]}",
                                                                cancel_type=trade_constant.CANCEL_TYPE_W)
                        except:
                            pass
                    # SCancelBigNumComputer().set_transaction_index(code, order_begin_pos.buy_single_index,
                    #                                               buy_progress_index)
                    # HourCancelBigNumComputer().set_transaction_index(code, order_begin_pos.buy_single_index,
                    #                                                  buy_progress_index)
            else:
                pass
            if is_placed_order:
                # 触发L撤上重新计算
                LCancelBigNumComputer().re_compute_l_up_watch_indexes(code, order_begin_pos.buy_single_index)
            use_time_list.append(("处理成交进度相关撤", time.time() - _start_time))
 
        except Exception as e:
            logging.exception(e)
            hx_logger_l2_debug.exception(e)
        finally:
            use_time = int((time.time() - __start_time) * 1000)
            if use_time > 5:
                l2_log.info(code, hx_logger_l2_upload,
                            f"{code}处理成交用时:{use_time} 数据数量:{len(fdatas)}  详情:{use_time_list}")
 
    @classmethod
    def process_huaxin_transaction_datas_v2(cls, code, o_datas):
        """
        新版处理华鑫成交数据:
        尚未下单的时候异步统计成交,同步遍历获取最后一个涨停卖委托数据,当最后一个涨停卖成交的时候就是下单时机
        @param code:
        @param o_datas:
        @return:
        """
 
        def __process_placed_order():
            """
            处理处于下单状态的数据
            @return:
            """
            try:
                cls.statistic_big_order_infos(code, fdatas, order_begin_pos)
            except Exception as e:
                async_log_util.error(hx_logger_l2_debug, f"统计大单出错:{str(e)}")
            # 统计连续的卖单数据,用于撤单,只有当下单之后才会执行
            big_sell_order_info = HuaXinSellOrderStatisticManager.statistic_continue_limit_up_sell_transaction_datas(
                code, fdatas,
                limit_up_price)
            LCancelBigNumComputer().set_big_sell_order_info(code, big_sell_order_info)
            need_cancel, cancel_msg = False, ""
            cancel_type = None
            if not need_cancel:
                need_cancel, cancel_msg = FCancelBigNumComputer().need_cancel_for_p(code,
                                                                                    order_begin_pos)
                cancel_type = trade_constant.CANCEL_TYPE_P
            if need_cancel:
                L2TradeDataProcessor.cancel_buy(code, cancel_msg, cancel_type=cancel_type)
            # 统计涨停主动卖成交,为了F撤准备数据
            HuaXinSellOrderStatisticManager.statistic_active_sell_deal_volume(code, fdatas, limit_up_price)
            # 计算成交进度
            buy_progress_index = cls.__compute_latest_trade_progress(code, fdatas)
            if buy_progress_index is not None:
                total_datas = l2_data_util.local_today_datas.get(code)
                buy_progress_index_changed = cls.__TradeBuyQueue.set_traded_index(code, buy_progress_index,
                                                                                  total_datas)
                async_log_util.info(logger_l2_trade_buy_queue, "获取成交位置成功: code-{} index-{}", code,
                                    buy_progress_index)
                if is_placed_order:
                    LCancelBigNumComputer().set_trade_progress(code, order_begin_pos.buy_single_index,
                                                               buy_progress_index,
                                                               total_datas)
                    cancel_result = FCancelBigNumComputer().need_cancel_for_deal_fast(code, buy_progress_index)
                    if cancel_result[0]:
                        L2TradeDataProcessor.cancel_buy(code, f"F撤:{cancel_result[1]}",
                                                        cancel_type=trade_constant.CANCEL_TYPE_F)
 
        limit_up_price = gpcode_manager.get_limit_up_price_as_num(code)
        # =====格式化数据=====
        # 整形数据,格式:[(数据本身, 是否主动买, 是否涨停, 总成交额, 不含ms时间,含ms时间)]
        use_time_list = []
        __start_time = int(time.time() * 1000)
        fdatas = [
            [d, d[6] > d[7], limit_up_price == d[1], d[1] * d[2], '', '']
            for d in o_datas]
        temp_time_dict = {}
        for d in fdatas:
            if d[0][3] not in temp_time_dict:
                temp_time_dict[d[0][3]] = l2_huaxin_util.convert_time(d[0][3], with_ms=True)
            d[5] = temp_time_dict.get(d[0][3])
            d[4] = d[5][:8]
        temp_time_dict.clear()
        _start_time = int(time.time() * 1000)
        use_time_list.append((_start_time - __start_time, "数据整形"))
 
        try:
 
            # ======需要同步处理的数据========
            # 设置成交价
            try:
                current_price_process_manager.set_trade_price(code, fdatas[-1][0][1])
                if not fdatas[-1][2]:
                    # 没有涨停
                    EveryLimitupBigDealOrderManager.open_limit_up(code, f"最新成交价:{fdatas[-1][0][1]}")
                    radical_buy_strategy.clear_data(code)
            except:
                pass
 
            # 统计上板时间
            try:
                last_data = fdatas[-1]
                if last_data[1] and last_data[2]:
                    # 涨停主动买
                    current_price_process_manager.set_latest_not_limit_up_time(code, last_data[5])
                elif not last_data[1] and last_data[2]:
                    # 涨停主动卖
                    L2LimitUpSellDataManager.clear_data(code)
            except:
                pass
 
            # ==========处于委托状态就同步处理数据,没有下过单就异步处理数据==========
            order_begin_pos = l2_data_manager.TradePointManager().get_buy_compute_start_data_cache(code)
            is_placed_order = l2_data_manager.TradePointManager.is_placed_order(order_begin_pos)
            if is_placed_order:
                # 下过单了
                __process_placed_order()
            else:
                filter_datas = L2TradeSingleDataProcessor.filter_last_limit_up_sell_data(code, fdatas)
                _start_time = int(time.time() * 1000)
                use_time_list.append((_start_time - __start_time, "处理涨停卖"))
                # 回调数据
                if filter_datas is not None:
                    l2_log.info(code, logger_l2_trade, f"最后一笔涨停卖被吃:{filter_datas[0]}")
                    data_callback.l2_trade_single_callback.OnLastLimitUpSellDeal(code, filter_datas[0][0])
 
                    _start_time = int(time.time() * 1000)
                    use_time_list.append((_start_time - __start_time, "处理买入信号"))
 
                # 如果是被动买就更新成交进度
                if not fdatas[-1][1]:
                    buy_progress_index = cls.__compute_latest_trade_progress(code, fdatas)
                    if buy_progress_index is not None:
                        total_datas = l2_data_util.local_today_datas.get(code)
                        cls.__TradeBuyQueue.set_traded_index(code, buy_progress_index,
                                                             total_datas)
                # 如果数据量大于20条就采用线程池更新数据
                if len(fdatas) >= 20:
                    cls.__statistic_thread_pool.submit(cls.statistic_big_order_infos, code, fdatas, order_begin_pos)
                else:
                    cls.statistic_big_order_infos(code, fdatas, order_begin_pos)
 
                _start_time = int(time.time() * 1000)
                use_time_list.append((_start_time - __start_time, "统计大单"))
        except Exception as e:
            hx_logger_l2_debug.exception(e)
        finally:
            _start_time = int(time.time() * 1000)
            if _start_time - __start_time > 5:
                l2_log.info(code, hx_logger_l2_upload,
                            f"{code}处理成交用时:{_start_time - __start_time} 数据数量:{len(fdatas)}  详情:{use_time_list}")