Springboot整合Kafka消息队列服务实例

news2025/1/21 22:05:16

一、Kafka相关概念

1、关于Kafka的描述

Kafka是由Apache开源,具有分布式、分区的、多副本的、多订阅者,基于Zookeeper协调的分布式处理平台,由Scala和Java语言编写。通常用来搜集用户在应用服务中产生的动作日志数据,并高速的处理。日志类的数据需要高吞吐量的性能要求,对于像Hadoop一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。

2、关于Kafka的功能特点

  1. 通过磁盘数据结构提供消息的持久化,消息存储也能够保持长时间稳定性;
  2. 高吞吐量,即使是非常普通的硬件Kafka也可以支持每秒超高的并发量;
  3. 支持通过Kafka服务器和消费机集群来分区消息;
  4. 支持Hadoop并行数据加载;
  5. API包封装的非常好,简单易用,上手快 ;
  6. 分布式消息队列。Kafka对消息保存时根据Topic(主题)进行归类,发送消息者称为Producer(生产者),消息接受者称为Consumer(消费者);

3、Kafka消息功能

如下图所示,Kafka作为一个中间服务,代表一个broker(经纪人)角色,负责接收APP的消费与推送消息给其他相关APP。这里APP可分为Producer,Consumer。

消息的消费模式

点对点模式:点对点模式通常是一个基于拉取或者轮询的消息传递模型,消费者主动拉取数据,消息收到后从队列移除消息,这种模型不是将消息推送到客户端,而是从队列中请求消息。特点是发送到队列的消息被一个且只有一个消费者接收处理,即使有多个消费者监听队列也是如此。

发布订阅模式:订阅模式是一个基于推送的消费传送模型,消息产生后,Kafka会推送给所有订阅相关Topic的订阅者。发布订阅模型可以有多种不同的订阅者,临时订阅者只在主动监听主题时才接收消息,而持久订阅者则监听主题的所有消息,即使当前订阅者不可用,处于离线状态。

4、Kafka消息队列的作用

  • 应用程序之间解耦,生产者与消费者相互独立,各自异步执行。
  • 消息数据持久化存储,直到所有消息都被消费,规避消息数据丢失的风险。
  • 流量削峰,使用Kafka消息队列可以帮助server承接访问压力,尽可能避免应用程序崩溃。
  • 降低进程间的耦合度,系统部分应用组件发生崩溃时,不会影响到整体系统的运行。
  • 保证消息顺序执行,解决特定场景业务需求。

5、Kafka相关术语介绍

  • Broker

   一台kafka服务器就是一个broker(经纪人)。一个集群由多个broker组成。一个broker可以容纳多个topic(消息主题)。

  • Producer

    消息生产者,就是向kafka broker发消息的APP客户端。

  • Consumer

    消息消费者,向kafka broker取消息的APP客户端。

  • Topic

    每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic,可以理解为一个队列。

  • Consumer Group

     每个Consumer属于一个特定的Consumer Group,可为每个Consumer指定group name,若不指定group name则属于默认的分组。

  • Partition

一个庞大大的topic可以分布到多个broker上,一个topic可以分为多个partition,每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id。kafka只保证按一个partition中的顺序将消息发给consumer,不保证一个topic的整体的顺序。Partition是物理上的概念,方便在集群中扩展,提高并发。

二、liunx系统下搭建Kafka环境

       

--新建kafka应用目录。并下载到当前目录下
cd  /usr/local

mkdir kafka

cd kafka 
--下载

wget https://downloads.apache.org/kafka/3.7.0/kafka-3.7.0-src.tgz

--解压

tar -zxvf  kafka-3.7.0-src.tgz

--启动服务

cd kafka-3.7.0

./bin/kafka-server-start.sh    config/server.properties

--查看服务

ps -aux |grep kafka

--开放kafka地址端口

vim server.properties

--添加下面注释

advertised.listeners=PLAINTEXT://10.98.3.22:9092

三、Springboot2整合Kafka 服务

1、导入基础依赖

<!-- SpringBoot依赖 -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- kafka 依赖 -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.2.4.RELEASE</version>
</dependency>

2、项目目录结构

3、生产者与消费者yml文件配置

#消费者配置
spring:
  kafka:
    bootstrap-servers: 127.0.0.1:9092
    consumer:
      group-id: test-consumer-group


#生产者配置

spring:
  kafka:
    bootstrap-servers: 127.0.0.1:9092

4、生成消息

@RestController
@RequestMapping("/kafka")
public class ProducerController {

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    @RequestMapping("/send")
    public HttpResult sendMsg () {
        MsgLog msgLog = new MsgLog(1,"消息生成",
                                 1,"消息日志",new Date()) ;
        String msg = JSON.toJSONString(msgLog) ;
        // 这里Topic如果不存在,会自动创建
        kafkaTemplate.send("cicada-topic", msg);
        return HttpResult.create(HttpStatus.SUCCESS,msg);
    }
}
@Component
public class ConsumerMsg {

    private static Logger LOGGER = LoggerFactory.getLogger(ConsumerMsg.class);
    
    //此注解是监听主题为cicada-topic的消息队列
    @KafkaListener(topics = "cicada-topic")
    public void listenMsg (ConsumerRecord<?,String> record) {
        String value = record.value();
        LOGGER.info("ConsumerMsg====>>"+value);
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1837607.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

创建Docker容器与外部机通信(独立IP的方式)

需求&#xff1a;希望外部可以直接通过不同IP地址访问宿主机上的Docker容器&#xff0c;而不需要端口映射&#xff08;同一个IP不同的端口与外部通讯&#xff09;&#xff0c;这通常涉及到在宿主机的网络层面进行更高级的配置&#xff0c;比如使用IP伪装&#xff08;IP masquer…

前端网站(二)-- 菜单页面【附源码直接可用】

菜单页面 开篇&#xff08;请大家看完&#xff09;&#xff1a;此网站写给挚爱&#xff0c;后续页面还会慢慢更新&#xff0c;大家敬请期待~ ~ ~ 轻舟所编写这个前端框架的设计初衷&#xff0c;纯粹是为了哄对象开心。除此之外&#xff0c;并无其它任何用途或目的。 此前端框…

《系统架构设计师教程(第2版)》第11章-未来信息综合技术-04-边缘计算

文章目录 1. 概述1.1 简介1.2 三类落地形态1.2.1 云边缘1.2.2 边缘云1.2.3 云化网关 2. 边缘计算的特点2.1 联接性2.2 数据第一入口2.3 约束性2.4 分布性 3. 边云协同3.1 概述3.2 六种协同3.2.1 资源协同3.2.2 数据协同3.2.3 智能协同3.2.4 应用管理协同3.2.5 业务管理协同3.2.…

STM32单片机USART串口详解

文章目录 1. 通信接口概述 2. 串口通信 3. 硬件电路 4. 电平标准 5. 串口参数及时序 5.1 数据帧的组成 5.2 起始位 5.3 数据位 5.4 校验位 5.5 停止位 5.6 波特率 5.7 数据帧传输过程示例 6. 串口时序 7. USART概述 8. USART框图 9. USART基本结构 10. 数据帧…

Java | Leetcode Java题解之第146题LRU缓存

题目&#xff1a; 题解&#xff1a; public class LRUCache {class DLinkedNode {int key;int value;DLinkedNode prev;DLinkedNode next;public DLinkedNode() {}public DLinkedNode(int _key, int _value) {key _key; value _value;}}private Map<Integer, DLinkedNode…

UE4_材质_雨滴涟漪效果ripple effect_ben教程

学习笔记&#xff0c;不喜勿喷&#xff01;侵权立删&#xff0c;祝愿生活越来越好&#xff01; 雨水落下时会产生这些非常漂亮的同心环波纹&#xff0c;我们要做的第一件事是创建一个单个的圆环遮罩动画&#xff0c;我们希望环在开始的时候在中心很小&#xff0c;然后放大&…

45岁TVB女星惊爆与大15岁老公离婚

45岁的姚嘉妮今日被传媒爆出与结婚18年老公林祖辉离婚&#xff0c;她接受其他传媒访问时&#xff0c;指两人分开不涉及第三者&#xff0c;而一对子女亦会共同抚养&#xff0c;而更令人惊讶的是&#xff0c;两人已经离婚多年&#xff0c;女方形容大家现在关系为家人。 不过9年前…

MBR40100CT-ASEMI无人机专用MBR40100CT

编辑&#xff1a;ll MBR40100CT-ASEMI无人机专用MBR40100CT 型号&#xff1a;MBR40100CT 品牌&#xff1a;ASEMI 封装&#xff1a;TO-220 最大平均正向电流&#xff08;IF&#xff09;&#xff1a;40A 最大循环峰值反向电压&#xff08;VRRM&#xff09;&#xff1a;100V…

深入Node.js:实现网易云音乐数据自动化抓取

随着互联网技术的飞速发展&#xff0c;数据已成为企业和个人获取信息、洞察市场趋势的重要资源。音频数据&#xff0c;尤其是来自流行音乐平台如网易云音乐的数据&#xff0c;因其丰富的用户交互和内容多样性&#xff0c;成为研究用户行为和市场动态的宝贵资料。本文将深入探讨…

maven安装、配置与plugins报红的解决方法

maven安装与配置 1.在官网中下载maven 官网地址&#xff1a;https://maven.apache.org/ 2.解压maven压缩包 将maven解压到一个没有中文的文件夹中&#xff1a; 3.配置环境变量并验证 1、在设置中找到 “高级系统设置”&#xff0c;点击后再点击环境变量&#xff1a; 2、在 “系…

Arcgis导入excel出现的问题

我手动添加了object-id字段也没有用&#xff0c;然后再excel里面又添加了一行&#xff0c;关闭后打开还是不行&#xff0c;额案后在网上看到了一种方法&#xff0c;很有效&#xff0c;予以记录。 1、我的文件是csv格式&#xff0c; 先在excel里面另存为xlsx格式 2、转换工具里…

【植物大战僵尸】C语言教程

Hi~&#xff01;这里是奋斗的小羊&#xff0c;很荣幸您能阅读我的文章&#xff0c;诚请评论指点&#xff0c;欢迎欢迎 ~~ &#x1f4a5;&#x1f4a5;个人主页&#xff1a;奋斗的小羊 &#x1f4a5;&#x1f4a5;所属专栏&#xff1a;C语言 &#x1f680;本系列文章为个人学习…

云徙科技助力竹叶青实现用户精细化运营,拉动全渠道销售额增长

竹叶青茶以其别具一格的风味与深厚的历史底蕴&#xff0c;一直被誉为茶中瑰宝。历经千年的传承与创新&#xff0c;竹叶青不仅坚守着茶叶品质的极致追求&#xff0c;更在数字化的浪潮中&#xff0c;率先打破传统&#xff0c;以科技力量赋能品牌&#xff0c;成为茶行业的领军者。…

深度剖析ElasticSearch分页原理与深分页问题|ES深分页问题|ES分页原理剖析

文章目录 ES分页|Paginate search resultsES深分页的问题一页获取数据量太大&#xff0c;报错分页深度太大&#xff0c;报错官方解释 其他解决方案Search after解决两个问题 有没有深分页查询的必要性&#xff1f;search after & PIT的使用方式1.创建pit2.首次查询3.之后的…

33、循环语句--函数---递归+阶乘

一、函数 1.1、shell的函数 1.1.1、函数的定义&#xff1a;将命令序列按照格式写在一起。格式指的是函数的固定格式。两种格式。 for i in {}do命令序列doneif []then 命令序列else命令序列fi #可以作为一个命令序列作用&#xff1a;方便重复使用&#xff0c;函数库&…

在 Go 中如何让结构体不可比较?

最近我在使用 Go 官方出品的结构化日志包 slog 时&#xff0c;看到 slog.Value 源码中有一个比较好玩的小 Tips&#xff0c;可以限制两个结构体之间的相等性比较&#xff0c;本文就来跟大家分享下。 在 Go 中结构体可以比较吗&#xff1f; 在 Go 中结构体可以比较吗&#xff…

Elasticsearch docker 安装及基本用法

创建网络 首先通过命令创建一个网络 docker network create es-net然后查看网络 [rootDocker ~]# docker network ls NETWORK ID NAME DRIVER SCOPE 4e315f5e3ae7 bridge bridge local a501a9f3b4ee es-net bridge local ebca66b02e8c host …

十一、数据结构(图的最短路)

文章目录 基础部分最短路径问题用 D F S DFS DFS搜索所有的路径用 B F S BFS BFS求最短路径 最短路算法 F l o y d Floyd Floydcode(Floyd的实现): S P F A SPFA SPFAcode(基于邻接表的 S P F A ) SPFA) SPFA) D i j k s t r a Dijkstra Dijkstracode&#xff08;dijkstra的实现…

PHP实现企业微信素材上传与获取的完整指南与踩坑日记

企业微信作为一款专门为企业打造的即时通讯工具&#xff0c;提供了丰富的功能和接口&#xff0c;其中包括素材管理。素材管理在企业内部的沟通、分享和展示中起着重要的作用。本篇文章将介绍如何使用PHP语言对接企业微信素材上传和获取的功能。 ## 1. 准备工作 首先&#xff0…

AI数据分析:Excel表格智能判断数据起点来计算增长率

工作任务&#xff1a;计算Excel表格中2023年1月到2024年4月的总增长率和复合增长率。 如果数据都有的情况下&#xff0c;公式很简单&#xff1a; 总增长率 (O2-B2)/B2 复合增长率 POWER((O2/B2),1/13)-1 但是&#xff0c;2023年1月、2月、3月的数据&#xff0c;有些有&…