docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.12-management
docker会自己下载,然后运行
进入docker:
docker exec -it rabbitmq bash
进入容器后,重启 RabbitMQ 应用:rabbitmqctl stop_app && rabbitmqctl start_app(rabbitmq-server 脚本只负责启动节点,没有 restart 子命令)
感觉所有的消息队列用法都差不多:创建连接、声明队列、发送消息、接收消息。下面是生产者:
package com.quxiao;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

/**
 * Minimal RabbitMQ producer: connects to a local broker, declares the queue
 * {@code duilie1} and publishes a single text message to it through the
 * default exchange.
 *
 * @program: springboot
 * @author: quxiao
 * @create: 2023-10-29 09:39
 **/
public class t1 {

    /**
     * Publishes one message and exits.
     *
     * @param args unused
     * @throws Exception declared for compatibility; connection and publish failures
     *                   surface as a {@link RuntimeException} wrapping the original cause
     */
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setUsername("123");
        factory.setPassword("123");
        factory.setVirtualHost("/");

        // try-with-resources closes the channel first, then the connection, and only
        // closes what was actually opened. The previous finally block dereferenced
        // null when the connection attempt itself failed, hiding the real error
        // behind a NullPointerException.
        try (Connection connection = factory.newConnection("生产者1");
             Channel channel = connection.createChannel()) {
            // Non-durable, non-exclusive, no auto-delete, no extra arguments.
            channel.queueDeclare("duilie1", false, false, false, null);
            // An empty exchange name selects the default exchange, which routes by queue name.
            // Encode explicitly as UTF-8 so the bytes do not depend on the platform charset.
            channel.basicPublish("", "duilie1", null, "队列消息".getBytes(StandardCharsets.UTF_8));
        } catch (IOException | TimeoutException e) {
            throw new RuntimeException(e);
        }
    }
}
消费:
package com.quxiao;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

/**
 * Minimal RabbitMQ consumer: connects to a local broker and prints every
 * message delivered from the queue {@code duilie1}.
 *
 * @program: springboot
 * @author: quxiao
 * @create: 2023-10-29 10:11
 **/
public class t2 {

    /**
     * Subscribes to {@code duilie1} and keeps the connection open so that
     * deliveries continue to arrive; stop the process to stop consuming.
     *
     * @param args unused
     * @throws Exception declared for compatibility; connection and subscribe failures
     *                   surface as a {@link RuntimeException} wrapping the original cause
     */
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setUsername("123");
        factory.setPassword("123");
        factory.setVirtualHost("/");

        // Prints the payload of each delivery, decoded as UTF-8.
        DeliverCallback onDelivery = (consumerTag, message) ->
                System.out.println(new String(message.getBody(), StandardCharsets.UTF_8));
        // Invoked when the consumer is cancelled.
        CancelCallback onCancel = consumerTag -> System.out.println("接收消息失败");

        Connection connection = null;
        try {
            // NOTE(review): the connection name still says "producer 1" although this is
            // the consumer; kept as-is because it is a runtime-visible string.
            connection = factory.newConnection("生产者1");
            Channel channel = connection.createChannel();
            // Declare the queue with the same arguments the producer uses so the
            // consumer can be started first without failing on a missing queue.
            channel.queueDeclare("duilie1", false, false, false, null);
            // basicConsume only registers the callbacks and returns immediately; messages
            // are delivered asynchronously afterwards. The channel and connection are
            // therefore deliberately left open here: closing them right away (as the
            // previous finally block did) tears the consumer down before it can receive.
            channel.basicConsume("duilie1", true, onDelivery, onCancel);
        } catch (IOException | TimeoutException e) {
            // On failure release the connection so its threads do not keep the JVM alive.
            if (connection != null) {
                connection.abort();
            }
            throw new RuntimeException(e);
        }
    }
}
路由分组模式:
先用路由 key 把队列绑定到交换机(下面用的是 amq.direct);发送消息时指定路由 key,消息就会投递到所有用该 key 绑定的队列。
package com.quxiao;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

/**
 * Routing example: publishes a single text message to the {@code amq.direct}
 * exchange with the routing key {@code type2}.
 *
 * @program: springboot
 * @author: quxiao
 * @create: 2023-10-29 09:39
 **/
public class t1 {

    /**
     * Publishes one routed message and exits.
     *
     * @param args unused
     * @throws Exception declared for compatibility; connection and publish failures
     *                   surface as a {@link RuntimeException} wrapping the original cause
     */
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        // Was "127", which is not the loopback address; use the same local broker
        // address as the other examples.
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setUsername("123");
        factory.setPassword("123");
        factory.setVirtualHost("/");

        // try-with-resources closes the channel first, then the connection, and only
        // closes what was actually opened, so a failed connect no longer ends in a
        // NullPointerException from the cleanup code.
        try (Connection connection = factory.newConnection("生产者1");
             Channel channel = connection.createChannel()) {
            // No queue is declared or bound here.
            // NOTE(review): assumes queues were bound to amq.direct with key "type2"
            // beforehand (e.g. in the management UI) - confirm.
            // Encode explicitly as UTF-8 so the bytes do not depend on the platform charset.
            channel.basicPublish("amq.direct", "type2", null, "队列消息".getBytes(StandardCharsets.UTF_8));
        } catch (IOException | TimeoutException e) {
            throw new RuntimeException(e);
        }
    }
}