集成RabbitMQ+MQ常用操作

news2024/12/26 5:10:40

文章目录

    • 1.环境搭建
        • 1.Docker安装RabbitMQ
          • 1.拉取镜像
          • 2.安装命令
          • 3.开启5672和15672端口
          • 4.登录控制台
        • 2.整合Spring AMQP
          • 1.sun-common模块下创建新模块
          • 2.引入amqp依赖和fastjson
        • 3.新建一个mq-demo的模块
          • 1.在sun-frame下创建mq-demo
          • 2.然后在mq-demo下创建生产者和消费者子模块
          • 3.查看是否交给父模块管理了
          • 4.在mq-demo模块引入sun-common-rabbitmq依赖
          • 5.publisher引入sun-common-test依赖
          • 6.将sun-common-rabbitmq clean-install一下
          • 7.给consumer和publisher都创建主类
            • 1.ConsumerApplication.java
            • 2.PublisherApplication.java
        • 4.测试MQ
          • 1.application.yml mq的最基本配置
          • 2.consumer
            • 1.TestConfig.java MQ配置
            • 2.TestConfigListener.java 监听队列
          • 3.publisher
            • 1.TestConfig.java 测试(注意指定启动类)
            • 2.结果
    • 2.基本交换机
        • 1.Fanout
          • 1.FanoutConfig.java 交换机配置
          • 2.FanoutConfigListener.java 监听者
          • 3.FanoutConfig.java 生产者
        • 2.Direct
          • 1.DirectConfig.java 交换机配置
          • 2.DirectConfigListener.java 监听者
          • 3.DirectConfig.java 生产者

1.环境搭建

1.Docker安装RabbitMQ
1.拉取镜像
docker pull rabbitmq:3.8-management
2.安装命令
docker run -e RABBITMQ_DEFAULT_USER=sun \
           -e RABBITMQ_DEFAULT_PASS=mq \
           -v mq-plugins:/plugins \
           --name mq \
           --hostname mq \
           -p 15672:15672 \
           -p 5672:5672 \
           -d 699038cb2b96 # 注意这里是镜像id,需要替换
3.开启5672和15672端口
4.登录控制台

15672端口

2.整合Spring AMQP
1.sun-common模块下创建新模块

CleanShot 2024-08-02 at 13.55.01@2x

2.引入amqp依赖和fastjson
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <!-- 继承父模块的版本和通用依赖 -->
    <parent>
        <groupId>com.sunxiansheng</groupId>
        <artifactId>sun-common</artifactId>
        <version>1.0-SNAPSHOT</version>
    </parent>

    <artifactId>sun-common-rabbitmq</artifactId>
    <!-- 子模块的version,如果不写就默认跟父模块的一样 -->
    <version>${children.version}</version>

    <!-- 自定义依赖,无需版本号 -->
    <dependencies>
        <!--AMQP依赖,包含RabbitMQ-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <!-- 用于传递消息时的序列化操作 -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
        </dependency>
    </dependencies>

</project>
3.新建一个mq-demo的模块
1.在sun-frame下创建mq-demo

CleanShot 2024-08-02 at 14.10.42@2x

2.然后在mq-demo下创建生产者和消费者子模块

CleanShot 2024-08-02 at 14.16.31@2x

CleanShot 2024-08-02 at 14.16.48@2x

3.查看是否交给父模块管理了

CleanShot 2024-08-02 at 14.18.56@2x

4.在mq-demo模块引入sun-common-rabbitmq依赖
    <dependencies>
        <!-- 引入sun-common-rabbitmq -->
        <dependency>
            <groupId>com.sunxiansheng</groupId>
            <artifactId>sun-common-rabbitmq</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
    </dependencies>
5.publisher引入sun-common-test依赖
    <dependencies>
        <!-- sun-common-test -->
        <dependency>
            <groupId>com.sunxiansheng</groupId>
            <artifactId>sun-common-test</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
    </dependencies>
6.将sun-common-rabbitmq clean-install一下
7.给consumer和publisher都创建主类
1.ConsumerApplication.java
package com.sunxiansheng.consumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;

@SpringBootApplication
@ComponentScan("com.sunxiansheng")
public class ConsumerApplication {
    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }
}

2.PublisherApplication.java
package com.sunxiansheng.publisher;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class PublisherApplication {
    public static void main(String[] args) {
        SpringApplication.run(PublisherApplication.class);
    }
}

4.测试MQ
1.application.yml mq的最基本配置
spring:
  # RabbitMQ 配置
  rabbitmq:
    # 服务器地址
    host: ip
    # 用户名
    username: sunxiansheng
    # 密码
    password: rabbitmq
    # 虚拟主机
    virtual-host: /
    # 端口
    port: 5672
2.consumer
1.TestConfig.java MQ配置
package com.sunxiansheng.consumer.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Description: 最基本的MQ测试
 * @Author sun
 * @Create 2024/8/2 14:34
 * @Version 1.0
 */
@Configuration
public class TestConfig {

    /**
     * 创建一个fanout类型的交换机
     * @return
     */
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanout.exchange.test");
    }

    /**
     * 创建一个队列
     * @return
     */
    @Bean
    public Queue fanoutQueueTest() {
        return new Queue("fanout.queue.test");
    }

    /**
     * 交换机和队列绑定
     */
    @Bean
    public Binding binding() {
        return BindingBuilder.bind(fanoutQueueTest()).to(fanoutExchange());
    }

}
2.TestConfigListener.java 监听队列
package com.sunxiansheng.consumer.listener;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Description: 最基本的MQ测试
 * @Author sun
 * @Create 2024/8/2 14:34
 * @Version 1.0
 */
@Component
public class TestConfigListener {

    @RabbitListener(queues = "fanout.queue.test")
    public void receive(String message) {
        System.out.println("接收到的消息:" + message);
    }

}
3.publisher
1.TestConfig.java 测试(注意指定启动类)
package com.sunxiansheng.consumer.config;

import com.sunxiansheng.publisher.PublisherApplication;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.boot.test.context.SpringBootTest;

import javax.annotation.Resource;

/**
 * Description: 最基本的MQ测试
 * @Author sun
 * @Create 2024/8/2 14:34
 * @Version 1.0
 */
@SpringBootTest(classes = PublisherApplication.class) // 指定启动类
public class TestConfig {

    @Resource
    private AmqpTemplate amqpTemplate;

    @Test
    public void send() {
        // 交换机
        String exchange = "fanout.exchange.test";
        // 路由键
        String routingKey = "";
        // 消息
        String message = "hello fanout";
        // 发送消息
        amqpTemplate.convertAndSend(exchange, routingKey, message);
    }

}
2.结果

CleanShot 2024-08-02 at 14.46.49@2x

2.基本交换机

1.Fanout
1.FanoutConfig.java 交换机配置
package com.sunxiansheng.consumer.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Description: Fanout交换机
 * @Author sun
 * @Create 2024/7/29 15:06
 * @Version 1.0
 */
@Configuration
public class FanoutConfig {

    @Bean
    public FanoutExchange fanoutExchange1() {
        // 创建一个fanout类型的交换机
        return new FanoutExchange("fanout.exchange");
    }

    @Bean
    public Queue fanoutQueue1() {
        // 创建一个队列
        return new Queue("fanout.queue1");
    }

    @Bean
    public Queue fanoutQueue2() {
        // 创建一个队列
        return new Queue("fanout.queue2");
    }

    // 两个队列绑定到交换机上
    @Bean
    public Binding bindingFanoutQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange1) {
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange1);
    }

    @Bean
    public Binding bindingFanoutQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange1) {
        return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange1);
    }
}
2.FanoutConfigListener.java 监听者
package com.sunxiansheng.consumer.listener;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Description: Fanout交换机
 * @Author sun
 * @Create 2024/7/29 15:06
 * @Version 1.0
 */
@Component
public class FanoutConfigListener {

    @RabbitListener(queues = "fanout.queue1")
    public void receive1(String message) {
        System.out.println("fanout.queue1接收到的消息:" + message);
    }

    @RabbitListener(queues = "fanout.queue2")
    public void receive2(String message) {
        System.out.println("fanout.queue2接收到的消息:" + message);
    }

}
3.FanoutConfig.java 生产者
package com.sunxiansheng.consumer.config;

import com.sunxiansheng.publisher.PublisherApplication;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.boot.test.context.SpringBootTest;

import javax.annotation.Resource;

/**
 * Description: Fanout交换机
 * @Author sun
 * @Create 2024/7/29 15:06
 * @Version 1.0
 */
@SpringBootTest(classes = PublisherApplication.class) // 指定启动类
public class FanoutConfig {

    @Resource
    private AmqpTemplate amqpTemplate;

    @Test
    public void send() {
        // 交换机
        String exchange = "fanout.exchange";
        // 路由键
        String routingKey = "";
        // 消息
        String message = "hello fanout";
        // 发送消息
        amqpTemplate.convertAndSend(exchange, routingKey, message);
    }

}
2.Direct
1.DirectConfig.java 交换机配置
package com.sunxiansheng.consumer.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Description: Direct交换机
 * @Author sun
 * @Create 2024/7/29 15:06
 * @Version 1.0
 */
@Configuration
public class DirectConfig {

    @Bean
    public DirectExchange directExchange() {
        // 创建一个direct类型的交换机
        return new DirectExchange("direct.exchange");
    }

    @Bean
    public Queue directQueue1() {
        // 创建一个队列
        return new Queue("direct.queue1");
    }

    @Bean
    public Queue directQueue2() {
        // 创建一个队列
        return new Queue("direct.queue2");
    }

    // 两个队列绑定到交换机上,这里需要指定routingKey
    @Bean
    public Binding bindingDirectQueue1(Queue directQueue1, DirectExchange directExchange) {
        return BindingBuilder.bind(directQueue1).to(directExchange).with("black");
    }

    @Bean
    public Binding bindingDirectQueue2(Queue directQueue2, DirectExchange directExchange) {
        return BindingBuilder.bind(directQueue2).to(directExchange).with("green");
    }

}
2.DirectConfigListener.java 监听者
package com.sunxiansheng.consumer.listener;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Description: Direct交换机
 * @Author sun
 * @Create 2024/7/29 15:06
 * @Version 1.0
 */
@Component
public class DirectConfigListener {

    @RabbitListener(queues = "direct.queue1")
    public void receive1(String message) {
        System.out.println("direct.queue1接收到的消息:" + message);
    }

    @RabbitListener(queues = "direct.queue2")
    public void receive2(String message) {
        System.out.println("direct.queue2接收到的消息:" + message);
    }

}
3.DirectConfig.java 生产者
package com.sunxiansheng.consumer.config;

import com.sunxiansheng.publisher.PublisherApplication;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.boot.test.context.SpringBootTest;

import javax.annotation.Resource;

/**
 * Description: Direct交换机
 * @Author sun
 * @Create 2024/7/29 15:06
 * @Version 1.0
 */
@SpringBootTest(classes = PublisherApplication.class) // 指定启动类
public class DirectConfig {

    @Resource
    private AmqpTemplate amqpTemplate;

    @Test
    public void send() {
        // 交换机
        String exchange = "direct.exchange";
        // 路由键
        String routingKey = "black";
        // 消息
        String message = "hello direct";
        // 发送消息
        amqpTemplate.convertAndSend(exchange, routingKey, message);
    }

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2265641.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

sentinel笔记10- 限流规则持久化(下)

上一篇整理过单向的持久化&#xff0c;sentinel笔记9- 限流规则持久化&#xff08;上&#xff09;-CSDN博客 本篇进行sentinel 改造&#xff0c;实现双向同步。 1 下载Sentinel源码 https://github.com/alibaba/Sentinel 2 dashboard 改造 2.1修改dashboard项目的pom.xml &…

微服务篇-深入了解 XXL-JOB 分布式任务调度的具体使用(XXL-JOB 的工作流程、框架搭建)

&#x1f525;博客主页&#xff1a; 【小扳_-CSDN博客】 ❤感谢大家点赞&#x1f44d;收藏⭐评论✍ 文章目录 1.0 XXL-JOB 调度中心概述 1.2 XXL-JOB 工作流程 1.3 Cron 表达式调度 2.0 XXL-JOB 框架搭建 2.1 XXL-JOB 调度中心的搭建 2.2 XXL-JOB 执行器的搭建 2.3 使用调度中心…

【jenkins插件】

1) 2) 3) 4) 5) 6) 参考: 知识库/运维/Jenkins/01-安装/13-插件.md zfoo/java-developer-document - 码云 - 开源中国

孔雀鱼和斑马鱼能一起养吗?

在观赏鱼的世界里&#xff0c;孔雀鱼和斑马鱼都是备受鱼友喜爱的热门品种。它们独特的外形和相对容易的饲养条件&#xff0c;使得不少养鱼新手跃跃欲试将它们混养在一起&#xff0c;但这其中实则有诸多因素需要考量。 从生存环境来看&#xff0c;孔雀鱼和斑马鱼有一定的兼容性…

踏踏实实练SQLday1

踏踏实实练SQLday1 1连续登录1.1查询连续登录3天以上的用户第一步去重第二步-开窗rownumber&#xff0c;用date减一下&#xff0c;对结果进行分组 -- over()开窗函数知识图谱第三步 1.2查询连续登录最大天数用户1.3某个用户连续登录天数注意先where一下这个用户的数据过滤出来.…

UM-Net:基于不确定性建模的息肉分割方法,对ICGNet的重新思考|文献速递-生成式模型与transformer在医学影像中的应用

Title 题目 UM-Net: Rethinking ICGNet for polyp segmentation with uncertainty modeling UM-Net&#xff1a;基于不确定性建模的息肉分割方法&#xff0c;对ICGNet的重新思考 01 文献速递介绍 结直肠癌&#xff08;CRC&#xff09;是男性中第三大最常见的恶性肿瘤&…

C语言项目 天天酷跑(上篇)

前言 这里讲述这个天天酷跑是怎么实现的&#xff0c;我会在天天酷跑的下篇添加源代码&#xff0c;这里会讲述天天酷跑这个项目是如何实现的每一个思路&#xff0c;都是作者自己学习于别人的代码而创作的项目和思路&#xff0c;这个代码和网上有些许不一样&#xff0c;因为掺杂了…

协众OA checkLoginQrCode接口 SQL注入漏洞

FOFA app"协众软件-协众OA" 漏洞复现 nuclei运行结果

如何用gpt来分析链接里面的内容(比如分析论文链接)和分析包含多个文件中的一块代码

如何用gpt来分析链接里面的内容&#xff0c;方法如下 这里使用gpt4里面有一个网路的功能 点击搜索框下面这个地球的形状即可启动搜索网页模式 然后即可提出问题在搜索框里&#xff1a;发现正确识别和分析了链接里面的内容 链接如下&#xff1a;https://arxiv.org/pdf/2009.1…

jdk各个版本介绍

JDK&#xff08;Java Development Kit&#xff09;是Java开发者用于构建、测试和部署Java应用程序的工具包。随着Java语言的不断演进&#xff0c;JDK也经历了多个版本的更新。下面是对JDK各个主要版本的简要介绍&#xff1a; JDK 1.0 - 1.4&#xff08;经典时代&#xff09; •…

OpenCV(python)从入门到精通——运算操作

加法减法操作 import cv2 as cv import numpy as npx np.uint8([250]) y np.uint8([10])x_1 np.uint8([10]) y_1 np.uint8([20])# 加法,相加最大只能为255 print(cv.add(x,y))# 减法&#xff0c;相互减最小值只能为0 print(cv.subtract(x_1,y_1))图像加法 import cv2 as…

大湾区经济网报道 | 第三届湾商大会暨湾区未来产业发展论坛隆重举行

大湾区经济网12月25日电&#xff08;首席记者 余芳&#xff09;&#xff0c;在中国式现代化进程与世界新机遇交汇的大背景下&#xff0c;要精准定位并奋力攀登未来科技与产业发展的高峰&#xff0c;加速推进新一代信息技术、人工智能、量子科技、生物科技、新能源以及新材料等领…

CV-OCR经典论文解读|An Empirical Study of Scaling Law for OCR/OCR 缩放定律的实证研究

论文标题 An Empirical Study of Scaling Law for OCR OCR 缩放定律的实证研究 论文链接&#xff1a; An Empirical Study of Scaling Law for OCR论文下载 论文作者 Miao Rang, Zhenni Bi, Chuanjian Liu, Yunhe Wang, Kai Han 内容简介 本论文在光学字符识别&#xf…

ES已死,文本检索永生

长期以来&#xff0c;混合查询&#xff08;Hybrid Search&#xff09;一直是提升 RAG&#xff08;Retrieval-Augmented Generation&#xff09;搜索质量的重要手段。尽管基于密集向量&#xff08;Dense Embedding&#xff09;的搜索技术随着模型规模和预训练数据集的不断扩展&a…

K线单边突破指标(附带源码)

编写需求&#xff1a; 今天我们来根据粉丝要求进行源码复现&#xff1a; 【请根据最近两根K线判断当下的行情做多&#xff0c;做空方向。用三个价格判断当前K线状态&#xff0c;最高价、最低价、收盘价都大于昨日对应价格&#xff0c;为上涨K线。用三个价格判断当前K线状态&a…

基于Springboot的在线问卷调查系统【附源码】

基于Springboot的在线问卷调查系统 效果如下&#xff1a; 系统主页面 问卷列表页面 个人中心页面 系统登陆页面 管理员主页面 问卷管理页面 研究背景 随着互联网技术的飞速发展&#xff0c;传统的问卷调查方式因其时间和地点的限制&#xff0c;难以高效地收集到足够的数据。…

SpringBoot状态机

Spring Boot 状态机&#xff08;State Machine&#xff09;是 Spring Framework 提供的一种用于实现复杂业务逻辑的状态管理工具。它基于有限状态机&#xff08;Finite State Machine, FSM&#xff09;的概念&#xff0c;允许开发者定义一组状态、事件以及它们之间的转换规则。…

Redis基础知识分享(含5种数据类型介绍+增删改查操作)

一、redis基本介绍 1.redis的启动 服务端启动 pythonubuntu:~$ redis-server客户端启动 pythonubuntu:~$ redis-cli <127.0.0.1:6379> exit pythonubuntu:~$ redis-cli --raw //(支持中文的启动方式) <127.0.0.1:6379> exit2.redis基本操作 ping发送给服务器…

Pytorch注意力机制应用到具体网络方法(闭眼都会版)

文章目录 以YoloV4-tiny为例要加入的注意力机制代码模型中插入注意力机制 以YoloV4-tiny为例 解释一下各个部分&#xff1a; 最左边这部分为主干提取网络&#xff0c;功能为特征提取中间这边部分为FPN&#xff0c;功能是加强特征提取最后一部分为yolo head&#xff0c;功能为获…

交通控制系统中的 Prompt工程:引导LLMs实现高效交叉口管理 !

本研究提出了一种新型的交通控制系统方法&#xff0c;通过使用大型语言模型&#xff08;LLMs&#xff09;作为交通控制器。该研究利用它们的逻辑推理、场景理解和决策能力&#xff0c;实时优化通行能力并提供基于交通状况的反馈。LLMs将传统的分散式交通控制过程集中化&#xf…