Spring Boot 集成Kafka简单应用

news2024/9/20 20:51:58

说明:当前kafka的版本为2.13-2.8.1,Spring Boot的版本为2.7.6

第一步:在pom.xml中引入下述依赖

<dependency>
	<groupId>org.springframework.kafka</groupId>
	<artifactId>spring-kafka</artifactId>
	<version>2.8.11</version>
</dependency>

第二步:在yml配置文件进行如下配置

spring:
  kafka:
    # kafka服务的地址
    bootstrap-servers: 127.0.0.1:9092
    producer:
      # key-value序列化
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      # 设置默认的消费者所属组id 
      group-id: 0
      # key-value反序列化  
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

第三步:创建生产者

package com.example.demo.kafka;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@Slf4j
@RestController
@RequestMapping("kafka")
public class KafkaProducer {

    // 自定义topic
    public static final String TOPIC_NAME="topic.one";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @RequestMapping("send")
    public String send(@RequestParam("msg")String msg) {
        log.info("准备发送消息为:{}",msg);
        // 1.发送消息
        ListenableFuture<SendResult<String,String>> future=kafkaTemplate.send(TOPIC_NAME,msg);
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onFailure(Throwable throwable) {
                // 2.发送失败的处理
                log.error("生产者 发送消息失败:"+throwable.getMessage());
            }
            @Override
            public void onSuccess(SendResult<String, String> stringObjectSendResult) {
                // 3.发送成功的处理
                log.info("生产者 发送消息成功:"+stringObjectSendResult.toString());
            }
        });
        return "接口调用成功";
    }
}

注意:当kafka中没有名为topic.one的主题时程序会自动创建,其实这里的主题就相当于rabbitmq中的队列。

第四步:创建消费者

package com.example.demo.kafka;

import java.util.Optional;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class KafkaConsumer {

    // 自定义topic
    public static final String TOPIC_NAME="topic.one";

    @KafkaListener(topics = TOPIC_NAME, groupId = "ONE")
    public void topic_one(ConsumerRecord<?, ?> record, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        Optional message = Optional.ofNullable(record.value());
        if (message.isPresent()) {
            Object msg = message.get();
            log.info("消费者One消费了: +++++++++++++++ Topic:" + topic + ",Record:" + record + ",Message:" + msg);
        }
    }

    @KafkaListener(topics = TOPIC_NAME, groupId = "TWO")
    public void topic_two(ConsumerRecord<?, ?> record, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        Optional message = Optional.ofNullable(record.value());
        if (message.isPresent()) {
            Object msg = message.get();
            log.info("消费者TwO消费了: +++++++++++++++ Topic:" + topic + ",Record:" + record + ",Message:" + msg);
        }
    }
}

第五步:启动项目,在postman中访问请求下述接口

http://127.0.0.1:8080/kafka/send?msg=测试数据

项目启动后首次调用该接口控制台会输出如下日志:

2023-05-31 22:09:50.935  INFO 20036 --- [nio-8080-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring DispatcherServlet 'dispatcherServlet'
2023-05-31 22:09:50.935  INFO 20036 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet        : Initializing Servlet 'dispatcherServlet'
2023-05-31 22:09:50.936  INFO 20036 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet        : Completed initialization in 1 ms
2023-05-31 22:09:50.963  INFO 20036 --- [nio-8080-exec-1] com.example.demo.kafka.KafkaProducer     : 准备发送消息为:测试数据
2023-05-31 22:09:50.966  INFO 20036 --- [nio-8080-exec-1] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values: 
	acks = -1
	batch.size = 16384
	bootstrap.servers = [127.0.0.1:9092]
	buffer.memory = 33554432
	client.dns.lookup = use_all_dns_ips
	client.id = producer-1
	compression.type = none
	connections.max.idle.ms = 540000
	delivery.timeout.ms = 120000
	enable.idempotence = true
	interceptor.classes = []
	key.serializer = class org.apache.kafka.common.serialization.StringSerializer
	linger.ms = 0
	max.block.ms = 60000
	max.in.flight.requests.per.connection = 5
	max.request.size = 1048576
	metadata.max.age.ms = 300000
	metadata.max.idle.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
	receive.buffer.bytes = 32768
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retries = 2147483647
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.connect.timeout.ms = null
	sasl.login.read.timeout.ms = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.login.retry.backoff.max.ms = 10000
	sasl.login.retry.backoff.ms = 100
	sasl.mechanism = GSSAPI
	sasl.oauthbearer.clock.skew.seconds = 30
	sasl.oauthbearer.expected.audience = null
	sasl.oauthbearer.expected.issuer = null
	sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
	sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
	sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
	sasl.oauthbearer.jwks.endpoint.url = null
	sasl.oauthbearer.scope.claim.name = scope
	sasl.oauthbearer.sub.claim.name = sub
	sasl.oauthbearer.token.endpoint.url = null
	security.protocol = PLAINTEXT
	security.providers = null
	send.buffer.bytes = 131072
	socket.connection.setup.timeout.max.ms = 30000
	socket.connection.setup.timeout.ms = 10000
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2]
	ssl.endpoint.identification.algorithm = https
	ssl.engine.factory.class = null
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.certificate.chain = null
	ssl.keystore.key = null
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLSv1.2
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.certificates = null
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	transaction.timeout.ms = 60000
	transactional.id = null
	value.serializer = class org.apache.kafka.common.serialization.StringSerializer

2023-05-31 22:09:50.973  INFO 20036 --- [nio-8080-exec-1] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=producer-1] Instantiated an idempotent producer.
2023-05-31 22:09:50.982  INFO 20036 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 3.1.2
2023-05-31 22:09:50.982  INFO 20036 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: f8c67dc3ae0a3265
2023-05-31 22:09:50.982  INFO 20036 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1685542190982
2023-05-31 22:09:50.991  INFO 20036 --- [ad | producer-1] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-1] Resetting the last seen epoch of partition topic.one-0 to 0 since the associated topicId changed from null to A8Lfh0GZQ3GR6fUo-OvcAQ
2023-05-31 22:09:50.991  INFO 20036 --- [ad | producer-1] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-1] Cluster ID: 0YMM4442QOyhnWsg0zfjig
2023-05-31 22:09:50.992  INFO 20036 --- [ad | producer-1] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=producer-1] ProducerId set to 4 with epoch 0
2023-05-31 22:09:51.020  INFO 20036 --- [ad | producer-1] com.example.demo.kafka.KafkaProducer     : 生产者 发送消息成功:SendResult [producerRecord=ProducerRecord(topic=topic.one, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=null, value=测试数据, timestamp=null), recordMetadata=topic.one-0@7]
2023-05-31 22:09:51.025  INFO 20036 --- [ntainer#1-0-C-1] com.example.demo.kafka.KafkaConsumer     : 消费者TwO消费了: +++++++++++++++ Topic:topic.one,Record:ConsumerRecord(topic = topic.one, partition = 0, leaderEpoch = 0, offset = 7, CreateTime = 1685542190992, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = 测试数据),Message:测试数据
2023-05-31 22:09:51.025  INFO 20036 --- [ntainer#0-0-C-1] com.example.demo.kafka.KafkaConsumer     : 消费者One消费了: +++++++++++++++ Topic:topic.one,Record:ConsumerRecord(topic = topic.one, partition = 0, leaderEpoch = 0, offset = 7, CreateTime = 1685542190992, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = 测试数据),Message:测试数据

第二次调用该接口,其输出日志则比较简洁:

2023-05-31 22:11:13.130  INFO 20036 --- [nio-8080-exec-3] com.example.demo.kafka.KafkaProducer     : 准备发送消息为:测试数据
2023-05-31 22:11:13.135  INFO 20036 --- [ad | producer-1] com.example.demo.kafka.KafkaProducer     : 生产者 发送消息成功:SendResult [producerRecord=ProducerRecord(topic=topic.one, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=null, value=测试数据, timestamp=null), recordMetadata=topic.one-0@8]
2023-05-31 22:11:13.144  INFO 20036 --- [ntainer#0-0-C-1] com.example.demo.kafka.KafkaConsumer     : 消费者One消费了: +++++++++++++++ Topic:topic.one,Record:ConsumerRecord(topic = topic.one, partition = 0, leaderEpoch = 0, offset = 8, CreateTime = 1685542273130, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = 测试数据),Message:测试数据
2023-05-31 22:11:13.144  INFO 20036 --- [ntainer#1-0-C-1] com.example.demo.kafka.KafkaConsumer     : 消费者TwO消费了: +++++++++++++++ Topic:topic.one,Record:ConsumerRecord(topic = topic.one, partition = 0, leaderEpoch = 0, offset = 8, CreateTime = 1685542273130, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = 测试数据),Message:测试数据

如上面代码所示,程序创建了两个消息消费者,每个消费者都监听来自主题topic_one的消息,唯一不同的是两个消费者分别属于不同的组别,即groupId设置不同。由此可见一条消息分别被两个不同的消费者消费,由此达到了广播的效果。

如果不需要每个消费者都读取到重复的消息该如何配置?

答案:采用相同消费组唯一消费模式

package com.example.demo.kafka;

import java.util.Optional;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class KafkaConsumer {

    // 自定义topic
    public static final String TOPIC_NAME="topic.one";

    @KafkaListener(topics = TOPIC_NAME, groupId = "ONE")
    public void topic_one(ConsumerRecord<?, ?> record, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        Optional message = Optional.ofNullable(record.value());
        if (message.isPresent()) {
            Object msg = message.get();
            log.info("消费者One消费了: +++++++++++++++ Topic:" + topic + ",Record:" + record + ",Message:" + msg);
        }
    }

    @KafkaListener(topics = TOPIC_NAME, groupId = "ONE")
    public void topic_two(ConsumerRecord<?, ?> record, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        Optional message = Optional.ofNullable(record.value());
        if (message.isPresent()) {
            Object msg = message.get();
            log.info("消费者TwO消费了: +++++++++++++++ Topic:" + topic + ",Record:" + record + ",Message:" + msg);
        }
    }
}

上述代码与前段代码唯一的区别在于,两个消费者的组别设置成了一样的。再次调用上述接口,查看程序日志如下:

2023-05-31 22:19:31.953  INFO 17084 --- [nio-8080-exec-5] com.example.demo.kafka.KafkaProducer     : 准备发送消息为:测试数据
2023-05-31 22:19:31.956  INFO 17084 --- [ad | producer-1] com.example.demo.kafka.KafkaProducer     : 生产者 发送消息成功:SendResult [producerRecord=ProducerRecord(topic=topic.one, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=null, value=测试数据, timestamp=null), recordMetadata=topic.one-0@11]
2023-05-31 22:19:31.957  INFO 17084 --- [ntainer#0-0-C-1] com.example.demo.kafka.KafkaConsumer     : 消费者One消费了: +++++++++++++++ Topic:topic.one,Record:ConsumerRecord(topic = topic.one, partition = 0, leaderEpoch = 0, offset = 11, CreateTime = 1685542771953, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = 测试数据),Message:测试数据

从日志可以看出无论访问多少次,只会有一个消费者消费了消息,因此达到了不重复消费的目的。 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/594791.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Android之解决RecyclerView与NestedScrollView的滑动冲突方法

1、解决RecyclerView与NestedScrollView的滑动冲突 问题一&#xff1a;当我们滑动RecyclerView组件时&#xff0c;上方的轮播图并没有进行滑动&#xff08;NestedScrollView没有滑动&#xff0c;即滑动事件被RecyclerView消费了&#xff09;&#xff0c;当RecyclerView滑到底时…

Nginx的搭建和优化

目录 一. Nginx简介1.1 概述1.2 Nginx和Apache优缺点比较 二. Nginx编译安装步骤1 关闭防火墙&#xff0c;安装依赖包2. 创建运行用户和组3. 编译安装 Nginx4. 做软连接并启动nginx5. 停止Nginx6. 添加nginx系统服务方法一&#xff1a;编写脚本方法二 将nginx命令加入服务 7. 查…

高完整性系统工程(四):Formal Verification and Validation

目录 1. Specification Process 1.1 State Invariants 1.2 Exceptional Behaviour 1.3 Framing 1.4 Summary 2. V&V FOR SPECS 2.1 V&V for formal specs 2.2 Proof 2.3 Proof Assistants 2.4 Model Checking 1. Specification Process Specification Proces…

1128 N Queens Puzzle(21行代码)

分数 20 全屏浏览题目 切换布局 作者 CHEN, Yue 单位 浙江大学 The "eight queens puzzle" is the problem of placing eight chess queens on an 88 chessboard so that no two queens threaten each other. Thus, a solution requires that no two queens sha…

QTranslator Class

QTranslator Class QTranslator 类公共成员函数类说明查找翻译使用多种翻译成员函数使用说明 QTranslator 类 QTranslator类为文本输出提供国际化支持。多国语言 Header: #include <QTranslator> qmake: QT core Inherits: QObject公共成员函数 构造函数QTranslator(…

代码随想录算法训练营15期 Day 7 | 454.四数相加II 、 383. 赎金信 、15. 三数之和 、18. 四数之和

昨天看了一下别的东西&#xff0c;导致昨天没有练习打卡&#xff0c;今天补上昨天的学习知识。 454.四数相加II 建议&#xff1a;本题是 使用map 巧妙解决的问题&#xff0c;好好体会一下 哈希法 如何提高程序执行效率&#xff0c;降低时间复杂度&#xff0c;当然使用哈希法 会…

AURIX TC3XX Cached PFLASH与Non-Cached PFLASH的区别

Cached ? Non-Cached&#xff1f; 在阅读TC3XX的用户手册时&#xff0c;在内存映射表中&#xff0c;有两个segment都是Program Flash&#xff0c;而且大小都一样是3M&#xff0c;一个是segment 8 另一个是segment10 这难免让人产生疑惑&#xff0c;二者区别在哪&#xff1f; …

高程实验 二分算法

学校的ppt把相等也考虑到大于上面去了&#xff0c;所以是错误的 1. (程序题) 有n(1<n<1000005)个整数&#xff0c;已经按照从小到大顺序排列好&#xff0c;现在另外给一个整数x&#xff0c;请找出序列中第1个大于x的数的下标&#xff01; 输入&#xff1a; 输入数据包含多…

4. 垃圾收集器ParNewCMS底层三色标记算法详解

JVM性能调优 1. 垃圾收集算法1.1 分代收集理论1.2 标记-复制算法1.3 标记-清除算法1.4 标记-整理算法 2. 垃圾收集器2.1 Serial收集器(-XX:UseSerialGC -XX:UseSerialOldGC)2.2 Parallel Scavenge收集器(-XX:UseParallelGC(年轻代)&#xff0c;-XX:UseParallelOldGC(老年代))2.…

浅谈MySQL主键

常用主键 常用主键 1&#xff09;自增 int、bigint等&#xff0c;顺序递增。 2&#xff09;雪花 雪花算法是因为有时间参数&#xff0c;所以是有序地&#xff0c;而且都是由数字组成。雪花id最大为64位,符合java中long的长度64位&#xff0c;适用于大规模分布式场景。 3&#…

docker基础操作与进阶 - 搭建基于pm2的node环境

1、为什么要使用docker 最近遇到一台机器需要部署两个不同版本node的情况&#xff0c;首先就想起了docker&#xff0c;想必还有其他类似环境问题的情况&#xff0c;需要进行项目隔离&#xff0c;而docker正是用来解决这个问题的。 docker的优势就在于环境隔离&#xff0c;相当…

第九篇、基于Arduino uno,用LCD1602(带IIC的)显示屏显示字符——结果导向

0、结果 说明&#xff1a;可以在LCD1602屏幕上面显示字符&#xff0c;实时的变量&#xff0c;也可以设置是否背光&#xff0c;如果是你想要的&#xff0c;可以接着往下看。 1、外观 说明&#xff1a;注意是带IIC通讯的LCD屏幕&#xff0c;外形如下。 2、连线 说明&#xff…

Hexo写文章不方便?用上GitHub Actions真的是泰裤辣

对于做个人博客的小伙伴来说 HEXO 大家肯定都是非常熟悉的吧,这是一个静态的个人博客程序,通过 HEXO + GitHub Pages 搭建免费个人博客也是很多博主的选择。但相信肯定也会有些困恼,比如博客的渲染维护太麻烦了,我要在一台新设备上写博客并推送到 GitHub Pages 还要先安装 …

【Redis】聊一下哨兵集群

上一篇中&#xff0c;介绍了哨兵机制可以减少主库实例下线的误判率&#xff0c;但是如果只有一个哨兵实例&#xff0c;出现宕机后没有办法保证服务的高可用&#xff0c;所以一般实际的生产环境都是搭建3个哨兵实例构建的集群进行运行。但是具体的运行机制是什么。其实主要就是三…

使用langchain及llama_index实现基于文档(长文本)的相似查询与询问

文章目录 1. 引言2. 简介3. 带关键字的查询方案4. 不带关键字的总结询问5. 实现代码 1. 引言 在调用ChatGPT接口时&#xff0c;我们常常受到4096个字符&#xff08;token&#xff09;的限制。这种限制对于处理长文本或者需要对文档进行相似查询和询问的场景来说是一个挑战。然…

如何复制投票链接投票怎样链接到公众号投票链接如何生成

关于微信投票&#xff0c;我们现在用的最多的就是小程序投票&#xff0c;今天的网络投票&#xff0c;在这里会教大家如何用“活动星投票”小程序来进行投票。 我们现在要以“妙趣拾光”为主题进行一次投票活动&#xff0c;我们可以在在微信小程序搜索&#xff0c;“活动星投票”…

【Python开发】FastAPI 03:请求参数—请求体

除了路径参数和查询参数&#xff0c;还有请求体&#xff0c;其用于传递 JSON、XML 或其他格式的数据&#xff0c;以便服务器能够读取并做出相应的处理&#xff0c;可以说请求体的作用更为强大。试想一下&#xff0c;如果存在七八个参数&#xff0c;路径参数和查询是不是就招架不…

Camera | 10.linux驱动 led架构-基于rk3568

前面文章我们简单给大家介绍了如何移植闪光灯芯片sgm3141&#xff0c;该驱动依赖了led子系统和v4l2子系统。 V4L2可以参考前面camera系列文章&#xff0c;本文主要讲述led子系统。 一、LED子系统框架 Linux内核的 led 子系统主要功能&#xff1a; 为每个设备在/sys/class/le…

《Linux内核源码分析》(2)进程原理及系统调用

《Linux内核源码分析》(2)进程原理及系统调用 一、进程 操作系统的作用&#xff1a;作为硬件的使用层&#xff0c;提供使用硬件资源的能力&#xff0c; 进程的作用&#xff1a;作为操作系统使用层&#xff0c;提供使用操作系统抽象出的资源层的能力 进程、线程和程序的区别&…

【计算机视觉】Segment Anything 安装配置及代码测试(含源代码)

文章目录 一、前言二、安装2.1 基本要求2.2 Install Segment Anything 三、代码使用示例3.1 Automatically generating object masks with SAM3.2 Environment Set-up3.3 显示标注3.4 图像示例3.5 Automatic mask generation3.6 Automatic mask generation options 一、前言 目…