Administrator
2025-06-09 8b7972581d0324e3f634b5b5a57a9ed7db1addaf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
'''
成交进度
'''
 
# 买入队列
import itertools
import json
import time
 
import constant
from db import redis_manager_delegate as redis_manager
from db.redis_manager_delegate import RedisUtils
from utils import tool
import l2.l2_data_util
from log_module.log import logger_l2_trade_buy_queue, logger_l2_trade_buy_progress
 
 
class TradeBuyQueue:
    buy_progress_index_cache = {}
    latest_buy_progress_index_cache = {}
    # 成交速率
    trade_speed_cache = {}
 
    __db = 0
    __redis_manager = redis_manager.RedisManager(0)
    __instance = None
 
    def __new__(cls, *args, **kwargs):
        if not cls.__instance:
            cls.__instance = super(TradeBuyQueue, cls).__new__(cls, *args, **kwargs)
            cls.__load_datas()
        return cls.__instance
 
    def __init__(self):
        self.last_buy_queue_data = {}
 
    @classmethod
    def __get_redis(cls):
        return cls.__redis_manager.getRedis()
 
    @classmethod
    def __load_datas(cls):
        __redis = cls.__get_redis()
        try:
            keys = RedisUtils.keys(__redis, "trade_buy_progress_index-*")
            for k in keys:
                code = k.split("-")[-1]
                val = RedisUtils.get(__redis, k)
                val = json.loads(val)
                tool.CodeDataCacheUtil.set_cache(cls.buy_progress_index_cache, code, val)
 
        finally:
            RedisUtils.realse(__redis)
 
    def __save_buy_queue_data(self, code, num_list):
        key = "trade_buy_queue_data-{}".format(code)
        RedisUtils.setex(self.__get_redis(), key, tool.get_expire(), json.dumps((num_list, tool.get_now_time_str())))
 
    # 返回数据与更新时间
    def __get_buy_queue_data(self, code):
        key = "trade_buy_queue_data-{}".format(code)
        val = RedisUtils.get(self.__get_redis(), key)
        if val is None:
            return None, None
        val = json.loads(val)
        return val[0], [1]
 
    def __save_buy_progress_index(self, code, index, is_default):
        tool.CodeDataCacheUtil.set_cache(self.buy_progress_index_cache, code, (index, is_default))
        key = "trade_buy_progress_index-{}".format(code)
        RedisUtils.setex_async(self.__db, key, tool.get_expire(), json.dumps((index, is_default)))
        # 返回数据与更新时间
 
    def __get_buy_progress_index(self, code):
        key = "trade_buy_progress_index-{}".format(code)
        val = RedisUtils.get(self.__get_redis(), key)
        if val is None:
            return None, True
        val = json.loads(val)
        return int(val[0]), bool(val[1])
 
    def __get_buy_progress_index_cache(self, code):
        cache_result = tool.CodeDataCacheUtil.get_cache(self.buy_progress_index_cache, code)
        if cache_result[0]:
            return cache_result[1]
        return None, True
 
    # 最近的非涨停买1的时间
    def __save_latest_not_limit_up_time(self, code, time_str):
        key = "latest_not_limit_up_time-{}".format(code)
        RedisUtils.setex(self.__get_redis(), key, tool.get_expire(), time_str)
 
    def __get_latest_not_limit_up_time(self, code):
        key = "latest_not_limit_up_time-{}".format(code)
        if not constant.TEST:
            return RedisUtils.get(self.__get_redis(), key)
        return None
 
    # 保存数据,返回保存数据的条数
    def save(self, code, limit_up_price, buy_1_price, buy_1_time, queues):
        # 2个以上的数据才有处理价值
        if not queues or len(queues) < 2:
            return None
        # 如果买1不为涨停价就不需要保存
        old_queues = self.last_buy_queue_data.get(code)
        if old_queues and len(old_queues) == len(queues):
            # 元素相同就不需要再次处理
            old_str = ",".join([str(k) for k in old_queues[1:]])
            new_str = ",".join([str(k) for k in queues[1:]])
            if old_str == new_str:
                return None
        self.last_buy_queue_data[code] = queues
 
        if abs(float(buy_1_price) - float(limit_up_price)) >= 0.001:
            # 保存最近的涨停起始时间
            self.__save_latest_not_limit_up_time(code, buy_1_time)
            return None
        min_num = round(constant.L2_MIN_MONEY / (limit_up_price * 100))
        num_list = []
        # 忽略第一条数据
        for i in range(1, len(queues)):
            num = queues[i]
            if num > min_num and len(num_list) < 4:
                num_list.append(num)
        # 保存列表
        self.__save_buy_queue_data(code, num_list)
        return num_list
 
    # 保存成交索引
    def compute_traded_index(self, code, buy1_price, buyQueueBig, exec_time=None):
        total_datas = l2.l2_data_util.local_today_datas.get(code)
        today_num_operate_map = l2.l2_data_util.local_today_num_operate_map.get(code)
        index = None
        if True:
            # 最多5个数据
            buyQueueBigTemp = buyQueueBig
            last_index, is_default = self.get_traded_index(code)
            c_last_index = 0
            if not is_default and last_index is not None:
                c_last_index = last_index
            latest_not_limit_up_time = self.__get_latest_not_limit_up_time(code)
            # 如果是3个/4个数据找不到就调整顺序
            fbuyQueueBigTempList = []
            if 3 <= len(buyQueueBigTemp) <= 4:
                buyQueueBigTempList = itertools.permutations(buyQueueBigTemp, len(buyQueueBigTemp))
                for tempQueue in buyQueueBigTempList:
                    if list(tempQueue) != buyQueueBigTemp:
                        fbuyQueueBigTempList.append(tempQueue)
            fbuyQueueBigTempList.insert(0, buyQueueBigTemp)
            for temp in fbuyQueueBigTempList:
                try:
                    index = l2.l2_data_util.L2TradeQueueUtils.find_traded_progress_index(code, buy1_price, total_datas,
                                                                                         today_num_operate_map,
                                                                                         temp,
                                                                                         c_last_index,
                                                                                         latest_not_limit_up_time
                                                                                         )
                    if index is not None:
                        # 判断位置是否大于执行位2s
                        if exec_time and tool.trade_time_sub(total_datas[index]["val"]["time"], exec_time) > 5:
                            # 位置是否大于执行位2s表示无效
                            index = None
                            continue
                        # 只能削减一半以下才能终止
                        if len(temp) * 2 < len(buyQueueBig):
                            index = None
                            break
                except:
                    pass
 
            if index is not None:
                logger_l2_trade_buy_queue.info(f"确定交易进度:code-{code} index-{index}")
                logger_l2_trade_buy_progress.info(
                    f"确定交易进度成功:code-{code}  index-{index} queues:{buyQueueBig}  last_index-{c_last_index} latest_not_limit_up_time-{latest_not_limit_up_time}  exec_time-{exec_time}")
                # 保存成交进度
                # self.__save_buy_progress_index(code, index, False)
                return index
            else:
                logger_l2_trade_buy_progress.warning(
                    f"确定交易进度失败:code-{code} queues:{buyQueueBig}  last_index-{c_last_index} latest_not_limit_up_time-{latest_not_limit_up_time} exec_time-{exec_time}")
        return index
 
    # 获取成交进度索引
    def get_traded_index(self, code):
        index, is_default = self.__get_buy_progress_index_cache(code)
        return index, is_default
 
    # 设置交易进度
    def set_traded_index(self, code, index, total_datas = None):
        last_info = self.latest_buy_progress_index_cache.get(code)
        # 交易进度是否改变
        traded_index_changed = False
        if not last_info or last_info[0] != index:
            if last_info and total_datas:
                val = total_datas[last_info[0]]['val']
                if time.time() - last_info[1] > 0:
                    rate = round(val["num"] * float(val["price"]) * 100 / (time.time() - last_info[1]))
                    # 成交速率
                    self.trade_speed_cache[code] = rate
            self.latest_buy_progress_index_cache[code] = (index, time.time())
            traded_index_changed = True
        self.__save_buy_progress_index(code, index, False)
        return traded_index_changed
 
    # 获取成交速率
    def get_trade_speed(self, code):
        return self.trade_speed_cache.get(code)
 
 
if __name__ == '__main__':
    a = [1, 2, 3, 4]
    results = [str(k) for k in a]
    b = [1, 2, 3]
    result = (",".join([str(k) for k in a]) == ",".join([str(k) for k in b]))
    print(result)