Kafka3.1部署和Topic主题数据生产与消费

news2025/1/9 1:09:40

文章目录

  • 前言
  • 一、Kafka3.1X版本在Windows11主机部署
  • 二、Kafk生产Topic主题数据
    • 1.kafka生产数据
    • 2.JAVA kafka客户端消费数据
  • 总结


前言

本章节主要讲述Kafka3.1X版本在Windows11主机下部署以及JAVA对Kafka应用:

一、Kafka3.1X版本在Windows11主机部署

1.安装JDK配置环境变量

2.Zookeeper(zookeeper-3.7.1)
zk
部署后的目录位置:D:\setup\apache-zookeeper-3.7.1

3.安装Kafka3.1X
3.1 下载包(kafka_2.12-3.1.2.tgz)
Kafka
在这里插入图片描述
3.2、 解压并进入Kafka目录:
根目录:D:\setup\kafka3.1.2

3、 编辑config/server.properties文件
注意 log.dirs=D:\setup\kafka3.1.2\logs 为根目录下的\logs

listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://localhost:9092
log.dirs=D:\\setup\\kafka3.1.2\\logs

4.运行Zookeeper
Zookeeper安装目录D:\setup\apache-zookeeper-3.7.1\bin,按下Shift+右键,选择“打开命令窗口”选项,打开命令行

  .\zkServer.cmd;

在这里插入图片描述
5.运行Kafka
Kafka安装目录D:\setup\kafka3.1.2,按下Shift+右键,选择“打开命令窗口”选项,打开命令行

.\bin\windows\kafka-server-start.bat .\config\server.properties

在这里插入图片描述

二、Kafk生产Topic主题数据

1.kafka生产数据

创建Topic主题heima

.\bin\windows\kafka-topics.bat --bootstrap-server localhost:9092 --create --topic heima --partitions 2 --replication-factor 1
Created topic heima.

查看Topic主题heima

.\bin\windows\kafka-topics.bat --describe --bootstrap-server localhost:9092  --topic heima

在这里插入图片描述
Topic主题heima生产数据

.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic heima

在 > 符号后输入数据:

{"mobilePhone":"186xxxx1234","roleCode":"super_admin_xxx"}

在这里插入图片描述

2.JAVA kafka客户端消费数据

2.1 pom.xml文件配置kafka客户端-kafka-clients-2.0.1版本

        <!-- kafka客户端 -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.0.1</version>
        </dependency>

2.2 JAVA数据读取文件

package com.ems.mgr.web.controller.thirdparty;
import com.alibaba.fastjson.JSONObject;
import com.ems.mgr.common.utils.spring.SpringUtils;
import com.ems.mgr.system.service.ISysUserService;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

/**
 * Kafka服务器操作与数据读取
 */
public class KafkaUtilDemo {
    public static final Logger log = LoggerFactory.getLogger(KafkaUtilDemo.class);
    public static final Properties props = new Properties();
//    protected ISysUserService userService = SpringUtils.getBean(ISysUserService.class);

    public static void init(String kafakservers) {
        // 配置Kafka消费者属性
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafakservers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    }

    /**
     * 持续监听并处理kafa消息,当手机号mobilePhone非空时进入数据同步操作
     * @param kafaktopic
     * @return
     */
    public static String poll(String kafaktopic) {
        String msg = "";
        try {
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Collections.singletonList(kafaktopic));
            log.info("Kafka消费者订阅指定主题,持续监听并处理消息");
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(60000));
                for (ConsumerRecord<String, String> record : records) {
                    log.info("offset = " + record.offset() + ",key = " + record.key() + ",value = " + record.value());
                    msg = record.value();
                    if (!StringUtils.isBlank(record.value())) {
                        JSONObject jsonObject = JSONObject.parseObject(record.value());
                        String mobilePhone = jsonObject.getString("mobilePhone");
                        if (StringUtils.isBlank(mobilePhone)) {
                            log.error("Kafka消费者手机号mobilePhone为空");
                        } else {
                            KafkaUtilDemo kafkaUtil = new KafkaUtilDemo();
                            kafkaUtil.syncSystemInfoTask(jsonObject);
                        }
                    }
                }
            }
        } catch (Exception e) {
            log.error("Kafka消费者订阅指定主题,持续监听并处理消息 error msg=" + e.getMessage());
        }
        return msg;
    }

    public boolean syncSystemInfoTask(JSONObject jsonObject) {
        boolean repsBln = true;
        try {
            String mobilePhone = jsonObject.getString("mobilePhone");
            String roleType = jsonObject.getString("roleType");
            String roleCode = jsonObject.getString("roleCode");
            log.info("业务数据同步操作................");
        } catch (Exception e) {
            repsBln = false;
            log.error("Kafka消费者同步入库异常,error msg=" + e.getMessage());
        }
        return repsBln;
    }

    public static void main(String[] args) {
        try {
            String kafakservers = "localhost:9092";
            String kafaktopic = "heima";
            init(kafakservers);
            poll(kafaktopic);
        } catch (Exception e) {
            log.error("error msg=" + e.getMessage());
        }
    }

}

3 执行KafkaUtilDemo 文件,查看消费数据。
在这里插入图片描述

总结

pom.xml文件在引入spring-kafka 会由于版本问题出现


org.apache.kafka
kafka-clients
2.0.1

    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <version>2.2.8.RELEASE</version>
    </dependency>

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/989181.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Spring Boot 集成MQTT代码示例

文章目录 1. 简介使用场景 2. 搭建MQTT测试环境服务1. 先创建映射目录2. 创建两个文件2.1. mosquitto.conf 3. 启动 MQTT服务 Docker 容器3.1. 配置用户名和密码3.1.1. 创建密码文件3.1.2. 修改配置文件&#xff0c;追加密码文件 3.2. 重启mosquitto 容器服务 3. 编写测试程序3…

2023--9-8 高斯消元解线性方程组

题目链接&#xff1a;高斯消元解线性方程组 #include <iostream> #include <algorithm> #include <cmath>using namespace std;const int N 110; const double eps 1e-8;int n; double a[N][N];int gauss() {int c, r;for(c 0, r 0; c < n; c){// 找到…

(其他) 剑指 Offer 64. 求1+2+…+n ——【Leetcode每日一题】

❓ 剑指 Offer 64. 求12…n 难度&#xff1a;中等 求 12...n &#xff0c;要求不能使用乘除法、for、while、if、else、switch、case 等关键字及 条件判断语句&#xff08;A?B:C&#xff09;。 示例 1&#xff1a; 输入: n 3 输出: 6 示例 2&#xff1a; 输入: n 9 输出:…

LeetCode(力扣)491. 递增子序列Python

LeetCode491. 递增子序列 题目链接代码 题目链接 https://leetcode.cn/problems/non-decreasing-subsequences/ 代码 class Solution:def backtracking(self, nums, index, result, path):if len(path) > 1:result.append(path[:])uset set()for i in range(index, len…

小程序实现摄像头拍照 + 水印绘制

文章标题 01 功能说明02 使用方式 & 效果图2.1 基础用法2.2 拍照 底部定点水印 预览2.3 拍照 整体背景水印 预览 03 全部代码3.1 页面布局 html3.2 业务核心 js3.3 基础样式 css 01 功能说明 需求&#xff1a;小程序端需要调用前置摄像头进行拍照&#xff0c;并且将拍…

当面试官问你离职原因的时候怎么回答比较好?

所有的前提都是建立在有一定的物质基础&#xff0c;当你的一日三餐都成了问题&#xff0c;都需要家庭支持的时候我希望你可以找一份工作&#xff0c;靠自己的本事养活自己从来不丢人&#xff0c;我觉得死要面子活受罪才是真的让你看不起。 所有的建议都是建立在我们是普通打工人…

如何用Jmeter编写脚本压测?

随着商业业务不断扩张&#xff0c;调用adsearch服务频率越来越高&#xff0c;所以这次想做个压测&#xff0c;了解目前多少并发量可以到达adsearch服务的界值。 这次选用的jmeter压测工具&#xff0c;压测思路如图&#xff1a; 一、日志入参 日志选取的adsearch 的 getads部分…

电工-什么是电流

什么是电流&#xff1f;电流计算公式和单位换算及电流方向讲解 前面了解到电路的基本组成是包括&#xff1a;电能、负载、导线构成的&#xff0c;而这电路就是电流流通的路径&#xff0c;那么什么是电流呢&#xff1f;下面就讲讲电流形成的基本概念以及电流计算公式、单位和方…

2023-9-8 求组合数(二)

题目链接&#xff1a;求组合数 II #include <iostream> #include <algorithm>using namespace std;typedef long long LL; const int mod 1e9 7; const int N 100010;// 阶乘&#xff0c;阶乘的逆 int fact[N], infact[N];LL qmi(int a, int k, int p) {int res…

HTTPS 之fiddler抓包--jmeter请求

一、浅谈HTTPS 我们都知道HTTP并非是安全传输&#xff0c;在HTTPS基础上使用SSL协议进行加密构成的HTTPS协议是相对安全的。目前越来越多的企业选择使用HTTPS协议与用户进行通信&#xff0c;如百度、谷歌等。HTTPS在传输数据之前需要客户端&#xff08;浏览器&#xff09;与服…

selenium的Chrome116版驱动下载

这里写自定义目录标题 下载地址https://googlechromelabs.github.io/chrome-for-testing/#stable 选择chromedriver 对应的平台和版本 国内下载地址 https://download.csdn.net/download/dongtest/88314387

分享一个Python Django影片数据爬取与数据分析系统源码

&#x1f495;&#x1f495;作者&#xff1a;计算机源码社 &#x1f495;&#x1f495;个人简介&#xff1a;本人七年开发经验&#xff0c;擅长Java、Python、PHP、.NET、微信小程序、爬虫、大数据等&#xff0c;大家有这一块的问题可以一起交流&#xff01; &#x1f495;&…

数据结构——带头双向循环链表

数据结构——带头双向循环链表 一、带头双向循环链表的定义二、带头双向循环链表的实现2.1初始化创建带头双向循环链表的节点2.2申请新节点2.3节点的初始化2.4带头双向循环链表的尾插2.5带头双向循环链表的头插2.6判空函数2.7带头双向循环链表的打印函数2.8带头双向循环链表的尾…

计算机竞赛 基于深度学习的目标检测算法

文章目录 1 简介2 目标检测概念3 目标分类、定位、检测示例4 传统目标检测5 两类目标检测算法5.1 相关研究5.1.1 选择性搜索5.1.2 OverFeat 5.2 基于区域提名的方法5.2.1 R-CNN5.2.2 SPP-net5.2.3 Fast R-CNN 5.3 端到端的方法YOLOSSD 6 人体检测结果7 最后 1 简介 &#x1f5…

OpenCV 04(通道分离与合并 | 绘制图形)

一、通道的分离与合并 - split(mat)分割图像的通道 - merge((ch1,ch2, ch3)) 融合多个通道 import cv2 import numpy as npimg np.zeros((480, 640, 3), np.uint8)b,g,r cv2.split(img)b[10:100, 10:100] 255 g[10:100, 10:100] 255img2 cv2.merge((b, g, r))cv2.imshow…

《TCP/IP网络编程》阅读笔记--并发多进程服务端的使用

1--并发服务器端 并发服务器端主要有以下三类&#xff1a; ① 多进程服务器&#xff1a;通过创建多个进程提供服务&#xff1b; ② 多路复用服务器&#xff1a;通过捆绑并统一管理I/O对象提供服务&#xff1b; ③ 多线程服务器&#xff1a;通过生成与客户端等量的线程提供服务&…

C/C++ ——内存管理

前言 为什么要研究内存管理&#xff1f; (1)程序员写的程序可以分为动态和静态两种状态。静态&#xff1a;就是程序被存放在ROM中&#xff0c;也就是磁盘、固态硬盘、eMMC等存储介质&#xff1b;动态&#xff1a;程序被执行&#xff0c;此时程序在RAM内存中运行&#xff1b; (…

图床项目数据库表设计

一、表设计 share_picture_list 和 share_file_list 类似&#xff0c;只是 share_picture_list 只存储共享图片相关的信息&#xff0c;及分享给未注册用户看的。share_file_list 是存储共享文件&#xff08;包括图片文件&#xff09;相关的信息&#xff0c;分享给已注册用户的。…

【数据结构】 七大排序详解(贰)——冒泡排序、快速排序、归并排序

文章目录 ⚽冒泡排序⚾算法步骤&#x1f3a8;算法优化&#x1f94e;代码实现&#xff1a;&#x1f3c0;冒泡排序的特性总结 &#x1f9ed;快速排序⚽算法思路&#x1f4cc;思路一&#xff08;Hoare版&#xff09;&#x1f4cc;思路二&#xff08;挖坑法&#xff09;&#x1f4c…

PCL入门(四):kdtree简单介绍和使用

目录 1. kd树的意义2. kd树的使用 参考博客《欧式聚类&#xff08;KD-Tree&#xff09;详解&#xff0c;保姆级教程》和《(三分钟)学会kd-tree 激光SLAM点云搜索常见》 1. kd树的意义 kd树是什么&#xff1f; kd树是一种空间划分的数据结构&#xff0c;对于多个维度的数据&a…