【Kafka】Kafka 简介 || CentOS7 安装 Kafka || SpringBoot整合 Kafka 方式

news2024/11/24 20:28:40

最近的精神状态真的不是很好,刚刚脱离🐏羊的苦海,收获了很多吧,任何经历都是我们成长的关键。本文是我 Kafka 入门部分的一个笔记,大家如果有有疑问的地方可以评论区或者私信我,我看见了都会回复的。最后,祝大家都身体健康 ~

1. 什么是 Kafka ?

Kafka是最初由 Linkedin 公司开发,是一个分布式、分区的、多副本的、多订阅者,基于zookeeper协调的分布式日志系统 \ MessageQueue系统。可以用于 web/nginx 日志、访问日志,消息服务以及微服务。Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目。

zookeeper : 协调组件共同工作的组件

主要应用场景是:日志收集系统和消息系统

举个栗子

生产者消费者,生产者生产鸡蛋,消费者消费鸡蛋,生产者生产一个鸡蛋,消费者就消费一个鸡蛋。

  • 假设消费者消费鸡蛋的时候噎住了(系统宕机了),生产者还在生产鸡蛋,那新生产的鸡蛋就丢失了;
  • 再比如生产者很强劲(大交易量的情况),生产者1秒钟生产100个鸡蛋,消费者1秒钟只能吃50个鸡蛋,那要不了一会,消费者就吃不消了(消息堵塞,最终导致系统超时),消费者拒绝再吃了,”鸡蛋“又丢失了;

这个时候我们放个篮子在它们中间,生产出来的鸡蛋都放到篮子里,消费者去篮子里拿鸡蛋,这样鸡蛋就不会丢失了,都在篮子里,而这个篮子就是”kafka“。鸡蛋其实就是“数据流”,系统之间的交互都是通过“数据流”来传输的(就是tcp、https什么的),也称为报文,也叫“消息”。消息队列满了,其实就是篮子满了,”鸡蛋“ 放不下了,那赶紧多放几个篮子,其实就是kafka的扩容。各位现在知道kafka是干什么的了吧,它就是那个”篮子“

术语解释

  • producer:生产者,就是它来生产“鸡蛋”的
  • consumer:消费者,生出的“鸡蛋”它来消费
  • topic:主题,可理解为标签,生产者每生产出来一个鸡蛋就贴上一个标签(topic),消费者可不是谁生产的“鸡蛋”都吃的,这样不同的生产者生产出来的“鸡蛋”,消费者就可以选择性的“吃”了
  • broker:就是篮子了


二、Kafka 设计目标

  • 以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能保证访问性能。
  • 高吞吐率 :即使在非常廉价的商用机器上也能做到单机支持每秒100K条消息的传输。
  • 支持Kafka Server间的消息分区,及分布式消费,同时保证每partition内的消息顺序传输。
  • 同时支持离线数据处理和实时数据处理。
  • Scale out:支持在线水平扩展


三、什么是消息系统

  • 一个消息系统负责将数据从一个应用传递到另外一个应用,应用只需关注于数据,无需关注数据在两个或多个应用间是如何传递的。
  • 分布式消息传递基于可靠的消息队列,在客户端应用和消息系统之间异步传递消息。
  • 有两种主要的消息传递模式:点对点传递模式、发布-订阅模式。大部分的消息系统选用发布-订阅模式。Kafka就是一种发布-订阅模式。


四、Kafka 优点

  1. 解耦
    在项目启动之初来预测将来项目会碰到什么需求,是极其困难的。消息系统在处理过程中间插入了一个隐含的、基于数据的接口层,两边的处理过程都要实现这一接口。这允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。
  2. 冗余
    有些情况下,处理数据的过程会失败。除非数据被持久化,否则将造成丢失。消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。许多消息队列所采用的"插入-获取-删除"范式中,在把一个消息从队列中删除之前,需要你的处理系统明确的指出该消息已经被处理完毕,从而确保你的数据被安全的保存直到你使用完毕。
  3. 扩展性
    因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的,只要另外增加处理过程即可。不需要改变代码、不需要调节参数。扩展就像调大电力按钮一样简单。
  4. 灵活性&峰值处理能力
    在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见;如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。
  5. 可恢复性
    系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。
  6. 顺序保证
    在大多使用场景下,数据处理的顺序都很重要。大部分消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。Kafka保证一个Partition内的消息的有序性。


五、Kafka与其他MQ对比

  1. RabbitMQ
    RabbitMQ是使用Erlang编写的一个开源的消息队列,本身支持很多的协议:AMQP,XMPP, SMTP, STOMP,也正因如此,它非常重量级,更适合于企业级的开发。同时实现了Broker构架,这意味着消息在发送给客户端时先在中心队列排队。对路由,负载均衡或者数据持久化都有很好的支持。
  2. Redis
    Redis是一个基于Key-Value对的NoSQL数据库,开发维护很活跃。虽然它是一个Key-Value数据库存储系统,但它本身支持MQ功能,所以完全可以当做一个轻量级的队列服务来使用。对于RabbitMQ和Redis的入队和出队操作,各执行100万次,每10万次记录一次执行时间。测试数据分为128Bytes、512Bytes、1K和10K四个不同大小的数据。实验表明:入队时,当数据比较小时Redis的性能要高于RabbitMQ,而如果数据大小超过了10K,Redis则慢的无法忍受;出队时,无论数据大小,Redis都表现出非常好的性能,而RabbitMQ的出队性能则远低于Redis。
  3. ActiveMQ
    ActiveMQ是Apache下的一个子项目。 类似于ZeroMQ,它能够以代理人和点对点的技术实现队列。同时类似于RabbitMQ,它少量代码就可以高效地实现高级应用场景。
  4. Kafka/Jafka
    Kafka是Apache下的一个子项目,是一个高性能跨语言分布式发布/订阅消息队列系统,而Jafka是在Kafka之上孵化而来的,即Kafka的一个升级版。具有以下特性:快速持久化,可以在O(1)的系统开销下进行消息持久化;高吞吐,在一台普通的服务器上既可以达到10W/s的吞吐速率;完全的分布式系统,Broker、Producer、Consumer都原生自动支持分布式,自动实现负载均衡;支持Hadoop数据并行加载,对于像Hadoop的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka通过Hadoop的并行加载机制统一了在线和离线的消息处理。Apache Kafka相对于ActiveMQ是一个非常轻量级的消息系统,除了性能非常好之外,还是一个工作良好的分布式系统。


六、Kafka 安装配置

本文写于 2022/12/ 14 日,安装教程如下,但是安装教程是随时可能变化的,万一10年后的你看到这个教程就… (那时候我应该还在世界上,哈哈哈哈,不过可能早就被这行淘汰了),到时候大家去官网看安装教程就行了,我这里也只是把自己安装官网安装的过程写出来

官网教程安装地址: https://kafka.apache.org/quickstart

1. 下载、传输、然后解压

把压缩包下载好,随便找个向虚拟机传输文件的工具,传输进去

然后解压 Kafka

tar -xzf kafka_2.13-3.3.1.tgz

2. 开启 ZooKeeper

bin/zookeeper-server-start.sh config/zookeeper.properties

3. 再开一个终端,开启 kafka

开启 ZooKeeper 的终端,开启成功后,会卡死,再另外开启一个终端,开启 kafka

bin/kafka-server-start.sh config/server.properties

4. 再开一个终端,模拟生产者生产消息,消费者消费消息

开启Kafka 的终端,开启成功后,会卡死,再另外开启一个终端,测试 kafka

  1. 创建一个 topic ,名称为 test
bin/kafka-topics.sh --create --topic test --bootstrap-server localhost:9092
  1. 创建一个生产者,向 test 中加入消息,不想加入了,按 Ctrl + C 停止即可
bin/kafka-console-producer. --topic test --bootstrap-server localhost:9092
  1. 创建一个消费者,取出 test 中的消息
bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092


七、SpringBoot 整合 Kafka

1. 先修改虚拟机上的 kafka 配置

Kafka默认只监听本机消息,配置Kafka监听其他IP消息的方式如下:

找到 Kafka 安装目录下 config 下的 server.properties 配置文件,修改如下,其中 192.168.150.131:9092 部分修改为你要监听的 ip 地址

listeners=PLAINTEXT://192.168.150.131:9092

2. 重启 kafka

Ctrl + C 停掉 ZooKeeper & Kafka 然后安装上面的方式重新启动即可

3. 创建 SpringBoot 项目

1) 导入 Kafka 依赖

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

2) 编写 yml 配置文件: 修改为你自己的 ip 地址即可

生产者类:

spring:
  kafka:
    producer:
      bootstrap-servers: 192.168.244.130:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
kafka:
  topic:
    test: test
logging:
  level:
    root: DEBUG

消费者类 :

spring:
  kafka:
    consumer:
      #指定消息组
      group-id: test
      # 指定消息被消费之后自动提交偏移量,以便下次继续消费
      enable-auto-commit: true
      bootstrap-servers: 192.168.244.130:9092
      # 指定从最近地方开始消费(earliest)
      auto-offset-reset: latest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
kafka:
  topic:
    test: test

3)编写生产者消息发送类和消费者消息接收类

生产者消息发送类:

package com.example.demo;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;

import javax.annotation.Resource;

@Component
public class KafkaMessageProducer {
    private static final Logger logger = LoggerFactory.getLogger(KafkaMessageProducer.class);
    @Resource
    private KafkaTemplate<String, String> kafkaTemplate; // 装配Kafka模板Bean
    @Value("${kafka.topic.test}") // 读取配置文件中Topic的设置
    public String topic;
    public void send() {
        String message = "Hello World -6.18--" + System.currentTimeMillis();
        // 向kafka发送消息
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(this.topic, message);
        // 设置成功与失败的回调方法
        future.addCallback(success -> logger.info("KafkaMessageProducer 发送消息成功!"),
                fail -> logger.error("KafkaMessageProducer 发送消息失败!"));
    }
}

消费者消息接收类:

package com.example.demo;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

@Component
public class KafkaMessageConsumer {
    private static final Logger logger = LoggerFactory.getLogger(KafkaMessageConsumer.class);
    @KafkaListener(topics = {"${kafka.topic.test}"})
    public void receive(@Payload String message, @Headers MessageHeaders headers){
        logger.info("KafkaMessageConsumer 接收到消息:" + message);
        headers.keySet().forEach(key->logger.info("{}: {}",key,headers.get(key)));
    }
}

4)测试类

package com.example.demo;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import static org.junit.Assert.*;
@RunWith(SpringRunner.class)
@SpringBootTest
public class KafkaMessageProducerTest {
    @Autowired
    KafkaMessageProducer kafkaMessageProducer;
    @Test
    public void send() {
        for (int index = 0; index < 10; index ++) {
            kafkaMessageProducer.send();
        }
    }
}


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/89463.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

安卓玩机搞机技巧综合资源-----手机隐藏拍照录像 取证软件 寻找隐藏摄像头 【十六】

接上篇 安卓玩机搞机技巧综合资源------如何提取手机分区 小米机型代码分享等等 【一】 安卓玩机搞机技巧综合资源------开机英文提示解决dm-verity corruption your device is corrupt. 设备内部报错 AB分区等等【二】 安卓玩机搞机技巧综合资源------EROFS分区格式 小米红…

计算机网络学习笔记(V):传输层

目录 1 传输层概述 1.1 功能 1.2 两种协议 1.TCP 2.UDP协议 1.3 传输层的寻址与端口 2 UDP协议 2.1 特点 2.2 首部格式 2.3 UDP检验 3 TCP协议 3.1 TCP协议 1.特点 2.报文段首部格式 3.2 TCP连接管理 1.连接建立 2.TCP连接释放 3.3 TCP可靠传输 1.校验 2.序…

java+MySQL 基于ssm的视频播放网站

随着现代视频播放网站管理的快速发展,可以说视频播放网站管理已经逐渐成为现代视频播放网站管理过程中最为重要的部分之一。但是一直以来我国传统的视频播放网站管理并没有建立一套完善的行之有效的视频播放网站管理系统,传统的视频播放网站管理已经无法适应高速发展,无论是从效…

中小型水库雨水情测报平台有哪些功能?水库雨水情数据孪生安全监测系统

平升电子中小型水库雨水情测报平台/雨水情监测及视频监控解决方案/水库雨水情数据孪生安全监测系统统辅助水利管理部门实现水库雨水情信息“全要素、全量程、全覆盖”自动测报。系统具备水库水位、雨量、现场图像/视频等水文信息采集、传输、处理及预警广播等功能&#xff0c;有…

美股l2接口有什么特别的功能服务?

美股l2接口主要是面向做美股投资的level2行行情接口&#xff0c;接下来小编说说它的两大特色&#xff01; 1、买卖盘从各五档扩展到各十档 为投资者、基金、机构提供更多的交易参考数据&#xff0c;可以更准确地确定交易任务的完成程度。 美股l2接口&#xff08;十档行情快照…

【ShaderGraph】关于ShaderGraph的的介绍和入门

目录 一.ShaderGraph入门介绍 二.创建SRP工程&#xff08;URP或者HDRP&#xff09; 1.创建URP工程 2.创建HDRP工程 三.创建一个Shader Graph文件资源 ​​​​​​​ 一.ShaderGraph入门介绍 Shader Graph 使您能够直观地构建着色器。您无需编写代码&#xff0c;而是在图…

实验室管理系统

开发工具(eclipse/idea/vscode等)&#xff1a; 数据库(sqlite/mysql/sqlserver等)&#xff1a; 功能模块(请用文字描述&#xff0c;至少200字)&#xff1a;模块划分&#xff1a;班级模块、老师模块、学生模块、实验室模块、试脸模块、作业模块、作业提交、作业打分 管理员功能&…

ICG-Carboxylic Acid ICG标记羧基

ICG-Carboxylic Acid ICG标记羧基 外观&#xff1a; 绿色固体粉末 CAS: 181934-09-8 分子式&#xff1a;C45H50N2O5S 相对分子质量(g/mol): 730.6 溶解性&#xff1a;DMF/DMSO 注意事项 避免长时间接触光线。 保存条件&#xff1a;-20避光。 产品应用 ICG是一种带负电…

OA办公系统,推动企业管理革新

近年来&#xff0c;随着经济形势的发展与现代互联信息技术的高速发展&#xff0c;OA办公系统软件逐渐为人们所熟知&#xff0c;成为企业管理革新的重要选择。 1.OA办公系统助力企业实现组织管理 企业OA财务费控办公系统的扁平化组织可以说是组织创新最重要的一个利器。通过OA财…

基于springboot的电影推荐网站设计与实现(协同算法推荐)

项目描述 临近学期结束&#xff0c;还是毕业设计&#xff0c;你还在做java程序网络编程&#xff0c;期末作业&#xff0c;老师的作业要求觉得大了吗?不知道毕业设计该怎么办?网页功能的数量是否太多?没有合适的类型或系统?等等。这里根据疫情当下&#xff0c;你想解决的问…

猿如意中的【Qt Creator】工具详情介绍

一、工具名称 Qt Creator 二、下载安装渠道 Qt Creator通过CSDN官方开发的【猿如意】客户端进行下载安装。 2.1 什么是猿如意&#xff1f; 猿如意是一款面向开发者的辅助开发工具箱&#xff0c;包含了效率工具、开发工具下载&#xff0c;教程文档&#xff0c;代码片段搜索&…

2019年第一届“长安杯”电子数据取证竞赛答案解析

C D A 仿真后用uname -r查看 B PV&#xff1a;psycial volume VG&#xff1a;volume group LV&#xff1a;logical volume X-Ways检测的分区3、4是root和swap C LBA&#xff1a;Logical Block Address D 应为分区4 C D 39999->22 前面为主机端口&#xff0c;后面为dock…

C语言文件操作(1)

个人主页&#xff1a;平行线也会相交 欢迎 点赞&#x1f44d; 收藏✨ 留言✉ 加关注&#x1f493;本文由 平行线也会相交 原创 收录于专栏【C/C】 目录什么是文件程序文件数据文件文件名文件类型文件缓冲区文件指针什么是文件 磁盘的文件是文件 但在程序设计中&#xff0c;我…

诊断和响应故障_执行块介质恢复(Block Media Recovery)

本章阐述如何还原和恢复数据文件中个别的数据块。 1&#xff0e;块介质恢复概述 块介质恢复通过恢复损坏的数据块提供更短的平均恢复时间&#xff08;MTTR&#xff09;。 1.1&#xff0e;块介质恢复的目的 使用块介质恢复来恢复数据文件中一个或多个损坏的数据块。 块介质…

whistle修改接口返回步骤(只影响前端展示,不会插入数据到数据库)

本文章向大家介绍whistle修改接口返回步骤&#xff08;只影响前端展示&#xff0c;不会插入数据到数据库&#xff09;&#xff0c;主要包括whistle修改接口返回步骤&#xff08;只影响前端展示&#xff0c;不会插入数据到数据库&#xff09;使用实例、应用技巧、基本知识点总结…

m基于高阶累积量和信号子空间的信噪比估计方法的matlab仿真

目录 1.算法描述 2.仿真效果预览 3.MATLAB核心程序 4.完整MATLAB 1.算法描述 随着信息技术的飞速发展&#xff0c;信息战逐渐成为战争的一个重要方式。因此&#xff0c;掌握战场的信息控制权是赢得战争的重要因素。在信息战中&#xff0c;为了干扰和破坏对方的通信系统&…

Rvit中添加尺寸标注的前缀后缀和【一键尺寸定位标注】

一、Rvit中如何添加尺寸标注的前缀后缀 在进行尺寸标注尤其是多段连续的尺寸标注时常会用到诸如&#xff1a;3x20006000的标注样式&#xff0c;在Revt中应如何实现&#xff1f; 我们可以采用以下方法来解决&#xff1a; 1.如图1所示&#xff0c;先对该连续物体进行普通的对齐标…

前端基础(四)_CSS层叠样式表_什么是css_css样式的引入方式_样式表的优先级_样式选择器

一、什么是css? 主要用于html页面文本内容、图片外形、版本布局等外观样式的设置 二、css语法规则 语法&#xff1a; 选择器 声明语句&#xff1b; div {width: 200px;}div就是选择器 花括号中属性键值对就是 属性名:属性值 二、css样式的引入方式 1、行内样式–内联式 语…

uniapp中根据URL链接生成二维码(适用所有Javascript运行环境的前端应用),保存二维码到本地相册。

推荐一款适用所有Javascript运行环境的二维码生成组件。 uQRCode是一款基于Javascript环境开发的二维码生成插件&#xff0c;适用所有Javascript运行环境的前端应用和Node.js应用。 uQRCode可扩展性高&#xff0c;它支持自定义渲染二维码&#xff0c;可通过uQRCode API得到二…

转座子 垃圾DNA是指DNA中不编码蛋白质序列的片段,是DNA中最神秘的部分之一。

什麼是「垃圾DNA」&#xff1f; | GeneOnline News 垃圾dna与转座子 基因组重复序列分类 转座子 - 组学大讲堂问答社区 (omicsclass.com) (122条消息) 说说基因组的垃圾DNA-Transposable elements_msw521sg的博客-CSDN博客 Cell:转座子“跳跃”过程调控机制 - 生物研究专…