消息模式 Simple(简单)模式
前提,开放5672:RabbitMQ的通讯端口,及查看创建用户的权限
构建maven工程
导入依赖
- 依赖下载地址: https://mvnrepository.com/artifact/com.rabbitmq/amqp-client
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.16.0</version>
</dependency>
启动服务
systemctl start rabbitmq-server
定义生产者
package com.cn.simple;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* 简单队列 生产者
*/
public class Producer {
public static void main(String[] args) {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2.设置工厂属性
factory.setHost("请填写自己的ip地址");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("admin");
factory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
//3.从连接工厂中获取连接
connection = factory.newConnection("生产者1");
//4.从连接中获取通道
channel = connection.createChannel();
//5.申请队列存储信息
/*
* 如果队列不存在,则会创建
* Rabbitmq不允许创建两个相同的队列名称,否则会报错。
*
* @params1: queue 队列的名称
* @params2: durable 队列是否持久化
* @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
* @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。
* @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。
*/
channel.queueDeclare("queue01", false ,false,false, null);
//6.准备发送消息的内容
String message = "hello,rabbitmq!";
// 7: 发送消息给中间件rabbitmq-server
/*
* @params1: 交换机exchange
* @params2: 队列名称
* @params3: 属性配置
* @params4: 发送消息的内容
*/
channel.basicPublish("", "queue01", null, message.getBytes());
System.out.println("消息发送成功!");
} catch (Exception e) {
e.printStackTrace();
System.out.println("发送消息出现异常...");
} finally {
// 8: 释放连接关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}
-
可以看到控制台发送消息成功,我们访问图形化管理界面如下:
-
同样我们在队列的详情信息中也可以看到消息队列中的消息情况
- 队列在不指定交换机的情况下会绑定一个系统默认的交换机
定义消费者
package com.cn.simple;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.concurrent.TimeoutException;
/**
* 消费者
*/
public class Consumer {
public static void main(String[] args) {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2.设置工厂属性
factory.setHost("填写自己的ip地址");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("admin");
factory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
//3.从连接工厂中获取连接
connection = factory.newConnection("生产者1");
//4.从连接中获取通道
channel = connection.createChannel();
//5.接收消息
channel.basicConsume("queue01", true, new DeliverCallback() {
public void handle(String s, Delivery delivery) throws IOException {
System.out.println("收到消息是:" + new String(delivery.getBody(), Charset.defaultCharset()));
}
}, new CancelCallback() {
public void handle(String s) throws IOException {
System.out.println("接收消息失败了...");
}
});
System.out.println("开始接收消息");
System.in.read();
} catch (Exception e) {
e.printStackTrace();
} finally {
// 6: 释放连接关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}
- 可以看到队列中消息已经被消费,消息总数为0
核心组成部分
Server:
又称Broker ,接受客户端的连接,实现AMQP实体服务。 安装rabbitmq-server
Connection:
连接,应用程序与Broker的网络连接 TCP/IP/ 三次握手和四次挥手
Channel:
网络信道,几乎所有的操作都在Channel中进行,Channel是进行消息读写的通道,客户端可以建立对各Channel,每个Channel代表一个会话任务
Message :
消息:服务与应用程序之间传送的数据,由Properties和body组成,Properties可是对消息进行修饰,比如消息的优先级,延迟等高级特性,Body则就是消息体的内容
Virtual Host
虚拟地址,用于进行逻辑隔离,最上层的消息路由,一个虚拟主机理由可以有若干个Exhange和Queueu,同一个虚拟主机里面不能有相同名字的Exchange
Exchange:
交换机,接受消息,根据路由键发送消息到绑定的队列
Bindings:
Exchange和Queue之间的虚拟连接,binding中可以保护多个routing key
Routing key:
是一个路由规则,虚拟机可以用它来确定如何路由一个特定消息
Queue:
Message Queue,消息队列,保存消息并将它们转发给消费者