本篇博文目录:
- 一.RabbitMQ
- 1.消息队列
- 2.RabbitMQ
- 3.安装RabbitMQ
- 4.RabbitMQ常用命令
- 二.使用RabbitMQ进行编程
- 1.AMQP
- 2.第一次MQ通信
- 三.RabbitMQ六中工作模式
- 1.RabbitMQ
- 2.Work queues
- 3.pub/sub订阅发布模式
- 4.Routing模式
- 5.主题Topic模式
- 四.RabbitMQ消息确认机制
- 五.源代码下载
一.RabbitMQ
1.消息队列
Message Queue中文意思消息队列,是一种进程的通信机制,用于上下游传递消息。其中在二个进程之间MQ起到消息中间件的作用,实现在进程之间的解耦操作,原来是进程与进程之间直接通信,这样的话耦合性大,并且如果通信失败无法确定那一面出现问题,而通过在中间多一个信息传递就可以实现解耦,并且当有一端出现问题时,也能够知道是那一端出现了问题,保证了数据传输的可靠性。
2.RabbitMQ
RabbitMQ是众多消息代理服务器中使用的较广,较多的一款,RabbitMQ支持几乎所有的操作系统与编程语言,并且Rabbit提供高并发,高可用的成熟方案,支持多种消息协议,易于部署与使用。
下图列出了RabbitMQ与其他MQ的对比图(注意时效性,视频大概是2018出的)
RabbitMQ的应用场景如下:
3.安装RabbitMQ
- 安装教程
Winddos环境下安装教程: https://www.cnblogs.com/chenwolong/p/rabbitmq.html
Linux环境下安装教程:https://blog.csdn.net/qq_45173404/article/details/116429302
- Widnos的安装包下载(官网下载太慢)
erlang25.0.1版本:otp_win64_25.0.1.exe
https://www.aliyundrive.com/s/NJwrinH1VNy 提取码: m92c
rabbitmq3.11.7版本: rabbitmq-server-3.11.7.exe
https://www.aliyundrive.com/s/wkfkd7ewzNa 提取码: d43w
- 安装完毕,启动RabbitMQ管理模块的插件后,访问http://localhost:15672/ 输入账号guest,密码guest,进行登入,会进入如下界面,说明安装成功:
备注:如果无法访问,你可以看看这篇博文:https://itguye.blog.csdn.net/article/details/128770009
4.RabbitMQ常用命令
可以通过下面的命令来操作RabbitMQ,当然也可以在网站上通过图形化的方式进行操作。
rabbitmq-server 前台启动服务
rabbitmq-server -detached 后台启动服务
rabbitmqctl stop 停止服务
rabbitmqctl start_app 启动应用
rabbitmqctl stop_app 终止应用
rabbitmqctl add_user {username} {password} – 创建新用户
rabbitmqctl delete_user {username} – 删除用户
rabbitmqctl change_password {username} {newpassword}– 重置密码
rabbitmqctl set_user_tags {username} {tag} – 授予用户角色(Tag)
rabbitmqctl set_permissions -p / user_admin'.*' '.*''.*'– 设置用户允许访问的vhost
上面rabbitmqctl set_user_tags {username} {tag}命令中的tag,如下:
二.使用RabbitMQ进行编程
1.AMQP
AMQP是一个协议规范,二个不同应用程序只要遵循该协议就可以实现通信,其中RabbitMQ就是AMQP的一种实现方式。
AMQP中的一些知识概念,如生产者,消费者,消息,队列和虚拟主机等,解释如下:
2.第一次MQ通信
- 添加一个名为/test的虚拟主机
- 创建一个maven项目,并导入依赖
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.3.0</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.5</version>
</dependency>
</dependencies>
备注:最新版本为5.16版本: https://mvnrepository.com/artifact/com.rabbitmq/amqp-client/5.16.0
- 创建utiles工具包,并创建RabbitmqUtils和RabbitConstant类,如下:
RabbitmqUtils类,Rabbit工具类,该工具类就一个方法,就是获取RabbitMq的连接对象Connection :
package utils;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class RabbitmqUtils {
private static ConnectionFactory connectionFactory = new ConnectionFactory();
// 静态代码块进行初始化
static{
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/test");
}
// 获取mq连接对象
public static Connection getConnection() {
try {
Connection connection = connectionFactory.newConnection();
return connection;
} catch (Exception e) {
throw new RuntimeException();
}
}
}
RabbitConstant类,该类存放RabbitMq的配置常量信息:
package utils;
public class RabbitConstant {
public static final String QUEUE_HELLOWORLD = "helloworld";
public static final String QUEUE_SMS = "sms";
public static final String EXCHANGE_WEATHER = "weather";
public static final String EXCHANGE_WEATHER_ROUTING = "weather_routing";
public static final String QUEUE_BAIDU = "baidu";
public static final String QUEUE_SINA = "sina";
public static final String EXCHANGE_WEATHER_TOPIC = "weather_topic";
}
- 创建一个helloworld的包,在该文件下创建生产者类和消费者类,来实现生产者发送一个helloworld的字符串,然后消费者接受该字符串的操作。
Producer类:首先通过Rabbitmq的工具类获取连接对象,然后通过连接对象创建通道(虚拟连接),接着通过虚拟连接创建一个名为helloworld的队列,并往队列中传递数据hellowrld:
package helloworld;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import utils.RabbitConstant;
import utils.RabbitmqUtils;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
/**
* 生产者
*/
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
// 获取连接对象
Connection connection = RabbitmqUtils.getConnection();
// 创建通道,虚拟连接
Channel channel = connection.createChannel();
// 声明一个队列
channel.queueDeclare(RabbitConstant.QUEUE_HELLOWORLD, false, false, false, null);
String message = "hello world";
channel.basicPublish("", RabbitConstant.QUEUE_HELLOWORLD, null, message.getBytes(StandardCharsets.UTF_8));
System.out.println("发送数据成功");
// 关闭虚拟通道
channel.close();
// 关闭连接
connection.close();
}
}
ConSummer类:通过工具类获取连接,然后创建通道,通过通道使用helloworld队列,并创建一个消费者对象,传入匿名内部类DefaultConsumer,并传入channel对象,获取helloworld消息队列的数据:
package helloworld;
import com.rabbitmq.client.*;
import utils.RabbitConstant;
import utils.RabbitmqUtils;
import java.io.IOException;
/**
* 消费者
*/
public class ConSummer {
public static void main(String[] args) throws IOException {
// 获取mq连接
Connection connection = RabbitmqUtils.getConnection();
// 创建通道号
Channel channel = connection.createChannel();
// 获取队列
channel.queueDeclare(RabbitConstant.QUEUE_HELLOWORLD, false, false, false, null);
// 接收
channel.basicConsume(RabbitConstant.QUEUE_HELLOWORLD,false, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收到的数据:"+new String(body));
//签收消息,确认消息
//envelope.getDeliveryTag() 获取这个消息的TagId
//false只确认签收当前的消息,设置为true的时候则代表签收该消费者所有未签收的消息
channel.basicAck(envelope.getDeliveryTag(),false);
}
});
}
}
- 运行效果
- 消息的状态
三.RabbitMQ六中工作模式
1.RabbitMQ
RabbitMQ六种工作模式中,2~5使用较多,下面会给出实例代码,对于这几种方式存在一定的一致性,都是在方式1的基础上进行添加,功能更加丰富(上文中的案例代码就是Hello World方式)。
2.Work queues
本实例模拟短信通知服务,就是用户购买订单成功后,通过RabbitMQ发送短信给用户进行订单确认,实例代码,首先在helloworld项目中创建一个名为workqueue的包,并创建SMS,Producer,ConSummerOne,ConSummerTwo,ConSummerThree这五个类,详细代码如下:
- SMS类:实体类
package workqueue;
public class SMS {
private String name;
private String mobile;
private String content;
public SMS(String name, String mobile, String content) {
this.name = name;
this.mobile = mobile;
this.content = content;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getMobile() {
return mobile;
}
public void setMobile(String mobile) {
this.mobile = mobile;
}
public String getContent() {
return content;
}
public void setContent(String content) {
this.content = content;
}
}
- Producer:生产者,用来模拟100个用户同时订票
package workqueue;
import com.google.gson.Gson;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import utils.RabbitConstant;
import utils.RabbitmqUtils;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
/**
* 生产者
*/
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
// 获取连接对象
Connection connection = RabbitmqUtils.getConnection();
// 创建通道,虚拟连接
Channel channel = connection.createChannel();
// 声明一个队列
channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null);
// 给100给订票的用户发送订票成功的短信信息
for (int i = 1; i <= 100; i++) {
SMS sms = new SMS("乘客" + i, "13900000" + i, "您的车票已预订成功");
// 将sms转换为jSON对象
String message = new Gson().toJson(sms);
channel.basicPublish("", RabbitConstant.QUEUE_SMS, null, message.getBytes(StandardCharsets.UTF_8));
}
System.out.println("发送成功");
// 关闭虚拟通道
channel.close();
// 关闭连接
connection.close();
}
}
- ConSummerOne类:发送短信处理1
package workqueue;
import com.rabbitmq.client.*;
import utils.RabbitConstant;
import utils.RabbitmqUtils;
import java.io.IOException;
/**
* 消费者
*/
public class ConSummerOne {
public static void main(String[] args) throws IOException {
// 获取mq连接
Connection connection = RabbitmqUtils.getConnection();
// 创建通道号
Channel channel = connection.createChannel();
// 获取队列
channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null);
channel.basicQos(1);// 为1时表示处理完一个取一个,不加就是一次取多个
// 接收
channel.basicConsume(RabbitConstant.QUEUE_SMS,false, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1,进行发送:"+new String(body));
try {
Thread.sleep(100);// 延时0.1s
} catch (InterruptedException e) {
e.printStackTrace();
}
//签收消息,确认消息
//envelope.getDeliveryTag() 获取这个消息的TagId
//false只确认签收当前的消息,设置为true的时候则代表签收该消费者所有未签收的消息
channel.basicAck(envelope.getDeliveryTag(),false);
}
});
}
}
- ConSummerTwo类:发送短信2
package workqueue;
import com.rabbitmq.client.*;
import utils.RabbitConstant;
import utils.RabbitmqUtils;
import java.io.IOException;
/**
* 消费者
*/
public class ConSummerTwo {
public static void main(String[] args) throws IOException {
// 获取mq连接
Connection connection = RabbitmqUtils.getConnection();
// 创建通道号
Channel channel = connection.createChannel();
// 获取队列
channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null);
channel.basicQos(1);// 为1时表示处理完一个取一个,不加就是一次取多个
// 接收
channel.basicConsume(RabbitConstant.QUEUE_SMS,false, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者2,进行发送:"+new String(body));
try {
Thread.sleep(100);// 延时0.1s
} catch (InterruptedException e) {
e.printStackTrace();
}
//签收消息,确认消息
//envelope.getDeliveryTag() 获取这个消息的TagId
//false只确认签收当前的消息,设置为true的时候则代表签收该消费者所有未签收的消息
channel.basicAck(envelope.getDeliveryTag(),false);
}
});
}
}
- ConsumerThree类:发送短信3
package workqueue;
import com.rabbitmq.client.*;
import utils.RabbitConstant;
import utils.RabbitmqUtils;
import java.io.IOException;
public class ConsumerThree {
public static void main(String[] args) throws IOException {
// 获取mq连接
Connection connection = RabbitmqUtils.getConnection();
// 创建通道号
Channel channel = connection.createChannel();
// 获取队列
channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null);
channel.basicQos(1);// 为1时表示处理完一个取一个,不加就是一次取多个
// 接收
channel.basicConsume(RabbitConstant.QUEUE_SMS,false, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者3,进行发送:"+new String(body));
try {
Thread.sleep(1000);// 延时1s
} catch (InterruptedException e) {
e.printStackTrace();
}
//签收消息,确认消息
//envelope.getDeliveryTag() 获取这个消息的TagId
//false只确认签收当前的消息,设置为true的时候则代表签收该消费者所有未签收的消息
channel.basicAck(envelope.getDeliveryTag(),false);
}
});
}
}
- 运行效果(先运行Consumerxxx,再运行Producer)
- 和helloworld的区别
helloworld是一个发送一个接收(一个生产者,一个消费者),而workqueneu是一个发送多个接收(一个生产者,多个消费者),并且代码上没有多大区别,区别就是多写几个消费者类和消费者类中多了一个 channel.basicQos(1);,这个不是强制性添加,如果加上了,表示处理完后取一个,如果不加就是一次取多个再进行处理,显然前者更好,更合理使用消费者。
3.pub/sub订阅发布模式
该实例代码是模拟中国气象局提供气象数据给所有订阅的消费者,如百度,新浪等消费者,实现就是将中国气象局的数据接入到交换机中,然后百度,新浪等平台绑定交换机就可以获取中国气象局的数据。
- 首先,在http://localhost:15672/ 管理Web网页中,添加一个交换机,如下:
- 在helloworld项目中创建一个名为pubsub的包,并创建WeatherProducer,SinaSummer,BaiduSummer三个类,详细代码如下:
WeatherProducer类:生成者,中国气象局发布天气数据
package pubsub;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import utils.RabbitConstant;
import utils.RabbitmqUtils;
import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;
public class WeatherProducer {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitmqUtils.getConnection();
String input = new Scanner(System.in).next();
Channel channel = connection.createChannel();
channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER,"" , null , input.getBytes());
channel.close();
connection.close();
}
}
BaiduSummer类:百度消费者
package pubsub;
import com.rabbitmq.client.*;
import utils.RabbitConstant;
import utils.RabbitmqUtils;
import java.io.IOException;
public class BaiduSummer {
public static void main(String[] args) throws IOException {
Connection connection = RabbitmqUtils.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(RabbitConstant.QUEUE_BAIDU, false, false, false, null);
//queueBind用于将队列与交换机绑定
//参数1:队列名 参数2:交互机名 参数三:路由key(暂时用不到)
channel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER, "");
channel.basicQos(1);
channel.basicConsume(RabbitConstant.QUEUE_BAIDU , false , new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("百度收到气象信息:" + new String(body));
channel.basicAck(envelope.getDeliveryTag() , false);
}
});
}
}
SinaSummer类:新浪消费者
package pubsub;
import com.rabbitmq.client.*;
import utils.RabbitConstant;
import utils.RabbitmqUtils;
import java.io.IOException;
public class SinaSummer {
public static void main(String[] args) throws IOException {
Connection connection = RabbitmqUtils.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(RabbitConstant.QUEUE_SINA, false, false, false, null);
//queueBind用于将队列与交换机绑定
//参数1:队列名 参数2:交互机名 参数三:路由key(暂时用不到)
channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER, "");
channel.basicQos(1);
channel.basicConsume(RabbitConstant.QUEUE_SINA , false , new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("新浪收到气象信息:" + new String(body));
channel.basicAck(envelope.getDeliveryTag() , false);
}
});
}
}
- 运行效果(先运行消费者,在运行生产者)
- 和workqueueu的区别
首先二者都是一个生成者多个消费者,不同的是后者需要在WEB网页上创建交换机,并且代码上订阅与发布多了一个queueBind绑定操作和生产者中的basicPublish()使用的是exchange,通过上面的配置就可以通过消费者根据绑定的交互机找到对应生成者交换机上的数据,也就是实现了生产者发布主题,消费者订阅主题,并收到订阅主题的数据。
4.Routing模式
Routing模式在订阅与发布的基础上进行扩展,增加了条件就是交换机会根据Routing Key的条件进行数据刷选再发给消费者队列。
- 创建交换机weather_routing
- 和上一个案例代码差不多,创建包和类:
- 详细代码
WeatherBureau类:生产者
package com.itlaoqi.rabbitmq.routing;
import com.itlaoqi.rabbitmq.utils.RabbitConstant;
import com.itlaoqi.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;
public class WeatherBureau {
public static void main(String[] args) throws IOException, TimeoutException {
Map area = new LinkedHashMap<String, String>();
area.put("china.hebei.shijiazhuang.20991011", "中国河北石家庄20991011天气数据");
area.put("china.shandong.qingdao.20991011", "中国山东青岛20991011天气数据");
area.put("china.henan.zhengzhou.20991011", "中国河南郑州20991011天气数据");
area.put("us.cal.la.20991011", "美国加州洛杉矶20991011天气数据");
area.put("china.hebei.shijiazhuang.20991012", "中国河北石家庄20991012天气数据");
area.put("china.shandong.qingdao.20991012", "中国山东青岛20991012天气数据");
area.put("china.henan.zhengzhou.20991012", "中国河南郑州20991012天气数据");
area.put("us.cal.la.20991012", "美国加州洛杉矶20991012天气数据");
Connection connection = RabbitUtils.getConnection();
Channel channel = connection.createChannel();
Iterator<Map.Entry<String, String>> itr = area.entrySet().iterator();
while (itr.hasNext()) {
Map.Entry<String, String> me = itr.next();
//Routing key 第二个参数相当于数据筛选的条件
channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER_ROUTING,me.getKey() , null , me.getValue().getBytes());
}
channel.close();
connection.close();
}
}
Sina类:新浪消费者
package com.itlaoqi.rabbitmq.routing;
import com.itlaoqi.rabbitmq.utils.RabbitConstant;
import com.itlaoqi.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Sina {
public static void main(String[] args) throws IOException {
Connection connection = RabbitUtils.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(RabbitConstant.QUEUE_SINA, false, false, false, null);
channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "us.cal.la.20991011");
channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.henan.zhengzhou.20991011");
channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "us.cal.la.20991012");
channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.henan.zhengzhou.20991012");
channel.basicQos(1);
channel.basicConsume(RabbitConstant.QUEUE_SINA , false , new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("新浪收到气象信息:" + new String(body));
channel.basicAck(envelope.getDeliveryTag() , false);
}
});
}
}
Baidu类:百度消费者
package com.itlaoqi.rabbitmq.routing;
import com.itlaoqi.rabbitmq.utils.RabbitConstant;
import com.itlaoqi.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Baidu {
public static void main(String[] args) throws IOException {
Connection connection = RabbitUtils.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(RabbitConstant.QUEUE_BAIDU, false, false, false, null);
//queueBind用于将队列与交换机绑定
//参数1:队列名 参数2:交互机名 参数三:路由key
channel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.shandong.qingdao.20991011");
channel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.shandong.qingdao.20991012");
channel.basicQos(1);
channel.basicConsume(RabbitConstant.QUEUE_BAIDU , false , new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("百度收到气象信息:" + new String(body));
channel.basicAck(envelope.getDeliveryTag() , false);
}
});
}
}
- 测试(先运行消费者,在运行生产者)
- 和订阅与发布的区别
首先是交换机的类型变为了direct,并且Routing中生成者的basicPublish()多了一个routingKey参数,在消费者中会根据queueBind的routingKey和生产者中的routingKey做比较,如果一致就接收,不一致不接收。
5.主题Topic模式
主题Topic模式是在Routing模式下再一次进行扩展,和Routing不同的是,后者支持模糊查询,通过通配符的方式,*表示任意一个字符,#表示任意多个字符。
- 在WEB管理上创建Topic的交换机
- 项目和Routing差不做,直接说不同点
消费者代码中的queueBind()里的routingKey采用通配符
- 运行效果(先执行消费者,在执行生产者)
四.RabbitMQ消息确认机制
通过RabbitMQ消息确认机制可以知道生产者和代理人(Broker)之间发送数据的情况,有二种情况,情况一Confirm表示消息送到Broker了,如果为ack表示Broker接收,为nack表示没有接收。情况二Return表示消息被Broker正常接收(ack)后,但Broker没有对应的队列进行投递时产生的状态,消息被退回给生产者。
- 实例代码,那上面的例子为例,由于消息确认机制是生产者和Broker之间的事情,所以不用管消费者,所以只需要在生产者中添加代码,详细代码如下。
package confirm;
import com.rabbitmq.client.*;
import utils.RabbitConstant;
import utils.RabbitmqUtils;
import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
public class WeatherBureau {
public static void main(String[] args) throws IOException, TimeoutException {
Map area = new LinkedHashMap<String, String>();
area.put("china.hebei.shijiazhuang.20991011", "中国河北石家庄20991011天气数据");
area.put("china.shandong.qingdao.20991011", "中国山东青岛20991011天气数据");
area.put("china.henan.zhengzhou.20991011", "中国河南郑州20991011天气数据");
area.put("us.cal.la.20991011", "美国加州洛杉矶20991011天气数据");
area.put("china.hebei.shijiazhuang.20991012", "中国河北石家庄20991012天气数据");
area.put("china.shandong.qingdao.20991012", "中国山东青岛20991012天气数据");
area.put("china.henan.zhengzhou.20991012", "中国河南郑州20991012天气数据");
area.put("us.cal.la.20991012", "美国加州洛杉矶20991012天气数据");
Connection connection = RabbitmqUtils.getConnection();
Channel channel = connection.createChannel();
//开启confirm监听模式
channel.confirmSelect();
// 进行监听
channel.addConfirmListener(new ConfirmListener() {
public void handleAck(long l, boolean b) throws IOException {
//第二个参数代表接收的数据是否为批量接收,一般我们用不到。
System.out.println("消息已被Broker接收,Tag:" + l);
}
public void handleNack(long l, boolean b) throws IOException {
System.out.println("消息已被Broker拒收,Tag:" + l);
}
});
channel.addReturnListener(new ReturnCallback() {
public void handle(Return r) {
System.err.println("===========================");
System.err.println("Return编码:" + r.getReplyCode() + "-Return描述:" + r.getReplyText());
System.err.println("交换机:" + r.getExchange() + "-路由key:" + r.getRoutingKey() );
System.err.println("Return主题:" + new String(r.getBody()));
System.err.println("===========================");
}
});
Iterator<Map.Entry<String, String>> itr = area.entrySet().iterator();
while (itr.hasNext()) {
Map.Entry<String, String> me = itr.next();
//Routing key 第二个参数相当于数据筛选的条件
//第三个参数为:mandatory true代表如果消息无法正常投递则return回生产者,如果false,则直接将消息放弃。
channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER_TOPIC,me.getKey() ,true, null , me.getValue().getBytes());
}
/*channel.close();
connection.close();*/
}
}
- 不同处
- 运行效果
五.源代码下载
在我的微信公众号后台回复 rabbitmq
就可以获取本篇博文相关的源代码了,如果有什么疑问后台给为留言,我看见会第一时间回复你的。