13、Kafka ------ kafka 消费者API用法(消费者消费消息代码演示)

news2025/1/8 5:58:43

目录

  • kafka 消费者API用法
    • 消费者API
    • 使用消费者API消费消息
    • 消费者消费消息的代码演示
      • 1、官方API示例
      • 2、创建消费者类
      • 3、演示消费结果
        • 1、演示消费者属于同一个消费者组
        • 2、演示消费者不属于同一个消费者组
        • 3、停止线程不适用
        • 4、一些参数解释
    • 代码
      • 生产者:MessageProducer
      • 消费者 Consumer01
      • 消费者 Consumer02
      • pom.xml

kafka 消费者API用法

消费者API

消费者API的核心类是 KafkaConsumer,它提供了如下常用方法:

- subscribe(Collection<String> topics):订阅主题。

- subscribe(Pattern pattern):订阅符合给定正则表达式的所有主题。

- subscription():返回该消费者所订阅的主题集合。

- unsubscribe():取消订阅。

- close():关闭消费者。

- poll(Duration timeout):拉取消息。

- assign(Collection<TopicPartition> partitions):手动为该消费者分配分区。

- assignment():返回分配该消费者的分区集合。

- commitAsync():异步提交offset。

- commitSync():同步提交offset。
  提示:如果开启了自动提交offset,则无需调用上面commitAsync()或commitSync()方法进行手动提交;
  自动提交offset比较方便,但手动提交offset则更精确,消费者程序可以等到消息真正被处理后再手动提交offset。

  ——该选项有点类似于JMS、RabbitMQ的消息消费者的,消息确认机制。

- enforceRebalance():强制执行重平衡。

下面这些方法都体现了Kafka是一个数据流平台,消费者通过这些方法可以从分区的任意位置、重新开始读取数据。

- seek(TopicPartition partition, long offset):跳到指定的offset处,即下一条消息从offset处开始拉取。

- seekToBeginning(Collection<TopicPartition> partitions):跳到指定分区的开始处。

- seekToEnd(Collection<TopicPartition> partitions):跳到指定分区的结尾处。

- position(TopicPartition partition):返回指定分区当前的offset。


使用消费者API消费消息

根据KafkaConsumer不难看出,使用消费者API拉取消息很简单,基本只要几步:

1、创建KafkaConsumer对象,创建该对象时要传入Properties对象,用于对该消费者进行配置。

2、调用KafkaConsumer对象的poll()方法拉取消息,该方法返回ConsumerRecords。

3、对ConsumerRecords执行迭代,即可获取到抓取的每条消息。

4、程序结束时,取消订阅,关闭KafkaConsumer。



消费者消费消息的代码演示

1、官方API示例

KafkaConsumer

在这里插入图片描述

2、创建消费者类

在上一篇的生产者项目中,再写2个消费者来消费消息

Kafka 生产者API 用法

如图,创建2个消费者类,这个是消费者01,消费者02和01都是一模一样的。

在这里插入图片描述

在这里插入图片描述



3、演示消费结果

在这里插入图片描述



1、演示消费者属于同一个消费者组

如上图,可以看出,两个消费者属于同一个消费者组 ConsumerGroupTest_01 ,所以两个消费者消费到的消息是不重复的。因为每个消费者消费的分区都是不同的。

演示前预期结果:因为两个消费者属于同一个消费者组,所以每个消费者消费的分区都是不同的,也就是不会重复消费消息

在这里插入图片描述

演示结果:

演示步骤:启动两个消费者实例,然后启动生产者,往test2主题中发送20条消息,10条消息带key,10条消息不带key,大概率这各10条的消息就会被分配在不同的2个分区中。
根据kafka默认的分区消费规则,应该是一个消费者消费一个分区的消息

生产者发送消息:
生产者代码在这篇:
Kafka 生产者API 用法

在这里插入图片描述

在这里插入图片描述

消费者消费:

如图:消费者01 获取到了带key的消息并消费,消费者02 获取到了不带key的消息并消费,这里的消费消息先弄成打印就可以了。

注意:演示中,多次重启消费者,然后再启动生产者发送消息,总是消费者01消费到所有消息,消费者02没有消费到消息,然后生产者重新发送消息,才能有以下的演示结果。

类似集群模式,就是消息只能被一个消费者消费

在这里插入图片描述



2、演示消费者不属于同一个消费者组

因为两个消费者不属于同一个消费者组,所以两个消费者都能消费到test2主题下的所有分区的消息。

演示步骤:其他代码没变,只是修改了他们所属的消费者组
在这里插入图片描述

在这里插入图片描述

演示结果如图:两个消费者不属于同一个消费者组,每个消费者都能消费到所有消息,

类似于广播模式、或者是发布/订阅模式,
发布/订阅模型可以让一条消息能被多个消费者消费

在这里插入图片描述



3、停止线程不适用

这个停止消费者的线程好像没有用,如图,我生产者再发送消息后,这个消费者还是能消费到消息,并没有想象中的被停止。
现阶段要关闭消费者的话,直接关闭项目就可以了

在这里插入图片描述



4、一些参数解释

在这里插入图片描述

auto.offset.reset

设置从哪里读取消息

在这里插入图片描述



代码

生产者:MessageProducer

package cn.ljh;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * Properties: Kafka 设计了 Properties 来封装所有的配置属性
 * <p>
 * KafkaProducer:用来创建消息生产者,是 生产者API 的核心类,
 * 它提供了一个 send()方法 来发送消息,该方法需要传入一个 ProducerRecord<K,V>对象
 * <p>
 * ProducerRecord:代表了一条消息,Kafka 的消息是包含了key、value、timestamp
 */
public class MessageProducer
{
    //主题常量
    public static final String TEST_TOPIC = "test2";

    public static void main(String[] args)
    {

        //Properties 中所设置的key,有效的value,可以通过Kafka官方文档来查询生产者API支持哪些配置属性
        Properties props = new Properties();
        //指定连接Kafka的地址,多个地址之间用逗号隔开
        props.put("bootstrap.servers", "localhost:9092,localhost:9093,localhost:9094");
        //指定Kafka的消息确认机制
        //0:不等待消息确认;1:只等待领导者分区的消息写入之后确认;all:等待所有分区的消息都写入之后才确认
        props.put("acks", "all");
        //指定消息发送失败的重试多少次
        props.put("retries", 0);
        //控制生产者在发送消息之前等待的时间
        //props.put("linger.ms", 3);
        //设置序列化器
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        //1、创建 KafkaProducer 时,需要传入 Properties 对象来配置消息生产者
        Producer<String, String> producer = new KafkaProducer<>(props);

        //2、发送消息
        for (int i = 0; i < 20; i++)
        {
            var msg = "这是第【 " + (i + 1) + " 】条消息!";
            if (i < 10)
            {
                //发送带 key 的消息
                producer.send(new ProducerRecord<String, String>(TEST_TOPIC, "ljh", msg));

            } else
            {
                //发送不带 key 的消息
                producer.send(new ProducerRecord<String, String>(TEST_TOPIC, msg));
            }
        }
        System.out.println("消息发送成功!");
        //3、关闭资源
        producer.close();
    }
}

消费者 Consumer01

package cn.ljh;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import java.util.Scanner;

public class Consumer01
{
    //组id:设置这个消费者实例是属于 ConsumerGroupTest_01 这个消费者组的
    public static final String GROUP_ID = "ConsumerGroupTest_01";

    //1、创建 KafkaConsumer 消费者对象 ,把这个消费者定义成成员变量
    public static KafkaConsumer<String, String> consumer = null;

    public static void main(String[] args)
    {
        //Properties 中所设置的key,有效的value,可以通过Kafka官方文档来查询生产者API支持哪些配置属性
        Properties props = new Properties();

        //指定连接Kafka的地址,多个地址之间用逗号隔开
        props.put("bootstrap.servers", "localhost:9092,localhost:9093,localhost:9094");

        //设置这个消费者实例属于哪个消费者组
        props.setProperty("group.id", GROUP_ID);

        //自动提交offset,就是类似之前的自动消息确认
        props.setProperty("enable.auto.commit", "true");

        //多个消息之间,自动提交消息的时间间隔
        props.setProperty("auto.commit.interval.ms", "1000");

        //设置session的超时时长,默认是10秒,这里设置15秒
        props.setProperty("session.timeout.ms", "15000");

        //设置每次都从最新的消息开始读取
        props.setProperty("auto.offset.reset","latest");

        //设置序列化器
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        //1、创建 KafkaConsumer 消费者对象
        consumer = new KafkaConsumer<>(props);

        //2、订阅主题,订阅kafka集群中的test2主题
        consumer.subscribe(Arrays.asList(MessageProducer.TEST_TOPIC));

        //因为获取消息的循环是一个死循环,没法退出,所以我在这里再加一个线程来关闭这个消费者
        //启动一个线程来关闭这个 KafkaConsumer
        new Thread(() ->
        {
            //创建一个Scanner 类来读取控制台数据
            Scanner sc = new Scanner(System.in);
            //如果有下一行,就读取下一行
            while (sc.hasNextLine())
            {
                //获取控制台下一行的内容
                var str = sc.nextLine();
                //就是这个线程一直监听控制台,如果我们在控制台输出” :exit “,则关闭这个这个 KafkaConsumer
                if (str.equals(":exit"))
                {
                    //取消订阅
                    consumer.unsubscribe();
                    //关闭消费者对象
                    consumer.close();
                }
            }

        }).start();

        //这是一个死循环,一直在获取主题中的消息
        while (true)
        {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("收到消息: offset = %d, key = %s, value = %s%n",
                        record.offset(), record.key(), record.value());
        }

    }
}

消费者 Consumer02

package cn.ljh;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import java.util.Scanner;

public class Consumer02
{
    //组id:设置这个消费者实例是属于 ConsumerGroupTest_02 这个消费者组的
    public static final String GROUP_ID = "ConsumerGroupTest_02";

    //1、创建 KafkaConsumer 消费者对象 ,把这个消费者定义成成员变量
    public static KafkaConsumer<String, String> consumer = null;

    public static void main(String[] args)
    {

        //Properties 中所设置的key,有效的value,可以通过Kafka官方文档来查询生产者API支持哪些配置属性
        Properties props = new Properties();
        //指定连接Kafka的地址,多个地址之间用逗号隔开
        props.put("bootstrap.servers", "localhost:9092,localhost:9093,localhost:9094");
        //设置这个消费者实例属于哪个消费者组
        props.setProperty("group.id", GROUP_ID);
        //自动提交offset,就是类似之前的自动消息确认
        props.setProperty("enable.auto.commit", "true");
        //多个消息之间,自动提交消息的时间间隔
        props.setProperty("auto.commit.interval.ms", "1000");
        //设置session的超时时长,默认是10秒,这里设置15秒
        props.setProperty("session.timeout.ms", "15000");

        //设置每次都从最新的消息开始读取
        props.setProperty("auto.offset.reset","latest");

        //设置序列化器
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        //1、创建 KafkaConsumer 消费者对象
        consumer = new KafkaConsumer<>(props);

        //2、订阅主题,订阅kafka集群中的test2主题
        consumer.subscribe(Arrays.asList(MessageProducer.TEST_TOPIC));

        //因为获取消息的循环是一个死循环,没法退出,所以我在这里再加一个线程来关闭这个消费者
        //启动一个线程来关闭这个 KafkaConsumer
        new Thread(() ->
        {
            //创建一个Scanner 类来读取控制台数据
            Scanner sc = new Scanner(System.in);
            //如果有下一行,就读取下一行
            while (sc.hasNextLine())
            {
                //获取控制台下一行的内容
                var str = sc.nextLine();
                //就是这个线程一直监听控制台,如果我们在控制台输出” :exit “,则关闭这个这个 KafkaConsumer
                if (str.equals(":exit"))
                {
                    //取消订阅
                    consumer.unsubscribe();
                    //关闭消费者对象
                    consumer.close();
                }
            }

        }).start();


        //这是一个死循环,一直在获取主题中的消息
        while (true)
        {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("收到消息: offset = %d, key = %s, value = %s%n",
                        record.offset(), record.key(), record.value());
        }


    }
}

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>cn.ljh</groupId>
    <artifactId>kafkaproducertest</artifactId>
    <version>1.0.0</version>
    <!-- 项目名,和 artifactId 保持一致 -->
    <name>kafkaproducertest</name>


    <properties>
        <!-- 在这里指定编译器的版本 -->
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
        <java.version>11</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <!-- 导入 Kafka 客户端APIJAR-->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.6.1</version>
        </dependency>
    </dependencies>


</project>

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1404505.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

基于SpringBoot Vue医院门诊管理系统

大家好✌&#xff01;我是Dwzun。很高兴你能来阅读我&#xff0c;我会陆续更新Java后端、前端、数据库、项目案例等相关知识点总结&#xff0c;还为大家分享优质的实战项目&#xff0c;本人在Java项目开发领域有多年的经验&#xff0c;陆续会更新更多优质的Java实战项目&#x…

Prometheus插件安装kafka_exporter

下载地址 https://github.com/danielqsj/kafka_exporter/releases 解压 tar -zxvf kafka_exporter-1.7.0.linux-amd64.tar.gzmv kafka_exporter-1.7.0.linux-amd64 kafka_exporter服务配置 cd /usr/lib/systemd/systemvi kafka_exporter.service内容如下 [Unit] Descript…

链路聚合原理与配置

链路聚合原理 随着网络规模不断扩大&#xff0c;用户对骨干链路的带宽和可靠性提出了越来越高的要求。在传统技术中&#xff0c;常用更换高速率的接口板或更换支持高速率接口板的设备的方式来增加带宽&#xff0c;但这种方案需要付出高额的费用&#xff0c;而且不够灵活。采用…

CLIP探索笔记

CLIP探索笔记 记录CLIP的流水账&#xff0c;训练和推理是如何完成的&#xff1f; 每一次阅读都有不同的领悟和发现&#xff0c;一些简单的想法。 官方信息 CodePaperBlog只有预测代码模型&#xff0c;没有训练代码 训练阶段 Text Encoder不需要训练&#xff0c;直接拿现成…

数据结构:堆与堆排序

目录 堆的定义&#xff1a; 堆的实现&#xff1a; 堆的元素插入&#xff1a; 堆元素删除&#xff1a; 堆初始化与销毁&#xff1a; 堆排序&#xff1a; 堆的定义&#xff1a; 堆是一种完全二叉树&#xff0c;完全二叉树定义如下&#xff1a; 一棵深度为k的有n个结点的二…

cpu温度监测工具 -- Turbo Boost Switcher Pro

Turbo Boost Switcher Pro是一款专为Mac电脑设计的CPU性能管理软件&#xff0c;它的技术背后是Intel Turbo Boost。Turbo Boost技术是一项能够自动加速处理器主频的技术&#xff0c;为Mac电脑提供更强大的计算能力。然而&#xff0c;这项技术在使用过程中会产生更多热量&#x…

云风网(www.niech.cn)个人网站搭建(二)服务器域名配置

这里直接采用宝塔服务器运维管理面板来进行配置&#xff0c;简单无脑 宝塔 Linux面板8.0.5安装脚本 //Centos安装脚本 yum install -y wget && wget -O install.sh https://download.bt.cn/install/install_6.0.sh && sh install.sh ed8484bec //Ubuntu/Deepi…

JVM系列-4.类加载器

&#x1f44f;作者简介&#xff1a;大家好&#xff0c;我是爱吃芝士的土豆倪&#xff0c;24届校招生Java选手&#xff0c;很高兴认识大家&#x1f4d5;系列专栏&#xff1a;Spring原理、JUC原理、Kafka原理、分布式技术原理、数据库技术、JVM原理&#x1f525;如果感觉博主的文…

java数据结构与算法刷题-----LeetCode645. 错误的集合(位运算解法需要重点掌握)

java数据结构与算法刷题目录&#xff08;剑指Offer、LeetCode、ACM&#xff09;-----主目录-----持续更新(进不去说明我没写完)&#xff1a;https://blog.csdn.net/grd_java/article/details/123063846 文章目录 法一&#xff1a;桶排序思想法二&#xff1a;位运算 法一&#x…

imu_utils安装及标定教程

本文使用香港科技大学的imu_utils方差工具标定&#xff0c;首先将INDEMIND双目惯性模组静止放置三个小时。然后采集IMU数据&#xff0c;生成Allan方差数据&#xff0c;由图分析得到加速度和角速度的高斯白噪声和随机游走Bias误差。 系统配置 系统版本ubuntu18.04OpenCV3.4.13…

C/C++读写文件和stringstream类

目录 C处理文件打开文件两种函数的区别 读文件两种函数区别其它读操作的函数fgetc&#xff1a;从文件中读取一个字符fgets&#xff1a;从文件中读取一个字符串fscanf&#xff1a;按格式从文件中读取指定内容&#xff0c;与scanf函数类似 写文件其它的常用写操作函数fputc&#…

【LeetCode-135】分发糖果(贪心)

LeetCode135.分发糖果 题目描述 老师想给孩子们分发糖果&#xff0c;有 N 个孩子站成了一条直线&#xff0c;老师会根据每个孩子的表现&#xff0c;预先给他们评分。 你需要按照以下要求&#xff0c;帮助老师给这些孩子分发糖果&#xff1a; 每个孩子至少分配到 1 个糖果。…

js中找出两个数组中不同的元素

文章目录 一、题目二、方法2.1、方法一2.2、方法二2.3、方法三 三、最后 一、题目 两个数组 var A [1, 5, 6]; var B [2, 6, 7]&#xff0c;实现一个方法&#xff0c;找出仅存在于A 或者仅存在于B中的所有数字 二、方法 2.1、方法一 const filterArr (arr1, arr2) > …

5大免费代理IP合集,你的代理IP该换啦!

一连代理 代理IP提供平台&#xff0c;代理IP覆盖HTTP/HTTPS/SOCKS5协议&#xff0c;涵盖直连和隧道代理。一键操作可以随机更换IP&#xff0c;实现高效稳定的网络代理。支持在PC、iOS和安卓等平台上使用。当前免费试用选项&#xff0c;让用户能够在使用之前先了解服务的性能和效…

归并排序模板

模板在文末&#xff0c;以下步骤方便理解记忆。 先贴一张快速排序模板步骤&#xff0c;用于对比记忆 归并排序步骤&#xff1a; &#xff08;0&#xff09;如果数组左边界L ≥ 数组右边界&#xff0c;则不需要排序&#xff0c;直接return。 &#xff08;1&#xff09;直接取…

力扣第92题——反转链表 II(C语言题解)

题目描述 给你单链表的头指针 head 和两个整数 left 和 right &#xff0c;其中 left < right 。请你反转从位置 left 到位置 right 的链表节点&#xff0c;返回 反转后的链表 。 示例 1&#xff1a; 输入&#xff1a;head [1,2,3,4,5], left 2, right 4 输出&#xff1…

成绩等级分数段查询(python条件分支语句match...case...)

根据有效分数序列及等级差值&#xff0c;计算并打印等级相应分数区间。 (笔记模板由python脚本于2024年01月20日 23:57:32创建&#xff0c;本篇笔记适合会条件分支语句的初学者的coder翻阅) 【学习的细节是欢悦的历程】 Python 官网&#xff1a;https://www.python.org/ Free&…

python:socket基础操作(2)-《udp发送信息》

基础发送udp信息 1.导入socket模块 2.使用udp模块 3.发送内容 4.关闭套接字 很简单的4步就可以实现udp的消息发送 import socket # 导入模块udp_socket socket.socket(socket.AF_INET,socket.SOCK_DGRAM) # 使用ipv4 udp协议udp_socket.sendto(b"hello world",(&…

翻毛皮鞋脏了不会清洗怎么办?资深劳保鞋厂家来教你

劳保鞋皮面材质中除了常见的牛皮材质&#xff0c;翻毛皮也是频繁使用的材料&#xff0c;材质不同&#xff0c;在养护上也有区别&#xff0c;今天百华小编来和大家聊聊翻毛皮材质的鞋子清洁方法。 翻毛皮鞋清洗前的准备工作 1.除灰&#xff1a;对于表面灰尘&#xff0c;可以使用…

C语言第六弹---分支语句(下)

✨个人主页&#xff1a; 熬夜学编程的小林 &#x1f497;系列专栏&#xff1a; 【C语言详解】 【数据结构详解】 分支语句 1、 逻辑操作符&#xff1a;&& , || , &#xff01;4.1、 逻辑取反运算符 &#xff01;4.2、 与运算符4.3、 或运算符4.4、 练习&#xff1a;闰…