目录
一、拉取运行镜像
1.1、拉取镜像环境
1.2、运行镜像
二、工作模式
2.1、消息的发送者
2.2、消息的接收者
2.3、生产队列模式
2.3.1、消息的发送者
2.3.2、消息的接收者
2.4、发布订阅模式
2.4.1、消息的发送者
2.4.2、消息的接收者
2.5、路由模式
2.5.1、消息的发布者
2.5.2、消息的接收者
2.6、主题模式
2.6.1、消息的发布者
2.6.2、消息的接收者
一、拉取运行镜像
1.1、拉取镜像环境
docker pull rabbitmq:3.13-management
1.2、运行镜像
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 -v rabbitmq-plugin:/plugins -e RABBITMQ_DEFAULT_USER=guest -e RABBITMQ_DEFAULT_PASS=123456 rabbitmq:3.13-management
测试访问:
二、工作模式
引入pom依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.20.0</version>
</dependency>
2.1、消息的发送者
public class Producer {
public static void main(String[] args) {
//创建链接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置主机地址
connectionFactory.setHost("192.168.200.110");
//连接端口号
connectionFactory.setPort(5672);
//虚拟机主机名称默认 /
connectionFactory.setVirtualHost("/");
//连接用户名
connectionFactory.setUsername("guest");
//连接用户密码
connectionFactory.setPassword("123456");
Connection connection = null;
Channel channel = null;
try {
//创建连接
connection = connectionFactory.newConnection();
//创建连接频道
channel = connection.createChannel();
//申明队列
//String var1, 队列名称
// boolean var2, 是否持久化队列
// boolean var3, 是否独占本次连接,独占只能有一个消费者
// boolean var4, 是否不使用时删除队列
// Map<String, Object> var5 队列其他参数
channel.queueDeclare("simple_quest",true,false,false,null);
//发送的队列消息
String message = "你好,沸羊羊";
//String var1, 交换机名称,没有使用默认
// String var2, 路由key,简单模式可以传递队列
// AMQP.BasicProperties var3, 配置信息
// byte[] var4 消息内存
channel.basicPublish("","simple_quest",null,message.getBytes());
System.out.println("发送已完成:" + message);
} catch (IOException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
throw new RuntimeException(e);
}finally {
//关闭资源
try {
channel.close();
connection.close();
} catch (IOException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
throw new RuntimeException(e);
}
}
}
}
2.2、消息的接收者
public class Consumer {
public static void main(String[] args) {
//创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置主机地址
connectionFactory.setHost("192.168.200.110");
//连接端口号
connectionFactory.setPort(5672);
//虚拟机主机名称默认 /
connectionFactory.setVirtualHost("/");
//连接用户名
connectionFactory.setUsername("guest");
//连接用户密码
connectionFactory.setPassword("123456");
Connection connection = null;
Channel channel = null;
try {
//创建连接
connection = connectionFactory.newConnection();
//创建连接频道
channel = connection.createChannel();
//申明队列
//String var1, 队列名称
// boolean var2, 是否持久化队列
// boolean var3, 是否独占本次连接,独占只能有一个消费者
// boolean var4, 是否不使用时删除队列
// Map<String, Object> var5 队列其他参数
//channel.queueDeclare("simple_quest",true,false,false,null);
//接收消息
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
@Override
//String consumerTag,标识
// Envelope envelope, 获取某些信息
// AMQP.BasicProperties properties, 配置信息
// byte[] body 数据
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumerTag:" + consumerTag);
System.out.println("Exchange:" + envelope.getExchange());
System.out.println("RoutingKey:" + envelope.getRoutingKey());
System.out.println("properties:" + properties);
System.out.println("body:" + new String(body));
}
};
//String var1, 队列名称
// boolean var2, 是否自动确认
// Consumer var3 回调对象
//消费者类似监听程序,主要用来监听对象
channel.basicConsume("simple_quest",true,defaultConsumer);
} catch (IOException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
throw new RuntimeException(e);
}finally {
//关闭资源
try {
channel.close();
connection.close();
} catch (IOException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
throw new RuntimeException(e);
}
}
}
}
2.3、生产队列模式
2.3.1、消息的发送者
public class Producer {
public static void main(String[] args) {
//创建链接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置主机地址
connectionFactory.setHost("192.168.200.110");
//连接端口号
connectionFactory.setPort(5672);
//虚拟机主机名称默认 /
connectionFactory.setVirtualHost("/");
//连接用户名
connectionFactory.setUsername("guest");
//连接用户密码
connectionFactory.setPassword("123456");
Connection connection = null;
Channel channel = null;
try {
connection = connectionFactory.newConnection();
channel = connection.createChannel();
channel.queueDeclare("work_quest",true,false,false,null);
for (int i = 1; i <= 10 ; i++) {
String message = "你好,美羊羊" + i;
channel.basicPublish("","work_quest",null,message.getBytes());
}
} catch (IOException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
throw new RuntimeException(e);
}finally {
//关闭资源
try {
channel.close();
connection.close();
} catch (IOException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
throw new RuntimeException(e);
}
}
}
}
2.3.2、消息的接收者
public class Consumer1 {
public static void main(String[] args) {
//创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置主机地址
connectionFactory.setHost("192.168.200.110");
//连接端口号
connectionFactory.setPort(5672);
//虚拟机主机名称默认 /
connectionFactory.setVirtualHost("/");
//连接用户名
connectionFactory.setUsername("guest");
//连接用户密码
connectionFactory.setPassword("123456");
Connection connection = null;
Channel channel = null;
try {
//创建连接
connection = connectionFactory.newConnection();
//创建连接频道
channel = connection.createChannel();
//申明队列
//String var1, 队列名称
// boolean var2, 是否持久化队列
// boolean var3, 是否独占本次连接,独占只能有一个消费者
// boolean var4, 是否不使用时删除队列
// Map<String, Object> var5 队列其他参数
//channel.queueDeclare("simple_quest",true,false,false,null);
//接收消息
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
@Override
//String consumerTag,标识
// Envelope envelope, 获取某些信息
// AMQP.BasicProperties properties, 配置信息
// byte[] body 数据
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumerTag:" + consumerTag);
System.out.println("Exchange:" + envelope.getExchange());
System.out.println("RoutingKey:" + envelope.getRoutingKey());
System.out.println("properties:" + properties);
System.out.println("body:" + new String(body));
}
};
//String var1, 队列名称
// boolean var2, 是否自动确认
// Consumer var3 回调对象
//消费者类似监听程序,主要用来监听对象
channel.basicConsume("work_quest",true,defaultConsumer);
} catch (IOException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
throw new RuntimeException(e);
}
}
}
在复制拷贝一个消费者2
两个消费者会轮流拿到消息
2.4、发布订阅模式
2.4.1、消息的发送者
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
//创建链接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置主机地址
connectionFactory.setHost("192.168.200.110");
//连接端口号
connectionFactory.setPort(5672);
//虚拟机主机名称默认 /
connectionFactory.setVirtualHost("/");
//连接用户名
connectionFactory.setUsername("guest");
//连接用户密码
connectionFactory.setPassword("123456");
Connection connection = connectionFactory.newConnection();
Channel channel= connection.createChannel();
//String var1,交换机名称
// BuiltinExchangeType var2, 交换机类型
// boolean var3, 是否持久化
// boolean var4, 自动删除
// boolean var5, 内部使用
// Map<String, Object> var6 其他参数
channel.exchangeDeclare("fanout_guest", BuiltinExchangeType.FANOUT,true,false,false,null);
//创建队列
channel.queueDeclare("queue_guest1",true,false,false,null);
channel.queueDeclare("queue_guest2",true,false,false,null);
//绑定队列和交换机
//String var1, 队列名称
// String var2, 交换机名称
// String var3 路由键,绑定规则
channel.queueBind("queue_guest1","fanout_guest","");
channel.queueBind("queue_guest2","fanout_guest","");
String message = "已成功发送消息";
//发送消息
channel.basicPublish("fanout_guest","",null,message.getBytes());
//关闭资源
channel.close();
connection.close();
}
}
2.4.2、消息的接收者
public class Consumer1 {
public static void main(String[] args) {
//创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置主机地址
connectionFactory.setHost("192.168.200.110");
//连接端口号
connectionFactory.setPort(5672);
//虚拟机主机名称默认 /
connectionFactory.setVirtualHost("/");
//连接用户名
connectionFactory.setUsername("guest");
//连接用户密码
connectionFactory.setPassword("123456");
Connection connection = null;
Channel channel = null;
try {
//创建连接
connection = connectionFactory.newConnection();
//创建连接频道
channel = connection.createChannel();
//申明队列
//String var1, 队列名称
// boolean var2, 是否持久化队列
// boolean var3, 是否独占本次连接,独占只能有一个消费者
// boolean var4, 是否不使用时删除队列
// Map<String, Object> var5 队列其他参数
//channel.queueDeclare("simple_quest",true,false,false,null);
//接收消息
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
@Override
//String consumerTag,标识
// Envelope envelope, 获取某些信息
// AMQP.BasicProperties properties, 配置信息
// byte[] body 数据
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body:" + new String(body));
}
};
//String var1, 队列名称
// boolean var2, 是否自动确认
// Consumer var3 回调对象
//消费者类似监听程序,主要用来监听对象
channel.basicConsume("queue_guest1",true,defaultConsumer);
} catch (IOException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
throw new RuntimeException(e);
}
}
}
2.5、路由模式
2.5.1、消息的发布者
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
//创建链接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置主机地址
connectionFactory.setHost("192.168.200.110");
//连接端口号
connectionFactory.setPort(5672);
//虚拟机主机名称默认 /
connectionFactory.setVirtualHost("/");
//连接用户名
connectionFactory.setUsername("guest");
//连接用户密码
connectionFactory.setPassword("123456");
Connection connection = connectionFactory.newConnection();
Channel channel= connection.createChannel();
//String var1,交换机名称
// BuiltinExchangeType var2, 交换机类型
// boolean var3, 是否持久化
// boolean var4, 自动删除
// boolean var5, 内部使用
// Map<String, Object> var6 其他参数
channel.exchangeDeclare("test_routing", BuiltinExchangeType.DIRECT,true,false,false,null);
//创建队列
channel.queueDeclare("queue_routing1",true,false,false,null);
channel.queueDeclare("queue_routing2",true,false,false,null);
//绑定队列和交换机
//String var1, 队列名称
// String var2, 交换机名称
// String var3 路由键,绑定规则
channel.queueBind("queue_routing1","test_routing","error");
channel.queueBind("queue_routing2","test_routing","info");
String message = "已成功发送消息";
//发送消息
channel.basicPublish("test_routing","error",null,message.getBytes());
//关闭资源
channel.close();
connection.close();
}
}
2.5.2、消息的接收者
public class Consumer1 {
public static void main(String[] args) {
//创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置主机地址
connectionFactory.setHost("192.168.200.110");
//连接端口号
connectionFactory.setPort(5672);
//虚拟机主机名称默认 /
connectionFactory.setVirtualHost("/");
//连接用户名
connectionFactory.setUsername("guest");
//连接用户密码
connectionFactory.setPassword("123456");
Connection connection = null;
Channel channel = null;
try {
//创建连接
connection = connectionFactory.newConnection();
//创建连接频道
channel = connection.createChannel();
//申明队列
//String var1, 队列名称
// boolean var2, 是否持久化队列
// boolean var3, 是否独占本次连接,独占只能有一个消费者
// boolean var4, 是否不使用时删除队列
// Map<String, Object> var5 队列其他参数
channel.queueDeclare("queue_routing1",true,false,false,null);
//接收消息
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
@Override
//String consumerTag,标识
// Envelope envelope, 获取某些信息
// AMQP.BasicProperties properties, 配置信息
// byte[] body 数据
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body:" + new String(body));
}
};
//String var1, 队列名称
// boolean var2, 是否自动确认
// Consumer var3 回调对象
//消费者类似监听程序,主要用来监听对象
channel.basicConsume("queue_routing1",true,defaultConsumer);
} catch (IOException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
throw new RuntimeException(e);
}
}
}
2.6、主题模式
2.6.1、消息的发布者
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
//创建链接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置主机地址
connectionFactory.setHost("192.168.200.110");
//连接端口号
connectionFactory.setPort(5672);
//虚拟机主机名称默认 /
connectionFactory.setVirtualHost("/");
//连接用户名
connectionFactory.setUsername("guest");
//连接用户密码
connectionFactory.setPassword("123456");
Connection connection = connectionFactory.newConnection();
Channel channel= connection.createChannel();
//String var1,交换机名称
// BuiltinExchangeType var2, 交换机类型
// boolean var3, 是否持久化
// boolean var4, 自动删除
// boolean var5, 内部使用
// Map<String, Object> var6 其他参数
channel.exchangeDeclare("test_topic", BuiltinExchangeType.TOPIC,true,false,false,null);
//创建队列
channel.queueDeclare("queue_topic1",true,false,false,null);
channel.queueDeclare("queue_topic2",true,false,false,null);
//绑定队列和交换机
//String var1, 队列名称
// String var2, 交换机名称
// String var3 路由键,绑定规则
channel.queueBind("queue_topic1","test_topic","%.error");
channel.queueBind("queue_topic1","test_topic","*.*");
String message = "已成功发送消息";
//发送消息
channel.basicPublish("test_topic","info.error",null,message.getBytes());
//关闭资源
channel.close();
connection.close();
}
}
2.6.2、消息的接收者
public class Consumer1 {
public static void main(String[] args) {
//创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置主机地址
connectionFactory.setHost("192.168.200.110");
//连接端口号
connectionFactory.setPort(5672);
//虚拟机主机名称默认 /
connectionFactory.setVirtualHost("/");
//连接用户名
connectionFactory.setUsername("guest");
//连接用户密码
connectionFactory.setPassword("123456");
Connection connection = null;
Channel channel = null;
try {
//创建连接
connection = connectionFactory.newConnection();
//创建连接频道
channel = connection.createChannel();
//申明队列
//String var1, 队列名称
// boolean var2, 是否持久化队列
// boolean var3, 是否独占本次连接,独占只能有一个消费者
// boolean var4, 是否不使用时删除队列
// Map<String, Object> var5 队列其他参数
channel.queueDeclare("queue_topic1",true,false,false,null);
//接收消息
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
@Override
//String consumerTag,标识
// Envelope envelope, 获取某些信息
// AMQP.BasicProperties properties, 配置信息
// byte[] body 数据
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body:" + new String(body));
}
};
//String var1, 队列名称
// boolean var2, 是否自动确认
// Consumer var3 回调对象
//消费者类似监听程序,主要用来监听对象
channel.basicConsume("queue_topic1",true,defaultConsumer);
} catch (IOException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
throw new RuntimeException(e);
}
}
}