kafka消息队列最常用的两种模式,以及应用场景

news2025/2/3 10:06:44

目录

一、发布-订阅模式

二、点对点模式

三、应用场景


 

一、发布-订阅模式

发布-订阅模式是最常见的消息传递模式,其中消息发布者将消息发送到一个或多个主题(Topic),而订阅者可以选择订阅一个或多个主题来接收消息。每个订阅者都可以独立地消费消息,而发布者和订阅者之间没有直接的联系。

在Kafka中,使用KafkaProducer类进行消息发布,KafkaConsumer类进行消息订阅。以下是一个简单的Java代码示例:

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Properties;

public class PubSubExample {
    private static final String TOPIC = "my_topic";
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";

    public static void main(String[] args) {
        // Kafka Producer
        Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);

        // Publish messages
        for (int i = 0; i < 10; i++) {
            String message = "Message " + i;
            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, message);
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception != null) {
                        System.err.println("Error publishing message: " + exception.getMessage());
                    } else {
                        System.out.println("Message published successfully: " + metadata.offset());
                    }
                }
            });
        }

        producer.close();

        // Kafka Consumer
        Properties consumerProps = new Properties();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "my_consumer_group");
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
        consumer.subscribe(Collections.singletonList(TOPIC));

        // Consume messages
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("Received message: " + record.value());
                // Process the message
            }
        }
    }
}

 

二、点对点模式

点对点模式中,消息发送者将消息发送到一个指定的队列(Queue),而消息接收者从相同的队列中接收消息。每个消息只能被一个接收者消费。

在Kafka中,点对点模式可以通过创建单个消费者组来实现。以下是一个简单的Java代码示例:

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Properties;

public class PointToPointExample {
    private static final String QUEUE = "my_queue";
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";

    public static void main(String[] args) {
        // Kafka Producer
        Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);

        // Publish messages
        for (int i = 0; i < 10; i++) {
            String message = "Message " + i;
            ProducerRecord<String, String> record = new ProducerRecord<>(QUEUE, message);
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception != null) {
                        System.err.println("Error publishing message: " + exception.getMessage());
                    } else {
                        System.out.println("Message published successfully: " + metadata.offset());
                    }
                }
            });
        }

        producer.close();

        // Kafka Consumer
        Properties consumerProps = new Properties();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "my_consumer_group");
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
        consumer.subscribe(Collections.singletonList(QUEUE));

        // Consume messages
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("Received message: " + record.value());
                // Process the message
                consumer.commitAsync();
            }
        }
    }
}

以上代码示例演示了如何使用Kafka的Java客户端库进行发布和订阅消息以及点对点消息传递。请注意,代码中的BOOTSTRAP_SERVERS需要根据你的实际环境进行配置。

 

三、应用场景

Kafka消息队列具有高吞吐量、低延迟、可扩展性等特点,因此广泛应用于以下场景:

  1. 日志收集和数据管道:Kafka可以用作集中式日志收集系统,可以将不同服务、应用程序、服务器生成的日志集中到一个中心化的消息队列中,再通过消费者进行处理、分析和存储。同时,Kafka还可以作为数据管道,将不同数据源的数据通过消息队列进行传输和处理。

  2. 实时流处理:Kafka与流处理框架(如Apache Flink、Apache Spark)结合使用,可以实现实时的数据流处理。Kafka可以作为输入源和输出源,将数据流传输给流处理框架进行实时分析、计算和处理。

  3. 微服务架构:Kafka可以用作微服务之间的异步通信机制,不同的微服务各自独立地生产和消费消息,实现解耦和扩展性。同时,Kafka还可以用于实现事件驱动架构,不同的微服务通过订阅事件的方式进行通信和协作。

  4. 网络爬虫和数据采集:Kafka可以用于构建高可靠的网络爬虫系统和数据采集系统。爬虫可以将抓取的数据写入Kafka队列,然后其他系统可以消费这些数据进行进一步的处理和分析。

  5. 消息系统和通信中间件:Kafka提供了可靠的消息传递机制,可以作为消息系统和通信中间件,用于构建分布式系统、实现异步通信和跨系统的数据传输。

总之,Kafka消息队列的应用场景非常广泛,适用于大数据处理、实时数据流处理、异步通信等各种场景。它具有高性能、可靠性和可扩展性的特点,可以帮助解决数据流处理和消息传递的各种问题。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/762204.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

在嵌入式系统开发培训中常用的数据库有哪些种?

数据库是一种储存和管理、组织数据的仓库&#xff0c;在嵌入式开发当中起到至关重要的作用。一个在嵌入式培训中&#xff0c;我们可学习使用的数据库有多种&#xff0c;每种数据库都会呈现出不同的一面&#xff0c;那么我们在嵌入式系统开发培训中可用到的数据库都有哪几种&…

JQuery(二):DOM操作、动画、遍历、事件绑定

1.DOM操作 1.1内容操作 html(): 获取/设置元素的标签体内容 <a><font>内容</font></a> --> <font>内容</font>text(): 获取/设置元素的标签体纯文本内容 <a><font>内容</font></a> --> 内容val()&am…

RK3588+FPGA视频实时处理与双屏显示、存储解决方案

主板平台的主要功能电路示意图 在ARM端: 脚踏开关是电平输入10 口&#xff0c;双路。 触摸面板与主板的连接方式为 UART 外加12V 电源。 键盘为自开发产品&#xff0c;通过USB透传 UART&#xff0c;并传递12V电源USB、千兆网络为主板上的接口&#xff0c;USB 为3.0版本host 接口…

Hadoop 之 单机部署和测试(一)

Hadoop单机部署和测试 一.单机部署1.安装 JDK&#xff08;JDK11&#xff09;2.安装 HADOOP3.测试 一.单机部署 系统版本&#xff1a;cat /etc/anolis-release1.安装 JDK&#xff08;JDK11&#xff09; #!/bin/bashTOP_PATH$(pwd) JAVA_PATH/usr/local/java FILEls $TOP_PATH/…

【Linux | Shell】结构化命令2 - test命令、方括号测试条件、case命令

目录 一、概述二、test 命令2.1 test 命令2.2 方括号测试条件2.3 test 命令和测试条件可以判断的 3 类条件2.3.1 数值比较2.3.2 字符串比较 三、复合条件测试四、if-then 的高级特性五、case 命令 一、概述 上篇文章介绍了 if 语句相关知识。但 if 语句只能执行命令&#xff0c…

兴达易控modbus转profinet网关与三菱变频器通讯

本案例分享兴达易控modbus转profinet网关&#xff08;MDPN100&#xff09;连接西门子1200plc&#xff0c;实现三菱变频器485通讯兼容转modbusTCP通信&#xff0c;在博图中配置。 拓展图 打开博图&#xff0c;并添加PLC 加载由兴达易控免费提供的modbus转profinet GSD文件 安装网…

基于MSP432P401R送药小车【2021年电赛F题】

文章目录 一、任务清单1. 硬件部分2. 软件部分 二、神经网络训练1. 创建数据集2. 数据采集3. 数字训练 三、OpenMV数字及其坐标识别四、巡线1. 直行2. 转向3. 停止 五、路口判断与原路径返回六、技术交流 由于前边已经用MSP430做过一遍该赛题了&#xff0c;这里就不再重复叙述赛…

Java培训:什么是Busy spin?为什么要使用Busy spin?

Busy spin(繁忙自旋)是一种线程等待的技术&#xff0c;它通过循环检查条件来等待某个事件或条件的发生&#xff0c;而不进行阻塞或休眠。 通常情况下&#xff0c;线程等待事件发生的方式是使用阻塞或休眠操作&#xff0c;这样线程会释放CPU资源&#xff0c;其他线程可以继续执行…

Qt6 Qt Quick UI原型学习QML第二篇

Qt6 Qt Quick UI原型学习QML第二篇 界面效果QML语法语法讲解核心要素项目元素矩形元素文本元素图像元素MouseArea元素 界面效果 QML语法 import QtQuick 2.12 import QtQuick.Window 2.12Window {id: rootvisible: truewidth: 640height: 480title: qsTr("QML学习第二篇&…

【题解】 模拟赛2 题解

T1 假设商品价格为x 618:int(x*0.66) 211:x-(x/100)*35 两者比较一下大小即可 #include<bits/stdc.h> using namespace std;int x,x1,x2;int main(){scanf("%d",&x);x1 x*0.66;x2 x-(x/100)*35;if (x1 x2) printf("both\n%d",x1);if (x1 &g…

浏览器打开PDF标题乱码

问题 使用 itext5 用pdf模板生成预览pdf乱码问题 解决办法 使用pdf编辑器打开之后&#xff0c;选择 文件>> 属性&#xff0c; 修改乱码的标题。

【业务功能篇45】SSM整合shiro项目:web.xml执行顺序

web.xml 的加载顺序是&#xff1a;ServletContext -> context-param -> listener -> filter -> servlet 学习shiro时&#xff0c;需要配置shiro &#xff0c;我们需要在filter过滤器之前&#xff0c;先初始化好shiro组件&#xff0c;不然请求认证无法走到shiro,根据…

plt.text()函数解析

plt.text(x, y, s, fontsize, verticalalignment,horizontalalignment,rotation , *kwargs) 参数&#xff1a; x,y:表示坐标值上的值s:表示说明文字fontsize:表示字体大小verticalalignment&#xff1a;垂直对齐方式 &#xff0c;参数&#xff1a;[ ‘center’ | ‘top’ | ‘…

【公益】Q学友联合东湖街道开展“星级大厨来做客”技能培训活动

“大家一定要用温水和面&#xff0c;和面时要注意方向和力度&#xff0c;往同一个方向揉面……”在东湖街道综合文体服务中心一楼的中式面点培训现场&#xff0c;飘荡着阵阵面香&#xff0c;充斥着欢声笑语。 为进一步丰富居民业余文化生活&#xff0c;提高灵活就业人员的职业技…

手把手教你搭建SpringCloud项目:什么是微服务?一看就会系列!

什么是微服务&#xff1f;一看就会系列&#xff01; 一、手把手教你搭建SpringCloud项目&#xff08;一&#xff09;图文详解&#xff0c;傻瓜式操作 二、手把手教你搭建SpringCloud项目&#xff08;二&#xff09;生产者与消费者 三、手把手教你搭建SpringCloud项目&#x…

mpVue 微信小程序基于vant-weapp 组件的二次封装TForm 表单组件(修改源码插槽使用)

一、前言 1、mpVue微信小程序不支持动态组件&#xff08;<component> &#xff09; 2、mpVue微信小程序不支持动态属性及事件穿透&#xff08;$attrs和$listeners&#xff09; 3、mpVue微信小程序不支持render函数 二、最终效果 三、配置参数&#xff08;Attributes&…

Qt6 Qt Quick UI原型学习QML第三篇

文章目录 效果QML代码ClickableImage.qml文件Image&#xff08;图片&#xff09;元素 解释 MyQML.qml文件 解释&#xff1a;Window元素、Item元素、Image元素、MouseArea元素、Column元素、Row元素、Grid元素、Flow元、Repeater元素 效果 QML代码 ClickableImage.qml文件 图像…

[JavaScript] 第三章 Chrome 浏览器中执行 JavaScript

系列文章目录 [JavaScript] 第一章 暂无 [JavaScript] 第一章 暂无 [JavaScript] 第三章 Chrome 浏览器中执行 JavaScript 文章目录 系列文章目录前言1、准备工作1.1、创建html工程1.2、创建html文件夹&#xff0c;存放html文件1.3、创建JavaScript演示html1.4、通过idea打开页…

Unity打包窗口化放大、缩小、拖拽功能、无边框设置 C#

Unity打包Windows窗口实现放大、缩小、拖拽、无边框 文章目录 Unity打包Windows窗口实现放大、缩小、拖拽、无边框前言一、引入 user32.dll二、使用步骤1.引入库2.功能封装3.效果图如下&#xff0c;绑定自定义按钮 总结 前言 Unity无边框设置、窗口化放大、缩小、拖拽 提示&am…

蓝牙耳机选购攻略:开放式耳机篇!如何选购开放式耳机?开放式蓝牙耳机哪些品牌比较好?过来人告诉你如何选购开放式耳机!

作为一个耳机爱好者&#xff0c;最近更是喜欢上了开放式蓝牙耳机&#xff0c;实际用过的起码有十几款&#xff0c;但其实最终能留下来的也只有四五款。由于前期并不知道应该如何选择开放式耳机&#xff0c;经常都会高价买到些质量差、音质也不好、漏音大的开放式耳机&#xff0…