微服务 02-rabbitmq在springboot中如何使用(上篇)

news2025/1/12 23:42:31

目录

前言: 上文传送

-> 安装rabbitmq传送门:

-> rabbitmq使用出现问题解决传送门:

1. rabbitmq的六大模式: 

1.1 简单模式: (一对一)

-> 业务场景: 

1.2 工作模式: (一对多)

-> 业务场景: 

1.3 发布与订阅模式: (广播)

1.4 路由模式: 

 -> 业务场景

1.5 主题模式: (路由升级版)

-> 业务场景

1.6 RPC异步调用模式

-> 业务场景

2. RabbitMQ 的工作机制主要包括:

3. springboot整合rabbitMq准备工作

ps: 有两种写法 rabbitTemplate和AmqpTemplate

3.1 创建俩springboot工程(正常创建 不用加依赖)

3.2 添加依赖

3.3 修改application.yml

 3.4 启动测试 

 启动失败解决方案

4. 六大模式实际操作

4.0 调用模拟(定时调度任务)

-> 4.0.1 生产者启动类

->4.0.2 消费者启动类

4.1 简单模式

-> 4.1.1 生产者代码

-> 4.1.2 消费者代码

4.2 工作模式

-> 4.2.1 生产者代码

-> 4.2.2 消费者代码

4.3 消息和订阅模式(广播)

-> 4.3.0 消费者中声明fanout交换机

-> 4.3.1 消费者代码

-> 4.3.2 生产者代码

本篇总结及下篇传送门

->  下一篇: 03-rabbitmq在springboot中如何使用(下篇)


前言: 上文传送

-> 安装rabbitmq传送门:

 01-rabbitmq的应用场景及安装(docker)

-> rabbitmq使用出现问题解决传送门:

  00-rabbitmq出现的异常以及解决方案

1. rabbitmq的六大模式: 

1.1 简单模式: (一对一)

发送者向队列发送消息,接收者从队列接收消息。

-> 业务场景: 

订单创建, 邮件发送 等一切消费者不关心过程的操作

1.2 工作模式: (一对多)

多个工作者同时从同一个队列中接收消息,并处理这些消息,每个消息只被一个工作者处理。

-> 业务场景: 

 多个消费者一起抗压, 共同接收, 与一类似

1.3 发布与订阅模式: (广播)

一个发送者将消息发送到交换机,交换机将消息分发到多个队列中,每个队列都有一个接收者。

1.4 路由模式: 

与发布/订阅模式类似,不同之处在于消息通过“路由键”(Routing Key)匹配到队列中的接收者。

 -> 业务场景

用于日志收集, 不同订单处理等 能用路由模式就不要用主题模式

1.5 主题模式: (路由升级版)

使用了更加灵活的通配符匹配规则。

-> 业务场景

电商平台可以使用主题模式将商品销售信息发送到不同的队列

1.6 RPC异步调用模式

需要一个唯一的标识符,在客户端发送请求时带上该标识符,服务端在处理完请求后将标识符带上响应一起发送回客户端,这样客户端就能正确地匹配请求和响应。

-> 业务场景

例如,电商平台可以使用异步调用模式实现库存管理系统与订单系统的交互,当有订单创建时,订单系统会调用库存管理系统减少商品库存,库存管理系统处理完毕后将结果发送到回调队列,订单系统从回调队列中获取处理结果并进行相应的流程处理。

2. RabbitMQ 的工作机制主要包括:

  1. 消息生产者(Publisher):向 RabbitMQ 中发送消息的应用程序;

  2. 消息队列(Queue):用于存放消息的缓冲区,可以根据需要设置队列的属性,如队列最大长度、队列消息的过期时间等;

  3. 交换机(Exchange):用于接收生产者发送的消息,并将消息路由到相应的队列中,交换机的类型包括直接模式、主题模式、扇形模式和头部模式;

  4. 绑定(Binding):将交换机和队列通过指定的路由键(Routing Key)进行绑定,绑定关系可以是多对多的;

  5. 消息消费者(Consumer):从队列中取出消息,并进行处理的应用程序;

  6. 虚拟主机(Virtual Host):RabbitMQ 允许通过虚拟主机实现逻辑上的消息隔离,不同的虚拟主机之间互相独立,可以拥有自己独立的交换机、队列、权限等。

在整个工作机制中,生产者将消息发送到指定的交换机中,交换机根据路由键将消息分发到相应的队列中,消费者从队列中取出消息进行处理。RabbitMQ 还可以进行消息的持久化、消息的优先级设置、消息的事务处理等操作,具备较高的可靠性和可用性。


3. springboot整合rabbitMq准备工作

ps: 有两种写法 rabbitTemplate和AmqpTemplate

这俩可以理解成俩公司封装的两套功能 实现了一种效果  都可以实现 选择其一即可 

准备工作

3.1 创建俩springboot工程(正常创建 不用加依赖)

rabbitmq-provider 生产者

rabbitmq-consumer 消费者

模拟微服务俩工程 不需要注册中心

3.2 添加依赖

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

3.3 修改application.yml

spring:
  rabbitmq:
    host: *
    username: pzy
    password: *
    virtual-host: pingzhuyan
    listener:
      simple:
        prefetch: 1 #设置每次预抓取的数量是1,处理完之前不收下一条 默认250
server:
  port: 9501
spring:
  rabbitmq:
    host: *
    username: pzy
    password: *
    virtual-host: pingzhuyan
    listener:
      simple:
        prefetch: 1 #设置每次预抓取的数量是1,处理完之前不收下一条 默认250
server:
  port: 9502

 3.4 启动测试 

 启动失败解决方案

1. 端口号冲突: 换端口

2. bean冲突: 清缓存重启

3. 删除后pom,xml变成灰色 鼠标右键 点击添加maven project即可

其他百度, rabbitmq连接不上的 看看服务起没起

4. 六大模式实际操作

4.0 调用模拟(定时调度任务)

异步线程池看其他文章 这里不做介绍

-> 4.0.1 生产者启动类

package cn.pingzhuyan.rabbitmq;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;

@EnableAsync
@EnableScheduling
@SpringBootApplication
public class RabbitProviderApp {

    public static void main(String[] args) {
        SpringApplication.run(RabbitProviderApp.class, args);
    }

}

->4.0.2 消费者启动类

package cn.pingzhuyan.rabbitmq;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class RabbitConsumerApp {

    public static void main(String[] args) {
        SpringApplication.run(RabbitConsumerApp.class, args);
    }

}

4.1 简单模式

package cn.pingzhuyan.rabbitmq.config;

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author pzy
 * @version 0.1.0
 * @description: TODO
 */
@Configuration
public class RabbitConfig {

    /**
     * 声明一个简单队列
     * @return
     */
    @Bean
    public Queue helloQueue(){
//        return new Queue("PZY",true,false,false);
        return new Queue("PZY");
    }



}

-> 4.1.1 生产者代码

package cn.pingzhuyan.rabbitmq.singleM1;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

/**
 * 简单模式生产者
 * @author pzy
 * @version 0.1.0
 * @description: TODO
 */

@Component
public class SingleM1Provider {

    @Autowired
    private AmqpTemplate amqpTemplate;

    @Scheduled(cron = "*/1 * * * * ?" )
    public void singleSend01() {
        System.out.println("生产者: <简单队列>定时(1次/s)发送 -> 今天天气真好!");

        amqpTemplate.convertAndSend("PZY", "今天天气真好!");
    }

}

-> 4.1.2 消费者代码

package cn.pingzhuyan.rabbitmq.singleM1;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @author pzy
 * @version 0.1.0
 * @description: TODO
 */
//@Component
public class RabbitConsumerM1 {


    @RabbitListener(queues = "PZY")
    public void singleReceive1(String msg) {

        System.out.printf("消费者接收到: %s\n", msg);
    }


}

4.2 工作模式

-> 4.2.1 生产者代码

package cn.pingzhuyan.rabbitmq.workM2;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

/**
 * 简单模式生产者
 * @author pzy
 * @version 0.1.0
 * @description: TODO
 */

@Component
public class WorkM2Provider {

    @Autowired
    private AmqpTemplate amqpTemplate;

    @Async
    @Scheduled(cron = "*/1 * * * * ?" )
    public void workSend01() {
        System.out.println("生产者1: <工作模式>定时(1次/s)发送 -> 今天天气真好!");

        amqpTemplate.convertAndSend("PZY", "今天天气真好!");
    }
    @Async
    @Scheduled(cron = "*/1 * * * * ?" )
    public void workSend02() {
        System.out.println("生产者2: <工作模式>定时(1次/s)发送 -> 今天天气真好!");

        amqpTemplate.convertAndSend("PZY", "今天天气真好!");
    }

}

-> 4.2.2 消费者代码

package cn.pingzhuyan.rabbitmq.workM2;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @author pzy
 * @version 0.1.0
 * @description: TODO
 */
@Component
public class RabbitConsumerM2 {


    @RabbitListener(queues = "PZY")
    public void workReceiveM2One(String msg) {

        System.out.printf("消费者1 接收到: %s\n", msg);
    }

    @RabbitListener(queues = "PZY")
    public void workReceiveM2Two(String msg) {

        System.out.printf("消费者2 接收到: %s\n", msg);
    }


}

4.3 消息和订阅模式(广播)

-> 4.3.0 消费者中声明fanout交换机

package cn.pingzhuyan.rabbitmq.config;

import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * rabbitmq的默认手动确认模式
 * @author pzy
 * @version 0.1.0
 * @description: TODO
 */
@Configuration
public class RabbitConfig {

    /**
     * 创建(声明)一个简单队列
     * @return
     */
    @Bean
    public Queue helloQueue(){
//        return new Queue("PZY",true,false,false);
        return new Queue("PZY");
    }

    /**
     * 创建radioFanout交换机
     * 消费者需要绑定此交换机
     * @return
     */
    @Bean
    public FanoutExchange radioFanout(){
        return new FanoutExchange("PZY_RADIO",false,false);
    }




}

-> 4.3.1 消费者代码

package cn.pingzhuyan.rabbitmq.faoutM3;

import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 发布与订阅模式的消费者
 *
 * @author pzy
 * @version 0.1.0
 * @description: TODO
 */
@Component
public class FanoutF3Consumer {

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue,  //不写就是随机队列, false true true
            exchange = @Exchange(name = "PZY_RADIO", declare = "false")//交换机(PZY_RADIO, 不创建并使用已经存在的交换机)
    ))
    public void radioFanoutMessage1(String msg) {
        System.out.printf("消费者1接收到: %s\n", msg);
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue,
            exchange = @Exchange(name = "PZY_RADIO", declare = "false")//队列绑定交换机
    ))
    public void radioFanoutMessage2(String msg) {
        System.out.printf("消费者2接收到: %s\n", msg);
    }

}

-> 4.3.2 生产者代码

package cn.pingzhuyan.rabbitmq.fanoutM3;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

/**
 * 发布和订阅模式生产者
 *
 * @author pzy
 * @version 0.1.0
 * @description: TODO
 */

@Component
public class FanoutF3Provider {

    @Autowired
    private AmqpTemplate amqpTemplate;


    /**
     * 注意一下:
     * 下面发送的时候 一定要绑定交换机 不要绑定路由键(没有意义 全是默认)
     *  amqpTemplate.convertAndSend("PZY_RADIO", str);//错误写法
     */
    @Async
    @Scheduled(cron = "*/1 * * * * ?")
    public void fanoutSend01() {

        String str = Math.random() * 1000 + "" + Math.random() * 1000;

        System.out.println("生产者1: <发布和订阅模式>定时(1次/s)发送 -> " + str);

        amqpTemplate.convertAndSend("PZY_RADIO","", str);
    }


}

本篇总结及下篇传送门

前三种最常见的本篇已经写完了

rabbitTemplate写法与这个很相似, 实现效果完全相同

->  下一篇: 03-rabbitmq在springboot中如何使用(下篇)

 介绍后3种模式

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/722251.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

计算机系统的层次结构

计算机系统 计 算 机 系 统 { 计 算 机 软 件 { 系 统 软 件 应 用 软 件 计 算 机 硬 件 { 存 储 器 运 算 器 控 制 器 输 入 设 备 输 出 设 备 计算机系统 \begin{cases} 计算机软件\begin{cases}系统软件\\应用软件\end{cases}\\计算机硬件\begin{cases}存储器\\运算器\…

YoloV8改进---注意力机制:引入瓶颈注意力模块BAM,对标CBAM

目录 ​编辑 1.BAM介绍 2.BAM引入到yolov8 2.1 加入modules.py中&#xff1a; 2.2 加入tasks.py中&#xff1a; 2.3 yolov8_BAM.yaml 1.BAM介绍 论文&#xff1a;https://arxiv.org/pdf/1807.06514.pdf 摘要&#xff1a;提出了一种简单有效的注意力模块&#xff0c;称为瓶颈…

06、Nginx反向代理与负载均衡

反向代理&#xff1a; 这种代理方式叫做&#xff0c;隧道代理。有性能瓶颈&#xff0c;因为所有的数据都经过Nginx&#xff0c;所以Nginx服务器的性能至关重要 负载均衡&#xff1a; 把请求&#xff0c;按照一定算法规则&#xff0c;分配给多台业务服务器&#xff08;即使其中…

【论文笔记】Skill-based Meta Reinforcement Learning

【论文笔记】Skill-based Meta Reinforcement Learning 文章目录 【论文笔记】Skill-based Meta Reinforcement LearningAbstract1 INTRODUCTION2 RELATED WORKMeta-Reinforcement LearningOffline datasetsOffline Meta-RLSkill-based Learning 3 PROBLEM FORMULATION AND PRE…

CPU大小端和网络序的理解

引子 Big/Little Endian是Host CPU如何去理解在内存中的数据&#xff0c;内存中的数据是没有Big/Little Endian之分的&#xff08;内存仅仅作为存储介质&#xff09;&#xff0c;而Host CPU才有Big/Little Endian之分。 不同Endian的CPU&#xff0c;从内存读取数据的时候&#…

Linux进度条

Linux进度条 一.基本概念1.回车和换行2.缓冲区2.实现倒计时 二.进度条1.前置工作2.代码实现 一.基本概念 1.回车和换行 回车&#xff1a;指光标移到该行的起始位置&#xff08;\r&#xff09;。 换行&#xff1a;换到下一行&#xff08;\n&#xff09;。 在c语音里\n将回车和换…

spring.boot 随笔0 springFactoriesInstance入门

0. 其实也没有那么入门 明天还要上班&#xff0c;速度write&#xff0c;直接放一张多样性比较好的 spring.factories 文件(取自 spring-boot-2.3.4.RELEASE.jar) # PropertySource Loaders org.springframework.boot.env.PropertySourceLoader\ org.springframework.boot.env…

知识图谱实战应用19-基于Py2neo的英语单词关联记忆知识图谱项目

大家好,我是微学AI,今天给大家介绍一下知识图谱实战应用19-基于Py2neo的英语单词关联记忆知识图谱项目。基于Py2neo的英语单词关联记忆知识图谱项目可以帮助用户更系统地学习和记忆英语单词,通过图谱的可视化展示和智能推荐功能,提供了一种全新的、更高效的记忆方法,并促进…

AtomicInteger使用详解

AtomicInteger使用详解 1、get()&#xff1a;获取当前AtomicInteger对象的值。2、set(int newValue)&#xff1a;将AtomicInteger对象的值设置为指定的newValue。3、getAndSet(int newValue)&#xff1a;先获取当前AtomicInteger对象的值&#xff0c;然后将对象的值设置为指定的…

ROS1/2机器人课程的价值和规模

价值用价格&#xff0c;规模用销量。 免费的ROS1/2课程也很多。 2023版&#xff0c;15元&#xff0c;24人。 2022版&#xff0c;1.99元&#xff0c;21人。 价格不贵&#xff0c;人数很少&#xff0c;店家也很少。 当然&#xff0c;有朋友说&#xff0c;有免费冲击&#xff0…

宏晶微 音频处理芯片 MS7124

MS7124是一款高性能24bit数字立体声音频DAC&#xff0c;该DAC采用Sigma-Delta结构&#xff0c;支持标准的I2S数字信号输入&#xff0c;输出支持立体声和单声道。

比对Excel数据

以a个为准绳比对b表数据&#xff0c;添加比对结果列输出。 (本笔记适合初通 Python 的 coder 翻阅) 【学习的细节是欢悦的历程】 Python 官网&#xff1a;https://www.python.org/ Free&#xff1a;大咖免费“圣经”教程《 python 完全自学教程》&#xff0c;不仅仅是基础那么…

大数据Doris(五十七):RECOVER数据删除恢复

文章目录 RECOVER数据删除恢复 一、Recover语法 二、数据恢复案例 RECOVER数据删除恢复 Doris为了避免误操作造成的灾难&#xff0c;支持对误删除的数据库/表/分区进行数据恢复&#xff0c;在drop table或者 drop database之后&#xff0c;Doris不会立刻对数据进行物理删除…

C语言进阶--文件操作

目录 一.何为文件 二.文件名 三.文件的打开和关闭 3.1.流 3.2.文件指针 3.3.文件的打开与关闭 打开文件&#xff1a; 模式&#xff1a; 关闭文件&#xff1a; 四.文件的顺序读写 4.1.常见的顺序读写函数 4.2.字符的输入输出fgetc/fputc 输出函数&#xff1a; 输入…

计算机存储层次及常用存储简介

计算机存储层次&#xff08;Memory hierarchy&#xff09; 存储层次是在计算机体系结构下存储系统层次结构的排列顺序。 每一层于下一层相比 都拥有 较高的速度 和 较低延迟性 &#xff0c;以及 较小的容量 &#xff08;也有少量例外&#xff0c;如AMD早期的Duron CPU&#xf…

pod 控制器 3

简单回顾 之前我们学习过的的 docker &#xff0c;例如我们运行 docker run busybox echo "hello wrold" 他的实际内在逻辑是这个样子的 程序将指令推送给 dockerdocker 会检查本地是否有 busybox 镜像&#xff0c;若没有则去 docker hub 上面拉取镜像&#xff0c;…

windows下mysql配置文件my.ini在这个位置

windows下mysql配置文件my.ini在这个位置 选中服务邮件 右键----属性&#xff0c;弹出下图 一般默认路径&#xff1a; "D:\Program Files\MySQL\MySQL Server 8.0\bin\mysqld.exe" --defaults-file"C:\ProgramData\MySQL\MySQL Server 8.0\my.ini" MySQ…

信息安全管理与评估赛题第4套

全国职业院校技能大赛 高等职业教育组 信息安全管理与评估 赛题四 模块一 网络

simulink 常用模块

add sub pro div加减乘除 Relational Operator 数值比较模块 < < > > ! Compare To Constant 直接和某数字比较&#xff0c;是上面的封装 Logical Operator 逻辑运算 & | ~ switch 相当于c语言的if 中间是条件&#xff0c;满足走上&#xff0c;否则走下…

上门按摩小程序开发|同城预约上门小程序定制

上门按摩小程序对实体按摩商家来说是非常适合的。下面是对上门按摩小程序适合实体按摩商家开发的简单介绍&#xff1a;   扩展服务范围&#xff1a;上门按摩小程序可以让实体按摩商家将服务范围扩展到用户的家中或办公场所。用户可以通过小程序预约上门按摩服务&#xff0c;无…