1 java操作消息队列
1.1 java实现生产者
新建一个springboot项目,导入依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.14.2</version>
</dependency>
导入依赖后,实现生产者和消费者,首先是生产者,生产者负责将消息发送到消息队列
public static void main(String[] args) {
//使用ConnectionFactory来创建连接
ConnectionFactory factory = new ConnectionFactory();
//设定连接信息,基操
factory.setHost("8.130.172.119");
factory.setPort(5672); //注意这里写5672,是amqp协议端口
factory.setUsername("admin");
factory.setPassword("password");
factory.setVirtualHost("/test");
//创建连接
try(Connection connection = factory.newConnection()){
}catch (Exception e){
e.printStackTrace();
}
}
直接在程序种定义并创建消息队列,客户端需要通过连接(connection)创建一个新的通道(Channel),同一个连接下可以很多个通道,这样就不用创建很多个连接也能支持分开发送。
try(Connection connection = factory.newConnection();
Channel channel = connection.createChannel()){ //通过Connection创建新的Channel
//声明队列,如果此队列不存在,会自动创建
channel.queueDeclare("yyds", false, false, false, null);
//将队列绑定到交换机
channel.queueBind("yyds", "amq.direct", "my-yyds");
//发布新的消息,注意消息需要转换为byte[]
channel.basicPublish("amq.direct", "my-yyds", null, "Hello World!".getBytes());
}catch (Exception e){
e.printStackTrace();
}
其中queeuDeclare方法的参数如下:
- queue: 队列的名称 (默认创建后routingKey和队列名称一致)
- durable: 是否持久化。
- exclusive: 是否排他,如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除。排他队列是基于Connection可见,同-个Connection的不同Channel是可以同时访问同-个连接创建的排他队列,并且,如果一个Connection已经声明了一个排他队列,其他的Connection是不允许建立同名的排他队列的,即使该队列是持久化的,一旦Connection关闭或者客户端退出,该排他队列都会自动被删除。
- autoDelete: 是否自动删除
- arguments:设置队列的其他一些参数,这里我们暂时不需要什么其他参数
其中queueBind方法的参数如下:
- queue: 需要绑定的队列名称
- exchange: 需要绑定的交换机名称。
- routingKey:
其中basicPublic方法的参数如下:
- exchange: 对应的Exchange名称,我们这里就使用第二个直连交换机。
- routingKey: 这里我们填写绑定时指定的routingKey,其实和之前在管理页面操作一样
- props: 其他的配置。
- body: 消息本体
当前队列状态
运行测试类
出现新的队列
点击队列名称进入详情
获取到java发送的消息
1.2 java实现消费者
直接从指定的队列去取出数据,不需要再管交换机,但是还是通过connection去创建channel。消费者代码如下
package com.example.rbbitmqtest;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public static void main(String[] args) throws IOException, TimeoutException {
//使用ConnectionFactory来创建连接
ConnectionFactory factory = new ConnectionFactory();
//设定连接信息,基操
factory.setHost("8.130.172.119");
factory.setPort(5672); //注意这里写5672,是amqp协议端口
factory.setUsername("admin");
factory.setPassword("password");
factory.setVirtualHost("/test");
//这里不使用try-with-resource,因为消费者是一直等待新的消息到来,然后按照
//我们设定的逻辑进行处理,所以这里不能在定义完成之后就关闭连接
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//创建一个基本的消费者
channel.basicConsume("yyds", false, (s, delivery) -> {
System.out.println(new String(delivery.getBody()));
//basicAck是确认应答,第一个参数是当前的消息标签,后面的参数是
//是否批量处理消息队列中所有的消息,如果为false表示只处理当前消息
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
//basicNack是拒绝应答,最后一个参数表示是否将当前消息放回队列,如果
//为false,那么消息就会被丢弃
//channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);
//跟上面一样,最后一个参数为false,只不过这里省了
//channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);
}, s -> {});
}
当创建完basicConsume后会一直等待消息,不会自动关闭。
其中basucConsume方法参数如下:
- queue - 消息队列名称,直接指定。
- autoAck今自动应答,消费者从消息队列取出数据后,需要跟服务器进行确认应答,当服务器收到确认后,会自动将消息删除,如果开启自动应答,那么消息发出后会直接删除。
- deliver - 消息接收后的函数回调,我们可以在回调中对消息进行处理,处理完成后,需要给服务器确认应答
- cancel - 当消费者取消订阅时进行的函数回调,这里暂时用不到。
其中第二个参数为false时,需要手动调用ack的四种方式,若为true,则会默认为ack中的第二个方式,即拿到消息后就把消息消耗掉队列就不存在。
测试channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false,false);
其中第一个false代表是否把后边消息全部处理了,false代表不处理
第二个false代表是否丢回消息队列,若为true,得到消息后还会把消息返回给消息队列
此时消息队列没有消息了
在界面把队列删除
以上为RabbitMQ的简单使用,通过java连接到服务器。
2 SpringBoot整合消息队列客户端
新建一个springboot项目
导入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
修改配置文件
创建配置类
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitConfiguration {
@Bean("directExchange") //定义交换机Bean,可以很多个
public Exchange exchange(){
return ExchangeBuilder.directExchange("amq.direct").build();
}
@Bean("yydsQueue") //定义消息队列
public Queue queue(){
return QueueBuilder
.nonDurable("yyds") //非持久化类型
.build();
}
@Bean("binding")
public Binding binding(@Qualifier("directExchange") Exchange exchange,
@Qualifier("yydsQueue") Queue queue){
//将我们刚刚定义的交换机和队列进行绑定
return BindingBuilder
.bind(queue) //绑定队列
.to(exchange) //到交换机
.with("my-yyds") //使用自定义的routingKey
.noargs();
}
}
2.1 创建一个生产者
当前的管理端队列为空
直接在测试类中:
@SpringBootTest
class SpringCloudMqApplicationTests {
//RabbitTemplate为我们封装了大量的RabbitMQ操作,已经由Starter提供,因此直接注入使用即可
@Resource
RabbitTemplate template;
@Test
void publisher() {
//使用convertAndSend方法一步到位,参数基本和之前是一样的
//最后一个消息本体可以是Object类型,非常方便
template.convertAndSend("amq.direct", "my-yyds", "Hello World11!");
}
}
运行测试类
新的消息队列
查看详情
取出消息
2.2 创建消费者
因为消费者实际上就是一直等待消息然后处理的角色,因此只需要创建一个监听器就行了,它会一直等待消息到来然后在进行处理:
@Component //注册为Bean
public class TestListener {
@RabbitListener(queues = "yyds") //定义此方法为队列yyds的监听器,一旦监听到新的消息,就会接受并处理
public void test(Message message){
System.out.println(new String(message.getBody()));
}
}
修改pom文件,改变依赖
修改依赖改为web方式后会一直启动,这样监听器也会一直监听
启动后可以看到队列的消息即被监听到了。
测试监听效果
控制台不用重启可以监听到消息
若需要确保消息能够被消费者接收并处理,然后得到消费者的反馈,也可以,修改测试类中生产者测试代码:
@Test
void publisher() {
//会等待消费者消费然后返回响应结果
Object res = template.convertSendAndReceive("amq.direct", "my-yyds", "Hello World!");
System.out.println("收到消费者响应:"+res);
}
修改消费者监听代码
@RabbitListener(queues = "yyds") //定义此方法为队列yyds的监听器,一旦监听到新的消息,就会接受并处理
public String test(String message){
System.out.println(message);
return "响应成功";
}
重启application,然后运行生产者发送消息测试类.
如果说需要直接接收一个JSON格式消息,并希望得到实体类。
创建用于JSON转换的Bean,这样就可以接收的JSON格式转化为实体类对象,发送的时候也是以JSON格式发送
消费者指定转换器,修改接收对象。然后重启application服务
在rabbitmq网页管理端发送json数据{"id":1,"name":"LB"}
接收成功
测试类发送实体类格式信息
接收到消息
这样就实现了Springboot操作rabbitmq消息队列。