Administrator
2023-08-07 e0ca7b43c17ebe25e718d5ca229c989f48340015
redis批量提交数据
9个文件已修改
267 ■■■■■ 已修改文件
code_attribute/big_money_num_manager.py 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
db/redis_manager.py 216 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/cancel_buy_strategy.py 9 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_data_manager.py 11 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/safe_count_manager.py 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/transaction_progress.py 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.py 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/trade_api_server.py 10 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/trade_manager.py 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
code_attribute/big_money_num_manager.py
@@ -7,7 +7,8 @@
from db.redis_manager import RedisUtils
from utils import tool
__redisManager = redis_manager.RedisManager(0)
__db = 0
__redisManager = redis_manager.RedisManager(__db)
# 是否为大单
@@ -25,7 +26,7 @@
    if code not in __big_money_cache:
        __big_money_cache[code] = 0
    __big_money_cache[code] += num
    RedisUtils.incrby_async(__redisManager.getRedis(), "big_money-{}".format(code), num)
    RedisUtils.incrby_async(__db, "big_money-{}".format(code), num)
# 设置过期时间
db/redis_manager.py
@@ -1,6 +1,8 @@
"""
redis管理器
"""
import logging
import queue
import time
from threading import Thread
@@ -31,214 +33,140 @@
class RedisUtils:
    __async_task_queue = queue.Queue()
    @classmethod
    def get(cls, redis_, key, auto_free=True):
    def exec(cls, method_name, key, lamada_method):
        __start_time = time.time()
        try:
            return redis_.get(key)
            return lamada_method()
        finally:
            logger_redis_debug.info("get({}):{}", round((time.time() - __start_time) * 1000, 3), key)
            if auto_free:
                # redis_.connection_pool.disconnect()
                pass
            logger_redis_debug.info("{}({}):{}", method_name, round((time.time() - __start_time) * 1000, 3), key)
    @classmethod
    def get(cls, redis_, key, auto_free=True):
        return cls.exec("get", key, lambda: redis_.get(key))
    @classmethod
    def scard(cls, redis_, key, auto_free=True):
        __start_time = time.time()
        try:
            return redis_.scard(key)
        finally:
            logger_redis_debug.info("scard({}):{}", round((time.time() - __start_time) * 1000, 3), key)
            if auto_free:
                # redis_.connection_pool.disconnect()
                pass
        return cls.exec("scard", key, lambda: redis_.scard(key))
    @classmethod
    def delete(cls, redis_, key, auto_free=True, _async=False):
        __start_time = time.time()
        try:
            return redis_.delete(key)
        finally:
            if not _async:
                logger_redis_debug.info("delete({}):{}", round((time.time() - __start_time) * 1000, 3), key)
            if auto_free:
                # redis_.connection_pool.disconnect()
                pass
        return cls.exec("delete", key, lambda: redis_.delete(key))
    @classmethod
    def delete_async(cls, redis_, key, auto_free=True):
    def delete_async(cls, db, key, auto_free=True):
        __start_time = time.time()
        Thread(target=lambda: cls.delete(redis_, key, auto_free,_async=True)).start()
        cls.add_async_task(db, "delete", (key))
        logger_redis_debug.info("delete_async({}):{}", round((time.time() - __start_time) * 1000, 3), key)
    @classmethod
    def keys(cls, redis_, key, auto_free=True):
        __start_time = time.time()
        try:
            logger_redis_debug.info("keys(start):{}", key)
            return redis_.keys(key)
        finally:
            logger_redis_debug.info("keys({}):{}", round((time.time() - __start_time) * 1000, 3), key)
            if auto_free:
                # redis_.connection_pool.disconnect()
                pass
        return cls.exec("keys", key, lambda: redis_.keys(key))
    @classmethod
    def set(cls, redis_, key, val, auto_free=True):
        __start_time = time.time()
        try:
            return redis_.set(key, val)
        finally:
            logger_redis_debug.info("set({}):{}", round((time.time() - __start_time) * 1000, 3), key)
            if auto_free:
                # redis_.connection_pool.disconnect()
                pass
        return cls.exec("set", key, lambda: redis_.set(key, val))
    @classmethod
    def setex(cls, redis_, key, expire, val, auto_free=True, _async = False):
        __start_time = time.time()
        try:
            return redis_.setex(key, expire, val)
        finally:
            if not _async:
                logger_redis_debug.info("setex({}):{}", round((time.time() - __start_time) * 1000, 3), key)
            if auto_free:
                # redis_.connection_pool.disconnect()
                pass
    def setex(cls, redis_, key, expire, val, auto_free=True, _async=False):
        return cls.exec("setex", key, lambda: redis_.setex(key, expire, val))
    @classmethod
    def setex_async(cls, redis_, key, expire, val, auto_free=True):
    def setex_async(cls, db, key, expire, val, auto_free=True):
        __start_time = time.time()
        Thread(target=lambda: cls.setex(redis_, key, expire, val, auto_free,_async=True)).start()
        cls.add_async_task(db, "setex", (key, expire, val))
        logger_redis_debug.info("setex_async({}):{}", round((time.time() - __start_time) * 1000, 3), key)
    @classmethod
    def setnx(cls, redis_, key, val, auto_free=True):
        __start_time = time.time()
        try:
            return redis_.setnx(key, val)
        finally:
            logger_redis_debug.info("setnx({}):{}", round((time.time() - __start_time) * 1000, 3), key)
            if auto_free:
                # redis_.connection_pool.disconnect()
                pass
        return cls.exec("setnx", key, lambda: redis_.setnx(key, val))
    @classmethod
    def expire(cls, redis_, key, expire, auto_free=True):
        __start_time = time.time()
        try:
            return redis_.expire(key, expire)
        finally:
            logger_redis_debug.info("expire({}):{}", round((time.time() - __start_time) * 1000, 3), key)
            if auto_free:
                # redis_.connection_pool.disconnect()
                pass
        return cls.exec("expire", key, lambda: redis_.expire(key, expire))
    @classmethod
    def sadd(cls, redis_, key, val, auto_free=True):
        __start_time = time.time()
        try:
            return redis_.sadd(key, val)
        finally:
            logger_redis_debug.info("sadd({}):{}", round((time.time() - __start_time) * 1000, 3), key)
            if auto_free:
                # redis_.connection_pool.disconnect()
                pass
        return cls.exec("sadd", key, lambda: redis_.sadd(key, val))
    @classmethod
    def sismember(cls, redis_, key, val, auto_free=True):
        __start_time = time.time()
        try:
            return redis_.sismember(key, val)
        finally:
            logger_redis_debug.info("sismember({}):{}", round((time.time() - __start_time) * 1000, 3), key)
            if auto_free:
                # redis_.connection_pool.disconnect()
                pass
        return cls.exec("sismember", key, lambda: redis_.sismember(key, val))
    @classmethod
    def smembers(cls, redis_, key, auto_free=True):
        __start_time = time.time()
        try:
            return redis_.smembers(key)
        finally:
            logger_redis_debug.info("smembers({}):{}", round((time.time() - __start_time) * 1000, 3), key)
            if auto_free:
                # redis_.connection_pool.disconnect()
                pass
        return cls.exec("smembers", key, lambda: redis_.smembers(key))
    @classmethod
    def srem(cls, redis_, key, val, auto_free=True):
        __start_time = time.time()
        try:
            return redis_.srem(key, val)
        finally:
            logger_redis_debug.info("srem({}):{}", round((time.time() - __start_time) * 1000, 3), key)
            if auto_free:
                # redis_.connection_pool.disconnect()
                pass
        return cls.exec("srem", key, lambda: redis_.srem(key, val))
    @classmethod
    def incrby(cls, redis_, key, num, auto_free=True, _async=False):
        __start_time = time.time()
        try:
            return redis_.incrby(key, num)
        finally:
            if not _async:
                logger_redis_debug.info("incrby({}):{}", round((time.time() - __start_time) * 1000, 3), key)
            if auto_free:
                # redis_.connection_pool.disconnect()
                pass
        return cls.exec("incrby", key, lambda: redis_.incrby(key, num))
    @classmethod
    def incrby_async(cls, redis_, key, num, auto_free=True):
    def incrby_async(cls, db, key, num, auto_free=True):
        __start_time = time.time()
        Thread(target=lambda: cls.incrby(redis_, key, num, auto_free)).start()
        cls.add_async_task(db, "incrby", (key, num))
        logger_redis_debug.info("incrby_async({}):{}", round((time.time() - __start_time) * 1000, 3), key)
    @classmethod
    def lpush(cls, redis_, key, val, auto_free=True):
        __start_time = time.time()
        try:
            return redis_.lpush(key, val)
        finally:
            logger_redis_debug.info("lpush({}):{}", round((time.time() - __start_time) * 1000, 3), key)
            if auto_free:
                # redis_.connection_pool.disconnect()
                pass
        return cls.exec("lpush", key, lambda: redis_.lpush(key, val))
    @classmethod
    def lpop(cls, redis_, key, auto_free=True):
        __start_time = time.time()
        try:
            return redis_.lpop(key)
        finally:
            logger_redis_debug.info("lpop({}):{}", round((time.time() - __start_time) * 1000, 3), key)
            if auto_free:
                # redis_.connection_pool.disconnect()
                pass
        return cls.exec("lpop", key, lambda: redis_.lpop(key))
    @classmethod
    def rpush(cls, redis_, key, val, auto_free=True):
        __start_time = time.time()
        try:
            return redis_.rpush(key, val)
        finally:
            logger_redis_debug.info("rpush({}):{}", round((time.time() - __start_time) * 1000, 3), key)
            if auto_free:
                # redis_.connection_pool.disconnect()
                pass
        return cls.exec("rpush", key, lambda: redis_.rpush(key, val))
    @classmethod
    def realse(cls, redis_):
        pass
    @classmethod
    def add_async_task(cls, db, method, args):
        cls.__async_task_queue.put_nowait((db, method, args))
    @classmethod
    def get_async_task_count(cls):
        return cls.__async_task_queue.qsize()
    # 运行异步任务
    @classmethod
    def run_loop(cls):
        while True:
            try:
                data = cls.__async_task_queue.get()
                if data:
                    db = data[0]
                    method_name = data[1]
                    print(db,method_name)
                    args = data[2]
                    _redis = RedisManager(db).getRedisNoPool()
                    method = getattr(_redis, method_name)
                    if type(args) == tuple:
                        result = method(*args)
                        print(result)
                    else:
                        result = method(args)
                        print(result)
            except Exception as e1:
                logging.exception(e1)
                pass
if __name__ == "__main__":
    time1 = time.time()
    RedisUtils.setex_async(RedisManager(0).getRedis(), "test123123", tool.get_expire(), "123213")
    print(time.time() - time1)
    input()
    RedisUtils.setex_async(0, "test", tool.get_expire(), "123123")
    print("大小",RedisUtils.get_async_task_count())
    RedisUtils.incrby_async(0, "test_1", 1)
    print("大小", RedisUtils.get_async_task_count())
    RedisUtils.delete_async(0, "test")
    print("大小", RedisUtils.get_async_task_count())
    RedisUtils.run_loop()
l2/cancel_buy_strategy.py
@@ -27,6 +27,7 @@
class SecondCancelBigNumComputer:
    __db = 0
    __redis_manager = redis_manager.RedisManager(0)
    __sCancelParamsManager = l2_trade_factor.SCancelParamsManager
    __s_big_num_cancel_compute_data_cache = {}
@@ -66,7 +67,7 @@
        CodeDataCacheUtil.clear_cache(cls.__s_big_num_cancel_compute_data_cache, code)
        ks = ["s_big_num_cancel_compute_data-{}".format(code)]
        for key in ks:
            RedisUtils.delete_async(cls.__get_redis(), key)
            RedisUtils.delete_async(cls.__db, key)
    @classmethod
    def clear_data(cls):
@@ -749,6 +750,7 @@
# 计算 成交位->真实下单位置 总共还剩下多少手没有撤单
# 成交位变化之后重新计算
class DCancelBigNumComputer:
    __db = 0
    __redis_manager = redis_manager.RedisManager(0)
    __cancel_real_order_index_cache = {}
@@ -764,7 +766,7 @@
    @classmethod
    def __del_real_order_index(cls, code):
        CodeDataCacheUtil.clear_cache(cls.__cancel_real_order_index_cache, code)
        RedisUtils.delete_async(cls.__get_redis(), f"d_cancel_real_order_index-{code}")
        RedisUtils.delete_async(cls.__db, f"d_cancel_real_order_index-{code}")
    @classmethod
    def __get_real_order_index(cls, code):
@@ -846,6 +848,7 @@
# ---------------------------------L撤-------------------------------
# 计算成交位置之后的大单(特定笔数)的撤单比例
class LCancelBigNumComputer:
    __db = 0
    __redis_manager = redis_manager.RedisManager(0)
    __last_trade_progress_dict = {}
    __cancel_watch_index_cache = {}
@@ -882,7 +885,7 @@
    @classmethod
    def del_watch_index(cls, code):
        CodeDataCacheUtil.clear_cache(cls.__cancel_watch_index_cache, code)
        RedisUtils.delete_async(cls.__get_redis(), f"l_cancel_watch_index-{code}")
        RedisUtils.delete_async(cls.__db, f"l_cancel_watch_index-{code}")
    @classmethod
    def clear(cls, code=None):
l2/l2_data_manager.py
@@ -9,6 +9,7 @@
from log_module.log import logger_l2_trade_buy
from utils.tool import CodeDataCacheUtil
_db = 1
_redisManager = redis_manager.RedisManager(1)
@@ -42,7 +43,7 @@
    @staticmethod
    def delete_buy_point(code):
        CodeDataCacheUtil.clear_cache(TradePointManager.__buy_compute_index_info_cache, code)
        RedisUtils.delete_async(TradePointManager.__get_redis(), "buy_compute_index_info-{}".format(code))
        RedisUtils.delete_async(_db, "buy_compute_index_info-{}".format(code))
    # 获取买入点信息
    # 返回数据为:买入点 累计纯买额 已经计算的数据索引
@@ -86,7 +87,7 @@
                     volume_rate)
        CodeDataCacheUtil.set_cache(TradePointManager.__buy_compute_index_info_cache, code, data_)
        RedisUtils.setex_async(
            TradePointManager.__get_redis(), _key, expire,
            _db, _key, expire,
            json.dumps(data_))
    # 获取撤买入开始计算的信息
@@ -110,7 +111,7 @@
    # 删除买撤点数据
    @classmethod
    def delete_buy_cancel_point(cls, code):
        RedisUtils.delete_async(TradePointManager.__get_redis(), "buy_cancel_single_pos-{}".format(code))
        RedisUtils.delete_async(_db, "buy_cancel_single_pos-{}".format(code))
    # 设置买撤纯买额
    @classmethod
@@ -131,7 +132,7 @@
    @classmethod
    def delete_compute_info_for_cancel_buy(cls, code):
        RedisUtils.delete_async(TradePointManager.__get_redis(), "compute_info_for_cancel_buy-{}".format(code))
        RedisUtils.delete_async(_db, "compute_info_for_cancel_buy-{}".format(code))
    # 从买入信号开始设置涨停买与涨停撤的单数
    @classmethod
@@ -153,7 +154,7 @@
    @classmethod
    def delete_count_info_for_cancel_buy(cls, code):
        RedisUtils.delete_async(TradePointManager.__get_redis(), "count_info_for_cancel_buy-{}".format(code))
        RedisUtils.delete_async(_db, "count_info_for_cancel_buy-{}".format(code))
# 清除l2数据
l2/safe_count_manager.py
@@ -16,6 +16,7 @@
class BuyL2SafeCountManager(object):
    __db = 0
    __redis_manager = redis_manager.RedisManager(0)
    def __init__(self):
@@ -29,7 +30,7 @@
        key = "safe_count_l2-{}-{}".format(code, last_buy_single_index)
        tool.CodeDataCacheUtil.set_cache(safe_count_l2_cache, f"{code}-{last_buy_single_index}",
                                         (last_buy_single_index, process_index, buy_num, cancel_num))
        RedisUtils.setex_async(self.__getRedis(), key, tool.get_expire(),
        RedisUtils.setex_async(self.__db, key, tool.get_expire(),
                               json.dumps((last_buy_single_index, process_index, buy_num, cancel_num)))
    # 返回数据与更新时间
@@ -91,7 +92,7 @@
        tool.CodeDataCacheUtil.clear_cache(latest_place_order_info_cache, code)
        key = f"latest_place_order_info-{code}"
        RedisUtils.delete_async(self.__getRedis(), key)
        RedisUtils.delete_async(self.__db, key)
    # 获取基础的安全笔数
    def __get_base_save_count(self, code, is_first):
l2/transaction_progress.py
@@ -17,6 +17,7 @@
class TradeBuyQueue:
    __db = 0
    __redis_manager = redis_manager.RedisManager(0)
    def __init__(self):
@@ -41,7 +42,7 @@
    def __save_buy_progress_index(self, code, index, is_default):
        tool.CodeDataCacheUtil.set_cache(buy_progress_index_cache, code, (index, is_default))
        key = "trade_buy_progress_index-{}".format(code)
        RedisUtils.setex_async(self.__getRedis(), key, tool.get_expire(), json.dumps((index, is_default)))
        RedisUtils.setex_async(self.__db, key, tool.get_expire(), json.dumps((index, is_default)))
        # 返回数据与更新时间
    def __get_buy_progress_index(self, code):
main.py
@@ -4,6 +4,7 @@
import multiprocessing
from db import redis_manager
from log_module.log import logger_l2_process_time
from server import *
@@ -25,6 +26,10 @@
    t1 = threading.Thread(target=trade_api_server.run, args=(pipe,), daemon=True)
    t1.start()
    # redis后台服务
    t1 = threading.Thread(target=redis_manager.RedisUtils.run_loop,  daemon=True)
    t1.start()
    # 交易服务
    trade_server.run()
trade/huaxin/trade_api_server.py
@@ -344,6 +344,12 @@
                        except:
                            fdata["mysql"] = 0
                        try:
                            # redis异步任务数量
                            fdata["redis_async_task_count"] = redis_manager.RedisUtils.get_async_task_count()
                        except:
                            pass
                        # 获取CPU与内存适用情况
                        memory_info = psutil.virtual_memory()
                        cpu_percent = psutil.cpu_percent(interval=1)
@@ -447,14 +453,14 @@
                    if type_ == "want_list":
                        print("want_list before", gpcode_manager.WantBuyCodesManager().list_code_cache())
                        gpcode_manager.WantBuyCodesManager().sync()
                        print("want_list after",gpcode_manager.WantBuyCodesManager().list_code_cache())
                        print("want_list after", gpcode_manager.WantBuyCodesManager().list_code_cache())
                    elif type_ == "white_list":
                        print("white_list before", l2_trade_util.WhiteListCodeManager().list_codes_cache())
                        l2_trade_util.WhiteListCodeManager().sync()
                        print("white_list after", l2_trade_util.WhiteListCodeManager().list_codes_cache())
                    elif type_ == "black_list":
                        print("black_list before", l2_trade_util.BlackListCodeManager().list_codes_cache())
                        l2_trade_util. BlackListCodeManager().sync()
                        l2_trade_util.BlackListCodeManager().sync()
                        print("black_list after", l2_trade_util.BlackListCodeManager().list_codes_cache())
                    elif type_ == "pause_buy_list":
                        print("pause_buy_list before", gpcode_manager.PauseBuyCodesManager().list_code_cache())
trade/trade_manager.py
@@ -23,6 +23,7 @@
trade_gui = import_util.import_lib("trade.trade_gui")
__db = 2
__redis_manager = redis_manager.RedisManager(2)
# 未交易
@@ -196,7 +197,7 @@
def set_trade_state(code, state):
    logger_trade.info("set_trade_state {}-{}".format(code, state))
    tool.CodeDataCacheUtil.set_cache(__trade_state_cache, code, state)
    RedisUtils.setex_async(__redis_manager.getRedis(), "trade-state-{}".format(code), tool.get_expire(), state)
    RedisUtils.setex_async(__db, "trade-state-{}".format(code), tool.get_expire(), state)
def get_codes_by_trade_state(state):