Kafka 从安装到应用

news2024/9/25 23:23:51

博主介绍: ✌博主从事应用安全和大数据领域,有8年研发经验,5年面试官经验,Java技术专家✌

Java知识图谱点击链接:体系化学习Java(Java面试专题)

💕💕 感兴趣的同学可以收藏关注下不然下次找不到哟💕💕

在这里插入图片描述

文章目录

  • 1、什么是 Kafka
  • 2、Kafka 安装教程
  • 3、Kafka 常用的几个命令介绍
  • 4、Kafka 的应用场景
  • 5、Kafka 在 java 上的基础应用代码
  • 6、Kafka 在 SpringBoot 上的应用代码

1、什么是 Kafka

2、Kafka 安装教程

安装前linux上需要安装jdk,这个步骤不做赘述了。并且还要安装一个 Zookeeper。

Kafka的下载地址在官网上可以找到:https://kafka.apache.org/downloads
在该网页上可以找到各个版本的Kafka安装包下载链接,包括源码和二进制包。建议选择最新版本的二进制包进行下载。

linux 在线下载使用wget命令下载Kafka安装包:

wget https://downloads.apache.org/kafka/3.5.0/kafka_2.12-3.5.0.tgz

下载日志如下:


[root@ecs-32f7 software]# wget https://downloads.apache.org/kafka/3.5.0/kafka_2.12-3.5.0.tgz
--2023-06-26 16:59:00--  https://downloads.apache.org/kafka/3.5.0/kafka_2.12-3.5.0.tgz
Resolving downloads.apache.org (downloads.apache.org)... 88.99.95.219, 135.181.214.104, 2a01:4f9:3a:2c57::2, ...
Connecting to downloads.apache.org (downloads.apache.org)|88.99.95.219|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 107000763 (102M) [application/x-gzip]
Saving to: ‘kafka_2.12-3.5.0.tgz’

100%[===================================================================================================================================>] 107,000,763 1.90MB/s   in 4m 26s

2023-06-26 17:03:31 (393 KB/s) - ‘kafka_2.12-3.5.0.tgz’ saved [107000763/107000763]

[root@ecs-32f7 software]#

接下安装包:

tar -zxvf /root/software/kafka_2.12-3.5.0.tgz

修改配置文件

vim /root/software/kafka_2.12-3.5.0/config/server.properties
broker.id=0
listeners=PLAINTEXT://localhost:9092
log.dirs=/root/software/kafka_2.12-3.5.0/kafka-logs

启动

cd /root/software/kafka_2.12-3.5.0
bin/kafka-server-start.sh config/server.properties &

3、Kafka 常用的几个命令介绍

创建 topic

./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

使用Kafka的命令行工具 kafka-console-producer.sh 可以向Kafka发送消息。执行以下命令启动消息生产者:

./kafka-console-producer.sh --broker-list localhost:9092 --topic test

使用Kafka的命令行工具 kafka-console-consumer.sh 可以接收Kafka中的消息。执行以下命令启动消息消费者:

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

查看消费者组 lag 的信息

./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group <group_name>

看某个消费者的详细信息,可以执行以下命令:

./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group <group_name> --members --verbose

4、Kafka 的应用场景

Kafka是一个高性能、可扩展、分布式的消息队列系统,常用于以下场景:

  1. 数据收集与处理:Kafka可以作为数据收集和处理的中间件,用于收集和传输大量的数据,同时支持数据流处理和批处理等多种模式,非常适合大数据场景下的数据处理和分析。
  2. 消息系统:Kafka可以作为消息系统,用于支持实时的消息传递和处理,比如实时日志处理、实时监控和告警等场景。
  3. 数据存储:Kafka提供了高可靠性的数据存储机制,可以用于存储各种类型的数据,比如日志、事件、消息等,同时支持数据的持久化和复制,非常适合高可靠性和高可用性的数据存储场景。
  4. 流处理:Kafka提供了流处理API,可以用于实时处理数据流,支持流与流之间的连接和数据转换,非常适合实时数据处理和分析场景。
  5. 消息队列:Kafka本身就是一个消息队列系统,可以用于支持各种类型的消息队列应用,比如任务队列、通知队列、消息推送等场景。

总之,Kafka具有很广泛的应用场景,尤其是在大数据、实时计算和分布式系统等领域有着广泛的应用。

5、Kafka 在 java 上的基础应用代码

生产者代码如下:

package com.pany.camp.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 *
 * @description:  Kafka 生产者
 * @copyright: @Copyright (c) 2022 
 * @company: Aiocloud
 * @author: pany
 * @version: 1.0.0 
 * @createTime: 2023-06-26 18:06
 */
public class KafkaProducerExample {
    private static final String TOPIC_NAME = "test-topic";
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", BOOTSTRAP_SERVERS);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 10; i++) {
            String message = "Hello, Kafka! This is message " + i;
            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, message);
            producer.send(record);
        }
        producer.close();
    }
}

消费者代码如下:

package com.pany.camp.kafka;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

/**
 *
 * @description: Kafka 消费者
 * @copyright: @Copyright (c) 2022 
 * @company: Aiocloud
 * @author: pany
 * @version: 1.0.0 
 * @createTime: 2023-06-26 18:04
 */
public class KafkaConsumerExample {
    private static final String TOPIC_NAME = "test-topic";
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";
    private static final String GROUP_ID = "test-group";

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        Consumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList(TOPIC_NAME));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            records.forEach(record -> {
                System.out.printf("Received message: key=%s, value=%s, partition=%d, offset=%d\n",
                        record.key(), record.value(), record.partition(), record.offset());
            });
        }
    }
}

以上需要引入依赖


<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.8.0</version>
</dependency>

6、Kafka 在 SpringBoot 上的应用代码

生产者

package com.pany.camp.kafka;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
 *
 * @description:  生产者
 * @copyright: @Copyright (c) 2022 
 * @company: Aiocloud
 * @author: pany
 * @version: 1.0.0 
 * @createTime: 2023-06-26 18:10
 */
@Component
public class KafkaProducer {

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String topic, String message) {
        kafkaTemplate.send(topic, message);
    }
}

消费者

package com.pany.camp.kafka;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 *
 * @description:  消费者
 * @copyright: @Copyright (c) 2022 
 * @company: Aiocloud
 * @author: pany
 * @version: 1.0.0 
 * @createTime: 2023-06-26 18:10 
 */
@Component
public class KafkaConsumer {
    @KafkaListener(topics = "test_topic")
    public void receiveMessage(String message) {
        System.out.println("Received message: " + message);
    }
}

配置文件 properties 配置

spring.kafka.bootstrap-servers=localhost:9092

下面是一个发送消息的例子

package com.pany.camp.kafka;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

/**
 *
 * @description:  发送消息
 * @copyright: @Copyright (c) 2022 
 * @company: Aiocloud
 * @author: pany
 * @version: 1.0.0 
 * @createTime: 2023-06-26 18:12
 */
@RestController
public class KafkaController {
    @Autowired
    private KafkaProducer kafkaProducer;

    @GetMapping("/send/{message}")
    public String sendMessage(@PathVariable String message) {
        kafkaProducer.sendMessage("test_topic", message);
        return "Message sent: " + message;
    }
}

在这里插入图片描述

💕💕 本文由激流原创,首发于CSDN博客,博客主页 https://blog.csdn.net/qq_37967783?spm=1010.2135.3001.5421
💕💕喜欢的话记得点赞收藏啊

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/688887.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

MySQL实战解析底层---幻读是什么,幻读有什么问题?

目录 前言 幻读是什么&#xff1f; 幻读有什么问题&#xff1f; 如何解决幻读&#xff1f; 前言 为了便于说明问题&#xff0c;这一篇文章&#xff0c;就先使用一个小一点儿的表建表和初始化语句如下&#xff1a; 这个表除了主键id外&#xff0c;还有一个索引c&#xff0c…

web值控制标签的显示与隐藏、document、getElementById、style、css、hidden、display、visibility

文章目录 方式一方式二方式三visibility小结 方式一 使用HTML的hidden属性&#xff0c;隐藏后不占用原来的位置 hidden属性是一个Boolean类型的值&#xff0c;如果想要隐藏元素&#xff0c;就将值设置为true&#xff0c;否则就将值设置为false 选取id为test的元素 let test do…

“事后达尔文”—— 游戏业务效果评估方法实践

作者&#xff1a;vivo 互联网数据分析团队 Luo Yandong、Zhang Lingchao 本文介绍了互联网业务数据效果评估的几种常见问题及方法&#xff0c;并基于分层抽样的逻辑优化出一套可应用于解决用户不均匀的“事后达尔文"分析法&#xff0c;可适用于无法AB测试或人群不均匀的AB…

VCO的设计

理想振荡器只有电感和电容&#xff0c;会一直振荡下去。但是实际的振荡器存在一定的寄生电阻并联在RC两端&#xff0c;会使振幅变小。因此需要RC旁边再并联一个负电阻以此来抵消寄生电阻的影响。一般会选择负阻提供的能量为寄生电阻能量的的2-3倍。如果负阻跟RC并联&#xff0c…

剖析float相加产生精度损失的原因

float相加产生精度损失的原因 一、什么是float类型及其特点1.1、float类型的定义和使用方法1.2、float类型的特点&#xff0c;包括精度限制 二、为什么会出现float相加精度损失2.1、计算机二进制存储浮点数的方式2.2、浮点数运算中的舍入误差2.3、累加多个小数时的误差累积 三、…

kali中Metasploit基本使用方法

1.kali启动postgresql并设置开机自启动 systemctl start postgresql.servicesystemctl enable postgresql.service2.kali启动Metasploit 方式一:应用程序 -> 漏洞利用工具集 -> Metasploit framework 方式二: msfconsole 3. Metasploit常用命令 connect 命令 连接远程主…

Qt信号槽之connect介绍(上)

关于Qt信号槽中connect与disconnect介绍 首先我们要知道&#xff0c;如果想要使用Qt中的信号槽机制&#xff0c; 那么必须继承QObject类&#xff0c;因为QObject类中包含了信号槽的一系列操作&#xff0c;今天我们来讲解的是信号与槽怎么建立连接以及断开连接。 一、connect …

在windows server上用Mosquitto软件创建MQTT服务器

今天下午捣鼓了半天&#xff0c;在云服务器上面创建了个MQTT服务器&#xff0c;然后用MQTTX软件进行了测试。过程记录如下&#xff1a; 1、下载mosquitto软件&#xff0c;链接如下图&#xff1a; 2、下载完成后安装&#xff0c;一直点下一步下一步就好了。 3、在安装路径下&am…

快速捡回使用workbench控制mysql创建数据库的基本步凑

首先如果&#xff0c;不想要在原来已经建好的数据库下建立数据表&#xff0c;可以新建数据库。 具体操作步凑如下&#xff1a; 选择后如下所示&#xff1a; 有现成的创建代码的话&#xff0c;就直接复制执行现成的创建代码即可&#xff0c;如果没有现成的创建代码的话&#xff…

Java设计模式之单例模式-【懒汉式与饿汉式】

1、单例&#xff0c;模式 单例模式属于创建型模式的一种&#xff0c;应用于保证一个类仅有一个实例的场景下&#xff0c;并且提供了一个访问它的全局方法 单例模式的特点&#xff1a;从系统启动到终止&#xff0c;整个过程只会产生一个实例。单例模式常用写法&#xff1a;懒汉…

STM32设置为I2C从机模式

STM32设置为I2C从机模式 目录 STM32设置为I2C从机模式前言1 硬件连接2 软件编程3 运行测试3.1 I2C连续写入3.1 I2C连续读取3.1 I2C单次读写测试 4 总结 前言 STM32的I2C作为主机的情况相信很多同学都用过&#xff0c;网上也有很多教程&#xff0c;但是作为从设备使用的例子应该…

【C++ 程序设计】第 9 章:函数模板与类模板

目录 一、函数模板 &#xff08;1&#xff09;函数模板的概念 &#xff08;2&#xff09;函数模板的示例 &#xff08;3&#xff09;函数或函数模板调用语句的匹配顺序 二、类模板 &#xff08;1&#xff09;类模板概念 &#xff08;2&#xff09;类模板示例 &…

阵列模式综合第三部分:深度学习(附源码)

一、前言 这个例子展示了如何设计和训练卷积神经网络&#xff08;CNN&#xff09;来计算产生所需模式的元素权重。 二、介绍 模式合成是阵列处理中的一个重要课题。阵列权重有助于塑造传感器阵列的波束图案&#xff0c;以匹配所需图案。传统上&#xff0c;由于空间信号处理和频…

SSL工作原理

SSL 是一个安全协议&#xff0c;它提供使用 TCP/IP 的通信应用程序间的隐私与完整性。因特网的 超文本传输协议&#xff08;HTTP&#xff09;使用 SSL 来实现安全的通信。 在客户端与服务器间传输的数据是通过使用对称算法&#xff08;如 DES 或 RC4&#xff09;进行加密的。公…

使用ZenDAS进行Gompertz趋势分析

某项目做了18次测试&#xff0c;每次测试发现的缺陷个数如下表所示&#xff1a; 测试序号 发现缺陷数 1 60 2 96 3 157 4 191 5 155 6 106 7 64 8 335 9 92 10 196 11 109 12 133 13 166 14 129 15 16 16 30 17 19 18 5 对上述的数据在Z…

IPv6手工隧道配置与验证实验

IPv6手工隧道配置与验证实验 【实验目的】 熟悉IPv6手工隧道的概念。 掌握IPv6和IPv4共存的实现方法。 掌握IPv6手工隧道的配置。 验证配置。 【实验拓扑】 实验拓扑如下图所示。 实验拓扑 设备参数如表所示。 设备参数表 设备 接口 IPv6地址 子网掩码位数 默认网…

中间件-netty(1)

netty 前言篇 文章目录 一、IO基础篇1.概念1.1 阻塞(Block)和非阻塞(Non-Block)1.2 同步(Synchronization)和异步(Asynchronous)1.3 BIO 与 NIO 对比1.3.1 面向流与面向缓冲1.3.2 阻塞与非阻塞1.3.3 选择器的问世 2.NIO 和 BIO 如何影响应用程序的设计2.1 API调用2.2 数据处理2…

蓝桥杯专题-试题版-【操作格子】【查找整数】【分解质因数】【高精度加法】

点击跳转专栏>Unity3D特效百例点击跳转专栏>案例项目实战源码点击跳转专栏>游戏脚本-辅助自动化点击跳转专栏>Android控件全解手册点击跳转专栏>Scratch编程案例点击跳转>软考全系列点击跳转>蓝桥系列 &#x1f449;关于作者 专注于Android/Unity和各种游…

Spring FrameWork从入门到NB -三级缓存解决循环依赖内幕 (二)

开始用上一篇文章讲到的Spring依赖注入的步骤&#xff0c;用两个例子来推导一下整个过程&#xff0c;举例子有助于了解真相。 先用一个最简单的例子&#xff1a;没有依赖的单例bean的创建。 推导过程是需要上一篇文章的步骤的&#xff0c;要参照步骤一步一步来。 无依赖的单…

Linux内核代码60%都是驱动?驱动代码不会造成内核臃肿吗?

为什么内核中驱动占比最高 一、前言二、Linux中避免内核臃肿的措施2.1 交叉编译及SDK包的裁剪2.2 设备树2.3 模块化2.4 硬件抽象层 三、嵌入式Linux的裁剪四、总结 一、前言 今天逛知乎看到这么一个问题&#xff1a;为什么Linux内核代码60%都是驱动? 如果每支持新的设备就加入…