package org.yeshi.utils.log;
|
|
import org.apache.kafka.clients.producer.KafkaProducer;
|
import org.apache.kafka.clients.producer.Producer;
|
import org.apache.kafka.clients.producer.ProducerConfig;
|
|
import java.util.Properties;
|
|
public class KafkaUtil {
|
public static Producer<String, String> createProducer(
|
String bootstrapServers, String batchSize, String lingerMs,
|
String compressionType, String retries, String maxRequestSize, String username, String password) {
|
// 当配置项为IS_UNDEFINED时,使用默认值
|
if (bootstrapServers == null) {
|
bootstrapServers = "kafka地址";
|
}
|
if (batchSize.contains("IS_UNDEFINED")) {
|
batchSize = "50000";
|
}
|
if (lingerMs.contains("IS_UNDEFINED")) {
|
lingerMs = "60000";
|
}
|
|
if (retries.contains("IS_UNDEFINED")) {
|
retries = "3";
|
}
|
if (maxRequestSize.contains("IS_UNDEFINED")) {
|
maxRequestSize = "5242880";
|
}
|
|
Properties properties = new Properties();
|
// kafka地址,集群用逗号分隔开
|
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
|
// acks取值:
|
// 0: kafka不返回确认信息,不保证record是否被收到,因为没有返回所以重试机制不会生效
|
// 1: partition leader确认record写入到日志中,但不保证信息是否被正确复制
|
// all: leader会等待所有信息被同步后返回确认信息
|
properties.put(ProducerConfig.ACKS_CONFIG, "1");
|
properties.put(ProducerConfig.RETRIES_CONFIG, Integer.valueOf(retries));
|
// 批量发送,当达到batch size最大值触发发送机制(10.0后支持批量发送)
|
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, Integer.valueOf(batchSize));
|
// 该配置是指在batch.size数量未达到时,指定时间内也会推送数据
|
properties.put(ProducerConfig.LINGER_MS_CONFIG, Integer.valueOf(lingerMs));
|
|
// 配置缓存
|
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
|
if (!compressionType.contains("IS_UNDEFINED")) {
|
// 指定压缩算法
|
properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compressionType);
|
}
|
// 每个请求的消息大小
|
properties.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, Integer.valueOf(maxRequestSize));
|
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
|
"org.apache.kafka.common.serialization.StringSerializer");
|
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
|
"org.apache.kafka.common.serialization.StringSerializer");
|
if (!"".equals(username)) {
|
properties.put("security.protocol", "SASL_PLAINTEXT");
|
properties.put("sasl.mechanism", "PLAIN");
|
properties.put("sasl.jaas.config",
|
"org.apache.kafka.common.security.scram.ScramLoginModule required username=\"" + "用戶名" + "\" password=\"" + "密碼" + "\";");
|
}
|
return new KafkaProducer<String, String>(properties);
|
}
|
|
}
|