发布订阅模式
一个消息可以由多个消费者消费同一个消息
消费者1和2同时消费了该消息
举例
public static void main(String[] args) throws IOException, TimeoutException { //1 创建连接工厂 ConnectionFactory connectionFactory=new ConnectionFactory(); //2 设置rabbitmq ip地址 connectionFactory.setHost("localhost"); //3 创建连接对象 Conection对象 Connection connection=connectionFactory.newConnection(); //4 创建管道 Chanel Channel channel=connection.createChannel(); //5 设置队列属性 /** * 第一个参数:队列的名称 * 第二个参数:队列是否要持久化 * 第三个参数:是否排他性(是否在同一个Connection,如果设置为true,不同的Connection是获得不到消息的) * 第四个参数:是否自动删除消息 * 第五个参数:是否要设置一些额外的参数 */ //channel.queueDeclare("02-work",false,false,true,null); /** * 发布订阅模式需要指定交换机和类型,不能用上面的模式 * 交换机 Exchange 只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定 * 或者没有符合路由规则的队列,那么消息会丢失 * 第一个参数:交换机名字 * 第二个参数:交换机类型 * fanout:广播,将消息交给所有绑定到交换机的队列 * direct:定向,把消息交给符合指定routing key的队列 * topic:通配符,把消息交给符合routing pattern(路由模式)的队列 */ channel.exchangeDeclare("03-pubsub1", "fanout"); //6 发送消息 /** * 第一个参数:交换机名称 没有交换机就设置"" * 第二个参数:路由key * 第三个参数:消息属性 * 第四个参数:消息内容 */ channel.basicPublish("03-pubsub1","", MessageProperties.PERSISTENT_TEXT_PLAIN, "hello rabbitMQ".getBytes()); //7 关闭消息 //channel.close(); connection.close(); }
消费者1和2同时消费了该消息,比如说消息是发短信,发邮件, 那么1和发短息 2可以发邮件
public static void main(String[] args) throws IOException, TimeoutException { //1 创建连接工厂 ConnectionFactory connectionFactory=new ConnectionFactory(); //2 设置rabbitmq ip地址 connectionFactory.setHost("localhost"); //3 创建连接对象 Conection对象 Connection connection=connectionFactory.newConnection(); //4 创建管道 Chanel Channel channel=connection.createChannel(); //5 设置队列属性 /** * 第一个参数:队列的名称 * 第二个参数:队列是否要持久化 * 第三个参数:是否排他性(是否在同一个Connection,如果设置为true,不同的Connection是获得不到消息的) * 第四个参数:是否自动删除消息 * 第五个参数:是否要设置一些额外的参数 */ //channel.queueDeclare("02-work",false,false,true,null); //channel.basicQos(1); //6 使用chanel 去 rabbitmq 获取消息进行消费 /** * 第一个参数:队列的名称 * 第二个参数:是否自动签收 * 第三个参数:消息属性 * 第四个参数:消息内容 */ channel.exchangeDeclare("03-pubsub1", "fanout"); //绑定 String queue = channel.queueDeclare().getQueue(); channel.queueBind(queue, "03-pubsub1", ""); channel.basicConsume(queue, false,new DeliverCallback(){ /** * 当消息从mq 中取出来了会回调这个方法 * 消费者消费消息就在这个 handle中进行处理 */ @Override public void handle(String s, Delivery delivery){ System.out.println("消费者 1 消息中的内容为:"+new String(delivery.getBody())); } },new CancelCallback(){ /** * 当消息取消了会回调这个方法 */ @Override public void handle(String s) throws IOException { System.out.println(111); } }); //7 关闭消息 注意消费者 需要持续监听,不要关闭 //channel.close(); //connection.close(); }
routing路由模式
就是说哪些让谁干
哪些让谁干区分出来
也可以让所有消费者都消费
选择性的让某个消费者消费,或者都消费
生产者
public static void main(String[] args) throws IOException, TimeoutException { //1 创建连接工厂 ConnectionFactory connectionFactory=new ConnectionFactory(); //2 设置rabbitmq ip地址 connectionFactory.setHost("localhost"); //3 创建连接对象 Conection对象 Connection connection=connectionFactory.newConnection(); //4 创建管道 Chanel Channel channel=connection.createChannel(); //5 设置队列属性 /** * 第一个参数:队列的名称 * 第二个参数:队列是否要持久化 * 第三个参数:是否排他性(是否在同一个Connection,如果设置为true,不同的Connection是获得不到消息的) * 第四个参数:是否自动删除消息 * 第五个参数:是否要设置一些额外的参数 */ //channel.queueDeclare("02-work",false,false,true,null); /** * 发布订阅模式需要指定交换机和类型,不能用上面的模式 * 交换机 Exchange 只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定 * 或者没有符合路由规则的队列,那么消息会丢失 * 第一个参数:交换机名字 * 第二个参数:交换机类型 * fanout:广播,将消息交给所有绑定到交换机的队列 * direct:定向,把消息交给符合指定routing key的队列 * topic:通配符,把消息交给符合routing pattern(路由模式)的队列 */ channel.exchangeDeclare("04-routing1", "direct"); //6 发送消息 /** * 第一个参数:交换机名称 没有交换机就设置"" * 第二个参数:路由key routing模式需要路由key * 第三个参数:消息属性 * 第四个参数:消息内容 */ channel.basicPublish("04-routing1","info", MessageProperties.PERSISTENT_TEXT_PLAIN, "hello rabbitMQ".getBytes()); //7 关闭消息 //channel.close(); connection.close(); }
消费者1
public static void main(String[] args) throws IOException, TimeoutException { //1 创建连接工厂 ConnectionFactory connectionFactory=new ConnectionFactory(); //2 设置rabbitmq ip地址 connectionFactory.setHost("localhost"); //3 创建连接对象 Conection对象 Connection connection=connectionFactory.newConnection(); //4 创建管道 Chanel Channel channel=connection.createChannel(); //5 设置队列属性 /** * 第一个参数:队列的名称 * 第二个参数:队列是否要持久化 * 第三个参数:是否排他性(是否在同一个Connection,如果设置为true,不同的Connection是获得不到消息的) * 第四个参数:是否自动删除消息 * 第五个参数:是否要设置一些额外的参数 */ //channel.queueDeclare("02-work",false,false,true,null); //channel.basicQos(1); //6 使用chanel 去 rabbitmq 获取消息进行消费 /** * 第一个参数:队列的名称 * 第二个参数:是否自动签收 * 第三个参数:消息属性 * 第四个参数:消息内容 */ channel.exchangeDeclare("04-routing1", "direct"); //绑定 String queue = channel.queueDeclare().getQueue(); //可与绑定多个 channel.queueBind(queue, "04-routing1", "info"); channel.queueBind(queue, "04-routing1", "error"); channel.queueBind(queue, "04-routing1", "waring"); channel.basicConsume(queue, true,new DeliverCallback(){ /** * 当消息从mq 中取出来了会回调这个方法 * 消费者消费消息就在这个 handle中进行处理 */ @Override public void handle(String s, Delivery delivery){ System.out.println("消费者 1 消息中的内容为:"+new String(delivery.getBody())); } },new CancelCallback(){ /** * 当消息取消了会回调这个方法 */ @Override public void handle(String s) throws IOException { System.out.println(111); } }); //7 关闭消息 注意消费者 需要持续监听,不要关闭 //channel.close(); //connection.close(); }
消费者2
public static void main(String[] args) throws IOException, TimeoutException { //1 创建连接工厂 ConnectionFactory connectionFactory=new ConnectionFactory(); //2 设置rabbitmq ip地址 connectionFactory.setHost("localhost"); //3 创建连接对象 Conection对象 Connection connection=connectionFactory.newConnection(); //4 创建管道 Chanel Channel channel=connection.createChannel(); //5 设置队列属性 /** * 第一个参数:队列的名称 * 第二个参数:队列是否要持久化 * 第三个参数:是否排他性(是否在同一个Connection,如果设置为true,不同的Connection是获得不到消息的) * 第四个参数:是否自动删除消息 * 第五个参数:是否要设置一些额外的参数 */ //channel.queueDeclare("02-work",false,false,true,null); //channel.basicQos(1); //6 使用chanel 去 rabbitmq 获取消息进行消费 /** * 第一个参数:队列的名称 * 第二个参数:是否自动签收 * 第三个参数:消息属性 * 第四个参数:消息内容 */ channel.exchangeDeclare("04-routing1", "direct"); //绑定 String queue = channel.queueDeclare().getQueue(); //可与绑定多个 channel.queueBind(queue, "04-routing1", "trace"); channel.basicConsume(queue, true,new DeliverCallback(){ /** * 当消息从mq 中取出来了会回调这个方法 * 消费者消费消息就在这个 handle中进行处理 */ @Override public void handle(String s, Delivery delivery){ System.out.println("消费者 2 消息中的内容为:"+new String(delivery.getBody())); } },new CancelCallback(){ /** * 当消息取消了会回调这个方法 */ @Override public void handle(String s) throws IOException { System.out.println(111); } }); //7 关闭消息 注意消费者 需要持续监听,不要关闭 //channel.close(); //connection.close(); }
上面的只有消费者1消费了消息
可以根据channel.queueBind(queue, "04-routing1", "trace"); 绑定消息 也可以让1和2都消费,
topic模式和Routing模式高度相识,用通配符的形式指定让谁消费,或者都消费
生产者
public static void main(String[] args) throws IOException, TimeoutException { //1 创建连接工厂 ConnectionFactory connectionFactory=new ConnectionFactory(); //2 设置rabbitmq ip地址 connectionFactory.setHost("localhost"); //3 创建连接对象 Conection对象 Connection connection=connectionFactory.newConnection(); //4 创建管道 Chanel Channel channel=connection.createChannel(); //5 设置队列属性 /** * 第一个参数:队列的名称 * 第二个参数:队列是否要持久化 * 第三个参数:是否排他性(是否在同一个Connection,如果设置为true,不同的Connection是获得不到消息的) * 第四个参数:是否自动删除消息 * 第五个参数:是否要设置一些额外的参数 */ //channel.queueDeclare("02-work",false,false,true,null); /** * 发布订阅模式需要指定交换机和类型,不能用上面的模式 * 交换机 Exchange 只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定 * 或者没有符合路由规则的队列,那么消息会丢失 * 第一个参数:交换机名字 * 第二个参数:交换机类型 * fanout:广播,将消息交给所有绑定到交换机的队列 * direct:定向,把消息交给符合指定routing key的队列 * topic:通配符,把消息交给符合routing pattern(路由模式)的队列 */ channel.exchangeDeclare("05-topic1", "topic"); //6 发送消息 /** * 第一个参数:交换机名称 没有交换机就设置"" * 第二个参数:路由key routing模式需要路由key * 第三个参数:消息属性 * 第四个参数:消息内容 */ channel.basicPublish("05-topic1","employee.save", MessageProperties.PERSISTENT_TEXT_PLAIN, "hello rabbitMQ".getBytes()); //7 关闭消息 //channel.close(); connection.close(); }
消费者1
public static void main(String[] args) throws IOException, TimeoutException { //1 创建连接工厂 ConnectionFactory connectionFactory=new ConnectionFactory(); //2 设置rabbitmq ip地址 connectionFactory.setHost("localhost"); //3 创建连接对象 Conection对象 Connection connection=connectionFactory.newConnection(); //4 创建管道 Chanel Channel channel=connection.createChannel(); //5 设置队列属性 /** * 第一个参数:队列的名称 * 第二个参数:队列是否要持久化 * 第三个参数:是否排他性(是否在同一个Connection,如果设置为true,不同的Connection是获得不到消息的) * 第四个参数:是否自动删除消息 * 第五个参数:是否要设置一些额外的参数 */ //channel.queueDeclare("02-work",false,false,true,null); //channel.basicQos(1); //6 使用chanel 去 rabbitmq 获取消息进行消费 /** * 第一个参数:队列的名称 * 第二个参数:是否自动签收 * 第三个参数:消息属性 * 第四个参数:消息内容 */ channel.exchangeDeclare("05-topic1", "topic"); //绑定 String queue = channel.queueDeclare().getQueue(); //可与绑定多个 channel.queueBind(queue, "05-topic1", "employee.*"); channel.basicConsume(queue, true,new DeliverCallback(){ /** * 当消息从mq 中取出来了会回调这个方法 * 消费者消费消息就在这个 handle中进行处理 */ @Override public void handle(String s, Delivery delivery){ System.out.println("消费者 1 消息中的内容为:"+new String(delivery.getBody())); } },new CancelCallback(){ /** * 当消息取消了会回调这个方法 */ @Override public void handle(String s) throws IOException { System.out.println(111); } }); //7 关闭消息 注意消费者 需要持续监听,不要关闭 //channel.close(); //connection.close(); }
消费者2
public static void main(String[] args) throws IOException, TimeoutException { //1 创建连接工厂 ConnectionFactory connectionFactory=new ConnectionFactory(); //2 设置rabbitmq ip地址 connectionFactory.setHost("localhost"); //3 创建连接对象 Conection对象 Connection connection=connectionFactory.newConnection(); //4 创建管道 Chanel Channel channel=connection.createChannel(); //5 设置队列属性 /** * 第一个参数:队列的名称 * 第二个参数:队列是否要持久化 * 第三个参数:是否排他性(是否在同一个Connection,如果设置为true,不同的Connection是获得不到消息的) * 第四个参数:是否自动删除消息 * 第五个参数:是否要设置一些额外的参数 */ //channel.queueDeclare("02-work",false,false,true,null); //channel.basicQos(1); //6 使用chanel 去 rabbitmq 获取消息进行消费 /** * 第一个参数:队列的名称 * 第二个参数:是否自动签收 * 第三个参数:消息属性 * 第四个参数:消息内容 */ channel.exchangeDeclare("05-topic1", "topic"); //绑定 String queue = channel.queueDeclare().getQueue(); //可与绑定多个 channel.queueBind(queue, "05-topic1", "user.*"); channel.basicConsume(queue, true,new DeliverCallback(){ /** * 当消息从mq 中取出来了会回调这个方法 * 消费者消费消息就在这个 handle中进行处理 */ @Override public void handle(String s, Delivery delivery){ System.out.println("消费者 2 消息中的内容为:"+new String(delivery.getBody())); } },new CancelCallback(){ /** * 当消息取消了会回调这个方法 */ @Override public void handle(String s) throws IOException { System.out.println(111); } }); //7 关闭消息 注意消费者 需要持续监听,不要关闭 //channel.close(); //connection.close(); }
结果就是消费者1消费了消息
所有工作模式总结