Administrator
2024-04-17 b03f1ca7c2d77be46ca55c7fb68ae6215783840a
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
"""
下单信号管理
"""
import time
 
from l2 import l2_log
from l2.huaxin import l2_huaxin_util
from log_module.log import logger_l2_trade_buy, logger_debug
from utils import tool
 
 
class L2TradeSingleDataProcessor:
    """
    L2逐笔成交数据信号管理
    """
    __latest_sell_data = {}
 
    __latest_limit_up_sell_list_dict = {}
 
    __latest_limit_up_sell_order_no_set_dict = {}
 
    # 主动卖订单号集合
    __active_sell_order_no_set_dict = {}
 
    @classmethod
    def add_l2_delegate_limit_up_sell(cls, code, data):
        """
        添加涨停卖单数据,当前不是涨停成交时才加
        @param code:
        @param data:
        @return:
        """
        if code not in cls.__latest_limit_up_sell_list_dict:
            cls.__latest_limit_up_sell_list_dict[code] = []
        cls.__latest_limit_up_sell_list_dict[code].append(data)
        if code not in cls.__latest_limit_up_sell_order_no_set_dict:
            cls.__latest_limit_up_sell_order_no_set_dict[code] = set()
        cls.__latest_limit_up_sell_order_no_set_dict[code].add(data['val']['orderNo'])
        # 只保留前20的数据
        if len(cls.__latest_limit_up_sell_list_dict[code]) > 20:
            cls.__latest_limit_up_sell_list_dict[code] = cls.__latest_limit_up_sell_list_dict[code][-20:]
            # 删除之前的map
            for d in cls.__latest_limit_up_sell_list_dict[code][0:-20]:
                cls.__latest_limit_up_sell_order_no_set_dict[code].discard(d["val"]["orderNo"])
 
    @classmethod
    def add_l2_delegate_limit_up_sell_cancel(cls, code, order_no):
        """
        涨停卖撤
        @param code:
        @param order_no:
        @return:
        """
        if code not in cls.__latest_limit_up_sell_order_no_set_dict:
            return
        order_no_set = cls.__latest_limit_up_sell_order_no_set_dict[code]
        if order_no not in order_no_set:
            return
        cls.__latest_limit_up_sell_order_no_set_dict[code].discard(order_no)
 
        for i in range(0, len(cls.__latest_limit_up_sell_list_dict[code])):
            if cls.__latest_limit_up_sell_list_dict[code][i]['val']['orderNo'] == order_no:
                cls.__latest_limit_up_sell_list_dict[code].pop(i)
                break
 
    @classmethod
    def get_latest_limit_up_sell_order_count(cls, code):
        sell_list = cls.__latest_limit_up_sell_list_dict.get(code)
        if not sell_list:
            return 0
        return len(sell_list)
 
    @classmethod
    def process_passive_limit_up_sell_data(cls, code, datas, limit_up_price):
        """
        添加涨停被动卖成交数据
        @param data: 数据格式:(data['SecurityID'], data['TradePrice'], data['TradeVolume'],
        #           data['OrderTime'], data['MainSeq'], data['SubSeq'], data['BuyNo'],
        #           data['SellNo'], data['ExecType'])
        @return:
        """
        try:
            start_time = time.time()
            sell_list = cls.__latest_limit_up_sell_list_dict.get(code)
            if not sell_list:
                return
            sell_info = sell_list[-1]
            for data in datas:
                if data[6] < data[7] or data[1] != limit_up_price:
                    # 排除主动卖/非涨停卖
                    continue
                sell_no = data[7]
                if sell_no != sell_info['val']['orderNo']:
                    continue
                # 需要判断当前单是否已经成交完成
                if code not in cls.__latest_sell_data:
                    cls.__latest_sell_data[code] = [sell_no, data[2]]
                else:
                    if cls.__latest_sell_data[code][0] == sell_no:
                        cls.__latest_sell_data[code][1] += data[2]
                    else:
                        cls.__latest_sell_data[code] = [sell_no, data[2]]
                sell_info_num = sell_info['val']['num']
                deal_num = cls.__latest_sell_data[code][1] // 100
                if sell_info_num == deal_num:
                    use_time = round((time.time() - start_time) * 1000, 3)
                    l2_log.info(code, logger_l2_trade_buy,
                                f"找到最近的被动涨停卖单数据:{sell_info['val']['orderNo']}, 计算耗时:{use_time}ms, 可以触发下单")
                    # 成交完成
                    L2TradeSingleDataManager.set_latest_sell_data(code, data)
                    l2_log.info(code, logger_l2_trade_buy, "被动卖数据处理完毕")
                    break
                    # l2_log.info(code, logger_l2_trade_buy, f"找到最近的被动涨停卖单数据:{data['val']['orderNo']}, 可以触发下单")
        except Exception as e:
            logger_debug.exception(e)
 
    @classmethod
    def add_active_limit_up_sell_data(cls, data):
        """
        添加主动卖数据
        @param data:
        @return:
        """
        try:
            code = data[0]
            sell_no = data[7]
            if code not in cls.__active_sell_order_no_set_dict:
                cls.__active_sell_order_no_set_dict[code] = set()
            cls.__active_sell_order_no_set_dict[code].add(sell_no)
 
        except Exception as e:
            logger_debug.exception(e)
 
 
class L2TradeSingleCallback:
    """
    交易信号回调
    """
 
    def OnTradeSingle(self, code, _type, data):
        """
         交易数据信号回调
        @param code:
        @param _type: 类型:0-TYPE_PASSIVE  1-TYPE_ACTIVE
        @param data: (逐笔成交数据,生效时间)
        @return:
        """
 
 
class L2TradeSingleDataManager:
    __callback = None
 
    TYPE_PASSIVE = 0
    TYPE_ACTIVE = 1
 
    """
    买入信号管理
    """
    # 最近的涨停卖被动成交数据
    __latest_sell_data_dict = {}  # 格式为:{code:(逐笔成交数据,生效时间(带ms))}
 
    # 由被动向主动卖成交转变的数据
    __latest_sell_active_deal_data_dict = {}  # 格式为:{code:(逐笔成交数据,生效时间(带ms))}
 
    @classmethod
    def set_callback(cls, callback: L2TradeSingleCallback):
        cls.__callback = callback
 
    @classmethod
    def set_latest_sell_data(cls, code, data):
        """
        设置最近成交的涨停卖被动成交数据
        @param code: 代码
        @param data: L2逐笔成交数据  数据格式:(data['SecurityID'], data['TradePrice'], data['TradeVolume'],
        #           data['OrderTime'], data['MainSeq'], data['SubSeq'], data['BuyNo'],
        #           data['SellNo'], data['ExecType'])
        @return:
        """
        deal_time = l2_huaxin_util.convert_time(data[3], True)
        # 生效时间在1s以内
        cls.__latest_sell_data_dict[code] = (data, tool.trade_time_add_millionsecond(deal_time, 1000))
        if cls.__callback:
            cls.__callback.OnTradeSingle(code, cls.TYPE_PASSIVE, cls.__latest_sell_data_dict[code])
 
    @classmethod
    def set_sell_passive_to_active_datas(cls, code, passive_data, active_data):
        """
        设置被动卖成交向主动卖成交的转变数据
        @param code: 代码
        @param passive_data: 被动卖成交逐笔
        @param active_data: 主动卖成交逐笔
        @return:
        """
        l2_log.info(code, logger_l2_trade_buy, f"被动卖变主动卖:{passive_data} => {active_data}")
        deal_time = l2_huaxin_util.convert_time(passive_data[3], True)
        # 生效时间在1s以内
        cls.__latest_sell_active_deal_data_dict[code] = (
            active_data, tool.trade_time_add_millionsecond(deal_time, 1000))
        if cls.__callback:
            cls.__callback.OnTradeSingle(code, cls.TYPE_ACTIVE, cls.__latest_sell_active_deal_data_dict[code])
 
    @classmethod
    def get_valid_trade_single(cls, code, latest_time_with_ms):
        """
        获取有效的成交下单信号
        @param code:
        @param latest_time_with_ms:
        @return: (逐笔成交数据, 类型)
        """
        # 如果有最近卖涨停成交完就用最近的数据
        data = cls.__latest_sell_data_dict.get(code)
        if data and tool.trade_time_sub_with_ms(latest_time_with_ms, data[1]) <= 0:
            return data, cls.TYPE_PASSIVE
        data = cls.__latest_sell_active_deal_data_dict.get(code)
        if data and tool.trade_time_sub_with_ms(latest_time_with_ms, data[1]) <= 0:
            return data, cls.TYPE_ACTIVE
        return None
 
    @classmethod
    def is_can_place_order(cls, code, buy_data):
        """
        是否可以下单
        @param code: 代码
        @param buy_data: L2逐笔委托大于50w的买入数据
        @return: (是否可以下单, 原因)
        """
        buy_data_time_with_ms = tool.to_time_with_ms(buy_data['val']['time'], buy_data['val']['tms'])
        single = cls.get_valid_trade_single(code, buy_data_time_with_ms)
        if not single:
            return False, "没有找到可用信号"
        if single[0][0][6] < buy_data['val']['orderNo']:
            # 信号位置的成交买单号要小于买入数据的买单号才行
            return True, f"找到信号:{single}"
        else:
            return False, f"买单没有在信号位之后 信号:{single}"
 
    @classmethod
    def clear_data(cls, code):
        if code in cls.__latest_sell_data_dict:
            cls.__latest_sell_data_dict.pop(code)
 
        if code in cls.__latest_sell_active_deal_data_dict:
            cls.__latest_sell_active_deal_data_dict.pop(code)