spring-cloud-stream

news2024/12/29 8:58:27

系列文章目录

第一章 Java线程池技术应用
第二章 CountDownLatch和Semaphone的应用
第三章 Spring Cloud 简介
第四章 Spring Cloud Netflix 之 Eureka
第五章 Spring Cloud Netflix 之 Ribbon
第六章 Spring Cloud 之 OpenFeign
第七章 Spring Cloud 之 GateWay
第八章 Spring Cloud Netflix 之 Hystrix
第九章 代码管理gitlab 使用
第十章 SpringCloud Alibaba 之 Nacos discovery
第十一章 SpringCloud Alibaba 之 Nacos Config
第十二章 Spring Cloud Alibaba 之 Sentinel
第十三章 JWT
第十四章 RabbitMQ应用
第十五章 RabbitMQ 延迟队列
第十六章 spring-cloud-stream

在这里插入图片描述


文章目录

  • 系列文章目录
    • @[TOC](文章目录)
  • 前言
  • 1、stream设计思想
  • 2、编码常用的注解
  • 3、编码步骤
    • 3.1、添加依赖
    • 3.2、修改配置文件
    • 3.3、生产
    • 3.4、消费
    • 3.5、延迟队列
      • 3.5.1、修改配置文件
      • 3.5.2、生产端
      • 3.5.2、消息确认机制 消费端
  • 总结

前言

https://github.com/spring-cloud/spring-cloud-stream-binder-rabbit
官方定义Spring Cloud Stream是一个构建消息驱动微服务的框架。应用程序通过inputs或者outputs来与Spring Cloud Stream中binder对象交互。通过我们配置来binding(绑定),而Spring Cloud Stream的binder对象负责与消息中间件交互。

SpringCloud stream通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动。Spring Cloud Stream为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。

Stream让我们不再关注具体MQ的细节我们只需要用一种适配绑定的方式,自动的给我们在各种MQ内切换,总的来说Stream能够屏蔽底层消息中间件的差异、降低切换成本,是统一消息的编程模型。

1、stream设计思想

在这里插入图片描述
在这里插入图片描述

  • Binder:很方便的连接中间件,屏蔽差异
  • Channel:通道是队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过Channel对队列进行配置
  • Source和Sink:简单的可理解为参照对象是Spring Cloud Stream自身,从Stream发布消息就是输出,接受消息就是输入。

2、编码常用的注解

在这里插入图片描述

组成说明
Middleware中间件,目前只支持RabbitMQ和Kafka
BinderBinder是应用与消息中间件之间的封装,目前实现了Kafka和RabbitMQ的Binder,通过BInder可以很方便的连接中间件,可以动态的改变消息类型(对应于Kafka的topic,RabbitMQ的exchange),这些都可以通过配置文件来实现。
@Input注解标识输入通道,通过该输入通道接收到的消息进入应用程序
@Output注解标识输出通道,发布的消息将通过该通道离开应用程序
@StreamListener监听队列,用于消费者的队列的消息接收
@EnableBinding指信道channel和exchange绑定在一起

3、编码步骤

3.1、添加依赖

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

3.2、修改配置文件

server:
  port: 8088

spring:
  cloud:
    stream:
      binders: #需要绑定的rabbitmq的服务信息
        defaultRabbit:  #定义的名称,用于bidding整合
          type: rabbit  #消息组件类型
          environment:  #配置rabbimq连接环境
            spring:
              rabbitmq:
                host: localhost   #rabbitmq 服务器的地址
                port: 5672           #rabbitmq 服务器端口
                username: tiger       #rabbitmq 用户名
                password: tiger       #rabbitmq 密码
                virtual-host: tiger_vh  #虚拟路径
      bindings:        #服务的整合处理
        saveOrderOutput:    #这个是消息通道的名称 --->保存订单输出通道
          destination: exchange-saveOrder     #exchange名称,交换模式默认是topic;把SpringCloud stream的消息输出通道绑定到RabbitMQ的exchange-saveOrder交换器。
          content-type: application/json      #设置消息的类型,本次为json
          default-binder: defaultRabbit
          group: saveOrderGroup               #分组
        saveOrderInput: #生产者绑定,这个是消息通道的名称---> 保存订单输入通道
          destination: exchange-saveOrder     #exchange名称,交换模式默认是topic;把SpringCloud stream的消息输出通道绑定到RabbitMQ的exchange-saveOrder交换器。
          content-type: application/json      #设置消息的类型,本次为json
          default-binder: defaultRabbit
          group: saveOrderGroup               #分组

3.3、生产

/**
 * 订单消息输出通道处理器
 */
@Component
public interface OrderOutputChannelProcesor {
    @Output("saveOrderOutput")
    MessageChannel saveOrderOutput();
}

@Slf4j
@EnableBinding(OrderOutputChannelProcesor.class)
public class OrderMessageProducer {
    @Autowired
    @Output("saveOrderOutput")
    private MessageChannel messageChannel;

    public void sentMsg(UserInfo userInfo){
        messageChannel.send(MessageBuilder.withPayload(userInfo).build());
        log.info("消息发送成功:" + userInfo);
    }
}

3.4、消费

/**
 * 订单消息输入通道处理器
 */
@Component
public interface OrderInputChannelProcesor {
    @Input("saveOrderInput")
    SubscribableChannel saveOrderInput();
}
@Slf4j
@EnableBinding(OrderInputChannelProcesor.class)
public class OrderMessageConsumer {
    @StreamListener("saveOrderInput")
    public void receiveMsg(Message<UserInfo> userInfoMessage){
        log.info("接收消息成功:" + userInfoMessage.getPayload());
    }
}

3.5、延迟队列

安装延迟队列插件:
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.11.1/rabbitmq_delayed_message_exchange-3.11.1.ez
下载解压,到plugins目录,执行以下的命令:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

3.5.1、修改配置文件

server:
  port: 8088

spring:
  cloud:
    stream:
      binders: #需要绑定的rabbitmq的服务信息
        defaultRabbit:  #定义的名称,用于bidding整合
          type: rabbit  #消息组件类型
          environment:  #配置rabbimq连接环境
            spring:
              rabbitmq:
                host: localhost   #rabbitmq 服务器的地址
                port: 5672           #rabbitmq 服务器端口
                username: tiger       #rabbitmq 用户名
                password: tiger       #rabbitmq 密码
                virtual-host: tiger_vh  #虚拟路径
      bindings:        #服务的整合处理
        saveOrderOutput:    #这个是消息通道的名称 --->保存订单输出通道
          destination: exchange-saveOrder-delay     #exchange名称,交换模式默认是topic;把SpringCloud stream的消息输出通道绑定到RabbitMQ的exchange-saveOrder交换器。
          content-type: application/json      #设置消息的类型,本次为json
          default-binder: defaultRabbit
          group: saveOrderGroup               #分组
        saveOrderInput: #生产者绑定,这个是消息通道的名称---> 保存订单输入通道
          destination: exchange-saveOrder-delay     #exchange名称,交换模式默认是topic;把SpringCloud stream的消息输出通道绑定到RabbitMQ的exchange-saveOrder交换器。
          content-type: application/json      #设置消息的类型,本次为json
          default-binder: defaultRabbit
          group: saveOrderGroup               #分组
      rabbit:
        bindings: #服务的整合处理
          saveOrderOutput:    #这个是消息通道的名称 --->保存订单输出通道
            producer:
              delayed-exchange: true
          saveOrderInput:
            consumer:
              delayed-exchange: true

3.5.2、生产端

@Slf4j
@EnableBinding(OrderOutputChannelProcesor.class)
public class OrderMessageProducer {
    @Autowired
    @Output("saveOrderOutput")
    private MessageChannel messageChannel;

    public void sentMsg(UserInfo userInfo){
        messageChannel.send(MessageBuilder.withPayload(userInfo).setHeader("x-delay", 5000).build());
        log.info("消息发送成功:" + userInfo);
    }
}

3.5.2、消息确认机制 消费端

rabbit:
  bindings: #服务的整合处理
    saveOrderInput:
      consumer:
        acknowledge-mode: MANUAL #手动确认
@StreamListener("saveOrderInput")
public void receiveMsg(Message<UserInfo> userInfoMessage){

    log.info("接收消息成功:" + userInfoMessage.getPayload());
    Channel channel = (Channel) userInfoMessage.getHeaders().get(AmqpHeaders.CHANNEL);
    Long delieverTag = (Long) userInfoMessage.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
    /*
     * deliveryTag:Channel的消息投递的唯一标识符。
     * multiple:是否否定应答多条消息。如果设置为true,则否定应答带指定deliveryTag的消息及该deliveryTag之前的多条消息;
     * 如果设置为false,则仅否定应答带指定deliveryTag的单条消息。
     * requeue:被否定应答的消息是否重入队列。如果设置为true,则消息重入队列;
     * 如果设置为false,则消息被丢弃或发送到死信Exchange。
     */
    try {
        channel.basicAck(delieverTag,true);
    } catch (IOException e) {
        e.printStackTrace();
    }
}

定义交换机类型为direct

rabbit:
  bindings: #服务的整合处理
    saveOrderInput:
      consumer:
        bindingRoutingKey: orderRoutingKey
        bindQueue: true
        exchangeType: direct
    saveOrderOutput:
      producer:
        routingKeyExpression: orderRoutingKey
        exchangeType: direct

总结

spring-cloud-stream目前支持RabbitMQ和Kafka,与spring-cloud无缝集成,非常方便。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1199426.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【MongoDB】索引 – 文本索引(用权重控制搜索结果)

一、准备工作 这里准备一些数据 db.books.drop();db.books.insert({_id: 1, name: "Java", alias: "java 入门", description: "入门图书" }); db.books.insert({_id: 2, name: "C", alias: "c", description: "C 入…

【算法专题】双指针—三数之和

力扣题目链接&#xff1a;三数之和 一、题目解析 二、算法原理 解法一&#xff1a;排序暴力枚举利用set去重 代码就不写了&#xff0c;你们可以试着写一下 解法二&#xff1a;排序双指针 这题和上一篇文章的两数字和方法类似 排序固定一个数a在这个数的后面区间&#xff0…

2023年第十六届山东省职业院校技能大赛高职组“信息安全管理与评估”赛项规程

第十六届山东省职业院校技能大赛 高职组“信息安全管理与评估”赛项规程 一、赛项名称 赛项名称&#xff1a;信息安全管理与评估 英文名称&#xff1a;Information Security Management and Evaluation 赛项组别&#xff1a;高职组 赛项归属&#xff1a;电子与信息大类 二…

《詩經别解》——國風·周南·雎鳩​​​​​​​

一、关于古文的一个认识 目前可以阅读的古文经典&#xff0c;大多是经历了几千年的传承。期间的武力战争、文化纷争、宗教侵袭、官僚介入及文人的私人恩怨与流派桎梏&#xff0c;印刷与制作技术&#xff0c;导致这些古文全部都已经面目全非。简单地说&#xff0c;你读到的都是…

Java 简单实现一个 TCP 回显服务器

文章目录 TCP 服务端TCP 客户端实现效果TCP 服务端(实现字典功能)总结 TCP 服务端 package network;import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.io.PrintWriter; import java.net.ServerSocket; import java.net.Soc…

路径规划-车辆分配及导航

1.根据城市之间的连通状态&#xff0c;构建以城市为结点、两个城市间的距离&#xff08;根据两个城市经纬度计算的欧式距离&#xff09;作为边权重的无向图。 2.根据起始点&#xff0c;对除了起始点之外的其他点进行聚类&#xff0c;将点划分成几个部分。 3.在每个部分中找出…

javaSE学习笔记(七)IO流

目录 六、IO流 1.IO流概述 概念 分类 IO体系 简单介绍 最重要&#xff0c;最常用&#xff0c;最常见的两个流 2.File类 路径分隔符 绝对路径和相对路径 构造方法 方法 重命名注意事项 删除注意事项 3.FileInputStream&FileOutputStream FileInputStream 获取…

归并外排序实现

文章目录 1. 海量数据排序 1. 海量数据排序 如果我们想在文件中海量数据排序&#xff0c;我们比较适合选用归并排序。 首先&#xff0c;我们要看要排序的文件的大小&#xff0c;比如说这个文件是10G&#xff0c;而我们的内存是1G&#xff0c;那么我们可以把文件切成10份。这样…

怎么制作安装电子版说明书?方法献上~

在现代科技发展的背景下&#xff0c;制作一份优质的电子版说明书对于帮助用户正确、高效地使用产品至关重要。无论是软件、设备还是家电产品&#xff0c;一份清晰明了的电子版说明书可以为用户提供指导和支持&#xff0c;提升用户体验和满意度。那么&#xff0c;如何制作一份出…

Dubbo快速入门

1.什么是Dubbo&#xff1f; Dubbo是一款高性能分布式服务框架&#xff0c;由阿里巴巴开发并开源发布。它支持多种协议&#xff0c;如dubbo、HTTP、Hessian、Thrift等&#xff0c;可以很好地解决分布式服务中的服务治理问题&#xff0c;提供了服务注册、发现、负载均衡、容错等功…

模拟实现string类——【C++】

W...Y的主页 &#x1f60a; 代码仓库分享 &#x1f495; &#x1f354;前言&#xff1a; 我们已经将STL中的string类重要接口全部认识并熟练掌握&#xff0c;为了让我们对string与C类与对象更深层次的了解&#xff0c;我们这篇博客将string类进行模拟实现。 目录 string类的…

原生微信小程序学习之旅(一) -来简单的使用

文章目录 取消导航栏标头组件创建添加Component组件接收传入的数据 页面创建(Page)关于tabBartabBar自定义样式 轮播图轮播图指示点样式改变 微信小程序快速获取用户信息路由跳转获取url路径中的参数 bindtap(click)传参wx:if编写用户登陆关于默认工程目前的获取方法尝试一下服…

海外媒体发稿:彭博社发稿宣传中,5种精准营销方式

在如今的信息发生爆炸时期&#xff0c;营销方式多种多样&#xff0c;但是充分体现精准营销并针对不同用户群体的需求并非易事。下面我们就根据彭博社发稿营销推广为例子&#xff0c;给大家介绍怎样根据不同用户人群方案策划5种精准营销方式。 1.界定总体目标用户人群在制订精准…

通过设置响应头解决跨域问题

网上很多文章都是告诉你直接Nginx添加这几个响应头信息就能解决跨域&#xff0c;当然大部分情况是能解决&#xff0c;但是我相信还是有很多情况&#xff0c;明明配置上了&#xff0c;也同样会报跨域问题。 这大概率是因为&#xff0c;服务端没有正确处理预检请求也就是OPTIONS请…

设计模式之--原型模式(深浅拷贝)

原型模式 缘起 某天&#xff0c;小明的Leader找到小明:“小明啊&#xff0c;如果有个发简历的需求&#xff0c;就是有个简历的模板&#xff0c;然后打印很多份&#xff0c;要去一份一份展示出来&#xff0c;用编程怎么实现呢&#xff1f;” 小明一听&#xff0c;脑袋里就有了…

matlab 小波自适应阈值去噪

1、内容简介 略 12-可以交流、咨询、答疑 小波自适应阈值去噪 2、内容说明 小波自适应阈值一维信号去噪&#xff0c;也包含软阈值和硬阈值 硬阈值、软阈值、自适应阈值 3、仿真分析 略 4、参考论文 略 链接&#xff1a;https://pan.baidu.com/s/1yQ1yDfk-_Qnq7tGpa23L…

【LeetCode:715. Range 模块 | 线段树】

&#x1f680; 算法题 &#x1f680; &#x1f332; 算法刷题专栏 | 面试必备算法 | 面试高频算法 &#x1f340; &#x1f332; 越难的东西,越要努力坚持&#xff0c;因为它具有很高的价值&#xff0c;算法就是这样✨ &#x1f332; 作者简介&#xff1a;硕风和炜&#xff0c;…

如何查看反汇编(VS)

如何查看反汇编 1. 设置断点2. 运行到该处3. 右键 反汇编结果 1. 设置断点 2. 运行到该处 3. 右键 反汇编 结果 即可跳转查看反汇编

kubernetes--pod详解

目录 一、pod简介&#xff1a; 1. Pod基础概念&#xff1a; 2. Kubrenetes集群中Pod的两种使用方式&#xff1a; 3. pod资源中包含的容器&#xff1a; 4. pause容器的两个核心功能&#xff1a; 5. Kubernetes中使用pause容器概念的用意&#xff1a; 二、pod的分类&#xff1a…

springboot模板引擎

1.服务端渲染时相比与前后端分离开发 原理是 跳过前端这一层 直接到服务端 通过数据和模板 生成页面返回前端 springboot包含如下模板引擎 典型如thymeleaf 1>导入依赖 2>查看路径 模板页面在 public static final String DEFAULT_PREFIX “classpath:/templates/”; 即…