Kafka基础_1

news2025/1/12 23:05:33

Kafka系列

注:大家觉得博客好的话,别忘了点赞收藏呀,本人每周都会更新关于人工智能和大数据相关的内容,内容多为原创,Python Java Scala SQL 代码,CV NLP 推荐系统等,Spark Flink Kafka Hbase Hive Flume等等~写的都是纯干货,各种顶会的论文解读,一起进步。
今天继续和大家分享一下Kafka系列
#博学谷IT学习技术支持

文章目录

  • Kafka系列
  • 前言
  • 一、Kafka的架构
  • 二、Kafka的shell命令使用
    • 1. 如何创建Topic
    • 2.查看当前有那些topic:
    • 3.查看某一个Topic的详细信息:
    • 4.如何删除Topic
    • 5.如何修改Topic
    • 6.如何模拟生产者: 发送数据
    • 7.如何模拟消费者: 消费数据
  • 三、Kafka的Java API的操作
    • 1.创建一个Maven的项目, 导入相关的依赖
    • 2.演示如何将数据生产到Kafka
    • 3.演示如何从Kafka消费数据
  • 总结


前言

Kafka是Apache旗下的一款开源免费的消息队列的中间件产品,最早是由领英公司开发的, 后期共享给Apache, 目前已经是Apache旗下的顶级开源的项目, 采用语言为Scala
在这里插入图片描述
适用场景: 数据传递工作, 需要将数据从一端传递到另一端, 此时可以通过Kafka来实现, 不局限两端的程序

​ 在实时领域中, 主要是用于流式的数据处理工作


一、Kafka的架构

在这里插入图片描述
Kafka Cluster: kafka集群
broker: kafka的节点
producer: 生产者
consumer: 消费者
Topic: 主题/话题 理解就是一个大的逻辑容器(管道)
shard: 分片. 一个Topic可以被分为N多个分片, 分片的数量与节点数据没有关系
replicas: 副本, 可以对每一个分片构建多个副本, 副本数量最多和节点数量一致(包含本身) 保证数据不丢失
zookeeper: 存储管理集群的元数据信息

二、Kafka的shell命令使用

Kafka是一个消息队列的中间件产品, 主要的作用: 将数据从程序一端传递到另一端的操作, 所以说学习Kafka主要学习如何使用Kafka生产数据, 以及如何使用Kafka消费数据

1. 如何创建Topic

./kafka-topics.sh  --create --zookeeper node1:2181,node2:2181,node3:2181 --topic test01 --partitions 3 --replication-factor 2

2.查看当前有那些topic:

./kafka-topics.sh  --list  --zookeeper node1:2181,node2:2181,node3:2181

3.查看某一个Topic的详细信息:

./kafka-topics.sh --describe --zookeeper  node1:2181,node2:2181,node3:2181 --topic test01

在这里插入图片描述

4.如何删除Topic

./kafka-topics.sh --delete --zookeeper node1:2181,node2:2181,node3:2181 --topic test01

5.如何修改Topic

Topic 仅允许增大分片, 不允许减少分片 同时也不支持修改副本的数量

./kafka-topics.sh  --alter --zookeeper node1:2181,node2:2181,node3:2181 --topic test01  --partitions 5

6.如何模拟生产者: 发送数据

./kafka-console-producer.sh  --broker-list node1:9092,node2:9092,node3:9092 --topic test01

在这里插入图片描述

7.如何模拟消费者: 消费数据

默认从当前的时间开始消费数据, 如果想从头开始消费, 可以添加 --from-beginning 参数即可

./kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --topic test01

在这里插入图片描述

三、Kafka的Java API的操作

1.创建一个Maven的项目, 导入相关的依赖

	<repositories><!--代码库-->
        <repository>
            <id>aliyun</id>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
            <releases><enabled>true</enabled></releases>
            <snapshots>
                <enabled>false</enabled>
                <updatePolicy>never</updatePolicy>
            </snapshots>
        </repository>
    </repositories>

    <dependencies>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.4.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-io</artifactId>
            <version>1.3.2</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.6</version>
        </dependency>

        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.16</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <target>1.8</target>
                    <source>1.8</source>
                </configuration>
            </plugin>
        </plugins>
    </build>

2.演示如何将数据生产到Kafka

package com.itheima.kafka.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class KafkaProducerTest {

    public static void main(String[] args) {

        // 第一步: 创建kafka的生产者核心对象: KafkaProducer  传入相关的配置
        Properties props = new Properties();
        props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
        props.put("acks", "all");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);

        //2. 执行发送数据操作
        for (int i = 0; i < 10; i++) {
            ProducerRecord<String, String> producerRecord = new ProducerRecord<>(
                    "test01", "张三"+i
            );
            producer.send(producerRecord);
        }
        //3. 执行close 释放资源
        producer.close();
    }
}

3.演示如何从Kafka消费数据

package com.itheima.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class KafkaConsumerTest {
    public static void main(String[] args) {
        // 1- 创建Kafka的消费者的核心对象: KafkaConsumer
        Properties props = new Properties();
        props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
        props.put("group.id", "test"); // 消费者组的ID
        props.put("enable.auto.commit", "true"); // 是否自动提交偏移量offset
        props.put("auto.commit.interval.ms", "1000"); // 自动提交的间隔时间
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // key值的反序列化的类
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // value的值反序列化的类

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        //2. 订阅topic: 表示消费者从那个topic来消费数据  可以指定多个
        consumer.subscribe(Arrays.asList("test01"));

        while (true) {
            // 3. 从kafka中获取消息数据, 参数表示当kafka中没有消息的时候, 等待的超时时间, 如果过了等待的时间, 返回空对象(对象存在, 但是内部没有数据  相当于空容器)
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                long offset = record.offset();
                String key = record.key();
                String value = record.value();
                // 偏移量: 每一条数据 其实就是一个偏移量 , 每个分片单独统计消息到达了第几个偏移量 偏移量从 0 开始的
                System.out.println("消息的偏移量为:"+offset+"; key值为:"+key + "; value的值为:"+ value);
            }
        }
    }
}

总结

以上就是今天要讲的内容,本文主要介绍了Kafka的基础和一些常规Shell和Java的操作。后续还会补充更多关于Kafka的内容

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/97050.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

一文讲懂泛型

Java高级Java高级语言特性一. 泛型1. 1 为什么我们需要泛型1. 2 泛型类和泛型接口的定义1. 3 泛型方法1. 4 限定类型变量1. 5 泛型中的约束和局限性1. 6 泛型中的继承规则1. 7 通配符类型1.7.1 问题抛出&#xff0c;为啥需要通配符&#xff1f;1.7.2 &#xff1f; extends X1.7…

RocketMq的基本概念

&#x1f3b6; 文章简介&#xff1a;RocketMq的基本概念 &#x1f4a1; 创作目的&#xff1a;关于RocketMq的基本概念的大致介绍 ☀️ 今日天气&#xff1a;阳光明媚。 &#x1f4dd; 每日一言&#xff1a;冬有冬的来意&#xff0c;雪有雪的秘密。 文章目录&#x1f436; 1、Ro…

MySQL~DQL查询数据

4、DQL查询数据&#xff08;最重点&#xff09; 4.1、DQL &#xff08;Data Query LANGUAGE&#xff1a;数据查询语言&#xff09; 所有的查询操作都用它 Select简单的查询&#xff0c;复杂的查询它都能做~数据库中最核心的语言&#xff0c;最重要的语句使用频率最高 SELEC…

Kafka 集群部署与测试

安装Kafka&#xff08;需要JDK和Zookeeper&#xff09;: 下载Kafka安装包&#xff0c;并解压至node01节点中的/opt/apps目录下。修改配置文件。在server.properties配置文件中指定broker编号、Kafka运行日志存放的路径、指定Zookeeper地址和本地IP。添加环境变量。在/etc/prof…

[ vulhub漏洞复现篇 ] GhostScript 沙箱绕过(任意命令执行)漏洞CVE-2018-19475

&#x1f36c; 博主介绍 &#x1f468;‍&#x1f393; 博主介绍&#xff1a;大家好&#xff0c;我是 _PowerShell &#xff0c;很高兴认识大家~ ✨主攻领域&#xff1a;【渗透领域】【数据通信】 【通讯安全】 【web安全】【面试分析】 &#x1f389;点赞➕评论➕收藏 养成习…

【IDEA】# 快速生成logger、通过Maven的profile配置实现环境的快速切换、常用基础设置

1. 快速生成logger 打开 Settings&#xff0c;找到 Editor 目录下的 Live Templates 选中 Java&#xff0c;点击右侧的加号&#xff0c;创建一个新的模板 在创建模板的相关位置&#xff0c;填上对应的值 Abbreviation&#xff1a;触发的关键字&#xff08;此处我使用的是 l…

Postman进阶篇(十二)-在脚本中使用pm对象访问接口响应数据(pm.response.*)

在之前的文章中介绍过postman中的两个脚本——pre-request script或test script&#xff0c;在这两个脚本中都有使用到pm对象。&#xff08;pre-request script详细介绍、Test script详细介绍&#xff09;pm对象是在postman的脚本中非常重要&#xff0c;也是十分常用的方法。本…

SpringCloud学习笔记 - Nacos配置中心搭建 - Nacos Config

Nacos 提供用于存储配置和其他元数据的 key/value 存储&#xff0c;为分布式系统中的外部化配置提供服务器端和客户端支持。使用 Spring Cloud Alibaba Nacos Config&#xff0c;您可以在 Nacos Server 集中管理你 Spring Cloud 应用的外部属性配置。 Spring Cloud Alibaba Nac…

Volo - Rust gRPC 框架入门

一、参考资料 Volo-GitHub Volo-Overview 二、开发环境搭建 1、安装脚手架 # 安装 volo-cli cargo install volo-cli # 验证安装 volo help 2、编写 IDL # 文件 volo_demo.protosyntax "proto3"; package volo.demo;message Item {int64 id 1;string title …

React学习26(react-redux优化 工作使用)

项目结构 优化说明 1&#xff09;容器组件和UI组件混合成一个文件 2&#xff09;无需自己给容器传递store&#xff0c;在index.js入口文件给包裹一个Provider <Provider store {store}><App/> </Provider> 3&#xff09;使用了react-redux后也不用自己在…

Python入门教程:基本运算符

1.运算符 计算机可以进行的运算有很多种&#xff0c;可不只加减乘除这么简单&#xff0c;运算按种类可分为算数运算、比较运算、逻辑运算、赋值运算、成员运算、身份运算、位运算&#xff0c;今天我们暂只学习算数运算、比较运算、逻辑运算、赋值运算、成员运算 2.算数运算 …

数据聚合、数据同步

文章目录数据聚合Bucket聚合语法聚合结果排序限定聚合范围Metric聚合语法RestAPI实现聚合数据同步发送MQ消息接收MQ消息数据聚合 Bucket聚合语法 GET /hotel/_search {"size": 0, // 设置size为0&#xff0c;结果中 不包含文档&#xff0c;只包含聚合结果~"…

Spark-内核(集群管理器、通讯架构、任务调度机制、Shuffle、内存管理)

文章目录Spark内核Spark部署模式的集群管理器YARN模式运行机制Standalone模式运行机制Spark通讯架构通信架构概述通讯架构解析Spark任务调度机制任务调度概述Stage级调度Spark Task级调度调度策略本地化调度失败重试与黑名单机制Spark Shuffle解析ShuffleMapStage与ResultStage…

SpringCloud微服务之Zuul网关

SpringCloud微服务之Zuul网关 家庭生活中经常有这样的感悟&#xff0c;家中的财政大权在老婆手里&#xff0c;想要花个小钱买个冰棍&#xff0c;得跟老婆请示&#xff0c;想要出个远门看看北京猿人&#xff0c;得跟老婆请示&#xff0c;想不要脸面去个夜店看看别的妞好在哪里&…

代码随想录第九天

专题&#xff1a;字符串 题目&#xff1a;字符串的左旋转操作是把字符串前面的若干个字符转移到字符串的尾部。请定义一个函数实现字符串左旋转操作的功能。 比如&#xff0c;输入字符串"abcdefg"和数字2&#xff0c;该函数将返回左旋转两位得到的结果"cdefgab&…

ADI Blackfin DSP处理器-BF533的开发详解56:CVBS输入-DSP和ADV7180的MDMA用法(含源码)

硬件准备 ADSP-EDU-BF533&#xff1a;BF533开发板 AD-HP530ICE&#xff1a;ADI DSP仿真器 软件准备 Visual DSP软件 硬件链接 CVBS IN视频输出 代码实现功能 电视视频是奇场合偶场交替传输的&#xff0c;所以通过 CVBSIN 模块采集到的图像如上实验所看到的&#xff0c;是…

CentOS 7.6 安装与配置 MySql 5.7.40

1 通过wget下载MySql的rpm # wget https://dev.mysql.com/get/mysql57-community-release-el7-9.noarch.rpm2 检查是否存在MySql的repo # cd /etc/yum.repos.d/ # ls CentOS-Base.repo CentOS-Epel.repo可以看到没有MySql的repo 3 安装MySql的repo&#xff0c;并查看是否安…

React 生命周期

React 生命周期 这篇文章&#xff0c;我们来聊一聊在React中的生命周期。首先我们明确一点&#xff0c;在React中&#xff0c;函数式组件是没有生命周期的。谈到生命周期&#xff0c;都是关于类组件的&#xff01; 生命周期官方网址 React.Component – React (docschina.or…

macOS Monterey 12.6.2 (21G320) Boot ISO 原版可引导镜像

macOS Monterey 12.6&#xff0c;皆为安全更新&#xff0c;不再赘述。 macOS Monterey 12.6&#xff0c;发布于 2022 年 9 月 12 日&#xff08;北京时间今日凌晨&#xff09;&#xff0c;本次为安全更新。 今日&#xff08;2022-07-21&#xff09;凌晨&#xff0c;Apple 终于…

CentOS 8:SSH远程登录

SSH远程登录 SSH远程登录&#xff0c;也是 C / S 模式 服务端&#xff1a;sshd &#xff0c;默认是启动的 systemctl status sshd systemctl start sshd SecureCRT 是打开了一个远程终端 注意&#xff1a;在终端环境里&#xff0c;不可以启动GUI程序 例如&#xff0c;在 …