kafka基本概念以及用法

news2024/11/28 2:49:27

kafka基本概念以及用法目录


文章目录

  • kafka基本概念以及用法目录
  • 一、什么是kafka?
  • 二、为什么要使用kafka?
    • 三、kafka的基本概念
    • 四、安装kafka(windows版本)
    • 五、命令行控制kafka生产消费数据,创建 删除topic
    • 六、java操作kafka消费生产


提示:以下是本篇文章正文内容,下面案例可供参考

一、什么是kafka?

kafka是一个分布式流处理工具,被全球大量公司广泛应用在大数据处理领域

二、为什么要使用kafka?

众所周知,在微服务领域或者大数据领域,需要服务和服务之间进行数据交换,数据通信,大数据领域系统和系统之间可能还有海量的数据交换压力。
1.传统的线程和线程之间的数据交换方式
在这里插入图片描述
jvm中会有一块公共的区域叫做堆内存,线程和线程之间会推送数据到堆内存中,其他线程需要获取数据就去堆内存获取
2.传统的进程和进程之间的数据交换方式
在这里插入图片描述

进程和进程之间是通过网络传输数据(Http,或者socket等常见网络传输协议)
但是不管是进程还是线程,传统这种数据传输交换方式,如果在海量数据高并发的场景下,如果接受数据方的内存跟不上推送的速度,就会引起内存溢出,堆内存溢出等生产问题。而kafka就是为了解决这个问题,孕育而生的。他充当了交换数据中间的一个中间件,类似一个消息队列的缓冲区

三、kafka的基本概念

一般市面上面的消息队列都遵循了JMS(Java Message Service)的传输规范
1.P2P(point to point)
在这里插入图片描述
2.PS(publish and subscribe) 发布订阅模式
在这里插入图片描述

四、安装kafka(windows版本)

1.下载kafka
https://kafka.apache.org/downloads
在这里插入图片描述
解压目录:
在这里插入图片描述
启动kafka需要依赖zookeeper,我们可以使用kafka自带的zookeeper
在这里插入图片描述
在log文件夹下面新建zk文件夹区分日志文件
在这里插入图片描述
修改配置文件中zookeeper文件日志文件位置
在这里插入图片描述
修改kafka运行日志保存位置
在这里插入图片描述
启动zookeeper和kafka
cmd到bin目录下面windows执行下面的bat脚本
在这里插入图片描述后面跟上刚才修改的配置文件
在这里插入图片描述
启动zookeeper成功 。
开始启动kafka
启动bat脚本 后面跟上刚才带上的配置文件
在这里插入图片描述
在这里插入图片描述
启动成功
tips:后续可能会出现kafka出现进程挂掉的报错
在这里插入图片描述
可以删除配置的两个文件夹下面的文件,重新启动zookeeper和kafka
在这里插入图片描述

五、命令行控制kafka生产消费数据,创建 删除topic

1.创建一个名为test的topic
kafka-topics.bat --bootstrap-server localhost:9092 --topic test --create
2.查看所有topic
kafka-topics.bat --bootstrap-server localhost:9092 --list
3.删除一个名为test的topic
kafka-topics.bat --bootstrap-server localhost:9092 --topic test --delete
4.生产数据
kafka-console-producer.bat --broker-list localhost:9092 --topic test
5.消费数据
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test
在这里插入图片描述
在这里插入图片描述

tips:可以加–help查看每个脚本后面的参数的具体用法
在这里插入图片描述

六、java操作kafka消费生产

先引入maven依赖:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.8.0</version>
</dependency>

version根据自己的kafka版本做调整

生产者代码:

   public static void main(String[] args) {
        // 配置Kafka生产者
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // Kafka集群地址
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        // 创建Kafka生产者
        Producer<String, String> producer = new KafkaProducer<>(props);

        // 发送消息
        try {
            producer.send(new ProducerRecord<>("test", "key", "Message to send"));
            System.out.println("Message sent");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 关闭生产者
            producer.close();
        }
    }

命令行接受到了生产数据
在这里插入图片描述
消费者代码:

 public static void main(String[] args) {
        // 配置Kafka消费者
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("group.id", "test-group");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // 创建Kafka消费者
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

        // 订阅主题
        consumer.subscribe(Arrays.asList("test"));

        // 轮询消费消息
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("Offset: %d, Key: %s, Value: %s\n", record.offset(), record.key(), record.value());
            }
        }
    }

再运行生产者代码 控制台就会打印出来消费的数据
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2186881.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Ubuntu操作系统版本服务支持时间(更新到24.04)

文章参考链接 以下是解释&#xff1a; 开发代号&#xff1a;Ubuntu的每个版本都有一个开发代号&#xff0c;例如“Mantic Minotaur”。 版本命名&#xff1a;Ubuntu的版本号是根据发布年份和月份来命名的。例如&#xff0c;Ubuntu 23.10是在2023年10月发布的。 LTS版本&…

Windows 11 24H2正式发布

微软最近正式发布了Windows 11 24H2&#xff0c;这是Windows 11的最新功能更新&#xff0c;带来了多项新特性和改进。 主要新功能&#xff1a; 人工智能增强&#xff1a;此次更新特别强调AI能力&#xff0c;推出了如Windows Copilot的增强版本。Copilot的界面得到了改善&#…

【微服务】注册中心 - Eureka(day3)

CAP理论 P是分区容错性。简单来说&#xff0c;分区容错性表示分布式服务中一个节点挂掉了&#xff0c;并不影响其他节点对外提供服务。也就是一台服务器出错了&#xff0c;仍然可以对外进行响应&#xff0c;不会因为某一台服务器出错而导致所有的请求都无法响应。综上所述&…

关于Mybatis框架操作时注意的细节,常见的错误!(博主亲生体会的细节!)

目录 1.在对DB进行CRUD时&#xff0c;除了查&#xff0c;其余的操作都要进行事务的提交否则不成功。 2.用sqlSession原生方法时&#xff0c;第一个参数方法名&#xff0c;是xml文件中定义的id名&#xff0c;底层找的是你这个接口所定义的方法名。 3.以包为单位引入映射文件 …

第三节-类与对象(2)默认成员函数详解

1.类的6个默认成员函数 如果一个类中什么成员都没有&#xff0c;简称为空类&#xff08;空类大小为1&#xff09;。 空类中真的什么都没有吗&#xff1f;并不是&#xff0c;任何类在什么都不写时&#xff0c;编译器会自动生成以下6个默认成员函数。 默认成员函数&#xff1a;…

DOM树(下) -- 第八课

文章目录 前言一、DOM属性操作1. 获取属性值2. 设置属性值3. 移除属性值 二、节点1.什么是节点?2. 节点层级1. 获取父级节点2. 获取兄弟节点3. 获取子节点 3. 节点操作1. 创建节点2. 添加和删除节点 三、事件进阶1. 注册事件1. 传统方式2. 监听方式 2. 删除事件3. 事件流 四、…

第4篇:MSSQL日志分析----应急响应之日志分析篇

常见的数据库攻击包括弱口令、SQL注入、提升权限、窃取备份等。对数据库日志进行分析&#xff0c;可以发现攻击行为&#xff0c;进一步还原攻击场景及追溯攻击源。 0x01 MSSQL日志分析 首先&#xff0c;MSSQL数据库应启用日志记录功能&#xff0c;默认配置仅限失败的登录&…

Veritus netbackup 管理控制台无法连接:未知错误

节假日停电&#xff0c;netbackup服务器意外停机后重新开机&#xff0c;使用netbackup管理控制台无法连接&#xff0c;提示未知错误。 ssh连接到服务器&#xff0c;操作系统正常&#xff0c;那应该是应用有问题&#xff0c;先试一下重启服务器看看。重新正常关机&#xff0c;重…

【Ubuntu】使用阿里云apt源来更新apt源

1.前言 我在京东云买了一个云服务器&#xff0c;但是我第一次使用apt的时候&#xff0c;发现遇到了下面这些情况 后面听老师讲&#xff0c;还需要执行下面这个 但是我再次使用apt下载软件的时候&#xff0c;还是出现了下面这个情况 后面问了老师才知道是apt源的问题&#x…

解决Github打不开或速度慢的问题

一、原因 我们先分析一下Github在国内访问慢或有时候登陆不上去的问题原因&#xff1a;其实这都是因为我们访问github官网时是直接访问域名即github.com&#xff0c;那么中间有个域名通过DNS解析的过程&#xff0c;将域名解析为对应的ip地址&#xff0c;其实主要时间都是花在了…

【寻找one piece的算法之路】——双指针算法!他与她是否会相遇呢?

&#x1f490;个人主页&#xff1a;初晴~ &#x1f4da;相关专栏&#xff1a;寻找one piece的刷题之路 什么是双指针算法 双指针算法是一种常用的编程技巧&#xff0c;尤其在处理数组和字符串问题时非常有效。这种方法的核心思想是使用两个指针来遍历数据结构&#xff0c;这两…

学习记录:js算法(五十二):验证二叉搜索树

文章目录 验证二叉搜索树我的思路网上思路 总结 验证二叉搜索树 给你一个二叉树的根节点 root &#xff0c;判断其是否是一个有效的二叉搜索树。 有效 二叉搜索树定义如下&#xff1a; 节点的 左子树 只包含 小于 当前节点的数。 节点的 右子树 只包含 大于 当前节点的数。 所有…

【Python】AudioLazy:基于 Python 的数字信号处理库详解

AudioLazy 是一个用于 Python 的开源数字信号处理&#xff08;DSP&#xff09;库&#xff0c;设计目的是简化信号处理任务并提供更直观的操作方式。它不仅支持基础的滤波、频谱分析等功能&#xff0c;还包含了滤波器、信号生成、线性预测编码&#xff08;LPC&#xff09;等高级…

Mybatis框架梳理

Mybatis框架梳理 前言1.ORM2.模块划分2.1 ORM的实现2.2 SQL的映射2.3 插件机制2.4 缓存机制2.5 其他 3. 愿景 前言 如果让我聊一聊mybatis&#xff0c;我该怎么说呢&#xff1f;开发中时时刻刻都在用它&#xff0c;此时此刻&#xff0c;脑海中却只浮现ORM框架这几个字&#xff…

Linux --入门学习笔记

文章目录 Linux概述基础篇Linux 的安装教程 ⇒ 太简单了&#xff0c;百度一搜一大堆。此处略……Linux 的目录结构常用的连接 linux 的开源软件vi 和 vim 编辑器Linux 的关机、开机、重启用户登录和注销用户管理添加用户 ⇒ ( useradd 用户名 ) &#xff08; useradd -d 制定目…

【AIGC】内容创作——AI文字、图像、音频和视频的创作流程

我的主页&#xff1a;2的n次方_ 近年来&#xff0c;生成式人工智能&#xff08;AIGC&#xff0c;Artificial Intelligence Generated Content&#xff09;技术迅速发展&#xff0c;彻底改变了内容创作的各个领域。无论是文字、图像、音频&#xff0c;还是视频&#xff0c;A…

SPARK调优:AQE特性(含脑图总结)

学完AQE需要能够回答如下的几个问题&#xff1a; 什么是AQE&#xff1f;AQE的实现原理是什么&#xff1f;AQE的特性有哪些&#xff1f;使用什么参数实现&#xff1f;AQE每个特性可以解决什么问题&#xff1f;什么问题是AQE不能解决的 HL&#xff1a;学习脑图如下 SparkAQE是spa…

MES系统适用于哪些行业?MES系统对于企业的作用和价值

MES系统&#xff08;制造执行系统&#xff09;广泛应用于多个行业&#xff0c;并在这些行业中发挥着重要作用&#xff0c;为企业带来了显著的价值。以下是对MES系统适用行业及其对企业作用和价值的详细分析&#xff1a; 一、MES系统适用的行业 电子信息行业&#xff1a; 随着市…

大功率LED模块(5V STM32)

目录 一、介绍 二、模块原理 1.尺寸介绍 2.模块原理图 3.引脚描述 三、程序设计 main.c文件 timer.h文件 timer.c文件 四、实验效果 五、资料获取 项目分享 一、介绍 大功率LED模块是一种照明设备&#xff0c;它将大功率高亮度发光二极管(LED)集成在铝基板上&…

Linux学习笔记(二):深入理解用户管理、运行级别与命令行操作

Linux学习笔记&#xff08;二&#xff09;&#xff1a;深入理解用户管理、运行级别与命令行操作 Linux学习笔记&#xff08;一&#xff09;&#xff1a;Linux学习环境的安装及远程连接工具的使用 1. 用户管理 1.1 用户密码管理 创建用户密码 使用 passwd 命令可以为指定用户…