Kafka 使用java实现,快速入门

news2025/1/12 12:27:11

一、kafka的生产者和消费者

1. 生产者发送消息的流程

 2. 消费者接收消息的流程

 二、 java 代码实现

1. 添加依赖:

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.12</artifactId>
        </dependency>

2. 实现生产者

public class NormalProducer {

    public static void main(String[] args) {
        Properties properties = new Properties();
        //	1.配置生产者启动的关键属性参数

        //	1.1	BOOTSTRAP_SERVERS_CONFIG:连接kafka集群的服务列表,如果有多个,使用"逗号"进行分隔
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.31.101:9092");
        //	1.2	CLIENT_ID_CONFIG:这个属性的目的是标记kafkaclient的ID
        properties.put(ProducerConfig.CLIENT_ID_CONFIG, "quickstart-producer");
        //	1.3 KEY_SERIALIZER_CLASS_CONFIG VALUE_SERIALIZER_CLASS_CONFIG
        //	Q: 对 kafka的 key 和 value 做序列化,为什么需要序列化?
        //	A: 因为KAFKA Broker 在接收消息的时候,必须要以二进制的方式接收,所以必须要对KEY和VALUE进行序列化
        //	字符串序列化类:org.apache.kafka.common.serialization.StringSerializer
        //	KEY: 是kafka用于做消息投递计算具体投递到对应的主题的哪一个partition而需要的
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        //	VALUE: 实际发送消息的内容
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        //	2.创建kafka生产者对象 传递properties属性参数集合
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        for(int i = 0; i <10; i ++) {
            //	3.构造消息内容
            User user = new User("00" + i, "张三");
            ProducerRecord<String, String> record =
                    //	arg1:topic , arg2:实际的消息体内容,quick_start 是 topic 名称
                    new ProducerRecord<String, String>("quick_start",
                            JSON.toJSONString(user));

            //	4.发送消息
            producer.send(record);
        }


        //	5.关闭生产者
        producer.close();

    }
}

其中的 User 对象为:

public class User {

	private String id;
	
	private String name;

	public User() {
	}

	public User(String id, String name) {
		this.id = id;
		this.name = name;
	}

	public String getId() {
		return id;
	}

	public void setId(String id) {
		this.id = id;
	}

	public String getName() {
		return name;
	}

	public void setName(String name) {
		this.name = name;
	}
}

3. 实现消费者

public class NormalConsumer {

    public static void main(String[] args) {

        //	1. 配置属性参数
        Properties properties = new Properties();

        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.31.101:9092");

        //	org.apache.kafka.common.serialization.StringDeserializer
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        //	非常重要的属性配置:与我们消费者订阅组有关系
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "quickstart-group");
        //	常规属性:会话连接超时时间
        properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);
        //	消费者提交offset: 自动提交 & 手工提交,默认是自动提交
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);

        //	2. 创建消费者对象
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

        //	3. 订阅你感兴趣的主题:quick_start
        consumer.subscribe(Collections.singletonList("quick_start"));

        System.err.println("quickstart consumer started...");

        try {
            //	4.采用拉取消息的方式消费数据
            while(true) {
                //	等待多久拉取一次消息
                //	拉取TOPIC_QUICKSTART主题里面所有的消息
                //	topic 和 partition是 一对多的关系,一个topic可以有多个partition
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                //	因为消息是在partition中存储的,所以需要遍历partition集合
                for(TopicPartition topicPartition : records.partitions()) {
                    //	通过TopicPartition获取指定的消息集合,获取到的就是当前topicPartition下面所有的消息
                    List<ConsumerRecord<String, String>> partitionRecords = records.records(topicPartition);
                    //	获取TopicPartition对应的主题名称
                    String topic = topicPartition.topic();
                    //	获取当前topicPartition下的消息条数
                    int size = partitionRecords.size();

                    System.err.println(String.format("--- 获取topic: %s, 分区位置:%s, 消息总数: %s",
                            topic,
                            topicPartition.partition(),
                            size));

                    for(int i = 0; i < size; i++) {
                        ConsumerRecord<String, String> consumerRecord = partitionRecords.get(i);
                        //	实际的数据内容
                        String value = consumerRecord.value();
                        //	当前获取的消息偏移量
                        long offset = consumerRecord.offset();
                        //	ISR : High Watermark, 如果要提交的话,比如提交当前消息的offset+1
                        //	表示下一次从什么位置(offset)拉取消息
                        long commitOffser = offset + 1;
                        System.err.println(String.format("获取实际消息 value:%s, 消息offset: %s, 提交offset: %s",
                                value, offset, commitOffser));
                    }
                }
            }
        } finally {
            consumer.close();
        }
    }
}

4. 测试结果

生产者发送的消息在消费者端可以正常接收:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1108901.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

vue3+koa+axios实现前后端通信

vue3koaaxios实现前后端通信 写了一个小demo来实现前后端通信,涉及跨域问题&#xff0c;非常简单可以给大家平时开发的时候参考 服务端&#xff1a; 目录结构如下&#xff1a; router index.js // router的入口文件 // 引入路由 const Router require("koa-router&quo…

React组件渲染和更新的过程

一、回顾Vue组件渲染和更新的过程 二、回顾JSX本质和vdom 三、组件渲染和更新 1、组件渲染过程 props state (组件有了props state)render()生成vnodepatch(elem, vnode) 2、组件更新过程 setState(newState) --> dirtyComponents (可能有子组件)render()生成newVnodepa…

3、Flowable任务分配和流程变量

任务分配和流程变量 1.任务分配 1.1 固定分配 固定分配就是我们前面介绍的&#xff0c;在绘制流程图或者直接在流程文件中通过Assignee来指定的方式 1.2 表达式分配 Flowable使用UEL进行表达式解析。UEL代表Unified Expression Language&#xff0c;是EE6规范的一部分.Flo…

通配符SSL证书价格贵吗

通配符SSL证书是一种SSL证书&#xff0c;用于保护基于相同域名但具有不同子域名的多个网站。 它使用通配符字符 "*" 来代表所有可能的二级子域名。JoySSL注册码230609领取免费使用通配符证书 通配符SSL证书的作用是使您可以使用单个证书来保护主域名和所有其下的子域…

千兆光模块和万兆光模块的区别?

在网络通信领域&#xff0c;千兆光模块和万兆光模块是最为常见且广泛应用的两种光模块。不同之处在于传输速率、封装、传输距离、功耗、发射光功率、接收光功率和应用场景等。 千兆光模块的传输速率为1 Gbps&#xff0c;万兆光模块的传输速率为10 Gbps&#xff0c;这意味着万…

柯桥基础俄语口语|初级俄语学习:用联想法学习字母

初级俄语 用联想法学习字母 开始学习俄语的时候&#xff0c;第一任务就是掌握字母表。今天彼得俄语会帮助大家巧记俄语字母表。为此&#xff0c;小编把所有字母分成三组&#xff1a;与拼音相似的&#xff0c;可以用图片记住的以及可以用汉字记住的。 与拼音相似的字母 俄语有3…

数据结构与算法-(9)---双端队列(Deque)

&#x1f308;write in front&#x1f308; &#x1f9f8;大家好&#xff0c;我是Aileen&#x1f9f8;.希望你看完之后&#xff0c;能对你有所帮助&#xff0c;不足请指正&#xff01;共同学习交流. &#x1f194;本文由Aileen_0v0&#x1f9f8; 原创 CSDN首发&#x1f412; 如…

TCP通信-同时接受多个客户端消息

同时处理多个客户端消息的原理 代码实现 public class ClientDemo1 {public static void main(String[] args) {try {System.out.println("客户端启动");// 1、创建Socket通信管道请求有服务端的连接// public Socket(String host, int port)// 参数一&#xff1a;服…

图像处理软件Photoshop 2023 mac新增功能 ps 2023中文版

​Photoshop 2023 mac是一款功能强大、易用且灵活的图像编辑软件&#xff0c;旨在满足专业设计师和摄影师的需求。无论您是处理照片、制作图形还是进行艺术创作&#xff0c;Photoshop 2023 都能为您提供丰富的工具和效果&#xff0c;帮助您实现创意想法。Photoshop还支持多种文…

无蓝光的护眼灯有哪些品牌?分享五款优秀的无蓝光护眼台灯

现在儿童近视率越来越高了&#xff0c;用眼过度疲劳是导致近视的主要因素&#xff0c;学习环境的光线是否合适&#xff0c;都会直接影响用眼的疲劳程度。所以给孩子营造一个良好的学习环境非常重要&#xff01;为大家推荐五大品牌的护眼台灯。 1.书客护眼台灯L1 推荐指数&…

如何压缩ppt文件的大小?

如何压缩ppt文件的大小&#xff1f;要知道现在很多课件都是使用ppt文件&#xff0c;那么就导致ppt文件过大&#xff0c;我们很多时候电脑的存储空间就不够了。为了能够更好的存储这些ppt文件&#xff0c;我们通常会选择压缩ppt文件。怎么压缩ppt文件更快更好&#xff0c;没有损…

[java进阶]——IO流,递归实现多级文件拷贝

&#x1f308;键盘敲烂&#xff0c;年薪30万&#x1f308; 目录 一、认识IO流 二、了解编码与解码 二、IO流体系 三、字节输入输出流 四、字符输入输出流 五、多级文件拷贝 一、认识IO流 IO流也叫输入流(intput)、输出流(onput)&#xff0c;该流就像java程序同硬盘之间的…

迅为RK3588开发板Android12单摄方案设备树修改

打开 3588-android12/kernel-5.10/arch/arm64/boot/dts/rockchip/topeet_camera_config.dtsi 设备树&#xff0c;此设备树中对底板上的摄像头接口进行了配置&#xff0c;如下图所示&#xff1a; 如果想要使用 J1 接口打开摄像头 OV5695 或者 摄像头 OV13850&#xff0c;只需要在…

django建站过程(2)创建第一个应用程序页面

创建第一个应用程序页面 设置第一个页面【settings.py,urls.py,views.py】settings.pyurls.pyviews.py django是由一系列应用程序组成&#xff0c;协同工作&#xff0c;让项目成为一个整体。前面已创建了一个应用程序baseapp,使用的命令 python manage.py startapp baseapps这…

QT最小化到托盘显示

一、效果&#xff1a; 程序关闭后&#xff0c;程序并没有退出&#xff0c;而是放入了托盘中&#xff1b;点击恢复原始大小&#xff0c;或者双击托盘图标&#xff0c;可以恢复程序原来的窗口。如下图。 那qt是如何实现这样的办法呢&#xff0c;其实就是用到了 QSystemTrayIcon类…

Java开发树结构数据封装!

目录 源数据如下controller接口&#xff1a;service层封装:Dao接口&#xff1a;Dao层Mapper:映射实体类&#xff1a; 源数据如下 controller接口&#xff1a; RequestMapping("/UserTreeInfo")public RespBody getUserTreeInfo(Long userId) {List<MenuTreeVo>…

如何使用BERT生成单词嵌入?

阿比贾特萨拉里 一、说明 BERT&#xff0c;或来自变形金刚&#xff08;Transformer&#xff09;的双向编码器表示&#xff0c;是由谷歌开发的强大语言模型。它已广泛用于自然语言处理任务&#xff0c;例如情感分析、文本分类和命名实体识别。BERT的主要特征之一是它能够生成单词…

python中不可变类型和可变类型

不可变类型&#xff1a;修改之后内存存储地址不会发生改变 可变类型&#xff1a;修改之后内存存储地址发生改变 set

chromium线程模型(2)-线程池实现

通过chromium 官方文档&#xff0c;线程和任务一节我们可以知道 &#xff0c;chromium有两类线程&#xff0c;一类是普通线程&#xff0c;最典型的就是io线程和ui线程。 另一类是 线程池线程。 今天我们先分析线程池的实现&#xff08;基于版本 117.0.5847.0&#xff08;开发者…

python小游戏:飞机射击游戏代码

创建一个完整的飞行游戏涉及到许多方面&#xff0c;包括图形设计、游戏物理引擎和用户输入处理等。在这里&#xff0c;我将提供一个简单的基础框架&#xff0c;你可以在其基础上进一步扩展和完善游戏。 在这个示例中&#xff0c;我们将使用Pygame库来创建一个基本的飞行游戏。…