Administrator
2023-08-17 b4480ad745510c8e81c88c20fb67cb65eef79cc5
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
"""
成交大单管理
"""
import json
 
from db.redis_manager_delegate import RedisUtils
from utils import tool
from db import redis_manager_delegate as redis_manager
from l2 import l2_data_util, l2_data_source_util
 
 
class DealComputeProgressManager:
    """Process-wide singleton that tracks big-order deal computation progress per code.

    For every code it keeps a pair ``(index, amount)``: the last L2 data index
    that has been scanned and the amount accumulated so far (the sum of
    ``val["num"] * data["re"]`` over the counted orders).  The pair lives in an
    in-memory cache and is mirrored to Redis under ``deal_compute_info-<code>``;
    the indexes of counted orders are stored in the Redis set
    ``deal_indexes-<code>``.
    """

    __db = 2
    # Built from __db so the database number is defined in exactly one place.
    __redisManager = redis_manager.RedisManager(__db)
    # code -> (last processed index, accumulated amount)
    __deal_compute_progress_cache = {}
    # code -> last ``progress`` value handled by set_trade_progress
    __last_progress = {}
    __instance = None

    def __new__(cls, *args, **kwargs):
        """Return the shared instance, creating it and preloading the cache on first use."""
        if not cls.__instance:
            # object.__new__ raises TypeError when given extra arguments, so the
            # arguments are accepted for compatibility but not forwarded.
            cls.__instance = super(DealComputeProgressManager, cls).__new__(cls)
            cls.__load_datas()
        return cls.__instance

    @classmethod
    def __get_redis(cls):
        """Return a Redis handle from the class-level manager."""
        return cls.__redisManager.getRedis()

    @classmethod
    def __load_datas(cls):
        """Fill the in-memory cache from every ``deal_compute_info-*`` key in Redis."""
        redis_conn = cls.__get_redis()
        try:
            for key in RedisUtils.keys(redis_conn, "deal_compute_info-*"):
                raw = RedisUtils.get(redis_conn, key)
                if raw is None:
                    # The key can expire between the listing and the read;
                    # skip it instead of failing the whole preload.
                    continue
                code = key.split("-")[-1]
                tool.CodeDataCacheUtil.set_cache(cls.__deal_compute_progress_cache, code, json.loads(raw))
        finally:
            RedisUtils.realse(redis_conn)

    # Read the deal computation progress directly from Redis
    def __get_deal_compute_progress(self, code):
        """Return ``(index, amount)`` stored in Redis for ``code``, or ``(-1, 0)`` if absent."""
        redis_conn = self.__get_redis()
        try:
            raw = RedisUtils.get(redis_conn, f"deal_compute_info-{code}")
        finally:
            # Released the same way __load_datas releases its handle.
            RedisUtils.realse(redis_conn)
        if raw is None:
            return -1, 0
        stored = json.loads(raw)
        return stored[0], stored[1]

    # Read the deal computation progress from the in-memory cache
    def get_deal_compute_progress_cache(self, code):
        """Return the cached ``(index, amount)`` for ``code``, or ``(-1, 0)`` on a cache miss."""
        cache_result = tool.CodeDataCacheUtil.get_cache(self.__deal_compute_progress_cache, code)
        if cache_result[0]:
            return cache_result[1]
        return -1, 0

    # Store the deal computation progress
    def __set_deal_compute_progress(self, code, index, money):
        """Update the cache and asynchronously persist ``(index, money)`` to Redis."""
        tool.CodeDataCacheUtil.set_cache(self.__deal_compute_progress_cache, code, (index, money))
        RedisUtils.setex_async(self.__db, f"deal_compute_info-{code}", tool.get_expire(), json.dumps((index, money)))

    # Advance the deal progress
    def set_trade_progress(self, code, progress, total_data, local_today_num_operate_map):
        """Scan the entries between the saved index and ``progress`` and accumulate big orders.

        Entries ``total_data[saved_index + 1 : progress]`` are examined.  An
        entry is counted when it is big money, is a limit-up-price buy and no
        matching cancel is found; its ``num * re`` is added to the running
        amount and its index is recorded as traded.

        Args:
            code: Code whose progress is being updated.
            progress: Exclusive upper bound of the indexes to scan.
            total_data: Sequence of L2 entries indexed by position; each entry
                is read via ``["val"]``, ``["re"]`` and ``["index"]``.
            local_today_num_operate_map: Lookup used to find cancel records.
        """
        if self.__last_progress.get(code) == progress:
            # Same progress as the previous call: nothing new to scan.
            return
        self.__last_progress[code] = progress
        # Resume from the position saved by the previous run.
        c_index, deal_num = self.get_deal_compute_progress_cache(code)
        process_index = c_index
        for i in range(c_index + 1, progress):
            data = total_data[i]
            val = data['val']
            process_index = i
            # Only big orders are of interest.
            if not l2_data_util.is_big_money(val):
                continue
            if l2_data_util.L2DataUtil.is_limit_up_price_buy(val):
                # Skip orders that have already been cancelled.
                cancel_data = self.__get_cancel_data(code, data, local_today_num_operate_map)
                if cancel_data is None:
                    deal_num += val["num"] * data["re"]
                    self.__save_traded_index(code, data["index"])

        self.__set_deal_compute_progress(code, process_index, deal_num)

    def get_deal_big_money_num(self, code):
        """Return the accumulated amount cached for ``code`` (0 when nothing is cached)."""
        # Go through the cache accessor only, instead of also peeking at the raw dict.
        return self.get_deal_compute_progress_cache(code)[1]

    def __get_cancel_data(self, code, buy_data, local_today_num_operate_map):
        """Return the cancel record that targets ``buy_data``, or ``None`` if there is none."""
        val = buy_data['val']
        cancel_datas = local_today_num_operate_map.get(
            "{}-{}-{}".format(val["num"], "1", val["price"]))
        if cancel_datas:
            for cancel_data in cancel_datas:
                buy_index = l2_data_source_util.L2DataSourceUtils.get_buy_index_with_cancel_data(code, cancel_data,
                                                                                                 local_today_num_operate_map)
                # A candidate matches only when it resolves back to this exact buy entry.
                if buy_index == buy_data["index"]:
                    return cancel_data
        return None

    def __save_traded_index(self, code, index):
        """Add ``index`` to the ``deal_indexes-<code>`` set and refresh its expiry."""
        key = f"deal_indexes-{code}"
        # One handle serves both commands and is released afterwards.
        redis_conn = self.__get_redis()
        try:
            RedisUtils.sadd(redis_conn, key, index)
            RedisUtils.expire(redis_conn, key, tool.get_expire())
        finally:
            RedisUtils.realse(redis_conn)

    def __get_traded_indexes(self, code):
        """Return the members of the ``deal_indexes-<code>`` set."""
        redis_conn = self.__get_redis()
        try:
            return RedisUtils.smembers(redis_conn, f"deal_indexes-{code}")
        finally:
            RedisUtils.realse(redis_conn)

    # Get the indexes recorded as traded
    def get_traded_indexes(self, code):
        """Return the indexes recorded as traded for ``code``."""
        return self.__get_traded_indexes(code)
 
 
def get_deal_big_money_num(code):
    """Return the accumulated big-order amount cached for ``code``.

    The cached progress is a pair whose second element is the amount; a cache
    miss yields ``(-1, 0)``, so the result is 0 in that case.
    """
    manager = DealComputeProgressManager()
    progress = manager.get_deal_compute_progress_cache(code)
    return progress[1]