Spring Cloud Stream实践

news2025/1/14 6:22:26

概述

不同中间件,有各自的使用方法,代码也不一样。

可以使用Spring Cloud Stream解耦,切换中间件时,不需要修改代码。实现方式为使用绑定层,绑定层对生产者和消费者提供统一的编码方式,需要连接不同的中间件时,绑定层使用不同的绑定器即可,也就是把切换中间件需要做相应的修改工作交给绑定层来做。

本文的操作是在 微服务调用链路追踪 的基础上进行。

环境说明

jdk1.8

maven3.6.3

mysql8

spring cloud2021.0.8

spring boot2.7.12

idea2022

rabbitmq3.12.4

步骤

消息生产者

创建子模块stream_producer

添加依赖

    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
    </dependencies>

刷新依赖

配置application.yml

server:
  port: 7001
spring:
  application:
    name: stream_producer
  rabbitmq:
    addresses: 127.0.0.1
    username: guest
    password: guest
  cloud:
    stream:
      bindings:
        output:
          destination: my-default #指定消息发送的目的地,值为rabbit的exchange的名称
      binders:
        defaultRabbit:
          type: rabbit #配置默认的绑定器为rabbit

查看Source.class源码

编写生产者代码,发送一条消息("hello world")到rabbitmq的my-default exchange中

package org.example.stream;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;

@EnableBinding(Source.class)
@SpringBootApplication
public class StreamProductApplication implements CommandLineRunner {
    @Autowired
    private MessageChannel output;

    @Override
    public void run(String... args) throws Exception {
        //发送消息
        // messageBuilder 工具类,创建消息
        output.send(MessageBuilder.withPayload("hello world").build());
    }

    public static void main(String[] args) {
        SpringApplication.run(StreamProductApplication.class, args);
    }


}

查看rabbitmq web UI

http://localhost:15672/

看到Exchanges中还没有my-default

运行StreamProductApplication

刷新rabbitmq Web UI,看到了my-dafault的exchange

消息消费者

创建子模块stream_consumer

添加依赖

    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
    </dependencies>

配置application.yml

server:
  port: 7002
spring:
  application:
    name: stream_consumer
  rabbitmq:
    addresses: 127.0.0.1
    username: guest
    password: guest
  cloud:
    stream:
      bindings:
        input: #内置获取消息的通道,从destination配置值的exchange中获取信息
          destination: my-default #指定消息发送的目的地,值为rabbit的exchange的名称
      binders:
        defaultRabbit:
          type: rabbit #配置默认的绑定器为rabbit

查看内置通道名称为input

编写消息消费者启动类,在启动类监听接收消息

package org.example.stream;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;

@SpringBootApplication
@EnableBinding(Sink.class)
public class StreamConsumerApplication {

    @StreamListener(Sink.INPUT)
    public void input(Message<String> message){
        System.out.println("监听收到:" + message.getPayload());
    }

    public static void main(String[] args) {
        SpringApplication.run(StreamConsumerApplication.class, args);
    }
}

运行stream_consumer消费者服务,监听消息

运行stream_producer生产者服务,发送消息

查看消费者服务控制台日志,接收到了消息

优化代码

之前把生产和消费的消息都写在启动类中了,代码耦合高。

优化思路是把不同功能的代码分开放。

消息生产者

stream_producer 代码结构如下

package org.example.stream.producer;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

/**
 * 向中间件发送数据
 */
@Component
@EnableBinding(Source.class)
public class MessageSender {
    @Autowired
    private MessageChannel output;//通道

    //发送消息
    public void send(Object obj){
        output.send(MessageBuilder.withPayload(obj).build());
    }
}

修改启动类

package org.example.stream;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;

@SpringBootApplication
public class StreamProductApplication {

    public static void main(String[] args) {
        SpringApplication.run(StreamProductApplication.class, args);
    }

}

pom.xml添加junit依赖

<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <scope>test</scope>
</dependency>

刷新依赖

编写测试类

在stream_producerm模块的src/test目录下,新建org.example.stream包,再建出ProducerTest类,代码如下

package org.example.stream;

import org.example.stream.producer.MessageSender;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest
public class ProducerTest {
    @Autowired
    private MessageSender messageSender;//注入发送消息工具类

    @Test
    public void testSend(){
        messageSender.send("hello world");
    }
}

 

消息消费者

stream_consumer代码结构如下

添加MessageListener类获取消息

package org.example.stream.consumer;

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.stereotype.Component;

@Component
@EnableBinding(Sink.class)
public class MessageListener {

    // 监听binding中的信息
    @StreamListener(Sink.INPUT)
    public void input(String message){
        System.out.println("获取信息:" + message);
    }
}

修改启动类

package org.example.stream;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;

@SpringBootApplication
public class StreamConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(StreamConsumerApplication.class, args);
    }

}

启动consumer接收消息

执行producer单元测试类ProducerTest的testSend()方法,发送消息

查看consumer控制台输出,接收到信息了

代码解耦后,同样能成功生产消息和消费消息。

自定义消息通道

此前使用默认的消息通道outputinput。

也可以自己定义消息通道,例如:myoutputmyinput

消息生产者

org.example.stream包下新建channel包,在channel包下新建MyProcessor接口类

package org.example.stream.channel;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;

/**
 * 自定义的消息通道
 */
public interface MyProcessor {
    /**
     * 消息生产这的配置
     */
    String MYOUTPUT = "myoutput";

    @Output("myoutput")
    MessageChannel myoutput();

    /**
     * 消息消费者的配置
     */
    String MYINPUT = "myinput";
    @Input("myinput")
    SubscribableChannel myinput();
}

修改MessageSender

package org.example.stream.producer;

import org.example.stream.channel.MyProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

/**
 * 向中间件发送数据
 */
@Component
@EnableBinding(MyProcessor.class)
public class MessageSender {
    @Autowired
    private MessageChannel myoutput;//通道

    //发送消息
    public void send(Object obj){
        myoutput.send(MessageBuilder.withPayload(obj).build());
    }
}

修改application.yml

cloud:
  stream:
    bindings:
      output:
        destination: my-default #指定消息发送的目的地
      myoutput:
        destination: custom-output

消息消费者

在stream_consumer服务的org.example.stream包下新建channel包,在channel包下新建MyProcessor接口类

package org.example.stream.channel;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;

/**
 * 自定义的消息通道
 */
public interface MyProcessor {
    /**
     * 消息生产者的配置
     */
    String MYOUTPUT = "myoutput";

    @Output("myoutput")
    MessageChannel myoutput();

    /**
     * 消息消费者的配置
     */
    String MYINPUT = "myinput";
    @Input("myinput")
    SubscribableChannel myinput();
}

修改MessageListener

package org.example.stream.stream;

import org.example.stream.channel.MyProcessor;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.stereotype.Component;

@Component
@EnableBinding(MyProcessor.class)
public class MessageListener {

    // 监听binding中的信息
    @StreamListener(MyProcessor.MYINPUT)
    public void input(String message){
        System.out.println("获取信息:" + message);
    }
}

修改application.yml配置

cloud:
  stream:
    bindings:
      input: #内置获取消息的通道,从destination配置值的exchange中获取信息
        destination: my-default #指定消息发送的目的地
      myinput:
        destination: custom-output

测试

启动stream_consumer

运行单元测试的testSend()方法生产消息

查看stream_consumer控制台,能看到生产的消息,如下

获取信息:hello world

消息分组

采用复制配置方式运行两个消费者

启动第一个消费者(端口为7002)

修改端口为7003,copy configuration,再启动另一个消费者

执行生产者单元测试生产消息,看到两个消费者都接收到了信息

说明:如果有两个消费者,生产一条消息后,两个消费者均能收到信息。

但当我们发送一条消息只需要其中一个消费者消费消息时,这时候就需要用到消息分组,发送一条消息消费者组内只有一个消费者消费到。

我们只需要在服务消费者端设置spring.cloud.stream.bindings.input.group 属性即可

重启两个消费者

修改端口号为7002,重新启动第一个消费者

修改端口号为7003,重新启动第二个消费者

生产者生产一条消息

查看消费者接收消息情况,只有一个消费者接收到信息。

消息分区

消息分区就是实现特定消息只往特定机器发送。

修改生产者配置

  cloud:
    stream:
      bindings:
        output:
          destination: my-default #指定消息发送的目的地,值为rabbit的exchange的名称
        myoutput:
          destination: custom-output
          producer:
            partition-key-expression: payload #分区关键字 可以是对象中的id,或对象
            partition-count: 2 #分区数量

修改消费者1的application.yml配置

server:
  port: 7002
spring:
  application:
    name: stream_consumer
  rabbitmq:
    addresses: 127.0.0.1
    username: guest
    password: guest
  cloud:
    stream:
      bindings:
        input: #内置获取消息的通道,从destination配置值的exchange中获取信息
          destination: my-default #指定消息发送的目的地,值为rabbit的exchange的名称
        myinput:
          destination: custom-output
          group: group1 #消息分组,有多个消费者时,只有一个消费者接收到信息
          consumer:
            partitioned: true #开启分区支持
      binders:
        defaultRabbit:
          type: rabbit #配置默认的绑定器为rabbit
      instance-count: 2 #消费者总数
      instance-index: 0 #当前消费者的索引

启动消费者1

修改消费者2的配置

server:
  port: 7003
spring:
  application:
    name: stream_consumer
  rabbitmq:
    addresses: 127.0.0.1
    username: guest
    password: guest
  cloud:
    stream:
      bindings:
        input: #内置获取消息的通道,从destination配置值的exchange中获取信息
          destination: my-default #指定消息发送的目的地,值为rabbit的exchange的名称
        myinput:
          destination: custom-output
          group: group1 #消息分组,有多个消费者时,只有一个消费者接收到信息
          consumer:
            partitioned: true #开启分区支持
      binders:
        defaultRabbit:
          type: rabbit #配置默认的绑定器为rabbit
      instance-count: 2 #消费者总数
      instance-index: 1 #当前消费者的索引

修改端口号为7003,当前消费者的索引instance-index的值修改为1

启动消费者2

生产者发送消息,看到只有Application(2)接收到消息

再用生产者发送一次消息,也是Application(2)接收到消息

说明实现了消息分区

也可以更改发送的数据,看是否能发送到不同消费者

修改生产者,发送数据由hello world变为hello world1,同时发送5次

	public void testSend(){
        for (int i = 0; i < 5; i++) {
            messageSender.send("hello world1");
        }
    }

看到hello world1全部被Application消费

所以消息分区是根据发送的消息不同,发送到不同消费者中。

完成!enjoy it!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1231372.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

解决Python requests库中的重定向问题

目录 一、默认情况下&#xff0c;requests库如何处理重定向 二、手动处理重定向 三、处理多个重定向 四、注意事项 总结 在Python requests库中&#xff0c;处理重定向是一个常见的问题。默认情况下&#xff0c;requests库会自动处理重定向&#xff0c;并将最终的响应返回…

Vuex 组件间通讯

组件间通讯 Vuex https://vuex.vuejs.org/zh/ 基本原理 数据提取到父级 // index 文件 import Vue from vue import Vuex from "vuex" import tab from ./tab // 引入 modulesVue.use(Vuex) // 全局引入// 创建 Vuex 实例 export default new Vuex.Store({modules: …

力扣刷题-二叉树-二叉树最小深度

给定一个二叉树&#xff0c;找出其最小深度。 最小深度是从根节点到最近叶子节点的最短路径上的节点数量。 说明&#xff1a;叶子节点是指没有子节点的节点。&#xff08;注意题意&#xff09; 示例 1&#xff1a; 输入&#xff1a;root [3,9,20,null,null,15,7] 输出&#x…

Redis篇---第十篇

系列文章目录 文章目录 系列文章目录前言一、怎么提高缓存命中率&#xff1f;二、Redis 如何解决 key 冲突&#xff1f;三、Redis 报内存不足怎么处理&#xff1f; 前言 前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分…

基于变形卷积和注意机制的带钢表面缺陷快速检测网络DCAM-Net(论文阅读笔记)

原论文链接->DCAM-Net: A Rapid Detection Network for Strip Steel Surface Defects Based on Deformable Convolution and Attention Mechanism | IEEE Journals & Magazine | IEEE Xplore DCAM-Net: A Rapid Detection Network for Strip Steel Surface Defects Base…

下厨房网站月度最佳栏目菜谱数据获取及分析PLus

目录 概要 源数据获取 写Python代码爬取数据 Scala介绍与数据处理 1.Sacla介绍 2.Scala数据处理流程 数据可视化 最终大屏效果 小结 概要 本文的主题是获取下厨房网站月度最佳栏目近十年数据&#xff0c;最终进行数据清洗、处理后生成所需的数据库表&#xff0c;最终进…

【Java 进阶篇】Ajax 实现——JQuery 实现方式 `ajax()`

嗨&#xff0c;亲爱的读者们&#xff01;欢迎来到这篇关于使用 jQuery 中的 ajax() 方法进行 Ajax 请求的博客。在前端开发中&#xff0c;jQuery 提供了简便而强大的工具&#xff0c;其中 ajax() 方法为我们处理异步请求提供了便捷的解决方案。无需手动创建 XMLHttpRequest 对象…

多目标应用:基于多目标灰狼优化算法MOGWO求解微电网多目标优化调度(MATLAB代码)

一、微网系统运行优化模型 微电网优化模型介绍&#xff1a; 微电网多目标优化调度模型简介_IT猿手的博客-CSDN博客 二、多目标灰狼优化算法MOGWO 多目标灰狼优化算法MOGWO简介&#xff1a; 三、多目标灰狼优化算法MOGWO求解微电网多目标优化调度 &#xff08;1&#xff09…

GFS分布式系统

GFS分布式文件系统 gfs glusterFS 开源的分布式的文件系统 存储服务器 客户端 以及网络&#xff08;NFS/samba&#xff09;网关 传统式&#xff08;老的&#xff09;分布式元服务系统&#xff0c;元服务器保存存储节点的目录树信息 一旦元服务器故障&#xff0c;所有的存储节点…

Linux 串口应用编程

1 串口 API Linux串口通信&#xff1a; 在 Linux 系统中&#xff0c;操作设备的统一接口就是&#xff1a; open/ioctl/read/write 。 对于 UART &#xff0c;又在 ioctl 之上封装了很多函数&#xff0c;主要是用来设置行规程。所以对于 UART &#xff0c;编程的套路就是…

交易量原则,昂首资本一个比喻说清楚

即使你是刚进入交易市场的新手小白&#xff0c;也可能听过这句话&#xff1a;“当需求超过供给时&#xff0c;市场就会上涨。当供应超过需求时&#xff0c;市场就会下跌。”为了理解交易量的重要性&#xff0c;昂首资本来看看这句话背后的原则。 对于未接触过此类术语的读者&a…

Chrome添加扩展程序

Crx4Chrome 下载crx 打开扩展程序 如果拖动crx文件到扩展程序提示只能通过Chrome应用商店添加此项内容 修改crx文件后缀为zip并解压&#xff0c;再拖动到扩展程序

【设计模式】结构型设计模式

结构型设计模式 文章目录 结构型设计模式一、概述二、适配器模式&#xff08;Adapter Pattern&#xff09;2.1 类适配器模式2.2 对象适配器模式2.3 接口适配器模式2.4 小结 三、桥接模式&#xff08;Bridge Pattern&#xff09;四、装饰器模式&#xff08;Decorator Pattern&am…

毕业设计ASP.NET 2368酒店信息管理系统【程序源码+文档+调试运行】

一、摘要 本文旨在设计并实现一个功能全面、易于使用的酒店信息管理系统。系统将管理员、客户和前台客服三种用户的需求纳入考虑&#xff0c;并针对每种用户设计了相应的功能模块。系统功能包括用户管理、客户管理、客房管理、商品管理、客房预订管理、入住管理和系统管理。此…

HDD与QLC SSD深度对比:功耗与存储密度的终极较量

在当今数据世界中&#xff0c;存储设备的选择对于整体系统性能和能耗有着至关重要的影响。硬盘HDD和大容量QLC SSD是两种主流的存储设备&#xff0c;而它们在功耗方面的表现是许多用户关注的焦点。 扩展阅读&#xff1a; 1.面对SSD的步步紧逼&#xff0c;HDD依然奋斗不息 2.…

动态页面调研及设计方案

文章目录 vue2 动态表单、动态页面调研一、form-generator二、ng-form-element三、Variant Form四、form-create vue2 动态表单、动态页面调研 一、form-generator 预览&#xff1a;https://mrhj.gitee.io/form-generator/#/ Vue2 Element UI支持拖拽生成表单不支持其他组件…

【iOS】——知乎日报第五周总结

文章目录 一、评论区展开与收缩二、FMDB库实现本地持久化FMDB常用类&#xff1a;FMDB的简单使用&#xff1a; 三、点赞和收藏的持久化 一、评论区展开与收缩 有的评论没有被回复评论或者被回复评论过短&#xff0c;这时就不需要展开全文的按钮&#xff0c;所以首先计算被回复评…

单图像3D重建AI算法综述【2023】

计算机视觉是人工智能的一个快速发展的领域&#xff0c;特别是在 3D 领域。 本概述将考虑一个应用任务&#xff1a;2D 和 3D 环境之间的转换。 在线工具推荐&#xff1a; Three.js AI纹理开发包 - YOLO合成数据生成器 - GLTF/GLB在线编辑 - 3D模型格式在线转换 - 可编程3D场景编…

Flink 运行架构和核心概念

Flink 运行架构和核心概念 几个角色的作用&#xff1a; 客户端&#xff1a;提交作业JobManager进程 任务管理调度 JobMaster线程 一个job对应一个JobMaster 负责处理单个作业ResourceManager 资源的分配和管理&#xff0c;资源就是任务槽分发器 提交应用&#xff0c;为每一个…

【C++上层应用】1. 异常处理

文章目录 【 1. C的标准异常 】【 2. 异常转移处理 】2.1 throw 抛出异常2.2 try 捕获异常2.3 catch 捕获异常2.4 实例 【 3. 定义新的异常 】 异常是程序在执行期间产生的问题&#xff0c;比如编译报错、链接错误等。 【 1. C的标准异常 】 C 提供了一系列标准的异常&#xf…