工作中常用的RabbitMQ实践

news2025/2/23 21:22:15

目录

1.前置

2.导入依赖

3.生产者

4.消费者

5.验证

验证Direct

验证Fanout

验证Topic


1.前置

安装了rabbitmq,并成功启动

2.导入依赖

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

3.生产者

生产端项目结构:
 

逻辑:生产者只对交换机进行生产,至于队列绑定等放在消费端进行执行

BusinessConfig

定义了三个不同类型的交换机

direct类型:(当生产者往该交换机发送消息时,他必须指定固定的routingkey,当routingkey值为空,他也会匹配routingkey为空的队列)

fanout类型:(当生产者往该交换机发送消息时,他所绑定的队列都会收到消息,routingkey即使写了也会忽略,一般为空字符串)

Topic类型:(当生产者往该交换机发送消息时,他并不像direct指定固定的routingkey,可以进行模糊匹配,当该routingkey为空时,他会匹配routingkey为空的队列)

package com.zsp.quartz.queue;

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Configuration;


/**
 * @Author: ZhangSP
 * @Date: 2023/12/7  14:05
 */
public class BusinessConfig {
    // 声明direct交换机
    public static final String EXCHANGE_DIRECT= "exchange_direct_inform";

    // 声明fanout交换机
    public static final String EXCHANGE_FANOUT= "exchange_fanout_inform";

    // 声明topic交换机
    public static final String EXCHANGE_TOPIC= "exchange_topic_inform";
}

TestProducer

生产消息

package com.zsp.quartz.queue;

import com.alibaba.fastjson.JSON;
import com.zsp.quartz.entity.User;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@SpringBootTest
@RunWith(SpringRunner.class)
public class TestProducer {
    @Autowired
    RabbitTemplate rabbitTemplate;

    @Test
    public void Producer_topics_springbootTest() {

        //使用rabbitTemplate发送消息
        String message = "";
        User user = new User();
        user.setName("张三");
        user.setEmail("anjduahsd");
        message = JSON.toJSONString(user);

        // direct
        rabbitTemplate.convertAndSend(BusinessConfig.EXCHANGE_DIRECT,"",message);

        // fanout
        rabbitTemplate.convertAndSend(BusinessConfig.EXCHANGE_FANOUT,"",message);

        // topic
        rabbitTemplate.convertAndSend(BusinessConfig.EXCHANGE_TOPIC,"",message);
    }
}

4.消费者

消费者目录结构:

BusinessConfig:定义交换机类型,配置交换机与队列的绑定关系,通过容器工厂声明队列

package com.zsp.consumer.queue;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.Connection;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;

/**
 * @Author: ZhangSP
 * @Date: 2023/12/7  14:05
 */
@Slf4j
@Configuration
public class BusinessConfig {
    // 声明direct
    public static final String EXCHANGE_DIRECT= "exchange_direct_inform";
    public static final String QUEUE_DIRECT_EMAIL = "queue_direct_inform_email";
    public static final String QUEUE_DIRECT_SMS = "queue_direct_inform_sms";
    public void BindDirectEmail(Channel channel) {
        try {
            channel.exchangeDeclare(EXCHANGE_DIRECT, BuiltinExchangeType.DIRECT.getType(), true);
            channel.queueDeclare(QUEUE_DIRECT_EMAIL, true, false, false, null);
            channel.queueBind(QUEUE_DIRECT_EMAIL, EXCHANGE_DIRECT, "");
        } catch (Exception e) {
            log.error("声明Direct->email队列时失败", e);
        }
    }
    public void BindDirectSms(Channel channel) {
        try {
            channel.exchangeDeclare(EXCHANGE_DIRECT, BuiltinExchangeType.DIRECT.getType(), true);
            channel.queueDeclare(QUEUE_DIRECT_SMS, true, false, false, null);
            channel.queueBind(QUEUE_DIRECT_SMS, EXCHANGE_DIRECT, "123");
        } catch (Exception e) {
            log.error("声明Direct->sms失败", e);
        }
    }
    // 声明fanout
    public static final String EXCHANGE_FANOUT= "exchange_fanout_inform";
    public static final String QUEUE_FANOUT_EMAIL = "queue_fanout_inform_email";
    public static final String QUEUE_FANOUT_SMS = "queue_fanout_inform_sms";
    public void BindFanoutEmail(Channel channel) {
        try {
            channel.exchangeDeclare(EXCHANGE_FANOUT, BuiltinExchangeType.FANOUT.getType(), true);
            channel.queueDeclare(QUEUE_FANOUT_EMAIL, true, false, false, null);
            channel.queueBind(QUEUE_FANOUT_EMAIL, EXCHANGE_FANOUT, "");
        } catch (Exception e) {
            log.error("声明Fanout->email队列时失败", e);
        }
    }
    public void BindFanoutSms(Channel channel) {
        try {
            channel.exchangeDeclare(EXCHANGE_FANOUT, BuiltinExchangeType.FANOUT.getType(), true);
            channel.queueDeclare(QUEUE_FANOUT_SMS, true, false, false, null);
            channel.queueBind(QUEUE_FANOUT_SMS, EXCHANGE_FANOUT,"");
        } catch (Exception e) {
            log.error("声明Fanout->sms失败", e);
        }
    }

    // 声明topic
    public static final String EXCHANGE_TOPIC= "exchange_topic_inform";
    public static final String QUEUE_TOPIC_EMAIL = "queue_topic_inform_email";
    public static final String QUEUE_TOPIC_SMS = "queue_topic_inform_sms";
    public static final String ROUTINGKEY_EMAIL="inform.#.email.#";
    public static final String ROUTINGKEY_SMS="inform.#.sms.#";

    public void BindTopicEmail(Channel channel) {
        try {
            channel.exchangeDeclare(EXCHANGE_TOPIC, BuiltinExchangeType.TOPIC.getType(),true);
            channel.queueDeclare(QUEUE_TOPIC_EMAIL, true, false, false, null);
            channel.queueBind(QUEUE_TOPIC_EMAIL, EXCHANGE_TOPIC, ROUTINGKEY_EMAIL);
        } catch (Exception e) {
            log.error("声明Topic->email队列时失败", e);
        }
    }
    public void BindTopicSms(Channel channel) {
        try {
            channel.exchangeDeclare(EXCHANGE_TOPIC, BuiltinExchangeType.TOPIC.getType(),true);
            channel.queueDeclare(QUEUE_TOPIC_SMS, true, false, false, null);
            channel.queueBind(QUEUE_TOPIC_SMS, EXCHANGE_TOPIC,"");
        } catch (Exception e) {
            log.error("声明Topic->sms失败", e);
        }
    }




    // 声明队列
    @Autowired
    @Qualifier(value = "zspConnectionFactory")
    private ConnectionFactory connectionFactory;
    @PostConstruct
    public void shengmingQueue() {
        try {
            Connection connection = connectionFactory.createConnection();
            Channel channel = connection.createChannel(false);
            BindDirectEmail(channel);
            BindDirectSms(channel);
            BindFanoutEmail(channel);
            BindFanoutSms(channel);
            BindTopicEmail(channel);
            BindTopicSms(channel);
        } catch (Exception e) {
            log.error("业务实例声明绑定队列报错:",e);
        }
    }
}

RabbitFactory:自定义容器工厂

package com.zsp.consumer.queue;

import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableRabbit
public class RabbitFactory {

    @Bean("zspConnectionFactory")
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        // 设置RabbitMQ的连接信息,如主机名、端口号、用户名和密码等
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("root");
        connectionFactory.setPassword("root");
        return connectionFactory;
    }

    @Bean("rabbitListenerContainerFactory")
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(@Qualifier("zspConnectionFactory") ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setConcurrentConsumers(5);
        factory.setMaxConcurrentConsumers(10);
        return factory;
    }
}

ReceiveHandler:队列监听

package com.zsp.consumer.queue;

import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class ReceiveHandler {
    //监听自定义的Direct队列
    @RabbitListener(queues = BusinessConfig.QUEUE_DIRECT_SMS, containerFactory = "rabbitListenerContainerFactory")
    public void directSMS(String msg, Message message, Channel channel) {
        JSONObject jsonObject = JSONObject.parseObject(msg);
        System.out.println("Direct队列->sms队列" + jsonObject);
    }

    @RabbitListener(queues = BusinessConfig.QUEUE_DIRECT_EMAIL, containerFactory = "rabbitListenerContainerFactory")
    public void directEmail(String msg, Message message, Channel channel) {
        JSONObject jsonObject = JSONObject.parseObject(msg);
        System.out.println("Direct队列->email队列" + jsonObject);
    }

    //监听自定义的Fanout队列
    @RabbitListener(queues = BusinessConfig.QUEUE_FANOUT_SMS, containerFactory = "rabbitListenerContainerFactory")
    public void FanoutSMS(String msg, Message message, Channel channel) {
        JSONObject jsonObject = JSONObject.parseObject(msg);
        System.out.println("Fanout队列->sms队列" + jsonObject);
    }

    @RabbitListener(queues = BusinessConfig.QUEUE_FANOUT_EMAIL, containerFactory = "rabbitListenerContainerFactory")
    public void FanoutEmail(String msg, Message message, Channel channel) {
        JSONObject jsonObject = JSONObject.parseObject(msg);
        System.out.println("Fanout队列->email队列" + jsonObject);
    }

    //监听自定义的Topic队列
    @RabbitListener(queues = BusinessConfig.QUEUE_TOPIC_SMS, containerFactory = "rabbitListenerContainerFactory")
    public void TopicSMS(String msg, Message message, Channel channel) {
        JSONObject jsonObject = JSONObject.parseObject(msg);
        System.out.println("Topic队列->sms队列" + jsonObject);
    }

    @RabbitListener(queues = BusinessConfig.QUEUE_TOPIC_EMAIL, containerFactory = "rabbitListenerContainerFactory")
    public void TopicEmail(String msg, Message message, Channel channel) {
        JSONObject jsonObject = JSONObject.parseObject(msg);
        System.out.println("Topic队列->email队列" + jsonObject);
    }
}

5.验证

先启动消费者端,然后执行TestProducer

验证Direct

1.向routingkey为空的队列发消息

我们在消费者端配置了routingkey为空的队列,叫做 QUEUE_DIRECT_EMAIL

因此会打印出下面这条记录

2.向routingkey为123的队列发消息

我们在消费者端配置了routingkey为123的队列,叫做 QUEUE_DIRECT_SMS

因此会打出下面这条记录

验证Fanout

谁跟我绑定了,我都发

验证Topic

模糊匹配routingkey

匹配sms队列

会把下面这个打印出来

需要注意的是如果我们没有自定义容器工厂的话,这个containerFactory可以不写
简单理解就是实例,也就是rabbitmq服务地址是在哪里,实例包括了域名、端口、账号、密码等。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1293811.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Vulnerability: File Upload(low)--MYSQL注入

选择难度&#xff1a; 1.打开DVWA&#xff0c;并登录账户 2.选择模式&#xff0c;这里我们选择 文件上载的最低级模式&#xff08;low&#xff09; 在vsc里面写个一句话木马 这里我们注意&#xff0c;因为这个是木马很容易被查杀&#xff0c;从而无法使用&#xff0c;所以我们…

Docker安装postgres最新版

1. postgres数据库 PostgreSQL是一种开源的关系型数据库管理系统&#xff08;RDBMS&#xff09;&#xff0c;它是一种高度可扩展的、可靠的、功能丰富的数据库系统。以下是关于PostgreSQL的一些介绍&#xff1a; 开源性&#xff1a;PostgreSQL是一个开源项目&#xff0c;可以…

这个sql有点东西,记录一下

我有一个需求&#xff1a;在订单表里面查询指定时间的订单数据&#xff0c;如果要是没有订单的话&#xff0c;需要展示当天日期和数据&#xff0c;数据为0 先看一下效果&#xff1a; 话不多说&#xff0c;直接上SQL SELECTdate_range.date AS 日期,COUNT( oco.id ) AS 总订单…

返回列表中满足指定条件的连续元素:只返回第一个不符合条件元素之前的各元素itertools.takewhile()

【小白从小学Python、C、Java】 【计算机等考500强证书考研】 【Python-数据分析】 返回列表中满足指定条件的连续元素&#xff1a; 只返回第一个不符合条件元素之前的各元素 itertools.takewhile() [太阳]选择题 请问以下代码输出的结果是&#xff1f; import itertools a …

spark sql基于RBO的优化

前言 这里只对RBO优化进行简单的讲解。讲解RBO之前必须对spark sql的执行计划做一个简单的介绍。 这个里讲解的不是很清楚&#xff0c;需要结合具体的执行计划来进行查看 1、执行计划 在spark sql的执行计划中&#xff0c;执行计划分为两大类&#xff0c;即逻辑执行计划、物…

zxjy003- Spring Cloud后端工程搭建

1、创建 sprigboot 工程 guli-parent groupId &#xff1a; com.atguigu artifactId &#xff1a; guli-parent 2.删除src目录 3.配置pom.xml 修改版本为 &#xff1a;2.2.1.RELEASE<artifactId> 节点后面添加 pom类型 全部依赖&#xff0c;复制下面的即可&#xff0c…

Python 从入门到精通 学习笔记 Day02

Python 从入门到精通 第二天 今日目标 字符串基本操作、字符串序列操作、输入输出函数 字符串内置方法、运算符、练习之前学习的内容 一、字符串基本操作 在Python中&#xff0c;字符串的转义是指在字符串中使用特殊的字符序列来表示一些特殊字符。 在Python中&#xff0c;字…

小红书蒲公英平台开通后,有哪些注意的地方,以及如何进行报价?

今天来给大家聊聊当小红书账号过1000粉后&#xff0c;开通蒲公英需要注意的事项。 蒲公英平台是小红书APP中的一个专为内容创作者设计的平台。它为品牌和创作者提供了一个完整的服务流程&#xff0c;包括内容的创作、推广、互动以及转换等多个方面。 2.蒲公英平台的主要功能 &…

MySQL8.0新特性:函数索引,使用函数也不会导致索引失效~

文章目录 写在前面使用函数索引1、数据准备2、索引验证&#xff08;1&#xff09;普通索引&#xff08;2&#xff09;函数索引 总结 写在前面 之前我们知道&#xff0c;如果在查询中加入了函数&#xff0c;索引不生效&#xff0c;所以MySQL 8引入了函数索引&#xff0c;MySQL …

Uview------使用教程

一、点击一下链接安装&#xff1a; https://ext.dcloud.net.cn/plugin?id1593 如果使用HBuilderX编辑器的可以直接点击第一种方式自动安装即可 二&#xff1a;配置文件 在main.js中写入 记得要写在import Vue from vue下面 import uView from ./uni_modules/uview-ui Vue…

【EI征稿中|ACM出版】2023 人工智能、系统与网络安全国际学术会议 (AISNS 2023)

2023 人工智能、系统与网络安全国际学术会议 (AISNS 2023&#xff09; 2023 International Conference on Artificial Intelligence, Systems and Network Security 由西南科技大学计算机科学与技术学院主办的2023人工智能、系统与网络安全国际学术会议 (AISNS 2023&#xff…

windows MYSQL下载和自定路径安装,以及解决中文乱码问题。

文章讲的很详细&#xff0c;请耐心往下看。 一、mysql下载 下载网址&#xff1a;https://www.mysql.com/downloads/ 表示不登录&#xff0c;直接下载。 以上就把安装包下载完了。下载是8.0.35版本。 二、接下来看怎么安装 1.双击安装包&#xff0c;进行安装。 注意&#x…

MX6ULL学习笔记 (八) platform 设备驱动实验

前言&#xff1a; 什么是 Linux 下的 platform 设备驱动 Linux下的字符设备驱动一般都比较简单&#xff0c;只是对IO进行简单的读写操作。但是I2C、SPI、LCD、USB等外设的驱动就比较复杂了&#xff0c;需要考虑到驱动的可重用性&#xff0c;以避免内核中存在大量重复代码&…

MATLAB - 绘制立体图(平面+水深)

目录 代码结果 代码 % 在 X-Y 平面上绘图 % 正常绘制平面图 [X,Y,Z] peaks; contour(X,Y,Z,20); hold on% ****重点******************************************** % 改为三维视图&#xff0c;具体可以help % view(3); %此时的平面图对应z0 &#xff1b;默认az-37.5&#x…

Sql Server关于表的建立、修改、删除

表的创建&#xff1a; &#xff08;1&#xff09;在“对象资源管理器”面板中展开“数据库”节点&#xff0c;可以看到自己创建的数据库&#xff0c;比如Product。展开Product节点&#xff0c;右击“表”节点&#xff0c;在弹出的快捷菜单中选择“新建表”项&#xff0c;进入“…

[FPGA 学习记录] 快速开发的法宝——IP核

快速开发的法宝——IP核 文章目录 1 IP 核是什么2 为什么要使用 IP 核3 IP 核的存在形式4 IP 核的缺点5 Quartus II 软件下 IP 核的调用6 Altera IP 核的分类 在本小节当中&#xff0c;我们来学习一下 IP 核的相关知识。 IP 核在 FPGA 开发当中应用十分广泛&#xff0c;它被称为…

FastAPI查询参数和字符串校验

在FastAPI中&#xff0c;你可以为参数声明额外的信息和校验。这对于查询参数来说尤其有用&#xff0c;因为它们通常用于过滤或排序结果。本教程将引导你如何使用Query对象来添加这些额外的校验。 导入所需库 首先&#xff0c;你需要导入FastAPI以及Query对象&#xff1a; fr…

【已解决】ImportError: cannot import name ‘Merge‘ from ‘keras.layers‘

问题描述 ImportError: cannot import name ‘Merge‘ from ‘keras.layers‘ 解决办法 1、tensorflow和keras版本要对应&#xff1b; 2、使用"merge" pip uninstall keras pip install keras2.3.1 from keras.layers import merge完结撒花 我这血中带泪的成长&…

从传统到胜利:广汽集团汽车产业创新之旅

置身于汽车行业百年未有之大变局&#xff0c;作为传统车企中的排头兵&#xff0c;广汽创新可圈可点&#xff0c;广汽近年来取得了骄人业绩&#xff0c;不论是整体产销规模&#xff0c;还是新能源汽车产业化、新技术领域开拓等&#xff0c;都呈现节节攀升的局面。本文奖从产业变…

2024年值得关注的8个未来数据库

2024年值得关注的8个未来数据库 关系型数据库管理系统在数据库技术领域占据主导地位已经多年了。当SQL在1970年代首次出现时&#xff0c;关系型数据库管理系统的使用和受欢迎程度迅速提升。很快&#xff0c;MySQL成为了大多数公司和团队首选的数据库。 然而&#xff0c;2023年…