package org.yeshi.utils.log;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;

import java.util.Properties;

/**
 * Factory for the String/String Kafka {@link Producer} used by the logging pipeline.
 *
 * <p>Configuration values may arrive as unresolved placeholders containing the marker
 * {@code "IS_UNDEFINED"} (e.g. from logback/Spring property substitution); such values
 * are replaced with sensible defaults.
 */
public class KafkaUtil {

    /** Utility class — not instantiable. */
    private KafkaUtil() {
    }

    /**
     * Creates a Kafka producer with String keys and values.
     *
     * @param bootstrapServers kafka broker list, comma-separated for a cluster; required
     * @param batchSize        batch.size in bytes; placeholder/null falls back to 50000
     * @param lingerMs         linger.ms; placeholder/null falls back to 60000
     * @param compressionType  compression algorithm; skipped when null or a placeholder
     * @param retries          retry count; placeholder/null falls back to 3
     * @param maxRequestSize   max.request.size in bytes; placeholder/null falls back to 5242880
     * @param username         SASL/PLAIN username; SASL is configured only when non-empty
     * @param password         SASL/PLAIN password
     * @return a configured {@link KafkaProducer}
     * @throws IllegalArgumentException if bootstrapServers is missing or an unresolved placeholder
     * @throws NumberFormatException    if a numeric setting is not a valid integer
     */
    public static Producer<String, String> createProducer(
            String bootstrapServers,
            String batchSize,
            String lingerMs,
            String compressionType,
            String retries,
            String maxRequestSize,
            String username,
            String password) {
        // Fail fast: a producer with no real broker address can never connect,
        // so refuse the placeholder instead of deferring the failure to send time.
        if (bootstrapServers == null || bootstrapServers.contains("IS_UNDEFINED")) {
            throw new IllegalArgumentException("bootstrapServers must be configured");
        }
        batchSize = defaultIfUndefined(batchSize, "50000");
        lingerMs = defaultIfUndefined(lingerMs, "60000");
        retries = defaultIfUndefined(retries, "3");
        maxRequestSize = defaultIfUndefined(maxRequestSize, "5242880");

        Properties properties = new Properties();
        // Kafka address; comma-separated for a cluster.
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // acks values:
        //   0:   broker sends no acknowledgement; retries never trigger
        //   1:   partition leader confirms the write, replication not guaranteed
        //   all: leader waits for all in-sync replicas before acknowledging
        properties.put(ProducerConfig.ACKS_CONFIG, "1");
        properties.put(ProducerConfig.RETRIES_CONFIG, Integer.parseInt(retries));
        // Batching: a send is triggered once batch.size is reached.
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, Integer.parseInt(batchSize));
        // Even if batch.size is not reached, data is flushed after linger.ms.
        properties.put(ProducerConfig.LINGER_MS_CONFIG, Integer.parseInt(lingerMs));
        // Producer-side buffer memory.
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        if (compressionType != null && !compressionType.contains("IS_UNDEFINED")) {
            // Compression algorithm, only when explicitly configured.
            properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compressionType);
        }
        // Maximum size of a single request.
        properties.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, Integer.parseInt(maxRequestSize));
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");

        if (username != null && !username.isEmpty()) {
            properties.put("security.protocol", "SASL_PLAINTEXT");
            properties.put("sasl.mechanism", "PLAIN");
            // The PLAIN mechanism requires PlainLoginModule (ScramLoginModule is for
            // SCRAM-SHA-*), and the configured credentials must actually be used.
            properties.put("sasl.jaas.config",
                    "org.apache.kafka.common.security.plain.PlainLoginModule required"
                            + " username=\"" + username + "\""
                            + " password=\"" + password + "\";");
        }
        return new KafkaProducer<>(properties);
    }

    /** Returns {@code fallback} when {@code value} is null or an unresolved placeholder. */
    private static String defaultIfUndefined(String value, String fallback) {
        return (value == null || value.contains("IS_UNDEFINED")) ? fallback : value;
    }
}