admin
2021-08-07 2b53a01ef7275e3bd708529ed64042ff480a9f73
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
package org.yeshi.utils.log;
 
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.ConsoleAppender;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
 
public class KafkaAppender extends ConsoleAppender<ILoggingEvent> {
    public static final Logger LOGGER = LoggerFactory.getLogger(KafkaAppender.class);
 
    private String bootstrapServers;
    private String topic;
    private String batchSize;
    private String lingerMs;
    private String compressionType;
    private String retries;
    private String maxRequestSize;
    private String isSend;
 
    private Producer<String, String> producer;
 
    public KafkaAppender(){
 
    }
 
 
    @Override
    public void start() {
        super.start();
        if ("true".equals(this.isSend)) {
            if (producer == null) {
                producer = KafkaUtil.createProducer(this.bootstrapServers, this.batchSize,
                        this.lingerMs, this.compressionType, this.retries, this.maxRequestSize, "", "");
            }
        }
    }
 
    @Override
    public void stop() {
        super.stop();
        if ("true".equals(this.isSend)) {
            this.producer.close();
        }
 
        LOGGER.info("Stopping kafkaAppender...");
    }
 
    @Override
    protected void append(ILoggingEvent eventObject) {
        byte[] byteArray;
        String log;
        // 对日志格式进行解码
        byteArray = this.encoder.encode(eventObject);
        log = new String(byteArray);
        ProducerRecord<String, String> record = new ProducerRecord<>(this.topic, log);
        if (eventObject.getMarker() == null && "true".equals(this.isSend)) {
            //如果需要进行分析日志,可以对record进行数据结构重构下
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception != null) {
                        LOGGER.error("Send log to kafka failed: [{}]", log);
                    }
                }
            });
        }
    }
 
    public String getBootstrapServers() {
        return bootstrapServers;
    }
 
    public void setBootstrapServers(String bootstrapServers) {
        this.bootstrapServers = bootstrapServers;
    }
 
    public String getTopic() {
        return topic;
    }
 
    public void setTopic(String topic) {
        this.topic = topic;
    }
 
    public String getBatchSize() {
        return batchSize;
    }
 
    public void setBatchSize(String batchSize) {
        this.batchSize = batchSize;
    }
 
    public String getLingerMs() {
        return lingerMs;
    }
 
    public void setLingerMs(String lingerMs) {
        this.lingerMs = lingerMs;
    }
 
    public String getCompressionType() {
        return compressionType;
    }
 
    public void setCompressionType(String compressionType) {
        this.compressionType = compressionType;
    }
 
    public String getRetries() {
        return retries;
    }
 
    public void setRetries(String retries) {
        this.retries = retries;
    }
 
    public String getMaxRequestSize() {
        return maxRequestSize;
    }
 
    public void setMaxRequestSize(String maxRequestSize) {
        this.maxRequestSize = maxRequestSize;
    }
 
    public String getIsSend() {
        return isSend;
    }
 
    public void setIsSend(String isSend) {
        this.isSend = isSend;
    }
 
    public Producer<String, String> getProducer() {
        return producer;
    }
 
    public void setProducer(Producer<String, String> producer) {
        this.producer = producer;
    }
}