pom 依赖:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.5.14.RELEASE</version>
</dependency>
或者
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.5.1</version>
</dependency>
ps:前面的 spring-kafka 依赖中已经包含了后面的 kafka-clients
KafkaConsumerDemo.java:
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.LoggerFactory;
import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.LoggerContext;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.*;
import java.util.logging.Logger;
public class KafkaConsumerDemo {
static Map<String,Object> properties = new HashMap<String,Object>();
private static KafkaConsumer kafkaConsumer = null;
/**
*
* windows 环境需要将下面 8 行添加到 "C:\Windows\System32\drivers\etc\hosts" 文件中:
* xxx.xxx.xxx.xxx1 xxx-data01
* xxx.xxx.xxx.xxx2 xxx-data02
* xxx.xxx.xxx.xxx3 xxx-data03
* xxx.xxx.xxx.xxx4 xxx-data04
* xxx.xxx.xxx.xxx5 xxx-data05
* xxx.xxx.xxx.xxx6 xxx-data06
* xxx.xxx.xxx.xxx7 xxx-data07
* xxx.xxx.xxx.xxx8 xxx-data08
* @param args
*/
public static void main(String[] args) {
// 禁止控制台输出一些 org.apache.kafka.xxx 相关的日志
LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory();
loggerContext.getLogger("org.apache.kafka.clients.consumer.internals.ConsumerCoordinator").setLevel(Level.OFF);
loggerContext.getLogger("org.apache.kafka.clients.FetchSessionHandler").setLevel(Level.OFF);
loggerContext.getLogger("org.apache.kafka.clients.consumer.internals.Fetcher").setLevel(Level.OFF);
loggerContext.getLogger("org.apache.kafka.clients.consumer.internals.AbstractCoordinator").setLevel(Level.OFF);
loggerContext.getLogger("org.apache.kafka.clients.NetworkClient").setLevel(Level.OFF);
loggerContext.getLogger("org.apache.kafka.common.network.Selector").setLevel(Level.OFF);
loggerContext.getLogger("org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochClient").setLevel(Level.OFF);
loggerContext.getLogger("org.apache.kafka.clients.Metadata").setLevel(Level.OFF);
loggerContext.getLogger("org.apache.kafka.clients.consumer.KafkaConsumer").setLevel(Level.OFF);
loggerContext.getLogger("org.apache.kafka.common.utils.AppInfoParser").setLevel(Level.OFF);
loggerContext.getLogger("org.apache.kafka.clients.consumer.KafkaConsumer").setLevel(Level.OFF);
loggerContext.getLogger("org.apache.kafka.clients.consumer.ConsumerConfig").setLevel(Level.OFF);
properties.put("bootstrap.servers","127.0.0.1:9192,127.0.0.1:9192,127.0.0.1:9192"); // 指定 Broker
properties.put("group.id", "11111111111111111111111"); // 指定消费组群 ID,为防止自己启动拉取消息导致其他生产环境的消费者无法消费该消息,请设置一个绝对不重复的值,以起到隔离的作用
properties.put("max.poll.records", "1000");
// todo 设置可批量拉取???
properties.put("enable.auto.commit", "false");
properties.put("key.deserializer", StringDeserializer.class); // 将 key 的字节数组转成 Java 对象
properties.put("value.deserializer", StringDeserializer.class); // 将 value 的字节数组转成 Java 对象
kafkaConsumer = new KafkaConsumer<String, String>(properties);
// List<String> topics = queryAllTopics( consumer );
kafkaConsumer.subscribe( Collections.singletonList( "ods_carbon_rfid_device_record" ) ); // 订阅主题 order-events
new Thread(new Runnable() {
@Override
public void run() {
receiveMessage();
}
}).start();
}
/**
* 查询全部的主题(topic)列表
* @param kafkaConsumer
* @return
*/
private static List<String> queryAllTopics(KafkaConsumer kafkaConsumer) {
if( kafkaConsumer == null ){
return null;
}
Map<String, List<PartitionInfo>> map = kafkaConsumer.listTopics();
if( map == null ){
return null;
}
return new ArrayList<String>( map.keySet() );
}
public static void receiveMessage() {
try {
while ( true ){
synchronized (KafkaConsumerDemo.class) {
// ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
// 30L 表示超时时间为 30秒,有消息立即返回,没消息最多等 30 秒后返回
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
ConsumerRecords<String,String> records = kafkaConsumer.poll(Duration.ofSeconds(30L));
String date = sdf.format(new Date());
if( records == null ){
System.out.println( date + " 本次未拉取到任何消息" );
}else {
System.out.println( date + " 本次拉取到 " + records.count() + " 条消息" );
int i = 1;
for (ConsumerRecord<String,String> record: records) {
String info = String.format("[Topic: %s][Partition:%d][Offset:%d][Key:%s][Message:%s]", record.topic(), record.partition(), record.offset(), record.key(), record.value());
System.out.println( "第" + i + "条消息:" + info );
i++;
}
kafkaConsumer.commitSync();
}
/**
* 当你用 KafkaConsumer从Kafka里读取消息并且处理完后,commitSync 方法会帮你把这些消息的处理进度(也就是偏移量 offset )同步地告诉 Kafka 服务器。
* 这样,Kafka 就知道你已经处理到哪儿了。如果消费者(也就是读取消息的程序)突然崩溃或者重启,Kafka 就能根据最后一次提交的偏移量,让你从上一次处理
* 完的地方继续开始,而不会漏掉或者重复处理消息。
* 简单来说,commitSync 方 法就是用来“保存进度”的,确保消息处理的可靠性和顺序性。
*/
// Thread.sleep( 5000L );
}
}
} catch (Exception e){
e.printStackTrace();
} finally {
kafkaConsumer.close();
}
}
}