1.工作队列
工作机制类似一个生产者,多个消费者。工作队列采用轮训的机制,即工作线程一次只能处理一个消息,轮流处理
公共方法
public class MqUtiles {
public static final String QUEUE_NAME="hello";
public static Channel function() throws IOException, TimeoutException {
ConnectionFactory factory=new ConnectionFactory();
//工厂ip连接rabbitmq的队列
factory.setHost("192.168.187.132");
factory.setUsername("admin");
factory.setPassword("admin");
//创建连接
Connection connection = factory.newConnection();
Channel channel=connection.createChannel();
return channel;
}
}
生产者
public class Provider {
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = MqUtiles.function();
channel.queueDeclare(MqUtiles.QUEUE_NAME,false,false,false,null);
Scanner scanner=new Scanner(System.in);
while (scanner.hasNext())
{
String msg = scanner.nextLine();
channel.basicPublish("",MqUtiles.QUEUE_NAME,null,msg.getBytes());
}
}
}
消费者
public class User {
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = MqUtiles.function();
//申明接收消息
DeliverCallback deliverCallback=(consumerTag, message)->
{
System.out.println(new String(message.getBody()));
};
CancelCallback cancelCallback= consumerTag->
{
System.out.println("消息被中断");
};
/**
* 消费者消费消息
* 1.消费哪个队列
* 2.消费成功后,是否需要自动应答,true表示自动应答
* 3.消费未成功的回调
* 4.消费者取消消费的回调
*/
System.out.println("请求B......");
channel.basicConsume(MqUtiles.QUEUE_NAME,true,deliverCallback,cancelCallback);
}
}
启用多个线程
队列持久化
channel.queueDeclare(队列名,队列是否持久化,是否共享,是否自动删除,其他参数)
//队列持久化,即mq重启后,队列还是存在
注意:队列持久化不等于消息持久化
2.消息应答
自动应答
1.在高吞吐量与数据安全性的方面进行权衡
2.这种模式追求的是一个吞吐量以及高速率处理信息,消费者接到消息后,mq就将信息删除,数据可能未完全读取,出现消息丢失
手动应答推荐
作用:消费者处理完信息后,给mq回复,mq就可以将该消息删除,避免消费者服务异常,导致消息未完全处理,而mq就将消息删除,导致消息丢失
手动消息应答
channel.basicAck(deliverTag,true)//第二个参数就是是否设置批量应答
//一个信道可能有多个数据,批量应答能回复此信道的消息,
//虽然能提高速率,解决拥堵问题,但是可能会造成数据丢失,所以尽量不要批量应答
2.1消息入队
概念:mq里面的消息发送到消费者后,未被正常消费或消费未完成,消费者机器就断开,消费者正在处理的消息会被从新排列到mq中,又另一台机器继续处理
变化1
//消费者
channel.basicConsume(MqUtiles.QUEUE_NAME,false,deliverCallback,cancelCallback);
//参数2为是否自动应答,true为自动
变化2
//申明接收消息
DeliverCallback deliverCallback=(consumerTag, message)->
{
try {
Thread.sleep(10*1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
//手动应答,需要确定是信道中的哪一条信息,即message.getEnvelope().getDeliveryTag(),相当于寻找下标,参数2为是否批量应答
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
System.out.println(new String(message.getBody()));
};
2.2消息持久化
队列持久化!=消息持久化
channel.basicPublish("",队列名,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes)
//参数三为消息持久化操作,消息持久化的前提是队列持久化
3.不公平分发
概念:即信道取消轮训分发,采用能者多劳
channel.basicQos(1)
//默认情况是0,0表示轮训分发,1表示不公平分发
//注意:当方法中出现其他数字,就代表预取值,即将一个信道内的消息按预取值分发给消费者
作用:可以提高消息处理的效率,充分利用空闲的消费者服务