Administrator
2024-11-26 315855587800b136d894771f41b72c60cc37bd32
接口修改/redis队列溢出保护/减少不必要的日志输出
12个文件已修改
195 ■■■■ 已修改文件
api/outside_api_command_callback.py 19 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
db/redis_manager_delegate.py 7 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/code_price_manager.py 26 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_data_manager_new.py 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_log.py 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_transaction_data_manager.py 28 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_transaction_data_processor.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
servers/huaxin_trade_server.py 9 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/code_plate_key_manager.py 15 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/kpl_limit_up_data_manager.py 71 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/buy_radical/block_special_codes_manager.py 5 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/buy_radical/radical_buy_data_manager.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
api/outside_api_command_callback.py
@@ -36,7 +36,7 @@
    logger_real_place_order_position, logger_device
from output import l2_output_util
from third_data import kpl_util, history_k_data_manager, huaxin_l1_data_manager, third_blocks_manager, kpl_data_manager
from third_data.code_plate_key_manager import  KPLCodeJXBlockManager
from third_data.code_plate_key_manager import KPLCodeJXBlockManager
from third_data.history_k_data_manager import HistoryKDataManager
from third_data.history_k_data_util import JueJinApi, HistoryKDatasUtils
from third_data.kpl_data_manager import KPLDataManager
@@ -53,6 +53,7 @@
from trade.buy_radical.radical_buy_data_manager import RadicalBuyBlockManager
from trade.sell import sell_manager
from trade.sell.sell_rule_manager import TradeRuleManager, SellRule
from trade.trade_data_manager import RadicalBuyDealCodesManager
from trade.trade_manager import TradeTargetCodeModeManager, AutoCancelSellModeManager
from settings.trade_setting import MarketSituationManager, TradeBlockBuyModeManager
from utils import socket_util, data_export_util, tool, huaxin_util, output_util, global_util
@@ -1083,10 +1084,18 @@
                            limit_up_data = kpl_data_manager.KPLLimitUpDataRecordManager.record_code_dict.get(code)
                            # 获取当前板块
                            try:
                                limit_up_sequence = CodeLimitUpSequenceManager.get_current_limit_up_sequence(code)
                                if limit_up_sequence:
                                    fdata[
                                        'block'] = f"{limit_up_sequence[0]}-{limit_up_sequence[1]}({limit_up_sequence[2]}&{limit_up_sequence[2] - limit_up_sequence[3]})"
                                limit_up_sequences = CodeLimitUpSequenceManager.get_current_limit_up_sequence(code)
                                if limit_up_sequences:
                                    buy_blocks = RadicalBuyDealCodesManager().get_code_blocks(code)
                                    blocks_info = []
                                    for limit_up_sequence in limit_up_sequences:
                                        # 获取代码下单的板块
                                        if buy_blocks and limit_up_sequence[0] not in buy_blocks:
                                            continue
                                        blocks_info.append(
                                            f"{limit_up_sequence[0]}-{limit_up_sequence[1]}({limit_up_sequence[2]}&{limit_up_sequence[2] - limit_up_sequence[3]})")
                                    if buy_blocks:
                                        fdata['block'] = "/".join(blocks_info)
                            except:
                                pass
                            # 获取涨停时间
db/redis_manager_delegate.py
@@ -33,7 +33,7 @@
class RedisUtils:
    __async_task_queue = queue.Queue(maxsize=1000)
    __async_task_queue = queue.Queue(maxsize=4096)
    @classmethod
    def exec(cls, method_name, key, lamada_method):
@@ -187,7 +187,10 @@
    @classmethod
    def add_async_task(cls, db: int, method, args):
        cls.__async_task_queue.put_nowait((db, method, args))
        try:
            cls.__async_task_queue.put_nowait((db, method, args))
        except Exception as e:
            async_log_util.error(logger_redis_debug, f"加入队列出错:{str(e)}")
    @classmethod
    def get_async_task_count(cls):
l2/code_price_manager.py
@@ -74,12 +74,6 @@
        RedisUtils.setex_async(self.__db, f"buy1_price_limit_up_info-{code}", tool.get_expire(),
                               json.dumps((limit_up_time, open_limit_up_time)))
    def __get_buy1_price_limit_up_info(self, code):
        data = RedisUtils.get(self.__get_redis(), f"buy1_price_limit_up_info-{code}")
        if not data:
            return None, None
        data = json.loads(data)
        return data[0], data[1]
    def __get_buy1_price_limit_up_info_cache(self, code):
        cache_result = tool.CodeDataCacheUtil.get_cache(self.__buy1_price_limit_up_info_cache, code)
@@ -94,31 +88,11 @@
        tool.CodeDataCacheUtil.set_cache(self.__current_buy_1_price, code, buy_1_price)
        RedisUtils.setex_async(self.__db, f"buy1_price-{code}", tool.get_expire(), buy_1_price)
    # datas:[(code, buy_1_price)]
    def __save_buy1_prices(self, datas):
        for d in datas:
            code = d[0]
            buy_1_price = d[1]
            # 不保存重复的数据
            self.__save_buy1_price(code, buy_1_price)
    def __get_buy1_price(self, code):
        return RedisUtils.get(self.__get_redis(), f"buy1_price-{code}")
    def __get_buy1_price_cache(self, code):
        cache_result = tool.CodeDataCacheUtil.get_cache(self.__current_buy_1_price, code)
        if cache_result[0]:
            return cache_result[1]
        return None
    # ------------------------炸板后的最低价------------------------------
    def __save_open_limit_up_lowest_price(self, code, price):
        tool.CodeDataCacheUtil.set_cache(self.__open_limit_up_lowest_price_cache, code, round(float(price), 2))
        RedisUtils.setex_async(self.__db, f"open_limit_up_lowest_price-{code}", tool.get_expire(), f"{price}")
    def __get_open_limit_up_lowest_price(self, code):
        return RedisUtils.get(self.__get_redis(), f"open_limit_up_lowest_price-{code}")
    def __get_open_limit_up_lowest_price_cache(self, code):
        cache_result = tool.CodeDataCacheUtil.get_cache(self.__open_limit_up_lowest_price_cache, code)
l2/l2_data_manager_new.py
@@ -786,8 +786,8 @@
        now_time_int = int(tool.get_now_time_str().replace(":", ""))
        if now_time_int >= 145700:
            return False, True, f"14:57后不能交易", True
        if now_time_int < 93200:
            return False, True, f"09:32之前不能交易", True
        if now_time_int < 93100:
            return False, True, f"09:31之前不能交易", True
        # 二板以上的票不买
        yesterday_codes = kpl_data_manager.get_yesterday_limit_up_codes()
        if yesterday_codes and code in yesterday_codes:
@@ -1119,7 +1119,7 @@
        total_data = local_today_datas.get(code)
        # 9:32之前上证开1的票不买
        if tool.is_sh_code(code) and int(total_data[-1]["val"]["time"].replace(":", "")) <= int("093200"):
        if tool.is_sh_code(code) and int(total_data[-1]["val"]["time"].replace(":", "")) <= int("093100"):
            # 获取涨停时间
            limit_up_data = kpl_data_manager.KPLLimitUpDataRecordManager.record_code_dict.get(code)
            if limit_up_data:
l2/l2_log.py
@@ -47,8 +47,9 @@
            try:
                self.distribute_log_manager(c)
            except Exception as e:
                logging.exception(e)
                logger_debug.error(f"L2日志分配出错:新代码总数{len(codes)}")
                pass
                # logging.exception(e)
                # logger_debug.error(f"L2日志分配出错:新代码总数{len(codes)}")
    def get_log_manager(self, code):
        if code in self.distributed_log_dict:
l2/l2_transaction_data_manager.py
@@ -200,18 +200,23 @@
            # q.append((data['SecurityID'], data['TradePrice'], data['TradeVolume'],
            #                   data['OrderTime'], data['MainSeq'], data['SubSeq'], data['BuyNo'],
            #                   data['SellNo'], data['ExecType']))
            is_limit_up = abs(data[1] - limit_up_price) < 0.0001
            money = data[2] * data[1]
            if code not in cls.__dealing_order_info_dict:
                # 数据格式[订单号,总股数,成交金额,成交开始时间,成交结束时间, 最近的成交价格, 最近的卖单号]
                cls.__dealing_order_info_dict[code] = [data[6], data[2], data[2] * data[1], data[3], data[3], data[1], data[7]]
                # 数据格式[订单号,总股数,成交金额,成交开始时间,成交结束时间, 最近的成交价格, 最近的卖单号, 涨停价成交金额]
                cls.__dealing_order_info_dict[code] = [data[6], data[2],money, data[3], data[3], data[1],
                                                       data[7], 0]
                if is_limit_up:
                    cls.__dealing_order_info_dict[code][7] += money
            else:
                if cls.__dealing_order_info_dict[code][0] == data[6]:
                    # 成交同一个订单号
                    cls.__dealing_order_info_dict[code][1] += data[2]
                    cls.__dealing_order_info_dict[code][2] += data[2] * data[1]
                    cls.__dealing_order_info_dict[code][2] += money
                    cls.__dealing_order_info_dict[code][4] = data[3]
                    cls.__dealing_order_info_dict[code][5] = data[1]
                    cls.__dealing_order_info_dict[code][6] = data[7]
                    cls.__dealing_order_info_dict[code][7] += money
                else:
                    # 保存上一条数据
                    async_log_util.info(hx_logger_l2_transaction_desc, f"{code}#{cls.__dealing_order_info_dict[code]}")
@@ -219,16 +224,19 @@
                    deal_info = cls.__dealing_order_info_dict[code]
                    cls.__latest_deal_order_info_dict[code] = deal_info
                    # 是否为大买单
                    if deal_info[2] >= threshold_big_money:
                        # 如果最后一笔成交价格不是涨停价就不算
                        if abs(deal_info[5] - limit_up_price) < 0.0001:
                            big_buy_datas.append(deal_info)
                    if deal_info[7] >= threshold_big_money:
                        # # 如果最后一笔成交价格不是涨停价就不算
                        # if abs(deal_info[5] - limit_up_price) < 0.0001:
                        big_buy_datas.append(deal_info)
                    if deal_info[2] >= 500000:
                        normal_buy_datas.append(deal_info)
                    # 初始化本条数据
                    cls.__dealing_order_info_dict[code] = [data[6], data[2], data[2] * data[1], data[3], data[3],
                                                           data[1], data[7]]
                    cls.__dealing_order_info_dict[code] = [data[6], data[2], money, data[3], data[3],
                                                           data[1], data[7], 0]
                    if is_limit_up:
                        cls.__dealing_order_info_dict[code][7] += money
            # 统计主动买(买单号大于卖单号)
            try:
                if data[6] > data[7]:
l2/l2_transaction_data_processor.py
@@ -165,7 +165,7 @@
                    # 判断时间是否与本地时间相差5s以上
                    if tool.trade_time_sub(tool.get_now_time_str(), l2_huaxin_util.convert_time(datas[-1][3])) > 10:
                        now_seconds = int(tool.get_now_time_str().replace(":", ""))
                        if now_seconds < int("093200"):  # or int("130000") <= now_seconds < int("130200"):
                        if now_seconds < int("093100"):  # or int("130000") <= now_seconds < int("130200"):
                            need_cancel, cancel_msg = True, f"成交时间与本地时间相差10S以上,{l2_huaxin_util.convert_time(datas[-1][3])}"
                            cancel_type = trade_constant.CANCEL_TYPE_L2_DELAY
                    if need_cancel:
servers/huaxin_trade_server.py
@@ -43,7 +43,7 @@
from third_data.history_k_data_util import JueJinApi
from trade import l2_trade_util, \
    trade_data_manager, trade_constant, buy_open_limit_up_strategy
from trade.buy_radical import radical_buy_data_manager, radical_buy_strategy
from trade.buy_radical import radical_buy_data_manager, radical_buy_strategy, block_special_codes_manager
from trade.buy_money_count_setting import BuyMoneyAndCountSetting, BuyMoneyUtil
from trade.huaxin import huaxin_trade_api as trade_api, huaxin_trade_api, huaxin_trade_data_update, \
@@ -249,6 +249,9 @@
                            code_name_map[d["sec_id"]] = d["sec_name"]
                        # 保存代码名称
                        CodesNameManager().add_code_names(code_name_map)
                        # 更新辨识度代码
                        threading.Thread(target= block_special_codes_manager.update_block_special_codes, daemon=True).start()
                        sk.sendall(
                            socket_util.load_header(json.dumps({"code": 0, "data": fdatas}).encode(encoding='utf-8')))
@@ -773,10 +776,10 @@
                    if result_by_volume[0] != radical_buy_strategy.BUY_MODE_NONE:
                        if not GreenListCodeManager().is_in_cache(code):
                            # 加绿的不需要判断如下问题
                            if tool.get_now_time_as_int() < 93200:
                            if tool.get_now_time_as_int() < 93100:
                                radical_buy_data_manager.ExcludeIndexComputeCodesManager.add_code(code)
                                async_log_util.info(logger_l2_radical_buy,
                                                    f"09:32之前不交易:{code}")
                                                    f"09:31之前不交易:{code}")
                                return True
                            # 判断是否开得太高
                            open_price = L1DataManager.get_open_price(code)
third_data/code_plate_key_manager.py
@@ -360,13 +360,12 @@
        @param datas:
        @return:
        """
        # blocks = set()
        # 老版本实现方式
        # for data in datas:
        #     if data[3] <= 0:
        #         break
        #     blocks.add(data[1])
        # cls.__top_jx_blocks = blocks
        # 流入阈值
        THRESHOLD_MONEY = 100* (tool.trade_time_sub(tool.get_now_time_str(), "09:30:00")//60)+1000
        THRESHOLD_MONEY = min(THRESHOLD_MONEY, 10000)
        THRESHOLD_MONEY = max(THRESHOLD_MONEY, 1000)
        THRESHOLD_MONEY = THRESHOLD_MONEY * 10000
        cls.top_in_list_cache = datas
        blocks = set()
        count = 0
@@ -375,7 +374,7 @@
            cls.__jx_blocks_in_money_dict[data[1]] = data[3]
            if data[1] in constant.KPL_INVALID_BLOCKS:
                continue
            if data[3] < 5e7:
            if data[3] < THRESHOLD_MONEY:
                continue
            # 过滤出来为同一个板块就只算1个数量
            fb = BlockMapManager().filter_blocks({data[1]})
third_data/kpl_limit_up_data_manager.py
@@ -4,6 +4,7 @@
from third_data import kpl_util, kpl_data_manager
from third_data.history_k_data_manager import HistoryKDataManager
from third_data.history_k_data_util import HistoryKDatasUtils
from third_data.kpl_data_constant import LimitUpCodesBlockRecordManager
from utils import tool, init_data_util
@@ -43,6 +44,13 @@
    __first_block_sequence_dict = {}
    @classmethod
    def __get_code_blocks(cls, code):
        blocks = LimitUpCodesBlockRecordManager().get_radical_buy_blocks(code)
        if not blocks:
            blocks = set()
        return blocks
    @classmethod
    def set_current_limit_up_datas(cls, current_limit_up_datas):
        """
        设置目前的涨停代码
@@ -59,49 +67,54 @@
        block_codes = {}
        limit_up_codes = set()
        for code in current_code_block_dict:
            b = current_code_block_dict[code][1]
            if b not in block_codes:
                block_codes[b] = set()
            block_codes[b].add(code)
            bs = cls.__get_code_blocks(code)
            for b in bs:
                if b not in block_codes:
                    block_codes[b] = set()
                block_codes[b].add(code)
            limit_up_codes.add(code)
        for code in record_code_block_dict:
            b = record_code_block_dict[code][1]
            if b not in block_codes:
                block_codes[b] = set()
            block_codes[b].add(code)
            bs = cls.__get_code_blocks(code)
            for b in bs:
                if b not in block_codes:
                    block_codes[b] = set()
                block_codes[b].add(code)
        # 获取上个交易日涨停的代码
        yesterday_codes = kpl_data_manager.get_yesterday_limit_up_codes()
        if yesterday_codes is None:
            yesterday_codes = set()
        for code in limit_up_codes:
            # 计算身位
            b = current_code_block_dict[code][1]
            codes = block_codes[b]
            total_count = len(codes)
            # 统计真正涨停数
            limit_up_count = 0
            limit_up_codes_list = []
            for c in codes:
                if c in limit_up_codes:
                    limit_up_count += 1
                    if c not in yesterday_codes:
                        limit_up_codes_list.append((c, current_code_block_dict[c][2]))
            # 获取首板代码的排位
            limit_up_codes_list.sort(key=lambda x: x[1])
            index = 1
            for i in range(0, len(limit_up_codes_list)):
                if limit_up_codes_list[i][0] == code:
                    index = i + 1
                    break
            cls.__first_block_sequence_dict[code] = (b, index, total_count, limit_up_count)
            bs = cls.__get_code_blocks(code)
            for b in bs:
                # 计算身位
                codes = block_codes[b]
                total_count = len(codes)
                # 统计真正涨停数
                limit_up_count = 0
                limit_up_codes_list = []
                for c in codes:
                    if c in limit_up_codes:
                        limit_up_count += 1
                        if c not in yesterday_codes:
                            limit_up_codes_list.append((c, current_code_block_dict[c][2]))
                # 获取首板代码的排位
                limit_up_codes_list.sort(key=lambda x: x[1])
                index = 1
                for i in range(0, len(limit_up_codes_list)):
                    if limit_up_codes_list[i][0] == code:
                        index = i + 1
                        break
                if code not in cls.__first_block_sequence_dict:
                    cls.__first_block_sequence_dict[code] = []
                cls.__first_block_sequence_dict[code].append((b, index, total_count, limit_up_count))
    @classmethod
    def get_current_limit_up_sequence(cls, code):
        """
        获取代码当前的板块身位
        @param code:
        @return:(板块名称,身位,总涨停数量,目前涨停数量)
        @return:[(板块名称,身位,总涨停数量,目前涨停数量)]
        """
        return cls.__first_block_sequence_dict.get(code)
trade/buy_radical/block_special_codes_manager.py
@@ -187,9 +187,12 @@
        return fdatas
if __name__ == "__main__":
def update_block_special_codes():
    datas = AnalysisBlockSpecialCodesManager().get_block_special_codes()
    for d in datas:
        print(d)
    BlockSpecialCodesManager().set_block_codes_list(datas)
if __name__ == "__main__":
    # print(datas)
    pass
trade/buy_radical/radical_buy_data_manager.py
@@ -918,7 +918,7 @@
    money_y = int(refer_volume * limit_up_price / 1e8)
    money_y = min(money_y, 50)
    money_y = max(money_y, 5)
    before_time = tool.get_now_time_as_int() < 93200
    before_time = tool.get_now_time_as_int() < 93100
    # 计算大单参考数量
    threshold_count = 1  # int(round(0.4 * money_y))
    if refer_total_sell_money >= 1e7: