1.工作队列
工作机制类似一个生产者,多个消费者。工作队列采用轮训的机制,即工作线程一次只能处理一个消息,轮流处理
公共方法
public class MqUtiles {
public static final String QUEUE_NAME="hello";
public static Channel function() throws IOException, TimeoutException {
ConnectionFactory factory=new ConnectionFactory();
//工厂ip连接rabbitmq的队列
factory.setHost("192.168.187.132");
factory.setUsername("admin");
factory.setPassword("admin");
//创建连接
Connection connection = factory.newConnection();
Channel channel=connection.createChannel();
return channel;
}
}
生产者
public class Provider {
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = MqUtiles.function();
channel.queueDeclare(MqUtiles.QUEUE_NAME,false,false,false,null);
Scanner scanner=new Scanner(System.in);
while (scanner.hasNext())
{
String msg = scanner.nextLine();
channel.basicPublish("",MqUtiles.QUEUE_NAME,null,msg.getBytes());
}
}
}
消费者
public class User {
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = MqUtiles.function();
//申明接收消息
DeliverCallback deliverCallback=(consumerTag, message)->
{
System.out.println(new String(message.getBody()));
};
CancelCallback cancelCallback= consumerTag->
{
System.out.println("消息被中断");
};
/**
* 消费者消费消息
* 1.消费哪个队列
* 2.消费成功后,是否需要自动应答,true表示自动应答
* 3.消费未成功的回调
* 4.消费者取消消费的回调
*/
System.out.println("请求B......");
channel.basicConsume(MqUtiles.QUEUE_NAME,true,deliverCallback,cancelCallback);
}
}
启用多个线程
2.消息应答
自动应答
1.在高吞吐量与数据安全性的方面进行权衡
2.这种模式追求的是一个吞吐量以及高速率处理信息,消费者接到消息后,mq就将信息删除,数据可能未完全读取,出现消息丢失
手动应答推荐
作用:消费者处理完信息后,给mq回复,mq就可以将该消息删除,避免消费者服务异常,导致消息未完全处理,而mq就将消息删除,导致消息丢失
手动消息应答
channel.basicAck(deliverTag,true)//第二个参数就是是否设置批量应答
//一个信道可能有多个数据,批量应答能回复此信道的消息,
//虽然能提高速率,解决拥堵问题,但是可能会造成数据丢失,所以尽量不要批量应答