目录
前言介绍
(1)启动RabbitMQ
(2)账户管理
一、简单模式
(1)概念
(2)生产者代码
(3)消费者代码
二、工作队列模式
(1)概念
(1)生产者代码
(2)消费者代码
三、发布订阅模式
(1)概念
(2)生产者代码
(3)消费者代码
四、路由模式
(1)概念
(2)生产者代码
(3)消费者代码
五、通配符模式
(1)概念
(2)生产者代码
(3)消费者代码
前言介绍
前言:想要进行RabbitMQ的工作模式,首先你得先启动RabbitMQ并同时进行账号管理
(1)启动RabbitMQ
为了可以正常启动RabbitMQ,需要将防火墙关闭 #查看防火墙的运行状态 firewall-cmd --state #关闭正在运行的防火墙 systemctl stop firewalld.service #禁止防火墙自启动 systemctl disable firewalld.service #开启管控台插件 rabbitmq-plugins enable rabbitmq_managment #启动rabbitmq rabbitmq-server -detached #停止rabbitmq rabbitmqctl stop #重启rabbitmq rabbitmqctl stop_app rabbitmqctl reset rabbitmqctl start_app 通过管控台访问rabbitmq,在网址栏输入http://IP地址:15672
(2)账户管理
1:创建账户 rabbitmqctl add_user 用户名 密码 2:给用户创建管理员角色 rabbitmqctl set_user_tags 用户名 administrator 3:给用户授权 "/" 表示虚拟机 "." "." "." 表示完整权限 rabbitmqctl set_permissions -p "/" 用户名 "." "." "."
一、简单模式
(1)概念
简单工作模式的特点:
(1)一个生产者对应一个消费者,通过队列进行消息的传递
(2)该模式使用的是direct交换机,也是默认的交换机
(2)生产者代码
package com.itbaizhan.simple; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException; public class Producer { public static void main(String[] args) throws IOException, TimeoutException { //创建连接工厂 ConnectionFactory connectionFactory=new ConnectionFactory(); //自己的服务器IP地址 connectionFactory.setHost("192.168.66.130"); //rabbitmq的端口号 connectionFactory.setPort(5672); //用户名和密码 connectionFactory.setUsername("root"); connectionFactory.setPassword("root"); //虚拟主机路径 connectionFactory.setVirtualHost("/"); //建立连接 Connection connection= connectionFactory.newConnection(); //建立信道 Channel channel=connection.createChannel(); //创建队列 //参数一是自己起的队列名 //参数二是是否持久化,true表示重启rabbitmq时队列还在 //参数三表示是否私有化,false表示所有的消费者都可以访问,true表示还有第一次拥有他的消费者可以访问 //参数四表示是否自动删除,true表示不再使用队列时自动删除队列 //参数五表示额外参数 channel.queueDeclare("simple_queue",false,false,false,null); //发送信息 String m="再一次的说,我是简单模式 "; //参数一表示交换机名,简单模式可不写 //参数二表示路由键,简单模式就是队列名 //参数三表示其他额外参数 //参数四表示要传递的消息的字节数组 channel.basicPublish("","simple_queue",null,m.getBytes(StandardCharsets.UTF_8)); //关闭信道和连接 channel.close(); connection.close(); System.out.println("====发送成功====="); } }
(3)消费者代码
package com.itbaizhan.simple; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Consumer { public static void main(String[] args) throws IOException, TimeoutException { //建立连接工厂 ConnectionFactory connectionFactory=new ConnectionFactory(); connectionFactory.setHost("192.168.66.130"); connectionFactory.setUsername("root"); connectionFactory.setPassword("root"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); //创建连接 Connection connection=connectionFactory.newConnection(); //创建信道 Channel channel=connection.createChannel(); //监听队列 //参数一表示监听的队列名 //参数二表示是否自动签收,false表示需要手动确认消息已经收到 //参数三表示consumer的实现类,重写该方法表示接收到消息要做的事情 channel.basicConsume("simple_queue",true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message=new String(body,"UTF-8"); System.out.println("接收到的消息="+message); } }); } }
二、工作队列模式
(1)概念
与简单模式相比,工作模式多了一些消费者,该模式使用的是direct
交换机,应用消息较多的情况特点:
(1)一个队列对应多个消费者
(2)一条消息只会被一个消费者消费
(3)消息队列默认采用轮询的方式将消息平均发送给消费者,详细点说就是队列中的信息一个一个排队给消费者,其中的消息只有一次(不懂可以看代码展示)
(1)生产者代码
package com.itbaizhan.work; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.MessageProperties; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException; public class Producer { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory connectionFactory=new ConnectionFactory(); connectionFactory.setHost("192.168.66.130"); connectionFactory.setUsername("root"); connectionFactory.setPassword("root"); connectionFactory.setVirtualHost("/"); connectionFactory.setPort(5672); //创建连接 Connection connection=connectionFactory.newConnection(); //建立信道 Channel channel=connection.createChannel(); //创建队列 channel.queueDeclare("work_queue",true,false,false,null); //发送信息 for(int i=1;i<=100;i++){ //参数三表示的是持久化信息,即除了保存到内存中还会保存到磁盘中 channel.basicPublish("","work_queue",MessageProperties.PERSISTENT_TEXT_PLAIN,("现在是第"+i+"条信息").getBytes(StandardCharsets.UTF_8)); } //关闭资源 channel.close(); connection.close(); } }
(2)消费者代码
package com.itbaizhan.work; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; //工作模式消费者1 public class Consumer1 { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory connectionFactory=new ConnectionFactory(); connectionFactory.setPort(5672); connectionFactory.setHost("192.168.66.130"); connectionFactory.setUsername("root"); connectionFactory.setPassword("root"); connectionFactory.setVirtualHost("/"); //创建连接 Connection connection=connectionFactory.newConnection(); //创建信道 Channel channel=connection.createChannel(); //接受信息 channel.basicConsume("work_queue",true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者1号接受="+new String(body,"UTF-8")); } }); } } //后面的消费者2和3与1一样
三、发布订阅模式
(1)概念
一些消息需要不同的消费者进行不同的处理,如电商网站中消息的发送有邮件,
短信,弹窗等等,这就用到了订阅模式特点
(1)生产者将信息发送给交换机,交换机将消息转发到绑定此交换机的每个队列中
(2)工作模式中的交换机(direct交换机)只能将交换机发送给一个
队列,而订阅模式的交换机可以发送给多个队列。
(3)该模式的交换机使用fanout交换机
(2)生产者代码
package com.itbaizhan.publish; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException; //发布订阅模式生产者 public class Producer { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory connectionFactory=new ConnectionFactory(); connectionFactory.setVirtualHost("/"); connectionFactory.setUsername("root"); connectionFactory.setPassword("root"); connectionFactory.setHost("192.168.66.130"); connectionFactory.setPort(5672); //建立连接 Connection connection=connectionFactory.newConnection(); Channel channel=connection.createChannel(); //创建交换机 //参数一是交换机名 //参数二是交换机类型 //参数三是交换机持久化 channel.exchangeDeclare("exchange_fanout", BuiltinExchangeType.FANOUT,true); //创建队列 channel.queueDeclare("send1",true,false,false,null); channel.queueDeclare("send2",true,false,false,null); channel.queueDeclare("send3",true,false,false,null); //交换机绑定队列 //参数一是队列名 //参数二是交换机名 //参数三是路由关键字,订阅模式使用"" channel.queueBind("send1","exchange_fanout",""); channel.queueBind("send2","exchange_fanout",""); channel.queueBind("send3","exchange_fanout",""); //发送信息 for(int i=1;i<=100;i++){ //参数二是路由键 channel.basicPublish("exchange_fanout"," ",null,("这是第"+i+"条信息").getBytes(StandardCharsets.UTF_8)); } channel.close(); connection.close(); } }
(3)消费者代码
package com.itbaizhan.publish; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Consumer_send1 { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory connectionFactory=new ConnectionFactory(); connectionFactory.setVirtualHost("/"); connectionFactory.setUsername("root"); connectionFactory.setPassword("root"); connectionFactory.setPort(5672); connectionFactory.setHost("192.168.66.130"); //建立连接 Connection connection=connectionFactory.newConnection(); Channel channel=connection.createChannel(); channel.basicConsume("send1",true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("send1的队列="+new String(body,"Utf-8")); } }); } } //send2和send3如上一致
四、路由模式
(1)概念
使用发布订阅模式时,所有消息都活发送到绑定的队列中,
但是我们并不是希望所有的消息都无差别的发送到所有队列中,
比如一个活动中,有些消息需要邮件发送,有些需要短信发送,
而路由模式便是如此。特点:
(1)每个队列绑定路由关键字rountingkey
(2)生产者将带有rountingkey的消息发送给交换机,交换机再根据
rountingkey转发到指定队列。(该模式使用direct交换机)
(2)生产者代码
package com.itbaizhan.rounter; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException; public class Producer { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory connectionFactory=new ConnectionFactory(); connectionFactory.setHost("192.168.66.130"); connectionFactory.setUsername("root"); connectionFactory.setPassword("root"); connectionFactory.setVirtualHost("/"); connectionFactory.setPort(5672); Connection connection=connectionFactory.newConnection(); Channel channel=connection.createChannel(); //创建交换机 channel.exchangeDeclare("exchange_rounter", BuiltinExchangeType.DIRECT,true); //创建队列 channel.queueDeclare("mail1",true,false,false,null); channel.queueDeclare("mail2",true,false,false,null); channel.queueDeclare("mail3",true,false,false,null); //交换机绑定队列 //参数三表示路由关键字 channel.queueBind("mail1","exchange_rounter","import"); channel.queueBind("mail2","exchange_rounter","import"); channel.queueBind("mail3","exchange_rounter","normal"); //发送信息 channel.basicPublish("exchange_rounter","import",null,"我是import".getBytes(StandardCharsets.UTF_8)); channel.basicPublish("exchange_rounter","normal",null,"我是normal".getBytes(StandardCharsets.UTF_8)); //关闭资源 channel.close(); connection.close(); } }
(3)消费者代码
package com.itbaizhan.rounter; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Consumer_mail1 { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory connectionFactory=new ConnectionFactory(); connectionFactory.setHost("192.168.66.130"); connectionFactory.setUsername("root"); connectionFactory.setPassword("root"); connectionFactory.setVirtualHost("/"); connectionFactory.setPort(5672); //建立连接 Connection connection=connectionFactory.newConnection(); Channel channel=connection.createChannel(); //监听队列 channel.basicConsume("mail1",true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("mail1队列="+new String(body,"UTF-8")); } }); } } //监听mail2和mail3与上基本一致
五、通配符模式
(1)概念
通配符模式是在路由模式的基础上,给队列绑定带通配符的
路由关键字,只要消息的RoutingKey能实现通配符匹配,就会将消
息转发到该队列。通配符模式比路由模式更灵活
(该模式使用topic交换机)
通配符规则:消息设置RoutingKey时,RoutingKey由多个单词构成,中间以.分割。
队列设置RoutingKey时,#可以匹配任意多个单词,*可以匹配任意一个单词。
注意:通配符和路由模式都是根据指定规则或者关键字绑定到对应队列,是队列!而不是消费者!
(2)生产者代码
package com.itbaizhan.topic; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException; public class Producer { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory connectionFactory=new ConnectionFactory(); connectionFactory.setHost("192.168.66.130"); connectionFactory.setUsername("root"); connectionFactory.setPassword("root"); connectionFactory.setVirtualHost("/"); connectionFactory.setPort(5672); //建立连接 Connection connection=connectionFactory.newConnection(); Channel channel=connection.createChannel(); //创建交换机 channel.exchangeDeclare("exchange_topic", BuiltinExchangeType.TOPIC,true); //创建队列 channel.queueDeclare("em1",true,false,false,null); channel.queueDeclare("em2",true,false,false,null); channel.queueDeclare("em3",true,false,false,null); //交换机绑定队列 channel.queueBind("em1","exchange_topic","#.import.#"); channel.queueBind("em2","exchange_topic","#.normal.#"); channel.queueBind("em3","exchange_topic","#.low.#"); //发送信息 channel.basicPublish("exchange_topic","import.noraml",null,"我是一号".getBytes(StandardCharsets.UTF_8)); channel.basicPublish("exchange_topic","import.low",null,"我是二号".getBytes(StandardCharsets.UTF_8)); channel.basicPublish("exchange_topic","import.low.normal",null,"我是全部".getBytes(StandardCharsets.UTF_8)); //关闭资源 channel.close(); connection.close(); } }
(3)消费者代码
package com.itbaizhan.topic; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Consumer_em1{ public static void main(String[] args) throws IOException, TimeoutException { //建立连接工厂 ConnectionFactory connectionFactory=new ConnectionFactory(); connectionFactory.setHost("192.168.66.130"); connectionFactory.setUsername("root"); connectionFactory.setPassword("root"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); //创建连接 Connection connection=connectionFactory.newConnection(); //创建信道 Channel channel=connection.createChannel(); //监听队列 //参数一表示监听的队列名 //参数二表示是否自动签收,false表示需要手动确认消息已经收到 //参数三表示consumer的实现类,重写该方法表示接收到消息要做的事情 channel.basicConsume("em1",true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message=new String(body,"UTF-8"); System.out.println("em1队列的消息="+message); } }); } } //em2队列和em3队列与上基本一致