Administrator
4 天以前 48fb7a00951f91bdc707e5dd2d196e5bccb752c3
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
"""
安全笔数管理
"""
# 下单L2的安全笔数管理
import json
 
from db.redis_manager_delegate import RedisUtils
from l2 import l2_data_source_util
from trade import l2_trade_factor
from db import redis_manager_delegate as redis_manager
from utils import tool
from l2.l2_data_util import L2DataUtil, local_today_buyno_map
 
 
class BuyL2SafeCountManager(object):
    __db = 0
    __redis_manager = redis_manager.RedisManager(0)
    __instance = None
    latest_place_order_info_cache = {}
    safe_count_l2_cache = {}
 
    def __new__(cls, *args, **kwargs):
        if not cls.__instance:
            cls.__instance = super(BuyL2SafeCountManager, cls).__new__(cls, *args, **kwargs)
            cls.__load_datas()
        return cls.__instance
 
    def __init__(self):
        self.last_buy_queue_data = {}
 
    @classmethod
    def __get_redis(cls):
        return cls.__redis_manager.getRedis()
 
    @classmethod
    def __load_datas(cls):
        __redis = cls.__get_redis()
        try:
            keys = RedisUtils.keys(__redis, "safe_count_l2-*")
            for k in keys:
                ks = k.split("-")
                code, last_buy_single_index = ks[1], int(ks[2])
                val = RedisUtils.get(__redis, k)
                val = json.loads(val)
                tool.CodeDataCacheUtil.set_cache(cls.safe_count_l2_cache, f"{code}-{last_buy_single_index}", val)
            keys = RedisUtils.keys(__redis, "latest_place_order_info-*")
            for k in keys:
                ks = k.split("-")
                code = ks[-1]
                val = RedisUtils.get(__redis, k)
                val = json.loads(val)
                tool.CodeDataCacheUtil.set_cache(cls.latest_place_order_info_cache, code, val)
        finally:
            RedisUtils.realse(__redis)
 
    # 记录每一次的处理进度
    def __save_compute_progress(self, code, last_buy_single_index, process_index, buy_num, cancel_num):
        key = "safe_count_l2-{}-{}".format(code, last_buy_single_index)
        tool.CodeDataCacheUtil.set_cache(self.safe_count_l2_cache, f"{code}-{last_buy_single_index}",
                                         (last_buy_single_index, process_index, buy_num, cancel_num))
        RedisUtils.setex_async(self.__db, key, tool.get_expire(),
                               json.dumps((last_buy_single_index, process_index, buy_num, cancel_num)))
 
    # 返回数据与更新时间
    def __get_compute_progress(self, code, last_buy_single_index):
        key = "safe_count_l2-{}-{}".format(code, last_buy_single_index)
        val = RedisUtils.get(self.__get_redis(), key)
        if val is None:
            return None, -1, 0, 0
        val = json.loads(val)
        return val[0], val[1], val[2], val[3]
 
    def __get_compute_progress_cache(self, code, last_buy_single_index):
        cache_result = tool.CodeDataCacheUtil.get_cache(self.safe_count_l2_cache, f"{code}-{last_buy_single_index}")
        if cache_result[0]:
            return cache_result[1]
        return None, -1, 0, 0
 
    # 保存最近的下单信息
    def __save_latest_place_order_info(self, code, buy_single_index, buy_exec_index, cancel_index):
        tool.CodeDataCacheUtil.set_cache(self.latest_place_order_info_cache, code,
                                         (buy_single_index, buy_exec_index, cancel_index))
        key = "latest_place_order_info-{}".format(code)
        RedisUtils.setex_async(self.__db, key, tool.get_expire(),
                               json.dumps((buy_single_index, buy_exec_index, cancel_index)))
 
    def __get_latest_place_order_info(self, code):
        key = "latest_place_order_info-{}".format(code)
        val = RedisUtils.get(self.__get_redis(), key)
        if val is None:
            return None, None, None
        val = json.loads(val)
        return val[0], val[1], val[2]
 
    def __get_latest_place_order_info_cache(self, code):
        cache_result = tool.CodeDataCacheUtil.get_cache(self.latest_place_order_info_cache, code)
        if cache_result[0]:
            return cache_result[1]
        return None, None, None
 
    def __get_all_compute_progress(self, code):
        key_regex = f"safe_count_l2-{code}-*"
        keys = RedisUtils.keys(self.__get_redis(), key_regex)
        vals = []
        for k in keys:
            val = RedisUtils.get(self.__get_redis(), k)
            val = json.loads(val)
            vals.append(val)
        return vals
 
    def clear_data(self, code):
        key_regex = f"safe_count_l2-{code}-*"
        keys = RedisUtils.keys(self.__get_redis(), key_regex)
        for k in keys:
            RedisUtils.delete(self.__get_redis(), k)
 
        tool.CodeDataCacheUtil.clear_cache(self.latest_place_order_info_cache, code)
        key = f"latest_place_order_info-{code}"
        RedisUtils.delete_async(self.__db, key)
 
    # 获取基础的安全笔数
    def __get_base_save_count(self, code, is_first):
        return l2_trade_factor.L2TradeFactorUtil.get_safe_buy_count(code, is_first)
 
    # 获取最后的安全笔数
    def get_safe_count(self, code, is_first_code, rate):
        # rate = self.__get_rate(code)
        count, min_count, max_count = self.__get_base_save_count(code, is_first_code)
        # 第4次下单按第一次算
        # if place_order_count and place_order_count >= 3:
        #     rate = 1
        # print("--------------------------------")
        # print("安全笔数比例:", rate)
        # print("--------------------------------")
        # count, min_count, max_count = self.__get_base_save_count(code, is_first_code)
        # count = round(count * rate)
        # if count < min_count:
        #     count = min_count
        # if count > max_count:
        #     count = max_count
        return int(round(count * (1 + rate), 0))
 
    # 计算留下来的比例
    # last_buy_single_index 上一次下单信号起始位置
    # cancel_index 上一次取消下单的位置
    # start_index 数据开始位置
    # end_index 数据结束位置
    def compute_left_rate(self, code, start_index, end_index, total_datas,
                          local_today_num_operate_map):
        last_buy_single_index, buy_exec_index, cancel_index = self.__get_latest_place_order_info_cache(code)
        if last_buy_single_index is None:
            return
        cancel_time = None
        if cancel_index is not None:
            cancel_time = total_datas[cancel_index]["val"]["time"]
        # 获取处理的进度
        last_buy_single_index_, process_index, buy_num, cancel_num = self.__get_compute_progress_cache(code,
                                                                                                       last_buy_single_index)
 
        break_index = -1
        for i in range(start_index, end_index):
            data = total_datas[i]
            val = data["val"]
            # 如果没有取消位置就一直计算下去, 计算截至时间不能大于取消时间
            if cancel_time and int(cancel_time.replace(":", "")) < int(val["time"].replace(":", "")):
                break_index = i
                break
        if break_index >= 0:
            end_index = break_index - 1
        # 获取开始计算的位置
        start_compute_index = min(start_index, last_buy_single_index)
        if start_compute_index <= process_index:
            start_compute_index = process_index + 1
 
        for i in range(start_compute_index, end_index):
            data = total_datas[i]
            val = data["val"]
            if process_index >= i:
                continue
            if L2DataUtil.is_limit_up_price_buy(val):
                # 涨停买
                buy_num += int(val["num"]) * data["re"]
            elif L2DataUtil.is_limit_up_price_buy_cancel(val):
                # 获取买入信息
                buy_index = l2_data_source_util.L2DataSourceUtils.get_buy_index_with_cancel_data_v2(data, local_today_buyno_map.get(code))
                if buy_index is not None:
                    if last_buy_single_index <= buy_index <= end_index:
                        cancel_num += int(val["num"]) * data["re"]
 
        process_index = end_index
        # 保存处理进度与数量
        self.__save_compute_progress(code, last_buy_single_index, process_index, buy_num, cancel_num)
 
    # 获取比例
    def __get_rate(self, code):
        vals = self.__get_all_compute_progress(code)
        rate = (1 - 0)
        for val in vals:
            temp_rate = (1 - round((val[2] - val[3]) / val[2], 4))
            if temp_rate > 1:
                temp_rate = 1
            rate *= temp_rate
        return rate
 
    # 下单成功
    def save_place_order_info(self, code, buy_single_index, buy_exec_index, cancel_index):
        self.__save_latest_place_order_info(code, buy_single_index, buy_exec_index, cancel_index)