Kafka入门, 消费者组案例(十九)

news2025/1/11 10:51:38

pom 文件

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.0.0</version>
        </dependency>
    </dependencies>

独立消费者案例(订阅主语)

在这里插入图片描述
在消费者API代码中,必须配置消费者id。命令行启动消费者不填写消费者组id会被自动填写随机得消费者组id

package com.longer.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;

public class CustomConsumer {
    public static void main(String[] args) {
        //创建消费者的配置对象
        Properties properties=new Properties();
        //2、给消费者配置对象添加参数
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092");
        //配置序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        //配置消费者组(组名任意起名)必须
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");
        //创建消费者对象
        KafkaConsumer<String,String> kafkaConsumer=new KafkaConsumer<String, String>(properties);
        //注册要消费的主题
        ArrayList<String> topics=new ArrayList<>();
        topics.add("first");
        kafkaConsumer.subscribe(topics);
        while (true){
            //设置1s中消费一批数据
            ConsumerRecords<String,String> consumerRecords=kafkaConsumer.poll(Duration.ofSeconds(1));
            //打印消费到的数据
            for(ConsumerRecord<String,String> record:consumerRecords){
                System.out.println(record);
            }
        }
    }
}

发送的信息
在这里插入图片描述
消费信息(因为我发了好多次)
在这里插入图片描述

独立消费者案例

在这里插入图片描述

package com.longer.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;

public class CustomConsumerPartition {
    public static void main(String[] args) {
        Properties properties=new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        //配置消费者组(组名任意起名)必须
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");
        KafkaConsumer kafkaConsumer=new KafkaConsumer(properties);
        //消费某个注意的某个分区数据
        ArrayList<TopicPartition> topicPartitions=new ArrayList<>();
        topicPartitions.add(new TopicPartition("first",0));
        kafkaConsumer.assign(topicPartitions);
        while (true){
            ConsumerRecords<String,String> consumerRecords=kafkaConsumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord);
            }
        }
    }
}

在这里插入图片描述

在这里插入图片描述
发送信息发现只消费0分区的信息
在这里插入图片描述

消费者组案例

在这里插入图片描述

package com.longer.consumer.group;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;

public class CustomConsumer1 {
    public static void main(String[] args) {
        //创建消费者的配置对象
        Properties properties=new Properties();
        //2、给消费者配置对象添加参数
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092");
        //配置序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        //配置消费者组(组名任意起名)必须
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");
        //创建消费者对象
        KafkaConsumer<String,String> kafkaConsumer=new KafkaConsumer<String, String>(properties);
        //注册要消费的主题
        ArrayList<String> topics=new ArrayList<>();
        topics.add("first");
        kafkaConsumer.subscribe(topics);
        while (true){
            //设置1s中消费一批数据
            ConsumerRecords<String,String> consumerRecords=kafkaConsumer.poll(Duration.ofSeconds(1));
            //打印消费到的数据
            for(ConsumerRecord<String,String> record:consumerRecords){
                System.out.println(record);
            }
        }
    }
}

复制三分,然后运行
在这里插入图片描述在这里插入图片描述

在这里插入图片描述
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/720992.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

简单认识LVS-DR负载群集和部署实例

文章目录 一、LVS-DR负载群集简介1、DR模式数据包流向分析2、DR 模式的特点 二、DR模式 LVS负载均衡群集部署 一、LVS-DR负载群集简介 1、DR模式数据包流向分析 1、客户端发送请求到 Director Server&#xff08;负载均衡器&#xff09;&#xff0c;请求的数据报文&#xff0…

放大器的基本知识

文章目录 1.反向输入&#xff08;引出&#xff1a;反向器&#xff09;1.反向输入例子 2.同向输入&#xff08;引出&#xff1a;电压跟随器&#xff09;2.同向输入例子 3.加法运算 1.反向输入&#xff08;引出&#xff1a;反向器&#xff09; 1.反向输入例子 —————————…

基于Java网上药品售卖系统设计实现(源码+lw+部署文档+讲解等)

博主介绍&#xff1a;✌全网粉丝30W,csdn特邀作者、博客专家、CSDN新星计划导师、Java领域优质创作者,博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ &#x1f345;文末获取源码联系&#x1f345; &#x1f447;&#x1f3fb; 精彩专…

【MYSQL基础】基础命令介绍

基础命令 MYSQL注释方式 -- 单行注释/* 多行注释 哈哈哈哈哈 哈哈哈哈 */连接数据库 mysql -u root -p12345678退出数据库连接 使用exit;命令可以退出连接 查询MYSQL版本 mysql> select version(); ----------- | version() | ----------- | 8.0.27 | ----------- 1…

HA使用Node-RED推送消息到手机

目录 1.一个简单示例(1).注入使用个inject节点&#xff0c;用来触发(2).手机通知是call service节点(3).保存-部署&#xff0c;看效果 2.消息-添加变量 1.一个简单示例 (1).注入使用个inject节点&#xff0c;用来触发 (2).手机通知是call service节点 Node-RED需要提前和Home A…

spring boot + Apache tika 实现文档内容解析

Apache tika是Apache开源的一个文档解析工具。Apache Tika可以解析和提取一千多种不同的文件类型(如PPT、XLS和PDF)的内容和格式&#xff0c;并且Apache Tika提供了多种使用方式&#xff0c;既可以使用图形化操作页面&#xff08;tika-app&#xff09;&#xff0c;又可以独立部…

Dockerfile 基本命令

本文目录 1. 什么是 Dockerfile2. Dockerfile 基本命令2.1 FROM 指定基础镜像2.2 RUN 执行命令2.3 COPY 复制文件2.4 ADD 更高级的复制文件2.5 CMD2.6 ENTRYPOINT2.7 ENV 设置环境变量2.8 ARG2.9 VOLUME 定义匿名卷2.10 EXPOSE2.11 WORKDIR 指定工作目录2.12 USER 指定当前用户…

【洛谷】P1073 [NOIP2009 提高组] 最优贸易(dp+搜索)

接下来讲具体解法。第一、输入。存邻接表第二、我们需要做深搜。可以用递归来做&#xff0c;同时做动规&#xff1a;函数如下&#xff08;贴了注释&#xff09;.void dfs(int x,int minx,int pre) { //x表示当前访问的节点编号&#xff0c;minx表示目…

添加白名单 gcc/g++【Linux系统编程】

目录 一、添加白名单 二、gcc和g的使用 1、背景知识 一、添加白名单 如何让普通用户可以执行sudo&#xff08;以root的身份&#xff09;指令&#xff1f; 添加白名单 用root身份在/etc/sudoers目录添加 vim /etc/sudoers二、gcc和g的使用 1、背景知识 &#xff08;1&#…

【FFmpeg实战】ffplay整体框架

原文地址&#xff1a;https://segmentfault.com/a/1190000042611796 本文使用的ffplay.c的版本是搭配ffmpeg5.0的版本。 ffplay代码大致架构 关于fplay的架构很难三言两语说得清楚&#xff0c;而且本人对它的理解也不是很深&#xff0c;加上行笔比较啰嗦&#xff0c;可能就更…

springboot配置多个mongo数据源

yaml配置文件&#xff1a; spring:data:mongodb:uri: mongodb://admin:密码ip:27017/paasoo?authSourceadminother:uri: mongodb://admin:密码ip:27017/conversation?authSourceadmin java config文件&#xff1a; package com.paasoo.quartz.config.mongo;import org.spr…

VR数字乡村激活乡土文化生命力,助力乡村振兴

民俗节庆、传统技艺等蕴含着中华五千年以来的传统文化&#xff0c;乡村文化建设在为文化留住血脉的同时&#xff0c;也为高质量发展创造更多可能。找准乡村文化与产业的结合点&#xff0c;有利于激发产业发展的潜力&#xff0c;激活乡土文化的生命力&#xff0c;为乡村振兴注入…

baichuan-7B模型

文章目录 baichuan-7B介绍baichuan-7B 推理baichuan-7B 微调 baichuan-7B介绍 2023年6月15日&#xff0c;搜狗创始人王小川创立的百川智能公司&#xff0c;发布了70 亿参数量的中英文预训练大模型——baichuan-7B。 baichuan-7B 基于 Transformer 结构&#xff0c;在大约 1.2…

【Ubuntu学习MySQL——安装MySQL】

首先得su&#xff0c;然后输入密码&#xff0c;进入到root模式下&#xff0c;以下命令均在root用户模式下进行 1.在这里我们使用RPM包来安装Mysql&#xff0c;所以首先安装RPM包 apt install rpm2.安装完RPM包之后&#xff0c;检测系统是否自带安装MySQL&#xff0c;如果没有…

最小年龄仅5岁!盘点全球最“天才”少年黑客 TOP 10

你还能想起自己8岁的时候&#xff0c;每天都在玩什么吗&#xff1f;可能是在楼下和小朋友一起捉迷藏&#xff1f;在家追一本连载的漫画书&#xff1f;又或者在电脑上玩种菜偷菜的小游戏&#xff1f; 当同龄人还在沉迷于这些比较“基础”的小游戏时&#xff0c;有这样一批和互联…

ARM_uart_发送接收字符 and 发送接收字符串

include/uart4.h #ifndef __UART4_H__ #define __UART4_H__#include "stm32mp1xx_gpio.h" #include "stm32mp1xx_rcc.h" #include "stm32mp1xx_uart.h"//初始化相关操作 void hal_uart4_init();//发送一个字符 void hal_put_char(const char st…

逆波兰表达式

思路 变量 String[] arr Stack 代码 public class Test1 {public static void main(String[] args) {String s "3 40 5 * 6 -";Stack numArr new Stack(10);int num1 0;int num2 0;int res 0;int index 0;String[] arr s.split(" ");for(String…

Flink 读写Kafka总结

前言 总结Flink读写Kafka Flink 版本 1.15.4 Table API 本文主要总结Table API的使用&#xff08;SQL&#xff09;&#xff0c;官方文档&#xff1a;https://nightlies.apache.org/flink/flink-docs-release-1.17/zh/docs/connectors/table/kafka/ kerberos认证相关配置 …

spring生命周期配置

初始化方法&#xff0c;可读化方法&#xff1a; 初始化方法定义在java接口文件&#xff1a; init-method&#xff1a;指定类中的初始化方法名称 destroy-method &#xff1a;指定类中销毁方法名称 这里要在配置文件中配置一份&#xff1a; 如果想要destroy文件关闭后还能运行&…

Spring源码整体脉络介绍及源码编译

需完成的任务 类------------------------------------------BeanFactory----------------------------------------->Bean【BeanFactory调用getBean()生产出来的】 BeanFactory Spring顶层核心接口&#xff0c;使用了简单工厂模式【根据名字&#xff0c;生产出不同的Bean…