【大数据之Kafka】十一、Kafka消费者及消费者组案例

news2024/10/6 3:40:16

1 独立消费者案例(订阅主题)

(1)需求:创建一个独立消费者,消费 first 主题中数据。
(2)分析:
在这里插入图片描述
注意:在消费者 API 代码中必须配置消费者组 id。命令行启动消费者不填写消费者组 id 会被自动填写随机的消费者组 id。

步骤:
(1)创建包名:com.study.kafka.consumer
(2)创建类:CustomConsumer

package com.study.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;

public class CustomConsumer {
    public static void main(String[] args) {
        //0.配置
        Properties properties = new Properties();

        //连接
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");

        //配置反序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());

        //配置消费者组(组名任意取)
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test1");

        //修改分区分配策略
        properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.StickyAssignor");



        //1.创建消费者对象
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);


        //2.订阅主题,注册要消费的主题,可以有多个
        ArrayList<String> topics = new ArrayList<>();
        topics.add("first1");
        kafkaConsumer.subscribe(topics);

        //3.消费,拉取数据,打印
        while (true){
            //设置1s消费一批数据
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));

            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord);
            }

        }

    }
}

(3)在 IDEA 中执行消费者程序。
(4)在 Kafka 集群控制台,创建 Kafka 生产者,并输入数据。

bin/kafka-console-producer.sh --bootstrap-server hadoop102:9092,hadoop103:9092 --topic first
> hello

(5)在 IDEA 控制台观察接收到的数据。
在这里插入图片描述

2 独立消费者案例(订阅分区)

(1)需求:创建一个独立消费者,消费 first 主题 0 号分区的数据。
(2)分析:
在这里插入图片描述
步骤:
(1)创建类CustomConsumerPartition

package com.study.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;

public class CustomConsumerPartition {
    public static void main(String[] args) {
        //0.配置
        Properties properties = new Properties();

        //连接
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");

        //配置反序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());

        //配置消费者组(组名任意取)
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");

        //1.创建消费者对象
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);


        //2.订阅主题,注册要消费的主题,可以有多个
        //消费某个主题的某个分区数据,订阅对应的分区
        ArrayList<TopicPartition> topicPartitions = new ArrayList<>();
        topicPartitions.add(new TopicPartition("first",0));
        kafkaConsumer.assign(topicPartitions);

        //3.消费,拉取数据,打印
        while (true){
            //设置1s消费一批数据
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));

            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord);
            }
        }
    }
}

(2)在 IDEA 中执行消费者程序。
(3)在 IDEA 中执行生产者程序 CustomProducerCallback()在控制台观察生成几个 0 号分区的数据。
在这里插入图片描述
在这里插入图片描述

3 消费者组案例

(1)需求:测试同一个主题的分区数据,只能由一个消费者组中的一个消费。
(2)分析:
在这里插入图片描述
步骤:
(1)复制两份CustomConsumer的代码,并在 IDEA 同时启动,即一个消费者组中有三个消费者。
在这里插入图片描述
(2)在 IDEA 中执行生产者程序 CustomProducerCallback()在控制台观察。
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1003526.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

算法通关村第13关【青铜】| 数字与数学基础问题

数字统计专题 1.数组元素积的符号 思路&#xff1a;每回碰到负数就取反 class Solution {public int arraySign(int[] nums) {int res nums[0];if(nums[0]>0){res 1;}else if(nums[0]<0){res -1;}else{return res;}for(int i 1;i<nums.length;i){if(nums[i]<…

Linux基本认识

一、Linux基本概念 Linux 内核最初只是由芬兰人林纳斯托瓦兹&#xff08;Linus Torvalds&#xff09;在赫尔辛基大学上学时出于个人爱好而编写的。 Linux 是一套免费使用和自由传播的类 Unix 操作系统&#xff0c;是一个基于 POSIX 和 UNIX 的多用户、多任务、支持多线程和多…

地下管网实时水位监测用什么设备好?

地下排水管网是城市重要基础设施生命线之一&#xff0c;主要用于排放雨水、地表水和废水&#xff0c;以维护城市的安全运行。然而&#xff0c;在极端天气事件发生时&#xff0c;排水系统可能会面临压力巨大&#xff0c;导致排水不畅引发城市内涝。通过对管网水位实时监测&#…

Java集合大总结——Collection集合

Collection集合的整理 1、List&#xff0c;Set&#xff0c;Queue&#xff0c;Map四者的区别集合底层数据结构梳理2、关于集合的的选用2.1 为什么使用集合3、List接口3.1 ArrayList 和 Array&#xff08;数组&#xff09;的区别&#xff1f;3.1 LinkedList 为什么不能实现Random…

基于python+txt的学生成绩管理系统

基于pythontxt的学生成绩管理系统 一、系统介绍二、效果展示三、其他系统实现四、获取源码 一、系统介绍 录入学生信息查找学生信息删除学生信息修改学生信息排序统计学生信息显示所有学生信息 基于python的学生成绩管理系统&#xff0c;具备基本的增删改查功能&#xff0c;包…

2023-9-12 完全背包问题

题目链接&#xff1a;完全背包问题 初版(时间复杂度拉满) #include <iostream> #include <algorithm>using namespace std;const int N 1010;int n, m; int v[N], w[N]; int f[N][N];int main() {cin >> n >> m;for(int i 1; i < n; i ) cin >…

AntDB数据库参加ACDU中国行杭州站,分享数据库运维实践与经验

关于ACDU 和中国行: ACDU是由墨天轮社区举办的中国数据库联盟的品牌活动之一&#xff0c;在线下汇集数据库领域的行业知名人士&#xff0c;共同探讨数据库前沿技术及其应用&#xff0c;促进行业发展和创新的平台&#xff0c;也为开发者们提供友好交流的机会。 AntDB作为具有技术…

Kafka 基于 S3 的数据导出、导入、备份、还原、迁移方案

在系统升级或迁移时&#xff0c;用户常常需要将一个 Kafka 集群中的数据导出&#xff08;备份&#xff09;&#xff0c;然后在新集群或另一个集群中再将数据导入&#xff08;还原&#xff09;。通常&#xff0c;Kafka集群间的数据复制和同步多采用 Kafka MirrorMaker&#xff0…

【C++】常用集合算法

0.前言 1.set_intersection #include <iostream> using namespace std;// 常用集合算法 交集set_intersection #include<vector> #include<algorithm>void myPrint(int val) {cout << val << " "; }void test01() {vector<int>v…

Oracle启动报错解决:ora-00119和ora-00132

WINDOWS环境下&#xff0c; 查看Oracle的各项服务都正常&#xff0c; 但是SQL窗口启动ORACLE报错ora-00119和ora-00132&#xff0c;如何解决&#xff1a; 一、问题描述 1、ORACLE服务全部打开&#xff0c;没有报错&#xff1b; 2、plsql登陆报ora-12505错&#xff1b; 3、监听…

docker启动MySQL报错:退出状态码1

docker启动mysql反复重启&#xff0c;通过 使用 docker logs 容器ID chown: cannot read directory /var/lib/mysql/: Permission denied 但是目录权限确认没问题&#xff0c;即使 chmod 777 还是报相同的错误&#xff0c;后来发现是selinux的问题 查看状态 getenforce 临时…

保护个人隐私,自建个性图床:Cpolar+Qchan轻量级搭建教程分享

文章目录 前言1. Qchan网站搭建1.1 Qchan下载和安装1.2 Qchan网页测试1.3 cpolar的安装和注册 2. 本地网页发布2.1 Cpolar云端设置2.2 Cpolar本地设置 3. 公网访问测试总结 前言 图床作为云存储的一项重要应用场景&#xff0c;在大量开发人员的努力下&#xff0c;已经开发出大…

照片怎么换背景图?照片抠图换背景方法分享

照片怎么把背景进行更换呢&#xff1f;当我们拍好一张照片&#xff0c;但是对照片的背景不太满意&#xff0c;想要将照片的背景进行更换&#xff0c;怎么做才能实现呢&#xff1f;其实这一类的问题我们在制作海报的时候也会经常遇到&#xff0c;如果不是专业的图片编辑制作人员…

C++之weak_ptr与shared_ptr智能指针实例(一百九十五)

简介&#xff1a; CSDN博客专家&#xff0c;专注Android/Linux系统&#xff0c;分享多mic语音方案、音视频、编解码等技术&#xff0c;与大家一起成长&#xff01; 优质专栏&#xff1a;Audio工程师进阶系列【原创干货持续更新中……】&#x1f680; 人生格言&#xff1a; 人生…

问 ChatGPT 50个问题,耗水 500 毫升;通讯专家发帖称 iPhone 网络国内造假丨RTE开发者日报 Vol.47

开发者朋友们大家好&#xff1a; 这里是「RTE 开发者日报」&#xff0c;每天和大家一起看新闻、聊八卦。我们的社区编辑团队会整理分享 RTE &#xff08;Real Time Engagement&#xff09; 领域内「有话题的新闻」、「有态度的观点」、「有意思的数据」、「有思考的文章」、「…

mysql在ubuntu上命令行登陆密码不正确

1.登陆提示如下 2.使用mysql -u root -p登录也是类似的 3.打开宝塔面板 点击root密码&#xff0c;更改密码后即可在命令行界面登录 4.登录效果如下

Java从入门到精通-类和对象(二)

0. 类和对象 3. 类的构造方法 构造方法是一种特殊的方法&#xff0c;用于创建和初始化对象。构造方法的名称必须与类名相同&#xff0c;它没有返回值&#xff0c;并且在创建对象时自动调用。构造方法的主要作用是确保对象在创建时具有合适的初始状态。 以下是构造方法的基本概…

掌握这些,让你轻松玩转钡铼PLC网关与西门子S7-1200的MQTT通信

一、软硬件描述 西门子PLC S7-1215钡铼BL102网关mosquitto MQTT服务器&#xff08;腾讯云上搭建&#xff09;可以上网的路由器一套 二、需要使用的软件。 西门子Portal v15.1 (西门子PLC编程软件&#xff09;钡铼BL102调试软件及说明书。 说明书下载链接&#xff1a; http:/…

Map集合案例-统计投票人数

需求&#xff1a; 某个班级80名学生&#xff0c;现在需要组成秋游活动&#xff0c;班长提供了四个景点依次是(A、B、C、D)&#xff0c;每个学生只能选择一个景点&#xff0c;请统计出最终哪个景点想去的人数最多。 利用Map集合进行统计 //A06_HashMapDemo2.java package dail…

【报错】bash: curl: command not found

&#xff08;1&#xff09;报错&#x1f631;&#x1f631;&#x1f631; bash: curl: command not found &#xff08;2&#xff09;分析&#x1f430;&#x1f430;&#x1f430; 安装对应的工具包 curl即可。 &#xff08;3&#xff09;解决方法&#x1f489;&#x1f4…