RabbitMQ入门前篇

news2024/10/5 17:20:57

本篇博文目录:

      • 一.RabbitMQ
        • 1.消息队列
        • 2.RabbitMQ
        • 3.安装RabbitMQ
        • 4.RabbitMQ常用命令
      • 二.使用RabbitMQ进行编程
        • 1.AMQP
        • 2.第一次MQ通信
      • 三.RabbitMQ六中工作模式
        • 1.RabbitMQ
        • 2.Work queues
        • 3.pub/sub订阅发布模式
        • 4.Routing模式
        • 5.主题Topic模式
      • 四.RabbitMQ消息确认机制
      • 五.源代码下载

一.RabbitMQ

1.消息队列

Message Queue中文意思消息队列,是一种进程的通信机制,用于上下游传递消息。其中在二个进程之间MQ起到消息中间件的作用,实现在进程之间的解耦操作,原来是进程与进程之间直接通信,这样的话耦合性大,并且如果通信失败无法确定那一面出现问题,而通过在中间多一个信息传递就可以实现解耦,并且当有一端出现问题时,也能够知道是那一端出现了问题,保证了数据传输的可靠性。

在这里插入图片描述

2.RabbitMQ

RabbitMQ是众多消息代理服务器中使用的较广,较多的一款,RabbitMQ支持几乎所有的操作系统与编程语言,并且Rabbit提供高并发,高可用的成熟方案,支持多种消息协议,易于部署与使用。

在这里插入图片描述

下图列出了RabbitMQ与其他MQ的对比图(注意时效性,视频大概是2018出的)

在这里插入图片描述

RabbitMQ的应用场景如下:

在这里插入图片描述

3.安装RabbitMQ

  • 安装教程

Winddos环境下安装教程: https://www.cnblogs.com/chenwolong/p/rabbitmq.html
Linux环境下安装教程:https://blog.csdn.net/qq_45173404/article/details/116429302

  • Widnos的安装包下载(官网下载太慢)

erlang25.0.1版本:otp_win64_25.0.1.exe
https://www.aliyundrive.com/s/NJwrinH1VNy 提取码: m92c
rabbitmq3.11.7版本: rabbitmq-server-3.11.7.exe
https://www.aliyundrive.com/s/wkfkd7ewzNa 提取码: d43w

  • 安装完毕,启动RabbitMQ管理模块的插件后,访问http://localhost:15672/ 输入账号guest,密码guest,进行登入,会进入如下界面,说明安装成功:

在这里插入图片描述

备注:如果无法访问,你可以看看这篇博文:https://itguye.blog.csdn.net/article/details/128770009

4.RabbitMQ常用命令

可以通过下面的命令来操作RabbitMQ,当然也可以在网站上通过图形化的方式进行操作。

rabbitmq-server 前台启动服务
rabbitmq-server -detached 后台启动服务
rabbitmqctl stop 停止服务
rabbitmqctl start_app 启动应用
rabbitmqctl stop_app 终止应用
rabbitmqctl add_user {username} {password} – 创建新用户
rabbitmqctl delete_user {username} – 删除用户
rabbitmqctl change_password {username} {newpassword}– 重置密码
rabbitmqctl set_user_tags {username} {tag} – 授予用户角色(Tag)
rabbitmqctl set_permissions -p / user_admin'.*' '.*''.*'– 设置用户允许访问的vhost

上面rabbitmqctl set_user_tags {username} {tag}命令中的tag,如下:

在这里插入图片描述

二.使用RabbitMQ进行编程

1.AMQP

AMQP是一个协议规范,二个不同应用程序只要遵循该协议就可以实现通信,其中RabbitMQ就是AMQP的一种实现方式。

在这里插入图片描述

AMQP中的一些知识概念,如生产者,消费者,消息,队列和虚拟主机等,解释如下:

在这里插入图片描述

2.第一次MQ通信

  • 添加一个名为/test的虚拟主机

在这里插入图片描述

  • 创建一个maven项目,并导入依赖
    <dependencies>
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.3.0</version>
        </dependency>
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.8.5</version>
        </dependency>
    </dependencies>

备注:最新版本为5.16版本: https://mvnrepository.com/artifact/com.rabbitmq/amqp-client/5.16.0

在这里插入图片描述

  • 创建utiles工具包,并创建RabbitmqUtils和RabbitConstant类,如下:

RabbitmqUtils类,Rabbit工具类,该工具类就一个方法,就是获取RabbitMq的连接对象Connection :

package utils;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class RabbitmqUtils {
    private static ConnectionFactory connectionFactory = new ConnectionFactory();
        // 静态代码块进行初始化
        static{
            connectionFactory.setHost("127.0.0.1");
            connectionFactory.setPort(5672);
            connectionFactory.setUsername("guest");
            connectionFactory.setPassword("guest");
            connectionFactory.setVirtualHost("/test");
        }


        // 获取mq连接对象
        public static Connection getConnection() {
            try {
                Connection connection = connectionFactory.newConnection();
                return connection;
            } catch (Exception e) {
              throw  new RuntimeException();
            }
        }
}

RabbitConstant类,该类存放RabbitMq的配置常量信息:

package utils;

public class RabbitConstant {
    public static final String QUEUE_HELLOWORLD = "helloworld";
    public static final String QUEUE_SMS = "sms";
    public static final String EXCHANGE_WEATHER = "weather";
    public static final String EXCHANGE_WEATHER_ROUTING = "weather_routing";
    public static final String QUEUE_BAIDU = "baidu";
    public static final String QUEUE_SINA = "sina";
    public static final String EXCHANGE_WEATHER_TOPIC = "weather_topic";
}


  • 创建一个helloworld的包,在该文件下创建生产者类和消费者类,来实现生产者发送一个helloworld的字符串,然后消费者接受该字符串的操作。

Producer类:首先通过Rabbitmq的工具类获取连接对象,然后通过连接对象创建通道(虚拟连接),接着通过虚拟连接创建一个名为helloworld的队列,并往队列中传递数据hellowrld:

package helloworld;


import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import utils.RabbitConstant;
import utils.RabbitmqUtils;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

/**
 * 生产者
 */
public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 获取连接对象
        Connection connection = RabbitmqUtils.getConnection();
        // 创建通道,虚拟连接
        Channel channel = connection.createChannel();
        // 声明一个队列
        channel.queueDeclare(RabbitConstant.QUEUE_HELLOWORLD, false, false, false, null);
        String message = "hello world";
        channel.basicPublish("", RabbitConstant.QUEUE_HELLOWORLD, null, message.getBytes(StandardCharsets.UTF_8));
        System.out.println("发送数据成功");
        // 关闭虚拟通道
        channel.close();
        // 关闭连接
        connection.close();
    }
}


ConSummer类:通过工具类获取连接,然后创建通道,通过通道使用helloworld队列,并创建一个消费者对象,传入匿名内部类DefaultConsumer,并传入channel对象,获取helloworld消息队列的数据:

package helloworld;


import com.rabbitmq.client.*;
import utils.RabbitConstant;
import utils.RabbitmqUtils;

import java.io.IOException;

/**
 * 消费者
 */
public class ConSummer {
    public static void main(String[] args) throws IOException {
            // 获取mq连接
        Connection connection = RabbitmqUtils.getConnection();
        // 创建通道号
        Channel channel = connection.createChannel();
        // 获取队列
        channel.queueDeclare(RabbitConstant.QUEUE_HELLOWORLD, false, false, false, null);
        // 接收
        channel.basicConsume(RabbitConstant.QUEUE_HELLOWORLD,false, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接收到的数据:"+new String(body));
                //签收消息,确认消息
                //envelope.getDeliveryTag() 获取这个消息的TagId
                //false只确认签收当前的消息,设置为true的时候则代表签收该消费者所有未签收的消息
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        });
    }
}

  • 运行效果

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

  • 消息的状态

在这里插入图片描述

三.RabbitMQ六中工作模式

1.RabbitMQ

RabbitMQ六种工作模式中,2~5使用较多,下面会给出实例代码,对于这几种方式存在一定的一致性,都是在方式1的基础上进行添加,功能更加丰富(上文中的案例代码就是Hello World方式)。

在这里插入图片描述

2.Work queues

本实例模拟短信通知服务,就是用户购买订单成功后,通过RabbitMQ发送短信给用户进行订单确认,实例代码,首先在helloworld项目中创建一个名为workqueue的包,并创建SMS,Producer,ConSummerOne,ConSummerTwo,ConSummerThree这五个类,详细代码如下:

在这里插入图片描述

在这里插入图片描述

  • SMS类:实体类
package workqueue;

public class SMS {
    private String name;
    private String mobile;
    private String content;

    public SMS(String name, String mobile, String content) {
        this.name = name;
        this.mobile = mobile;
        this.content = content;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getMobile() {
        return mobile;
    }

    public void setMobile(String mobile) {
        this.mobile = mobile;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }
}

  • Producer:生产者,用来模拟100个用户同时订票
package workqueue;


import com.google.gson.Gson;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import utils.RabbitConstant;
import utils.RabbitmqUtils;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

/**
 * 生产者
 */
public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 获取连接对象
        Connection connection = RabbitmqUtils.getConnection();
        // 创建通道,虚拟连接
        Channel channel = connection.createChannel();
        // 声明一个队列
        channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null);
        // 给100给订票的用户发送订票成功的短信信息
        for (int i = 1; i <= 100; i++) {
            SMS sms = new SMS("乘客" + i, "13900000" + i, "您的车票已预订成功");
            // 将sms转换为jSON对象
            String message = new Gson().toJson(sms);
            channel.basicPublish("", RabbitConstant.QUEUE_SMS, null, message.getBytes(StandardCharsets.UTF_8));
        }
        System.out.println("发送成功");
        // 关闭虚拟通道
        channel.close();
        // 关闭连接
        connection.close();

    }
}

  • ConSummerOne类:发送短信处理1
package workqueue;


import com.rabbitmq.client.*;
import utils.RabbitConstant;
import utils.RabbitmqUtils;

import java.io.IOException;

/**
 * 消费者
 */
public class ConSummerOne {
    public static void main(String[] args) throws IOException {
            // 获取mq连接
        Connection connection = RabbitmqUtils.getConnection();
        // 创建通道号
        Channel channel = connection.createChannel();
        // 获取队列
        channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null);
        channel.basicQos(1);// 为1时表示处理完一个取一个,不加就是一次取多个

        // 接收
        channel.basicConsume(RabbitConstant.QUEUE_SMS,false, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者1,进行发送:"+new String(body));
                try {
                    Thread.sleep(100);// 延时0.1s
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                //签收消息,确认消息
                //envelope.getDeliveryTag() 获取这个消息的TagId
                //false只确认签收当前的消息,设置为true的时候则代表签收该消费者所有未签收的消息
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        });
    }
}

  • ConSummerTwo类:发送短信2
package workqueue;


import com.rabbitmq.client.*;
import utils.RabbitConstant;
import utils.RabbitmqUtils;

import java.io.IOException;

/**
 * 消费者
 */
public class ConSummerTwo {
    public static void main(String[] args) throws IOException {
            // 获取mq连接
        Connection connection = RabbitmqUtils.getConnection();
        // 创建通道号
        Channel channel = connection.createChannel();
        // 获取队列
        channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null);
        channel.basicQos(1);// 为1时表示处理完一个取一个,不加就是一次取多个
        // 接收
        channel.basicConsume(RabbitConstant.QUEUE_SMS,false, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者2,进行发送:"+new String(body));
                try {
                    Thread.sleep(100);// 延时0.1s
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                //签收消息,确认消息
                //envelope.getDeliveryTag() 获取这个消息的TagId
                //false只确认签收当前的消息,设置为true的时候则代表签收该消费者所有未签收的消息
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        });
    }
}

  • ConsumerThree类:发送短信3
package workqueue;

import com.rabbitmq.client.*;
import utils.RabbitConstant;
import utils.RabbitmqUtils;

import java.io.IOException;

public class ConsumerThree {
    public static void main(String[] args) throws IOException {
        // 获取mq连接
        Connection connection = RabbitmqUtils.getConnection();
        // 创建通道号
        Channel channel = connection.createChannel();
        // 获取队列
        channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null);
        channel.basicQos(1);// 为1时表示处理完一个取一个,不加就是一次取多个
        // 接收
        channel.basicConsume(RabbitConstant.QUEUE_SMS,false, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者3,进行发送:"+new String(body));
                try {
                    Thread.sleep(1000);// 延时1s
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                //签收消息,确认消息
                //envelope.getDeliveryTag() 获取这个消息的TagId
                //false只确认签收当前的消息,设置为true的时候则代表签收该消费者所有未签收的消息
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        });
    }
}

  • 运行效果(先运行Consumerxxx,再运行Producer)

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

  • 和helloworld的区别

helloworld是一个发送一个接收(一个生产者,一个消费者),而workqueneu是一个发送多个接收(一个生产者,多个消费者),并且代码上没有多大区别,区别就是多写几个消费者类和消费者类中多了一个 channel.basicQos(1);,这个不是强制性添加,如果加上了,表示处理完后取一个,如果不加就是一次取多个再进行处理,显然前者更好,更合理使用消费者。

在这里插入图片描述

3.pub/sub订阅发布模式

该实例代码是模拟中国气象局提供气象数据给所有订阅的消费者,如百度,新浪等消费者,实现就是将中国气象局的数据接入到交换机中,然后百度,新浪等平台绑定交换机就可以获取中国气象局的数据。

在这里插入图片描述

在这里插入图片描述

  • 首先,在http://localhost:15672/ 管理Web网页中,添加一个交换机,如下:

在这里插入图片描述

  • 在helloworld项目中创建一个名为pubsub的包,并创建WeatherProducer,SinaSummer,BaiduSummer三个类,详细代码如下:

WeatherProducer类:生成者,中国气象局发布天气数据

package pubsub;


import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import utils.RabbitConstant;
import utils.RabbitmqUtils;

import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;

public class WeatherProducer {
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitmqUtils.getConnection();
        String input = new Scanner(System.in).next();
        Channel channel = connection.createChannel();
        channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER,"" , null , input.getBytes());
        channel.close();
        connection.close();
    }
}

BaiduSummer类:百度消费者

package pubsub;


import com.rabbitmq.client.*;
import utils.RabbitConstant;
import utils.RabbitmqUtils;

import java.io.IOException;

public class BaiduSummer {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitmqUtils.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(RabbitConstant.QUEUE_BAIDU, false, false, false, null);
        //queueBind用于将队列与交换机绑定
        //参数1:队列名 参数2:交互机名  参数三:路由key(暂时用不到)
        channel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER, "");
        channel.basicQos(1);
        channel.basicConsume(RabbitConstant.QUEUE_BAIDU , false , new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("百度收到气象信息:" + new String(body));
                channel.basicAck(envelope.getDeliveryTag() , false);
            }
        });
    }
}

SinaSummer类:新浪消费者

package pubsub;


import com.rabbitmq.client.*;
import utils.RabbitConstant;
import utils.RabbitmqUtils;

import java.io.IOException;

public class SinaSummer {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitmqUtils.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(RabbitConstant.QUEUE_SINA, false, false, false, null);
        //queueBind用于将队列与交换机绑定
        //参数1:队列名 参数2:交互机名  参数三:路由key(暂时用不到)
        channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER, "");
        channel.basicQos(1);
        channel.basicConsume(RabbitConstant.QUEUE_SINA , false , new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("新浪收到气象信息:" + new String(body));
                channel.basicAck(envelope.getDeliveryTag() , false);
            }
        });
    }
}

  • 运行效果(先运行消费者,在运行生产者)

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

  • 和workqueueu的区别

首先二者都是一个生成者多个消费者,不同的是后者需要在WEB网页上创建交换机,并且代码上订阅与发布多了一个queueBind绑定操作和生产者中的basicPublish()使用的是exchange,通过上面的配置就可以通过消费者根据绑定的交互机找到对应生成者交换机上的数据,也就是实现了生产者发布主题,消费者订阅主题,并收到订阅主题的数据。

在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

4.Routing模式

Routing模式在订阅与发布的基础上进行扩展,增加了条件就是交换机会根据Routing Key的条件进行数据刷选再发给消费者队列。

在这里插入图片描述

  • 创建交换机weather_routing

在这里插入图片描述

  • 和上一个案例代码差不多,创建包和类:

在这里插入图片描述

  • 详细代码

WeatherBureau类:生产者

package com.itlaoqi.rabbitmq.routing;

import com.itlaoqi.rabbitmq.utils.RabbitConstant;
import com.itlaoqi.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;

public class WeatherBureau {
    public static void main(String[] args) throws IOException, TimeoutException {
        Map area = new LinkedHashMap<String, String>();
        area.put("china.hebei.shijiazhuang.20991011", "中国河北石家庄20991011天气数据");
        area.put("china.shandong.qingdao.20991011", "中国山东青岛20991011天气数据");
        area.put("china.henan.zhengzhou.20991011", "中国河南郑州20991011天气数据");
        area.put("us.cal.la.20991011", "美国加州洛杉矶20991011天气数据");

        area.put("china.hebei.shijiazhuang.20991012", "中国河北石家庄20991012天气数据");
        area.put("china.shandong.qingdao.20991012", "中国山东青岛20991012天气数据");
        area.put("china.henan.zhengzhou.20991012", "中国河南郑州20991012天气数据");
        area.put("us.cal.la.20991012", "美国加州洛杉矶20991012天气数据");

        Connection connection = RabbitUtils.getConnection();
        Channel channel = connection.createChannel();
        Iterator<Map.Entry<String, String>> itr = area.entrySet().iterator();
        while (itr.hasNext()) {
            Map.Entry<String, String> me = itr.next();
            //Routing key 第二个参数相当于数据筛选的条件
            channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER_ROUTING,me.getKey() , null , me.getValue().getBytes());
        }

        channel.close();
        connection.close();
    }
}

Sina类:新浪消费者

package com.itlaoqi.rabbitmq.routing;

import com.itlaoqi.rabbitmq.utils.RabbitConstant;
import com.itlaoqi.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Sina {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitUtils.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(RabbitConstant.QUEUE_SINA, false, false, false, null);

        channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "us.cal.la.20991011");
        channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.henan.zhengzhou.20991011");
        channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "us.cal.la.20991012");
        channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.henan.zhengzhou.20991012");

        channel.basicQos(1);
        channel.basicConsume(RabbitConstant.QUEUE_SINA , false , new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("新浪收到气象信息:" + new String(body));
                channel.basicAck(envelope.getDeliveryTag() , false);
            }
        });
    }
}

Baidu类:百度消费者

package com.itlaoqi.rabbitmq.routing;

import com.itlaoqi.rabbitmq.utils.RabbitConstant;
import com.itlaoqi.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Baidu {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitUtils.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(RabbitConstant.QUEUE_BAIDU, false, false, false, null);
        //queueBind用于将队列与交换机绑定
        //参数1:队列名 参数2:交互机名  参数三:路由key
        channel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.shandong.qingdao.20991011");
        channel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.shandong.qingdao.20991012");
        channel.basicQos(1);
        channel.basicConsume(RabbitConstant.QUEUE_BAIDU , false , new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("百度收到气象信息:" + new String(body));
                channel.basicAck(envelope.getDeliveryTag() , false);
            }
        });
    }
}

  • 测试(先运行消费者,在运行生产者)

在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

  • 和订阅与发布的区别

首先是交换机的类型变为了direct,并且Routing中生成者的basicPublish()多了一个routingKey参数,在消费者中会根据queueBind的routingKey和生产者中的routingKey做比较,如果一致就接收,不一致不接收。

在这里插入图片描述

在这里插入图片描述

5.主题Topic模式

主题Topic模式是在Routing模式下再一次进行扩展,和Routing不同的是,后者支持模糊查询,通过通配符的方式,*表示任意一个字符,#表示任意多个字符。

在这里插入图片描述

  • 在WEB管理上创建Topic的交换机

在这里插入图片描述

  • 项目和Routing差不做,直接说不同点

消费者代码中的queueBind()里的routingKey采用通配符

在这里插入图片描述
在这里插入图片描述

  • 运行效果(先执行消费者,在执行生产者)

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

四.RabbitMQ消息确认机制

通过RabbitMQ消息确认机制可以知道生产者和代理人(Broker)之间发送数据的情况,有二种情况,情况一Confirm表示消息送到Broker了,如果为ack表示Broker接收,为nack表示没有接收。情况二Return表示消息被Broker正常接收(ack)后,但Broker没有对应的队列进行投递时产生的状态,消息被退回给生产者。

在这里插入图片描述
在这里插入图片描述

  • 实例代码,那上面的例子为例,由于消息确认机制是生产者和Broker之间的事情,所以不用管消费者,所以只需要在生产者中添加代码,详细代码如下。
package confirm;

import com.rabbitmq.client.*;
import utils.RabbitConstant;
import utils.RabbitmqUtils;

import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

public class WeatherBureau {
    public static void main(String[] args) throws IOException, TimeoutException {
        Map area = new LinkedHashMap<String, String>();
        area.put("china.hebei.shijiazhuang.20991011", "中国河北石家庄20991011天气数据");
        area.put("china.shandong.qingdao.20991011", "中国山东青岛20991011天气数据");
        area.put("china.henan.zhengzhou.20991011", "中国河南郑州20991011天气数据");
        area.put("us.cal.la.20991011", "美国加州洛杉矶20991011天气数据");

        area.put("china.hebei.shijiazhuang.20991012", "中国河北石家庄20991012天气数据");
        area.put("china.shandong.qingdao.20991012", "中国山东青岛20991012天气数据");
        area.put("china.henan.zhengzhou.20991012", "中国河南郑州20991012天气数据");
        area.put("us.cal.la.20991012", "美国加州洛杉矶20991012天气数据");

        Connection connection = RabbitmqUtils.getConnection();
        Channel channel = connection.createChannel();
        //开启confirm监听模式
        channel.confirmSelect();
        // 进行监听
        channel.addConfirmListener(new ConfirmListener() {
            public void handleAck(long l, boolean b) throws IOException {
                //第二个参数代表接收的数据是否为批量接收,一般我们用不到。
                System.out.println("消息已被Broker接收,Tag:" + l);
            }

            public void handleNack(long l, boolean b) throws IOException {
                System.out.println("消息已被Broker拒收,Tag:" + l);
            }
        });
        channel.addReturnListener(new ReturnCallback() {
            public void handle(Return r) {
                System.err.println("===========================");
                System.err.println("Return编码:" + r.getReplyCode() + "-Return描述:" + r.getReplyText());
                System.err.println("交换机:" + r.getExchange() + "-路由key:" + r.getRoutingKey() );
                System.err.println("Return主题:" + new String(r.getBody()));
                System.err.println("===========================");
            }
        });
        Iterator<Map.Entry<String, String>> itr = area.entrySet().iterator();
        while (itr.hasNext()) {
            Map.Entry<String, String> me = itr.next();
            //Routing key 第二个参数相当于数据筛选的条件
            //第三个参数为:mandatory true代表如果消息无法正常投递则return回生产者,如果false,则直接将消息放弃。
            channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER_TOPIC,me.getKey() ,true, null , me.getValue().getBytes());
        }

        /*channel.close();
        connection.close();*/
    }
}

  • 不同处

在这里插入图片描述

  • 运行效果

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

五.源代码下载

在我的微信公众号后台回复 rabbitmq 就可以获取本篇博文相关的源代码了,如果有什么疑问后台给为留言,我看见会第一时间回复你的。

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/183471.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

我性格比较内向,适合做管理吗?

许多刚走上管理岗位的朋友&#xff0c;都有这样的困惑&#xff1a;1.我比较内向&#xff0c;不适合做管理。2.我不擅长演讲&#xff0c;没有领导才能。3.我太谨小慎微了&#xff0c;做不好领导。4.我太喜欢出风头&#xff0c;静不下心来做管理。5.我太强势了&#xff0c;团队很…

GuLi商城-项目初始结构创建,GitHub仓库创建

GitHub账号和密码 账号&#xff1a;11360XXXXXqq.com 密码&#xff1a;ZH**SH*19**1016 新建仓库&#xff1a; gulimall 记得勾选下Add a README file&#xff0c;上面忘记勾选了&#xff0c;实际建议还是要勾选下 复制路径&#xff1a; 打开IDEA检出项目 创建商品微服务模…

芯片设计|FPGA 设计的指导原则(一)

这一部分主要介绍 FPGA/CPLD 设计的指导性原则&#xff0c;如 FPGA 设计的基本原则、基本设计思想、基本操作技巧、常用模等。 FPGA/CPLD 设计的基本原则、思想、技巧和常用模块是一个非常大的问题&#xff0c;在此不可能面面俱到&#xff0c;只能我们公司项目中常用的一些设计…

权值线段树 详解+操作模板(c++)

文章目录权值线段树添加一个数字求某数出现的次数查询一段区间中数字出现的次数查询整个值域中第k小的数查询整个值域中第k大的数例子&#xff1a;求逆序对关于基本线段树与线段树的模板&#xff0c;请看我们之前发布的博客&#xff1a; 线段树入门详解 维护加法乘法&#xff0…

关于“茴香豆的‘茴’有几种写法”:学习过程中,若时间精力有限则优先记住最好用的一种

学习过程中的细节整理和精力节省权衡 我平时学习有整理总结、记笔记的习惯。 我学新东西总是很慢&#xff0c;因为细节处几乎都不会放过&#xff0c;会去发散&#xff0c;去深挖&#xff0c;去比较之前。 刚才上网&#xff0c;查了C语言中二维数组的赋值方式&#xff0c;某个…

UVM实战笔记(七)

第七章. UVM中的寄存器模型 7.1 寄存器模型简介 7.1.1 带寄存器配置总线的DUT 本章节使用的DUT带寄存器配置&#xff0c;代码如下&#xff1a; module dut(clk,rst_n,bus_cmd_valid,bus_op,bus_addr,bus_wr_data,bus_rd_data,rxd,rx_dv,txd,tx_en)input clk; …

DaVinci:Camera Raw(Sony RAW)

本文主要介绍 Sony RAW 格式素材相关的 Camera Raw 参数。解码质量Decode Quality解码质量决定了图像解拜耳之后所呈现的素质。默认为“使用项目设置” Use project setting&#xff0c;表示使用项目设置对话框中的“Camera RAW”解码质量设置。还可选择&#xff1a;全分辨率 -…

JavaEE-网络编程

目录一、网络编程套接字二、UDP Socket2.1 客户端服务器程序-回显服务(EchoServer)2.1.1 UdpEchoServer2.1.2 UdpEchoClient2.1.3 一个简单程序三、TCP 客户端服务器程序3.1 TCP API一、网络编程套接字 网络编程套接字就是操作系统给应用程序提供的一组API(叫做socket API)。 …

NLP学习笔记(七) BERT简明介绍

大家好&#xff0c;我是半虹&#xff0c;这篇文章来讲 BERT\text{BERT}BERT (Bidirectional Encoder Representations from Transformers) 原始论文请戳这里 0 概述 从某种程度上来说&#xff0c;深度学习至关重要的一环就是表征学习&#xff0c;也就是学习如何得到数据的向…

怎么把两个PDF合并?教你们几个简单的方法

不知道大家平时处理文件的数量多不多&#xff0c;但是小编日常处理文件真的特别多&#xff0c;所以小编经常会使用专业的格式转换器来处理文件&#xff0c;这样就可以高效处理文件了&#xff0c;例如我们需要将多个PDF文件合并&#xff0c;这样就只需要传输一个文件就可以了&am…

自定义starter解决请求绕过网关问题

引言 微服务项目中网关是一个常见的模块&#xff0c;通过网关的分发可以实现负载均衡、鉴权等操作&#xff1b;但是搭建好网关可以发现&#xff0c;虽然可以通过网关端口请求后端&#xff0c;如果有其他服务的地址依然可以使用其他服务地址绕过网关请求&#xff0c;这里我提供…

利用RadminLan和TcpRoute2将工作带回家

需要准备的工具 1.RadminLan 下载地址–>https://www.radmin-lan.cn/ 2.TcpRoute2 项目地址–>https://github.com/GameXG/TcpRoute2 *选用&#xff1a;浏览器插件proxy-switchyomega&#xff1a;https://microsoftedge.microsoft.com/addons/detail/proxy-switchyomega…

Visual Studio Code 的安装和使用

Visual Stuio Code 微软出的一款免费编辑器。 有 Windows、Linux 和macOS 三种版本的&#xff0c;属于跨平台的编辑器。它功能强大&#xff0c;支持插件工具安装&#xff0c;对于写代码、阅读代码的人来说是非常方便的。 1、安装 Visual Stuio Code 下载地址如下&#xff1a; h…

win10修改jdk版本之后不生效的有效解决方法

问题起因今天学习seata的时候&#xff0c;启动seata服务发现启动不了报下图错误。发现是自己jdk版本太高了&#xff0c;现在我用的是jdk17。然后我修改jdk的环境变量&#xff0c;确定保存好。发现jdk的版本还是没有变化。问题原因当使用安装版本的JDK程序时&#xff08;一般是1…

jmeter 并发测试

1.右键测试计划(Test plan), 添加线程组 2.线程组配置 3.右键线程组, 添加取样器-HTTP请求 4.HTTP请求配置 5. 添加查看结果树(也可以在Test plan 测试计划上右键添加)

Grafana 系列文章(二):使用 Grafana Agent 和 Grafana Tempo 进行 Tracing

&#x1f449;️URL: https://grafana.com/blog/2020/11/17/tracing-with-the-grafana-cloud-agent-and-grafana-tempo/ ✍Author: Robert Fratto • 17 Nov 2020 &#x1f4dd;Description: Heres your starter guide to configuring the Grafana Agent to collect traces and…

【刷题】多数元素

这是leetcode第169题的解答。 目录 一、多数元素 二、实现思路 1.排序中间下标求众数 2.投票法 总结 一、多数元素 多数元素是指在数组中出现次数 大于 ⌊ n/2 ⌋ 的元素。 二、实现思路 1.排序中间下标求众数 原理&#xff1a; 通过排序使得数组有序&#xff0c;因为多数元素…

ESP32设备驱动-TM1637-驱动4位7段数码管

TM1637-驱动4位7段数码管 1、TM1637介绍 TM1637是一款带键盘扫描接口的LED(发光二极管显示器)驱动控制专用电路,内部集成了MCU数字接口、数据锁存、LED高压驱动、键盘扫描等功能。 TM1637使用DIP20/SOP20封装,主要适用于电磁炉、微波炉、小家电的显示驱动。 TM1637有如下…

【C++】初识C++

本期博客我们来正式进入到期待已久C嘎嘎的学习希望C语言以后别给我打电话了&#xff0c;我怕C误会&#x1f63c;一、认识C1. 什么是C C语言是结构化和模块化的语言&#xff0c;适合处理较小规模的程序。对于复杂的问题&#xff0c;规模较大的 程序&#xff0c;需要高度的抽象和…

蓝奥声无线单火控制技术在单火开关应用中的优势

随着科技的发展&#xff0c;智能产品在生活中越来越常见&#xff0c;为方便业主使用&#xff0c;就连开关也有了高阶智能版&#xff0c;据相关专家介绍&#xff0c;智能开关主要分为单火和零火两种&#xff0c;很多非专业人士搞不明白&#xff0c;但又害怕因此选择失误。那么&a…