B080-RabbitMQ

news2025/1/12 11:59:52

目录

      • RabbitMQ认识
        • 概念
        • 使用场景
        • 优点
        • AMQP协议
        • JMS
      • RabbitMQ安装
        • 安装elang
        • 安装RabbitMQ
        • 安装管理插件
        • 登录RabbitMQ
        • 消息队列的工作流程
      • RabbitMQ常用模型
        • HelloWorld-基本消息模型
          • 生产者发送消息
            • 导包
            • 获取链接工具类
            • 消息的生产者
          • 消费者消费消息
            • 模拟消费者
            • 手动签收消息
        • Work Queues
          • Sender
          • Consume1
          • Consume2
        • 订阅模型-FANOUT-广播
          • Sender
          • Consume1
          • Consume2
        • 订阅模型-Direct-定向
          • Sender
          • Consume1
          • Consume2
        • 订阅模型-Topic-通配符
          • Sender
          • Consume1
          • Consume2
        • 总结
      • SpringBoot集成RabbitMQ
        • 导包
        • yml
        • config
        • producer
        • consumer

RabbitMQ认识

概念

MQ全称为Message Queue,即消息队列. 它也是一个队列,遵循FIFO原则 。RabbitMQ是由erlang语言开发,基于AMQP(Advanced Message Queue Protocol高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开发中应用非常广泛。官方地址:http://www.rabbitmq.com/

使用场景

在这里插入图片描述

优点

任务异步处理:
将不需要同步处理的并且耗时长的操作由消息队列通知消息接收方进行异步处理。提高了应用程序的响应时间。(丢进去由接收方分别异步处理)

消除峰值:
异步化提速(发消息),提高系统稳定性(多系统调用),服务解耦(5-10个服务),排序保证,消除峰值
(放入队列中不用马上都处理完,有中间状态,消息分发后可由多个订阅方分别异步处理)

服务解耦:
应用程序解耦合 MQ相当于一个中介,生产方通过MQ与消费方交互,它将应用程序进行解耦合。
(将单体业务拆分为生产者,消息队列和消费者)

AMQP协议

AMQP是一套公开的消息队列协议,最早在2003年被提出,它旨在从协议层定义消息通信数据的标准格式, 为的就是解决MQ市场上协议不统一的问题。RabbitMQ就是遵循AMQP标准协议开发的MQ服务。
(其他Python,C#,PHP也都能用)

JMS

JMS是Java消息服务,是java提供的一套消息服务API标准,其目的是为所有的java应用程序提供统一的消息通信的标准,类似java的 jdbc,只要遵循jms标准的应用程序之间都可以进行消息通信。它和AMQP有什么不同,jms是java语言专属的消息服务标准,它是在api层定义标准,并且只能用于java应用;而AMQP是在协议层定义的标准,是跨语言的。
(只能Java用,基本已经被摒弃)

RabbitMQ安装

安装elang

otp_win64_20.2.exe
配置环境变量

安装RabbitMQ

rabbitmq-server-3.7.4.exe
可通过任务管理器或开始菜单启动或关闭服务

安装管理插件

安装rabbitMQ的管理插件,方便在浏览器端管理RabbitMQ ,进入到RabbitMQ的sbin目录,使用cmd执行命令: rabbitmq-plugins.bat enable rabbitmq_management , 安装成功后重新启动RabbitMQ
(开启可视化界面)

重启MQ

登录RabbitMQ

进入浏览器,输入:http://localhost:15672,初始账号和密码:guest/guest
在这里插入图片描述

消息队列的工作流程

在这里插入图片描述

RabbitMQ常用模型

HelloWorld-基本消息模型

一个生产者与一个消费者
在这里插入图片描述

生产者发送消息
导包
<dependencies>
    <!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <!--和springboot2.0.5对应-->
        <version>5.4.1</version>
    </dependency>
</dependencies>
获取链接工具类
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class ConnectionUtil {
    /**
     * 建立与RabbitMQ的连接
     * @return
     * @throws Exception
     */
    public static Connection getConnection() throws Exception {
        //定义连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置服务地址
        factory.setHost("127.0.0.1");
        //端口,和管理端端口15672不一样,管理端是另外一台网页版的系统,5672才是MQ本身
        factory.setPort(5672);
        //设置账号信息,用户名、密码、vhost
        factory.setVirtualHost("/");//集群的时候才用这个参数
        factory.setUsername("guest");
        factory.setPassword("guest");
        // 通过工程获取连接
        Connection connection = factory.newConnection();
        return connection;
    }
}
消息的生产者
import cn.itsource.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

//消息的生产者
public class Sender {
    public static  final  String  HELLO_QUEUE="hello_queue";

    public static void main(String[] args) throws Exception {
        //1.获取连接
        Connection conn = ConnectionUtil.getConnection();
        //2.获取通道
        Channel channel = conn.createChannel();
        //3.创建队列(hello这里用默认的交换机)
    /*  String queue :队列的名字,可自定义,   
        boolean durable: 持久化,   
        boolean exclusive:是否独占;大家都能用,传false,
        boolean autoDelete: 用完即删;关了就没了,消费者还要拿,所以传false,
        Map<String, Object> arguments:没有其他要传的属性就传false          */
        channel.queueDeclare(HELLO_QUEUE, true, false, false, null);

        String msg="今天中午吃啥";
        //4.发送消息
        channel.basicPublish("", HELLO_QUEUE, null, msg.getBytes());

        channel.close();
        conn.close();
    }
}

在这里插入图片描述

消费者消费消息
模拟消费者
import com.rabbitmq.client.*;
import java.io.IOException;

//模拟消费者
public class Consume {

    public static void main(String[] args) throws Exception {
        //1.获取连接
        Connection conn = ConnectionUtil.getConnection();
        //2.获取通道
        Channel channel = conn.createChannel();
        //回调,可新建类实现Consumer接口或继承DefaultConsumer类或用匿名内部类覆写处理方法
        Consumer callback=new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("consumerTag:"+consumerTag);// 消费者标识
                System.out.println("envelope:"+envelope);// 消息队列里面的一些公共属性
                System.out.println("消息内容:"+new String(body));
                System.out.println("消费完成----------------");
            }
        };

        //3.监听队列
        /*
            queue :队列名字
            autoAck:自动签收
            Consumer callback: 回调
         */
        channel.basicConsume(Sender.HELLO_QUEUE,false,callback);

    }
}

在这里插入图片描述
在这里插入图片描述
只要消费者不关,生产者发一次消息消费者就自动监听消费一次消息

手动签收消息
import cn.itsource.utils.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;

//模拟消费者
public class Consume {

    public static void main(String[] args) throws Exception {
        //1.获取连接
        Connection conn = ConnectionUtil.getConnection();
        //2.获取通道
        Channel channel = conn.createChannel();
        //回调,可新建类实现Consumer接口或继承DefaultConsumer类或用匿名内部类覆写处理方法
        Consumer callback=new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("consumerTag:"+consumerTag);// 消费者标识
                System.out.println("envelope:"+envelope);// 消息队列里面的一些公共属性
//                System.out.println(1/0);
                System.out.println("消息内容:"+new String(body));
                System.out.println("消费完成----------------");
                //所有业务逻辑结束以后 手动签收
                channel.basicAck(envelope.getDeliveryTag(), false);// 第二个参数为是否同时签收多个,传false
            }
        };

        //3.监听队列
        /*
            queue :队列名字
            autoAck:自动签收    签收不等于消费成功,处理逻辑走完没有报错才算签收成功
            Consumer callback: 回调
         */
        channel.basicConsume(Sender.HELLO_QUEUE,false,callback);

    }
}

Work Queues

在这里插入图片描述
一个生产者与多个消费者。
默认轮询,也可以改成能者多劳

Sender
//消息的生产者
/*
    如果有多个消费者监听同一个队列,默认轮询
 */
public class Sender {
    public static  final  String  WORK_QUEUE="work_queue";

    public static void main(String[] args) throws Exception {
        //1.获取连接
        Connection conn = ConnectionUtil.getConnection();
        //2.获取通道
        Channel channel = conn.createChannel();
        //3.创建队列
        /*
        String queue :队列的名字
        boolean durable: 持久化
        boolean exclusive:是否独占
        boolean autoDelete: 用完即删
        Map<String, Object> arguments
         */
        channel.queueDeclare(WORK_QUEUE, true, false, false, null);

        String msg="今天中午吃啥";
        //4.发送消息
        channel.basicPublish("", WORK_QUEUE, null, msg.getBytes());

        channel.close();
        conn.close();
    }
}
Consume1
//模拟消费者
public class Consume1 {

    public static void main(String[] args) throws Exception {
        //1.获取连接
        Connection conn = ConnectionUtil.getConnection();
        //2.获取通道
        Channel channel = conn.createChannel();
        //同时处理的消息数量
//        channel.basicQos(1);
        //回调
        Consumer callback=new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                System.out.println("consumerTag:"+consumerTag);
                System.out.println("envelope:"+envelope);
                 //System.out.println(1/0);
                System.out.println("消息内容:"+new String(body));
//                try {
//                    Thread.sleep(100);
//                } catch (InterruptedException e) {
//                    e.printStackTrace();
//                }
                System.out.println("------------------------------------");
                //所有业务逻辑结束以后 手动签收
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };

        //3.监听队列
        /*
            queue :队列名字
            autoAck:自动签收    签收 不等于  消费成功
            Consumer callback: 回调
         */
        channel.basicConsume(Sender.WORK_QUEUE,false,callback);
    }
}
Consume2
//模拟消费者
public class Consume2 {

    public static void main(String[] args) throws Exception {
        //1.获取连接
        Connection conn = ConnectionUtil.getConnection();
        //2.获取通道
        Channel channel = conn.createChannel();
        //同时处理的消息数量
//        channel.basicQos(1);
        //回调
        Consumer callback=new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                System.out.println("consumerTag:"+consumerTag);
                System.out.println("envelope:"+envelope);
                System.out.println("消息内容:"+new String(body));
//                try {
//                    Thread.sleep(10000);
//                } catch (InterruptedException e) {
//                    e.printStackTrace();
//                }
                System.out.println("------------------------------------");
                //所有业务逻辑结束以后 手动签收
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };

        //3.监听队列
        /*
            queue :队列名字
            autoAck:自动签收    签收 不等于  消费成功
            Consumer callback: 回调
         */
        channel.basicConsume(Sender.WORK_QUEUE,false,callback);
    }
}

订阅模型-FANOUT-广播

在这里插入图片描述
在广播模式下,消息发送流程是这样的:
1) 可以有多个消费者
2) 每个消费者有自己的queue(队列)
3) 每个队列都要绑定到Exchange(交换机)
4) 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定。
5) 交换机把消息发送给绑定过的所有队列
6) 队列的消费者都能拿到消息。实现一条消息被多个消费者消费

Sender
//消息的生产者
/*
    变化
        1.不创建 队列
        2.创建交换机
        3.给交换机发送消息
 */
public class Sender {
    public static  final  String  FANOUT_EXCHANGE="fanout_exchange";

    public static void main(String[] args) throws Exception {
        //1.获取连接
        Connection conn = ConnectionUtil.getConnection();
        //2.获取通道
        Channel channel = conn.createChannel();
        //3.创建交换机
        /*
            exchange:交换机的名字
            type:交换机的类型
            durable:是否持久化
         */
        channel.exchangeDeclare(FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT, true);

        String msg="今天晚上吃啥";
        //4.发送消息
        channel.basicPublish(FANOUT_EXCHANGE, "", null, msg.getBytes());

        channel.close();
        conn.close();
    }
}
Consume1
//模拟消费者
/*
    1.创建队列
    2.队列绑定到交换机
    3.每个消费者要监听自己的队列
 */
public class Consume1 {
    public  static final  String FANOUT_QUEUE1="fanout_queue1";

    public static void main(String[] args) throws Exception {
        //1.获取连接
        Connection conn = ConnectionUtil.getConnection();
        //2.获取通道
        Channel channel = conn.createChannel();
        //同时处理的消息数量
        channel.basicQos(1);
        //创建队列
        channel.queueDeclare(FANOUT_QUEUE1, true, false, false, null);
        //绑定到交换机
        channel.queueBind(FANOUT_QUEUE1, Sender.FANOUT_EXCHANGE, "");

        //回调
        Consumer callback=new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                System.out.println("consumerTag:"+consumerTag);
                System.out.println("envelope:"+envelope);
                 //System.out.println(1/0);
                System.out.println("消息内容:"+new String(body));
                System.out.println("------------------------------------");

                //所有业务逻辑结束以后 手动签收
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };

        //3.监听队列
        /*
            queue :队列名字
            autoAck:自动签收    签收 不等于  消费成功
            Consumer callback: 回调
         */
        channel.basicConsume(FANOUT_QUEUE1,false,callback);
    }
}
Consume2
//模拟消费者
/*
    1.创建队列
    2.队列绑定到交换机
    3.每个消费者要监听自己的队列
 */
public class Consume2 {
    public  static final  String FANOUT_QUEUE2="fanout_queue2";

    public static void main(String[] args) throws Exception {
        //1.获取连接
        Connection conn = ConnectionUtil.getConnection();
        //2.获取通道
        Channel channel = conn.createChannel();
        //同时处理的消息数量
        channel.basicQos(1);
        //创建队列
        channel.queueDeclare(FANOUT_QUEUE2, true, false, false, null);
        //绑定到交换机
        channel.queueBind(FANOUT_QUEUE2, Sender.FANOUT_EXCHANGE, "");

        //回调
        Consumer callback=new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                System.out.println("consumerTag:"+consumerTag);
                System.out.println("envelope:"+envelope);
                 //System.out.println(1/0);
                System.out.println("消息内容:"+new String(body));
                System.out.println("------------------------------------");

                //所有业务逻辑结束以后 手动签收
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };

        //3.监听队列
        /*
            queue :队列名字
            autoAck:自动签收    签收 不等于  消费成功
            Consumer callback: 回调
         */
        channel.basicConsume(FANOUT_QUEUE2,false,callback);
    }
}

订阅模型-Direct-定向

在这里插入图片描述
把消息交给符合指定routing key 的队列 一堆或一个

Sender
//消息的生产者
/*
    变化
        1.交换机类型
        2.给交换机发送消息,指定 routing key
 */
public class Sender {
    public static  final  String  DIRECT_EXCHANGE="direct_exchange";

    public static void main(String[] args) throws Exception {
        //1.获取连接
        Connection conn = ConnectionUtil.getConnection();
        //2.获取通道
        Channel channel = conn.createChannel();
        //3.创建交换机
        /*
            exchange:交换机的名字
            type:交换机的类型
            durable:是否持久化
         */
        channel.exchangeDeclare(DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT, true);

        String msg="今天晚上吃啥";
        //4.发送消息
        channel.basicPublish(DIRECT_EXCHANGE, "dept", null, msg.getBytes());

        channel.close();
        conn.close();
    }
}
Consume1
//模拟消费者
/*
    1.指定routing key
 */
public class Consume1 {
    public  static final  String DIRECT_QUEUE1="direct_queue1";

    public static void main(String[] args) throws Exception {
        //1.获取连接
        Connection conn = ConnectionUtil.getConnection();
        //2.获取通道
        Channel channel = conn.createChannel();
        //同时处理的消息数量
        channel.basicQos(1);
        //创建队列
        channel.queueDeclare(DIRECT_QUEUE1, true, false, false, null);
        //绑定到交换机
        channel.queueBind(DIRECT_QUEUE1, Sender.DIRECT_EXCHANGE, "emp.delete");

        //回调
        Consumer callback=new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                System.out.println("consumerTag:"+consumerTag);
                System.out.println("envelope:"+envelope);
                 //System.out.println(1/0);
                System.out.println("消息内容:"+new String(body));
                System.out.println("------------------------------------");

                //所有业务逻辑结束以后 手动签收
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };

        //3.监听队列
        /*
            queue :队列名字
            autoAck:自动签收    签收 不等于  消费成功
            Consumer callback: 回调
         */
        channel.basicConsume(DIRECT_QUEUE1,false,callback);
    }
}
Consume2
//模拟消费者
/*
    1.指定routing key
 */
public class Consume2 {
    public  static final  String DIRECT_QUEUE2="direct_queue2";

    public static void main(String[] args) throws Exception {
        //1.获取连接
        Connection conn = ConnectionUtil.getConnection();
        //2.获取通道
        Channel channel = conn.createChannel();
        //同时处理的消息数量
        channel.basicQos(1);
        //创建队列
        channel.queueDeclare(DIRECT_QUEUE2, true, false, false, null);
        //绑定到交换机
        channel.queueBind(DIRECT_QUEUE2, Sender.DIRECT_EXCHANGE, "dept");

        //回调
        Consumer callback=new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                System.out.println("consumerTag:"+consumerTag);
                System.out.println("envelope:"+envelope);
                 //System.out.println(1/0);
                System.out.println("消息内容:"+new String(body));
                System.out.println("------------------------------------");

                //所有业务逻辑结束以后 手动签收
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };

        //3.监听队列
        /*
            queue :队列名字
            autoAck:自动签收    签收 不等于  消费成功
            Consumer callback: 回调
         */
        channel.basicConsume(DIRECT_QUEUE2,false,callback);
    }
}

订阅模型-Topic-通配符

在这里插入图片描述
Topic类型的Exchange与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符!

Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: goods.insert

通配符规则:
#:匹配一个或多个词
*:匹配不多不少恰好1个词

Sender
//消息的生产者
/*
    变化
        1.交换机类型
        2.给交换机发送消息,指定 routing key
 */
public class Sender {
    public static  final  String  TOPIC_EXCHANGE="topic_exchange";

    public static void main(String[] args) throws Exception {
        //1.获取连接
        Connection conn = ConnectionUtil.getConnection();
        //2.获取通道
        Channel channel = conn.createChannel();
        //3.创建交换机
        /*
            exchange:交换机的名字
            type:交换机的类型
            durable:是否持久化
         */
        channel.exchangeDeclare(TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC, true);

        String msg="今天晚上吃啥";
        //4.发送消息
        channel.basicPublish(TOPIC_EXCHANGE, "user.insert.add.pubilsh", null, msg.getBytes());

        channel.close();
        conn.close();
    }
}
Consume1
//模拟消费者
/*
    1.指定routing key
 */
public class Consume1 {
    public  static final  String TOPIC_QUEUE1="topic_queue1";

    public static void main(String[] args) throws Exception {
        //1.获取连接
        Connection conn = ConnectionUtil.getConnection();
        //2.获取通道
        Channel channel = conn.createChannel();
        //同时处理的消息数量
        channel.basicQos(1);
        //创建队列
        channel.queueDeclare(TOPIC_QUEUE1, true, false, false, null);
        //绑定到交换机
        /*
            #.1到多个单词
            *. 一个单词
         */
        channel.queueBind(TOPIC_QUEUE1,Sender.TOPIC_EXCHANGE, "user.#");

        //回调
        Consumer callback=new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                System.out.println("consumerTag:"+consumerTag);
                System.out.println("envelope:"+envelope);
                 //System.out.println(1/0);
                System.out.println("消息内容:"+new String(body));
                System.out.println("------------------------------------");

                //所有业务逻辑结束以后 手动签收
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };

        //3.监听队列
        /*
            queue :队列名字
            autoAck:自动签收    签收 不等于  消费成功
            Consumer callback: 回调
         */
        channel.basicConsume(TOPIC_QUEUE1,false,callback);
    }
}
Consume2
//模拟消费者
/*
    1.指定routing key
 */
public class Consume2 {
    public  static final  String TOPIC_QUEUE2="topic_queue2";

    public static void main(String[] args) throws Exception {
        //1.获取连接
        Connection conn = ConnectionUtil.getConnection();
        //2.获取通道
        Channel channel = conn.createChannel();
        //同时处理的消息数量
        channel.basicQos(1);
        //创建队列
        channel.queueDeclare(TOPIC_QUEUE2, true, false, false, null);
        //绑定到交换机
        /*
            #.1到多个单词
            *. 一个单词
         */
        channel.queueBind(TOPIC_QUEUE2,Sender.TOPIC_EXCHANGE, "email.*");

        //回调
        Consumer callback=new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                System.out.println("consumerTag:"+consumerTag);
                System.out.println("envelope:"+envelope);
                 //System.out.println(1/0);
                System.out.println("消息内容:"+new String(body));
                System.out.println("------------------------------------");

                //所有业务逻辑结束以后 手动签收
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };

        //3.监听队列
        /*
            queue :队列名字
            autoAck:自动签收    签收 不等于  消费成功
            Consumer callback: 回调
         */
        channel.basicConsume(TOPIC_QUEUE2,false,callback);
    }
}

总结

01_hello

生产者	1.获取连接	2.获取通道	3.创建队列	4.发送消息

消费者	1.获取连接	2.获取通道	3.监听队列 (并回调)

02_workqueue	默认轮询		可修改(能者多劳)

生产者	1.获取连接	2.获取通道	3.创建队列	4.发送消息

消费者	1.获取连接	2.获取通道	3.监听队列 (并回调)

03_fanout	广播	将消息交给所有绑定到交换机的队列(多个消费者都能收到)

生产者	1.获取连接	2.获取通道	3.创建交换机	4.发送消息到交换机

消费者	1.获取连接	2.获取通道	创建队列	绑定到交换机	3.监听队列 (并回调)

04_direct	定向	把消息交给符合指定 routing key 的队列 一堆或一个

生产者	1.获取连接	2.获取通道	3.创建交换机	4.发送消息到交换机

消费者	1.获取连接	2.获取通道	创建队列	绑定到交换机	3.监听队列 (并回调)

05_topic		通配符	把消息交给符合routing pattern (路由模式) 的队列 一堆或一个

生产者	1.获取连接	2.获取通道	3.创建交换机	4.发送消息到交换机

消费者	1.获取连接	2.获取通道	创建队列	绑定到交换机	3.监听队列 (并回调)

SpringBoot集成RabbitMQ

导包

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.5.RELEASE</version>
    </parent>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>

        <!--spirngboot集成rabbitmq-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
    </dependencies>

yml

server:
  port: 44000
spring:
  application:
    name: test‐rabbitmq‐producer
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtualHost: /
    listener:
      simple:
        acknowledge-mode: manual #手动签收
        prefetch: 1 #消费者的消息并发处理数量
    #publisher-confirms: true #消息发送到交换机失败回调
    #publisher-returns: true  #消息发送到队列失败回调
    template:
      mandatory: true # 必须设置成true 消息路由失败通知监听者,而不是将消息丢弃

config

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {

    public static final String EXCHANGE_SPRINGBOOT="exchange_springboot";
    public static final String QUEUE1_SPRINGBOOT="queue1_springboot";
    public static final String QUEUE2_SPRINGBOOT="queue2_springboot";

    //创建一个交换机
    @Bean
    public Exchange createExchange(){
        return ExchangeBuilder.topicExchange(EXCHANGE_SPRINGBOOT).durable(true).build();
    }

    //创建两个队列
    @Bean
    public Queue createQueue1(){
        return  new Queue(QUEUE1_SPRINGBOOT,true);
    }
    @Bean
    public Queue createQueue2(){
        return  new Queue(QUEUE2_SPRINGBOOT,true);
    }

    //把交换机和队列绑定到一起
    @Bean
    public Binding bind1(){
        return BindingBuilder.bind(createQueue1()).to(createExchange()).with("user.*").noargs();
    }
    @Bean
    public Binding bind2(){
        return BindingBuilder.bind(createQueue2()).to(createExchange()).with("email.*").noargs();
    }


    //消费者 还原对象方式(从MQ里取出json转为对象)
    @Bean("rabbitListenerContainerFactory")
    public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        factory.setPrefetchCount(1);
        return factory;
    }

    //放到消息队列里面的转换(转为json存进MQ)
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        return rabbitTemplate;
    }
}

producer

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@SpringBootTest(classes = App.class)
@RunWith(SpringRunner.class)
public class Sender {

    @Autowired
    RabbitTemplate rabbitTemplate;
    
    @Test
    public void test(){
        /*
            问题:多系统之间 信息交互  传递对象
               解决方案:转换为json存储
               实现:
                    1.fastjson    对象 - josn  (作业)
                    2.重写转换器模式
        */

        for (int i = 0; i < 5; i++) {
            rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_SPRINGBOOT
                    , "email.save", new User(1L,"文达"));
        }
        System.out.println("消息发送完毕");
    }
}

consumer

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

import java.io.IOException;

//消费者
@Component
public class Consu {

    @RabbitListener(queues = {RabbitMQConfig.QUEUE1_SPRINGBOOT},containerFactory = "rabbitListenerContainerFactory")//用这个转换器接
    public void user(@Payload User user, Channel channel, Message message) throws IOException {
        System.out.println(message);

        System.out.println("user队列:"+user);
        //手动签收
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }

    @RabbitListener(queues = {RabbitMQConfig.QUEUE2_SPRINGBOOT})
    public void email(@Payload User user,Channel channel,Message message ) throws IOException {
        System.out.println(message);
        System.out.println("email队列:"+user);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

队列内容可传string,entity序列化对象,json对象,

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/952918.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

docker启动paddlespeech服务,并使用接口调用

一、检查docker容器是否启动 1.输入命令 systemctl status docker 启动 systemctl start docker 守护进程重启 sudo systemctl daemon-reload 重启docker服务 systemctl restart docker 重启docker服务 sudo service docker restart 关闭docker service docker…

【Nacos】使用Nacos进行服务发现、配置管理

Nacos Nacos是 Dynamic Naming and Configuration Service 的首字母简称&#xff0c;一个更易于构建云原生应用的动态服务发现、配置管理和服务管理平台。 版本说明&#xff1a;版本说明 alibaba/spring-cloud-alibaba Wiki GitHub <properties><java.version>…

21 Linux高级篇-日志管理

21 Linux高级篇-日志管理 文章目录 21 Linux高级篇-日志管理21.1 系统常用的日志21.2 日志管理服务rsyslogd21.2.1 *日志记录原理21.2.2 配置文件/etc/rsyslog.conf21.2.3 日志文件格式 21.3 日志轮替21.3.1 配置文件/etc/logrotate.conf & /etc/logrotate.d/21.3.2 可执行…

memcpy 函数

目录 函数介绍&#xff1a; 函数解析&#xff1a; memcpy函数复制的数据长度 内存重叠 凑不出元素的字节数 模拟memcpy 函数介绍&#xff1a; memcpy函数是一个用于内存复制的函数&#xff0c;声明在 string.h 中&#xff08;C是 cstring&#xff09;。 其原型是&…

Excel操作技巧:如何粘贴保留单元格大小

有时我们需要在Excel中复制和粘贴并保持单元格大小。它在工作中节省了很多时间。也使数据集更具吸引力。在这篇文章中,我们将通过一些简单快捷的示例和解释来学习如何做到这一点。 一、使用上下文菜单在Excel中复制和粘贴以保持单元格大小 上下文菜单是Excel的一个重要功能。…

Openlayers 叠加天地图-中国近海海洋等深面图层服务

Openlayers 叠加天地图-中国近海海洋等深面图层服务 核心代码完整代码&#xff1a;在线示例 偶然发现天地图有一个近海海洋图层&#xff0c;觉得不错&#xff0c;于是尝试叠加一下&#xff0c;花费了一些时间&#xff0c;叠加成功&#xff0c;这里分享一下。 本文包括核心代码…

将OSGB格式数据转换为3d tiles的格式

现有需求需要将已有的一些OSGB数据加载到CesiumJS中展示,但是CesiumJS本身不支持osbg格式的数据渲染所以我们需要将其转换一下,有两种格式可以转换一种是glTF格式,另一种是我们今天要介绍的3D Tiles格式 下载开源工具 在github上其实有好多这种工具,每个工具的用法大同小异,这…

Python2021年3月Python二级 -- 编程题解析

题目一 设计一个停车场收费计算器 (收费规则&#xff0c;2小时以内收费5元&#xff0c;超出部分每小时加收2元)&#xff0c;:要求如下: 1.设计的程序要能输入停车时间 (单位为小时&#xff0c;输入的小时数为整数 2.程序可以根据输入的停车时间自动计算出停车费&#xff0c;并且…

java操作cmd执行adb命令【搬代码】

操作具体代码如下&#xff1a; 须注意的是commandStr0里面如果不加 cmd /的话会报 java.io.IOException: Cannot run program "cd": CreateProcess error2, 系统找不到指定的文件。的错误 package com.znzdh.until;import java.io.BufferedReader; import java.io.…

Unity 之 方括号[ ] 的用法以及作用

文章目录 在Unity中&#xff0c;方括号 [ ] 通常用于表示属性、特性&#xff08;Attributes&#xff09;或者元数据&#xff08;Metadata&#xff09;。这些标记提供了附加信息&#xff0c;可以用于修改类、方法、字段等的行为或者在编辑器中进行设置。 以下是一些常见的用法&…

面试:25Wqps高吞吐写Mysql,100W数据4秒写完,如何实现?

25Wqps是什么概念&#xff1f; QPS&#xff08;Queries Per Second&#xff09;&#xff1a;是衡量信息检索系统&#xff08;例如搜索引擎或数据库&#xff09;在一秒钟内接收到的搜索流量的一种常见度量。 通过概念我们能很清楚知道 QPS 并发数/响应时间&#xff0c;即100W…

conda创建python虚拟环境

1.查看当前存在那些虚拟环境 conda env list conda info -e 2.conda安装虚拟环境 conda create -n my_env_name python3.6 2.1在anaconda下改变python版本 当前3.7 安装3.7 conda create -n py37 python3.7 conda activate py37 conda create -n py37 python3.7conda a…

多通道振弦数据记录仪应用于大坝岩土工程监测

多通道振弦数据记录仪应用于大坝岩土工程监测 随着现代科技的不断发展&#xff0c;多通道振弦数据记录仪的应用越来越广泛&#xff0c;其中在大坝岩土工程监测中的应用也越来越普遍。多通道振弦数据记录仪通过采集振动信号的信息&#xff0c;可以有效地监测大坝的安全性和稳定…

【校招VIP】数据库理论之存储过程

考点介绍&#xff1a; 存储过程可以说是一个记录集&#xff0c;它是由一些T-SQL语句组成的代码块&#xff0c;这些T-SQL语句代码像一个方法一样实现一些功能&#xff08;对单表或多表的增删改查&#xff09;&#xff0c;然后再给这个代码块取一个名字&#xff0c;在用到这个功能…

给数组中多次出现的数据添加不同的标记

/*** params 取数组中第二次出现的元素之后的数据* returns*/ export const getElementsAfterSecondOccurrence (arr, column, targetValue) > {let count 0;let secondIndex -1;for (let i 0; i < arr.length; i) {if (arr[i][column] targetValue) {count;if (co…

VmWare安装CentOs8

文章目录 创建虚拟机1、创建虚拟机2、选择虚拟机硬件兼容性3、安装客户机操作系统4、安装客户机操作系统5、命名虚拟机6、处理器配置7、分配虚拟机的内存8、配置网络类型9、选择I/O控制器类型10、选择磁盘类型11、选择磁盘12、指定磁盘容量13、指定磁盘文件14、完成创建 2.安装…

STM32的HAL库的定时器使用

用HAL库老是忘记了定时器中断怎么配置&#xff0c;该调用哪个回调函数。今天记录一下&#xff0c;下次再忘了就来翻一下。 系统的时钟配置&#xff0c;定时器的时钟是84MHz 这里定时器时钟是84M&#xff0c;分频是8400后&#xff0c;时基就是1/10000s&#xff0c;即0.1ms。Per…

百度文心一言GPT免费入口也来了!!!

文心一言入口地址&#xff1a;https://cloud.baidu.com/wenxin.html?daohang

【校招VIP】前端校招考点之UDP

考点介绍&#xff1a; UDP是非面向连接协议&#xff0c;使用udp协议通讯并不需要建立连接&#xff0c;它只负责把数据尽可能发送出去&#xff0c;并不可靠&#xff0c;在接收端&#xff0c;UDP把每个消息断放入队列中&#xff0c;接收端程序从队列中读取数据。 『前端校招考点…