Kafka在Java项目中的应用

news2025/1/14 1:10:31

Kafka在Java项目中的应用

Docker 安装Kafka

一.首先需要安装docker,可看这篇文章安装docker

二.拉取zookeeper和KafKa镜像

docker pull wurstmeister/zookeeper

docker pull wurstmeister/kafka

Kafka组件需要向zookeeper进行注册,所以也需要安装zookeeper

三.启动zookeeper、kafka组件

docker run -d --name zookeeper -p 2181:2181 wurstmeister/zookeeper
 
docker run -d --name kafka --publish 9092:9092 --link zookeeper --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 --env KAFKA_ADVERTISED_HOST_NAME=localhost --env KAFKA_ADVERTISED_PORT=9092 wurstmeister/kafka

启动成功界面如下,status即为running(运行中)
在这里插入图片描述

四.创建Springboot项目

4.1 添加依赖

<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

4.2 application.yml文件

server:
  port: 9090
spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      # 配置消费者消息offset是否自动重置(消费者重连会能够接收最开始的消息)
      auto-offset-reset: earliest
    producer:
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      retries: 3  #  重试次数
kafka:
  topic:
    my-topic: my-topic
    my-topic2: my-topic2

4.3 创建实体类Book

public class Book {
    private Long id;
    private String name;

    public Book() {
    }

    public Book(Long id, String name) {
        this.id = id;
        this.name = name;
    }

    public Long getId() {
        return id;
    }

    public void setId(Long id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    @Override
    public String toString() {
        return "Book{" +
                "id=" + id +
                ", name='" + name + '\'' +
                '}';
    }
}

4.4 配置KafKa信息

@Configuration
public class KafkaConfig {

    @Value("${kafka.topic.my-topic}")
    String myTopic;
    @Value("${kafka.topic.my-topic2}")
    String myTopic2;

    /**
     * JSON消息转换器
     */
    @Bean
    public RecordMessageConverter jsonConverter() {
        return new StringJsonMessageConverter();
    }

    /**
     * 通过注入一个 NewTopic 类型的 Bean 来创建 topic,如果 topic 已存在,则会忽略。
     */
    @Bean
    public NewTopic myTopic() {
        return new NewTopic(myTopic, 2, (short) 1);
    }

    @Bean
    public NewTopic myTopic2() {
        return new NewTopic(myTopic2, 1, (short) 1);
    }
}

4.5 controller代码

@RestController
@RequestMapping(value = "/book")
public class BookController {
    @Value("${kafka.topic.my-topic}")
    String myTopic;
    @Value("${kafka.topic.my-topic2}")
    String myTopic2;
    BookProducerService producer;
    private AtomicLong atomicLong = new AtomicLong();

    BookController(BookProducerService producer) {
        this.producer = producer;
    }

    @GetMapping("/send")
    public String sendMessageToKafkaTopic(@RequestParam("name") String name) {

        this.producer.sendMessage(myTopic, new Book(atomicLong.addAndGet(1), name));

        this.producer.sendMessage(myTopic2, new Book(atomicLong.addAndGet(1), name));

        return name+" : 消息已经发送!";
    }
}

4.6 book 的生成者业务

@Service
public class BookProducerService {

    private static final Logger logger = LoggerFactory.getLogger(BookProducerService.class);

    private final KafkaTemplate<String, Object> kafkaTemplate;//通过构造方法进行注入

    public BookProducerService(KafkaTemplate<String, Object> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessage(String topic, Object o) {
        ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, o);

        future.addCallback(
                result -> logger.info("生产者成功发送消息到topic:{} partition:{}的消息",
                        result.getRecordMetadata().topic(),
                        result.getRecordMetadata().partition()),
                ex -> logger.error("生产者发送消失败,原因:{}", ex.getMessage()));
    }

}

4.7 book的消费者业务

@Service
public class BookConsumerService {

    @Value("${kafka.topic.my-topic}")
    private String myTopic;
    @Value("${kafka.topic.my-topic2}")
    private String myTopic2;
    private final Logger logger = LoggerFactory.getLogger(BookProducerService.class);
    private final ObjectMapper objectMapper = new ObjectMapper();


    @KafkaListener(topics = {"${kafka.topic.my-topic}"}, groupId = "group1")
    public void consumeMessage(ConsumerRecord<String, String> bookConsumerRecord) {
        try {
            Book book = objectMapper.readValue(bookConsumerRecord.value(), Book.class);
            logger.info("消费者消费topic:{} partition:{}的消息 -> {}", bookConsumerRecord.topic(), bookConsumerRecord.partition(), book.toString());
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
    }

    @KafkaListener(topics = {"${kafka.topic.my-topic2}"}, groupId = "group2")
    public void consumeMessage2(Book book,ConsumerRecord<String,String> bookConsumerRecord) throws JsonProcessingException {
        Book value = objectMapper.readValue(bookConsumerRecord.value(), Book.class);
        logger.info("消费者消费topic:{} partition:{}的消息 -> {}", bookConsumerRecord.topic(), bookConsumerRecord.partition(), value.toString());
        logger.info("消费者消费{}的消息 -> {}", myTopic2, book.toString());
    }
}

代码整体目录如下

在这里插入图片描述

4.8 启动成功界面

在这里插入图片描述

4.9 浏览器访问

在这里插入图片描述

4.10 控制台显示

在这里插入图片描述

至此.基于KafKa的Springboot项目简单应用已经完成,后续需要对Kafka进行更深的学习以及应用!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/545022.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

被00后卷的油尽灯枯了...

内卷的来源 内卷最早的“出处”是几张名校学霸的图片。 大学生们刷爆朋友圈的几张“内卷”图片是这样的&#xff1a;有的人骑在自行车上看书&#xff0c;有的人宿舍床上铺满了一摞摞的书&#xff0c;有的人甚至边骑车边端着电脑写论文。这些图片最早在清华北大的学霸之间流传。…

手机APP性能测试工具PerfDog安装教程及简单使用

一、下载PerfDog PerfDog下载安装传送带&#xff1a;PerfDog | 全平台性能测试分析专家 点击下载对应系统版本&#xff0c;Darren这里下载的是windows版本&#xff0c;苹果电脑可下Mac OS版本。 二、解压文件包 我们在想要存放PerfDog的文件路径先建立一个文件夹&#xff08;方…

远程桌面连接工具在哪里下载?

在市场上&#xff0c;有很多种不同的工具可用。一些远程桌面连接工具&#xff08;如RayLink&#xff09;具有高清流畅、操作简单和连接速度快的特点。而其他一些连接工具则更注重保护安全和数据保密性。不同的远程桌面连接工具各有特点&#xff0c;需要根据不同的需求进行选择。…

Linux将新硬盘挂载到指定目录

MBR分区最大2T&#xff0c;超过2T需用parted工具进行GPT分区 一、fdisk工具进行MBR分区&#xff08;2T以下&#xff09; 1. 查看当前所有硬盘 fdisk -l可以看到未分区的新硬盘/dev/sdd 2. 对新硬盘进行分区 fdisk /dev/sdd输入“n”&#xff0c;按“Enter”&#xff0c;开…

实验2 指令调度和延迟分支【计算机系统结构】

实验2 指令调度和延迟分支【计算机系统结构】 前言推荐代理服务器拒绝访问解决实验2 指令调度和延迟分支1 实验目的2 实验平台3 实验内容和步骤&#xff08;1&#xff09;启动MIPSsim。&#xff08;2&#xff09;进一步理解流水段的构成和各个流水寄存器的功能。&#xff08;3&…

降低FP独立站跳出率的5个小技巧,聪明人已经学起来了

做独立站特别是fp独立站的商家们&#xff0c;要想提高客户的体验感、达到更高的转化率&#xff0c;就要研究一下独立站的跳出率。跳出率可以衡量一个独立站的消费体验&#xff0c;同样也能衡量流量在独立站转化中的重要节点。消费者体验好、跳出率低的独立站才能够留住消费者&a…

机器学习算法应用实战笔记分享

来源&#xff1a;投稿 作者&#xff1a;小灰灰 编辑&#xff1a;学姐 整体代码请参考深度之眼的《机器学习算法应用实战》 视频链接&#xff1a;https://ai.deepshare.net/p/t_pc/goods_pc_detail/goods_detail/p_5e12aa8734510_IpNUGv5w 1.无监督学习方法---主成分分析 主成…

离散数学 | 图论 | 欧拉图 | 哈密顿图 | 割点 | 桥

本文主要解决以下几个问题&#xff1a; 1.欧拉图能不能有割点&#xff0c;能不能有桥&#xff1f; 2.哈密顿图能不能有割点&#xff0c;能不能有桥&#xff1f; 首先我们要明白几个定义 割点的定义就是在一个图G中&#xff0c;它本来是连通的&#xff0c;去掉一个点v以后这个…

前几天面了个30岁的测试员,年薪50w问题基本都能回答上,应该刷了不少八股文···

互联网行业竞争是一年比一年严峻&#xff0c;作为测试工程师的我们唯有不停地学习&#xff0c;不断的提升自己才能保证自己的核心竞争力从而拿到更好的薪水&#xff0c;进入心仪的企业&#xff08;阿里、字节、美团、腾讯等大厂.....&#xff09; 所以&#xff0c;大家就迎来了…

java 通过注解实现数据动态脱敏

一、为什么要数据脱敏&#xff1f; 数据脱敏是指对某些敏感信息通过脱敏规则进行数据的变形&#xff0c;实现敏感隐私数据的可靠保护。在涉及客户安全数据或者一些商业性敏感数据的情况下&#xff0c;在不违反系统规则条件下&#xff0c;对真实数据进行改造并提供测试使用&…

10个高质量的简历制作网站推荐

刚经历完有金三银四&#xff0c;有没有因为简历不行&#xff0c;面试少的可怜的同学。 今天推荐10个高质量的简历制作网站&#xff0c;包括可以在线免费生成设计简历的网站。 1.即时设计资源社区 即时设计是国内首款专业级的 UI 设计工具&#xff0c;像 PC 端的网页&#xf…

ThingsBoard教程(四十六):规则节点解析 延迟节点 Delay Node。生成节点 Generator Node, 日志节点 Log Node

延迟节点 Delay Node Since TB Version 2.1 延迟接收消息的时间段可进行配置。 配置如下: Period in seconds - 指定暂停接收消息的时间段。 Maximum pending messages - 指定最大允许挂起消息的数量(即被暂停的消息队列)。 当特定传入消息的延迟期达到后,该消息将从…

MATLAB第九章_数据图形可视化

目录 数据图形可视化 MATLAB图形窗口 函数绘制 一元函数绘制 二元函数绘图 数据图形绘制简介 离散数据可视化 连续函数可视化 二维绘图函数 基本绘图 快速方程式画图 特殊二维图形 三维绘图函数 绘制三维曲面 生成栅格数据 网格曲线绘制 隐藏线的显示和关闭 数据…

51单片机串口通信

串口通信 1、通信的基本概念2、 51单片机串口介绍2.1、串口通信简介2.2、串口内部结构2.3、串口通信寄存器SCONPCON 2.4、串口工作方式 3、串口使用方法3.1、硬件3.2、通信协议RS2323.3、串口初始化 4、硬件设计5、 软件设计6、拓展 串口的通信&#xff0c;一般是检测通信模块是…

网络安全专业应该从事哪个方向前景比较好

目前网络安全行业包括很多领域&#xff0c;例如网络安全基础、网络安全运维、应用安全、云安全、算法与数据安全、区块链安全、工业控制系统安全和人工智能安全等方向&#xff0c;各个方向的前景都比较好&#xff0c;关键在于选择适合自己并且感兴趣的方向。同时&#xff0c;也…

OSI体系结构7层,5层,4层协议+负载均衡

上图分别是7&#xff0c;4&#xff0c;5层协议 Q&#xff1a;为什么分4层和7层&#xff0c;目的&#xff1f; 7层&#xff1a;OSI参考模型是一种理论模型&#xff0c;旨在描述计算机网络的功能和架构。它提供了一种通用的框架&#xff0c;用于理解和设计网络协议和系统。然而&…

3.6万亿token、3400亿参数,谷歌大模型PaLM 2细节遭曝光

来源 | 机器之心 ID | almosthuman2014 谷歌内部文件又泄露了&#xff0c;这次是谷歌新一代大模型 PaLM 2 的训练细节&#xff1a;训练数据量是前代的近 5 倍、参数量是前代的三分之二左右。 上周四&#xff0c;在 2023 谷歌 I/O 大会上&#xff0c;谷歌 CEO 皮查伊宣布推出对…

埃尔德动力系统指标公式,衡量趋势的惯性和能量

亚历山大埃尔德(Alexander Elder)在其经典著作《以交易为生》&#xff08;原书第2版&#xff09;新增了一个工具——动力系统(Impulse System)&#xff0c;不过书中只介绍了动力系统的指标以及使用方法&#xff0c;并没有介绍系统的参数。其实动力系统在埃尔德2002年出版的《走…

【数据分析之道-基础知识(十一)】面向对象

文章目录 专栏导读1、简介2、类与对象3、属性和方法4、继承5、多态 专栏导读 ✍ 作者简介&#xff1a;i阿极&#xff0c;CSDN Python领域新星创作者&#xff0c;专注于分享python领域知识。 ✍ 本文录入于《数据分析之道》&#xff0c;本专栏针对大学生、初级数据分析工程师精心…

瑞吉外卖开发总结(全功能实现)

技术栈 项目部署 简历上可写的点 集中处理系统异常&#xff0c;自定义统一的错误码, 并封装了全局异常处理器&#xff0c;屏蔽了项目冗余的报错细节、便于接口调用方理解和统一处理。 基于静态ThreadLocal封装了线程隔离的全局上下文对象&#xff0c;便于在请求内部存取用户信…