Kafka 集成 SpringBoot, 快速入门

news2025/1/22 18:47:02

一、kafka的生产者和消费者

1. 生产者发送消息的流程

 2. 消费者接收消息的流程

 二、 java 代码实现

1. 添加依赖:

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.12</artifactId>
        </dependency>

2. 实现生产者

public class NormalProducer {

    public static void main(String[] args) {
        Properties properties = new Properties();
        //	1.配置生产者启动的关键属性参数

        //	1.1	BOOTSTRAP_SERVERS_CONFIG:连接kafka集群的服务列表,如果有多个,使用"逗号"进行分隔
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.31.101:9092");
        //	1.2	CLIENT_ID_CONFIG:这个属性的目的是标记kafkaclient的ID
        properties.put(ProducerConfig.CLIENT_ID_CONFIG, "quickstart-producer");
        //	1.3 KEY_SERIALIZER_CLASS_CONFIG VALUE_SERIALIZER_CLASS_CONFIG
        //	Q: 对 kafka的 key 和 value 做序列化,为什么需要序列化?
        //	A: 因为KAFKA Broker 在接收消息的时候,必须要以二进制的方式接收,所以必须要对KEY和VALUE进行序列化
        //	字符串序列化类:org.apache.kafka.common.serialization.StringSerializer
        //	KEY: 是kafka用于做消息投递计算具体投递到对应的主题的哪一个partition而需要的
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        //	VALUE: 实际发送消息的内容
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        //	2.创建kafka生产者对象 传递properties属性参数集合
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        for(int i = 0; i <10; i ++) {
            //	3.构造消息内容
            User user = new User("00" + i, "张三");
            ProducerRecord<String, String> record =
                    //	arg1:topic , arg2:实际的消息体内容,quick_start 是 topic 名称
                    new ProducerRecord<String, String>("quick_start",
                            JSON.toJSONString(user));

            //	4.发送消息
            producer.send(record);
        }


        //	5.关闭生产者
        producer.close();

    }
}

其中的 User 对象为:

public class User {

	private String id;
	
	private String name;

	public User() {
	}

	public User(String id, String name) {
		this.id = id;
		this.name = name;
	}

	public String getId() {
		return id;
	}

	public void setId(String id) {
		this.id = id;
	}

	public String getName() {
		return name;
	}

	public void setName(String name) {
		this.name = name;
	}
}

3. 实现消费者

public class NormalConsumer {

    public static void main(String[] args) {

        //	1. 配置属性参数
        Properties properties = new Properties();

        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.31.101:9092");

        //	org.apache.kafka.common.serialization.StringDeserializer
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        //	非常重要的属性配置:与我们消费者订阅组有关系
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "quickstart-group");
        //	常规属性:会话连接超时时间
        properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);
        //	消费者提交offset: 自动提交 & 手工提交,默认是自动提交
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);

        //	2. 创建消费者对象
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

        //	3. 订阅你感兴趣的主题:quick_start
        consumer.subscribe(Collections.singletonList("quick_start"));

        System.err.println("quickstart consumer started...");

        try {
            //	4.采用拉取消息的方式消费数据
            while(true) {
                //	等待多久拉取一次消息
                //	拉取TOPIC_QUICKSTART主题里面所有的消息
                //	topic 和 partition是 一对多的关系,一个topic可以有多个partition
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                //	因为消息是在partition中存储的,所以需要遍历partition集合
                for(TopicPartition topicPartition : records.partitions()) {
                    //	通过TopicPartition获取指定的消息集合,获取到的就是当前topicPartition下面所有的消息
                    List<ConsumerRecord<String, String>> partitionRecords = records.records(topicPartition);
                    //	获取TopicPartition对应的主题名称
                    String topic = topicPartition.topic();
                    //	获取当前topicPartition下的消息条数
                    int size = partitionRecords.size();

                    System.err.println(String.format("--- 获取topic: %s, 分区位置:%s, 消息总数: %s",
                            topic,
                            topicPartition.partition(),
                            size));

                    for(int i = 0; i < size; i++) {
                        ConsumerRecord<String, String> consumerRecord = partitionRecords.get(i);
                        //	实际的数据内容
                        String value = consumerRecord.value();
                        //	当前获取的消息偏移量
                        long offset = consumerRecord.offset();
                        //	ISR : High Watermark, 如果要提交的话,比如提交当前消息的offset+1
                        //	表示下一次从什么位置(offset)拉取消息
                        long commitOffser = offset + 1;
                        System.err.println(String.format("获取实际消息 value:%s, 消息offset: %s, 提交offset: %s",
                                value, offset, commitOffser));
                    }
                }
            }
        } finally {
            consumer.close();
        }
    }
}

4. 测试结果

生产者发送的消息在消费者端可以正常接收:

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/510815.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

回溯算法例题(剪枝策略)

目录 1.组合1.77. 组合2.216. 组合总和 III3.17. 电话号码的字母组合4.39. 组合总和5.40. 组合总和 II 2.分割1.131. 分割回文串2.*93. 复原 IP 地址 3.子集1.78. 子集2.90. 子集 II 4.排列1.46. 全排列2.47. 全排列 II 5.棋盘问题1.51. N 皇后2.37. 解数独 6.其他1.491. 递增子…

系统移植 5-10

1.进入linux内核源码目录下&#xff0c;打开Makefile文件&#xff0c;搜索vmlinux&#xff0c;找到cmd_link-vmlinux命令&#xff0c; 1179 cmd_link-vmlinux \ 1180 $(CONFIG_SHELL) $< "$(LD)" "…

不同应用场景瑞芯微RK3568主板方案定制

随着物联网和智能设备的迅猛发展&#xff0c;瑞芯微RK3568主板方案作为一种高性能的系统System-on-a-chip&#xff08;SoC&#xff09;&#xff0c;已经成为嵌入式系统、智能家居设备和工业自动化设备等应用场景的首选方案。定制瑞芯微RK3568主板方案可以满足不同应用场景的需求…

科技云报道:ChatGPT应用爆火,安全的大数据底座何处寻?

科技云报道原创。 毫无疑问&#xff0c;AIGC正在给人类社会带来一场深刻的变革。 而剥开其令人眼花缭乱的华丽外表&#xff0c;运行的核心离不开海量的数据支持。 ChatGPT的“入侵”已经引起了各行各业对内容抄袭的担忧&#xff0c;以及网络数据安全意识的提高。 虽然AI技术…

线程的原子性、可见性、有序性及线程安全知识整理

要想保证线程安全&#xff0c;必须同时满足原子性、可见性、有序性。 一、定义 1.1 原子性 一个操作或者多个操作&#xff0c;要么全部执行&#xff0c;并且执行的过程不会被打断&#xff0c; 要么就全部不执行&#xff08;一个操作是不可被分割的&#xff09;。 Java中实现…

JavaScript经典教程(七)-- JavaScript初级

190&#xff1a;JavaScript初级内容 - DOM查询、插入内容、赋予样式等 1、DOM操作 DOM&#xff1a;节点&#xff0c;也就是html中的元素&#xff1b; DOM操作&#xff1a;其实就是节点元素的方法&#xff1b; &#xff08;1&#xff09;innerHTML - 返回元素内容 同时也可以…

【JUC基础】05. Synchronized和ReentrantLock

1、前言 前面两篇中分别讲了Synchronized和ReentrantLock。两种方式都能实现同步锁&#xff0c;且也都能解决多线程的并发问题。那么这两个有什么区别呢&#xff1f; 这个也是一个高频的面经题。 2、相同点 2.1、都是可重入锁 什么是可重入锁&#xff1f; 可重入锁&#xff0…

Mysql查询字符串中某个字符串出现的次数

目录 1.查单个字符出现的次数2.查多个字符出现的次数3.函数讲解 1.查单个字符出现的次数 比如我想查how do you do 字符串当中出现d的次数&#xff1a; 第一眼看上去有点懵&#xff0c;首先mysql并没有直接计算出现字符次数的函数&#xff0c;所以才使用了下面这种方式&#x…

【排错记录】国产航顺HK32F030M驱动TM1624四位数码管显示

问题描述&#xff1a; 航顺单片机 HK32F030MF4P6用数码管显示驱动TM1624问题描述。 航顺单片HK32F030MF4P6的PC3/PC4/PC5引脚分别连接数码管驱动TM1624的DIN/CLK/STB;当单独使用HK32F030MF4P6单片机最小系统和TM1624数码管模块的时候部分最小系统板能驱动数码管正常显示&…

Centos 7 安装系列(11):Kibana

一、系统环境 操作系统&#xff1a;Centos 7 已安装环境&#xff1a;ElasticSearch 8.6.2 二、安装 需要注意的是&#xff1a;Kibana的版本需要和Elasticsearch保持一致。 2.1 下载并解压安装包 cd /opt yum install -y wget wget https://artifacts.elastic.co/downloads…

马哈鱼SQLFLow对SQL Server OUTPUT Clause 的数据血缘分析

SQL Server OUTPUT Clause 会对 SQL 语句的血缘分析产生影响&#xff0c;如果忽略对 OUTPUT Clause 的分析&#xff0c;那么将漏掉一些关键的数据血缘关系&#xff0c;从而影响数据血缘分析的准确性&#xff0c;进而影响组织的数据治理质量。 Gudu SQLFlow 可以对 SQL Server …

Linux下安装MySQL 5.7

安装MySQL 5.7 1、通过命令下载 wget http://dev.mysql.com/get/Downloads/MySQL-5.7/mysql-5.7.36-linux-glibc2.12-x86_64.tar.gz 2、解压 tar -zxvf mysql-5.7.36-linux-glibc2.12-x86_64.tar.gz -C /usr/local/mysql/ 3、简化 cd /usr/local mv mysql-5.7.36-linux-…

Hibernate 基本操作、懒加载以及缓存

前言 上一篇咱们介绍了 Hibernate 以及写了一个 Hibernate 的工具类&#xff0c;快速入门体验了一波 Hibernate 的使用&#xff0c;我们只需通过 Session 对象就能实现数据库的操作了。 现在&#xff0c;这篇介绍使用 Hibernate 进行基本的 CRUD、懒加载以及缓存的知识。 提示…

干货|做实验到底应该选取多少被试?

Hello&#xff0c;大家好&#xff01; 这里是壹脑云科研圈&#xff0c;我是喵君姐姐~ 我们都知道心理学实验一般是通过分析被试的一系列数据从而得到相应的结论的。那么&#xff0c;在进行心理学实验的时候需要多少被试&#xff1f;怎么去看实验的被试量够不够呢&#xff1f;…

【Spring Cloud】Spring Cloud Alibaba 实战 Seata (分布式事务)

文章目录 一、Seata 简介简要发展史Seata 设计初衷 二、使用 Docker 快速搭建 Seata 1.4三、在 Spring 项目中使用 Seata 客户端 一、Seata 简介 Seata&#xff08;Simple Extensible Autonomous Transaction Architecture&#xff09; 是一款开源的分布式事务解决方案&#xf…

Datawhale-chatGPT用于句词分类

NLU基础 句子级别的分类 Token级别的分类 相关API chatGPT Style prompt建议 NLU应用 文档问答 分类/实体微调 智能对话

php xdebug配置

1.sublime 火绒 火绒安装插件Xdebug Helper for Firefox 管理插件 -…-选项 填入ide key sublime 第一个插件package control ctrlshifitp 输入install 点击安装第一个包管理 package control 第二个插件 xdebug ctrlshifitp 输入xdebug clinet安装 php配置 这里用的时phps…

网安笔记03 DES概述

DES 概述 分组加密算法 &#xff1a; 明文、密文64位分组长度对称算法 &#xff1a; 加密和解密密钥编排不同&#xff0c;但使用同一算法密钥长度&#xff1a;56位 —— 每个第8位为奇偶校验位密钥为任意的56位数&#xff0c;存在 弱密钥&#xff0c; 容易避开混乱与扩散的组合…

初学者自学Web安全的三个必经阶段(含系统路线脑图+工具笔记)

一、为什么选择网络安全&#xff1f; 这几年随着我国《国家网络空间安全战略》《网络安全法》《网络安全等级保护2.0》等一系列政策/法规/标准的持续落地&#xff0c;网络安全行业地位、薪资随之水涨船高。 未来3-5年&#xff0c;是安全行业的黄金发展期&#xff0c;提前踏入…

React学习1

JSX使得创建虚拟DOM更便捷&#xff0c;纯JS创建虚拟DOM太过繁琐 JSX语法规范&#xff1a; JSX的{}&#xff0c;读变量的时候只能存放表达式&#xff0c;不能写语句&#xff08;代码&#xff09; react可以遍历数组&#xff0c;但是无法遍历对象 react是面向组件编程 函数式…