2.7日学习打卡----初学RabbitMQ(二)

news2025/1/16 13:48:31

2.7日学习打卡

在这里插入图片描述
JMS
由于MQ产品很多,操作方式各有不同,于是JAVA提供了一套规则
——JMS,用于操作消息中间件。JMS即Java消息服务
(JavaMessage Service)应用程序接口,是一个Java平台中关于面
向消息中间件的API。JMS是JavaEE规范中的一种,类比JDBC。很多
MQ产品都实现了JMS规范,例如ActiveMQ。RabbitMQ官方并没
有实现JMS规范,但是开源社区有JMS的实现包。

创建项目

# 开启管控台插件
rabbitmq-plugins enable
rabbitmq_management
# 启动rabbitmq
rabbitmq-server -detached

创建普通maven项目,添加RabbitMQ依赖:

<dependencies>
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqpclient</artifactId>
        <version>5.14.0</version>
    </dependency>
</dependencies>

一. RabbitMQ 简单模式在这里插入图片描述

P:生产者,也就是要发送消息的程序

C:消费者:消息的接收者,会一直等待消息到来

queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息

特点:

  1. 一个生产者对应一个消费者,通过队列进行消息传递。
  2. 该模式使用direct交换机,direct交换机是RabbitMQ默认交换机

生产者代码实现

步骤:

  1. 创建连接工厂ConnectionFactory
  2. 设置工厂的参数
  3. 创建连接 Connection
  4. 创建管道 Channel
  5. 简单模式中没有交换机exchange,所以不用创建(RabbitMQ会使用默认的交换机!)
  6. 创建队列 queue
  7. 设置发送内容,使用channal.basicPublish()发送
  8. 释放资源

代码实现

package com.jjy.mq.simple;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

//生产者
public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //使用自己的服务器ip地址
        connectionFactory.setHost("192.168.66.100");
        //rabbitmq的默认端口5672
        connectionFactory.setPort(5672);
        //用户名
        connectionFactory.setUsername("jjy");
        //密码
        connectionFactory.setPassword("jjy");
        //虚拟机
        connectionFactory.setVirtualHost("/");
        //2.创建连接
        Connection connection = connectionFactory.newConnection();
        //3.建立信道
        Channel channel = connection.createChannel();
        //4.创建队列,如果队列已存在,则使用该队列
        /**
        //     * 参数1:队列名
        //     * 参数2:是否持久化,true表示MQ重启后队列还在。
        //     * 参数3:是否私有化,false表示所有消费者都可以访问,true表示只有第一次拥有它的消费者才能访问
        //     * 参数4:是否自动删除,true表示不再使用队列时自动删除队列
        //     * 参数5:其他额外参数
        //     */
        channel.queueDeclare("simple_queue",false,false,false,null);
        //5.发送消息
        String mesg="hello rabbitmq";
        /**
         * 参数1:交换机名,""表示默认交换机
         * 参数2:路由键,简单模式就是队列名
         * 参数3:其他额外参数
         * 参数4:要传递的消息字节数组
         */
        channel.basicPublish("","simple_queue",null,mesg.getBytes());

        //6.关闭资源(信道和连接)
        channel.close();
        connection.close();
        System.out.println("发送成功");
    }
}

消费者代码实现

在这里插入图片描述

步骤:

1.创建连接工厂ConnectionFactory
2.设置工厂参数
3.创建连接
4.创建信道
前四步代码基本是一致的,需要注意的是生产者与消费者的Channel是不同Connection中的!不是同一个对象.
5. 最简单的模型没有交换机exchange,所以此处RabbitMQ会使用默认的交换机
6. 接收消息,有一个回调方法 channel.basicConsume()

代码实现

package com.jjy.mq.simple;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Customer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.66.100");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("jjy");
        connectionFactory.setPassword("jjy");
        connectionFactory.setVirtualHost("/");
        //2.创建连接
        Connection connection = connectionFactory.newConnection();
        //3.建立信道
        Channel channel = connection.createChannel();
        //4.监听队列
        /**
         * 参数1:监听的队列名
         * 参数2:是否自动签收,如果设置为false,则需要手动确认消息已收到,否则MQ会一直发送消息
         * 参数3:Consumer的实现类,重写该类方法表示接受到消息后如何消费
         */
        channel.basicConsume("simple_queue",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body,"UTF-8");
                System.out.println("接受消息,消息为:"+message);
            }
        });
        //
    }
}

二. RabbitMQ 工作队列模式

在这里插入图片描述
与简单模式相比,工作队列模式(Work Queue)多了一些消费者,该
模式也使用direct交换机,应用于处理消息较多的情况。特点如
下:

  1. 一个队列对应多个消费者。
  2. 一条消息只会被一个消费者消费。
  3. 消息队列默认采用轮询的方式将消息平均发送给消费者

应用场景:对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度

生产者代码实现

代码实现

package com.jjy.mq.work;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.66.100");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("jjy");
        connectionFactory.setPassword("jjy");
        connectionFactory.setVirtualHost("/");
        // 2.创建连接
        Connection connection = connectionFactory.newConnection();
        // 3.建立信道
        Channel channel = connection.createChannel();
        // 4.创建队列,持久化队列
        channel.queueDeclare("work_queue",true,false,false,null);
        // 5.发送大量消息,参数3表示该消息为持久化消息,即除了保存到内存还会保存到磁盘中
        for(int i=0;i<100;i++){

            channel.basicPublish("","work_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, ("你好,这是今天的第"+i+"条消息").getBytes());
        }
        // 6.关闭资源
        channel.close();
        connection.close();
    }
}

消费者代码实现

在这里插入图片描述

消费者1:

package com.jjy.mq.work;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Customer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.66.100");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("jjy");
        connectionFactory.setPassword("jjy");
        connectionFactory.setVirtualHost("/");
        //2.创建连接
        Connection connection = connectionFactory.newConnection();
        //3.建立信道
        Channel channel = connection.createChannel();
        // 4.监听队列,处理消息
        channel.basicConsume("work_queue",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("消费者1消费消息,消息为:" + message);
            }
        });

    }
}

消费者2

package com.jjy.mq.work;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Customer2 {

        public static void main(String[] args) throws IOException, TimeoutException {
            //1.创建连接工厂
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("192.168.66.100");
            connectionFactory.setPort(5672);
            connectionFactory.setUsername("jjy");
            connectionFactory.setPassword("jjy");
            connectionFactory.setVirtualHost("/");
            //2.创建连接
            Connection connection = connectionFactory.newConnection();
            //3.建立信道
            Channel channel = connection.createChannel();
            // 4.监听队列,处理消息
            channel.basicConsume("work_queue",true,new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println("消费者2消费消息,消息为:" + message);
                }
            });

        }


}


消费者3

package com.jjy.mq.work;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Customer3 {

        public static void main(String[] args) throws IOException, TimeoutException {
            //1.创建连接工厂
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("192.168.66.100");
            connectionFactory.setPort(5672);
            connectionFactory.setUsername("jjy");
            connectionFactory.setPassword("jjy");
            connectionFactory.setVirtualHost("/");
            //2.创建连接
            Connection connection = connectionFactory.newConnection();
            //3.建立信道
            Channel channel = connection.createChannel();
            // 4.监听队列,处理消息
            channel.basicConsume("work_queue",true,new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println("消费者3消费消息,消息为:" + message);
                }
            });

        }
    }


三. RabbitMQ 发布订阅模式

在这里插入图片描述
P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机

C:消费者,消息的接收者,会一直等待消息到来

Queue:消息队列,接收消息、缓存消息

在开发过程中,有一些消息需要不同消费者进行不同的处理,如电
商网站的同一条促销信息需要短信发送、邮件发送、站内信发送
等。此时可以使用发布订阅模式(Publish/Subscribe)
特点:

  1. 生产者将消息发送给交换机,交换机将消息转发到绑定此交换机的每个队列中。
  2. 工作队列模式的交换机只能将消息发送给一个队列,发布订阅模式的交换机能将消息发送给多个队列。发布订阅模式使用fanout交换机。

Exchange:交换机(X)一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、 递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型:

Fanout:广播,将消息交给所有绑定到交换机的队列
Direct:定向,把消息交给符合指定routing key 的队列
Topic(常用):通配符,把消息交给符合routing pattern(路由模式)的队列

Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与 Exchange 绑定,或者没有符合路由规则的队列,那么消息会丢失
在这里插入图片描述

生产者代码实现

与之前的步骤相比,多了创建交换机和绑定交换机与队列的操作

代码实现

package com.jjy.mq.publish;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class produce {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.66.100");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("jjy");
        connectionFactory.setPassword("jjy");
        connectionFactory.setVirtualHost("/");
        //2.创建连接
        Connection connection = connectionFactory.newConnection();
        //3.建立信道
        Channel channel = connection.createChannel();
        //4.创建交换机
        
        /*
        exchangeDeclare(String exchange,                  -- 交换机的名称
                        String type,                      -- 交换机的类型,4种
                                                             枚举(direct,fanout,topic,headers)
                        boolean durable,                  -- 持久化
                        boolean autoDelete,               -- 自动删除
                        boolean internal,                 -- 内部使用,基本是false
                        Map<String, Object> arguments)    -- 参数
         *
        /**
         * 参数1:交换机名
         * 参数2:交换机类型
         * 参数3:交换机持久化
         */
        channel.exchangeDeclare("exchange_fanout", BuiltinExchangeType.FANOUT,true);
        //5.创建队列
        //短信队列
        channel.queueDeclare("SEND_MAIL",true,false,false,null);
        //消息队列
        channel.queueDeclare("SEND_MESSAGE",true,false,false,null);
        //站内信息
        channel.queueDeclare("SEND_STATION",true,false,false,null);
        //6.交换机绑定队列
        /**
         * 参数1:队列名
         * 参数2:交换机名
         * 参数3:路由关键字,发布订阅模式写""即可
         */
        channel.queueBind("SEND_MAIL","exchange_fanout","");
        channel.queueBind("SEND_MESSAGE","exchange_fanout","");
        channel.queueBind("SEND_STATION","exchange_fanout","");
        //7.发送消息
        for (int i = 1; i <= 10 ; i++) {
            channel.basicPublish("exchange_fanout","",null,
                    ("你好,尊敬的用户,秒杀商品开抢了!"+i).getBytes(StandardCharsets.UTF_8));
        }
        //8.关闭资源
        channel.close();
        connection.close();
    }
}

消费者代码实现

接下来编写三个消费者,分别监听各自的队列。
//站内信消费者

package com.jjy.mq.publish;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

// 站内信消费者
public class CustomerStation {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.66.100");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("jjy");
        connectionFactory.setPassword("jjy");
        connectionFactory.setVirtualHost("/");// 默认虚拟机
        //2.创建连接
        Connection conn = connectionFactory.newConnection();
        //3.建立信道
        Channel channel = conn.createChannel();
        // 4.监听队列
        channel.basicConsume("SEND_STATION", true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "utf-8");
                System.out.println("发送站内信:"+message);
            }
        });
    }
}

邮件消费者

 
package com.jjy.mq.publish;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class CustomerMail {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.66.100");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("jjy");
        connectionFactory.setPassword("jjy");
        connectionFactory.setVirtualHost("/");// 默认虚拟机
        //2.创建连接
        Connection conn = connectionFactory.newConnection();
        //3.建立信道
        Channel channel = conn.createChannel();
        // 4.监听队列
        channel.basicConsume("SEND_MAIL", true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "utf-8");
                System.out.println("发送邮件:"+message);
            }
        });
    }
}

短信消费者

package com.jjy.mq.publish;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class CustomerMessage {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.66.100");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("jjy");
        connectionFactory.setPassword("jjy");
        connectionFactory.setVirtualHost("/");// 默认虚拟机
        //2.创建连接
        Connection conn = connectionFactory.newConnection();
        //3.建立信道
        Channel channel = conn.createChannel();
        // 4.监听队列
        channel.basicConsume("SEND_MESSAGE", true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "utf-8");
                System.out.println("发送短信:"+message);
            }
        });
    }
}


也可以使用工作队列+发布订阅模式同时使用,两个消费者同时监听
一个队列:


// 短信消费者2
public class CustomerMessage2 {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.0.162");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("itbaizhan");
        connectionFactory.setPassword("itbaizhan");
        connectionFactory.setVirtualHost("/");// 默认虚拟机
        //2.创建连接
        Connection conn = connectionFactory.newConnection();
        //3.建立信道
        Channel channel = conn.createChannel();
        // 4.监听队列
        channel.basicConsume("SEND_MESSAGE", true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "utf-8");
                System.out.println("发送短信2:"+message);
            }
        });
    }
}

两个不一样的系统,对同一条消息做不一样的处理

发布订阅模式与工作队列模式的区别
(1)工作队列模式不用定义交换机,而发布/订阅模式需要定义交换机

(2)发布/订阅模式的生产方是面向交换机发送消息,工作队列模式的生产方是面向队列发送消息(底层使用 默认交换机)

(3)发布/订阅模式需要设置队列和交换机的绑定,工作队列模式不需要设置,实际上工作队列模式会将队列绑定到默认的交换机

四. RabbitMQ 路由模式

在这里插入图片描述
使用发布订阅模式时,所有消息都会发送到绑定的队列中,但很多
时候,不是所有消息都无差别的发布到所有队列中。比如电商网站
的促销活动,双十一大促可能会发布到所有队列;而一些小的促销
活动为了节约成本,只发布到站内信队列。此时需要使用路由模式
(Routing)完成这一需求。
特点:

  1. 每个队列绑定路由关键字RoutingKey
  2. 生产者将带有RoutingKey的消息发送给交换机,交换机根据RoutingKey转发到指定队列。路由模
    式使用direct交换机。

队列与交换机的绑定,不能是任意绑定了,而是要指定一个 RoutingKey(路由key)

消息的发送方在向 Exchange 发送消息时,也必须指定消息的 RoutingKey

Exchange 不再把消息交给每一个绑定的队列,而是根据消息的 Routing Key 进行判断,只有队列的 Routingkey 与消息的 Routing key 完全一致,才会接收到消息
在这里插入图片描述

生产者代码实现

package com.jjy.mq.routing;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class produce {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.66.100");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("jjy");
        connectionFactory.setPassword("jjy");
        connectionFactory.setVirtualHost("/");
        //2.创建连接
        Connection connection = connectionFactory.newConnection();
        //3.建立信道
        Channel channel = connection.createChannel();
        //4.创建交换机
        /**
         * 参数1:交换机名
         * 参数2:交换机类型
         * 参数3:交换机持久化
         */
        channel.exchangeDeclare("exchange_routing", BuiltinExchangeType.DIRECT,true);
        // 5.创建队列
        channel.queueDeclare("SEND_MAIL2",true,false,false,null);
        channel.queueDeclare("SEND_MESSAGE2",true,false,false,null);
        channel.queueDeclare("SEND_STATION2",true,false,false,null);
        //6.交换机绑定队列
        /**
         * 参数1:队列名
         * 参数2:交换机名
         * 参数3:路由关键字,发布订阅模式写""即可
         */
        channel.queueBind("SEND_MAIL2","exchange_routing","import");
        channel.queueBind("SEND_MESSAGE2","exchange_routing","import");
        channel.queueBind("SEND_STATION2","exchange_routing","import");
        channel.queueBind("SEND_STATION2","exchange_routing","normal");
        //7.发送消息
        channel.basicPublish("exchange_routing","import",null,
                "双十一大促活动".getBytes());
        channel.basicPublish("exchange_routing","normal",null,
                "小型促销活动".getBytes());
        //8.关闭资源
        channel.close();
        connection.close();
    }
}

消费者代码实现

站内信消费者

package com.jjy.mq.routing;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

// 站内信消费者
public class CustomerStation {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.66.100");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("jjy");
        connectionFactory.setPassword("jjy");
        connectionFactory.setVirtualHost("/");// 默认虚拟机
        //2.创建连接
        Connection conn = connectionFactory.newConnection();
        //3.建立信道
        Channel channel = conn.createChannel();
        // 4.监听队列
        channel.basicConsume("SEND_STATION2", true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "utf-8");
                System.out.println("发送站内信:"+message);
            }
        });
    }
}

短信消费者

package com.jjy.mq.routing;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class CustomerMessage {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.66.100");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("jjy");
        connectionFactory.setPassword("jjy");
        connectionFactory.setVirtualHost("/");// 默认虚拟机
        //2.创建连接
        Connection conn = connectionFactory.newConnection();
        //3.建立信道
        Channel channel = conn.createChannel();
        // 4.监听队列
        channel.basicConsume("SEND_MESSAGE2", true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "utf-8");
                System.out.println("发送短信:"+message);
            }
        });
    }
}

邮件消费者

package com.jjy.mq.routing;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class CustomerMail {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.66.100");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("jjy");
        connectionFactory.setPassword("jjy");
        connectionFactory.setVirtualHost("/");// 默认虚拟机
        //2.创建连接
        Connection conn = connectionFactory.newConnection();
        //3.建立信道
        Channel channel = conn.createChannel();
        // 4.监听队列
        channel.basicConsume("SEND_MAIL2", true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "utf-8");
                System.out.println("发送邮件:"+message);
            }
        });
    }
}

总的来说就一句话:

Routing 模式要求队列在绑定交换机时要指定 routing key,消息会转发到符合 routing key 的队列。

五. RabbitMQ 通配符模式

在这里插入图片描述
通配符模式(Topic)是在路由模式的基础上,给队列绑定带通配符的
路由关键字,只要消息的RoutingKey能实现通配符匹配,就会将消
息转发到该队列。通配符模式比路由模式更灵活,使用topic交换
机.
通配符规则

  1. 消息设置RoutingKey时,RoutingKey由多个单词构成,中间以 . 分割。
  2. 队列设置RoutingKey时, # 可以匹配任意多个单词, * 可以匹配任意一个单词。

生产者代码实现

在这里插入图片描述
代码实现

package com.jjy.mq.topic;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class produce {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.66.100");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("jjy");
        connectionFactory.setPassword("jjy");
        connectionFactory.setVirtualHost("/");
        //2.创建连接
        Connection connection = connectionFactory.newConnection();
        //3.建立信道
        Channel channel = connection.createChannel();
        //4.创建交换机
        /**
         * 参数1:交换机名
         * 参数2:交换机类型
         * 参数3:交换机持久化
         */
        channel.exchangeDeclare("exchange_topic", BuiltinExchangeType.TOPIC,true);
        // 5.创建队列
        channel.queueDeclare("SEND_MAIL3",true,false,false,null);
        channel.queueDeclare("SEND_MESSAGE3",true,false,false,null);
        channel.queueDeclare("SEND_STATION3",true,false,false,null);
        //6.交换机绑定队列

        channel.queueBind("SEND_MAIL3","exchange_topic","#.mail.#");
        channel.queueBind("SEND_MESSAGE3","exchange_topic","#.message.#");
        channel.queueBind("SEND_STATION3","exchange_topic","#.station.#");
        //7.发送消息
        channel.basicPublish("exchange_topic","mail.message.station",null,
                "双十一大促活动".getBytes());
        channel.basicPublish("exchange_topic","station",null,
                "小型促销活动".getBytes());
        //8.关闭资源
        channel.close();
        connection.close();
    }
}

消费者代码实现

站内信消费者

package com.jjy.mq.topic;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

// 站内信消费者
public class CustomerStation {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.66.100");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("jjy");
        connectionFactory.setPassword("jjy");
        connectionFactory.setVirtualHost("/");// 默认虚拟机
        //2.创建连接
        Connection conn = connectionFactory.newConnection();
        //3.建立信道
        Channel channel = conn.createChannel();
        // 4.监听队列
        channel.basicConsume("SEND_STATION3", true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "utf-8");
                System.out.println("发送站内信:"+message);
            }
        });
    }
}

短信消费者

package com.jjy.mq.topic;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class CustomerMessage {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.66.100");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("jjy");
        connectionFactory.setPassword("jjy");
        connectionFactory.setVirtualHost("/");// 默认虚拟机
        //2.创建连接
        Connection conn = connectionFactory.newConnection();
        //3.建立信道
        Channel channel = conn.createChannel();
        // 4.监听队列
        channel.basicConsume("SEND_MESSAGE3", true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "utf-8");
                System.out.println("发送短信:"+message);
            }
        });
    }
}

邮件消费者

package com.jjy.mq.topic;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class CustomerMail {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.66.100");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("jjy");
        connectionFactory.setPassword("jjy");
        connectionFactory.setVirtualHost("/");// 默认虚拟机
        //2.创建连接
        Connection conn = connectionFactory.newConnection();
        //3.建立信道
        Channel channel = conn.createChannel();
        // 4.监听队列
        channel.basicConsume("SEND_MAIL3", true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "utf-8");
                System.out.println("发送邮件:"+message);
            }
        });
    }
}

总述:topics模式比routing模式要更加灵活,笼统的说就是routing模式加上通配符

如果我的内容对你有帮助,请点赞,评论,收藏。创作不易,大家的支持就是我坚持下去的动力!
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1442385.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

ssm+vue的医药垃圾分类管理系统(有报告)。Javaee项目,ssm vue前后端分离项目。

演示视频&#xff1a; ssmvue的医药垃圾分类管理系统&#xff08;有报告&#xff09;。Javaee项目&#xff0c;ssm vue前后端分离项目。 项目介绍&#xff1a; 采用M&#xff08;model&#xff09;V&#xff08;view&#xff09;C&#xff08;controller&#xff09;三层体系结…

在本地运行大型语言模型 (LLM) 的六种方法(2024 年 1 月)

一、说明 &#xff08;开放&#xff09;本地大型语言模型&#xff08;LLM&#xff09;&#xff0c;特别是在 Meta 发布LLaMA和后Llama 2&#xff0c;变得越来越好&#xff0c;并且被越来越广泛地采用。 在本文中&#xff0c;我想演示在本地&#xff08;即在您的计算机上&#x…

Android 粒子喷泉动效

一、前言&#xff1a; 在学习open gl es实现动效的时候&#xff0c;打算回顾了一下用普通的2D坐标系实现粒子效果和 open gl 3d 坐标系的区别&#xff0c;以及难易程度&#xff0c;因此本篇以Canvas 2D坐标系实现了一个简单的demo。 粒子动效原理&#xff1a; 粒子动效本质上…

504. Base 7(七进制数)

题目描述 给定一个整数 num&#xff0c;将其转化为 7 进制&#xff0c;并以字符串形式输出。 问题分析 按照二进制转换的方式进行转换即可 代码 char* convertToBase7(int num) {int count 0;char *x (char *)malloc(sizeof(char)*32);char *y (char *)malloc(sizeof(c…

【Redis】深入理解 Redis 常用数据类型源码及底层实现(3.详解String数据结构)

【Redis】深入理解 Redis 常用数据类型源码及底层实现&#xff08;1.结构与源码概述&#xff09;-CSDN博客 【Redis】深入理解 Redis 常用数据类型源码及底层实现(2.版本区别dictEntry & redisObject详解)-CSDN博客 紧接着前两篇的总体介绍&#xff0c;从这篇开始&#x…

windows安装sqlite

windows安装sqlite比linux麻烦很多 1.下载 下载链接&#xff1a;链接 下载dll的zip文件 2.解压并放到文件夹 将压缩包内的文件解压&#xff0c;放到C://sqlite下 3.编辑环境变量 添加到系统变量的path中 4.验证 打开命令提示符&#xff0c;输入 sqlite3有结果就行

JavaWeb02-MyBatis

目录 一、MyBatis 1.概述 2.JavaEE三层架构简单介绍 &#xff08;1&#xff09;表现层 &#xff08;2&#xff09;业务层 &#xff08;3&#xff09;持久层 3.框架 4.优势 &#xff08;1&#xff09;JDBC的劣势 &#xff08;2&#xff09;MyBatis优化 5.使用 &#…

Golang的for循环变量和goroutine的陷阱,1.22版本的更新

先来看一段golang 1.22版本之前的for循环的代码 package mainimport "fmt"func main() {done : make(chan bool)values : []string{"chen", "hai", "feng"}for _, v : range values {fmt.Println("start")go func() {fmt.P…

Gemini 下一章节即将拉开帷幕

每周跟踪AI热点新闻动向和震撼发展 想要探索生成式人工智能的前沿进展吗&#xff1f;订阅我们的简报&#xff0c;深入解析最新的技术突破、实际应用案例和未来的趋势。与全球数同行一同&#xff0c;从行业内部的深度分析和实用指南中受益。不要错过这个机会&#xff0c;成为AI领…

树与二叉树---数据结构

树作为一种逻辑结构&#xff0c;同时也是一种分层结构&#xff0c;具有以下两个特点&#xff1a; 1&#xff09;树的根结点没有前驱&#xff0c;除根结点外的所有结点有 且只有一个前驱。 2&#xff09;树中所有结点可以有零个或多个后继。 树结点数据结构 满二叉树和完全二…

洛谷使用指南

详细看——洛谷的规则 1.注册账号 1.打开洛谷首页 这样就对了&#xff01;&#xff01;&#xff01; 2.点击注册 当显示以上页面时表示进入了注册页面。 3.登录 当注册好后就可以登陆了。 当显示以上页面时表示进入了登录页面。 2.题库使用 当单击题库后&#xff0c;会出…

算法练习-二叉搜索树中的搜索(思路+流程图+代码)

难度参考 难度&#xff1a;中等 分类&#xff1a;二叉树 难度与分类由我所参与的培训课程提供&#xff0c;但需要注意的是&#xff0c;难度与分类仅供参考。且所在课程未提供测试平台&#xff0c;故实现代码主要为自行测试的那种&#xff0c;以下内容均为个人笔记&#xff0c;旨…

二、OpenAI开发者快速入门

启动并运行OpenAI API OpenAI API 为开发者提供一个简单的接口&#xff0c;使其能够在他们的应用中创建一个智能层&#xff0c;由OpenAI最先进的模型提供支持。聊天补全端点为ChatGPT提示支持&#xff0c;一种简单的方法是&#xff1a;输入文本&#xff0c;使用GPT-4模型输出。…

在JSP中实现JAVABEAN

在JSP中实现JAVABEAN 问题陈述 创建Web应用程序以连接数据库并检索作者名、地址、城市、州及邮政编码等与作者的详细信息。JavaBean组件应接受作者ID、驱动程序名及URL作为参数。信息要从authors表中检索。 解决方案 要解决上述问题,需要执行以下任务: 创建Web应用程序。创…

macbookair怎么清理内存 ?如何利用 CleanMyMac X 进行系统清理

macbookair怎么清理内存 清理MacBook Air的内存可以通过以下几种方法&#xff1a; 优化储存空间。在MacBook Air上&#xff0c;可以通过“优化储存空间”来释放空间。这包括将文件储存在iCloud中&#xff0c;如桌面、文稿和iCloud信息&#xff0c;以及自动移除在iCloud中观看…

Lombok 高级说明

优质博文&#xff1a;IT-BLOG-CN 一、痛点 【1】代码臃肿&#xff1a;POJO中的getter/setter/equals/hashcode/toString等&#xff1b; 【2】样板式代码&#xff1a;I/O流的关闭操作等&#xff1b; Lombok是一个可以通过注解简化Java代码开发的工具&#xff0c;能够在我们编…

2.8日学习打卡----初学RabbitMQ(三)

2.8日学习打卡 一.springboot整合RabbitMQ 之前我们使用原生JAVA操作RabbitMQ较为繁琐&#xff0c;接下来我们使用 SpringBoot整合RabbitMQ&#xff0c;简化代码编写 创建SpringBoot项目&#xff0c;引入RabbitMQ起步依赖 <!-- RabbitMQ起步依赖 --> <dependency&g…

关节点检测

https://www.bilibili.com/video/BV19g4y1777q/?p2&spm_id_frompageDriver 关节点检测全流程 YOLO:单阶段&#xff0c;快&#xff1b; MMPose&#xff1a;双阶段&#xff0c;准&#xff1b; 标注工具Labelme 用Labelme标注样本数据集

利用Pybind11封装Python版的WiringPi!

原版的WiringPi是一个用于树莓派的GPIO库&#xff0c;用C语言开发&#xff0c;仓库地址&#xff1a;https://github.com/WiringPi/WiringPi。该库允许用户以编程方式访问和控制树莓派的GPIO引脚。而随着Python在嵌入式设备上的快速发展&#xff0c;其对底层引脚的操作也变得越来…

OpenAI给DALL-E 3来了个新动作,加入了全新水印技术

每周跟踪AI热点新闻动向和震撼发展 想要探索生成式人工智能的前沿进展吗&#xff1f;订阅我们的简报&#xff0c;深入解析最新的技术突破、实际应用案例和未来的趋势。与全球数同行一同&#xff0c;从行业内部的深度分析和实用指南中受益。不要错过这个机会&#xff0c;成为AI领…