topic交换机和fanout交换机类似,也是广播机制,但是topic需要绑定RoutingKey,绑定RoutingKey时可以使用通配符(*,#)代替。
*:只能一个单词
#:0个或多个单词
编写topic消息发送类
1.编写Receive1类
package com.it.rabbitmq.topic;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Receive1 {
public static void main(String[] args) {
//创建链接工厂对象
ConnectionFactory factory=new ConnectionFactory();
factory.setUsername("root");
factory.setPassword("123456");
factory.setHost("192.168.174.129");
factory.setPort(5672);
Connection connection=null;//定义链接对象
Channel channel=null;//定义通道对象
try {
connection=factory.newConnection();//实例化链接对象
channel=connection.createChannel();//实例化通道对象
channel.queueDeclare("topicQueue1",true,false,false,null);
channel.exchangeDeclare("topicExchange", "topic", true);
channel.queueBind("topicQueue1","topicExchange","aa");
channel.basicConsume("topicQueue1",true, "",new DefaultConsumer(channel) {
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
//获取消息数据
String bodyStr = new String(body);
System.out.println("1消费者aa:"+bodyStr);
}
});
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
编写Receive2类
package com.it.rabbitmq.topic;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Receive2 {
public static void main(String[] args) {
//创建链接工厂对象
ConnectionFactory factory=new ConnectionFactory();
factory.setUsername("root");
factory.setPassword("123456");
factory.setHost("192.168.174.129");
factory.setPort(5672);
Connection connection=null;//定义链接对象
Channel channel=null;//定义通道对象
try {
connection=factory.newConnection();//实例化链接对象
channel=connection.createChannel();//实例化通道对象
channel.queueDeclare("topicQueue2",true,false,false,null);
channel.exchangeDeclare("topicExchange", "topic", true);
channel.queueBind("topicQueue2","topicExchange","aa.*");
channel.basicConsume("topicQueue2",true, "",new DefaultConsumer(channel) {
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
//获取消息数据
String bodyStr = new String(body);
System.out.println("2消费者aa.*:"+bodyStr);
}
});
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
编写Receive3类
package com.it.rabbitmq.topic;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Receive3 {
public static void main(String[] args) {
//创建链接工厂对象
ConnectionFactory factory=new ConnectionFactory();
factory.setUsername("root");
factory.setPassword("123456");
factory.setHost("192.168.174.129");
factory.setPort(5672);
Connection connection=null;//定义链接对象
Channel channel=null;//定义通道对象
try {
connection=factory.newConnection();//实例化链接对象
channel=connection.createChannel();//实例化通道对象
channel.queueDeclare("topicQueue3",true,false,false,null);
channel.exchangeDeclare("topicExchange", "topic", true);
channel.queueBind("topicQueue3","topicExchange","aa.#");
channel.basicConsume("topicQueue3",true, "",new DefaultConsumer(channel) {
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
//获取消息数据
String bodyStr = new String(body);
System.out.println("3消费者aa.#:"+bodyStr);
}
});
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
2.分别启动三个接收类
注意:
1、在topic模式中必须要指定Routingkey,并且可以同时指定多层的RoutingKey,每个层次之间使用 点分隔即可 例如 test.myRoutingKey
编写topic的消息接收类
1.编写发送类
package com.it.rabbitmq.exchange.topic;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Send {
public static void main(String[] args) {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//配置rabbitMQ的连接信息
factory.setHost("192.168.174.129");
factory.setPort(5672);
factory.setUsername("root");
factory.setPassword("123456");
//定义连接
Connection connection = null;
//定义通道
Channel channel = null;
try {
connection = factory.newConnection();
channel = connection.createChannel();
channel.exchangeDeclare("topicExchange", "topic", true);
String message = "topic的测试消息!";
channel.basicPublish("topicExchange", "aa", null, message.getBytes("utf-8"));
System.out.println("消息发送成功,direct");
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
} finally {
if (channel != null) {
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
if (connection != null) {
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
2.测试发送类
1)当RoutingKey为aa时
接收类1(aa),接收类3(aa.#)可以接收到消息
2)当RoutingKey为aa.bb时
接收类2(aa.*)和接收类3(aa.#)可以接收消息
3)当RoutingKey为aa.bb.cc时
接收类3(aa.#)可以接收消息
注意:
1、Topic模式的消息接收时必须要指定RoutingKey并且可以使用# 和 *来做统配符号,#表示通配任意一个单词 *表示通配任意多个单词,例如消费者的RoutingKey为test.#或#.myRoutingKey都可以获取RoutingKey为test.myRoutingKey发送者发送的消息