Java开发 - 消息队列之Kafka初体验

news2024/12/23 16:16:34

目录

前言

Kafka

什么是Kafka

Kafka软件结构

Kafka的特点

怎么启动Kafka

下载Kafka

配置Kafka 

Zookeeper

启动Kafka

Kafka案例

添加依赖

添加配置

配置启动类

创建生产者

创建消费者

测试

结语


前言

前几日总结了消息队列的一些知识,相信看完的同学们对消息队列的功能和作用都多少有了些基础的了解,为了让大家更快的上手消息队列,今天将给大家带来一篇大数据中常用的Kafka的介绍和实战教程,学完此篇,你将了解什么是Kafka,怎么使用Kafka和怎么将Kafka应用到项目中。

Kafka

什么是Kafka

Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像Hadoop一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。

Kafka软件结构

Producer:生产者,消息发送方,消息来源;

Consumer:消费者,消息接收方;

Topic:话题,消息收发方根据话题才能找到对的那个人,不会乱发;

Record:消息记录,是生产者和消费者传递的内容,保存在指定Topic内。

Kafka的特点

前文提到,Kafka是针对大数据设计的,官方号称并发1w/s,运行时堪比内存,可见其功能还是很强大的。

实际运行中,Kafka将消息队列中的信息保存在硬盘,它堪比内存的效率是因为对硬盘的读取规则做了优化,这点很强。主要是:顺序读写,零拷贝,日志压缩等技术。详情推荐这篇博客:整理了一周的Kafka规划和优化方案

Kafka在处理队列中的数据时,能够一直向服务器硬盘中保存队列信息,理论上没有大小限制,除非服务器硬盘满了。默认信息保存7天,时间可配置,数据量比较大的情况一天一处理,可以有效提高服务器性能,减少Kafka消耗。

怎么启动Kafka

下载Kafka

下载地址:Kafka下载地址

下载后博主是放在一个叫Kafka的文件夹内,文件夹下放下载后解压的Kafka和一个用来存储Kafka运行过程中产生数据的data(空目录)文件:

配置Kafka 

进入 kafka_2.13-2.4.1/config文件,用记事本打开如下文件:

 将dataDir后面的路径修改成你自己的data文件夹地址:

保存,关闭。再打开server.properties文件:

 修改log.dirs后面的路径为你自己电脑上data的地址,和上面的地址是一样的:

做完这些记得保存啊,不保存啥用也没有。

Zookeeper

说起Zookeeper,其实我们上面的配置是配置的Zookeeper,但却和Kafka息息相关,想知道他们之间的关系吗?我们继续往下看。

ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是Google的Chubby一个开源的实现,是Hadoop和Hbase的重要组件。它是一个为分布式应用提供一致性服务的软件,提供的功能包括:配置维护、域名服务、分布式同步、组服务等。

上面的解释很官方,不是很好理解。通俗地说就是:早期的服务器软件安装后都有各自的配置,但是要安装的软件很多,配置多了就不好管理了,Zookeeper可以修改服务器系统中所有软件的配置,久而久之,很多软件就删除了自己写配置文件的功能,改从Zookeeper中获取。ZooKeeper就像一个动物园的饲养员,为什么说是动物园饲养员呢,你看MySql的标志是不是海豚,Tomcat是只猫,等等,你去看,这些软件都是动物标志,像个动物园一样。

启动Kafka

启动kafka要先启动zookeeper,mac电脑终端进入Kafka的bin文件目录下:

# 进入Kafka文件夹
cd Desktop/JAVATOOL/kafka/kafka_2.13-2.4.1/bin
# 启动Zookeeper服务
./zookeeper-server-start.sh ../config/zookeeper.properties 
# 启动Kafka服务
./kafka-server-start.sh ../config/server.properties 

zookeeper启动后终端输出: 

kafka启动后终端输出:

主要是看到这个started字样。

关闭Kafka命令: 

# 关闭Kafka服务
./kafka-server-stop.sh 
# 启动Zookeeper服务
./zookeeper-server-stop.sh

需要先关闭Kafka,再关闭zookeeper,否则Kafka会一直报断开连接的错。 

文件路径写你自己的,不要写博主的,两个服务启动用两个窗口来运行命令,运行期间窗口不要关闭。

Windows电脑需要进入到kafka/bin/windows目录下,接着输入以下命令:

启动zookeeper:

zookeeper-server-start.bat ..\..\config\zookeeper.properties

启动kafka: 

kafka-server-start.bat ..\..\config\server.properties

window关闭服务直接x掉窗口就可以。 

Kafka案例

我们在此前微服务项目的cart子项目中演示Kafka的消息收发,下面跟着博主一步步来做吧。

添加依赖

<dependency>
    <groupId>com.google.code.gson</groupId>
    <artifactId>gson</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

这里特别提醒下,没有看前面微服务项目的童鞋你需要添加的依赖会少一些,建议先去看看微服务内容,把微服务基础项目先搭建起来,后续的开发很多都会在此基础上进行。 传送门:Java开发 - 数风流人物,还看“微服务”

添加配置

spring:
  kafka:
    # 定义kafka的位置
    bootstrap-servers: localhost:9092
    # 话题的分组名称,是必须配置的
    # 为了区分当前项目和其他项目使用的,防止不同项目相同话题的干扰
    # 本质是在话题名称前添加项目名称为前缀来防止的
    consumer:
      group-id: cart

配置启动类

在启动类上添加两个注解:

// 启动kafka的功能
@EnableKafka
// 为了测试kafka,我们可以周期性的发送消息到消息队列
// 使用SpringBoot自带的调度工具即可
@EnableScheduling

  

创建生产者

在cart包下新建一个kafka包,包下建一个叫Producer的类:

package com.codingfire.cloud.cart.kafka;

import com.codingfire.cloud.commons.pojo.cart.entity.Cart;
import com.google.gson.Gson;
import org.apache.commons.lang.math.RandomUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

// 我们需要周期性的向Kafka发送消息
// 需要将具备SpringBoot调度功能的类保存到Spring容器才行
@Component
public class Producer {
    // 能够实现将消息发送到Kafka的对象
    // 只要Kafka配置正确,这个对象会自动保存到Spring容器中,我们直接装配即可
    // KafkaTemplate<[话题名称的类型],[传递消息的类型]>
    @Autowired
    private KafkaTemplate<String,String> kafkaTemplate;

    // 每隔10秒向Kafka发送信息
    int i=1;
    // fixedRate是周期运行,单位毫秒 10000ms就是10秒
    @Scheduled(fixedRate = 10000)
    // 这个方法只要启动SpringBoot项目就会按上面设置的时间运行
    public void sendMessage(){
        // 实例化一个Cart类型对象,用于发送消息
        Cart cart=new Cart();
        cart.setId(i++);
        cart.setCommodityCode("PC100");
        cart.setPrice(RandomUtils.nextInt(100)+200);
        cart.setCount(RandomUtils.nextInt(5)+1);
        cart.setUserId("UU100");
        // 将cart对象转换为json格式字符串
        Gson gson=new Gson();
        // 执行转换
        String json=gson.toJson(cart);
        System.out.println("本次发送的消息为:"+json);
        // 执行发送
        // send([话题名称],[发送的消息]),需要遵循上面kafkaTemplate声明的泛型类型
        kafkaTemplate.send("myCart",json);

    }
}

创建消费者

在kafka包下创建Consumer类:

package com.codingfire.cloud.cart.kafka;

import com.codingfire.cloud.commons.pojo.cart.entity.Cart;
import com.google.gson.Gson;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

// 因为Kafka接收消息是自动的,所以这个类也必须交由Spring容器管理0
@Component
public class Consumer {

    // SpringKafka框架接收Kafka中的消息使用监听机制
    // SpringKafka框架提供一个监听器,专门负责关注指定的话题名称
    // 只要该话题名称中有消息,会自动获取该消息,并调用下面方法
    @KafkaListener(topics = "myCart")
    // 上面注解和下面方法关联,方法的参数就是接收到的消息
    public void received(ConsumerRecord<String,String> record){
        // 方法参数类型必须是ConsumerRecord
        // ConsumerRecord<[话题名称类型],[消息类型]>
        // 获取消息内容
        String json=record.value();
        // 要想在java中使用,需要转换为java对象
        Gson gson=new Gson();
        // 将json转换为java对象,需要提供转换目标类型的反射
        Cart cart=gson.fromJson(json,Cart.class);
        System.out.println("接收到对象为:"+cart);
    }


}

测试

下面,我们启动项目,由于我们的项目配置了nacos和seata,所以这两个服务要保证是开启的。如果你是新起的项目可以不用。

项目运行后,根据我们的代码,我们应该是每10s能到一条消息输入和输出的,我们在控制台看一下:

说明我们的测试是成功的,我们已经完成了一个完整的消息发送和接收的过程。 

但是,这也只是Kafka一个最简单的使用案例,关于Kafka肯定不止是做这么一点点事情,关于Kafka更多的功能,博主也还在学习中,推荐一个网址给大家学习:kafka中文教程

结语

最后了,说点什么吧。只能说,Kafka博大精深,微服务深不可测,真不是三言两语,三篇两篇就能说得清楚的,这篇博客就如标题一样,只能算是初体验了,相较于es和redis来说,这篇博客只能算是皮毛,要学的东西还有很多很多,最重要的是实战。

最后要说的是专注,学习不要好高骛远,专注于一个点去学习,光是微服务中任何一个辅助软件都足够研究很久了,先学会用,其次再深入学习,关于更多Kafka的实战应用,后期博主吃透后会给大家详细剖析,一起努力吧。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/404440.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【博客631】监控网卡与进程网络IO使用情况

监控进程的网络IO使用情况 1、vnstat 由于 vnstat 依赖于内核提供的信息&#xff0c;因此执行以下命令来验证内核是否提供了 vnStat 所期望的所有信息&#xff1a; # vnstat --testkernel This test will take about 60 seconds. Everything is ok.不带任何参数的 vnstat 将…

设计模式(十九)----行为型模式之命令模式

1、概述 日常生活中&#xff0c;我们出去吃饭都会遇到下面的场景。 定义&#xff1a; 将一个请求封装为一个对象&#xff0c;使发出请求的责任和执行请求的责任分割开。这样两者之间通过命令对象进行沟通&#xff0c;这样方便将命令对象进行存储、传递、调用、增加与管理。命…

UE官方教程笔记03-功能、术语、操作简介

对官方教程视频[官方培训]03-UE功能、术语、操作简介 | 徐良安 Epic的笔记这一部分基本都是走马观花的简单介绍功能世界创建建模Mesh editingtool是一个全新的建模工具&#xff0c;具备大多数的主流建模软件的核心功能HOUDINI ENGINE FOR UNREALHoudini编辑器&#xff0c;可以用…

springboot2集成knife4j

springboot2集成knife4j springboot2集成knife4j 环境说明集成knife4j 第一步&#xff1a;引入依赖第二步&#xff1a;编写配置类第三步&#xff1a;测试一下 第一小步&#xff1a;编写controller第二小步&#xff1a;启动项目&#xff0c;访问api文档 相关资料 环境说明 …

C++回顾(二十一)—— list容器

21.1 list概述 list是一个双向链表容器&#xff0c;可高效地进行插入删除元素。list不可以随机存取元素&#xff0c;所以不支持at.(pos)函数与[]操作符。It(ok) it5(err)需要添加头文件&#xff1a;#include <list> 21.2 list构造 &#xff08;1&#xff09;默认构造…

摘花生(简单DP)

Hello Kitty想摘点花生送给她喜欢的米老鼠。她来到一片有网格状道路的矩形花生地(如下图)&#xff0c;从西北角进去&#xff0c;东南角出来。地里每个道路的交叉点上都有种着一株花生苗&#xff0c;上面有若干颗花生&#xff0c;经过一株花生苗就能摘走该它上面所有的花生。Hel…

手写代码理解vue响应式原理

vue2响应式应用了Object.defineProperty&#xff0c;vue3中的响应式则是运用proxy。 目录标题1、defineProperty2、代码理解defineProperty3、手写vue2响应式原理4、vue2监听数组响应式5、Proxy理解6、总结1、defineProperty Object.defineProperty(obj, prop, descriptor) ob…

【8.索引篇】

索引分类 索引和数据就是位于存储引擎中&#xff1a; 按「数据结构」分类&#xff1a;Btree索引、Hash索引、Full-text索引。按「物理存储」分类&#xff1a;聚簇索引&#xff08;主键索引&#xff09;、二级索引&#xff08;辅助索引&#xff09;。按「字段特性」分类&#…

linux字符设备和块设备的区别 以及网络设备

一、字符设备 1、字符设备以字节为单位。大多数设备是字符设备&#xff0c;因为他们不需要缓冲而且不以固定块大小进行操作。 2、字符设备无需缓冲直接读写。 3、字符设备只能被顺序读写。 二、块设备 1、块设备只能以块为单位接受输入和输出。 2、块设备对I/0请求有对应的缓冲…

建立自己的博客

环境安装&#xff1a; w10系统安装 第一步&#xff1a;安装git Git 官网: https://git-scm.com/ 第二步&#xff1a;安装Node.js Node.js官网&#xff1a;https://nodejs.org/zh-cn/ 使用cmd检测&#xff1a; node -v 第三步&#xff1a;安装Hexo Hexo官网&#xff1a;htt…

PyInstaller 将DLL文件打包进exe

PyInstaller 将DLL文件打包进exe方法1&#xff1a;通过--add-data命令方法2&#xff1a;通过修改 .spec扩展&#xff1a;博主热门文章推荐&#xff1a;方法1&#xff1a;通过–add-data命令 注意&#xff1a;这里 dll末尾添加的.为当前目录&#xff0c;则该dll要放到main.py同一…

【零基础入门学习Python---Python的五大数据类型之字符串类型】

一.Python的五大数据类型之字符串类型 在Python中,变量用于存储数据。变量名可以是任何字母、数字和下划线的组合。Python支持多种数据类型,包括数字、字符串、列表、元组和字典。这篇文章我们就来学习一下五大数据类型中的字符串类型。 1.1 什么是字符串? 字符串是Pyth…

[acwing周赛复盘] 第 94 场周赛20230311

[acwing周赛复盘] 第 94 场周赛20230311 一、本周周赛总结二、 4870. 装物品1. 题目描述2. 思路分析3. 代码实现三、4871. 最早时刻1. 题目描述2. 思路分析3. 代码实现四、4872. 最短路之和1. 题目描述2. 思路分析3. 代码实现六、参考链接一、本周周赛总结 又是笨比的一周&…

保姆级图文教程 - VirtualBox安装配置Kali Linux

文章目录下载Kali Linux虚拟机包安装Kali用户配置网络配置静态ipDHCP分配IP换deb源下载Kali Linux虚拟机包 官网地址&#xff1a;https://www.kali.org/get-kali/#kali-virtual-machines 我们选择virtualbox版的&#xff0c;就是最中间的那个。 安装Kali 将压缩包解压&…

计算机网络:传输层概述

传输层 只有主机才有的层次 传输层的功能&#xff1a; 1.传输层提供进程与进程之间的逻辑通信。 2.复用&#xff1a;应用层的所有进程可以都使用一同传输层协议。 3.分用&#xff1a;传输层从网络层收到数据后&#xff0c;交付给指明的应用进程。 4.传输层对收到的报文进行差错…

二十一、Django-restframework之序列化器补充

一、常用序列化器字段 序列化器字段处理基元值和内部数据类型之间的转换。它们还处理输入值的验证&#xff0c;以及从它们的父对象检索和设置值。 &#xff08;1&#xff09;核心参数 每个序列化器字段类构造函数至少接受这些参数。一些字段类还接受额外的&#xff0c;字段特…

STM8S系列基于IAR标准外设printf输出demo

STM8S系列基于IAR标准外设printf输出demo&#x1f4cc;STM8S/A标准外设库&#xff08;库版本V2.3.1&#xff09;&#x1f4cd;官网标准外设库&#xff1a;https://www.st.com/zh/embedded-software/stsw-stm8069.html ⛳注意事项 &#x1f6a9;在内存空间比较有限的情况下&am…

.vue 组件打包成 .js

.vue 组件打包成 .js *** 所有的内容 cli 官网都有 *** *** https://cli.vuejs.org/zh/guide/build-targets.html *** 所有的内容 cli 官网都有&#xff1a; https://cli.vuejs.org/zh/guide/build-targets.html 准备 几个 .vue 组件文件 import Main from ./components/Ma…

MySQL InnoDB存储引擎锁与事务实现原理解析(未完成)

InnoDB MySQL存储引擎是基于表的&#xff0c;也就是说每张表可以选择不同的存储引擎。 InnoDB存储引擎的表是索引组织的&#xff0c;也就是数据即索引。 存储引擎文件 InnoDB引擎会包含RedoLog重做日志文件和TableSpace表空间文件。 表空间文件 默认表空间文件&#xff08…

Win32 ListBox控件

Win32 ListBox控件 创建ListBox控件 创建窗口函数 HWND CrateWindowEx(DWORD dwExStyle , // 窗口的扩展风格,基本没用LPCTSTR lpClassName, // 已经注册的窗口类名称LPCTSTR lpWindowName, // 窗口标题栏的名字DWORD dwStyle, // 窗口的基本风格int x, // 左上角水平坐标int …