Administrator
2023-08-02 8595dc22aa9dde6aba6d0f8cdcf1656a8a59513b
redis封装
21个文件已修改
131 ■■■■■ 已修改文件
code_attribute/big_money_num_manager.py 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
code_attribute/code_volumn_manager.py 10 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
code_attribute/gpcode_first_screen_manager.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
code_attribute/gpcode_manager.py 26 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
db/redis_manager.py 15 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
inited_data.py 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_data_manager.py 17 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_data_util.py 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/safe_count_manager.py 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_data_util.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
output/kp_client_msg_manager.py 补丁 | 查看 | 原始文档 | blame | 历史
third_data/code_plate_key_manager.py 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/history_k_data_util.py 7 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ths/l2_code_operate.py 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ths/ths_util.py 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/huaxin_trade_record_manager.py 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/trade_server.py 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/trade_data_manager.py 补丁 | 查看 | 原始文档 | blame | 历史
trade/trade_gui.py 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/trade_manager.py 14 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utils/data_process.py 3 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
code_attribute/big_money_num_manager.py
@@ -20,7 +20,7 @@
def add_num(code, num):
    redis = __redisManager.getRedis()
    redis.incrby("big_money-{}".format(code), num)
    RedisUtils.incrby(redis,"big_money-{}".format(code), num)
# 设置过期时间
@@ -31,12 +31,12 @@
def reset(code):
    redis = __redisManager.getRedis()
    redis.set("big_money-{}".format(code), 0)
    RedisUtils.set(redis,"big_money-{}".format(code), 0)
def get_num(code):
    redis = __redisManager.getRedis()
    num = redis.get("big_money-{}".format(code))
    num = RedisUtils.get(redis, "big_money-{}".format(code))
    if num is None:
        return 0
    return round(int(num) / 1000 / 4)
code_attribute/code_volumn_manager.py
@@ -30,11 +30,11 @@
    yesterday = global_util.yesterday_volumn.get(code)
    redis = __redis_manager.getRedis()
    if max60 is None:
        max60 = redis.get("volumn_max60-{}".format(code))
        max60 = RedisUtils.get(redis, "volumn_max60-{}".format(code))
        if max60:
            max60 = json.loads(max60)
    if yesterday is None:
        yesterday = redis.get("volumn_yes-{}".format(code))
        yesterday = RedisUtils.get(redis, "volumn_yes-{}".format(code))
    return max60, yesterday
@@ -51,7 +51,7 @@
    _volumn = global_util.today_volumn.get(code)
    if _volumn is None:
        redis = __redis_manager.getRedis()
        _volumn = redis.get("volumn_today-{}".format(code))
        _volumn = RedisUtils.get(redis, "volumn_today-{}".format(code))
    return _volumn
@@ -96,7 +96,7 @@
    if keys is not None:
        for k in keys:
            code = k.split("-")[1]
            max60_volumn = redis.get(k)
            max60_volumn = RedisUtils.get(redis, k)
            if max60_volumn:
                max60_volumn = json.loads(max60_volumn)
            global_util.max60_volumn[code] = max60_volumn
@@ -104,7 +104,7 @@
    if keys is not None:
        for k in keys:
            code = k.split("-")[1]
            global_util.yesterday_volumn[code] = redis.get(k)
            global_util.yesterday_volumn[code] = RedisUtils.get(redis, k)
if __name__ == "__main__":
code_attribute/gpcode_first_screen_manager.py
@@ -52,7 +52,7 @@
    redis = __redisManager.getRedis()
    if codes:
        for code in codes:
            redis.srem("first_no_screen_codes", code)
            RedisUtils.srem(redis, "first_no_screen_codes", code)
def __get_first_no_screen_codes():
code_attribute/gpcode_manager.py
@@ -281,7 +281,7 @@
    for code in add_codes:
        RedisUtils.sadd(redis_instance, "gp_list_first", code)
    for code in del_set:
        redis_instance.srem("gp_list_first", code)
        RedisUtils.srem(redis_instance, "gp_list_first", code)
    RedisUtils.expire(redis_instance, "gp_list_first", tool.get_expire())
    old_name_codes = CodesNameManager.list_first_code_name_dict()
@@ -297,7 +297,7 @@
def remove_first_gp_code(codes):
    redis_instance = __redisManager.getRedis()
    for code in codes:
        redis_instance.srem("gp_list_first", code)
        RedisUtils.srem(redis_instance, "gp_list_first", code)
# 获取首板代码
@@ -309,7 +309,7 @@
# 是否在首板里面
def is_in_first_gp_codes(code):
    redis_instance = __redisManager.getRedis()
    return redis_instance.sismember("gp_list_first", code)
    return RedisUtils.sismember(redis_instance, "gp_list_first", code)
# 获取名称对应的代码
@@ -366,13 +366,13 @@
def rm_gp(code):
    redis_instance = __redisManager.getRedis()
    redis_instance.srem("gp_list", code)
    RedisUtils.srem(redis_instance, "gp_list", code)
    remove_first_gp_code([code])
def is_in_gp_pool(code):
    redis_instance = __redisManager.getRedis()
    return redis_instance.sismember("gp_list", code) or is_in_first_gp_codes(code)
    return RedisUtils.sismember(redis_instance, "gp_list", code) or is_in_first_gp_codes(code)
def get_gp_list():
@@ -518,13 +518,13 @@
    keys = RedisUtils.keys(redis_instance, "listen_code-*-*")
    for key in keys:
        result = redis_instance.get(key)
        result = RedisUtils.get(redis_instance, key)
        if result:
            # 移除需要添加的代码
            codes_set.discard(result)
            client_id_, pos_, code_ = int(key.split("-")[1]), int(key.split("-")[2]), result
            key_ = "code_listen_pos-{}".format(code_)
            val = redis_instance.get(key_)
            val = RedisUtils.get(redis_instance, key_)
            if val is None:
                RedisUtils.setex(redis_instance, key_, tool.get_expire(), json.dumps((client_id_, pos_)))
            else:
@@ -541,7 +541,7 @@
def init_listen_code_by_pos(client_id, pos):
    redis_instance = __redisManager.getRedis()
    key = "listen_code-{}-{}".format(client_id, pos)
    redis_instance.setnx(key, "")
    RedisUtils.setnx(redis_instance, key, "")
    RedisUtils.expire(redis_instance, key, tool.get_expire())
@@ -580,7 +580,7 @@
            index = key.split("-")[-1]
            if int(index) + 1 > constant.L2_CODE_COUNT_PER_DEVICE:
                continue
            result = redis_instance.get(key)
            result = RedisUtils.get(redis_instance, key)
            if result is None or len(result) == 0:
                available_positions.append((client_id, int(key.replace("listen_code-{}-".format(client_id), ""))))
            else:
@@ -617,7 +617,7 @@
        k = "listen_code-{}-*".format(client_id)
        keys = RedisUtils.keys(redis_instance, k)
        for key in keys:
            code = redis_instance.get(key)
            code = RedisUtils.get(redis_instance, key)
            if not code:
                free_count += 1
    return free_count
@@ -626,7 +626,7 @@
# 获取正在监听的代码的位置
def get_listen_code_pos(code):
    redis_instance = __redisManager.getRedis()
    val = redis_instance.get("code_listen_pos-{}".format(code))
    val = RedisUtils.get(redis_instance, "code_listen_pos-{}".format(code))
    if val is None:
        return None, None
    val = json.loads(val)
@@ -642,7 +642,7 @@
# 是否正在监听
def is_listen(code):
    redis_instance = __redisManager.getRedis()
    val = redis_instance.get("code_listen_pos-{}".format(code))
    val = RedisUtils.get(redis_instance, "code_listen_pos-{}".format(code))
    if val is None:
        return False
    else:
@@ -666,7 +666,7 @@
# 是否正在操作
def is_operate(code):
    redis_instance = __redisManager.getRedis()
    return redis_instance.get("gp_operate-{}".format(code)) is not None
    return RedisUtils.get(redis_instance, "gp_operate-{}".format(code)) is not None
# 设置正在操作的代码
db/redis_manager.py
@@ -34,6 +34,10 @@
        return redis_.get(key)
    @classmethod
    def scard(cls, redis_, key):
        return redis_.scard(key)
    @classmethod
    def delete(cls, redis_, key):
        logger_redis_debug.info("delete:{}", key)
        return redis_.delete(key)
@@ -52,6 +56,11 @@
    def setex(cls, redis_, key, expire, val):
        logger_redis_debug.info("setex:{}", key)
        return redis_.setex(key, expire, val)
    @classmethod
    def setnx(cls, redis_, key, val):
        logger_redis_debug.info("setnx:{}", key)
        return redis_.setnx(key, val)
    @classmethod
    def expire(cls, redis_, key, expire):
@@ -93,6 +102,12 @@
        logger_redis_debug.info("lpop:{}", key)
        return redis_.lpop(key)
    @classmethod
    def rpush(cls, redis_, key, val):
        logger_redis_debug.info("rpush:{}", key)
        return redis_.rpush(key, val)
inited_data.py
@@ -143,7 +143,6 @@
    # 多个时间点获取收盘价
    gmapi.schedule(schedule_func=__get_latest_info, date_rule='1d', time_rule='09:28:00')
    # 初始化内容
    clients = authority.get_l2_clients()
l2/l2_data_manager.py
@@ -51,7 +51,7 @@
    def get_buy_compute_start_data(code):
        redis = TradePointManager.__get_redis()
        _key = "buy_compute_index_info-{}".format(code)
        _data_json = redis.get(_key)
        _data_json = RedisUtils.get(redis, _key)
        if _data_json is None:
            return None, None, None, 0, 0, [], 0
        _data = json.loads(_data_json)
@@ -97,7 +97,7 @@
    @staticmethod
    def get_buy_cancel_single_pos(code):
        redis = TradePointManager.__get_redis()
        info = redis.get("buy_cancel_single_pos-{}".format(code))
        info = RedisUtils.get(redis, "buy_cancel_single_pos-{}".format(code))
        if info is None:
            return None
        else:
@@ -130,7 +130,7 @@
    @classmethod
    def get_compute_info_for_cancel_buy(cls, code):
        redis = TradePointManager.__get_redis()
        info = redis.get("compute_info_for_cancel_buy-{}".format(code))
        info = RedisUtils.get(redis, "compute_info_for_cancel_buy-{}".format(code))
        if info is None:
            return None, 0
        else:
@@ -147,14 +147,15 @@
    def set_count_info_for_cancel_buy(cls, code, index, buy_count, cancel_count):
        redis = TradePointManager.__get_redis()
        expire = tool.get_expire()
        RedisUtils.setex(redis,"count_info_for_cancel_buy-{}".format(code), expire, json.dumps((index, buy_count, cancel_count)))
        RedisUtils.setex(redis, "count_info_for_cancel_buy-{}".format(code), expire,
                         json.dumps((index, buy_count, cancel_count)))
        logger_l2_trade_buy.info("{}保存撤单纯买额信息:{},{}", code, index, buy_count, cancel_count)
    # 获取买撤纯买额计算信息
    @classmethod
    def get_count_info_for_cancel_buy(cls, code):
        redis = TradePointManager.__get_redis()
        info = redis.get("count_info_for_cancel_buy-{}".format(code))
        info = RedisUtils.get(redis, "count_info_for_cancel_buy-{}".format(code))
        if info is None:
            return None, 0, 0
        else:
@@ -184,7 +185,7 @@
def init_l2_fixed_codes():
    key = "l2-fixed-codes"
    redis = _redisManager.getRedis()
    count = redis.scard(key)
    count = RedisUtils.scard(redis, key)
    if count > 0:
        RedisUtils.delete(redis, key)
    RedisUtils.sadd(redis, key, "000000")
@@ -195,7 +196,7 @@
def remove_from_l2_fixed_codes(code):
    key = "l2-fixed-codes"
    redis = _redisManager.getRedis()
    redis.srem(key, code)
    RedisUtils.srem(redis, key, code)
# 添加代码到L2固定监控
@@ -210,7 +211,7 @@
def is_in_l2_fixed_codes(code):
    key = "l2-fixed-codes"
    redis = _redisManager.getRedis()
    return redis.sismember(key, code)
    return RedisUtils.sismember(redis, key, code)
if __name__ == "__main__":
l2/l2_data_util.py
@@ -118,8 +118,7 @@
    redis_instance = _redisManager.getRedis()
    try:
        if redis_instance.setnx("l2-save-{}".format(code), "1") > 0:
        if RedisUtils.setnx(redis_instance, "l2-save-{}".format(code), "1") > 0:
            # 计算保留的时间
            expire = tool.get_expire()
            i = 0
l2/safe_count_manager.py
@@ -39,7 +39,8 @@
    # 保存最近的下单信息
    def __save_latest_place_order_info(self, code, buy_single_index, buy_exec_index, cancel_index):
        key = "latest_place_order_info-{}".format(code)
        RedisUtils.setex( self.__getRedis(), key, tool.get_expire(), json.dumps((buy_single_index, buy_exec_index, cancel_index)))
        RedisUtils.setex(self.__getRedis(), key, tool.get_expire(),
                         json.dumps((buy_single_index, buy_exec_index, cancel_index)))
    def __get_latest_place_order_info(self, code):
        key = "latest_place_order_info-{}".format(code)
l2_data_util.py
@@ -196,7 +196,7 @@
# 获取最新数据条数
def get_l2_latest_data_number(code):
    redis = l2_data_manager._redisManager.getRedis()
    num = redis.get("l2_latest_data_num-{}".format(code))
    num =RedisUtils.get(redis, "l2_latest_data_num-{}".format(code))
    if num is not None:
        return int(num)
    return None
output/kp_client_msg_manager.py
third_data/code_plate_key_manager.py
@@ -259,7 +259,8 @@
    def set_history_limit_up_reason(self, code, reasons):
        self.__history_limit_up_reason_dict[code] = set(reasons)
        RedisUtils.setex(self.__get_redis(),f"kpl_his_limit_up_reason-{code}", tool.get_expire(), json.dumps(list(reasons)))
        RedisUtils.setex(self.__get_redis(), f"kpl_his_limit_up_reason-{code}", tool.get_expire(),
                         json.dumps(list(reasons)))
        logger_kpl_debug.debug(f"设置历史涨停原因:{code}-{reasons}")
    # 如果返回值不为None表示已经加载过历史原因了
third_data/history_k_data_util.py
@@ -8,6 +8,7 @@
import requests
import constant
from db.redis_manager import RedisUtils
from utils import tool
from db import redis_manager
import gm.api as gmapi
@@ -84,9 +85,9 @@
    @classmethod
    def getJueJinAccountInfo(cls):
        redis = cls.__redisManager.getRedis()
        account_id = redis.get("juejin-account-id")
        strategy_id = redis.get("juejin-strategy-id")
        token = redis.get("juejin-token")
        account_id = RedisUtils.get(redis, "juejin-account-id")
        strategy_id =RedisUtils.get(redis, "juejin-strategy-id")
        token = RedisUtils.get(redis, "juejin-token")
        return account_id, strategy_id, token
    @classmethod
ths/l2_code_operate.py
@@ -110,7 +110,7 @@
        while True:
            cls.set_read_queue_valid()
            try:
                data = redis.lpop("code_operate_queue")
                data =RedisUtils.lpop(redis,"code_operate_queue")
                # print("读取操作队列", data, redis.llen("code_operate_queue"))
                if data is not None:
                    data = json.loads(data)
@@ -187,7 +187,7 @@
        if int(tool.get_now_time_str().replace(":", "")) < int("092510"):
            return
        redis = self.redis_manager_.getRedis()
        redis.rpush("code_operate_queue",
        RedisUtils.rpush(redis, "code_operate_queue",
                    json.dumps({"type": type, "msg": msg, "code": code, "client": client, "pos": pos,
                                "create_time": round(time.time() * 1000)}))
@@ -211,7 +211,7 @@
                    "data": {"index": int(pos), "code": code, "min_price": float(min_price),
                             "max_price": float(max_price)}}
            redis = self.redis_manager_.getRedis()
            redis.rpush("code_operate_queue", json.dumps(
            RedisUtils.rpush(redis, "code_operate_queue", json.dumps(
                {"type": 3, "code": code, "client": client_id, "data": data, "create_time": round(time.time() * 1000)}))
    # 移除监控
@@ -240,7 +240,7 @@
    @classmethod
    def is_read_queue_valid(cls):
        redis = cls.getRedis()
        return redis.get("operate_queue_read_state") is not None
        return RedisUtils.get(redis, "operate_queue_read_state") is not None
# 通过l2代码校验代码位
ths/ths_util.py
@@ -182,7 +182,7 @@
    if not dead:
        RedisUtils.setex(redis,key, tool.get_expire(), 0)
    else:
        redis.incrby(key, 1)
        RedisUtils.incrby(redis, key, 1)
        RedisUtils.expire(redis, key, tool.get_expire())
@@ -190,7 +190,7 @@
def is_ths_dead(client_id):
    key = "ths_state_dead_count-{}".format(client_id)
    redis = __redisManager.getRedis()
    val = redis.get(key)
    val = RedisUtils.get(redis, key)
    if val is not None and int(val) >= 5:
        return True
    else:
trade/huaxin/huaxin_trade_record_manager.py
trade/huaxin/trade_server.py
@@ -22,12 +22,12 @@
from l2.l2_data_manager_new import L2TradeDataProcessor
from log_module.log import hx_logger_l2_upload, hx_logger_contact_debug, hx_logger_trade_callback, \
    hx_logger_l2_orderdetail, hx_logger_l2_transaction, hx_logger_l2_market_data, logger_l2_trade_buy_queue
from third_data import block_info, kpl_api, kpl_data_manager
from third_data import block_info, kpl_api
from third_data.code_plate_key_manager import KPLCodeJXBlockManager
from third_data.history_k_data_util import JueJinApi
from trade import deal_big_money_manager, current_price_process_manager
from trade.huaxin import huaxin_trade_api as trade_api, trade_api_server, huaxin_trade_api, huaxin_trade_data_update
from trade.huaxin import huaxin_trade_api as trade_api, huaxin_trade_api, huaxin_trade_data_update
from utils import socket_util
trade_data_request_queue = queue.Queue()
trade/trade_data_manager.py
trade/trade_gui.py
@@ -11,13 +11,14 @@
import win32gui
import win32con
from code_attribute import gpcode_manager
from db.redis_manager import RedisUtils
from ocr import ocr_util
from trade import l2_trade_util
from db import redis_manager
from log_module.log import *
from utils.tool import async_call
from utils import win32_util, capture_util
from utils import win32_util, capture_util, tool
class THSGuiTrade(object):
trade/trade_manager.py
@@ -134,7 +134,7 @@
# 获取交易状态
def get_trade_state(code):
    redis = __redis_manager.getRedis()
    state = redis.get("trade-state-{}".format(code))
    state =RedisUtils.get(redis, "trade-state-{}".format(code))
    if state is None:
        return TRADE_STATE_NOT_TRADE
    return int(state)
@@ -164,7 +164,7 @@
    codes = []
    if keys is not None:
        for key in keys:
            if int(redis.get(key)) in states:
            if int(RedisUtils.get(redis, key)) in states:
                codes.append(key.replace("trade-state-", ''))
    return codes
@@ -172,13 +172,13 @@
# 设置交易账户的可用金额
def set_available_money(client_id, money):
    redis = __redis_manager.getRedis()
    redis.set("trade-account-canuse-money", money)
    RedisUtils.set(redis,"trade-account-canuse-money", money)
# 获取交易账户的可用金额
def get_available_money():
    redis = __redis_manager.getRedis()
    result = redis.get("trade-account-canuse-money")
    result = RedisUtils.get(redis, "trade-account-canuse-money")
    if result is None:
        return None
    return round(float(result), 2)
@@ -259,14 +259,14 @@
                "time": result[5], "trade_num": result[6], "type": result[7], "day": result[8],
                "create_time": result[9]}
        datas.append(data)
    return datas, redis.get("trade-success-latest-time")
    return datas, RedisUtils.get(redis,"trade-success-latest-time")
# 获取交易委托数据
def get_trade_delegate_data():
    redis = __redis_manager.getRedis()
    result = redis.get("trade-delegate-latest")
    time_str = redis.get("trade-delegate-latest-time")
    result = RedisUtils.get(redis, "trade-delegate-latest")
    time_str =RedisUtils.get(redis, "trade-delegate-latest-time")
    if result is None:
        return [], time_str
    else:
utils/data_process.py
@@ -1,9 +1,6 @@
# 数据处理
import json
from db import redis_manager
from log_module.log import logger_l2_error
__redisManager = redis_manager.RedisManager(0)
def parse(str):