【Spring Cloud Stream 消息驱动】 —— 每天一点小知识

news2025/1/1 23:05:54

在这里插入图片描述

                                                                              💧 S p r i n g C l o u d S t r e a m 消息驱动 \color{#FF1493}{Spring Cloud Stream 消息驱动} SpringCloudStream消息驱动💧          


🌷 仰望天空,妳我亦是行人.✨
🦄 个人主页——微风撞见云的博客🎐
🐳 《数据结构与算法》专栏的文章图文并茂🦕生动形象🦖简单易学!欢迎大家来踩踩~🌺
💧 《Java学习笔记》专栏的文章是本人在Java学习中总结的一些知识点~ 💐
🥣 《每天一点小知识》专栏的文章可以丰富你的知识库,滴水成河~ 🌊
🪁 希望本文能够给读者带来一定的帮助~🌸文章粗浅,敬请批评指正!🐥


文章目录

  • 🐳Spring Cloud Stream 消息驱动
    • 消息驱动概述
    • 案例说明
    • 消息驱动之生产者
    • 消息驱动之消费者
    • 分组与持久化
      • 消费者分组
      • 消息持久化
    • 总结
  • 🐳结语


🐳Spring Cloud Stream 消息驱动

在微服务架构中,消息驱动是一种常见的通信方式,它通过解耦和异步处理提供了可靠的服务间通信机制。Spring Cloud Stream 是 Spring Cloud 生态系统中的一个模块,它提供了一种简化和统一的方式来构建基于消息驱动的应用程序。本文将介绍 Spring Cloud Stream 的基本概念和用法,并通过一个案例来说明如何使用 Spring Cloud Stream 实现消息驱动。


消息驱动概述

  💧在传统的应用程序架构中,服务间的通信通常是通过直接调用来完成的,这种紧耦合的方式会导致系统的可扩展性和灵活性受限。而消息驱动则提供了一种解耦的方式,将消息作为信息载体,在不同的服务之间传递和处理。消息驱动的架构模式具有以下优势:

  • 解耦性:消息驱动通过将消息作为中介来实现服务之间的通信,服务之间不直接依赖于彼此的存在和实现细节,从而实现解耦。
  • 异步处理:消息驱动允许发送方将消息发送到消息队列后立即返回,而不需要等待接收方处理完毕。这种异步处理的方式可以提高系统的响应性和吞吐量。
  • 可靠性:消息驱动使用消息队列来存储和传递消息,消息队列通常具备持久化、可靠性和高可用性的特性,从而确保消息的可靠传递。

  💧Spring Cloud Stream 提供了一种简化和统一的编程模型,使得开发人员可以更轻松地使用消息驱动来构建应用程序。它提供了抽象的消息绑定层,使得应用程序可以与不同的消息中间件(如 RabbitMQ、Kafka 等)进行集成,而无需关心底层消息中间件的细节。

案例说明

  💧我们以一个在线商城的订单系统为例来说明如何使用 Spring Cloud Stream 实现消息驱动。订单系统包括两个微服务:订单服务和库存服务。当用户下单时,订单服务将订单信息发布到消息队列中,库存服务订阅订单消息并处理库存扣减操作。

  💧接下来,让我们一步一步实现这个案例,并使用 RabbitMQ 作为消息中间件。

消息驱动之生产者

  💧首先,我们需要创建订单服务作为消息驱动的生产者。订单服务负责将订单信息发布到消息队列中。

  1. 在您的订单服务项目中,添加以下依赖:
<

dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
  1. application.properties 文件中添加 RabbitMQ 的连接信息:
spring.cloud.stream.bindings.output.destination=orders
spring.cloud.stream.bindings.output.content-type=application/json
spring.rabbitmq.host=<RabbitMQ 主机名>
spring.rabbitmq.port=<RabbitMQ 端口>
spring.rabbitmq.username=<RabbitMQ 用户名>
spring.rabbitmq.password=<RabbitMQ 密码>
  1. 创建一个名为 OrderProducer 的类,用于发送订单消息:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

@Component
@EnableBinding(Source.class)
public class OrderProducer {

    private final Source source;

    @Autowired
    public OrderProducer(Source source) {
        this.source = source;
    }

    public void sendOrderMessage(Order order) {
        source.output().send(MessageBuilder.withPayload(order).build());
    }
}
  1. 创建一个名为 Order 的类,用于表示订单信息:
public class Order {

    private String orderId;
    private String customerId;
    // 其他订单属性和方法省略...

    // 构造函数、getter、setter 省略...
}
  1. 在需要发送订单消息的地方,使用 OrderProducer 发送消息。例如,在一个订单控制器中:
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class OrderController {

    private final OrderProducer orderProducer;

    public OrderController(OrderProducer orderProducer) {
        this.orderProducer = orderProducer;
    }

    @PostMapping("/place-order")
    public String placeOrder(@RequestBody Order order) {
        // 处理订单逻辑...

        // 发送订单消息
        orderProducer.sendOrderMessage(order);

        return "Order placed successfully";
    }
}

  💧通过完成上述步骤,订单服务就成为了消息驱动的生产者,它将订单信息发布到名为 “orders” 的消息队列中。

  💧接下来,我们将创建库存服务作为消息驱动的消费者,并处理订单消息中的库存扣减操作。

消息驱动之消费者

  💧我们将创建库存服务作为消息驱动的消费者,它将订阅订单消息并处理库存扣减操作。

  1. 在库存服务项目中,添加以下依赖:
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
  1. application.properties 文件中添加 RabbitMQ 的连接信息,与订单服务中的配置保持一致。

  2. 创建一个名为 OrderConsumer 的类,用于接收和处理订单消息:

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.stereotype.Component;

@Component
@EnableBinding(Sink.class)
public class OrderConsumer {

    @StreamListener(Sink.INPUT)
    public void processOrderMessage(Order order) {
        // 处理订单消息,执行库存扣减操作
        // ...

        // 打印日志
        System.out.println("Received order: " + order.getOrderId());
    }
}
  1. 启动库存服务应用程序。库存服务现在已经成为消息驱动的消费者,并订阅了名为 “orders” 的消息队列。每当有订单消息到达队列时,processOrderMessage 方法将被调用,并处理相应的库存扣减操作。

  💧通过以上步骤,我们完成了消息驱动的生产者和消费者的搭建。订单服务作为生产者将订单消息发布到消息队列中,而库存服务作为消费者订阅订单消息并处理相应的业务逻辑。

分组与持久化

  💧Spring Cloud Stream 还提供了一些高级功能,例如消费者分组和消息持久化,以满足更复杂的应用需求。

消费者分组

  💧消费者分组可以确保相同分组名称的消费者实例共享消息的处理负载。这对于水平扩展和负载均衡非常有用。

  💧要为消费者设置分组,只需在消费者类上添加 @EnableBinding 注解,并在 @StreamListener 注解中指定分组名称,如下所示:

@Component
@EnableBinding(Sink.class)
public class OrderConsumer {

    @StreamListener(target = Sink.INPUT, condition = "headers['type']=='order' and headers['group']=='group1'")
    public void processOrderMessage(Order order) {
        // 处理订单消息
        // ...
    }
}

消息持久化

  💧消息持久化是确保消息在发生故障或重启后仍然可靠地传递的重要机制。Spring Cloud Stream 默认情况下会将消息持久化到消息中间件中,但需要确保消息中间件也配置了持久化机制。

  💧例如,对于 RabbitMQ,可以通过在 application.properties 文件中设置以下属性来启用消息持久化:

spring.cloud.stream.rabbit.bindings.input.consumer.de

clare-durable-queue=true
spring.cloud.stream.rabbit.bindings.input.consumer.durable-subscription=true

  💧这将确保消费者队列和订阅是持久化的。

总结

通过使用 Spring Cloud Stream,我们可以轻松构建基于消息驱动的应用程序。本文介绍了消息驱动的概念,通过一个在线商城的订单系统案例演示了如何使用 Spring Cloud Stream 来实现消息驱动。我们创建了订单服务作为消息驱动的生产者,将订单信息发布到消息队列中;同时创建了库存服务作为消息驱动的消费者,订阅订单消息并处理库存扣减操作。此外,还介绍了消费者分组和消息持久化的高级功能。


在这里插入图片描述


🐳结语

🐬初学一门技术时,总有些许的疑惑,别怕,它们是我们学习路上的点点繁星,帮助我们不断成长。

🐟积少成多,滴水成河。文章粗浅,希望对大家有帮助!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/675585.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

ECharts数据可视化

目录 第一章 什么是ECharts 第二章 搭建环境 2.1 Echarts的下载 2.2 Visual Studio Code下载 第三章 一个简单的可视化展示 第四章 Echarts组件 4.1 标题 4.2 提示框 4.3 工具栏 4.4 图例 4.5 时间轴 4.6 数据区域缩放 4.6.1 滑动条型数据区域缩放 4.6.2 内置型…

chatgpt赋能python:烧录单片机程序:Python的力量

烧录单片机程序&#xff1a;Python的力量 随着技术的发展和人类渴求的不断追求&#xff0c;电子设备的普及程度越来越高。在一个电子设备内部&#xff0c;单片机的应用非常广泛。然而&#xff0c;单片机作为计算机的重要组成部分&#xff0c;也需要相对应的程序来实现不同的功…

实战:Gradle构建工具实践-2023.6.22(测试成功)

实战&#xff1a;Gradle构建工具实践-2023.6.22(测试成功) 目录 推荐文章 https://www.yuque.com/xyy-onlyone/aevhhf?# 《玩转Typora》 实验环境 gitlab/gitlab-ce:15.0.3-ce.0 jenkins/jenkins:2.346.3-2-lts-jdk11 gradle-7.6.1 openjdk 11.0.18实验软件 链接&#xff1…

x-s参数逆向

x-s参数逆向[2023.6.22] 1.提要 众所周知&#xff0c;此次的加密逻辑进入一个叫window._webmsxyw()的函数里面 该函数是封装在一个自执行函数内部&#xff0c;并添加到了window属性里&#xff0c;下面是两种获取思路。 2.扣环境 扣环境的话&#xff0c;只需要在jsdom的docu…

内存耗尽后Redis会发生什么?

作为一台服务器来说&#xff0c;内存并不是无限的&#xff0c;所以总会存在内存耗尽的情况&#xff0c;那么当 Redis 服务器的内存耗尽后&#xff0c;如果继续执行请求命令&#xff0c;Redis 会如何处理呢&#xff1f; 内存回收 使用Redis 服务时&#xff0c;很多情况下某些键…

2023 node 接入腾讯云短信服务,实现发送短信功能

1、在 腾讯云开通短信服务&#xff0c;并申请签名和正文模板 腾讯云短信 https://console.cloud.tencent.com/smsv2 a、签名即是短信的开头。例如 【腾讯云短信】xxxxxxx&#xff1b; b、正文模板即短信内容&#xff0c; 变量部分使用{1}&#xff0c; 数字从1开始累推。例如&a…

Golang每日一练(leetDay0104) 最小高度树、戳气球

目录 310. 最小高度树 Minimum Height Trees &#x1f31f;&#x1f31f; 312. 戳气球 Burst Balloons &#x1f31f;&#x1f31f;&#x1f31f; &#x1f31f; 每日一练刷题专栏 &#x1f31f; Rust每日一练 专栏 Golang每日一练 专栏 Python每日一练 专栏 C/C每日一…

MindSpore-TOOD模型权重迁移推理对齐实录

准备工作 环境&#xff1a; wsl2 Ubuntu 20.04 mindspore 2.0.0 python 3.8 pytorch 2.0.1 cpu 基于自己编写的mindspore TOOD项目和MMDetection实现的pytorch权重来做迁移&#xff0c; TOOD论文pytorch mmdetection实现 tood_r50_fpn_1x_coco权重 论文中的代码也是用mmdet…

浅谈前后端交互的基本原理

本文受众人群&#xff1a; 前端/后端开发工程师&#xff1b;Web应用程序设计师&#xff1b;项目经理&#xff1b;产品经理等。 为什么要去了解&#xff1f; 了解前后端交互的基本原理对于从事与Web开发相关的角色的人群是非常重要的。这包括前端开发工程师、后端开发工程师、全…

【Java高级语法】(十三)注解:解码程序设计中的元数据利器,在小小的@符里挖呀挖呀挖~用小小的注解做强大的开发...

Java高级语法详解之注解 1️⃣ 概念2️⃣ 优势和缺点3️⃣ 使用3.1 元注解3.2 自定义注解3.3 常用内置注解 4️⃣ 应用场景5️⃣ 扩展&#xff1a;那些流行框架中的注解&#x1f33e; 总结 1️⃣ 概念 Java 注解&#xff08;Annotation&#xff09; 是Java语言中一种元数据形式…

chatgpt赋能python:Python爬虫速度分析:如何加速你的爬虫?

Python爬虫速度分析&#xff1a;如何加速你的爬虫&#xff1f; Python作为一种优秀的胶水语言&#xff0c;被广泛应用于web开发、数据处理等众多领域。在众多应用场景中&#xff0c;Python爬虫无疑是其中之一。然而&#xff0c;在爬取海量数据时&#xff0c;爬虫的速度往往成为…

Arthas原理分析

在日常开发中&#xff0c;经常会使用到arthas排查线上问题&#xff0c;觉得arthas的功能非常强大&#xff0c;所以打算花了点时间了解一下其实现原理。并试着回答一下使用Arthas时存在的一些疑问。 Arthas主要基于是Instrumentation JavaAgent Attach API ASM 反射 OGNL等…

chatgpt赋能python:Python点的用法

Python点的用法 作为一名有着10年Python编程经验的工程师&#xff0c;我发现很多初学者对Python的点(.)用法存在疑惑。因此&#xff0c;在这篇文章中&#xff0c;我将详细介绍Python点的用法&#xff0c;并希望能够对这个问题有一个全面的认识。 什么是点 在Python中&#x…

Linux Xshell配置public key实现免密登录linux服务器

linux服务器安装成功后&#xff0c;登录linux服务器的工具有很多中&#xff0c;例如&#xff1a;Xshell、SecureCRT等等。而我所服务的用户使用xshell工具来对linux服务器进行运维。 当使用xshell登录linux服务器时&#xff0c;xshell提供了三种身份验证方式&#xff1a; 1.P…

实战:Maven构建工具实践-2023.6.21(测试成功)

实战&#xff1a;Maven构建工具实践-2023.6.21(测试成功) 目录 推荐文章 https://www.yuque.com/xyy-onlyone/aevhhf?# 《玩转Typora》 实验环境 gitlab/gitlab-ce:15.0.3-ce.0 jenkins/jenkins:2.346.3-2-lts-jdk11 apache-maven-3.9.2 openjdk 11.0.18实验软件 链接&…

对centOS的home目录进行扩容。

对centos的home目录进行扩容 1 首先要了解PV\VG\LV的含义1.1 基本概念1.2 基本命令行 2 实际操作2.1 盘符当前现状2.1实操 1 首先要了解PV\VG\LV的含义 1.1 基本概念 物理卷&#xff08;Physical Volume&#xff0c;PV&#xff09; 指磁盘分区或从逻辑上与磁盘分区具有同样功能…

SPSS统计教程:卡方检验

本文简要的介绍了卡方分布、卡方概率密度函数和卡方检验&#xff0c;并通过SPSS实现了一个卡方检验例子&#xff0c;不仅对结果进行了解释&#xff0c;而且还给出了卡方、自由度和渐近显著性的计算过程。本文用到的数据"2.2.sav"链接为: https://url39.ctfile.com/f/…

菲涅尔圆孔衍射matlab完整程序分享

根据惠更斯 &#xff0d; 菲涅耳原理&#xff0c;光的衍射是光束内部的次波之间的相干叠加&#xff0c;衍射光波场的光振动符合菲涅耳积分公式。但直接运用菲涅耳积分公式计算衍射光场是很困难的。对于夫琅和费衍射(远场衍射)&#xff0c;在光源和接收屏距离衍射屏均为无穷远的…

实战:k8s证书续签-2023.6.19(测试成功)

实战&#xff1a;k8s证书续签-2023.6.19(测试成功) 目录 推荐文章 https://www.yuque.com/xyy-onlyone/aevhhf?# 《玩转Typora》 1、前言 k8s集群核心的证书有2套&#xff0c;还有1套非核心的(即使出问题也问题不大)。 ⚠️ 如果是kubeadm搭建的k8s集群&#xff0c;其有效期为…

chatgpt赋能python:Python烧录单片机:快速的开发工具

Python烧录单片机&#xff1a;快速的开发工具 简介 Python是一种高级的编程语言&#xff0c;被广泛应用于各种领域&#xff0c;包括机器学习、数据分析和物联网等领域。Python的易用性和简洁性已经成为其成功的关键因素之一。Python也能在烧录单片机时提供极大的方便性和灵活…