Administrator
6 天以前 abd510d66074ac640555c241b6343a53cca8f070
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
# l2数据工具
"""
L2数据处理工具包
"""
 
# 比较时间的大小
import json
import time
 
from db.redis_manager_delegate import RedisUtils
from utils.tool import async_call
 
from l2 import l2_data_manager
from utils import tool
 
 
def run_time():
    def decorator(func):
        def infunc(*args, **kwargs):
            start = round(time.time() * 1000)
            result = func(args, **kwargs)
            print("执行时间", round(time.time() * 1000) - start)
            return result
 
        return infunc
 
    return decorator
 
 
# 是否为大单
def is_big_money(val, is_ge=False):
    """
    判断是否为大单
    @param val: l2数据
    @param is_ge: 是否为创业板
    @return:
    """
    price = float(val["price"])
    money = round(price * val["num"], 2)
    if is_ge:
        if money >= 29900 or val["num"] >= 2999:
            return True
        else:
            return False
    else:
        if price > 3.0:
            if money >= 29900 or val["num"] >= 7999:
                return True
            else:
                return False
        else:
            max_money = price * 10000
            if money >= max_money * 0.95:
                return True
            else:
                return False
 
 
# 获取大资金的金额
def get_big_money_val(limit_up_price, is_ge=False):
    if is_ge:
        return min(299 * 10000, round(limit_up_price * 2900 * 100))
    else:
        if limit_up_price > 3.0:
            return min(299 * 10000, round(limit_up_price * 7999 * 100))
        else:
            max_money = limit_up_price * 10000 * 100
            return int(max_money * 0.95)
 
# if int(val["num"]) >= constant.BIG_MONEY_NUM:
#     return True
# if int(val["num"]) * limit_up_price >= constant.BIG_MONEY_AMOUNT:
#     return True
# return False_
 
 
def compare_time(time1, time2):
    result = int(time1.replace(":", "", 2)) - int(time2.replace(":", "", 2))
    return result
 
 
# 将key转为l2数据对象
def l2_data_key_2_obj(k, value):
    key = k.replace("l2-", "")
    split_data = key.split("-")
    code = split_data[0]
    operateType = split_data[1]
    time = split_data[2]
    num = split_data[3]
    price = split_data[4]
    limitPrice = split_data[5]
    cancelTime = split_data[6]
    cancelTimeUnit = split_data[7]
    item = {"operateType": operateType, "time": time, "num": num, "price": price, "limitPrice": limitPrice,
            "cancelTime": cancelTime, "cancelTimeUnit": cancelTimeUnit}
    json_value = json.loads(value)
    _data = {"key": key, "val": item, "re": json_value["re"], "index": int(json_value["index"])}
    return _data
 
 
# 减去时间
def __sub_time(time_str, seconds):
    time_seconds = get_time_as_seconds(time_str) - seconds
    h = time_seconds // 3600
    m = time_seconds % 3600 // 60
    s = time_seconds % 60
    return "{0:0>2}:{1:0>2}:{2:0>2}".format(h, m, s)
 
 
def get_time_as_seconds(time_str):
    times = time_str.split(":")
    time_seconds = int(times[0]) * 3600 + int(times[1]) * 60 + int(times[2])
    return time_seconds
 
 
# 计算时间的区间
def compute_time_space_as_second(cancel_time, cancel_time_unit):
    __time = int(cancel_time)
    if int(cancel_time) == 0:
        return 0, 0
    unit = int(cancel_time_unit)
    if unit == 0:
        # 秒
        return __time, (__time + 1)
    elif unit == 1:
        # 分钟
        return __time * 60, (__time + 1) * 60
    elif unit == 2:
        # 小时
        return __time * 3600, (__time + 1) * 3600
 
 
# 获取买入时间范围
def get_buy_time_range(cancel_data):
    # 计算时间区间
    min_space, max_space = compute_time_space_as_second(cancel_data["val"]["cancelTime"],
                                                        cancel_data["val"]["cancelTimeUnit"])
    max_time = __sub_time(cancel_data["val"]["time"], min_space)
    min_time = __sub_time(cancel_data["val"]["time"], max_space)
    return min_time, max_time
 
 
# 判断卖撤的卖信号是否在目标信号之前
def is_sell_index_before_target(sell_cancel_data, target_data, local_today_num_operate_map):
    min_space, max_space = compute_time_space_as_second(sell_cancel_data["val"]["cancelTime"],
                                                        sell_cancel_data["val"]["cancelTimeUnit"])
    max_time = __sub_time(sell_cancel_data["val"]["time"], min_space)
    min_time = __sub_time(sell_cancel_data["val"]["time"], max_space)
    # 如果最大值都在目标信号之前则信号肯定在目标信号之前
    if int(target_data["val"]["time"].replace(":", "")) > int(max_time.replace(":", "")):
        return True
    sell_datas = local_today_num_operate_map.get(
        "{}-{}-{}".format(sell_cancel_data["val"]["num"], "2", sell_cancel_data["val"]["price"]))
    if sell_datas:
        for i in range(0, len(sell_datas)):
            data = sell_datas[i]
            if int(data["val"]["operateType"]) != 2:
                continue
            if int(data["val"]["num"]) != int(sell_cancel_data["val"]["num"]):
                continue
            if min_space == 0 and max_space == 0:
                # 本秒内
                if compare_time(data["val"]["time"], min_time) == 0:
                    return data["index"] < target_data["index"]
            # 数据在正确的区间
            elif compare_time(data["val"]["time"], min_time) > 0 and compare_time(data["val"]["time"], max_time) <= 0:
                return data["index"] < target_data["index"]
    return False
 
 
__last_big_data = {}
 
 
@async_call
def save_big_data(code, same_time_nums, datas):
    latest_datas = __last_big_data.get(code)
    d1 = json.dumps(datas)
    d2 = json.dumps(latest_datas)
    if latest_datas is not None and d1.strip() == d2.strip():
        return None
    __last_big_data[code] = datas
    # 获取不一样的快照
    if latest_datas is not None:
        for i in range(len(d1)):
            if d1[i] != d2[i]:
                # 保存快照
                # logger_l2_big_data.debug("code:{} d1:{}  d2:{}", code, d1[i - 60: i + 30], d2[i - 60: i + 30])
                break
    time_str = tool.get_now_time_str()
 
    for time_ in same_time_nums:
        # 只保留最近3s内的大数据
        if abs(get_time_as_seconds(time_str) - get_time_as_seconds(time_)) > 3:
            continue
        if same_time_nums[time_] > 20:
            RedisUtils.setex(l2_data_manager._redisManager.getRedis(),
                             "big_data-{}-{}".format(code, int(round(time.time() * 1000))), tool.get_expire(),
                             d1)
            break
 
 
# 保存l2最新数据的大小
# @async_call
def save_l2_latest_data_number(code, num):
    RedisUtils.setex(l2_data_manager._redisManager.getRedis(), "l2_latest_data_num-{}".format(code), 3, num)
 
 
# 获取最新数据条数
def get_l2_latest_data_number(code):
    num = RedisUtils.get(l2_data_manager._redisManager.getRedis(), "l2_latest_data_num-{}".format(code))
    if num is not None:
        return int(num)
    return None
 
 
# l2数据拼接工具  暂时还未启用
class L2DataConcatUtil:
 
    # 初始化
    def __init__(self, code, last_datas, datas):
        self.last_datas = last_datas
        self.datas = datas
        self.code = code
 
    def __get_data_identity(self, data_):
        data = data_["val"]
        return "{}-{}-{}-{}-{}-{}".format(data.get("time"), data.get("num"), data.get("price"), data.get("operateType"),
                                          data.get("cancelTime"), data.get("cancelTimeUnit"))
 
    # 获取拼接的特征,获取最后3笔
    def __get_concat_feature(self):
        # 最少需要3条数据+2条需要有特征点的数据
        min_identity = 2
        min_count = 3
 
        identity_set = set()
        count = 0
        start_index = -1
        for i in range(len(self.last_datas) - 1, -1, -1):
            identity_set.add(self.__get_data_identity(self.last_datas[i]))
            count += 1
            start_index = i
            if count >= min_count and len(identity_set) >= min_identity:
                break
        return start_index, len(self.last_datas) - 1
 
    # 获取新增数据
    def get_add_datas(self):
        # 查询当前数据是否在最近一次数据之后
        if self.last_datas and self.datas:
            if int(self.datas[-1]["val"]["time"].replace(":", "")) - int(
                    self.last_datas[-1]["val"]["time"].replace(":", "")) < 0:
                return []
 
        # 获取拼接点
        start_index, end_index = self.__get_concat_feature()
        if start_index < 0:
            return self.datas
        print("特征位置:", start_index, end_index)
        # 提取特征点的标识数据
        identity_list = []
        for i in range(start_index, end_index + 1):
            identity_list.append(self.__get_data_identity(self.last_datas[i]))
 
        # 查找完整的特征
        identity_count = len(identity_list)
        for n in range(0, identity_count):
            # 每次遍历减少最前面一个特征量
            for i in range(0, len(self.datas) - len(identity_list) + n):
                if self.__get_data_identity(self.datas[i]) == identity_list[n]:
                    # n==0 表示完全匹配 , i=0 表示即使不是完全匹配,但必须新数据第一个元素匹配
                    if n == 0 or i == 0:
                        find_identity = True
                        for j in range(n + 1, len(identity_list)):
                            if identity_list[j] != self.__get_data_identity(self.datas[i + j - n]):
                                find_identity = False
                                break
 
                        if find_identity:
                            return self.datas[i + len(identity_list) - n:]
                else:
                    continue
        print("新数据中未找到特征标识")
        return self.datas
 
 
def test_add_datas():
    def load_data(datas):
        data_list = []
        for data in datas:
            data_list.append({"val": {"time": data}})
        return data_list
 
    # 不匹配
    latest_datas = []
    datas = ["10:00:02", "10:00:02", "10:00:03", "10:00:04", "10:00:05"]
    latest_datas = load_data(latest_datas)
    datas = load_data(datas)
    print(L2DataConcatUtil("000333", latest_datas, datas).get_add_datas())
 
    # 不匹配
    latest_datas = ["10:00:02"]
    datas = ["10:00:02", "10:00:02", "10:00:03", "10:00:04", "10:00:05"]
    latest_datas = load_data(latest_datas)
    datas = load_data(datas)
    print(L2DataConcatUtil("000333", latest_datas, datas).get_add_datas())
 
    # 不匹配
    latest_datas = ["10:00:00", "10:00:01", "10:00:02", "10:00:03"]
    datas = ["10:00:02", "10:00:02", "10:00:03", "10:00:04", "10:00:05"]
    latest_datas = load_data(latest_datas)
    datas = load_data(datas)
    print(L2DataConcatUtil("000333", latest_datas, datas).get_add_datas())
 
    # 匹配
    latest_datas = ["10:00:00", "10:00:01", "10:00:02", "10:00:03"]
    datas = ["10:00:01", "10:00:02", "10:00:03", "10:00:04", "10:00:05"]
    latest_datas = load_data(latest_datas)
    datas = load_data(datas)
    print(L2DataConcatUtil("000333", latest_datas, datas).get_add_datas())
 
    latest_datas = ["10:00:00", "10:00:01", "10:00:02", "10:00:02"]
    datas = ["10:00:02", "10:00:02", "10:00:03", "10:00:04", "10:00:05"]
    latest_datas = load_data(latest_datas)
    datas = load_data(datas)
    print(L2DataConcatUtil("000333", latest_datas, datas).get_add_datas())
 
    latest_datas = ["10:00:00", "10:00:01", "10:00:02", "10:00:02"]
    datas = ["10:00:02", "10:00:02", "10:00:00", "10:00:01", "10:00:02", "10:00:02", "10:00:04", "10:00:05"]
    latest_datas = load_data(latest_datas)
    datas = load_data(datas)
    print(L2DataConcatUtil("000333", latest_datas, datas).get_add_datas())
 
 
def test(datas):
    datas["code"] = "test"
 
 
if __name__ == "__main__":
    test_add_datas()