RabbitMq(一)

news2024/11/24 19:00:11

一、基本概念、常见工作模式以及简单使用

MQ全称Message Queue (消息队列),是在消息的传输过程中保存消息的容器。多用于分布式系统之间进行通信。
在这里插入图片描述

小结

  • MQ消息队列,存储消息的中间件
  • 分布式系统通信两种方式:直接远程调用借助第三方完成间接通信
  • 发送方称为生产者,接收方称为消费者

1.Mq的优势和劣势

1.1优势

1.1.1 应用解耦

用户从订单系统向库存系统、支付系统等这一链路发送消息,如果库存系统出现错误,可能也会导致订单系统出错,从而给用户带来不好的体验。
并且如果要在该下单链路上添加新的服务,为了完成服务间的通信,只能修改订单系统的代码

在这里插入图片描述
如果在订单系统和后面的系统中间加一个消息中间件,情况就会好很多。
假设库存系统出错了,这并不会影响到订单系统,因为中间有一个Mq隔离,有时库存系统出错时暂时的,等一会儿库存系统被维护好了,可以继续从Mq中重新取出消息进行操作。
并且如果需要新增一个服务,也不需要更改订单系统的代码。
Mq使得应用间解耦提升容错性和可维护性
在这里插入图片描述

1.1.2 异步提速

在黑马点评中就用到了异步提速,不过没有使用Mq,而是使用的java阻塞队列(后面改成了redis的消息中间件)
原始得订单系统是同步进行的,需要各个服务都完成之后才向用户返回信息
在这里插入图片描述
使用Mq后可以在订单系统将消息发送到Mq后直接向用户返回信息,后面的库存系统和支付系统可以慢慢的从Mq中取出消息去执行
在这里插入图片描述
提升用户体验和系统吞吐量(单位时间内处理请求的数目)。

1.1.3 削峰填谷

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
小结:

  • 应用解耦: 提高系统容错性和可维护性
  • 异步提速: 提升用户体验和系统吞吐量
  • 削峰填谷: 提高系统稳定性

1.2 劣势

  • 系统的可用性降低
    在这里插入图片描述
    系统引入的外部依赖越多,系统稳定性越差。一旦 MQ宕机,就会对业务造成影响。如何保证MQ的高可用?
  • 系统的复杂度提高
    MQ的加入大大增加了系统的复杂度,以前系统间是同步的远程调用,现在是通过MQ进行异步调用。如何保证消息没有被重复消费?怎么处理消息丢失情况?那么保证消息传递的顺序性?
  • 一致性问题
    A系统处理完业务,通过MQ给B、C、D三个系统发消息数据,如果B系统、C系统处理成功,D系统处理失败。如何保证消息数据处理的一致性?

1.3 小结

既然MQ有优势也有劣势,那么使用MQ需要满足什么条件呢?

产者不需要从消费者处获得反馈。引入消息队列之前的直接调用,其接口的返回值应该为空,这才让明明下层的动作还没做,.上层却当成动作做完了继续往后走,即所谓异步成为了可能。
容许短暂的不一致性。
确实是用了有效果。即解耦、提速、削峰这些方面的收益,超过加入MQ,管理MQ这些成本。

2.常见的MQ产品

目前业界有很多的MQ产品,例如RabbitMQ、RocketMQ、ActiveMQ、Kafka、 ZeroMQ、MetaMq等,也有直接使用Redis充当消息队列的案例 (黑马点评就是) ,而这些消息队列产品,各有侧重,在实际选型时,需要结合自身需求及MQ产品特征,综合考虑。
在这里插入图片描述

3.RabbitMq简介

3.1 AMQP协议

AMQP,即Advanced Message Queuing Protosol (高级消息队列协议) , 是一个网络协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。2006年, AMQP规范发布。类比HTTP。
AMQP协议流程图
在这里插入图片描述

3.2 RabbitMQ

2007年,Rabbit 技术公司基于AMQP标准开发的RabbitMQ 1.0发布。RabbitMQ 采用Erlang语言开发。Erlang语言由Ericson设计,专为开发高并发和分布式系统的一种语言,在电信领域使用广泛。

3.3 RabbitMQ架构

①中间这一块是服务端生产者消费者会与服务段建立connect连接,频繁的建立connect连接会影响性能消耗资源,所以在一个connect连接中有很多channel,可以通过channel建立连接。
②服务端的内部有很多的虚拟机(virtual host)
虚拟机中有交换机队列,交换机通过一些规则和队列绑定
在这里插入图片描述

3.4 RabbitMQ的相关概念

  • Broker: 接收和分发消息的应用,RabbitMQ Server就是Message Broker

  • Virtual host: 出于多租户和安全因素设计的,把AMQP的基本组件划分到一个虚拟的分组中,类似于网络中的namespace概念。当多个不同的用户使用同一个RabbitMQ server提供的服务时,可以划分出多个vhost,每个用户在自己的vhost创建exchange / queue等

  • Connection: publisher / consumer和broker之间的TCP连接

  • Channel: 如果每一次访问 RabbitMQ都建立一个Connection,在消息量大的时候建立TCP Connection的开销将是巨大的,效率也较低。Channel 是在connection内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的channel进行通讯,AMQP method包含了channel id帮助客户端和message broker识别channel,所以channel之间是完全隔离的。Channel 作为轻量级的Connection极大减少了操作系统建立TCP connection的开销

  • Exchange: message 到达broker的第一站,根据分发规则,匹配查询表中的routing key,分发消息到queue中去。常用的类型有: direct (point-to point), topic (publish-subscribe) and fanout (multicast)

  • Queue: 消息最终被送到这里等待consumer取走

  • Binding: exchange 和queue之间的虚拟连接, binding 中可以包含routing key。 Binding 信息被保存到exchange中的查询表中,用于message的分发依据

  • RabbitMQ提供了6种工作模式: ①简单模式、②work queues、③Routing路由模式、④Topics 主题模式、⑤Publish/Subscribe 发布与订阅模式、⑥RPC远程调用模式(远程调用,不太算MQ;暂不作介绍)。
    官网对应模式介绍: https://www.rabbitmq.com/getstarted.html
    在这里插入图片描述

3.5 JMS

  • JMS即Java消息服务(JavaMessage Service)应用程序接口,是一个Java平台中关于面向消息中间件的API
  • JMS是JavaEE规范中的一种,类比JDBC
  • 很多消息中间件都实现了JMS规范,例如: ActiveMQ。 RabbitMQ 官方没有提供JMS的实现包,但是开源社区有

3.6 小结

在这里插入图片描述

4.RabbitMQ的安装与配置

自行在网上找教程

5.RabbitMQ的快速入门

5.1 生产者-消费者模型

简单模式
在这里插入图片描述
使用一个入门程序了解生产者和消费者的代码编写:
步骤
①创建工程(生产者,消费者)
②分别添加依赖
③编写生产者发送消息
④编写消费者接收消息
在这里插入图片描述
生产者代码
工厂参数要和rabbitmq部署时严格一致,以及和rabbitmq management网页中设置的参数一致

package ratbbitMq;


import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author Watching
 * * @date 2023/7/12
 * * Describe:消息队列生产者
 */
public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //2.设置工厂参数
        connectionFactory.setHost("1.12.244.105");//ip地址,默认localhost
        connectionFactory.setVirtualHost("itcast");//虚拟机名称 默认 /
        connectionFactory.setPort(5673);//端口号 默认值 5672
        connectionFactory.setUsername("heima");//用户名 默认 guest
        connectionFactory.setPassword("heima");//密码 默认 guest
        //3.创建connection连接
        Connection connection = connectionFactory.newConnection();
        //4.创建channel连接
        Channel channel = connection.createChannel();
        //5.创建队列
        /*
        queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
         queue:队列名称
         durable:是否持久化,重启后是否存在
         exclusive:是否独占,只能有一个交换机与其绑定,当connection关闭时,是否删除队列
         autoDelete:是否自动删除,当没有consumer的时候,是否删除队列
         arguments:参数
         */
        channel.queueDeclare("hello_world", true, false, false, null);
        //6.发送消息
        /*
        basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
        exchange:交换机名称,简单模式使用默认的交换机
        routingKey:路由名称,简单模式下为queue名称
        props:配置信息
        body:消息
         */
        channel.basicPublish("","hello_world",null,"你好".getBytes());
        channel.close();
        connection.close();
    }
}

消费者代码
消费者代码和生产者代码创建连接工厂、连接、channel的部分相同,需要注意的是,使用channel消费消息时需要实现一个回调函数。

package rabbitMq;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author Watching
 * * @date 2023/7/12
 * * Describe:
 * 消费者
 */
public class consumer {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("1.12.244.105");
        connectionFactory.setVirtualHost("itcast");//虚拟机名称 默认 /
        connectionFactory.setPort(5673);//端口号
        connectionFactory.setUsername("heima");//用户名 默认 guest
        connectionFactory.setPassword("heima");//密码 默认 guest
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        /*
        basicConsume(String queue, boolean autoAck, Consumer callback)
        queue:队列名称
        autoAck:是否自动确认消息
        callback:回调函数
         */
        channel.basicConsume("hello_world", true, new Consumer() {
            @Override
            public void handleConsumeOk(String s) {

            }

            @Override
            public void handleCancelOk(String s) {

            }

            @Override
            public void handleCancel(String s) throws IOException {

            }

            @Override
            public void handleShutdownSignal(String s, ShutdownSignalException e) {

            }

            @Override
            public void handleRecoverOk(String s) {

            }

            /*
            s:标识
            envelope:获取一些信息,交换机,路由Key等
            basicProperties:参数
            bytes:消息
             */
            @Override
            public void handleDelivery(String s, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException {
                System.out.println("consumerTag:" + s);
                System.out.println("envelope_exchange:" + envelope.getExchange());
                System.out.println("envelope_RoutingKey:" + envelope.getRoutingKey());
                System.out.println("properties:" + basicProperties);
                System.out.println("body:" + new String(bytes));
            }
        });
    }
}

小结:
在这里插入图片描述

5.2工作队列模式

在这里插入图片描述

Work Queues:与入门程序的简单模式相比,多了一个或一些消费端 ,多个消费端共同消费同一一个队列中的消息。同一条消息只能被一个消费者获取。
应用场景:对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。

生产者代码:
工作队列的生产者代码与简单模式的生产者代码区别不大,为了展示多条消息被多个消费者消费,这里使用循环发送了10条消息

package ratbbitMq.produce_work_queues;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author Watching
 * * @date 2023/7/13
 * * Describe:工作队列模式 生产者
 */
public class producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("1.12.244.105");
        connectionFactory.setVirtualHost("itcast");
        connectionFactory.setUsername("heima");
        connectionFactory.setPassword("heima");
        connectionFactory.setPort(5673);
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare("work_queue", true, false, false, null);
        for (int i = 0; i < 10; i++) {
            String s = i + "测试工作队列";
            channel.basicPublish("", "work_queue", null, s.getBytes());
        }
        channel.close();
        connection.close();
    }
}

消费者代码

package rabbitMq.simple;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author Watching
 * * @date 2023/7/12
 * * Describe:
 * 消费者
 */
public class consumer {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("1.12.244.105");
        connectionFactory.setVirtualHost("itcast");//虚拟机名称 默认 /
        connectionFactory.setPort(5673);//端口号
        connectionFactory.setUsername("heima");//用户名 默认 guest
        connectionFactory.setPassword("heima");//密码 默认 guest
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        /*
        basicConsume(String queue, boolean autoAck, Consumer callback)
        queue:队列名称
        autoAck:是否自动确认消息
        如果autoAck参数设置为false,即消费者不自动确认消息,那么在消费者接收到消息后,消息将保留在队列中,直到消费者显式地确认消息为止。
        如果消费者不确认消息,并且没有进行任何处理,那么该消息将一直留在队列中,不会被移除
        callback:回调函数
         */
        channel.basicConsume("hello_world", true, new Consumer() {
            @Override
            public void handleConsumeOk(String s) {

            }

            @Override
            public void handleCancelOk(String s) {

            }

            @Override
            public void handleCancel(String s) throws IOException {

            }

            @Override
            public void handleShutdownSignal(String s, ShutdownSignalException e) {

            }

            @Override
            public void handleRecoverOk(String s) {

            }

            /*
            s:标识
            envelope:获取一些信息,交换机,路由Key等
            basicProperties:参数
            bytes:消息
             */
            @Override
            public void handleDelivery(String s, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException {
                System.out.println("consumerTag:" + s);
                System.out.println("envelope_exchange:" + envelope.getExchange());
                System.out.println("envelope_RoutingKey:" + envelope.getRoutingKey());
                System.out.println("properties:" + basicProperties);
                System.out.println("body:" + new String(bytes));
            }
        });
    }
}

小结
在一个队列中如果有多个消费者,那么消费者之间对于同一个消息的关系是竞争的关系。
②Work Queues对于任务过重或(任务较多情况使用工作队列可以提高任务处理的速度。例如:短信服务部署多个,只需要有一个节点成功发送即可。只要有一个服务器获取到短信并发送就行

5.3 Pub/Sub订阅模式

在这里插入图片描述
在订阅模型中,多了一个Exchange角色,而且过程略有变化:

  • P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X (交换机)
  • C:消费者,消息的接收者,会- -直等待消息到来
  • Queue:消息队列,接收消息、缓存消息
  • Exchange: 交换机(X) 。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。
  • Exchange有常见以下3种类型:
    Fanout:广播,将消息发送到所有绑定了交换机的队列
    Direct:定向,把消息交给符合指定routing key的队列
    Topic:通配符,把消息交给符合routing pattern(路由模式)的队列

生产者代码:
Pubsub模式的生产者代码和工作队列的区别:
Pubsub模式多了一个交换机,我们需要创建交换机并且将交换机与创建的队列进行绑定。代码细节请看注释

package ratbbitMq.produce_Pubsub;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author Watching
 * * @date 2023/7/17
 * * Describe:
 */
public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //2.设置连接参数
        connectionFactory.setVirtualHost("itcast");
        connectionFactory.setHost("1.12.244.105");
        connectionFactory.setUsername("heima");
        connectionFactory.setPassword("heima");
        connectionFactory.setPort(5673);
        //3.创建连接
        Connection connection = connectionFactory.newConnection();
        //4.创建Channel
        Channel channel = connection.createChannel();
        //5.创建队列
        String queueName1 = "test_fanout_queue1";
        String queueName2 = "test_fanout_queue2";
        channel.queueDeclare(queueName1,true,false,false,null);
        channel.queueDeclare(queueName2,true,false,false,null);
        //6.创建交换机
        /*
        exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
        exchange:交换机名称
        type:交换机类型
            fanout:广播
            direct:定向
            topic:通配符
            headers:参数匹配
        durable:是否持久化
        aotuDelete:没有任何与之关联的队列或交换器时,会被自动删除
        internal:指定交换器是否是内部的。如果设置为true,则表示该交换器仅可用于内部使用,不能被客户端用来发布消息。一般用于实现一些特殊的交换器交互模式。
        arguments:一组可选的附加参数,用于设置交换器的特定属性。这些参数是键值对的形式,允许用户自定义交换器的行为。
         */
        String exchangeName = "test_fanout";
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true);
        //7.绑定队列和交换机
        /*
        queueBind(String queue, String exchange, String routingKey)
        queue:队列名称
        exchange:交换机名称
        routingKey:路由键,绑定规则:
            如果交换机类型为fanout,则路由键设置为""
         */
        channel.queueBind(queueName1,exchangeName,"");
        channel.queueBind(queueName2,exchangeName,"");
        //8.发送消息
        channel.basicPublish(exchangeName,"",null,"测试Pubsub——fanout".getBytes());
        //9.释放资源
        channel.close();
        connection.close();
    }
}

消费者代码:
消费者代码和工作队列模式的代码没有区别

package rabbitMq.produce_Pubsub;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author Watching
 * * @date 2023/7/17
 * * Describe:
 */
public class Consumer_Pubsub1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("1.12.244.105");
        connectionFactory.setVirtualHost("itcast");
        connectionFactory.setUsername("heima");
        connectionFactory.setPassword("heima");
        connectionFactory.setPort(5673);
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        channel.basicConsume("test_fanout_queue1",true, new com.rabbitmq.client.Consumer() {
            @Override
            public void handleConsumeOk(String s) {

            }

            @Override
            public void handleCancelOk(String s) {

            }

            @Override
            public void handleCancel(String s) throws IOException {

            }

            @Override
            public void handleShutdownSignal(String s, ShutdownSignalException e) {

            }

            @Override
            public void handleRecoverOk(String s) {

            }

            @Override
            public void handleDelivery(String s, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException {
//                System.out.println("consumerTag:" + s);
//                System.out.println("envelope_exchange:" + envelope.getExchange());
//                System.out.println("envelope_RoutingKey:" + envelope.getRoutingKey());
//                System.out.println("properties:" + basicProperties);
                System.out.println("body:" + new String(bytes));
            }
        });
    }
}

Pubsub工作模式和工作队列的工作模式的区别在于:
Pubsub多了一个交换机(非默认交换机),交换机将消息发送到与之绑定的每个队列,每个队列都会有同样的消息,不同的消费者再分别从不同的队列中获取相同的消息。
在这里插入图片描述

工作队列只有一个消息队列,没有交换机(非默认交换机),消息会被发送到一个队列中,不同的消费者从同一个队列中获取不同的消息。
在这里插入图片描述

5.4 Routing工作模式

模式说明:

  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey (路由key)
  • 消息的发送方在向Exchange发送消息时,也必须指定消息的RoutingKey
  • Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的
  • Routingkey与消息的Routing key完全一致, 才会接收到消息
    在这里插入图片描述
    我们通过一个例子来了解Routing路由模式:
    在项目中,我们需要把一些普通日志信息(info,warning)打印到控制台,不需要存在数据库中;还有一些重要的日志信息(error)在打印到控制台的同时,也需要存在数据库中。
    这里我们就需要用到Routing路由工作模式。
    在Pubsub订阅模式中,交换机并没有指定RoutingKey,而在Routing工作模式中我们需要指定Routingkey,以将交换机与指定的队列进行绑定。
    生产者代码:
    Routing工作模式时,生产者发送消息需要指定RoutingKey,在交换机和队列绑定时也需要指定RoutingKey,之前介绍的三种工作模式都是不需要指定RoutingKey的。
package ratbbitMq.produce_routing;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author Watching
 * * @date 2023/7/17
 * * Describe:
 */
public class producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //2.设置工厂参数
        connectionFactory.setPort(5673);
        connectionFactory.setUsername("heima");
        connectionFactory.setPassword("heima");
        connectionFactory.setHost("1.12.244.105");
        connectionFactory.setVirtualHost("itcast");
        //3.创建连接对象
        Connection connection = connectionFactory.newConnection();
        //4.创建channel
        Channel channel = connection.createChannel();
        //5.创建队列
        channel.queueDeclare("test_routing1", true, false, false, null);
        channel.queueDeclare("test_routing2", true, false, false, null);
        //6.创建交换机
        channel.exchangeDeclare("routing", BuiltinExchangeType.DIRECT);
        //7.绑定交换机和队列并设置RoutingKey
        channel.queueBind("test_routing1","routing","error");
        channel.queueBind("test_routing2","routing","warning");
        channel.queueBind("test_routing2","routing","info");
        //8.发送消息,并在发送消息的时候指定RoutingKey
        channel.basicPublish("routing","warning",null,"xxx访问了xxx方法 【warning】".getBytes());
        channel.basicPublish("routing","error",null,"xxx访问了xxx方法 【error】".getBytes());
        //9.释放资源
        channel.close();
        connection.close();
    }
}

消费者代码:
消费者代码与前面三种模式无区别

package rabbitMq.consumer_routing;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author Watching
 * * @date 2023/7/17
 * * Describe:
 */
public class Consumer_routing1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("1.12.244.105");
        connectionFactory.setVirtualHost("itcast");
        connectionFactory.setUsername("heima");
        connectionFactory.setPassword("heima");
        connectionFactory.setPort(5673);
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        channel.basicConsume("test_routing1",true, new Consumer() {
            @Override
            public void handleConsumeOk(String s) {

            }

            @Override
            public void handleCancelOk(String s) {

            }

            @Override
            public void handleCancel(String s) throws IOException {

            }

            @Override
            public void handleShutdownSignal(String s, ShutdownSignalException e) {

            }

            @Override
            public void handleRecoverOk(String s) {

            }

            @Override
            public void handleDelivery(String s, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException {
//                System.out.println("consumerTag:" + s);
//                System.out.println("envelope_exchange:" + envelope.getExchange());
//                System.out.println("envelope_RoutingKey:" + envelope.getRoutingKey());
//                System.out.println("properties:" + basicProperties);
                System.out.println("body:" + new String(bytes));
            }
        });
    }
}

在这里插入图片描述

5.5 Topic工作模式

Topic工作模式是更灵活的工作模式
在这里插入图片描述

在 RabbitMQ 的 Topic 模式中,通配符用于匹配特定的 Routing Key。Topic 模式支持两种通配符:星号(*)和井号(#)。

星号(*)通配符

(星号)可以匹配一个单词,单词是由点号(.)分隔的字符串。
例如,“*.apple” 可以匹配 “red.apple”、“green.apple”,但不能匹配 “apple” 或 “red.green.apple”。
井号(#)通配符

#(井号)可以匹配零个或多个单词,单词是由点号(.)分隔的字符串。
例如,“fruit.#” 可以匹配 “fruit.apple”、“fruit.orange”、“fruit.red.apple”,以及类似的任何其他组合。
注意事项:

  • 通配符只匹配单词,不匹配点号(.)本身。
  • Topic 模式中的 Routing Key 可以包含任意数量的单词。
  • 匹配时,较长的通配符(含有更多单词)将会优先匹配。
    举例说明:
    假设有一个 Topic Exchange 名为 “fruits_exchange”,有两个队列绑定到该 Exchange 上:队列 A 绑定键为 “*.apple”队列 B 绑定键为 “fruit.#”。当消息被发布到 “fruits_exchange” 时,根据消息的 Routing Key,RabbitMQ 将会将消息分发到匹配的队列。

发布消息到 “fruits_exchange”,Routing Key 为 “green.apple”,此消息将被发送到队列 A 。
发布消息到 “fruits_exchange”,Routing Key 为 “fruit.orange”,此消息将仅被发送到队列 B,因为它只匹配了队列 B 的绑定键。
通过使用不同的通配符规则,Topic 模式允许更灵活和精确地路由消息到不同的队列,以满足复杂的消息路由需求。
举例:
下面我们将使用Topic工作模式实现下面的要求
在这里插入图片描述
生产者代码:
Topic工作模式的生产者代码与Routing路由模式的生产者代码的区别是,Topic的代码在绑定队列和交换机时,需要指定带有通配符的Routingkey
,并且在定义交换机时不要忘了将交换机的类型设置为topic.

package ratbbitMq.produce_Topic;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author Watching
 * * @date 2023/7/18
 * * Describe:
 */
public class producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //2.设置工厂参数
        connectionFactory.setPort(5673);
        connectionFactory.setUsername("heima");
        connectionFactory.setPassword("heima");
        connectionFactory.setHost("1.12.244.105");
        connectionFactory.setVirtualHost("itcast");
        //3.创建连接对象
        Connection connection = connectionFactory.newConnection();
        //4.创建channel
        Channel channel = connection.createChannel();
        //5.创建队列
        channel.queueDeclare("test_Queue_Topic1", true, false, false, null);
        channel.queueDeclare("test_Queue_Topic2", true, false, false, null);
        channel.queueDeclare("test_Queue_Topic3", true, false, false, null);
        channel.queueDeclare("test_Queue_Topic4", true, false, false, null);
        //6.创建交换机
        channel.exchangeDeclare("Topic_exchange", BuiltinExchangeType.TOPIC);
        //7.绑定交换机和队列并设置RoutingKey
        channel.queueBind("test_Queue_Topic1","Topic_exchange","usa.#");
        channel.queueBind("test_Queue_Topic2","Topic_exchange","#.news");
        channel.queueBind("test_Queue_Topic3","Topic_exchange","#.weather");
        channel.queueBind("test_Queue_Topic4","Topic_exchange","europe.#");
        //8.发送消息,并在发送消息的时候指定RoutingKey
        channel.basicPublish("Topic_exchange","usa.news",null,"美国新闻【usa.news】".getBytes());
        channel.basicPublish("Topic_exchange","usa.weather",null,"美国天气【usa.weather】".getBytes());
        channel.basicPublish("Topic_exchange","europe.news",null,"欧洲新闻【europe.news】".getBytes());
        channel.basicPublish("Topic_exchange","europe.weather",null,"欧洲天气【europe.weather】".getBytes());
        //9.释放资源
        channel.close();
        connection.close();
    }
}

消费者代码:
在这个例子中我创建了四个消费者,消费者的代码都相同,只是在绑定队列时要指定获取消息的队列名称。

package rabbitMq.consumer_Topic;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author Watching
 * * @date 2023/7/17
 * * Describe:
 */
public class Consumer_topic1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("1.12.244.105");
        connectionFactory.setVirtualHost("itcast");
        connectionFactory.setUsername("heima");
        connectionFactory.setPassword("heima");
        connectionFactory.setPort(5673);
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        channel.basicConsume("test_Queue_Topic1",true, new Consumer() {
            @Override
            public void handleConsumeOk(String s) {

            }

            @Override
            public void handleCancelOk(String s) {

            }

            @Override
            public void handleCancel(String s) throws IOException {

            }

            @Override
            public void handleShutdownSignal(String s, ShutdownSignalException e) {

            }

            @Override
            public void handleRecoverOk(String s) {

            }

            @Override
            public void handleDelivery(String s, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException {
//                System.out.println("consumerTag:" + s);
//                System.out.println("envelope_exchange:" + envelope.getExchange());
//                System.out.println("envelope_RoutingKey:" + envelope.getRoutingKey());
//                System.out.println("properties:" + basicProperties);
                System.out.println("body:" + new String(bytes));
            }
        });
    }
}

运行结果:
在这里插入图片描述在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
小结:
Topic主题模式可以实现Pub/Sub发布与订阅模式和Routing路由模式的功能,只是Topic 在配置routing key的时候可以使用通配符,显得更加灵活。

6.Springboot整合RabbitMQ

在实际的项目中,我们是不可能直接使用原生的代码进行开发的,springboot的整合会帮助我们简化开发。
下面是springboot整合rabbitMQ的流程步骤:
生产者:

  • ①导入依赖
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
  • ②编写yml配置文件
spring:
  rabbitmq:
    password: heima
    username: heima
    port: 5673
    virtual-host: itcast
    host: 1.12.244.105
  • ③创建配置类,定义交换机、队列、绑定关系。
    创建配置类和交换机和绑定关系的代码是将原生的代码封装了,不指定的属性会是默认属性。
    这里定义的是Topic工作模式,其它类型的模式代码类似。
package com.rabbitmq.springboot_mqproducer.rabbitMQconfig;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author Watching
 * * @date 2023/7/18
 * * Describe:
 */
@Configuration
public class MQConfig {
    public static final String EXCHANGE_NAME = "boot_topic_exchange";
    public static final String QUEUE_NAME = "boot_topic_queue";

    /*
    创建交换机
     */
    @Bean("topic_exchange_bean")
    public Exchange bootExchange() {
        return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
    }

    /*
    创建队列
     */
    @Bean("topic_queue_bean")
    public Queue bootQueue() {
        return QueueBuilder.durable(QUEUE_NAME).build();
    }

    /*
    绑定交换机
     */
    @Bean
    public Binding bootBinding(@Qualifier("topic_exchange_bean") Exchange exchange,@Qualifier("topic_queue_bean") Queue queue){
        return BindingBuilder.bind(queue).to(exchange).with("test.*").noargs();
    }
}

  • ④注入RabbitTemplate对象,调用方法完成操作。
    这里是在测试类中执行生产者操作。
package com.rabbitmq.springboot_mqproducer;

import com.rabbitmq.springboot_mqproducer.rabbitMQconfig.MQConfig;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;

import javax.annotation.Resource;

@SpringBootTest
class SpringbootMqProducerApplicationTests {

    @Resource
    RabbitTemplate rabbitTemplate;

    @Test
    void contextLoads() {
        rabbitTemplate.convertAndSend(MQConfig.EXCHANGE_NAME,"test.hello","测试springboot整合交换机");
    }
}

消费者:
消费者前两步代码和生产者相同,不需要再编写定义交换机、队列、绑定关系的配置类,最后一步需要使用
@RabbitListener 注解进行监听。
代码:

package com.rabbit.springboot_mqconsumer;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @author Watching
 * * @date 2023/7/19
 * * Describe:
 */
@Component
public class RabbitMQListener {
    @RabbitListener(queues = {"boot_topic_queue"})//填写队列名称,可以以字符串数组的方式监听多个队列
    public void listener(Message message){
        System.out.println(message);
    }
}

小结:
SpringBoot提供了快速整合RabbitMQ的方式
基本信息再ym|中配置,队列交互机以及绑定关系在配置类中使用Bean的方式配置
生产端直接注入 RabbitTemplate 完成消息发送
消费端直接使用 @RabbitListener 完成消息接收

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/770896.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【C语言初阶(18)】结构体

文章目录 前言Ⅰ结构体的声明Ⅱ 结构体的定义Ⅲ 结构体初始化Ⅳ 访问结构体成员⒈结构体变量访问结构体成员⒉结构体指针访问结构体成员 Ⅴ 结构体的嵌套Ⅵ 结构体传参 前言 C 语言提供了一些非常基本的数据类型&#xff0c;如 int、float、double、char 等&#xff0c;这些不同…

基于SpringBoot + EasyExcel + Vue + Blob实现导出Excel文件的前后端完整过程

首先前端发起HTTP请求之后&#xff0c;后端返回一个Excel输出流&#xff0c;然后前端用Blob类型接收数据&#xff0c;并且解析响应头数据以及提取源文件名&#xff0c;最后用a标签完成下载。 一、后端代码 &#xff08;1&#xff09;导入阿里巴巴的EasyExcel依赖&#xff08;…

【C++进阶之路】list的基本使用和模拟实现

文章目录 初步认识①定义②底层原理③迭代器的分类 一、基本使用1.插入结点元素2.删除结点元素3.合并两个有序链表4.将一条链表的某一部分转移到另一条链表5.对链表排序并去重6.vector与list排序的比较 二、模拟实现①要点说明②基本框架③迭代器构造函数- -*->list里的迭代…

HG20202-2014脱脂工程施工及验收规范

为提高脱脂工程施工技术水平,加强施工过程的质量控制,保证施工质量和安全,制定本规范。 本规范适用于化工建设工程中忌油工艺介质系统的设备、管道和管道组成件仪表和仪表组成件等的脱脂。 本规范不适用于下列情况的脱脂: 1、制造领域; 2、工厂停车检修。 设计文件或用户规…

2023云曦期末复现

目录 WEB sign SSTI serialize WEB sign 有10000个 进行bp爆破 能发现 410 和 414长度 还有 420 410 414存在16进制的字符 拼凑出来为 \x66\x6c\x61\x67\x7b\x61\x63\x63\x39\x39\x66\x39\x30\x34\x66\x30\x65\x61\x66\x61\x34\x31\x63\x30\x36\x34\x33\x36\x38\x31\x3…

行为型模式 - 策略模式

概述 先看下面的图片&#xff0c;我们去旅游选择出行模式有很多种&#xff0c;可以骑自行车、可以坐汽车、可以坐火车、可以坐飞机。 作为一个程序猿&#xff0c;开发需要选择一款开发工具&#xff0c;当然可以进行代码开发的工具有很多&#xff0c;可以选择Idea进行开发&…

WPF嵌入外部exe应用程序-使用Winfom控件承载外部程序

使用Winform控件承载外部程序 在WPF中使用Winfom控件添加winform相关的程序集在XAML头中加入对这两个程序集命名空间的引用使用Winform控件效果&#xff1a;问题 在Winfom控件中嵌入exe程序准备Winfrom控件更换父窗体的句柄完整实现代码&#xff1a;实现效果&#xff1a; 问题和…

王道计算机网络学习笔记(5)——传输层和应用层

前言 文章中的内容来自B站王道考研计算机网络课程&#xff0c;想要完整学习的可以到B站官方看完整版。 五&#xff1a;传输层 5.1&#xff1a;传输层基本概述 传输层的功能&#xff1a; 1传输层提供进程和进程之间的逻辑通信 2复用和分用 微信和QQ都使用传输层的协议进行发…

设计模式-外观模式在Java中的使用示例

场景 外观模式 外观模式是一种使用频率非常高的结构型设计模式&#xff0c;它通过引入一个外观角色来简化客户端与子系统 之间的交互&#xff0c;为复杂的子系统调用提供一个统一的入口&#xff0c;降低子系统与客户端的耦合度&#xff0c;且客户端调用非常方便。 示例 自…

【区块链+体育】“数智化”的杭州亚运会,中创助力区块链技术发展

“智能”&#xff0c;是杭州亚运会的办赛理念之一。除了数字藏品开亚运先河&#xff0c;杭州亚组委充分应用区块链、大数据、人工智能等前沿技术&#xff0c;为观众提供从购票、出行、观赛到住宿、美食和旅游等“一站式”服务。 本次亚运会将全程智能陆续落到了实处&#xff0…

10亿级用户,如何做 熔断降级架构?微信和hystrix的架构对比

说在前面 在40岁老架构师 尼恩的读者社区(50)中&#xff0c;最近有小伙伴拿到了一线互联网企业如极兔、有赞、希音、百度、网易、滴滴的面试资格&#xff0c;遇到一几个很重要的面试题&#xff1a; (1) 什么是熔断&#xff0c;降级&#xff1f;如何实现&#xff1f; (2) 服务熔…

测试用例(2)

项目管理工具 主要用tapd&#xff0c;jira少用 acp 敏捷项目管理证书 task:故事&#xff0c;一个故事有开始也有结束&#xff0c;那么在项目管理里面&#xff0c;会把每个任务按照一个task来看&#xff0c;那么这个task也可以叫story&#xff0c;具体指的就是任务有开始有结…

利用鸿鹄优化共享储能的SCADA 系统功能,赋能用户数据自助分析

摘要 本文主要介绍了共享储能的 SCADA 系统大数据架构&#xff0c;以及如何利用鸿鹄来更好的优化 SCADA 系统功能&#xff0c;如何为用户进行数据自助分析赋能。 1、共享储能介绍 说到共享储能&#xff0c;可能不少朋友比较陌生&#xff0c;下面我们简单介绍一下共享储能的价值…

数组的递归筛选

数组递归筛选 根据一个值筛选出来通过 includes 递归 const options [{name: "ikun",options: [{name: "YAY11",},],},{name: "YAY",}, ];function findValue(orgOptions,val) {let newArr1 []orgOptions.forEach(item>{if(item.options…

费尔法克斯水务通过使用 Liquid UI 移动化和定制 SAP PM 来提高收入和数据完整性

背景 费尔法克斯水务是北弗吉尼亚州地区领先的水县。它是华盛顿特区大都会区的三大供水商之一。它每天为近171万居民提供2.<>亿加仑的水。它渴望坚持其愿景&#xff0c;即保持以客户为中心&#xff0c;同时帮助维持该地区的高质量生活和经济状况。 挑战 由于桌面系统&…

Druid-排查conditionDoubleConstAllow配置问题(double const condition)

Druid-排查conditionDoubleConstAllow配置问题(double const condition) 报错信息 Caused by: java.sql.SQLException: sql injection violation, dbType postgresql, druid-version 1.2.18, double const condition : SELECT * FROM test where 11 AND TRUE AND TRUE关键词&…

02-线性结构2 一元多项式的乘法与加法运算

一个小时敲&#xff0c;五分钟改错。比一年前进步还是很大的。 但是如果测试点没有提示的话&#xff0c;改到哪年就不一定了( ◔︎ ‸◔︎) 思路 多项式加法&#xff0c;极其类似Merge &#xff08;测试点2&#xff1a;系数加完要是0的话就不用添入结果多项式里了~&#xff…

业务安全分析第19期 | 今年暑假,博物馆的门票为什么抢不到?

目录 “黄牛”&#xff1a;加价代预约、加价售票、兜售野导游套餐 “黄牛”倒票带来的危害 “黄牛”为什么能够抢到票 博物馆与“黄牛”的门票攻防 “黄牛”使用的作弊软件有什么特征 技术上防范“黄牛”的作弊软件抢票 遏制“黄牛”倒票给博物馆带来的收益 随着暑期参观…

电影《碟中谍7:致命清算(上)》观后感

上周看了电影《碟中谍7&#xff1a;致命清算&#xff08;上&#xff09;》&#xff0c;从电影名称就知道&#xff0c;这部电影会有下部&#xff0c;讲述科学进步之后&#xff0c;有AI引发的技术变革&#xff0c;出现了一种AI变体叫做智体的东西&#xff0c;它有自主意思&#x…

【代码随想录 | Leetcode | 第八天】哈希表 | 有效的字母异位词 | 两个数组的交集 | 两数之和

前言 欢迎来到小K的Leetcode|代码随想录|专题化专栏&#xff0c;今天将为大家带来哈希法~有效的字母异位词 | 两个数组的交集 | 两数之和的分享✨ 目录 前言242. 有效的字母异位词349. 两个数组的交集1. 两数之和总结 242. 有效的字母异位词 ✨题目链接点这里 给定两个字符串…