第02讲:SpringCloudStream

news2024/10/6 0:34:49

一、什么是SpringCloudStream

SpringCloudStream是SpringCloud的一个子项目,他提供了一套更加通用的操作MQ的解决方案

在这里插入图片描述

  1. Destination Binder(目标绑定器) :微服务与消息中间件通信的组件
  2. Destination Bindings(目标绑定):Binding是连接应用程序跟消息中间件的桥梁,用于消息的消费和生产,由binder创建
  3. Message:消息

二、为什么选择SpringCloudStream

​ 现在的mq产品主流有4中:rabbitmq,rocketmq,activemq,kafka;有时候很意外的是:学的其中一个,公司用的又是另外一个,导致学习成本提高。有或者是 业务服务使用rabbitmq,而数据库后台使用kafka,整个项目使用了2种mq,可能会导致切换困难,维护成本高等因素。

我们希望能够像学习 hibernate时那样,不管底层是oracle还是mysql或其他数据库,只要给我一组统一的API操作即可;而springcloud-stream就相当于mq的统一接口。

在这里插入图片描述

Tip:input表示微服务接收消息,output表示微服务发送消息

二、SpringBoot整合SpringCloudStream

2.1、创建项目

父工程:stream-mq-demo

子工程(消息生产者):producer

子工程(消息消费者):consumer

2.1、pom.xml

配置父工程的pom.xml

  • springboot版本
  • 各组件的版本号
  • 配置SpringCloud和SpringCloudAlibaba
  • 添加依赖(RocketMQ、SpringCloudStream等)
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.5.RELEASE</version>
    </parent>

    <groupId>org.example</groupId>
    <artifactId>stream-mq-demo</artifactId>
    <packaging>pom</packaging>
    <version>1.0-SNAPSHOT</version>
    <modules>
        <module>producer</module>
        <module>consumer</module>
    </modules>

    <properties>
        <spring-cloud.version>Greenwich.SR1</spring-cloud.version>
        <spring-cloud-alibaba.version>0.9.0.RELEASE</spring-cloud-alibaba.version>
        <java.version>1.8</java.version>
        <lombok.version>1.18.8</lombok.version>
        <rocketmq.version>2.0.3</rocketmq.version>
    </properties>

    <dependencies>
        <!-- RocketMQ坐标 -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>${rocketmq.version}</version>
        </dependency>
        <!-- SpringCloudStream坐标 -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
        </dependency>
        <!-- SpringWeb坐标 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- lombok坐标 -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>${lombok.version}</version>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <!--整合spring cloud-->
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <!--整合spring cloud alibaba-->
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-alibaba-dependencies</artifactId>
                <version>${spring-cloud-alibaba.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
</project>

2.2、开发消息生产者

在子工程producer中进行开发

2.2.1、application.yml

spring:
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: 127.0.0.1:9876
      bindings:
        #消费者
        output:
          #用来指定topic
          destination: stream-test-topic
server:
  port: 8081

2.2.2、启动类

在启动类中配置生产者

package demo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;

@SpringBootApplication
@EnableBinding(Source.class)
public class ProducerApplication {
    public static void main(String[] args) {
        SpringApplication.run(ProducerApplication.class, args);
    }
}

2.2.3、在controller中测试

package demo.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class TestController {
    @Autowired
    private Source source;
    
    @GetMapping("/send")
    public String testSend(){
        source.output()
                .send(
                        MessageBuilder
                                .withPayload("这是一条测试的消息")
                                .build());
        return "消息发送完成,请到MQ控制台查看";
    }
}

2.2.4、测试

使用postman发送请求 http://localhost:8081/send

在MQ的控制台可以看到有新的消息

在这里插入图片描述

2.3、开发消费者

在子工程consumer中进行开发

2.3.1、application.yml

spring:
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: 127.0.0.1:9876
      bindings:
        #消息消费者
        input:
          #用来指定topic,要和消息生产者的的topic匹配
          destination: stream-test-topic
          #一定要设置,必填项,如果用其他MQ,该属性可以不设置
          group: test
server:
  port: 8082

2.3.2、启动类

在启动类中配置消费者

package demo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;

@SpringBootApplication
@EnableBinding(Sink.class)
public class ConsumerApplication {
    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }
}

2.3.3、监听消息

在消费者中监听生产者发送的消息

package demo.mq;

import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class TestStreamConsumer {

    @StreamListener(Sink.INPUT)
    public void receive(String messageBody){
        log.info("通过stream收到了消息:messageBody={}", messageBody);
//        throw new RuntimeException();
    }

    /**
     * 全局异常处理
     *
     * @param message 发生异常的消息
     */
    @StreamListener("errorChannel")
    public void error(Message<?> message) {
        ErrorMessage errorMessage = (ErrorMessage) message;
        log.warn("RocketMQ-SpringCloudStream发生异常,errorMessage={}", errorMessage);
    }
}

2.3.4、测试

使用postman发送请求 http://localhost:8081/send

在消费者控制台可以看到该条消息被消费了

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/531864.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

2023爱分析・云原生 IDE 市场厂商评估报告-行云创新(CloudToGo)

1. 研究范围定义 企业数字化转型初期&#xff0c;通过资源池云化&#xff0c;解决了IDC时代运维、部署、扩容的难题&#xff0c;但传统应用单体架构厚重、烟囱式架构等带来的一系列应用层面的问题并没有得到有效解决&#xff0c;云对业务的价值主要还停留在资源供给的阶段…

Scaled dot-prodect Attention的原理和实现(附源码)

文章目录 背景什么是AttentionAttention权重的计算方法1. 多层感知机法2. Bilinear方法3. Dot Product4. Scaled Dot Product Scaled dot-prodect Attention的源码实现 背景 要了解深度学习中的Attention&#xff0c;就不得不先谈Encoder-Decoder框架&#xff08;sequence to s…

拍立淘API接口说明文档 按图搜索淘宝商品API 实时数据返回

开发背景&#xff1a; 随着电商行业的不断发展&#xff0c;人们的购物需求日益增多。在购买商品时&#xff0c;很多人会通过搜索引擎、社交媒体等手段来获取信息或灵感。但是&#xff0c;在这些渠道中找到想要的商品并不容易&#xff0c;因为其中可能会混杂着一些广告或无关内…

Android内存优化检测工具LeakCanary使用

一、什么是LeakCanary leakCanary是Square开源框架&#xff0c;是一个Android和Java的内存泄露检测库。如果检测到某个activity有内存泄露&#xff0c;LeakCanary就是自动地显示一个通知&#xff0c;所以可以把它理解为傻瓜式的内存泄露检测工具。通过它可以大幅度减少开发中遇…

Java 并发队列详解

一&#xff0c;简介 1&#xff0c;并发队列两种实现 以ConcurrentLinkedQueue为代表的高性能非阻塞队列以BlockingQueue接口为代表的阻塞队列 2&#xff0c;阻塞队列与非阻塞队列的区别 当阻塞队列是空的时&#xff0c;从队列中获取元素的操作将会被阻塞&#xff0c;试图从…

【BFS】华子20230506笔试第三题(动态迷宫问题)Java实现

文章目录 题目链接思路BFS板子我的解答 题目链接 塔子哥的codeFun2000&#xff1a;http://101.43.147.120/p/P1251 测试样例1 输入 3 2 1 0 1 2 2 1 2 0 100 100 100 100 000 100 000 000 001输出 1测试样例2 输入 3 2 1 0 2 0 0 1 2 2 000 000 001 010 101 101 110 010 …

在docker容器中启动docker服务并实现构建多平台镜像的能力

在docker容器中启动docker服务并实现构建多平台镜像的能力 背景 在容器中运行docker&#xff0c;是devops中无法避免的场景&#xff0c;通常被应用于提供统一的镜像构建工具&#xff0c;出于安全考虑&#xff0c;不适合将主机的docker进程暴露给公司的内部人员使用&#xff0…

SpringCloud alibaba微服务b2b2c电子商务平台

1. 涉及平台 平台管理、商家端&#xff08;PC端、手机端&#xff09;、买家平台&#xff08;H5/公众号、小程序、APP端&#xff08;IOS/Android&#xff09;、微服务平台&#xff08;业务服务、系统服务、中间件服务&#xff09; 2. 核心架构 Spring Cloud、Spring Boot2、My…

飞书开发流程

1、进入飞书并创建一个应用 链接: 创建应用 创建应用成功后需要审核通过&#xff0c;如果你拥有管理权限则可以自己进入管理后台通过审核&#xff0c;否则需要联系管理员通过审核 2、进入开发者后台 链接: 发者后台 3、在该调试平台上测试 以这个订阅审批事件为例 这一步…

DHCP协议简单配置

实验原理 网络中主机需要与外界进行通信时,需要配置自己的IP地址、网关地址、DNS服务器等网络参数信息。手工在每台主机上配置维护成本高,容易出错,而且不利于管理员统一维护。 通过DHCP地址自动配置协议,使终端设备能自动获取地址,实现即插即用且IP地址统一由服务器管理…

springboot+java充电桩充电额维修管理系统

项目介绍 Spring Boot 是 Spring 家族中的一个全新的框架&#xff0c;它用来简化Spring应用程序的创建和开发过程。也可以说 Spring Boot 能简化我们之前采用SSM&#xff08;Spring MVC Spring MyBatis &#xff09;框架进行开发的过程。 系统基于B/S即所谓浏览器/服务器模式…

STM32 学习笔记_9 定时器中断:编码器接口模式

TIM编码器接口 之前我们处理旋转编码器&#xff0c;是转一下中断一次&#xff0c;挺消耗资源的。 我们可以利用TIM的编码器功能&#xff0c;隔一段时间取一下旋转器值使得cnt或–&#xff0c;以此判断旋转位置以及计算速度&#xff0c;相比中断节约资源。相当于外接了一个有方…

Kubernetes那点事儿——暴露服务之Service

Kubernetes那点事儿——暴露服务之Service 前言一、Service二、Service与Pod关系三、Service常用类型ClusterIPNodePortLoadBalancer 四、Service代理模式IptablesIPVS修改代理模式 前言 K8s中&#xff0c;我们将应用跑在Pod里。多数情况下是一组Pod&#xff0c;用户如何访问这…

凌恩生物美文分享 | 提升科研有一套 | 宏基因组磷循环分析又出新!

磷是包括微生物在内的所有生命体中不可缺少的元素。在生物大分子核酸、高能量化合物ATP、以及生物体内糖代谢的某些中间体中&#xff0c;都有磷的存在。在自然界中&#xff0c;磷的循环包括可溶性无机磷的同化、有机磷的矿化、不溶性磷的溶解等。微生物分解含磷化合物的作用&am…

操作系统面试相关知识

目录 一、简介1、什么是操作系统2、操作系统主要有哪些功能&#xff1f; 二、操作系统结构1、什么是内核&#xff1f;2、什么是用户态和内核态&#xff1f;3、 用户态和内核态是如何切换的&#xff1f; 三、 进程和线程1、并行和并发有什么区别&#xff1f;2、什么是进程上下文…

无线传感器网络的时钟同步估计问题(Matlab代码实现)

目录 &#x1f4a5;1 概述 &#x1f4da;2 运行结果 &#x1f389;3 参考文献 &#x1f468;‍&#x1f4bb;4 Matlab代码 &#x1f4a5;1 概述 随着无线传感器网络的快速发展,其应用领域也越来越广。在诸多的应用环境中都需要大量已同步的传感器节点通过协同作用执行一个…

Python 中IndexError: list assignment index out of range 错误解决

文章目录 Python IndexError&#xff1a;列表分配索引超出范围修复 Python 中的 IndexError: list assignment index out of range修复 IndexError: list assignment index out of range 使用 append() 函数修复 IndexError: list assignment index out of range 使用 insert()…

怎么把文本翻译成英文?安利三个文本翻译方法

在当今全球化的时代&#xff0c;跨国交流和合作已经成为常态。然而&#xff0c;不同语言之间的沟通障碍经常阻碍着信息传递和理解。为了帮助我们更好地进行国际交流&#xff0c;文本翻译英文软件应运而生。这类软件能够将各种语言的文本迅速准确地翻译成英文&#xff0c;使我们…

【起飞】让你电脑速度快到飞起的一些牛逼的设置整理【电脑卡顿反应慢等问题解决】

对于开发来说电脑的反应速度简直影响了思维的速度&#xff0c;要让电脑速度跟上我们的思维&#xff0c;提高工作效率&#xff0c;早点打卡下班回家陪老婆孩子哈哈 这篇文章主要对windows系统做的一些优化&#xff0c;是真的好用&#xff0c;仿佛在访问静态页面一样&#xff0c;…

超实用!年薪40W的项目经理都在用的6个项目管理软件

项目管理软件是帮助团队进行项目计划、任务分配、进度跟踪和团队协作等方面的工具&#xff0c;已经成为了项目经理必不可少的工具之一。 市面上的项目管理软件有很多&#xff0c;这就来分享一下几款我认为好用的项目管理软件&#xff01; 一、六款好用的项目管理软件 1.简道…