package org.yeshi.utils.log; import ch.qos.logback.classic.spi.ILoggingEvent; import ch.qos.logback.core.ConsoleAppender; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class KafkaAppender extends ConsoleAppender { public static final Logger LOGGER = LoggerFactory.getLogger(KafkaAppender.class); private String bootstrapServers; private String topic; private String batchSize; private String lingerMs; private String compressionType; private String retries; private String maxRequestSize; private String isSend; private Producer producer; public KafkaAppender(){ } @Override public void start() { super.start(); if ("true".equals(this.isSend)) { if (producer == null) { producer = KafkaUtil.createProducer(this.bootstrapServers, this.batchSize, this.lingerMs, this.compressionType, this.retries, this.maxRequestSize, "", ""); } } } @Override public void stop() { super.stop(); if ("true".equals(this.isSend)) { this.producer.close(); } LOGGER.info("Stopping kafkaAppender..."); } @Override protected void append(ILoggingEvent eventObject) { byte[] byteArray; String log; // 对日志格式进行解码 byteArray = this.encoder.encode(eventObject); log = new String(byteArray); ProducerRecord record = new ProducerRecord<>(this.topic, log); if (eventObject.getMarker() == null && "true".equals(this.isSend)) { //如果需要进行分析日志,可以对record进行数据结构重构下 producer.send(record, new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { if (exception != null) { LOGGER.error("Send log to kafka failed: [{}]", log); } } }); } } public String getBootstrapServers() { return bootstrapServers; } public void setBootstrapServers(String bootstrapServers) { this.bootstrapServers = bootstrapServers; } public String getTopic() { return topic; } public void setTopic(String topic) { this.topic = topic; } public String getBatchSize() { return batchSize; } public void setBatchSize(String batchSize) { this.batchSize = batchSize; } public String getLingerMs() { return lingerMs; } public void setLingerMs(String lingerMs) { this.lingerMs = lingerMs; } public String getCompressionType() { return compressionType; } public void setCompressionType(String compressionType) { this.compressionType = compressionType; } public String getRetries() { return retries; } public void setRetries(String retries) { this.retries = retries; } public String getMaxRequestSize() { return maxRequestSize; } public void setMaxRequestSize(String maxRequestSize) { this.maxRequestSize = maxRequestSize; } public String getIsSend() { return isSend; } public void setIsSend(String isSend) { this.isSend = isSend; } public Producer getProducer() { return producer; } public void setProducer(Producer producer) { this.producer = producer; } }