目录
一、项目创建
二、生产者
三、消费者
一、项目创建
我们先在idea里创建两个Maven项目一个项目作为生产者,另一个作为消费者。创建完成后,在各自的pom.xml文件里引入Java使用RabbitMQ的依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.14.2</version>
</dependency>
然后在对应的项目里创建 消费者类与生产者类
二、生产者
由之前文章里我们了解到RabbitMQ的通信如下图,我们要想将生产者生产的消息存入队列,我们就一个先获得Connection(连接)然后通过连接获取到channel,然后选择虚拟机交换机以及队列等最后关闭连接【RabbitMQ】RabbitMQ的简介_1373i的博客-CSDN博客https://blog.csdn.net/qq_61903414/article/details/130139970?spm=1001.2014.3001.5501
我们获取连接是通过连接工厂进行获取的所以此时我们先要去创建连接工厂并给他配置相应的信息
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.获取配置连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setVirtualHost("/DemoVirtualHost");
factory.setUsername("guest");
factory.setPassword("guest");
}
}
此时我们可以通过连接工厂获取到连接connection,然后获取到channel
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.获取配置连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setVirtualHost("/DemoVirtualHost");
factory.setUsername("guest");
factory.setPassword("guest");
// 2.建立连接
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
}
}
此时我们需要去在MQ里创建队列,通过channel里的queueDelclare方法来创建该方法有以下参数
参数 | 说明 |
queue | 要创建队列的名称 |
durable | 该队列里的消息是否持久化 |
exclusive | 1》是否只允许一个消费者监听消费2》当连接关闭时是否销毁该队列 |
autoDelete | 没有consumer是否自动删除 |
arguments | 在后续文章里会详细讲到 |
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.获取配置连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setVirtualHost("/DemoVirtualHost");
factory.setUsername("guest");
factory.setPassword("guest");
// 2.建立连接
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 3.创建队列
channel.queueDeclare("EmailQueue",true,false,false,null);
}
}
此时我们就可以通过channel里的basicPublish方法进行发送消息,该方法有以下参数
参数 | 说明 |
exchange | 交换机(后续详细讲到,此处使用默认交换机) |
routingKey | 路由名称(后续讲到,由于此处使用默认交换机所以路由名称为队列名) |
props | 相关的配置信息 |
body | 要发送的消息 |
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.获取配置连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setVirtualHost("/DemoVirtualHost");
factory.setUsername("guest");
factory.setPassword("guest");
// 2.建立连接
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 3.创建队列
channel.queueDeclare("EmailQueue",true,false,false,null);
// 4.发送消息
channel.basicPublish("","EmailQueue",null,"hello mq".getBytes());
// 5.关闭连接
channel.close();
connection.close();
}
}
发送完成后关闭连接即可,运行程序后此时mq队列里存在了一条消息
三、消费者
在实现消费者时,我们也需要先建立连接所以前个步骤与生产者相同
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.获取配置连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setVirtualHost("/DemoVirtualHost");
factory.setUsername("guest");
factory.setPassword("guest");
// 2.建立连接
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
}
}
此时建立连接后我们通过basicConsume该方法进行获取消息,参数如下
参数 | 说明 |
queue | 从哪个队列获取消息 |
autoAck | 获取到信息后是否自动给MQ服务器发送确认收到 |
callback | 收到消息后的回调对象,需要手动实现 |
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.获取配置连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setVirtualHost("/DemoVirtualHost");
factory.setUsername("guest");
factory.setPassword("guest");
// 2.建立连接
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 3.获取消息
channel.basicConsume("EmailQueue",true,consumer);
}
}
我们此时还需要去手动实现回调对象,通过匿名内部类实现回调对象的回调方法
import com.rabbitmq.client.*;
import com.rabbitmq.client.Consumer;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.获取配置连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setVirtualHost("/DemoVirtualHost");
factory.setUsername("guest");
factory.setPassword("guest");
// 2.建立连接
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 3.获取消息
Consumer consumer = new DefaultConsumer(channel) {
/**
* 回调方法 收到信息后自动执行该方法
* @param consumerTag 消息唯一标识
* @param envelope 获取信息
* @param properties 获取配置信息
* @param body 消息
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumerTag :" + consumerTag);
System.out.println("exchange :" + envelope.getExchange());
System.out.println("routingKey ;" + envelope.getRoutingKey());
System.out.println("properties :"+ properties);
System.out.println("consumer 消费消息 :" + new String(body));
}
};
channel.basicConsume("EmailQueue",true,consumer);
}
}
运行代码此时就可以获得存入的消息