Administrator
4 天以前 48fb7a00951f91bdc707e5dd2d196e5bccb752c3
trade/deal_big_money_manager.py
@@ -3,80 +3,102 @@
"""
import json
import tool
from db import redis_manager
from db.redis_manager_delegate import RedisUtils
from utils import tool
from db import redis_manager_delegate as redis_manager
from l2 import l2_data_util, l2_data_source_util
# Last trade progress seen per code; set_trade_progress reads and updates it to
# skip repeated calls whose progress value has not changed.
__last_progress = {}
# Redis manager used by the module-level __get_redis() helper.
# NOTE(review): the argument 2 is presumably the Redis db index - confirm.
__redisManager = redis_manager.RedisManager(2)
# Deal progress computation
class DealOrderNoManager:
    """Singleton keeping, per code, the order numbers that have been dealt.

    The data lives in a class-level in-memory cache and is mirrored to Redis
    under the keys ``deal_orderno-<code>``. The cache is populated from Redis
    the first time the class is instantiated; every later instantiation
    returns the same object.
    """

    # Value passed to the RedisUtils *_async helpers.
    # NOTE(review): presumably the Redis db index - confirm.
    __db = 2
    __redisManager = redis_manager.RedisManager(2)
    # code -> collection of dealt order numbers
    __deal_orderno_cache = {}
    # Not referenced by any method of this class.
    __last_progress = {}
    __instance = None

    def __new__(cls, *args, **kwargs):
        """Return the shared instance, creating and loading it on first use."""
        if cls.__instance is None:
            # Extra arguments are accepted for compatibility but not forwarded:
            # object.__new__ raises TypeError when it receives any.
            cls.__instance = super().__new__(cls)
            cls.__load_datas()
        return cls.__instance

    @classmethod
    def __get_redis(cls):
        """Return a Redis client from the class-level manager."""
        return cls.__redisManager.getRedis()

    @classmethod
    def __load_datas(cls):
        """Fill the in-memory cache from every ``deal_orderno-*`` key in Redis."""
        redis_client = cls.__get_redis()
        try:
            for key in RedisUtils.keys(redis_client, "deal_orderno-*"):
                # The code is the part of the key after the last "-".
                code = key.split("-")[-1]
                ordernos = RedisUtils.smembers(redis_client, key)
                if ordernos is None:
                    ordernos = set()
                # NOTE(review): the other methods treat cache values as plain
                # sets; confirm that set_cache stores ``ordernos`` unchanged.
                tool.CodeDataCacheUtil.set_cache(cls.__deal_orderno_cache, code, ordernos)
        finally:
            RedisUtils.realse(redis_client)

    # Add an order number
    def __add_orderno(self, code, orderno):
        """Add ``orderno`` to the Redis set of ``code`` and refresh its expiry."""
        key = f"deal_orderno-{code}"
        RedisUtils.sadd_async(self.__db, key, orderno)
        RedisUtils.expire_async(self.__db, key, tool.get_expire())

    # Remove an order number
    def __remove_orderno(self, code, orderno):
        """Remove ``orderno`` from the Redis set of ``code`` and refresh its expiry."""
        key = f"deal_orderno-{code}"
        RedisUtils.srem_async(self.__db, key, orderno)
        RedisUtils.expire_async(self.__db, key, tool.get_expire())

    # Clear all data
    def clear(self):
        """Empty the in-memory cache and delete every ``deal_orderno-*`` key."""
        self.__deal_orderno_cache.clear()
        redis_client = self.__get_redis()
        try:
            keys = RedisUtils.keys(redis_client, "deal_orderno-*")
        finally:
            # Release the client the same way __load_datas does, so clearing
            # does not leave it checked out.
            RedisUtils.realse(redis_client)
        for key in keys:
            RedisUtils.delete_async(self.__db, key)

    def remove_orderno(self, code, orderno):
        """Drop ``orderno`` for ``code`` from the cache and Redis, if it is cached."""
        if code not in self.__deal_orderno_cache:
            return
        ordernos = self.__deal_orderno_cache[code]
        if orderno in ordernos:
            ordernos.discard(orderno)
            self.__remove_orderno(code, orderno)

    def add_orderno(self, code, orderno):
        """Record ``orderno`` as dealt for ``code`` in the cache and in Redis."""
        self.__deal_orderno_cache.setdefault(code, set()).add(orderno)
        self.__add_orderno(code, orderno)

    # Sum the dealt volume
    def get_deal_nums(self, code, orderno_map: dict):
        """Return the summed ``val.num * re`` of the cached orders of ``code``.

        :param code: code whose dealt order numbers are summed
        :param orderno_map: mapping from ``str(orderno)`` to a record that has
            ``record["val"]["num"]`` and ``record["re"]``
        :return: the total, or 0 when ``code`` has no cached order numbers or
            ``orderno_map`` is empty/None
        """
        if code not in self.__deal_orderno_cache:
            return 0
        # An empty or missing map can never contribute; decide once, not per order.
        if not orderno_map:
            return 0
        total_num = 0
        for orderno in self.__deal_orderno_cache[code]:
            key = str(orderno)
            if key in orderno_map:
                data = orderno_map[key]
                total_num += data["val"]["num"] * data["re"]
        return total_num

    def get_deal_ordernos(self, code):
        """Return the cached order numbers of ``code``, or None when not cached."""
        return self.__deal_orderno_cache.get(code)
def __get_redis():
    """Return a Redis client obtained from the module-level Redis manager."""
    manager = __redisManager
    return manager.getRedis()
def __get_cancel_data(code, buy_data, local_today_num_operate_map):
    """Return the cancel record that belongs to ``buy_data``, or None.

    Candidates are read from ``local_today_num_operate_map`` under the key
    ``"<num>-1-<price>"`` built from ``buy_data['val']``. The first candidate
    whose buy index, as resolved by
    ``L2DataSourceUtils.get_buy_index_with_cancel_data``, equals
    ``buy_data["index"]`` is returned.
    """
    order = buy_data['val']
    # NOTE(review): the "1" segment is presumably the cancel operate type - confirm.
    lookup_key = "{}-{}-{}".format(order["num"], "1", order["price"])
    candidates = local_today_num_operate_map.get(lookup_key)
    if not candidates:
        return None
    resolve_buy_index = l2_data_source_util.L2DataSourceUtils.get_buy_index_with_cancel_data
    for candidate in candidates:
        if resolve_buy_index(code, candidate, local_today_num_operate_map) == buy_data["index"]:
            return candidate
    return None
def __save_traded_index(code, index):
    """Add ``index`` to the Redis set ``deal_indexes-<code>`` and refresh its expiry."""
    redis_key = f"deal_indexes-{code}"
    __get_redis().sadd(redis_key, index)
    __get_redis().expire(redis_key, tool.get_expire())
def __get_traded_indexes(code):
    """Return the members of the Redis set ``deal_indexes-<code>``."""
    client = __get_redis()
    members = client.smembers(f"deal_indexes-{code}")
    return members
# Get the indexes that have been dealt
def get_traded_indexes(code):
    """Public accessor for the dealt indexes stored for ``code``."""
    traded = __get_traded_indexes(code)
    return traded
# Get the deal computation progress
def __get_deal_compute_progress(code):
    """Return the stored ``(index, amount)`` pair for ``code``.

    The pair is read as JSON from the Redis key ``deal_compute_info-<code>``;
    ``(-1, 0)`` is returned when the key has no value.
    """
    raw = __get_redis().get(f"deal_compute_info-{code}")
    if raw is not None:
        stored = json.loads(raw)
        return stored[0], stored[1]
    return -1, 0
# Store the deal computation progress
def __set_deal_compute_progress(code, index, money):
    """Write ``(index, money)`` as JSON to ``deal_compute_info-<code>`` with an expiry."""
    client = __get_redis()
    redis_key = f"deal_compute_info-{code}"
    ttl = tool.get_expire()
    payload = json.dumps((index, money))
    client.setex(redis_key, ttl, payload)
# Set the trade progress
def set_trade_progress(code, progress, total_data, local_today_num_operate_map):
    """Advance the dealt big-order statistics of ``code`` up to ``progress``.

    Entries of ``total_data`` with index in ``(last processed index, progress)``
    are scanned. An entry counts when it is a big-money order, is a limit-up
    price buy, and has no matching cancel record: its ``val["num"]`` is added
    to the running total and its ``index`` is saved as dealt. The call is a
    no-op when ``progress`` equals the value seen in the previous call for the
    same code.

    :param code: code being processed
    :param progress: index of the current trade position in ``total_data``;
        the entry at this index itself is not evaluated by this call
    :param total_data: sequence of records with ``val`` and ``index`` keys
    :param local_today_num_operate_map: lookup used to find cancel records
    """
    if __last_progress.get(code) == progress:
        return
    __last_progress[code] = progress
    # Scan from the entry after the last processed one up to the trade position.
    c_index, deal_num = __get_deal_compute_progress(code)
    for i in range(c_index + 1, progress):
        data = total_data[i]
        val = data['val']
        # Only big-money orders are counted.
        if not l2_data_util.is_big_money(val):
            continue
        if not l2_data_util.L2DataUtil.is_limit_up_price_buy(val):
            continue
        # Skip orders that were cancelled.
        if __get_cancel_data(code, data, local_today_num_operate_map) is None:
            deal_num += val["num"]
            __save_traded_index(code, data["index"])
    # Store the last index actually processed. Storing ``progress`` itself
    # would make the next scan start at ``progress + 1`` and skip the entry at
    # ``progress`` forever. max() keeps the cursor from moving backwards, which
    # would make later calls add the same entries to the total again.
    __set_deal_compute_progress(code, max(c_index, progress - 1), deal_num)
# Total dealt volume of big-money orders
def get_deal_big_money_num(code):
    """Return the accumulated dealt amount stored for ``code``.

    The value is the second element of the stored deal-compute progress; it is
    0 when nothing has been stored yet.
    """
    # NOTE(review): the original body had unreachable statements after this
    # return that computed the value as
    # DealOrderNoManager().get_deal_nums(code, l2_data_util.local_today_buyno_map.get(code)).
    # They never ran, so the runtime behaviour is unchanged; confirm which
    # source of truth is intended.
    _, num = __get_deal_compute_progress(code)
    return num
# Number of dealt big-money orders
def get_deal_big_money_count(code):
    """Return how many order numbers DealOrderNoManager holds for ``code`` (0 if none)."""
    dealt = DealOrderNoManager().get_deal_ordernos(code)
    return len(dealt) if dealt else 0
# Script entry point: running this module directly does nothing.
if __name__ == "__main__":
    pass