【Kafka】2.在SpringBoot中使用官方原生java版Kafka客户端

news2024/12/23 10:36:01

目 录

    • 1. 新建一个消息生产者
    • 2. 新建一个消息消费者
    • 3. 测 试

在开始之前,需要先做点准备工作,用 IDEA 新建一个 Maven 项目,取名 kafka-study,然后删掉它的 src 目录,接着在 pom.xml 里面引入下面的依赖。这个项目的作用是作为整个工程的父项目,后面的项目都在此基础上新建 module 即可。

这里用到的环境信息如下:

  • JDK1.8
  • SpringBoot2.7.1
  • IDEA
  • Maven3.6.3
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <packaging>pom</packaging>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.1</version>
        <relativePath/>
    </parent>

    <groupId>org.yuhuofei</groupId>
    <artifactId>kafka-study</artifactId>
    <version>1.0-SNAPSHOT</version>

    <name>kafka-study</name>
    <description>Study project for Kafka</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

1. 新建一个消息生产者

使用 IDEA 在 kafka-study 的基础上新建一个 Maven 类型的 module ,命名为 producer-01,直至完成。接着,我们改造这个 module ,把它变成我们想要的消息生产者。

1、在 java 目录下,新建一个包 com.yuhuofei ,然后再创建一个启动类 ProducerApplication.java ,内容如下:

package com.yuhuofei;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class ProducerApplication {

    public static void main(String[] args) {
        SpringApplication.run(ProducerApplication.class, args);
    }

}

2、在 resources 目录下,新建配置文件 application.properties ,内容如下:

server.port=8080
server.servlet.context-path=/producer-01
#Kafka配置
topic.name=my-kafka-topic

3、在自己的 pom.xml 文件中,引入 Kafka 客户端的依赖。这里需要注意的是,由于我们在上一篇博客里安装的是 kafka_2.11-2.2.1 版本的服务端,所以引入的 Kafka 客户端的版本最好对应,也选择 2.2.1 版本的。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>kafka-study</artifactId>
        <groupId>org.yuhuofei</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>producer-01</artifactId>

    <dependencies>
        <!--官方原生的java版kafka客户端依赖-->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.2.1</version>
        </dependency>
        <!--fastjson依赖-->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>2.0.11.graal</version>
        </dependency>
    </dependencies>

</project>

4、由于 Kafka 进行消息的发布或者消费是需要有 Topic 的,因此我们需要要创建一个 Topic。在 zookeeper 和 Kafka 服务端都启动的情况下,打开 D:\Kafka\kafka_2.11-2.2.1\bin\windows 文件夹,在地址栏输入 cmd,打开控制台,然后用下面的命令创建、查看、删除 topic 。

如下图所示,我已经创建了一个名字为 my-kafka-topic 的 topic ,这个 topic 的分区为 1,副本也是 1。

# 创建topic
kafka-topics.bat --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic my-kafka-topic
# 查询topic列表
kafka-topics.bat --zookeeper 127.0.0.1:2181 --list					
# 删除topic
kafka-topics.bat --delete --zookeeper 127.0.0.1:2181 --topic my-kafka-topic
# 查询某个topic信息
kafka-topics.bat --describe --zookeeper localhost:2181 --topic my-kafka-topic
#kafka查看topic数据内容
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic my-kafka-topic --from-beginning

在这里插入图片描述

5、实现生产者发送消息

  • controller层
package com.yuhuofei.controller;

import com.yuhuofei.service.ProducerInterface;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

/**
 * @Description
 * @ClassName ProducerController
 * @Author yuhuofei
 * @Date 2023/8/13 16:38
 * @Version 1.0
 */
@RequestMapping("/info")
@RestController
public class ProducerController {

    @Resource
    private ProducerInterface producerInterface;

    @GetMapping("/create-message")
    public void createTopic() {
        producerInterface.createMessage();
    }
}

  • service层
package com.yuhuofei.service;

/**
 * @Description
 * @InterfaceName ProducerController
 * @Author yuhuofei
 * @Date 2023/8/13 16:39
 * @Version 1.0
 */
public interface ProducerInterface {

    void createMessage();
}

  • serviceImpl层
package com.yuhuofei.service.impl;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.yuhuofei.service.ProducerInterface;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

/**
 * @Description
 * @ClassName ProducerInterfaceImpl
 * @Author yuhuofei
 * @Date 2023/8/13 16:39
 * @Version 1.0
 */
@Service
public class ProducerInterfaceImpl implements ProducerInterface {

    @Value("${topic.name}")
    private String topicName;

    @Resource
    private Producer producer;

    @Override
    public void createMessage() {
        //创建测试数据
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("word", "这是生产者生产的测试消息");
        String jsonString = JSON.toJSONString(jsonObject);
        //使用kafka发送消息
        ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topicName, jsonString);
        producer.send(producerRecord);
    }
}

  • 客户端配置类
package com.yuhuofei.config;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.Properties;

/**
 * @Description 配置生产者客户端参数
 * @ClassName KafkaProperties
 * @Author yuhuofei
 * @Date 2023/8/13 22:18
 * @Version 1.0
 */
@Configuration
public class KafkaConfig {

    @Bean("producer")
    public Producer<String, String> getKafkaProducer() {
        Properties properties = new Properties();
        //配置Kafka的服务端IP和端口
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        //指定key使用的序列化类
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        //指定value使用的序列化类
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        return new KafkaProducer<>(properties);
    }
}

6、启动服务,调用接口 http://localhost:8080/producer-01/info/create-message ,然后在控制台用命令查看一下消息是否写入,如下图所见,虽然是乱码,但是已经可以看出消息成功写入到 Kafka

在这里插入图片描述
到这里,消息生产者的创建暂时告一段落。

2. 新建一个消息消费者

和前面建立生产者类似的步骤,只是文件信息和名字不一样,这里步骤就不重复了,直接给出相关的配置和类。这个消费者,取名 consumer-01 。

  • pom.xml文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>kafka-study</artifactId>
        <groupId>org.yuhuofei</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>consumer-01</artifactId>

    <dependencies>
        <!--官方原生的java版kafka客户端依赖-->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.2.1</version>
        </dependency>
        <!--fastjson依赖-->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>2.0.11.graal</version>
        </dependency>
    </dependencies>
</project>
  • properties文件
server.port=8081
server.servlet.context-path=/consumer-01
#Kafka配置
topic.name=my-kafka-topic
  • 启动类 ConsumerApplication.java
package com.yuhuofei;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class ConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }

}

  • 控制层ConsumerController.java
package com.yuhuofei.controller;

import com.yuhuofei.service.ConsumerInterface;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

/**
 * @Description
 * @ClassName ConsumerController
 * @Author yuhuofei
 * @Date 2023/8/14 0:19
 * @Version 1.0
 */
@RestController
@RequestMapping("/consumer")
public class ConsumerController {

    @Resource
    private ConsumerInterface consumerInterface;

    @GetMapping("/getData")
    public void getData() {
        consumerInterface.getData();
    }
}
  • service层
package com.yuhuofei.service;

/**
 * @Description
 * @ClassName ConsumerInterface
 * @Author yuhuofei
 * @Date 2023/8/14 0:20
 * @Version 1.0
 */
public interface ConsumerInterface {
    void getData();
}

  • serviceImpl层
package com.yuhuofei.service.impl;

import com.yuhuofei.service.ConsumerInterface;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

/**
 * @Description
 * @ClassName ConsumerInterfaceImpl
 * @Author yuhuofei
 * @Date 2023/8/14 0:20
 * @Version 1.0
 */
@Service
public class ConsumerInterfaceImpl implements ConsumerInterface {

    @Value("${topic.name}")
    private String topicName;

    @Resource
    private Consumer consumer;

    @Override
    public void getData() {
        Collection<String> topics = Collections.singletonList(topicName);
        consumer.subscribe(topics);
        List<String> result = new ArrayList<>();
        while (true) {
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord consumerRecord : consumerRecords) {
                //从ConsumerRecord中获取消费数据
                String res = (String) consumerRecord.value();
                System.out.println("从Kafka中消费的原始数据: " + res);
                result.add(res);
                System.out.println(result);
            }
        }
    }
}
  • config配置类
package com.yuhuofei.config;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.Properties;

/**
 * @Description 配置消费者参数
 * @ClassName KafkaConsumerConfig
 * @Author yuhuofei
 * @Date 2023/8/14 0:08
 * @Version 1.0
 */
@Configuration
public class KafkaConsumerConfig {

    @Bean("consumer")
    public Consumer<String, String> getKafkaConsumer() {
        Properties properties = new Properties();
        //配置Kafka的服务端IP和端口
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        //指定key使用的序列化类
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        //指定value使用的序列化类
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        //指定消费者组
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer-01-group");
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,100);
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest");
        return new KafkaConsumer<>(properties);
    }
}

最终整个项目目录如下图所示。

在这里插入图片描述

3. 测 试

1、检查 zookeeper 和 Kafka 服务端是否是启动的,没启动就先启动。

2、接着,启动消费者服务,然后调用一下接口 http://localhost:8081/consumer-01/consumer/getData ,这么做的目的启动一个线程,监听 topic 。

3、最后,启动生产者服务,调用接口 http://localhost:8080/producer-01/info/create-message ,向 Kafka 服务器写数据。

结果如下所示:

控制台看到的所有消息记录
这是所有发送的记录
生产者发送一条消息
在这里插入图片描述
消费者监听到一条消息并接收
在这里插入图片描述

在 SpringBoot 中,使用官方原生 java 版 Kafka 客户端的入门介绍到这里结束,还有更多使用方式,等待挖掘。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/874144.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

HCIP学习--BGP3

目录 前置内容 BGP下一跳的修改问题 BGP的属性 配置 PrefVal权重属性 负载分担 LocPrf 负载分担 NextHop AS-PATH Ogn 配置 MED 配置 BGP选路规则 BGP的社团属性 配置及解释 前置内容 HCIP学习--BGP1_板栗妖怪的博客-CSDN博客 HCIP学习--BGP2_板栗妖怪的博客…

Python 解析c文件并导出到Excel

文章目录 1. 目录结构&#xff1a;2.代码1. test.c2. write_excel.py3. cparser.py4. 模板.xlsx5. output.xlsx 脚本中主要使用 openpyxl cparser 库 1. 目录结构&#xff1a; ast.txt &#xff1a;存放解析 c 文件的语法树&#xff0c;便于查找内容cparser.py &#xff1a;解…

@Param详解

文章目录 背景什么是ParamParam的使用方法使用方法&#xff1a;遇到的问题及因Param解决了什么问题使用与不使用对比 Param是如何进行映射的总结 背景 最近在开发过程中&#xff0c;在写mapper接口是在参数前加了Param注解&#xff0c;但是在运行的时候就会报错&#xff0c;说…

Elasticsearch 8.X 复杂分词搞不定,怎么办?

1、实战问题 球友提问&#xff1a;我想停用所有纯数字的分词 &#xff0c; 官网上的这个方法好像对ik分词器无效&#xff01; 有没有什么别的方法啊&#xff0c; chart gpt 说分词可以用正则匹配 但是测试好像是不行的 我的es版本是 8.5.3。 2、进一步沟通后&#xff0c;得…

若依框架浅浅介绍

由若依官网所给介绍可知 1、文件结构介绍 在ruoyi-admin的pom.xml文件中引入了ruoyi-framework、ruoyi-quartz和ruoyi-generatior模块&#xff0c;在ruoyi-framework的pom.xml文件中引入了ruoyi-system模块。 2、技术栈介绍 前端&#xff1a;Vue、Element UI后端&#xff1a…

Netty:在一个ByteBuf中寻找另外一个ByteBuf出现的位置

说明 利用ByteBufUtil的indexOf(ByteBuf needle, ByteBuf haystack)函数可以在haystack中寻找needle出现的位置。如果没有找到&#xff0c;返回-1。 示例 在一个ByteBuf 中找到了另外一个ByteBuf package com.thb;import io.netty.buffer.ByteBuf; import io.netty.buffer.…

AUTOSAR规范与ECU软件开发(基础篇)2.5 AUTOSAR方法论

前言 AUTOSAR方法论(AUTOSAR Methodology) 中车用控制器软件的开发涉及系统级、 ECU级和软件组件级。 系统级主要考虑系统功能需求、 硬件资源、 系统约束, 然后建立系统架构; ECU级根据抽象后的信息对ECU进行配置; 系统级和ECU级设计的同时, 伴随着软件组件级的开发。 上…

python软件安装包百度云,python软件安装教程2020

大家好&#xff0c;小编为大家解答python软件安装在哪个盘比较好的问题。很多人还不知道python软件安装过程中,customize&#xff0c;现在让我们一起来看看吧&#xff01; 一&#xff1a;python安装 安装软件的路径中不允许出现中文、带空格的字符串、特殊符号、纯数字&#xf…

n-皇后问题

希望这篇题解对你有用&#xff0c;麻烦动动手指点个赞或关注&#xff0c;感谢您的关注 不清楚蓝桥杯考什么的点点下方&#x1f447; 考点秘籍 想背纯享模版的伙伴们点点下方&#x1f447; 蓝桥杯省一你一定不能错过的模板大全(第一期) 蓝桥杯省一你一定不能错过的模板大全…

面试热题(最大子数组和)

给你一个整数数组 nums &#xff0c;请你找出一个具有最大和的连续子数组&#xff08;子数组最少包含一个元素&#xff09;&#xff0c;返回其最大和。 子数组 是数组中的一个连续部分。 输入&#xff1a;nums [-2,1,-3,4,-1,2,1,-5,4] 输出&#xff1a;6 解释&#xff1a;连续…

Creo结构设计-创建ASM装配工程以及零件协调配合绘图

问题描述 在结构设计时&#xff0c;往往包含了多个组件&#xff0c;各个组件需要分开设计&#xff0c;但同时需要借鉴上一个模块的尺寸信息&#xff0c;如果创建多个零件&#xff0c;最后再组合那么会降低效率&#xff0c;那么有什么好的解决方式&#xff0c;能够再一个工程里…

代码随想录算法训练营之JAVA|第二十七天| 455. 分发饼干

今天是第27天刷leetcode&#xff0c;立个flag&#xff0c;打卡60天。 算法挑战链接 455. 分发饼干https://leetcode.cn/problems/assign-cookies/ 第一想法 题目理解&#xff1a;G个人分 S块饼干 要求饼干大于或者等于人的肚量。 第一想法&#xff1a;将人按照肚量从小到达…

PyQt5的信号与槽函数

目录 一、介绍 二、一个信号连接一个槽 三、一个信号连接多个槽 四、多个信号连接一个槽 五、自定义信号 1、创建自定义信号 2、让自定义信号携带值 一、介绍 在下图中 &#xff08;1&#xff09;widget就是PyQt中的控件对象。其实就是组件&#xff08;2&#xff09;…

教育行业文件协作的最佳实践分享!

在教育工作中&#xff0c;经常需要进行文件协作&#xff0c;无论是师生间还是老师与老师之间。目前最常用的文件协作方式就是通过社交工具或者邮件进行文件共享。 这种协作方式的缺点 1、大文件传输不便&#xff1a;这种协作方式依托于社交工具&#xff0c;对于大文件传输并不…

YOLOV8/YOLOv7/YOLOv5改进:引入GAMAttention注意力机制

为了提高各种计算机视觉任务的性能&#xff0c;人们研究了各种注意机制。然而&#xff0c;以往的方法忽略了保留通道和空间方面的信息以增强跨维度交互的重要性。因此&#xff0c;我们提出了一种全局调度机制&#xff0c;通过减少信息缩减和放大全局交互表示来提高深度神经网络…

6914. 翻倍以链表形式表示的数字

题目描述&#xff1a; 给你一个 非空 链表的头节点 head &#xff0c;表示一个不含前导零的非负数整数。 将链表 翻倍 后&#xff0c;返回头节点 head 。 示例&#xff1a; 解题思路&#xff1a; 先计算第一位是否超出位数&#xff0c;超出新建存储该数值&#xff0c;再逐个翻倍…

JZ34二叉树中和为某一值的路径

题目地址&#xff1a;二叉树中和为某一值的路径(二)_牛客题霸_牛客网 题目回顾&#xff1a; 解题思路&#xff1a; 这里求的是和为某一值的路径&#xff0c;要用dfs算法&#xff0c;也就是说这里使用深度优先搜索算法。 从根节点开始向左右子树进行递归操作&#xff0c;在递…

Baumer工业相机堡盟工业相机如何通过BGAPI SDK设置相机的固定帧率(C++)

Baumer工业相机堡盟工业相机如何通过BGAPI SDK设置相机的固定帧率&#xff08;C&#xff09; Baumer工业相机Baumer工业相机的固定帧率功能的技术背景CameraExplorer如何查看相机固定帧率功能在BGAPI SDK里通过函数设置相机固定帧率 Baumer工业相机通过BGAPI SDK设置相机固定帧…

ORCA优化器浅析——CDXLScalarFilter Class for DXL filter operators

CDXLScalarFilter CDXLScalarFilter为Class for representing DXL filter operators。CDXLScalarFilter相对于CDXLScalar没有增加其他数据成员。 class CDXLScalarFilter : public CDXLScalar{ private: CDXLScalarFilter(CDXLScalarFilter &); // private copy ctor pub…

初识C语言(3)

什么是C语言 1.第一个C语言程序 2.数据类型 3.变量、常量 4.字符串转义字符注释 5.选择语句 6.循环语句 7.函数 8.数组 9.操作符 10.常见关键字 11.define 定义常量和宏 12.指针 13.结构体 这一篇文章我们从常见关键字开始说起&#xff0c;也是…