Administrator
65 分钟以前 2f2516749615da866e96d8d24e499b7ecbb63a3e
l2/transaction_progress.py
@@ -5,96 +5,140 @@
# Buy queue
import itertools
import json
import time
import constant
from db import redis_manager
import tool
from db import redis_manager_delegate as redis_manager
from db.redis_manager_delegate import RedisUtils
from utils import tool
import l2.l2_data_util
from log import logger_l2_trade_buy_queue
from log_module.log import logger_l2_trade_buy_queue, logger_l2_trade_buy_progress
class TradeBuyQueue:
    """Keeps buy-queue data and the traded-progress index per code.

    ``__new__`` caches one instance in ``__instance`` and returns it on every
    construction, so the class-level dictionaries below are shared state.
    """
    # code -> cached traded-progress entry (written by __load_datas and __save_buy_progress_index)
    buy_progress_index_cache = {}
    # code -> (index, time.time() value) recorded by set_traded_index
    latest_buy_progress_index_cache = {}
    # Trade speed per code (written by set_traded_index)
    trade_speed_cache = {}
    # Passed as the first argument to RedisUtils.setex_async
    __db = 0
    __redis_manager = redis_manager.RedisManager(0)
    # The shared instance; stays None until the first construction
    __instance = None
    def __new__(cls, *args, **kwargs):
        if not cls.__instance:
            cls.__instance = super(TradeBuyQueue, cls).__new__(cls, *args, **kwargs)
            cls.__load_datas()
        return cls.__instance
    def __init__(self):
        self.last_buy_queue_data = {}
    def __getRedis(self):
        return self.__redis_manager.getRedis()
    @classmethod
    def __get_redis(cls):
        return cls.__redis_manager.getRedis()
    @classmethod
    def __load_datas(cls):
        __redis = cls.__get_redis()
        try:
            keys = RedisUtils.keys(__redis, "trade_buy_progress_index-*")
            for k in keys:
                code = k.split("-")[-1]
                val = RedisUtils.get(__redis, k)
                val = json.loads(val)
                tool.CodeDataCacheUtil.set_cache(cls.buy_progress_index_cache, code, val)
        finally:
            RedisUtils.realse(__redis)
    def __save_buy_queue_data(self, code, num_list):
        key = "trade_buy_queue_data-{}".format(code)
        self.__getRedis().setex(key, tool.get_expire(), json.dumps((num_list, tool.get_now_time_str())))
        RedisUtils.setex(self.__get_redis(), key, tool.get_expire(), json.dumps((num_list, tool.get_now_time_str())))
    # Returns the data and its update time
    def __get_buy_queue_data(self, code):
        key = "trade_buy_queue_data-{}".format(code)
        val = self.__getRedis().get(key)
        val = RedisUtils.get(self.__get_redis(), key)
        if val is None:
            return None, None
        val = json.loads(val)
        return val[0], [1]
    def __save_buy_progress_index(self, code, index, is_default):
        tool.CodeDataCacheUtil.set_cache(self.buy_progress_index_cache, code, (index, is_default))
        key = "trade_buy_progress_index-{}".format(code)
        self.__getRedis().setex(key, tool.get_expire(), json.dumps((index, is_default)))
        RedisUtils.setex_async(self.__db, key, tool.get_expire(), json.dumps((index, is_default)))
        # 返回数据与更新时间
    def __get_buy_progress_index(self, code):
        key = "trade_buy_progress_index-{}".format(code)
        val = self.__getRedis().get(key)
        val = RedisUtils.get(self.__get_redis(), key)
        if val is None:
            return None, True
        val = json.loads(val)
        return int(val[0]), bool(val[1])
    def __get_buy_progress_index_cache(self, code):
        cache_result = tool.CodeDataCacheUtil.get_cache(self.buy_progress_index_cache, code)
        if cache_result[0]:
            return cache_result[1]
        return None, True
    # Most recent time at which buy-1 was not at the limit-up price
    def __save_latest_not_limit_up_time(self, code, time_str):
        key = "latest_not_limit_up_time-{}".format(code)
        self.__getRedis().setex(key, tool.get_expire(), time_str)
        RedisUtils.setex(self.__get_redis(), key, tool.get_expire(), time_str)
    def __get_latest_not_limit_up_time(self, code):
        key = "latest_not_limit_up_time-{}".format(code)
        self.__getRedis().get(key)
        if not constant.TEST:
            return RedisUtils.get(self.__get_redis(), key)
        return None
    # Save the queue data; returns the saved list (None when nothing is saved)
    def save(self, code, limit_up_price, buy_1_price, buy_1_time, queues):
        # 如果买1不为涨停价就不需要保存
        if queues == self.last_buy_queue_data.get(code):
        # 2个以上的数据才有处理价值
        if not queues or len(queues) < 2:
            return None
        if abs(float(buy_1_price) - float(limit_up_price)) >= 0.01:
        # 如果买1不为涨停价就不需要保存
        old_queues = self.last_buy_queue_data.get(code)
        if old_queues and len(old_queues) == len(queues):
            # 元素相同就不需要再次处理
            old_str = ",".join([str(k) for k in old_queues[1:]])
            new_str = ",".join([str(k) for k in queues[1:]])
            if old_str == new_str:
                return None
        self.last_buy_queue_data[code] = queues
        if abs(float(buy_1_price) - float(limit_up_price)) >= 0.001:
            # 保存最近的涨停起始时间
            self.__save_latest_not_limit_up_time(code, buy_1_time)
            return None
        self.last_buy_queue_data[code] = queues
        min_num = round(constant.L2_MIN_MONEY / (limit_up_price * 100))
        num_list = []
        # 忽略第一条数据
        for i in range(1, len(queues)):
            num = queues[i]
            if num > min_num:
            if num > min_num and len(num_list) < 4:
                num_list.append(num)
        # 保存列表
        self.__save_buy_queue_data(code, num_list)
        return num_list
    # Compute the traded-progress index
    def compute_traded_index(self, code, buy1_price, buyQueueBig):
    def compute_traded_index(self, code, buy1_price, buyQueueBig, exec_time=None):
        # Looks for the traded-progress index of `code` by passing the queue data to
        # L2TradeQueueUtils.find_traded_progress_index; returns that index, or None.
        # NOTE(review): this span is diff residue - it holds two `def` headers, a hunk
        # marker and both the removed and the added variant of several statements.
        # It is documented as-is; resolve the diff before relying on it.
        total_datas = l2.l2_data_util.local_today_datas.get(code)
        today_num_operate_map = l2.l2_data_util.local_today_num_operate_map.get(code)
        index = None
        for i in range(0, len(buyQueueBig)):
            buyQueueBigTemp = buyQueueBig[i:]
            if i > 0 and len(buyQueueBigTemp) < 2:
                # Already ran once and fewer than 2 entries remain: stop computing
                break
        if True:
            # At most 5 entries
            buyQueueBigTemp = buyQueueBig
            last_index, is_default = self.get_traded_index(code)
            # Search starts from 0 unless a non-default index is already known
            c_last_index = 0
            if not is_default and last_index is not None:
                c_last_index = last_index
            latest_not_limit_up_time = self.__get_latest_not_limit_up_time(code)
            # With 3 or 4 entries, reorder them if no match is found
            fbuyQueueBigTempList = []
            if 3 <= len(buyQueueBigTemp) <= 4:
@@ -105,37 +149,67 @@
            # The unmodified queue is always tried first
            fbuyQueueBigTempList.insert(0, buyQueueBigTemp)
            for temp in fbuyQueueBigTempList:
                try:
                    index = l2.l2_data_util.L2TradeQueueUtils.find_traded_progress_index(buy1_price, total_datas,
                    index = l2.l2_data_util.L2TradeQueueUtils.find_traded_progress_index(code, buy1_price, total_datas,
                                                                                         today_num_operate_map,
                                                                                         temp,
                                                                                         c_last_index,
                                                                                         self.__get_latest_not_limit_up_time(
                                                                                             code))
                                                                                         latest_not_limit_up_time
                                                                                         )
                    if index is not None:
                        break
                        # Check whether the position is more than 2s past the execution position
                        # NOTE(review): the comparison below uses 5 while the comments say 2s - confirm
                        if exec_time and tool.trade_time_sub(total_datas[index]["val"]["time"], exec_time) > 5:
                            # A position more than 2s past the execution position is treated as invalid
                            index = None
                            continue
                        # May only stop when less than half of the queue was trimmed
                        if len(temp) * 2 < len(buyQueueBig):
                            index = None
                            break
                except:
                    pass
            if index is not None:
                break
        if index is not None:
            logger_l2_trade_buy_queue.info(f"确定交易进度:code-{code} index-{index}")
            # Save the traded progress
            # self.__save_buy_progress_index(code, index, False)
            return index
            if index is not None:
                logger_l2_trade_buy_queue.info(f"确定交易进度:code-{code} index-{index}")
                logger_l2_trade_buy_progress.info(
                    f"确定交易进度成功:code-{code}  index-{index} queues:{buyQueueBig}  last_index-{c_last_index} latest_not_limit_up_time-{latest_not_limit_up_time}  exec_time-{exec_time}")
                # Save the traded progress
                # self.__save_buy_progress_index(code, index, False)
                return index
            else:
                logger_l2_trade_buy_progress.warning(
                    f"确定交易进度失败:code-{code} queues:{buyQueueBig}  last_index-{c_last_index} latest_not_limit_up_time-{latest_not_limit_up_time} exec_time-{exec_time}")
        return index
    # Get the traded-progress index
    def get_traded_index(self, code):
        index, is_default = self.__get_buy_progress_index(code)
        index, is_default = self.__get_buy_progress_index_cache(code)
        return index, is_default
    def set_default_traded_index(self, code, index):
        self.__save_buy_progress_index(code, index, True)
    def set_traded_index(self, code, index):
    # 设置交易进度
    def set_traded_index(self, code, index, total_datas = None):
        last_info = self.latest_buy_progress_index_cache.get(code)
        # 交易进度是否改变
        traded_index_changed = False
        if not last_info or last_info[0] != index:
            if last_info and total_datas:
                val = total_datas[last_info[0]]['val']
                if time.time() - last_info[1] > 0:
                    rate = round(val["num"] * float(val["price"]) * 100 / (time.time() - last_info[1]))
                    # 成交速率
                    self.trade_speed_cache[code] = rate
            self.latest_buy_progress_index_cache[code] = (index, time.time())
            traded_index_changed = True
        self.__save_buy_progress_index(code, index, False)
        return traded_index_changed
    # Get the trade speed
    def get_trade_speed(self, code):
        return self.trade_speed_cache.get(code)
if __name__ == '__main__':
    # Ad-hoc check of the joined-string comparison used in TradeBuyQueue.save:
    # lists of different length must not compare equal.
    a = [1, 2, 3, 4]
    b = [1, 2, 3]
    result = ",".join(str(k) for k in a) == ",".join(str(k) for k in b)
    print(result)