Spring Cloud Stream 消息驱动基础入门与实践总结

news2024/11/15 17:15:23

Spring Cloud Stream是用于构建与共享消息传递系统连接的高度可伸缩的事件驱动微服务框架,该框架提供了一个灵活的编程模型,它建立在已经建立和熟悉的Spring熟语和最佳实践上,包括支持持久化的发布/订阅、消费组以及消息分区这三个核心概念。

【1】概念介绍

① 什么是Spring Cloud Stream

官方定义 Spring Cloud Stream 是一个构建消息驱动微服务的框架。为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。简单来讲,就是屏蔽了底层XXMQ,应用层不用关注底层是RabbitMQ还是Kafka 。类似于Spring Data抽离持久层屏蔽底层各种数据库的概念。

应用程序通过 inputs 或者 outputs 来与 Spring Cloud Stream中binder对象交互。通过我们配置来binding(绑定) ,而 Spring Cloud Stream 的 binder对象负责与消息中间件交互。所以,我们只需要搞清楚如何与 Spring Cloud Stream 交互就可以方便使用消息驱动的方式。

通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动。

官网文档:https://spring.io/projects/spring-cloud-stream#overview

在这里插入图片描述

② stream如何统一底层差异

在没有绑定器这个概念的情况下,我们的SpringBoot应用要直接与消息中间件进行信息交互的时候,由于各消息中间件构建的初衷不同,它们的实现细节上会有较大的差异性。

Binder可以生成Binding,Binding用来绑定消息容器的生产者和消费者,它有两种类型,INPUT和OUTPUT,INPUT对应于消费者,OUTPUT对应于生产者。

通过定义绑定器作为中间层,完美地实现了应用程序与消息中间件细节之间的隔离。Stream对消息中间件的进一步封装,可以做到代码层面对中间件的无感知,甚至于动态的切换中间件(rabbitmq切换为kafka),使得微服务开发的高度解耦,服务可以关注更多自己的业务流程。

  • 通过定义绑定器作为中间层,完美地实现了应用程序与消息中间件细节之间的隔离。

  • 通过向应用程序暴露统一的Channel通道,使得应用程序不需要再考虑各种不同的消息中间件实现。

  • 通过定义绑定器Binder作为中间层,实现了应用程序与消息中间件细节之间的隔离。

在这里插入图片描述

③ Spring Cloud Stream标准流程设计

Stream中的消息通信方式遵循了发布-订阅模式。

  • Binder:很方便的连接中间件,屏蔽差异
  • Channel:通道,是队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过Channel对队列进行配置
  • Source和Sink:简单的可理解为参照对象是Spring Cloud Stream自身,从Stream发布消息就是输出,接受消息就是输入。

在这里插入图片描述

④ 几个API注解

@EnableBinding:指信道channel和exchange绑定在一起。

@StreamListener:监听队列,用于消费者的队列的消息接收

@Input:注解标识输入通道,通过该输入通道接收到的消息进入应用程序。

@Output:注解标识输出通道,发布的消息将通过该通道离开应用程序

下面以RabbitMQ为底层MQ来说明如何使用Stream,当然同样要先安装好RabbitMQ。

【2】消息生产者

① pom依赖

<dependency>
     <groupId>org.springframework.cloud</groupId>
     <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
 </dependency>

② yml配置

server:
  port: 8801

spring:
  application:
    name: cloud-stream-provider
  cloud:
    stream:
      binders: #在此配置要绑定的 rabbitmq的服务信息
        defaultRabbit:  # 表示定义的名称,用于 binding整合
          type: rabbit  # 消息组件类型
          environment:  # 设置rabbitmq相关的环境配置
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings: # 服务的整合处理
        output: # 输出通道的名称
          destination: studyExchange  #表示要使用的 Exchange 名称定义
          content-type: application/json  # 消息类型
          binder: defaultRabbit
eureka:
  client:
    service-url:
      defaultZone: http://localhost:7001/eureka
  instance:
    lease-renewal-interval-in-seconds: 2 # 设置心跳时间间隔默认30s
    lease-expiration-duration-in-seconds: 5 # 如果超过了5秒的间隔默认90s
    instance-id: send-8001.com  #信息列表显示主机名称
    prefer-ip-address: true # 访问路径变为ip地址

③ 消息服务类

public interface IMessageProvider {
    public String send();
}

@EnableBinding(Source.class)//定义消息推送管道
@Slf4j
public class IMessageProviderImpl implements IMessageProvider {

    @Resource
    private MessageChannel output;//消息发送通道

    @Override
    public String send() {
        String serial = UUID.randomUUID().toString();
        output.send(MessageBuilder.withPayload(serial).build());
        log.info(serial+"***********************");
        return serial;
    }
}

编写控制器发送消息:

@RestController
public class IMessageController {
    @Resource
    private IMessageProvider provider;

    @GetMapping("/sendMessage")
    public String send(){
        return provider.send();
    }
}

【3】消息消费者

① pom依赖

<dependency>
     <groupId>org.springframework.cloud</groupId>
     <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
 </dependency>

② yml配置

server:
  port: 8802

spring:
  application:
    name: cloud-stream-consumer
  cloud:
    stream:
      binders: #在此配置要绑定的 rabbitmq的服务信息
        defaultRabbit:  # 表示定义的名称,用于 binding整合
          type: rabbit  # 消息组件类型
          environment:  # 设置rabbitmq相关的环境配置
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings: # 服务的整合处理
        input: # 输出通道的名称
          destination: studyExchange  #表示要使用的 Exchange 名称定义
          content-type: application/json  # 消息类型
          binder: defaultRabbit
          group: group1
eureka:
  client:
    service-url:
      defaultZone: http://localhost:7001/eureka
  instance:
    lease-renewal-interval-in-seconds: 2 # 设置心跳时间间隔默认30s
    lease-expiration-duration-in-seconds: 5 # 如果超过了5秒的间隔默认90s
    instance-id: receive-8002.com  #信息列表显示主机名称
    prefer-ip-address: true # 访问路径变为ip地址

③ 消息接收服务

@Component
@Slf4j
@EnableBinding(Sink.class)
public class StreamController {
    @Value("${server.port}")
    private String serverPort;

    @StreamListener(Sink.INPUT)
    public void input(Message<String>message){
        log.info("消费者1号接收到消息"+message.getPayload()+"\t port:"+serverPort);
    }

}

上面【2】【3】即可实现消息的发送和接收,但是假设有多个消费者,我们还要考虑两个问题:消息的重复消费和消息的持久化。

【4】group分组解决重复消费问题

比如在如下场景中,订单系统我们做集群部署,都会从RabbitMQ中获取订单信息,那如果一个订单同时被两个服务获取到,那么就会造成数据错误,我们得避免这种情况。这时我们就可以使用Stream中的消息分组来解决。

在Stream中处于同一个group中的多个消费者是竞争关系,就能够保证消息只会被其中一个应用消费一次。

  • 不同组是可以全面消费的(重复消费),
  • 同一组内会发生竞争关系,只有其中一个可以消费。

也就是说,两个消费者微服务的group定义为同一个,即可以解决重复消费问题

【5】group分组解决消息丢失问题

即,在你服务停机重启期间,消息在不断发送,而服务启动后并没有接收到发送的消息。这是由于你没有配置group属性导致的。

解决方案:配置group属性

spring:
  application:
    name: cloud-stream-consumer
  cloud:
    stream:
      binders: #在此配置要绑定的 rabbitmq的服务信息
        defaultRabbit:  # 表示定义的名称,用于 binding整合
          type: rabbit  # 消息组件类型
          environment:  # 设置rabbitmq相关的环境配置
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings: # 服务的整合处理
        input: # 输出通道的名称
          destination: studyExchange  #表示要使用的 Exchange 名称定义
          content-type: application/json  # 消息类型
          binder: defaultRabbit
          group: group1 # 这个很重要!!!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1818617.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【OceanBase诊断调优】 —— DDL时报磁盘不足问题排查

1. 背景 由于在4.x的部分版本中&#xff0c;我们对于一些ddl操作还存在磁盘空间放大问题&#xff0c;本文主要介绍了这一类问题的排查。 2. 问题排查 2.1 整体排查链路 2.2 问题现象 DDL过程中报磁盘空间不足&#xff0c;需要确认是否符合预期&#xff0c;如果是符合预期&a…

Java使用swing实现简易计算器

效果如下 代码实现 import javax.swing.*; import java.awt.*; import java.awt.event.ActionEvent; import java.awt.event.ActionListener;public class SimpleCalculator {private JFrame frame;private JTextField numField1;private JTextField numField2;private JTex…

c++中main(int argc, char* argv[])参数详解

目录 一、main函数形式 1.无参数&#xff1a; 2.带有两个参数&#xff1a; 二、参数详解 1.int argc 2.char* argv[] 三、示例演示 一、main函数形式 在C中&#xff0c;main 函数可以有两种常见的参数形式&#xff1a; 1.无参数&#xff1a; 代码如下&#xff1a; i…

手机流畅运行470亿参数大模型,上交大发布PowerInfer-2推理框架,性能提升29倍

苹果一出手&#xff0c;在手机等移动设备上部署大模型迅速成为行业焦点。 目前&#xff0c;移动设备上运行的模型相对较小&#xff08;苹果的是3B&#xff0c;谷歌的是2B&#xff09;&#xff0c;并且消耗大量内存&#xff0c;这在很大程度上限制了其应用场景。 即使是苹果&…

Objective-C基础语言开发来袭,你准备好了吗?

文/ZaiZai 前言 今天小白电脑技术的公众号迎来了一位Objective-C语言开发大神——ZaiZai。接下来有想要学习写插件&#xff08;iOS/macOS/iPadOS/tvOS&#xff09;的小伙伴可以关注微信公众号&#xff0c;教程将持续更新。 ZaiZai个人介绍……呃……他不让放。 Objective-C…

HTML制作一个日蚀的动画特效

大家好&#xff0c;今天制作一个日蚀动画特效&#xff01; 先看具体效果&#xff1a; 使用一个逐渐扩大的圆形阴影来模拟月亮遮挡太阳的效果。使用了CSS的keyframes动画和border-radius属性来创建一个简单的圆形阴影效果。 HTML <!DOCTYPE html> <html lang"e…

阿里巴巴 2024 最新 Java 架构师进阶宝典!助力程序员金九银十面试跳槽涨薪

最近感慨面试难的人越来越多了&#xff0c;一方面是市场环境&#xff0c;更重要的一方面是企业对Java的人才要求越来越高了。 基本上这样感慨的分为两类人&#xff0c;第一&#xff0c;虽然挂着3、5年经验&#xff0c;但肚子里货少&#xff0c;也没啥拿得出手的项目&#xff0c…

C#下WinForm多语种切换

这是应一个网友要求写的&#xff0c;希望对你有所帮助。本文将介绍如何在一个WinForm应用程序中实现多语种切换。通过一个简单的示例&#xff0c;你将了解到如何使用资源文件管理不同语言的文本&#xff0c;并通过用户界面实现语言切换。 创建WinForm项目 打开Visual Studio&a…

26大技巧教你使用好AI大模型

前言 在探索与生成式AI如ChatGPT、Microsoft Copilot等前沿工具交互的过程中&#xff0c;我们不可避免地会遇到一个核心问题——如何编写出既能让大模型轻松理解又能准确执行的prompt。这一挑战不仅要求用户精准把握问题的核心&#xff0c;提炼出简洁明了的关键词&#xff0c;…

若依对数据二次处理导致查询total只有十条的问题处理办法

前言&#xff1a; 在使用若依框架的过程中&#xff0c;如果是查询结果数据直接返回&#xff0c;那么其自带的分页插件可以正常返回数据以及总条数&#xff0c;若是在业务逻辑层对数据进行了其他二次处理&#xff0c;再返回就会出现异常&#xff0c;无论查询了多少条&#xff0…

NVMe全闪存储系统性能测试及产品功能与应用场景

今天我们继续对全闪存储系统GS 5024UE的评测&#xff0c;重点关注GS 5024UE的性能测试数据&#xff0c;以及产品所具备的功能、应用场景。通过Windows IOmeter测试软件&#xff0c;来测试GS 5024UE设备的性能&#xff0c;在机器上配上24颗 NVMe 3.84TB硬盘, 16条32Gb FC数据&am…

Ubuntu安装opendaylight控制器

目录 实验任务 实验环境 安装过程&#xff1a; 将opendaylight添加到环境变量中 实验任务 在虚拟机1中安装opendaylight控制器并安装相应的组件在虚拟机2中使用mininet创建一个测试拓扑并将控制器的地址指向虚拟机1在虚拟机1中的opendaylight的web界面可以查看到创建的拓扑将…

python快速入门之Flask框架

文章目录 一、pip安装二、接口开发三、测试 一、pip安装 pip install flask 二、接口开发 from flask import Flaskapp Flask(__name__)app.route("/test") def index():return "test"if __name__ __main__:app.run()三、测试 http://127.0.0.1:5000…

AI大模型探索之路-实战篇:智能化IT领域搜索引擎之GLM-4大模型技术的实践探索

系列篇章&#x1f4a5; No.文章1AI大模型探索之路-实战篇&#xff1a;智能化IT领域搜索引擎的构建与初步实践2AI大模型探索之路-实战篇&#xff1a;智能化IT领域搜索引擎之GLM-4大模型技术的实践探索3AI大模型探索之路-实战篇&#xff1a;智能化IT领域搜索引擎之知乎网站数据获…

如何高效管理和监控 Elasticsearch 别名及索引?

0、引言 在 Elasticsearch 项目中&#xff0c;管理和监控索引是开发者的一项重要任务。 尤其是当我们需要在项目的管理部分展示索引和别名的统计信息时&#xff0c;了解如何有效地列出这些别名和索引显得尤为重要。 本篇博客将介绍几种在 Elasticsearch 中列出别名和索引的方法…

JAVA小知识18:常用数组操作API之Arrays

在JAVA小知识17中我们详细的讲述了关于数组的定义以及使用方法&#xff0c;今天来讲一个关于操作数组的工具类。java.util.Arrays是一个专门用于操作数组的工具类&#xff0c;它封装了非常多的方法方便我们操作数组。 一、常用方法 方法说明public static String toString(数组…

“全光无线星空”照亮津亚电子智能制造之路

随着第四次工业革命浪潮的到来,智能制造正成为制造业的新常态。工业4.0时代的工厂不再是封闭的制造孤岛,而是通过高度的数字化和网络化,实现生产过程的智能化、自动化和灵活化。在这样的大趋势下,制造业正经历着从传统制造向智能制造的深刻转型,数字化车间和智能化生产线成为推…

在Dataworks调度里检查上游表的分区是否已经产出

在Dataworks调度里检查上游表的分区是否已经产出 新建PyOdps3节点&#xff0c;贴如如下代码&#xff1a; import sys import time from datetime import datetimebizdate args[bizdate] if not o.exist_table(args[table]):sys.exit(1)# 设置结束时间为今天的20:00 end_time …

Linux-笔记 全志平台OTG虚拟 串口、网口、U盘笔记

前言&#xff1a; 此文章方法适用于全志通用平台&#xff0c;并且三种虚拟功能同一时间只能使用一个&#xff0c;原因是此3种功能都是内核USB Gadget precomposed configurations的其中一个选项&#xff0c;只能单选&#xff0c;不能多选&#xff0c;而且不能通过修改配置文件去…

入门 Axure RP 9 | 原型设计基础教程

选择正确的原型设计工具并非易事&#xff0c;Axure RP 9能够快速完成原型设计。原型设计是一种经过时间考验的方法&#xff0c;可以将你的设计快速放置在用户的设备并交到他们手中。替代Axure RP 9的原型设计工具即时设计是一个完全集成的协同设计工具&#xff0c;无需使用不同…