在虚拟机上安装Erlang的GCC环境,装erlong,然后安装rabbitmq
参考:安装说明链接
安装web端面板
创建交换机
先学习一下工作模式(详细介绍可见官网)
上代码
1.Hello Word模式
写在测试类中:
Providucer
@Test
void contextLoads()throws Exception {
//1.创建链接
ConnectionFactory factory = new ConnectionFactory();
//2。设置参数
factory.setHost("192.168.63.130");
factory.setPort(5672);
factory.setVirtualHost("/peng");
factory.setUsername("peng");
factory.setPassword("peng");
//3.创建链接Connection
Connection connection = factory.newConnection();
//4.创建Channel
Channel channel = connection.createChannel();
//5.创建队列Queue
//queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
/**
* 1.queue 队列名
* 2.durable 是否持久化
* 3.exclusive (1)是否独占站,只能有一个消费的监听者 (2)当Connection关闭时是否删除队列
* 4.autoDelete 是否自动删除,当没有consumer会自动删除掉
* 5.arguments:
* */
channel.queueDeclare("peng",true,false,false,null);
//6.发送消息
//basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
/**
* 1.exchange:交换机名称
* 2.routingKey:路由名称
* 3.props:配置信息
* 4.body:发送的消息数据
*/
String body="第一个消息";
channel.basicPublish("","peng",null,body.getBytes());
//7.释放资源
channel.close();
connection.close();
}
Consumer
@Test
void contextLoads()throws Exception {
//1.创建链接
ConnectionFactory factory = new ConnectionFactory();
//2。设置参数
factory.setHost("192.168.63.130");
factory.setPort(5672);
factory.setVirtualHost("/peng");
factory.setUsername("peng");
factory.setPassword("peng");
//3.创建链接Connection
Connection connection = factory.newConnection();
//4.创建Channel
Channel channel = connection.createChannel();
//5.创建队列Queue
//queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
/**
* 1.queue 队列名
* 2.durable 是否持久化
* 3.exclusive (1)是否独占站,只能有一个消费的监听者 (2)当Connection关闭时是否删除队列
* 4.autoDelete 是否自动删除,当没有consumer会自动删除掉
* 5.arguments:
* */
channel.queueDeclare("peng",true,false,false,null);
//basicConsume(String queue, DeliverCallback deliverCallback, CancelCallback cancelCallback)
/**
* 1.queue:队列名
* 2.deliverCallback:是否自动确认收到
* 3.cancelCallback:回调对象
*/
Consumer consumer= new DefaultConsumer(channel){
/**
* 1.consumerTag:
* 2.envelope:
* 3.properties:
* 4.body:
* @param consumerTag
* @param envelope
* @param properties
* @param body
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumerTag"+consumerTag);
System.out.println("envelope"+envelope.getExchange());
System.out.println("properties"+envelope.getRoutingKey());
System.out.println("properties"+properties);
System.out.println("body"+new String(body));
}
};
channel.basicConsume("peng",true,consumer);
}
2.Work Queues模式
生产者生产,两个消费者循环消费
P:
package com.providucer.factory;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* @ClassName: Providucerfactory
* @author: 鹏
* @date: 2023/7/4 14:42
*/
public class ProvideFactory {
public static void main(String[] args) throws Exception{
//1.创建链接
ConnectionFactory factory = new ConnectionFactory();
//2。设置参数
factory.setHost("192.168.63.130");
factory.setPort(5672);
factory.setVirtualHost("/peng");
factory.setUsername("peng");
factory.setPassword("peng");
//3.创建链接Connection
Connection connection = factory.newConnection();
//4.创建Channel
Channel channel = connection.createChannel();
//5.创建队列Queue
//queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
/**
* 1.queue 队列名
* 2.durable 是否持久化
* 3.exclusive (1)是否独占站,只能有一个消费的监听者 (2)当Connection关闭时是否删除队列
* 4.autoDelete 是否自动删除,当没有consumer会自动删除掉
* 5.arguments:
* */
channel.queueDeclare("pengwork",true,false,false,null);
//6.发送消息
//basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
/**
* 1.exchange:交换机名称
* 2.routingKey:路由名称
* 3.props:配置信息
* 4.body:发送的消息数据
*/
for (int i = 1; i <= 10; i++) {
String body="第"+i+"个消息";
channel.basicPublish("","pengwork",null,body.getBytes());
}
//7.释放资源
channel.close();
connection.close();
}
}
C1:
package com.consumer.factory;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @ClassName: ConsumerFactory
* @author: 鹏
* @date: 2023/7/4 14:44
*/
public class ConsumerFactory {
public static void main(String[] args)throws Exception {
//1.创建链接
ConnectionFactory factory = new ConnectionFactory();
//2。设置参数
factory.setHost("192.168.63.130");
factory.setPort(5672);
factory.setVirtualHost("/peng");
factory.setUsername("peng");
factory.setPassword("peng");
//3.创建链接Connection
Connection connection = factory.newConnection();
//4.创建Channel
Channel channel = connection.createChannel();
//5.创建队列Queue
//queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
/**
* 1.queue 队列名
* 2.durable 是否持久化
* 3.exclusive (1)是否独占站,只能有一个消费的监听者 (2)当Connection关闭时是否删除队列
* 4.autoDelete 是否自动删除,当没有consumer会自动删除掉
* 5.arguments:
* */
channel.queueDeclare("pengwork",true,false,false,null);
//basicConsume(String queue, DeliverCallback deliverCallback, CancelCallback cancelCallback)
/**
* 1.queue:队列名
* 2.deliverCallback:是否自动确认收到
* 3.cancelCallback:回调对象
*/
Consumer consumer= new DefaultConsumer(channel){
/**
* 1.consumerTag:
* 2.envelope:
* 3.properties:
* 4.body:
* @param consumerTag
* @param envelope
* @param properties
* @param body
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
/* System.out.println("consumerTag"+consumerTag);
System.out.println("envelope"+envelope.getExchange());
System.out.println("properties"+envelope.getRoutingKey());
System.out.println("properties"+properties);*/
System.out.println("body"+new String(body));
}
};
channel.basicConsume("pengwork",true,consumer);
}
}
C2:
package com.consumer.factory;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @ClassName: ConsumerFactory
* @author: 鹏
* @date: 2023/7/4 14:44
*/
public class ConsumerFactory1 {
public static void main(String[] args)throws Exception {
//1.创建链接
ConnectionFactory factory = new ConnectionFactory();
//2。设置参数
factory.setHost("192.168.63.130");
factory.setPort(5672);
factory.setVirtualHost("/peng");
factory.setUsername("peng");
factory.setPassword("peng");
//3.创建链接Connection
Connection connection = factory.newConnection();
//4.创建Channel
Channel channel = connection.createChannel();
//5.创建队列Queue
//queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
/**
* 1.queue 队列名
* 2.durable 是否持久化
* 3.exclusive (1)是否独占站,只能有一个消费的监听者 (2)当Connection关闭时是否删除队列
* 4.autoDelete 是否自动删除,当没有consumer会自动删除掉
* 5.arguments:
* */
channel.queueDeclare("pengwork",true,false,false,null);
//basicConsume(String queue, DeliverCallback deliverCallback, CancelCallback cancelCallback)
/**
* 1.queue:队列名
* 2.deliverCallback:是否自动确认收到
* 3.cancelCallback:回调对象
*/
Consumer consumer= new DefaultConsumer(channel){
/**
* 1.consumerTag:
* 2.envelope:
* 3.properties:
* 4.body:
* @param consumerTag
* @param envelope
* @param properties
* @param body
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
/* System.out.println("consumerTag"+consumerTag);
System.out.println("envelope"+envelope.getExchange());
System.out.println("properties"+envelope.getRoutingKey());
System.out.println("properties"+properties);*/
System.out.println("body"+new String(body));
}
};
channel.basicConsume("pengwork",true,consumer);
}
}
消费结果:
3.Publish/Subscribe订阅模式
消费着只需要绑定相应的队列,生产者需要创建交换机
public class PubFactory {
public static void main(String[] args) throws Exception{
//1.创建链接
ConnectionFactory factory = new ConnectionFactory();
//2。设置参数
factory.setHost("192.168.63.130");
factory.setPort(5672);
factory.setVirtualHost("/peng");
factory.setUsername("peng");
factory.setPassword("peng");
//3.创建链接Connection
Connection connection = factory.newConnection();
//4.创建Channel
Channel channel = connection.createChannel();
//5.创建交换机
//exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
String exchange="test_fanout";
channel.exchangeDeclare(exchange, BuiltinExchangeType.FANOUT,
true,false,false,null);
String queue1Name = "test_fanout_queue1";
String queue2Name = "test_fanout_queue2";
//1.exchange:交换机名称
//2.type:交换机类型
/**
* DIRECT("direct"),定向
* FANOUT("fanout"),扇形(广播)
* TOPIC("topic"),通配符方式
* HEADERS("headers")参数匹配
*/
//3.durable:是否持久化
//4.autoDelete:是福哦自动删除
//5.internal: 内部使用一般用false
//6.arguments: 参数
//channel.exchangeDeclare();
//6.创建队列
channel.queueDeclare(queue1Name,true,false,false,null);
channel.queueDeclare(queue2Name,true,false,false,null);
//7.绑定交换机与队列
/**
* 1.queue:队列名称
* 2.exchange:交换机名称
* 3.routingKey:路由键,绑定规则
* 如果交换机的类型为fanout,routingKey设置为""
*/
channel.queueBind(queue1Name,exchange,"");
channel.queueBind(queue2Name,exchange,"");
String body="日志信息:接收成功";
channel.basicPublish(exchange,"",null,body.getBytes());
//8.释放资源
channel.close();
connection.close();
}
}
4.Routing路由模式
路由模式相当于增加一层限制,只有通过相应的限制交换机才能将消息发布到对应的队列,也就是在发布的时候路由参数数设置值,且交换机类型必须为direct
channel.basicPublish(exchange,"error",null,body.getBytes());
此处限制队列路由为error的可以发送
public class RoutingFactory {
public static void main(String[] args) throws Exception{
//1.创建链接
ConnectionFactory factory = new ConnectionFactory();
//2。设置参数
factory.setHost("192.168.63.130");
factory.setPort(5672);
factory.setVirtualHost("/peng");
factory.setUsername("peng");
factory.setPassword("peng");
//3.创建链接Connection
Connection connection = factory.newConnection();
//4.创建Channel
Channel channel = connection.createChannel();
//5.创建交换机
//exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
String exchange="test_direct";
channel.exchangeDeclare(exchange, BuiltinExchangeType.DIRECT,
true,false,false,null);
String queue1Name = "test_fanout_queue1";
String queue2Name = "test_fanout_queue2";
//1.exchange:交换机名称
//2.type:交换机类型
/**
* DIRECT("direct"),定向
* FANOUT("fanout"),扇形(广播)
* TOPIC("topic"),通配符方式
* HEADERS("headers")参数匹配
*/
//3.durable:是否持久化
//4.autoDelete:是福哦自动删除
//5.internal: 内部使用一般用false
//6.arguments: 参数
//channel.exchangeDeclare();
//6.创建队列
channel.queueDeclare(queue1Name,true,false,false,null);
channel.queueDeclare(queue2Name,true,false,false,null);
//7.绑定交换机与队列
/**
* 1.queue:队列名称
* 2.exchange:交换机名称
* 3.routingKey:路由键,绑定规则
* 如果交换机的类型为fanout,routingKey设置为""
*/
channel.queueBind(queue1Name,exchange,"error");
channel.queueBind(queue2Name,exchange,"info");
channel.queueBind(queue2Name,exchange,"error");
channel.queueBind(queue2Name,exchange,"warming");
String body="日志信息:接收成功";
channel.basicPublish(exchange,"error",null,body.getBytes());
//8.释放资源
channel.close();
connection.close();
}
}
5. Topics模式
相对于routing在队列增加了匹配规则,让交换机发送与队列接受更加灵活*
匹配一个单词,#
匹配多个单词
channel.exchangeDeclare(exchange, BuiltinExchangeType.TOPIC, true,false,false,null);
设置为BuiltinExchangeType.TOPIC
public class TopicsFactory {
public static void main(String[] args) throws Exception{
//1.创建链接
ConnectionFactory factory = new ConnectionFactory();
//2。设置参数
factory.setHost("192.168.63.130");
factory.setPort(5672);
factory.setVirtualHost("/peng");
factory.setUsername("peng");
factory.setPassword("peng");
//3.创建链接Connection
Connection connection = factory.newConnection();
//4.创建Channel
Channel channel = connection.createChannel();
//5.创建交换机
//exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
String exchange="test_topic";
channel.exchangeDeclare(exchange, BuiltinExchangeType.TOPIC,
true,false,false,null);
String queue1Name = "test_topic_queue1";
String queue2Name = "test_topic_queue2";
//1.exchange:交换机名称
//2.type:交换机类型
/**
* DIRECT("direct"),定向
* FANOUT("fanout"),扇形(广播)
* TOPIC("topic"),通配符方式
* HEADERS("headers")参数匹配
*/
//3.durable:是否持久化
//4.autoDelete:是福哦自动删除
//5.internal: 内部使用一般用false
//6.arguments: 参数
//channel.exchangeDeclare();
//6.创建队列
channel.queueDeclare(queue1Name,true,false,false,null);
channel.queueDeclare(queue2Name,true,false,false,null);
//7.绑定交换机与队列
/**
* 1.queue:队列名称
* 2.exchange:交换机名称
* 3.routingKey:路由键,绑定规则
* 如果交换机的类型为fanout,routingKey设置为""
*/
channel.queueBind(queue1Name,exchange,"*.*");
channel.queueBind(queue2Name,exchange,"*.one");
channel.queueBind(queue2Name,exchange,"*.two");
channel.queueBind(queue2Name,exchange,"ok.*");
String body="日志信息:接收成功";
channel.basicPublish(exchange,"error",null,body.getBytes());
channel.basicPublish(exchange,"123.one",null,body.getBytes());
channel.basicPublish(exchange,"123.two",null,body.getBytes());
//8.释放资源
channel.close();
connection.close();
}
}