SpringCloud 集成 RocketMQ 及配置解析

news2024/11/25 20:53:20

文章目录

  • 前言
  • 一、SpringCloud 集成 RocketMQ
    • 1. pom 依赖
    • 2. yml 配置
    • 3. 操作实体
    • 4. 生产消息
      • 4.1. 自动发送消息
      • 4.2. 手动发送消息
    • 5. 消费消息
  • 二、配置解析
    • 1. spring.cloud.stream.function.definition


前言

  1. 定义
    Spring Cloud Stream 是一个用来为微服务应用构建消息驱动能力的框架。它可以基于 Spring Boot 来创建独立的、可用于生产的 Spring 应用程序。Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现,并引入了发布-订阅、消费组、分区这三个核心概念。简单的说,Spring Cloud Stream本质上就是整合了Spring Boot和Spring Integration,实现了一套轻量级的消息驱动的微服务框架。

  2. 抽象模型
    我们都知道市面上有很多消息中间件,Sping Cloud Stream 为了可以集成各种各样的中间件,它抽象出了 Binder 的概念,每个消息中间件都需要有对应自己的 Binder。这样它就可以根据不同的 Binder 集成不同的中间件。下图的input和output是channel,Binder则是消息中间件和通道之间的桥梁。
    在这里插入图片描述

  3. 绑定器
    通过使用 Spring Cloud Stream,可以有效简化开发人员对消息中间件的使用复杂度,让系统开发人员可以有更多的精力关注于核心业务逻辑的处理。但是目前 Spring Cloud Stream 只支持 RabbitMQ 和 Kafka 的自动化配置。
    Spring Cloud Stream 提供了 Binder (负责与消息中间件进行交互),我们则通过 inputs 或者 outputs 这样的消息通道与 Binder 进行交互。

Binder 绑定器是 Spring cloud Stream 中一个非常重要的概念,实现了应用程序和消息中间件之间的隔离,同时我们也可以通过应用程序实现,消息中间件之间的通信。在我们的项目的可以继承多种绑定器,我们可以根据不同特性的消息使用不同的消息中间件。Spring Cloud Stream 为我们实现了 RabbitMQ 和Kafka 的绑定器。如果你想使用其他的消息中间件需要自己去实现绑定器接口。

一、SpringCloud 集成 RocketMQ

1. pom 依赖

<!-- rocketmq -->
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>

2. yml 配置

spring:
  cloud:
    stream:
	  function:
	    definition: producer1;consumer1 # 方法定义(用于定义发送者或消费者方法)
      # 配置消息通道通用属性(适用于所有消息中间件)
      bindings:
        # 配置channel消息通道
        consumer1-in-0:
          destination: consumer_topic # topic消息主题
          content-type: application/json # 内容格式
          group: consumer-group # 消费者组
        producer1-out-0:
          destination: producer_topic # topic消息主题
          content-type: application/json # 内容格式
      rocketmq:
        binder:
          name-server: 127.0.0.1:9876 # rocketmq服务地址
          vipChannelEnabled: true # 是否开启vip通道(兼容老版本使用。多监听一个端口用于接受处理消息,防止端口占用。)
        # 配置消息通道独特属性(仅适用于rocketmq)
        bindings:
          # 配置channel消息通道(生产者:[functionName]-out-[index],消费者:[functionName]-in-[index])
          producer1-out-0:
            producer:
              group: consumer-group
              sync: true # 是否开启同步发送
          consumer1-in-0: 
            consumer:
              subscription: myTag  # 消费tag
              delayLevelWhenNextConsume: -1
              suspendCurrentQueueTimeMillis: 99999999
              broadcasting: false # 是否使用广播消费,默认为false使用集群消费

3. 操作实体

package com.demo.model;

import lombok.AllArgsConstructor;
import lombok.Data;

/**
 * 消息model
 */
@Data
@AllArgsConstructor
public class MsgModel {

    /**
     * 消息id
     */
    private String msgId;

    /**
     * 消息内容
     */
    private String message;
}

4. 生产消息

4.1. 自动发送消息

通过 MessageBuilder 自动发送消息。

package com.demo;

import com.demo.model.MsgModel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

import java.util.function.Supplier;

/**
 * 消息生产者类
 */
@Configuration
@Slf4j
public class MyProducer {
    
	/**
	 * 消息生产者1
	 */
    @Bean
    public Supplier<Message<MsgModel>> producer1() {
        return () -> {
            MsgModel msgModel = new MsgModel(System.currentTimeMillis(), "测试消息");
            log.info("producer1发送消息:" + msgModel);
            return MessageBuilder.withPayload(entity).build();
        };
    }
}

这种方式定义 suppelier 会 默认1000ms 发送一次记录。可以修改 spring.cloud.stream.poller.fixedDelay 设置延迟毫秒值。

4.2. 手动发送消息

通过 StreamBridge 手动发送消息。

package com.demo.controller;

import com.demo.model.MsgModel;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * 消息controller
 */
@RestController
@RequiredArgsConstructor
@RequestMapping("/msg")
@Slf4j
public class MsgController {

    private final StreamBridge streamBridge;

	/**
	 * 发送消息
	 */
    @GetMapping("/send")
    public void sendMsg() {
        MsgModel msgModel = new MsgModel(System.currentTimeMillis(), "测试消息");
        log.info("producer1发送消息:" + msgModel);
        streamBridge.send("producer1-out-0", 
        	MessageBuilder.withPayload(entity).setHeader("MyHearder", "这是一个请求头").build());
    }
}

5. 消费消息

package com.demo;

import com.demo.model.MsgModel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;

import java.util.function.Consumer;

/**
 * 消息消费者类
 */
@Configuration
@Slf4j
public class ReceiveMQ {

	/**
	 * 消息消费者1
	 */
    @Bean
    public Consumer<Message<MsgModel>> consumer1(){
        return (message)->{
            MessageHeaders headers = message.getHeaders();
            MsgModel msgModel = message.getPayload();
            log.info("consumer1接收消息,消息头:" + headers.get("MyHeader"));
            log.info("consumer1接收消息,消息内容:" + msgModel);
        };
    }
}

二、配置解析

1. spring.cloud.stream.function.definition

进行生产者或消费者方法定义,在 rocketmq 初始时会加载这些方法以创建生产者或消费者列表。

不管是创建 Consumer 还是 Supplier 或者是 Function Stream 都会将其方法名称进行一个 topic 拆封和绑定。假设创建了一个 Consumer< String > myTopic 的方法,Stream 会将其 拆分成 In 和 out 两个通道:

  • 输入通道(消费者): [functionName]-in-[index]
    consumer1-in-0
  • 输出通道(生产者): [functionName]-out-[index]
    producer1-out-0

注意:这里的 functionName 需要和生产者或消费者方法名称以及 spring.cloud.stream.function.definition 下的名称保持一致。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1671728.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

文件流-二进制文件(中北大学-程序设计基础(2))

目录 题目 源码 结果示例 题目 建立两个二进制磁盘文件f1.dat,f2.dat&#xff0c;编程实现以下工作&#xff1a; &#xff08;1&#xff09;将20个整数&#xff08;可在程序中初始化&#xff09;&#xff0c;分别存放到两个磁盘文件中&#xff0c;前10个放到f1.dat中&…

C++类和对象下——实现日期类

前言 在学习了类和对象的六大成员函数后&#xff0c;为了巩固我们学习的知识可以手写一个日期类来帮助我们理解类和对象&#xff0c;加深对于其的了解。 默认函数 构造函数 既然是写类和对象&#xff0c;我们首先就要定义一个类&#xff0c;然后根据实际需要来加入类的数据与函…

windows和Linux卸载移动磁盘

文章目录 Linux卸载磁盘target is busy.window卸载磁盘打开事件查看器 Linux卸载磁盘target is busy. #查看有哪些进程访问挂载点 lsof /media/lei/repository/#杀死进程 pkill node window卸载磁盘 #提示 #该设备正在使用中. 请关闭可能使用该设备的所有程序或窗口,然后重试…

【React】redux开发者工具redux-devtools-extension的安装和使用

前言 redux-devtools-extension: 是一个用于Redux的开发者工具扩展。适合用于需要调试和监控Redux应用的状态管理。特点是可以提供实时的状态查看、行动日志和错误检测等 安装 chrome安装redux-devtools-extension 项目中安装并引入redux-devtools-extension yarn add re…

强化学习——马尔可夫奖励过程的理解

目录 一、马尔可夫奖励过程1.回报2.价值函数 参考文献 一、马尔可夫奖励过程 在马尔可夫过程的基础上加入奖励函数 r r r 和折扣因子 γ \gamma γ&#xff0c;就可以得到马尔可夫奖励过程&#xff08;Markov reward process&#xff09;。一个马尔可夫奖励过程由 < S , …

C++语言的字符数组

存放字符数据的数组是字符数组&#xff0c;字符数组中的一个元素存放一个字符。字符数组具有数组的共同属性。 1. 声明一个字符数组 char c[5]; 2. 字符数组赋值方式 &#xff08;1&#xff09;为数组元素逐一赋值 c[0]H c[1]E c[2]L c[3]L c[4]O &#xff08;2&…

【JVM】从可达性分析,到JVM垃圾回收算法,再到垃圾收集器

《深入理解Java虚拟机》[1]中&#xff0c;有下面这么一段话&#xff1a; 在JVM的各个区域中&#xff0c;如虚拟机栈中&#xff0c;栈帧随着方法的进入和退出而有条不紊的执行者出栈和入栈操作。每一个栈帧中分配多少内存基本上是在类结构确定下来时就已知的&#xff08;尽管在…

【qt】数值的输入与输出

数值的输入与输出 一.与c中的输入与输出的区别二.QString转数值三.数值转QString1.number()2.asprintf() 四.小项目1.总价和进制2.QSpinBox代替3.QSlider滑动块4.QScrollBar滚动条5.QDial表盘6.QLcdnumber lcd显示 五.总结一下下 一.与c中的输入与输出的区别 在c中我们一般通过…

java项目之相亲网站的设计与实现源码(springboot+mysql+vue)

风定落花生&#xff0c;歌声逐流水&#xff0c;大家好我是风歌&#xff0c;混迹在java圈的辛苦码农。今天要和大家聊的是一款基于springboot的相亲网站的设计与实现。项目源码以及部署相关请联系风歌&#xff0c;文末附上联系信息 。 项目简介&#xff1a; 相亲网站的设计与实…

泛微E9开发 添加多个多选框,实现单选框的效果

利用多个多选框实现单选框的效果 1、功能背景2、展示效果3、实现效果 1、功能背景 如下图所示&#xff0c;在表单中新增四个“选择框-复选框”类型的字段&#xff0c;并且设置其中的选项&#xff0c;每个多选框都只有一个选项&#xff0c;通过代码块实现单选框的效果 1.显示模…

网络工程师练习题

网络工程师练习题 下面的应用中,DHS基于UDP协议。在一台服务器上只开放25和110两个端口,这太服务器可以提供E-Mail服务。与HTTP相比,HTTPS协议将传输的内容进行加密,更加安全。HTTPS基于SSL安全协议,其默认端口是443。某单位网络拓扑如图所示。路由器AR2路由表内容如下所示…

彩虹易支付用户中心美化主题 模版源码

简介&#xff1a; 彩虹易支付用户中心美化主题 模版源码 使用本主题前请备份官方版本文件再进行解压到user目录替换&#xff01; 点击下载

笨方法自学python(九)-读写文件

读取文件 前面已经学过了 input 和 argv&#xff0c;这些是你开始学习读取文件的必备基础。你可能需要多多实验才能明白它的工作原理&#xff0c;这节练习涉及到写两个文件。一个正常的 ex15.py 文件&#xff0c;另外一个是 ex15_sample.txt&#xff0c;第二个文件并不是脚本&…

Lesson5--二叉树(超详细版)

【本节目标】 1. 树概念及结构 2. 二叉树概念及结构 3. 二叉树顺序结构及实现 4. 二叉树链式结构及实现 1.树概念及结构 1.1树的概念 树是一种 非线性&#xff08;线性结构就是顺序表链表&#xff09; 的数据结构&#xff0c;它是由 n &#xff08; n>0 &#xff09;个…

CSS表格

标准的表格结构 table标签&#xff1a;定义表格 caption标签&#xff1a;定义表格标题&#xff0c;这个标题会居中显示在表格上&#xff0c;一个表格只能定义一个标题 th标签&#xff1a;定义表格的表头&#xff0c;通常成粗体居中表示 tr标签&#xff1a;定义表格的一行 td标…

SSL/TLS协议信息泄露漏洞(CVE-2016-2183)解法

1.运行gpedit.msc&#xff0c;进入本地组策略编辑器。 2. 本地组策略编辑器-->计算机配置-->管理模板-->网络-->SSL配置设置-->启用“SSL密码套件顺序”。 3. 将原有的密码套件值清空&#xff0c;拷入下面的值&#xff0c;保存设置&#xff0c;并重启服务器即…

3.整数运算

系列文章目录 信息的表示和处理 : Information Storage&#xff08;信息存储&#xff09;Integer Representation&#xff08;整数表示&#xff09;Integer Arithmetic&#xff08;整数运算&#xff09;Floating Point&#xff08;浮点数&#xff09; 文章目录 系列文章目录前…

软件2班20240513

第三次作业 package com.yanyu;import java.sql.*; import java.util.ResourceBundle;public class JDBCTest01 {public static void main(String[] args) {ResourceBundle bundle ResourceBundle.getBundle("com/resources/db");// ctrl alt vString driver …

01-02-1

1、day10作业 使用的代码 #include<stdio.h> void change(int* i) {*i(*i) / 2; } int main() {int i 0;scanf("%d", &i);change(&i);printf("%d", i);return 0; } ​ ​ 2、day11作业 使用的代码 #include<stdio.h> #include<…

C++初阶学习第七弹——探索STL奥秘(二)——string的模拟实现

标准库中的string&#xff1a;C初阶学习第六弹——string&#xff08;1&#xff09;——标准库中的string类-CSDN博客 前言&#xff1a; 在前面我们已经学习了如何使用标准库中的string类&#xff0c;但作为一个合格的程序员&#xff0c;我们不仅要会用&#xff0c;还要知道如…