kafka 快速上手

news2025/1/12 23:09:53

 下载 Apache Kafka

  演示window 安装

   编写启动脚本,脚本的路径根据自己实际的来

启动说明

先启动zookeeper后启动kafka,关闭是先关kafka,然后关闭zookeeper

巧记: 铲屎官(zookeeper)总是第一个到,最后一个走

启动zookeeper

call bin/windows/zookeeper-server-start.bat config/zookeeper.properties

启动kafka  

call bin/windows/kafka-server-start.bat config/server.properties

 测试脚本,主要用于创建主题 ‘test-topic’

# 创建主题(窗口1)
bin/window> kafka-topics.bat --bootstrap-server localhost:9092 --topic test-topic  --create

# 查看主题
bin/window> kafka-topics.bat --bootstrap-server localhost:9092 --list
bin/window> kafka-topics.bat --bootstrap-server localhost:9092 --topic test-topic --describe

# 修改某主题的分区
bin/window> kafka-topics.bat --bootstrap-server localhost:9092 --topic test-topic --alter --partitions 2


# 生产消息(窗口2)向test-topic主题发送消息
bin/window> kafka-console-producer.bat --bootstrap-server localhost:9092 --topic test-topic
>hello kafka

# 消费消息(窗口3)消费test-topic主题的消息
bin/window> kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test-topic
package com.ldj.kafka.admin;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

/**
 * User: ldj
 * Date: 2024/6/13
 * Time: 0:00
 * Description: 创建主题
 */
public class AdminTopic {

    public static void main(String[] args) {
        Map<String, Object> adminConfigMap = new HashMap<>();
        adminConfigMap.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        NewTopic topic1 = new NewTopic("topic-01", 1, (short) 1);
        NewTopic topic2 = new NewTopic("topic-02", 2, (short) 1);

        AdminClient adminClient = AdminClient.create(adminConfigMap);
        CreateTopicsResult addResult = adminClient.createTopics(Arrays.asList(topic1, topic2));

        //DeleteTopicsResult delResult = adminClient.deleteTopics(Arrays.asList("topic-02"));
        
        adminClient.close();
    }

}
package com.ldj.kafka.producer;

import com.alibaba.fastjson.JSON;
import com.ldj.kafka.model.UserEntity;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/**
 * User: ldj
 * Date: 2024/6/12
 * Time: 21:08
 * Description: 生产者
 */
public class KfkProducer {

    public static void main(String[] args) {

        //生产者配置
        Map<String, Object> producerConfigMap = new HashMap<>();
        producerConfigMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        producerConfigMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerConfigMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        //创建生产者
        KafkaProducer<String, String> producer = new KafkaProducer<>(producerConfigMap);

        //构建消息 ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers)
        try {
            for (int i = 0; i < 10; i++) {
                UserEntity userEntity = new UserEntity()
                        .setUserId(2436687942335620L + i)
                        .setUsername("lisi")
                        .setGender(1)
                        .setAge(18);

                ProducerRecord<String, String> record = new ProducerRecord<>(
                        "test-topic",
                        userEntity.getUserId().toString(),
                        JSON.toJSONString(userEntity));
                //发送数据到Broker

                producer.send(record, (RecordMetadata var1, Exception var2) -> {
                    if (Objects.isNull(var2)) {
                        System.out.printf("[%s]消息发送成功!", userEntity.getUserId());
                    } else {
                        System.out.printf("[%s]消息发送失败!err:%s", userEntity.getUserId(), var2.getCause());
                    }
                });
            }
        } finally {
            //关闭通道
            producer.close();
        }
    }

}
package com.ldj.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/**
 * User: ldj
 * Date: 2024/6/12
 * Time: 21:10
 * Description: 消费者
 */
public class KfkConsumer {

    public static void main(String[] args) {

        //消费者配置
        Map<String, Object> consumerConfigMap = new HashMap<>();
        consumerConfigMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        consumerConfigMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerConfigMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        //所属消费组
        consumerConfigMap.put(ConsumerConfig.GROUP_ID_CONFIG, "test123456");

        //创建消费者
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfigMap);

        //消费主题的消息  ConsumerRebalanceListener
        consumer.subscribe(Collections.singletonList("test-topic"));

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
                //数据存储结构:Map<TopicPartition, List<ConsumerRecord<K, V>>> records;
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.value());
                }
            }
        } finally {
            //关闭消费者
            consumer.close();
        }
    }

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1817340.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

虚拟声卡实现音频回环

虚拟声卡实现音频回环 一、电脑扬声器播放声音路由到麦克风1. Voicemeeters安装设置2. 音频设备选择 二、回声模拟 一、电脑扬声器播放声音路由到麦克风 1. Voicemeeters安装设置 2. 音频设备选择 以腾讯会议为例 二、回声模拟 选中物理输入设备“Stereo Input 1”和物理输出设…

浅谈内联钩取原理与实现

前言 导入地址表钩取的方法容易实现但是存在缺陷&#xff0c;若需要钩取的函数不存在导入地址表中&#xff0c;那么我们就无法进行钩取&#xff0c;出现以下几种情况时&#xff0c;导入函数是不会存储在导入地址表中的。 延迟加载&#xff1a;当导入函数还没调用时&#xff0…

Rust 实战丨通过实现 json! 掌握声明宏

在 Rust 编程语言中&#xff0c;宏是一种强大的工具&#xff0c;可以用于在编译时生成代码。json! 是一个在 Rust 中广泛使用的宏&#xff0c;它允许我们在 Rust 代码中方便地创建 JSON 数据。 声明宏&#xff08;declarative macros&#xff09;是 Rust 中的一种宏&#xff0…

debug调试_以Pycharm为例

文章目录 作用步骤打断点调试调试窗口 作用 主要是检查逻辑错误&#xff0c;而非语法错误。 步骤 打断点 在需要调试的代码行前打断点&#xff0c;执行后会停顿在断点位置&#xff08;不运行&#xff09; 调试 右键“debug”&#xff0c;或者直接点击右上角的小虫子 调试…

2-2 基于matlab的变邻域

基于matlab的变邻域&#xff0c;含变惯性权重策略的自适应离散粒子群算法&#xff0c;适应函数是多式联运路径优化距离。有10城市、30城市、75城市三个案例。可直接运行。 2-2 路径规划 自适应离散粒子群算法 - 小红书 (xiaohongshu.com)

Vue基本使用-02

上节我们讲了什么是mvvm模型&#xff0c;以及我们vue的一些常用指令&#xff0c;今天给大家讲一下vue的基本使用&#xff0c;在将之前我们需要重点讲解我们的一个指令&#xff0c;v-model指令 v-model v-model 可以在组件上使用以实现双向绑定,什么是双向绑定呢?意思就是当我们…

【Ubuntu双系统】两块硬盘分别安装系统,一块硬盘安装Ubuntu 一块安装Windows

【Ubuntu双系统】两块硬盘分别安装双系统&#xff0c;一块硬盘安装Ubuntu 一块安装Windows 前言安装Ubuntu前置操作安装过程参考文献 前言 机器情况&#xff1a;两块1T的硬盘&#xff0c;其中一块已安装Windows 11现需在另一块硬盘上安装Ubuntu&#xff0c;该硬盘还未初始化Ub…

SQL聚合函数---汇总数据

此篇文章内容均来自与mysql必知必会教材&#xff0c;后期有衍生会继续更新、补充知识体系结构 文章目录 SQL聚集函数表&#xff1a;AGV()count()根据需求可以进行组合处理 max()min()max&#xff08;&#xff09;、min&#xff08;&#xff09;、avg&#xff08;&#xff09;组…

Mac下载了docker,在终端使用docker命令时用不了

问题&#xff1a;在mac使用docker的时候&#xff0c;拉取docker镜像失败 原因&#xff1a;docker是需要用app使用的 &#xff0c;所以在使用的时候必须打开这个桌面端软件才可以在终端上使用docker命令&#xff01;&#xff01;&#xff01;

【PL理论】(21) 函数式语言:支持匿名函数 fun x → E | 设计递归函数 | 支持递归函数:let rec ...

&#x1f4ad; 写在前面&#xff1a;本章我们将讲解支持匿名函数&#xff0c;先回顾一下 F# 语言表示函数的方法&#xff0c;然后引出它。随后我们讲解一下如何设计递归函数&#xff0c;最后让我们的 F- 语言支持递归函数。 目录 0x00 回顾&#xff1a;F# 语言 0x01 支持匿名…

深度学习笔记: 最详尽Airbnb租赁搜索排名设计

欢迎收藏Star我的Machine Learning Blog:https://github.com/purepisces/Wenqing-Machine_Learning_Blog。如果收藏star, 有问题可以随时与我交流, 谢谢大家&#xff01; Airbnb租赁搜索排名 1. 问题陈述 Airbnb用户在特定地点搜索可用房源。系统应在搜索结果中对多个房源进…

Qt飞机大战小游戏

Gitee地址 &#xff1a;plane-game: 基于Qt的飞机大战小游戏 GitHub地址&#xff1a; https://github.com/a-mo-xi-wei/plane-game

Vue25-内置指令02:v-text指令

一、v-html对比v-text v-html支持结构的解析&#xff0c;v-text不支持结构的解析。 二、v-html的安全性问题 2-1、cookie的原理&#xff08;node.js&#xff09; 7天免登录&#xff0c;cookie实现。 cookie的本质就是类似于json的字符串&#xff0c;格式是&#xff1a;key-va…

图片导入AutoCAD建立草图—CAD图像导入插件

插件介绍 CAD图像导入插件可将PNG&#xff0c;JPG等格式图片导入到AutoCAD软件内建立图像边缘的二维线条模型。插件可以提取图像黑色或白色区域的边界&#xff0c;并可绘制原状边界或平滑边界两种样式。 模型说明 边界提取&#xff0c;黑色或白色边界的提取根据原图类型选择…

【云原生| K8S系列】Kubernetes Daemonset,全面指南

Kubernetes中的DaemonSet是什么? Kubernetes是一个分布式系统&#xff0c;Kubernetes平台管理员应该有一些功能可以在所有节点上运行特定于平台的应用程序。例如&#xff0c;在所有Kubernetes节点上运行日志代理。 这就是Daemonset发挥作用的地方。 Daemonset是一个原生的K…

查询满足条件的元组-WHRER子句(运算符、BETWEEN 、LIKE、IN、NULL)

一、WHERE子句&#xff08;筛选出使选择表达式为真的元组&#xff09; 1、SELECT-FROM子句可以实现数据的查询&#xff08;会查询出所有元组&#xff09;&#xff0c;加上WHERE子句之后可以实现数据的筛选&#xff08;会查询出满足条件的元组&#xff09; SELECT 【ALL|DISTI…

windows 下 基于 WSL2安装DeepSpares进行YOLOV8 v5 的加速推理

文章大纲 简介软硬件限制安装安装 WSL2 基础环境WSL2 手动安装安装 miniconda 环境本地USB 摄像头使用:Windows 无延迟视频流本地USB 摄像头使用:WSL2 挂载 本地 USB 摄像头WSL2更新报错: 离线安装 wsl --update安装 DeepSpares测试打开本地USB 摄像头进行测试测试结果参考文…

50.Python-web框架-Django中引入静态的bootstrap样式

目录 Bootstrap 官网 特性 下载 在线样例 Bootstrap 入门 Bootstrap v5 中文文档 v5.3 | Bootstrap 中文网 在django中使用bootstrap 新建static\bootstrap5目录&#xff0c;解压后的Bootstrap文件&#xff0c;拷贝项目里就好。 在template文件里引用css文…

Nginx+KeepAlived高可用负载均衡集群的部署

目录 一.KeepAlived补充知识 1.一个合格的群集应该具备的特点 2.健康检查&#xff08;探针&#xff09;常用的工作方式 3.相关面试问题 问题1 问题2 二.Keepealived脑裂现象 1.现象 2.原因 硬件原因 运用配置原因 3.解决 4.预防 方法1 方法2 方法3 方法4 三.…

VUE之重定向redirect

VUE之路由和重定向redirect 这个小知识点是在学习做项目的时候遇到的一个问题&#xff0c;借鉴了一个他人的项目&#xff0c;是一个酒店管理系统&#xff0c;拿到源码之后导到我的vscode里。 参考链接 导的过程比较顺利&#xff0c;正常安装&#xff0c;加依赖&#xff0c;没有…