生产者
package com.qf.mq2302.publishSub;
import com.qf.mq2302.utils.MQUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
public class EmitLog {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws Exception {
Connection conn = MQUtils.getConnection();
Channel channel = conn.createChannel();
// 在mq中声明一个交换机
/**
* 第一个参数:交换机的名字
* 第二个参数:交换机的类型,fanout代表该交换机会把收到的消息无差别投递给所有他关联的队列
*/
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String msg = "hello fanout!";
/**
* 第一个参数,交换机的名字
* 第二个参数:如果交换机是 fanout类型的,可以写空串 ;因为fanout类型的交换机会把消息无差别向关联队列投递
*/
channel.basicPublish(EXCHANGE_NAME,"",null,msg.getBytes("utf-8"));
channel.close();
conn.close();
}
}
消费者1
package com.qf.mq2302.publishSub;
import com.qf.mq2302.utils.MQUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;
import java.io.IOException;
public class ReceiveLogs01 {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws Exception {
Connection conn = MQUtils.getConnection();
Channel channel = conn.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
// 在mq中声明一个名字是随机字符串的队列(队列的所有属性都是默认值),返回队列的名字
String queueName = channel.queueDeclare().getQueue();
// 把队列和交换机建立好绑定关系
/**
* 参数1: 队列名
* 参数2: 交换机名
* 参数3: routingkey,注意,如果交换机是fanout类型,可以写空串
*/
channel.queueBind(queueName,EXCHANGE_NAME,"");
channel.basicQos(1);
channel.basicConsume(queueName, false, new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
byte[] body = message.getBody();
String msg = new String(body, "utf-8");
System.out.println("01:"+msg);
// 手动ack
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
}
},c -> {});
}
}
消费者2
package com.qf.mq2302.publishSub;
import com.qf.mq2302.utils.MQUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;
import java.io.IOException;
public class ReceiveLogs02 {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws Exception {
Connection conn = MQUtils.getConnection();
Channel channel = conn.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
// 在mq中声明一个名字是随机字符串的队列(队列的所有属性都是默认值),返回队列的名字
String queueName = channel.queueDeclare().getQueue();
// 把队列和交换机建立好绑定关系
/**
* 参数1: 队列名
* 参数2: 交换机名
* 参数3: routingkey,注意,如果交换机是fanout类型,可以写空串
*/
channel.queueBind(queueName,EXCHANGE_NAME,"");
channel.basicQos(1);
channel.basicConsume(queueName, false, new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
byte[] body = message.getBody();
String msg = new String(body, "utf-8");
System.out.println("02:"+msg);
// 手动ack
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
}
},c -> {});
}
}
消费者3
package com.qf.mq2302.publishSub;
import com.qf.mq2302.utils.MQUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;
import java.io.IOException;
public class ReceiveLogs03 {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws Exception {
Connection conn = MQUtils.getConnection();
Channel channel = conn.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
// 在mq中声明一个名字是随机字符串的队列(队列的所有属性都是默认值),返回队列的名字
String queueName = channel.queueDeclare().getQueue();
// 把队列和交换机建立好绑定关系
/**
* 参数1: 队列名
* 参数2: 交换机名
* 参数3: routingkey,注意,如果交换机是fanout类型,可以写空串
*/
channel.queueBind(queueName,EXCHANGE_NAME,"");
channel.basicQos(1);
channel.basicConsume(queueName, false, new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
byte[] body = message.getBody();
String msg = new String(body, "utf-8");
System.out.println("03:"+msg);
// 手动ack
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
}
},c -> {});
}
}