spring boot 使用 Kafka

news2024/12/28 20:37:49

一、Kafka作为消息队列的好处

  1. 高吞吐量:Kafka能够处理大规模的数据流,并支持高吞吐量的消息传输。

  2. 持久性:Kafka将消息持久化到磁盘上,保证了消息不会因为系统故障而丢失。

  3. 分布式:Kafka是一个分布式系统,可以在多个节点上运行,具有良好的可扩展性和容错性。

  4. 支持多种协议:Kafka支持多种协议,如TCP、HTTP、UDP等,可以与不同的系统进行集成。

  5. 灵活的消费模式:Kafka支持多种消费模式,如拉取和推送,可以根据需要选择合适的消费模式。

  6. 可配置性强:Kafka的配置参数非常丰富,可以根据需要进行灵活配置。

  7. 社区支持:Kafka作为Apache旗下的开源项目,拥有庞大的用户基础和活跃的社区支持,方便用户得到及时的技术支持。

二、springboot中使用Kafka

  1. 添加依赖:在pom.xml文件中添加Kafka的依赖,包括spring-kafka和kafka-clients。确保版本与你的项目兼容。

  2. 创建生产者:创建一个Kafka生产者类,实现Producer接口,并使用KafkaTemplate发送消息。

  3. 配置生产者:在Spring Boot的配置文件中配置Kafka生产者的相关参数,例如bootstrap服务器地址、Kafka主题等。

  4. 发送消息:在需要发送消息的地方,注入Kafka生产者,并使用其发送消息到指定的Kafka主题。

  5. 创建消费者:创建一个Kafka消费者类,实现Consumer接口,并使用KafkaTemplate订阅指定的Kafka主题。

  6. 配置消费者:在Spring Boot的配置文件中配置Kafka消费者的相关参数,例如group id、auto offset reset等。

  7. 接收消息:在需要接收消息的地方,注入Kafka消费者,并使用其接收消息。

  8. 处理消息:对接收到的消息进行处理,例如保存到数据库或进行其他业务逻辑处理。

三、使用Kafka

pom中填了依赖

<dependency>  
    <groupId>org.springframework.kafka</groupId>  
    <artifactId>spring-kafka</artifactId>  
    <version>2.8.1</version>  
</dependency>  
<dependency>  
    <groupId>org.apache.kafka</groupId>  
    <artifactId>kafka-clients</artifactId>  
    <version>2.8.1</version>  
</dependency>
  1. 创建生产者:创建一个Kafka生产者类,实现Producer接口,并使用KafkaTemplate发送消息。

import org.apache.kafka.clients.producer.*;  
import org.springframework.beans.factory.annotation.Value;  
import org.springframework.kafka.core.KafkaTemplate;  
import org.springframework.stereotype.Component;  
  
@Component  
public class KafkaProducer {  
    @Value("${kafka.bootstrap}")  
    private String bootstrapServers;  
  
    @Value("${kafka.topic}")  
    private String topic;  
  
    private KafkaTemplate<String, String> kafkaTemplate;  
  
    public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {  
        this.kafkaTemplate = kafkaTemplate;  
    }  
  
    public void sendMessage(String message) {  
        Producer<String, String> producer = new KafkaProducer<>(bootstrapServers, new StringSerializer(), new StringSerializer());  
        try {  
            producer.send(new ProducerRecord<>(topic, message));  
        } catch (Exception e) {  
            e.printStackTrace();  
        } finally {  
            producer.close();  
        }  
    }  
}
  1. 配置生产者:在Spring Boot的配置文件中配置Kafka生产者的相关参数,例如bootstrap服务器地址、Kafka主题等。

import org.springframework.context.annotation.Bean;  
import org.springframework.context.annotation.Configuration;  
import org.springframework.kafka.core.DefaultKafkaProducerFactory;  
import org.springframework.kafka.core.KafkaTemplate;  
import org.springframework.kafka.core.ProducerFactory;  
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;  
import org.springframework.kafka.core.ConsumerFactory;  
import org.springframework.kafka.core.ConsumerConfig;  
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;  
import org.springframework.kafka.listener.MessageListener;  
import org.springframework.context.annotation.PropertySource;  
import java.util.*;  
import org.springframework.beans.factory.*;  
import org.springframework.*;  
import org.springframework.*;expression.*;value; 																																		 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	  @Value("${kafka}")   Properties kafkaProps = new Properties(); @Bean public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> pf){ KafkaTemplate<String, String> template = new KafkaTemplate<>(pf); template .setMessageConverter(new StringJsonMessageConverter()); template .setSendTimeout(Duration .ofSeconds(30)); return template ; } @Bean public ProducerFactory<String, String> producerFactory(){ DefaultKafkaProducerFactory<String, String> factory = new DefaultKafkaProducerFactory<>(kafkaProps); factory .setBootstrapServers(bootstrapServers); factory .setKeySerializer(new StringSerializer()); factory .setValueSerializer(new StringSerializer()); return factory ; } @Bean public ConsumerFactory<String, String> consumerFactory(){ DefaultKafkaConsumerFactory<String, String> factory = new DefaultKafkaConsumerFactory<>(consumerConfigProps); factory .setBootstrapServers(bootstrapServers); factory .setKeyDeserializer(new StringDeserializer()); factory .setValueDeserializer(new StringDeserializer()); return factory ; } @Bean public ConcurrentMessageListenerContainer<String, String> container(ConsumerFactory<String, String> consumerFactory, MessageListener listener){ ConcurrentMessageListenerContainer<String, String> container = new ConcurrentMessageListenerContainer<>(consumerFactory); container .setMessageListener(listener); container .setConcurrency(3); return container ; } @Bean public MessageListener

消费者

import org.apache.kafka.clients.consumer.*;  
import org.springframework.kafka.core.KafkaTemplate;  
import org.springframework.stereotype.Component;  
  
@Component  
public class KafkaConsumer {  
    @Value("${kafka.bootstrap}")  
    private String bootstrapServers;  
  
    @Value("${kafka.group}")  
    private String groupId;  
  
    @Value("${kafka.topic}")  
    private String topic;  
  
    private KafkaTemplate<String, String> kafkaTemplate;  
  
    public KafkaConsumer(KafkaTemplate<String, String> kafkaTemplate) {  
        this.kafkaTemplate = kafkaTemplate;  
    }  
  
    public void consume() {  
        Consumer<String, String> consumer = new KafkaConsumer<>(consumerConfigs());  
        consumer.subscribe(Collections.singletonList(topic));  
        while (true) {  
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));  
            for (ConsumerRecord<String, String> record : records) {  
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());  
            }  
        }  
    }  
  
    private Properties consumerConfigs() {  
        Properties props = new Properties();  
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);  
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);  
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");  
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");  
        return props;  
    }  
}

四、kafka与rocketMQ比较

Kafka和RocketMQ都是开源的消息队列系统,它们具有许多相似之处,但在一些关键方面也存在差异。以下是它们在数据可靠性、性能、消息传递方式等方面的比较:

  1. 数据可靠性:
  • Kafka使用异步刷盘方式,而RocketMQ支持异步实时刷盘、同步刷盘、同步复制和异步复制。这使得RocketMQ在单机可靠性上比Kafka更高,因为它不会因为操作系统崩溃而导致数据丢失。此外,RocketMQ新增的同步刷盘机制也进一步保证了数据的可靠性。
  1. 性能:
  • Kafka和RocketMQ在性能方面各有千秋。由于Kafka的数据以partition为单位,一个Kafka实例上可能有多达上百个partition,而一个RocketMQ实例上只有一个partition。这使得RocketMQ可以充分利用IO组的commit机制,批量传输数据,从而在replication时具有更好的性能。然而,Kafka的异步replication性能理论上低于RocketMQ的replication,因为同步replication与异步replication相比,性能上会有约20%-30%的损耗。
  1. 消息传递方式:
  • Kafka和RocketMQ在消息传递方式上也有所不同。Kafka采用Producer发送消息后,broker马上把消息投递给consumer,这种方式实时性较高,但会增加broker的负载。而RocketMQ基于Pull模式和Push模式的长轮询机制,来平衡Push和Pull模式各自的优缺点。RocketMQ的消息及时性较好,严格的消息顺序得到了保证。
  1. 其他特性:
  • Kafka在单机支持的队列数超过64个队列,而RocketMQ最高支持5万个队列。队列越多,可以支持的业务就越多。

五、kafka使用场景

  1. 实时数据流处理:Kafka可以处理大量的实时数据流,这些数据流可以来自不同的源,如用户行为、传感器数据、日志文件等。通过Kafka,可以将这些数据流进行实时的处理和分析,例如进行实时数据分析和告警。
  2. 消息队列:Kafka可以作为一个消息队列使用,用于在分布式系统中传递消息。它能够处理高吞吐量的消息,并保证消息的有序性和可靠性。
  3. 事件驱动架构:Kafka可以作为事件驱动架构的核心组件,将事件数据发布到不同的消费者,以便进行实时处理。这种架构可以简化应用程序的设计和开发,提高系统的可扩展性和灵活性。
  4. 数据管道:Kafka可以用于数据管道,将数据从一个系统传输到另一个系统。例如,可以将数据从数据库或日志文件传输到大数据平台或数据仓库。
  5. 业务事件通知:Kafka可以用于通知业务事件,例如订单状态变化、库存更新等。通过订阅Kafka主题,相关的应用程序和服务可以实时地接收到这些事件通知,并进行相应的处理。
  6. 流数据处理框架集成:Kafka可以与流处理框架集成,如Apache Flink、Apache Spark等。通过集成,可以将流数据从Kafka中实时导入到流处理框架中进行处理,实现流式计算和实时分析。

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1429275.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Swift Vapor 教程(查询数据、插入数据)

上一篇简单写了 怎么创建 Swift Vapor 项目以及在开发过程中使用到的软件。 这一篇写一个怎么在创建的项目中创建一个简单的查询数据和插入数据。 注&#xff1a;数据库配置比较重要 先将本地的Docker启动起来&#xff0c;用Docker管理数据库 将项目自己创建的Todo相关的都删掉…

TQ15EG开发板教程:在VIVADO2023.1 以及VITIS环境下 检测DDR4

打开VIVADO2023.1 创建一个新的工程&#xff0c;设置工程名称和地址 选择RTL工程&#xff0c;勾选不添加文件 搜索15eg&#xff0c;选择xqzu15eg-ffrb1156-2-i 完成创建工程 添加设计模块 设置模块名称 在模块中添加mpsoc器件 双击器件进行配置 若有配置文件预设可以直接导入配…

ChatGPT的探索与实践-应用篇

这篇文章主要介绍在实际的开发过程当中&#xff0c;如何使用GPT帮助开发&#xff0c;优化流程&#xff0c;文末会介绍如何与618大促实际的业务相结合&#xff0c;来提升应用价值。全是干货&#xff0c;且本文所有代码和脚本都是利用GPT生成的&#xff0c;请放心食用。 场景一&…

Windows10 安装 OpenSSH 配置 SFTP服务器

1、下载 https://github.com/PowerShell/Win32-OpenSSH/releases 2、默认安装 3、创建用户 4、修改配置文件 C:\ProgramData\ssh\sshd_config# 最后一行后面加入 ForceCommand internal-sftp# 设置用户登录后默认目录 Match User sftpuser ChrootDirectory C:\SFTP# Disable…

(CVPR-2021)RepVGG:让 VGG 风格的 ConvNet 再次伟大

RepVGG&#xff1a;让 VGG 风格的 ConvNet 再次伟大 Title&#xff1a;RepVGG: Making VGG-style ConvNets Great Again paper是清华发表在CVPR 2021的工作 paper链接 Abstract 我们提出了一种简单但功能强大的卷积神经网络架构&#xff0c;它具有类似 VGG 的推理时间主体&…

自学网安-IIS服务器

部署环境&#xff1a;win2003 配置环境&#xff1a;winxp ip&#xff1a;10.1.1.2 win2003 ip&#xff1a;10.1.1.1 开始安装 双击“应用程序服务器” 双击“Internet 信息服务&#xff08;IIS&#xff09;” 勾选万维网服务&#xff0c;确定然后下一步 查看端口号;netstat …

vue2学习笔记(2/2)

vue2学习笔记&#xff08;1/2&#xff09; vue2学习笔记&#xff08;2/2&#xff09; 文章目录 1. 初始化脚手架2. 分析脚手架&render函数文件结构图示及说明main.jsindex.htmlApp.vueSchool.vueStudent.vue 关于不同版本的Vue修改默认配置vue.config.js配置文件 3. ref属…

【数据结构与算法】——单链表的原理及C语言实现

数据结构与算法——链表原理及C语言实现 链表的原理链表的基本属性设计创建一个空链表链表的遍历&#xff08;显示数据&#xff09;释放链表内存空间 链表的基本操作设计&#xff08;增删改查&#xff09;链表插入节点链表删除节点链表查找节点增删改查测试程序 链表的复杂操作…

当人工智能遇上教育,会擦出怎样的火花?

在这个时代&#xff0c;科技的风暴正以前所未有的速度席卷全球。其中&#xff0c;人工智能&#xff0c;这个被誉为21世纪的“科技之星”&#xff0c;正悄然改变着我们的生活。但是&#xff0c;当人工智能遇上传统教育领域时&#xff0c;你猜会发生什么&#xff1f; 有人说&…

element-ui button 组件源码分享

element-ui button 源码分享&#xff0c;基于对源码的理解&#xff0c;编写一个简单的 demo&#xff0c;主要分三个模块来分享&#xff1a; 一、button 组件的方法。 1.1 在方法这块&#xff0c;button 组件内部通过暴露 click 方法实现&#xff0c;具体如下&#xff1a; 二、…

勇敢的小刺猬

故事名称&#xff1a;《勇敢的小刺猬》 角色&#xff1a; 小明&#xff08;刺猬&#xff09;小鸟森林医生邪恶的狐狸 场景&#xff1a;森林 【场景1&#xff1a;森林里的小路上】 小明&#xff08;边走边哼着歌&#xff09;&#xff1a;今天的阳光真好&#xff0c;真是个适合帮…

盘点那些硬件+项目学习套件:STM32U5单片机开发板及入门常见问题解答

华清远见20岁了~过去3年里&#xff0c;华清远见研发中心针对个人开发板业务&#xff0c;打造了多款硬件项目学习套件&#xff0c;涉及STM32单片机、嵌入式、物联网、人工智能、鸿蒙、ESP32、阿里云IoT等多技术方向。 今天我们来盘点一下&#xff0c;比较受欢迎几款“硬件项目”…

ubuntu22.04安装部署02:禁用显卡更新

一、查看可用显卡驱动 ubuntu-drivers devices 二、查看显卡信息 # -i表示不区分大小写 lspci | grep -i nvidia nvidia-smi 三、查看已安装显卡驱动 cat /proc/driver/nvidia/version 四、锁定显卡升级 使用cuda自带额显卡驱动&#xff0c;居然无法&#xff0c;找到如何锁…

模拟请求ElasticSearch

Step1 安装chrome的这个插件 Step2 打开插件&#xff0c;GET的json填什么。 在IDEA的debug模式&#xff0c;走到Java代码的searchBuilder&#xff0c; 在这个searchBuilder变量里&#xff0c;对里面query变量点右侧 view按钮&#xff0c; IDEA里会显示出一个json&#xff…

ref和reactive

看尤雨溪说&#xff1a;为什么Vue3 中应该使用 Ref 而不是 Reactive&#xff1f;

Multisim14.0仿真(四十二)基于74LS183的8位表决器设计

一、74LS183简介&#xff1a; 74LS183是一种4位高速全加器&#xff0c;用于数字电路中的加法运算。74LS183输入端包括两个4位二进制数和一个进位信号&#xff0c;输出端包括1个4位二进制数和一个进位信号。 74LS138具有快速响应、低功耗灯特点&#xff0c;能实现高校的数字匀速…

接口和抽象类【Java面向对象知识回顾②】

Java中的抽象类和接口是两种常见的抽象概念&#xff0c;它们都能够帮助我们实现抽象化和多态性&#xff0c;但是它们在一些细节上有所不同 抽象类 抽象类是一种特殊的类&#xff0c;不能被实例化&#xff0c;只能被继承。抽象类具有类的所有特性&#xff0c;包括成员变量、成员…

链式二叉树(3)

目录 Main函数 ​ 二叉树第K层的节点个数 整体思路 分析理解 注意事项 二叉树查找值为x的节点 整体思路 分析理解 注意事项 Main函数 #include<stdio.h> #include<stdlib.h> #include<string.h> #include<assert.h> #include<math.h&g…

MATLAB怎么读取txt文件

在MATLAB中可以使用以下几种方式读取txt文本文件: importdata函数 A importdata(data.txt) 这会返回一个包含文本数据的cell数组。 dlmread函数 A dlmread(data.txt,,) 这会将文本文件中的数据读取为数值矩阵,其中’,指定了数据之间的分隔符。 textscan函数 fid fopen(…