大数据-55 Kafka sh脚本使用 与 JavaAPI使用 topics.sh producer.sh consumer.sh kafka-clients

news2024/11/27 3:52:07

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka (正在更新…)

章节内容

上节我们完成了:

  • Kafka介绍
  • ZK的基本环境
  • Kafka下载解压配置
  • Kafka启动配置
  • Kafka启动服务

在这里插入图片描述

Kafka启动

上节我们通过sh脚本启动,但是当我们的SSH关闭的时候,Kafka服务也退出。
这里我们可以使用 Kakfa 的守护进程的方式启动,就可以在后台运行了。

kafka-server-start.sh -daemon /opt/servers/kafka_2.12-2.7.2/config/server.properties

启动之后,我们可以通过 ps 工具看到:

ps aux | grep kafka

返回结果如下图:
在这里插入图片描述

sh脚本使用

topics.sh

kakfa-topics.sh 用于管理主题

查看所有

kafka-topics.sh --list --zookeeper h121.wzk.icu:2181

当前执行返回的是空的,因为我们没有任何主题。

创建主题

kafka-topics.sh --zookeeper h121.wzk.icu:2181 --create --topic wzk_topic_1 --partitions 1 --replication-factor 1

执行结果中,我们可以观察到,已经顺利的完成了。
在这里插入图片描述

查看主题

kafka-topics.sh --zookeeper h121.wzk.icu:2181 --describe --topic wzk_topic_1

执行结果中,我们可以观察到,已经顺利的完成了。
在这里插入图片描述

删除主题

kafka-topics.sh --zookeeper h121.wzk.icu:2181 --delete --topic wzk_topic_1

在这里插入图片描述

新建主题(用于测试)

kafka-topics.sh --zookeeper h121.wzk.icu:2181 --create --topic wzk_topic_test --partitions 1 --replication-factor 1

producer.sh

kafka-console-producer.sh 用于生产消息

生成数据

kafka-console-producer.sh --topic wzk_topic_test --broker-list h121.wzk.icu:9092

手动生成一批数据来进行测试:
在这里插入图片描述

consumer.sh

kafka-console-consumer.sh 用于消费消息

消费数据

kafka-console-consumer.sh --bootstrap-server h121.wzk.icu:9092 --topic wzk_topic_test

此时,我们需要再开启一个 Producer 产生数据,它才会继续消费。

从头消费

kafka-console-consumer.sh --bootstrap-server h121.wzk.icu:9092 --topic wzk_topic_test --from-beginning

从头开始消费的话,我们可以看到消费者已经把刚才我们写入的数据都消费了
在这里插入图片描述

Java API

架构图

在这里插入图片描述

POM

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.7.2</version>
</dependency>

生产者1测试


public class TestProducer01 {

    public static void main(String[] args) throws Exception {
        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", "h121.wzk.icu:9092");
        configs.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        configs.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        configs.put("acks", "1");
        KafkaProducer<Integer, String> producer = new KafkaProducer<>(configs);
        ProducerRecord<Integer, String> record = new ProducerRecord<>(
                "wzk_topic_test",
                0, 0,
                "hello world by java!"
        );
        Future<RecordMetadata> future = producer.send(record);
        future.get(3_000, TimeUnit.SECONDS);
        producer.close();
    }

}

生产者1运行

  2024-07-12 11:53:11,542 INFO [org.apache.kafka.clients.producer.ProducerConfig] - ProducerConfig values: 
  	acks = 1
  	batch.size = 16384
  	bootstrap.servers = [h121.wzk.icu:9092]
  	buffer.memory = 33554432
  	client.dns.lookup = use_all_dns_ips
  	client.id = producer-1
  	compression.type = none
  	connections.max.idle.ms = 540000
  	delivery.timeout.ms = 120000
  	enable.idempotence = false
  	interceptor.classes = []
  	internal.auto.downgrade.txn.commit = false
  	key.serializer = class org.apache.kafka.common.serialization.IntegerSerializer
  	linger.ms = 0
  	max.block.ms = 60000
  	max.in.flight.requests.per.connection = 5
  	max.request.size = 1048576

运行结果如下图:
在这里插入图片描述

生产者2测试

public class TestProducer02 {

    public static void main(String[] args) throws Exception {
        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", "h121.wzk.icu:9092");
        configs.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        configs.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        configs.put("acks", "1");
        KafkaProducer<Integer, String> producer = new KafkaProducer<>(configs);
        ProducerRecord<Integer, String> record = new ProducerRecord<>(
                "wzk_topic_test",
                0, 0,
                "hello world by java! CallBack test!"
        );
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                if (e != null) {
                    System.out.println(
                            "主题: " + recordMetadata.topic() + ", " +
                                    "分区: " + recordMetadata.partition() + ", " +
                                    "时间戳: " + recordMetadata.timestamp()
                    );
                } else {
                    System.out.println("生产消息异常!!!");
                }
            }
        });
        producer.close();
    }

}

运行之后,控制台输出:

2024-07-12 12:46:48,795 INFO [org.apache.kafka.common.utils.AppInfoParser] - Kafka version: 2.7.2
2024-07-12 12:46:48,795 INFO [org.apache.kafka.common.utils.AppInfoParser] - Kafka commitId: 37a1cc36bf4d76f3
2024-07-12 12:46:48,795 INFO [org.apache.kafka.common.utils.AppInfoParser] - Kafka startTimeMs: 1720759608792
2024-07-12 12:46:49,200 INFO [org.apache.kafka.clients.Metadata] - [Producer clientId=producer-1] Cluster ID: DGjwPmfLSk2OKosFFLZJpg
2024-07-12 12:46:49,209 INFO [org.apache.kafka.clients.producer.KafkaProducer] - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
主题: wzk_topic_test, 分区: 0, 时间戳: 1720759609201
2024-07-12 12:46:49,282 INFO [org.apache.kafka.common.metrics.Metrics] - Metrics scheduler closed
2024-07-12 12:46:49,282 INFO [org.apache.kafka.common.metrics.Metrics] - Closing reporter org.apache.kafka.common.metrics.JmxReporter
2024-07-12 12:46:49,282 INFO [org.apache.kafka.common.metrics.Metrics] - Metrics reporters closed
2024-07-12 12:46:49,283 INFO [org.apache.kafka.common.utils.AppInfoParser] - App info kafka.producer for producer-1 unregistered

运行的之后的控制台如下:
在这里插入图片描述

消费者01运行


public class TestConsumer01 {

    public static void main(String[] args) throws Exception {
        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", "h121.wzk.icu:9092");
        configs.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
        configs.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        configs.put("group.id", "wzk-test");

        KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(configs);

        final List<String> topics = Arrays.asList("wzk_topic_test");
        consumer.subscribe(topics, new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> collection) {
                collection.forEach(item -> {
                    System.out.println("剥夺的分区: " + item.partition());
                });
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                collection.forEach(item -> {
                    System.out.println("接收的分区: " + item.partition());
                });
            }
        });

        final ConsumerRecords<Integer, String> records = consumer.poll(3_000);
        final Iterable<ConsumerRecord<Integer, String>> topic1Iterable = records.records("topic_1");
        topic1Iterable.forEach(record -> {
            System.out.println("消息头字段:" + Arrays.toString(record.headers().toArray()));
            System.out.println("消息的key:" + record.key());
            System.out.println("消息的偏移量:" + record.offset());
            System.out.println("消息的分区号:" + record.partition());
            System.out.println("消息的序列化key字节数:" + record.serializedKeySize());
            System.out.println("消息的序列化value字节数:" + record.serializedValueSize());
            System.out.println("消息的时间戳:" + record.timestamp());
            System.out.println("消息的时间戳类型:" + record.timestampType());
            System.out.println("消息的主题:" + record.topic());
            System.out.println("消息的值:" + record.value());
        });

        consumer.close();
    }

}

消费者01测试

2024-07-12 13:00:17,456 INFO [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] - [Consumer clientId=consumer-wzk-test-1, groupId=wzk-test] Adding newly assigned partitions: wzk_topic_test-0
接收的分区: 0
2024-07-12 13:00:17,480 INFO [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] - [Consumer clientId=consumer-wzk-test-1, groupId=wzk-test] Setting offset for partition wzk_topic_test-0 to the committed offset FetchPosition{offset=12, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[h121.wzk.icu:9092 (id: 0 rack: null)], epoch=0}}
消息头字段:[]
消息的key:0
消息的偏移量:12
消息的分区号:0
消息的序列化key字节数:4
消息的序列化value字节数:20
消息的时间戳:1720760404260
消息的时间戳类型:CreateTime
消息的主题:wzk_topic_test
消息的值:hello world by java!

控制台运行截图如下:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1962078.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

WireShark看tcp网速

1、过滤对应的tcp流 2、统计->TCP流图形->窗口尺寸 3、"接收通过窗口值/时间" 可以得到tcp的接收速度

强化学习时序差分算法之Sarsa算法——以悬崖漫步环境为例

1.导入必要的库环境&#xff0c;代码如下所示。 import matplotlib.pyplot as plt import numpy as np from tqdm import tqdm 2.本悬崖漫步环境中无需提供奖励函数以及状态转移函数&#xff0c;而需提供一个与智能体进行交互的step()函数&#xff0c;该函数输入为智能体当前…

Python time模块格式化时间的N种技巧

文末赠免费精品编程资料~~ 是不是经常对着电脑屏幕上的日期时间发呆&#xff0c;心想&#xff1a;“要是能随心所欲地格式化这些数字就好了。”今天&#xff0c;我们就一起探索Python中的时间宝藏——time模块&#xff0c;让你轻松玩转时间显示&#xff0c;从新手进阶为时间格…

AI算力的新时代:智算中心的挑战与创新

随着AI的发展&#xff0c;作为AI三要素算法、数据、算力中的基础设施——算力首先迎来了高速的发展。智算中心作为AI时代承载算力的关键基础设施&#xff0c;在政策、市场的双重驱动下进入了高速建设周期&#xff0c;其在推动数字经济发展和技术进步方面发挥着重要作用&#xf…

【Gin】深度解析:在Gin框架中优化应用程序流程的责任链设计模式(下)

【Gin】深度解析&#xff1a;在Gin框架中优化应用程序流程的责任链设计模式(下) 大家好 我是寸铁&#x1f44a; 【Gin】深度解析&#xff1a;在Gin框架中优化应用程序流程的责任链设计模式(下)✨ 喜欢的小伙伴可以点点关注 &#x1f49d; 前言 本次文章分为上下两部分&#xf…

数学建模--微分方程

目录 常见的微分方程模型 微分方程建模的基本步骤 代码示例 常微分方程 ​编辑 ​编辑 偏微分方程 ​编辑 应用实例 结论 如何在数学建模中准确识别和选择合适的微分方程模型&#xff1f; 微分方程模型在解决实际问题中的应用案例有哪些&#xff1f; 常微分方程&a…

SpringBoot整合FFmpeg进行视频分片上传

SpringBoot整合FFmpeg进行视频分片上传------>Windows 分片上传的核心思路&#xff1a; 将文件按一定的分割规则&#xff08;静态或动态设定&#xff0c;如手动设置20M为一个分片&#xff09;&#xff0c;用slice分割成多个数据块。为每个文件生成一个唯一标识Key&#xf…

ONNX模型的量化

我们都希望从代码中榨取更多的性能&#xff0c;对吧&#xff1f; 在现代&#xff0c;充斥着需要大量计算资源的复杂机器学习算法&#xff0c;因此&#xff0c;榨取每一点性能至关重要。 传统上&#xff0c;机器学习算法是在具有支持大量并行计算能力的 GPU 上进行训练的。但是…

WordPress建站:如何使用ChemiCloud搭建外贸独立站

以前自行搭建一个网站&#xff0c;不懂一点技术那是很难完成的&#xff0c;现如今WordPress的出现极大地降低了搭建网站的技术门槛&#xff0c;不需要懂任何代码&#xff0c;只需按步骤操作就行。WordPress 是一个非常流行的开源内容管理系统&#xff08;CMS&#xff09;&#…

职业教育计算机网络综合实验实训室建设应用案例

近年来&#xff0c;职业教育在培养技能型人才方面发挥着越来越重要的作用。然而&#xff0c;传统的计算机网络技术教学模式往往重理论、轻实践&#xff0c;导致学生缺乏实际操作能力和职业竞争力。为了改变这一现状&#xff0c;唯众结合职业教育特点&#xff0c;提出了“教、学…

Kubeflow v1.7.0 创建新用户

文章目录 为新用户创建配置文件配置用户密码重启auth生效 为新用户创建配置文件 apiVersion: kubeflow.org/v1beta1 kind: Profile metadata:name: kubeflow-cyw-example-com # replace with the name of profile you want, this will be users namespace name spec:owner:k…

STC单片机UART映射printf

文章目录 使用STC-ISP生成UART初始化函数 增加如下函数&#xff0c;注意使用printf函数需要添加 #include <stdio.h> 头文件 #include <stdio.h>void Uart1_Init(void) //9600bps12.000MHz {SCON 0x50; //8位数据,可变波特率AUXR | 0x01; //串口1选择定时器2为…

【Spring】——Spring概述、IOC、IOC创建对象的方式、Spring配置、依赖注入(DI)以及自动装配知识

&#x1f3bc;个人主页&#xff1a;【Y小夜】 &#x1f60e;作者简介&#xff1a;一位双非学校的大二学生&#xff0c;编程爱好者&#xff0c; 专注于基础和实战分享&#xff0c;欢迎私信咨询&#xff01; &#x1f386;入门专栏&#xff1a;&#x1f387;【MySQL&#xff0…

LeetCode 101.对称二叉树 C写法

LeetCode 101.对称二叉树 C写法 思路&#xff1a; 将该树一分为二&#xff0c;左子树的左边与右子树的右边比&#xff0c;左子树的右边与右子树的左边比&#xff0c;不相等或者一边为空则不是对称。 代码&#x1f50e;&#xff1a; bool _isSymmetric(struct TreeNode* Leftroo…

程序员开发指南

在这个快节奏的时代&#xff0c;作为一名程序员&#xff0c;大家都希望能更快地开发出高质量的应用&#xff0c;而不是花费大量时间在基础设施和后台服务的搭建上。今天&#xff0c;我要向大家介绍一款专为懒人开发者准备的一站式开发应用的神器——MemFire Cloud。 一站式开发…

使用代理访问内网:实验二

目录 环境搭建 内网搭建&#xff08;win2019&#xff09; 跳板机搭建&#xff08;win10&#xff09; 实验步骤 1. win10上线kali 2. 借助msf做代理 3. 在攻击机上做个代理&#xff0c;访问目标网站 4. 使用SocksCap64工具&#xff0c;进行sock4a隧道的连接 5. 启用soc…

TypeScript 的主要特点和重要作用

还是大剑师兰特&#xff1a;曾是美国某知名大学计算机专业研究生&#xff0c;现为航空航海领域高级前端工程师&#xff1b;CSDN知名博主&#xff0c;GIS领域优质创作者&#xff0c;深耕openlayers、leaflet、mapbox、cesium&#xff0c;canvas&#xff0c;webgl&#xff0c;ech…

最短路(dijkstra迪杰斯特拉)

最短路径问题在图论中是一个经典的问题&#xff0c;目的是找到从一个起始顶点到其他所有顶点的最短路径。Dijkstra算法是解决非负权图最短路径问题的常用算法。下面是一个使用Dijkstra算法解决最短路径问题的Java程序例子。 动画描述(从0节点开始更新) 问题描述 假设有一个图…

【机器学习西瓜书学习笔记——模型评估与选择】

机器学习西瓜书学习笔记【第二章】 第二章 模型评估与选择2.1训练误差和测试误差错误率误差 欠拟合和过拟合2.2评估方法留出法交叉验证法自助法 2.3性能度量查准率、查全率与F1查准率查全率F1 P-R曲线ROC与AUCROCAUC 代价敏感错误率与代价曲线代价曲线 2.4比较检验假设检验&…

VSCode+Vue3无法找到模块“../components/xxxxx.vue”的声明文件的错误

莫名奇妙的错误 今天用Vue3写个demo&#xff0c;在components下面新建了一个DeviceList.Vue的文件&#xff0c;在HomeView引用它后居然报错&#xff0c;提示&#xff1a;无法找到模块“…/components/DeviceList.vue”的声明文件&#xff0c;真是离了个大谱&#xff0c;文件明…