快速上手RabbitMQ

news2025/1/14 1:14:26
  1. 安装RabbitMQ
    1. 首先将镜像包上传到虚拟机,使用命令加载镜像
docker load -i mq.tar
    1. 运行MQ容器
docker run \
 -e RABBITMQ_DEFAULT_USER=itcast \
 -e RABBITMQ_DEFAULT_PASS=123321 \
 -v mq-plugins:/plugins \
 --name mq \
 --hostname mq \
 -p 15672:15672 \
 -p 5672:5672 \
 -d \
 rabbitmq:3-management
  1. MQ的基本结构
    1. RabbitMQ的一些角色
      1. publisher:生产者
      2. consumer:消费者
      3. exchange:交换机,负责消息路由
      4. queue:队列,存储消息
      5. virtualHost:虚拟主机,隔离不同租户的exchange,queue,消息的隔离
    1. 快速入门

public class PublisherTest {
    @Test
    public void testSendMessage() throws IOException, TimeoutException {
        // 1.建立连接
        ConnectionFactory factory = new ConnectionFactory();
        // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
        factory.setHost("192.168.150.101");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("itcast");
        factory.setPassword("123321");
        // 1.2.建立连接
        Connection connection = factory.newConnection();

        // 2.创建通道Channel
        Channel channel = connection.createChannel();

        // 3.创建队列
        String queueName = "simple.queue";
        channel.queueDeclare(queueName, false, false, false, null);

        // 4.发送消息
        String message = "hello, rabbitmq!";
        channel.basicPublish("", queueName, null, message.getBytes());
        System.out.println("发送消息成功:【" + message + "】");

        // 5.关闭通道和连接
        channel.close();
        connection.close();

    }
}
public class ConsumerTest {

    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.建立连接
        ConnectionFactory factory = new ConnectionFactory();
        // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
        factory.setHost("192.168.150.101");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("itcast");
        factory.setPassword("123321");
        // 1.2.建立连接
        Connection connection = factory.newConnection();

        // 2.创建通道Channel
        Channel channel = connection.createChannel();

        // 3.创建队列
        String queueName = "simple.queue";
        channel.queueDeclare(queueName, false, false, false, null);

        // 4.订阅消息
        channel.basicConsume(queueName, true, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                // 5.处理消息
                String message = new String(body);
                System.out.println("接收到消息:【" + message + "】");
            }
        });
        System.out.println("等待接收消息。。。。");
    }
}
  1. SpringAMQP
    1. 功能
      1. 自动声明队列、交换机及其绑定关系
      2. 基于注解的监听器模式,异步接收消息
      3. 封装了RabbitTemplate工具,用于发送消息
    1. 简化模型 === producer->queue->consumer
      1. BasicQueue
        1. 首先在父工程中引入依赖
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
        1. 配置MQ地址,在publisher服务的application.yml中添加配置
spring:
  rabbitmq:
    host: 192.168.137.138 # 主机名
    port: 5672 # 端口
    virtual-host: / # 虚拟主机
    username: itcast # 用户名
    password: 123321 # 密码
        1. 编写队列
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage(String msg){
    log.info("接受到的消息:{}",msg);
}
        1. 发送消息
@SpringBootTest
public class SpringAmqpTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSimpleQueue(){
    String queueName = "simple.queue";
    String message = "hello,world";
    rabbitTemplate.convertAndSend(queueName,message);
    }

}
      1. WorkQueue === 让多个消费者绑定到一个队列,共同消费队列中的消息
        1. 结构图

        2. 消息发送
@SpringBootTest
public class SpringAmqpTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void testWorkQueue() throws Exception{
        String queueName = "simple.queue";
        String message = "hello,world";
        for (int i = 1; i <= 50; i++) {
            rabbitTemplate.convertAndSend(queueName,"第"+i+"个"+message);
            Thread.sleep(20);
        }
    }
}
        1. prefetch能者多劳机制
          1. 原理:mq在收到consumer的ack之前,可以向consumer推送的消息的条数,默认250
          2. 修改consumer服务的application.yml文件
spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息
        1. 消息接受
@Slf4j
@Component
public class SpringRabbitListener {

    @RabbitListener(queues = "simple.queue")
    public void onWorkQueue1(String msg) throws Exception {
        log.info("work1接收到的消息,{}", msg);
        Thread.sleep(20);
    }

    @RabbitListener(queues = "simple.queue")
    public void onWorkQueue2(String msg) throws Exception {
        log.info("work2接收到的消息,{}", msg);
        Thread.sleep(200);
    }

}
    1. 发布/订阅模型 === producer->exchange(只负责路由,不负责存储)->queue->consumer
      1. Fanout === 广播给所有的queue
        1. 结构图

        2. 消息发送流程
          1. 可以有多个队列
          2. 每个队列都要绑定到Exchange(交换机)
          3. 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定
          4. 交换机把消息发送给绑定过的所有队列
          5. 订阅队列的消费者都能拿到消息
        1. 在消费者模块中创建一个类,声明队列和交换机
@Configuration
public class FanoutConfig {
    /*
    * 创建一个交换机
    * */
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("fanout.exchange");
    }
    /*
    * 创建队列1
    * */
    @Bean
    public Queue fanoutQueue1(){
        return new Queue("fanout.queue1");
    }
    /*
    * 创建队列2
    * */
    @Bean
    public Queue fanoutQueue2(){
        return new Queue("fanout.queue2");
    }
    /*
    * 将队列1绑定到交换机
    * */
    @Bean
    public Binding queue1Binding(){
        return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());
    }
    /*
    * 将队列2绑定到交换机
    * */
    @Bean
    public Binding queue2Binding(){
        return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());
    }
    
}
        1. 发送消息
@SpringBootTest
public class SpringAmqpTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    @Test
    public void testFanoutExchange() {
        // 队列名称
        String exchangeName = "fanout.exchange";
        // 消息
        String message = "hello world!";
        rabbitTemplate.convertAndSend(exchangeName, "", message);
    }
}
        1. 消息接受
@Slf4j
@Component
public class SpringRabbitListener {

    @RabbitListener(queues = "fanout.queue1")
    public void fanoutQueue1(String msg){
        log.info("收到了来自fanout.queue1的消息,{}",msg);
    }

    @RabbitListener(queues = "fanout.queue2")
    public void fanoutQueue2(String msg){
        log.info("收到了来自fanout.queue2的消息,{}",msg);
    }
}
      1. Direct === 路由给exchange绑定的queue
        1. 结构图

        2. 消息发送流程
          1. queue与exchange绑定的时候需要设置bindingkey
          2. 可以设置多个bindingkey,key可以重复
          3. produce发送的时候需要设置routingkey
          4. exchange判断消息的routingkey与queue中的bindingkey是否完全一致,一致才会接受到消息
        1. 基于注解声明队列和交换机
@Slf4j
@Component
public class SpringRabbitListener {
    
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue1"),
            exchange = @Exchange(name = "direct.exchange"),
            key = {"red","blue"}
    ))
    public void directQueue1(String msg){
        log.info("收到了来自direct.queue1的消息,{}",msg);
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue2"),
            exchange = @Exchange(name = "direct.exchange"),
            key = {"gary","blue"}
    ))
    public void directQueue2(String msg){
        log.info("收到了来自direct.queue2的消息,{}",msg);
    }

}
        1. 消息发送
@SpringBootTest
public class SpringAmqpTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void testDirectExchange(){
        String exchange = "direct.exchange";
        String routingKey = "gary";
        String message = "hello direct";
        rabbitTemplate.convertAndSend(exchange,routingKey,message);
    }

}
        1. Direct交换机与Fanout交换机有什么区别?
          1. Fanout交换机将消息路由给每一个与之绑定的队列
          2. Direct交换机根据RoutingKey判断路由给哪个队列
      1. Topic
        1. 结构图

        2. 匹配支持通配符
          1. *:1个单词
          2. #:1个或者多个单词
        1. 基于注解声明队列和交换机
@Slf4j
@Component
public class SpringRabbitListener {

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.queue1"),
            exchange = @Exchange(name = "topic.exchange"),
            key = "china.#"
    ))
    public void topicQueue1(String msg){
        log.info("收到了来自topic.queue1的消息,{}",msg);
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.queue2"),
            exchange = @Exchange(name = "topic.exchange"),
            key = "#.news"
    ))
    public void topicQueue2(String msg){
        log.info("收到了来自topic.queue2的消息,{}",msg);
    }

}
        1. 消息发送
@SpringBootTest
public class SpringAmqpTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testTopicExchange(){
        String exchange = "topic.exchange";
        String routingKey = "china.123";
        String message = "so cool";
        rabbitTemplate.convertAndSend(exchange, routingKey, message);
    }

}
    1. 消息转换器
      1. 默认发送String,byte[],Serializable
      2. 可以自定义序列化
        1. 在publisher和consumer两个服务中都引入依赖:
<dependency>
    <groupId>com.fasterxml.jackson.dataformat</groupId>
    <artifactId>jackson-dataformat-xml</artifactId>
    <version>2.9.10</version>
</dependency>
        1. 注入MessageConverter的实现类
@Bean
public MessageConverter jsonMessageConverter(){
    return new Jackson2JsonMessageConverter();
}
        1. 消息发送
@SpringBootTest
public class SpringAmqpTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testObjectQueue(){
        String queue = "object.queue";
        User message = new User("蒋浩楠",80);
        rabbitTemplate.convertAndSend(queue,message);
    }
}
        1. 接收消息
@Slf4j
@Component
public class SpringRabbitListener {
    @RabbitListener(queues = "object.queue")
    public void objectQueue(UserDTO dto){
        log.info("收到了来自topic.queue2的消息,{}",dto.toString());
    }
}
  1. RabbitMQ集群

    1. 普通集群
      1. 结构图

      2. 特征
        1. 会在集群的各个节点间共享部分数据,包括:交换机、队列元信息。不包含队列中的消息。
        2. 当访问集群某节点时,如果队列不在该节点,会从数据所在节点传递到当前节点并返回
        3. 队列所在节点宕机,队列中的消息就会丢失
    1. 镜像集群
      1. 结构图

      2. 特征
        1. 交换机、队列、队列中的消息会在各个mq的镜像节点之间同步备份。
        2. 创建队列的节点被称为该队列的主节点,备份到的其它节点叫做该队列的镜像节点
        3. 一个队列的主节点可能是另一个队列的镜像节点
        4. 所有操作都是主节点完成,然后同步给镜像节点
        5. 主宕机后,镜像节点会替代成新的主
    1. 仲裁队列
      1. 特征
        1. 与镜像队列一样,都是主从模式,支持主从数据同步
        2. 使用非常简单,没有复杂的配置
        3. 主从同步基于Raft协议,强一致
      1. java代码中创建仲裁队列
        1. 创建队列
@Bean
public Queue quorumQueue() {
    return QueueBuilder
        .durable("quorum.queue") // 持久化
        .quorum() // 仲裁队列
        .build();
}
        1. SpringAMQP连接MQ集群
spring:
  rabbitmq:
    addresses: 192.168.150.105:8071, 192.168.150.105:8072, 192.168.150.105:8073 #address来代替host、port方式
    username: itcast
    password: 123321
    virtual-host: /
  1. 部署集群
    1. 计划部署3节点的mq集群

    2. 获取cookie,每个集群节点必须具有相同的 cookie。实例之间也需要它来相互通信
docker exec -it mq cat /var/lib/rabbitmq/.erlang.cookie

UTQKOGHXAJPQFJREBLEL #cookie

docker rm -f mq #停止并删除当前的mq容器,我们重新搭建集群
    1. 准备集群配置
#在/tmp目录新建一个配置文件 rabbitmq.conf
cd /tmp

# 创建文件
touch rabbitmq.conf

#配置文件内容如下
loopback_users.guest = false
listeners.tcp.default = 5672
default_user = itcast 
default_pass = 123321
cluster_formation.peer_discovery_backend = rabbit_peer_discovery_classic_config
cluster_formation.classic_config.nodes.1 = rabbit@mq1
cluster_formation.classic_config.nodes.2 = rabbit@mq2
cluster_formation.classic_config.nodes.3 = rabbit@mq3
    1. 再创建一个文件,记录cookie
cd /tmp

# 创建cookie文件
touch .erlang.cookie

# 写入cookie
echo "UTQKOGHXAJPQFJREBLEL" > .erlang.cookie
# 修改cookie文件的权限
# 修改cookie文件的权限
# 修改cookie文件的权限
chmod 600 .erlang.cookie
    1. 准备三个目录,mq1、mq2、mq3,然后拷贝rabbitmq.conf、cookie文件到mq1、mq2、mq3:
cd /tmp

# 创建目录
mkdir mq1 mq2 mq3

# 进入/tmp
cd /tmp

# 拷贝
cp rabbitmq.conf mq1
cp rabbitmq.conf mq2
cp rabbitmq.conf mq3
cp .erlang.cookie mq1
cp .erlang.cookie mq2
cp .erlang.cookie mq3

# 或者
echo mq1 mq2 mq3 | xargs -t -n 1 cp rabbitmq.conf
echo mq1 mq2 mq3 | xargs -t -n 1 cp .erlang.cookie
    1. 启动集群
#创建一个网络
docker network create mq-net

#运行命令
docker run -d --net mq-net \
-v ${PWD}/mq1/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf \
-v ${PWD}/mq1/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie \
--name mq1 \
--hostname mq1 \
-p 8071:5672 \
-p 8081:15672 \
rabbitmq:3-management

docker run -d --net mq-net \
-v ${PWD}/mq2/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf \
-v ${PWD}/mq2/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie \
--name mq2 \
--hostname mq2 \
-p 8072:5672 \
-p 8082:15672 \
rabbitmq:3-management

docker run -d --net mq-net \
-v ${PWD}/mq3/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf \
-v ${PWD}/mq3/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie \
--name mq3 \
--hostname mq3 \
-p 8073:5672 \
-p 8083:15672 \
rabbitmq:3-management
    1. 添加镜像模式
docker exec -it mq1 rabbitmqctl set_policy ha-two "^two\." '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'

    1. 添加仲裁队列

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1644895.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

商城数据库88张表结构完整示意图61~70(十四)

六十一&#xff1a; 六十二&#xff1a; 六十三&#xff1a; 六十四&#xff1a; 六十五&#xff1a; 六十六&#xff1a; 六十七&#xff1a; 六十八&#xff1a; 六十九&#xff1a; 七十&#xff1a;

LeetCode 15 —— 三数之和

阅读目录 1. 题目2. 解题思路3. 代码实现 1. 题目 2. 解题思路 首先我们对数组进行从小到大排序&#xff0c;然后遍历数组 [ 0 , n u m s . s i z e ( ) − 3 ] [0,nums.size()-3] [0,nums.size()−3] 作为三元组中的 a a a&#xff0c;由于三元组的索引互不相同&#xff0c…

万物互联-AI边缘计算赋能腾讯无人驾驶车

为把握智能交通系统发展机遇&#xff0c;探索未来城市智能化交通体系&#xff0c;今年3月&#xff0c;腾讯无人驾驶汽车正式落地深圳智能网联交通示范区&#xff1b;该项目将以智能网联测试为基础,以构建自动驾驶生态和未来交通体系为导向&#xff0c;围绕"车、路、云、网…

ABAP开发(3)数据类型和变量

文章目录 1、常用数据类型2、变量定义1、定义基本类型变量2、定义结构体3、定义类型 1、常用数据类型 2、变量定义 在ABAP中&#xff0c;使用关键字DATA定义变量&#xff0c;句号结尾。 1、定义基本类型变量 定义变量ID&#xff0c;字符串&#xff0c;长度20。 DATA ID(20)…

Keepalived实现LVS高可用

6.1 KeepalivedLVS集群介绍 Keepalived和LVS共同构建了一个高效的负载均衡和高可用性解决方案&#xff1a;LVS作为负载均衡器&#xff0c;负责在集群中的多个服务器间分配流量&#xff0c;以其高性能和可扩展性确保应用程序能够处理大量的并发请求&#xff1b;而Keepalived则作…

启发式算法解魔方——python

未完待续&#xff0c;填坑ing…… 魔方操作的表示——辛马斯特标记 辛马斯特标记&#xff08;Singmaster Notation&#xff09;是一种用于描述魔方和类似拼图的转动操作的标记系统。它以大卫辛马斯特&#xff08;David Singmaster&#xff09;的名字命名&#xff0c;辛马斯特…

商城数据库88张表结构完整示意图81~88及总览图(十六)

八十一&#xff1a; 八十二&#xff1a; 八十三&#xff1a; 八十四&#xff1a; 八十五&#xff1a; 八十六&#xff1a; 八十七&#xff1a; 八十八&#xff1a; 总览图&#xff1a;

JVM调优--理论篇

在对Java应用进行性能优化时&#xff0c;JVM的调优是一个绕不开的话题。本文重点介绍下如何对JVM进行调优&#xff0c;以期提高Java应用的性能、稳定性、响应时间等性能目标。JVM的调优过程符合Java应用的调优过程&#xff0c;主要分为三步&#xff1a;性能监控、性能分析、性能…

MES系统:优化生产执行,实现高效、灵活的制造管理

MES系统作为操作执行层可以缩短排产周期&#xff0c;解决紧急插单问题&#xff1b;通过计划、采集、管控等功能来改进生产执行&#xff1b;与实际生产即时接轨车间时间驱动上层的商务活动。 MES系统包含基础数据、物料和工艺管理、生产过程管理、APS排产、人员管理、设备与工具…

CPU炼丹——YOLOv5s

1.Anaconda安装与配置 1.1安装与配置 Anaconda3的安装看下面的教程&#xff1a; 最新Anaconda3的安装配置及使用教程&#xff08;详细过程&#xff09;http://t.csdnimg.cn/yygXD&#xff0c;接上面文章下载后&#xff0c;配置环境变量的时候记得在原来你装的Python更下面添…

详解LLMOps,将DevOps用于大语言模型开发

大家好&#xff0c;在机器学习领域&#xff0c;随着技术的不断发展&#xff0c;将大型语言模型&#xff08;LLMs&#xff09;集成到商业产品中已成为一种趋势&#xff0c;同时也带来了许多挑战。为了有效应对这些挑战&#xff0c;数据科学家们转向了一种新型的DevOps实践LLM-OP…

质因数分解(cpp实现)--一种快速求得一个数有多少个因子的黑魔法

前言 最近机试没少吃不会质因数分解的亏&#xff0c;用传统的求得因子个数只能过一点点…(ex, 20%) 质因数分解后&#xff0c;可以将因子问题转化为 集合的组合问题&#xff0c;因此会很快&#xff0c;目测是 l o g n log n logn (n是该整数的值)。 传统解法 假设输入整数的…

【JavaEE网络】从数据链路层到应用层的DNS

目录 数据链路层以太网 DNS 数据链路层 越往下与程序员越远 代表协议&#xff1a;以太网。平常用的网线也叫“以太网线”&#xff0c;平常用的交换机也叫“以太网交换机” 以太网 认识以太网 “以太网” 不是一种具体的网络&#xff0c;而是一种技术标准&#xff1b;既包含…

MySql#MySql安装和配置

目录 一、卸载不需要的环境 二、安装mysql yum 源 三、开始安装 四、如果保证安装成功呢&#xff1f; 五、MySql 启动&#xff01; 六、登录mysql 七、配置文件说明 八、设置开机启动&#xff01; 本次安装是在Linux环境在centos7中完成 首先先将自己切换成root 一、…

【EI会议|稳定检索】2024年能源资源与动力、控制工程国际会议(ICERPCE 2024)

2024 International Conference on Energy Resources and Power, Control Engineering 一、大会信息 会议名称&#xff1a;2024年能源资源与动力、控制工程国际会议 会议简称&#xff1a;ICERPCE 2024 收录检索&#xff1a;提交Ei Compendex,CPCI,CNKI,Google Scholar等 会议官…

深入解析算法效率核心:时间与空间复杂度概览及优化策略

算法复杂度&#xff0c;即时间复杂度与空间复杂度&#xff0c;衡量算法运行时资源消耗。时间复杂度反映执行时间随数据规模增长的关系&#xff0c;空间复杂度表明额外内存需求。优化策略&#xff0c;如选择合适数据结构、算法改进、循环展开等&#xff0c;对于提升程序效率、减…

【高阶数据结构(一)】并查集详解

&#x1f493;博主CSDN主页:杭电码农-NEO&#x1f493;   ⏩专栏分类:高阶数据结构专栏⏪   &#x1f69a;代码仓库:NEO的学习日记&#x1f69a;   &#x1f339;关注我&#x1faf5;带你学习更多Go语言知识   &#x1f51d;&#x1f51d; 高阶数据结构 1. 前言2. 并查集…

【vue+echarts】绘制中国地图,3D地图,省、市、县三级下钻以及回钻,南海诸岛小窗化显示,点位飞线图,点位名称弹窗轮播展示,及一些常见问题

先看效果展示图 目录 准备工作一, 绘制3D地图1,调用官网地址接口获取2,去官网下载中国地图的json数据到本地,本地引入 二, 南海诸岛小窗化显示1, 手动过滤掉,只保留小窗化的南海诸岛2, 代码层面过滤掉,只保留小窗化的南海诸岛 三, 省、市、县三级地图下钻及回钻1, 下钻2, 回钻…

深度学习 --- stanford cs231学习笔记(一)

stanford cs231学习笔记(一) 1&#xff0c;先是讲到了机器学习中的kNN算法&#xff0c;然后因为kNN分类器的一些弊端&#xff0c;引入了线性分类器。 kNN算法的三大弊端&#xff1a; (1)&#xff0c;计算量大&#xff0c;当特征比较多时表示性差 (2)&#xff0c;训练时耗时少…

C++初阶之模板初阶

一、泛型编程 如何实现一个通用的交换函数呢&#xff1f; void Swap(int& left, int& right) {int temp left;left right;right temp; } void Swap(double& left, double& right) {double temp left;left right;right temp; } void Swap(char& left,…