本地搭建kafka并用java实现发送消费消息

news2025/1/24 17:38:11

1、下载kafka的jar包文件

https://www.apache.org/dyn/closer.cgi?path=/kafka/3.1.0/kafka_2.12-3.1.0.tgz

请添加图片描述

2、下载完成直接操作命令启动

	1、打开新的terminal(终端)窗口,进入kafka的bin目录 启动zk
		 ./zookeeper-server-start.sh ../config/zookeeper.properties
	2、新开命令行
		启动kafka,进入bin目录:./kafka-server-start.sh ../config/server.properties &

3、测试

		1、进入bin目录 ,创建主题
 			./kafka-topics.sh —create —bootstrap-server localhost:9092 —replication-factor 1 —partition 1 —topic kafkaTest 
 		2、启动生产者
			./kafka-console-producer.sh --broker-list localhost:9092 --topic kafkaTest 
		3、启动消费者
			./kafka-console-consumer.sh --bootstrap-server qf01:9092 --topic test --from-beginning

4、展示

生产者:
请添加图片描述
发送kafka的测试,消费者直接消费
请添加图片描述

5、java操作kafka完成消费

需要启动上面的操作 启动kafka 建立主题,启动zk
首先下载一个可视化软件
mac版本

https://www.kafkatool.com/download.html

5.1 导入依赖

<!-- kafka客户端工具 -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.4.1</version>
        </dependency>

        <!-- 工具类 -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-io</artifactId>
            <version>1.3.2</version>
        </dependency>

5.2 开发生产者

调用send发送1-100消息到指定Topic test

public class KafkaProducerTest {
    public static void main(String[] args) {
        // 1. 创建用于连接Kafka的Properties配置
        Properties props = new Properties();
        props.put("bootstrap.servers", "server1:9092");
        props.put("acks", "all");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // 2. 创建一个生产者对象KafkaProducer
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);

        // 3. 调用send发送1-100消息到指定Topic test
        for (int i = 0; i < 100; ++i) {
            try {
                // 获取返回值Future,该对象封装了返回值
                Future<RecordMetadata> future = producer.send(new ProducerRecord<String, String>("test", null, i + ""));
                // 调用一个Future.get()方法等待响应
                future.get();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }

        // 5. 关闭生产者
        producer.close();

    }
}

5.3 示例

请添加图片描述

5.4 开发消费者

public class KafkaConsumerTest {

    /**
     * @param args
     * @throws InterruptedException 
     */
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "test");
        // 自动提交offset
        props.setProperty("enable.auto.commit", "true");
        // 自动提交offset的时间间隔
        props.setProperty("auto.commit.interval.ms", "1000");
        // 拉取的key、value数据的
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // 2.创建Kafka消费者
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);

        // 3. 订阅要消费的主题
        // 指定消费者从哪个topic中拉取数据
        kafkaConsumer.subscribe(Arrays.asList("test"));

        // 4.使用一个while循环,不断从Kafka的topic中拉取消息
        while (true) {
            // Kafka的消费者一次拉取一批的数据
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(5);
            // 5.将将记录(record)的offset、key、value都打印出来
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                // 主题
                String topic = consumerRecord.topic();
                // offset:这条消息处于Kafka分区中的哪个位置
                long offset = consumerRecord.offset();
                // key\value
                String key = consumerRecord.key();
                String value = consumerRecord.value();

                System.out.println("topic: " + topic + " offset:" + offset + " key:" + key + " value:" + value);
            }
            Thread.sleep(1000);
        }
    }

}

5.5 启动两个类(部分截图)

请添加图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1041065.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

bash中执行比较的几种方法

bash 脚本中的 test 命令用于检查表达式的有效性&#xff0c;检查命令或表达式为 true 或者 false。此外&#xff0c;它还可以用于检查文件的类型和权限。 如果命令或表达式有效&#xff0c;则 test 命令返回0&#xff0c;否则返回1。 使用 test 命令 test 命令的基本语法如…

速卖通数据分析怎么看?速卖通数据分析工具有哪些?—站斧浏览器

速卖通数据分析怎么看&#xff1f; 1、关注销售指标&#xff1a;在进行速卖通数据分析时&#xff0c;卖家应特别关注销售指标&#xff0c;如销售额、订单量、转化率等。通过对这些指标的分析&#xff0c;卖家可以了解到自己店铺的销售状况以及变化趋势&#xff0c;进而采取相应…

【postgresql】 ERROR: multiple assignments to same column “XXX“

Cause: org.postgresql.util.PSQLException: ERROR: multiple assignments to same column "XXX"; bad SQL grammar []; nested exception is org.postgresql.util.PSQLException: ERROR: multiple assignments to same column "XXX"; 原因&#xff1a;or…

SpringCloud Gateway--Predicate/断言(详细介绍)中

&#x1f600;前言 本篇博文是关于SpringCloud Gateway–Predicate/断言&#xff08;详细介绍&#xff09;中&#xff0c;希望你能够喜欢 &#x1f3e0;个人主页&#xff1a;晨犀主页 &#x1f9d1;个人简介&#xff1a;大家好&#xff0c;我是晨犀&#xff0c;希望我的文章可以…

GraalJS及平台JS脚本能力建设

GraalJS及平台JS脚本能力建设 GraalJS替换Nashorn Oracle宣布弃用Nashorn Javascript引擎&#xff0c;最终将从未来所有的JDK中删除。 Nashorn最初是在JDK 8中引入的&#xff0c;用于取代Rhino脚本引擎。发布时&#xff0c;Nashorn是ECMAScript-262 5.1的完整实现&#xff0…

服务接口调用OpenFeign_日志增强

OpenFeign虽然提供了日志增强功能&#xff0c;但是默认是不显示任何日志的&#xff0c;不过开发者在调试阶段可以自己配置日志的级别。 OpenFeign的日志级别如下&#xff1a; NONE&#xff1a;默认的&#xff0c;不显示任何日志;BASIC&#xff1a;仅记录请求方法、URL、响应状…

CMU15-213 课程笔记 04-Floating Point

文章目录 浮点数如何用二进制表示IEEE 浮点数标准IEEE 浮点数实现IEEE 浮点数在内存里 E exp - bias 计算指数M 1.xxx 尾数计算举例&#xff1a;对一个浮点数进行转换一些关于浮点数的计算等等 浮点数如何用二进制表示 计算机内部的浮点数不是这样存在内存里的&#xff08;至…

解决vs2022项目文件夹内.vs文件夹容量虚高问题

打开系统显示隐藏文件夹 会在vs2022的项目文件夹内有一个.vs文件夹 在子目录里会有一个Browse.VC.db文件,我的项目代码只有120m,而这个db文件居然有70m 而且每次打开vs项目,会使这个文件发生容量变化,如果你的git项目恰好包含这个.vs文件夹,那就比较不爽了,每次都要更新这个文件…

Python中的设计模式 -- 单例

迷途小书童 读完需要 2分钟 速读仅需 1 分钟 当我们谈到单例模式时&#xff0c;可以想象一个非常特殊的餐厅&#xff0c;这个餐厅只有一个桌子&#xff0c;无论多少人来用餐&#xff0c;都只能坐在这个桌子上。这个桌子就是餐厅的单例&#xff0c;它保证了整个餐厅中只有一个桌…

数据结构学习笔记——查找算法中的树形查找(平衡二叉树)

目录 一、平衡二叉树的定义二、平衡因子三、平衡二叉树的插入和构造&#xff08;一&#xff09;LL型旋转&#xff08;二&#xff09;LR型旋转&#xff08;三&#xff09;RR型旋转&#xff08;四&#xff09;RL型旋转 四、平衡二叉树的删除&#xff08;一&#xff09;叶子结点&a…

进行 XSS 攻击 和 如何防御

跨站脚本攻击&#xff08;XSS 攻击&#xff09;是 Web 开发中最危险的攻击之一。以下是它们的工作原理以及防御方法。 XSS 攻击 跨站脚本攻击就是在另一个用户的计算机上运行带有恶意的 JS 代码。假如我们的程序没有对这些恶意的脚本进行防御的话&#xff0c;他们就会由我们的…

【刷题笔记9.25】LeetCode:相交链表

LeetCode&#xff1a;相交链表 一、题目描述 给你两个单链表的头节点 headA 和 headB &#xff0c;请你找出并返回两个单链表相交的起始节点。如果两个链表不存在相交节点&#xff0c;返回 null 。 二、分析及代码 方法一&#xff1a;使用哈希Set集合 &#xff08;注意…

安装ipfs-swarm-key-gen

安装ipfs-swarm-key-gen Linux安装go解释器安装ipfs-swarm-key-gen Linux安装go解释器 https://blog.csdn.net/omaidb/article/details/133180749 安装ipfs-swarm-key-gen # 编译ipfs-swarm-key-gen二进制文件 go get -u github.com/Kubuxu/go-ipfs-swarm-key-gen/ipfs-swarm…

机器学习(19)---神经网络详解

神经网络 一、神经网络概述1.1 神经元模型1.2 激活函数 二、感知机2.1 概述2.2 实现逻辑运算2.3 多层感知机 三、神经网络3.1 工作原理3.2 前向传播3.3 Tensorflow实战演示3.3.1 导入数据集查看3.3.2 数据预处理3.3.3 建立模型3.3.4 评估模型 四、反向传播五、例题5.1 题15.2 题…

Qt/C++音视频开发56-udp推流和拉流/组播和单播推流

一、前言 之前已经实现了rtsp/rtmp推流&#xff0c;rtsp/rtmp/hls/flv/ws-flv/webrtc等拉流&#xff0c;这种一般都需要依赖一个独立的流媒体服务程序&#xff0c;有没有一种更便捷的方式不需要这种依赖&#xff0c;然后又能实现推拉流呢&#xff0c;当然有的那就是udpp推流&a…

Linux DataEase数据可视化分析工具结合cpolar实现远程访问

文章目录 前言1. 安装DataEase2. 本地访问测试3. 安装 cpolar内网穿透软件4. 配置DataEase公网访问地址5. 公网远程访问Data Ease6. 固定Data Ease公网地址 前言 DataEase 是开源的数据可视化分析工具&#xff0c;帮助用户快速分析数据并洞察业务趋势&#xff0c;从而实现业务…

2023-油猴(Tampermonkey)脚本推荐

2023-油猴&#xff08;Tampermonkey&#xff09;脚本推荐 知乎增强 链接 https://github.com/XIU2/UserScript https://greasyfork.org/zh-CN/scripts/419081 介绍 移除登录弹窗、屏蔽首页视频、默认收起回答、快捷收起回答/评论&#xff08;左键两侧&#xff09;、快捷回…

HTML+CSS综合案例二:CSS简介

<!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><title> CSS简介</title><style>h1{color: #33…

Linux安装Mysql主从集群(图文解说详细版)

MySQL主从集群是一种数据库架构模式&#xff0c;由一个主数据库&#xff08;Master&#xff09;和多个从数据库&#xff08;Slave&#xff09;组成。在主从集群中&#xff0c;主数据库负责处理写操作&#xff08;如插入、更新、删除&#xff09;&#xff0c;而从数据库则用于读…

蓝桥杯每日一题2023.9.23

4961. 整数删除 - AcWing题库 题目描述 分析 注&#xff1a;如果要进行大量的删除操作可以使用链表 动态求最小值使用堆&#xff0c;每次从堆中取出最小值的下标然后在链表中删除 注意long long 代码解释&#xff1a; while(k --){auto t q.top();q.pop();res t.first;i…