生产者,交换机,队列,消费者
交换机和队列通过 rounting key 绑定者,rounting key 可以是#.,*.这类topic模式,
生产者发送消息内容+ rountingkey, 到达交换机后交换机检查与之绑定的队列,
如果能匹配上则把消息丢到队列里,消费者监听某个队列,如果有消息则进行消费。
生产者:confirm 消息到达 交换机 进行确认;return 消息不能到达 队列时 进行 回退
优点:
1.应用解耦
2.流量削峰
3.异步提速
缺点:
1.系统增加了消息中间件,增加了系统的复杂度,
2.要保证消息中间件的高可用
安装:
使用 Erlang (二郎神) 语言开发.
安装参考:https://blog.csdn.net/m0_67392182/article/details/126040124
wget --content-disposition https://packagecloud.io/rabbitmq/erlang/packages/el/7/erlang-22.3.4.12-1.el7.x86_64.rpm/download.rpm
yum localinstall erlang-22.3.4.12-1.el7.x86_64.rpm
wget --content-disposition https://packagecloud.io/rabbitmq/rabbitmq-server/packages/el/7/rabbitmq-server-3.8.13-1.el7.noarch.rpm/download.rpm
rpm --import https://www.rabbitmq.com/rabbitmq-release-signing-key.asc
yum install rabbitmq-server-3.8.13-1.el7.noarch.rpm
启用rabbitmq server
systemctl start rabbitmq-server
启用管理界面
rabbitmq-plugins enable rabbitmq_management
重启
systemctl restart rabbitmq-server
访问 前端控制台
http://yourIp:15672/#/
增加用户
rabbitmqctl add_user admin admin
为用户设置权限
rabbitmqctl set_user_tags admin administrator
可能遇到的问题:
解决方案:
rabbitmq 挂掉解决方案:https://blog.csdn.net/qq_41950229/article/details/105957872
启用rabbitmq server
systemctl start rabbitmq-server
启用管理界面
rabbitmq-plugins enable rabbitmq_management
重启
systemctl restart rabbitmq-server
访问 前端控制台
http://192.168.186.141:15672/#/
增加用户
rabbitmqctl add_user admin admin
给用户分配权限
rabbitmqctl set_user_tags admin administrator
rabbitmq 挂掉解决方案:https://blog.csdn.net/qq_41950229/article/details/105957872
代码链接
交换机和队列通过路由key绑定,生产者首先发送消息(带着key的消息)到 交换机,交换机根据接收到的key 检查与它绑定的队列,如果符合 topic则发送到该队列上
package org.example.topic;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.example.utils.RabbitConstant;
import org.example.utils.RabbitUtils;
import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
//生产者
/*
创建 direct 交换机
* */
public class WeatherBureau {
public static void main(String[] args) throws IOException, TimeoutException {
Map area = new LinkedHashMap<String,String>();
area.put("china.hunan.changsha.20221120","中国湖南长沙20221120天气数据");
area.put("china.hubei.wuhan.20221120","中国湖北武汉20221120天气数据");
area.put("china.hunan.changsha.20221128","中国湖南长沙20221128天气数据");
area.put("us.cal.lsj.20221120","美国加州洛杉矶20221120天气数据");
area.put("china.hebei.shijiazhuang.20221120","中国河北石家庄20221120天气数据");
area.put("china.henan.zhengzhou.20221120","中国河南郑州20221120天气数据");
area.put("china.hunan.changsha.20221129","中国湖南长沙20221129天气数据");
Connection connection = RabbitUtils.getConnection();
Channel channel = connection.createChannel();
Iterator<Map.Entry<String,String>> itr = area.entrySet().iterator();
while(itr.hasNext()){
Map.Entry<String,String> me = itr.next();
//arg1: 交换机的名字,arg2: 作为消息的 key
channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER_TOPIC, me.getKey(), null,me.getValue().getBytes());
}
channel.close();
connection.close();
}
}
package org.example.topic;
import com.rabbitmq.client.*;
import org.example.utils.RabbitConstant;
import org.example.utils.RabbitUtils;
import java.io.IOException;
public class BaiDu {
public static void main(String[] args) throws IOException {
//获取长连接
Connection connection = RabbitUtils.getConnection();
//获取虚拟连接
final Channel channel = connection.createChannel();
//声明队列信息
channel.queueDeclare(RabbitConstant.QUEUE_BAIDU,false,false,false,null);
//指定队列与交换机的关系以及rounting key 之间的关系
channel.queueBind(RabbitConstant.QUEUE_BAIDU,RabbitConstant.EXCHANGE_WEATHER_TOPIC,"#.20221120");
channel.queueBind(RabbitConstant.QUEUE_BAIDU,RabbitConstant.EXCHANGE_WEATHER_TOPIC,"china.hubei.#");
channel.basicQos(1);
channel.basicConsume(RabbitConstant.QUEUE_BAIDU,false,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("百度天气收到气象信息: "+new String(body));
channel.basicAck(envelope.getDeliveryTag(),false);
}
});
}
}
package org.example.topic;
import com.rabbitmq.client.*;
import org.example.utils.RabbitConstant;
import org.example.utils.RabbitUtils;
import java.io.IOException;
//消费者
public class Sina {
public static void main(String[] args) throws IOException {
//获取长连接
Connection connection = RabbitUtils.getConnection();
//获取虚拟连接
final Channel channel = connection.createChannel();
//声明队列信息
channel.queueDeclare(RabbitConstant.QUEUE_SINA,false,false,false,null);
//指定队列与交换机的关系以及rounting key 之间的关系
channel.queueBind(RabbitConstant.QUEUE_SINA,RabbitConstant.EXCHANGE_WEATHER_TOPIC,"*.hunan.changsha.20221128");
channel.queueBind(RabbitConstant.QUEUE_SINA,RabbitConstant.EXCHANGE_WEATHER_TOPIC,"us.*.*.*");
channel.basicQos(1);
channel.basicConsume(RabbitConstant.QUEUE_SINA,false,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("新浪天气收到气象信息: "+new String(body));
channel.basicAck(envelope.getDeliveryTag(),false);
}
});
}
}
confirm 确认到达 broker里,
return 表示被broker正常接收后,没有没有投递到对应的队列里面去
package org.example.confirm;
import com.rabbitmq.client.*;
import org.example.utils.RabbitConstant;
import org.example.utils.RabbitUtils;
import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
//生产者
/*
创建 direct 交换机
https://www.cnblogs.com/dwlovelife/p/10991371.html#%E5%A6%82%E4%BD%95%E7%90%86%E8%A7%A3
* */
public class WeatherBureau {
public static void main(String[] args) throws IOException, TimeoutException {
Map area = new LinkedHashMap<String,String>();
area.put("china.hunan.changsha.20221120","中国湖南长沙20221120天气数据");
area.put("china.hubei.wuhan.20221120","中国湖北武汉20221120天气数据");
area.put("china.hunan.changsha.20221128","中国湖南长沙20221128天气数据");
area.put("us.cal.lsj.20221120","美国加州洛杉矶20221120天气数据");
area.put("china.hebei.shijiazhuang.20221120","中国河北石家庄20221120天气数据");
area.put("china.henan.zhengzhou.20221120","中国河南郑州20221120天气数据");
area.put("china.hunan.changsha.20221129","中国湖南长沙20221129天气数据");
area.put("china.hunan.changsha.20221129","中国湖南长沙20221129天气数据");
area.put("cn","中国湖南长沙20221129天气数据");
Connection connection = RabbitUtils.getConnection();
Channel channel = connection.createChannel();
//开启confirm 监听模式
channel.confirmSelect();
//到达到broker中被确认
channel.addConfirmListener(new ConfirmListener() {
public void handleAck(long l, boolean b) throws IOException {
// 第二个参数代表接收的数据是否为批量接收,一般我们用不到
System.out.println("消息已被Broker接收,Tag: "+l);
}
public void handleNack(long l, boolean b) throws IOException {
System.out.println("消息已被Broker接收,Tag: "+l);
}
});
//到达broker被确认后,但是找不到对应的队列投递
channel.addReturnListener(new ReturnCallback() {
public void handle(Return aReturn) {
System.err.println("============================");
System.err.println("Return 编码: "+ aReturn.getReplyCode() + "-Return 描述"+ aReturn.getReplyText());
System.err.println("交换机: "+ aReturn.getExchange()+"-路由key: "+ aReturn.getRoutingKey());
System.err.println("Return 主题: "+new String(aReturn.getBody()));
System.err.println("==============================");
}
});
Iterator<Map.Entry<String,String>> itr = area.entrySet().iterator();
while(itr.hasNext()){
Map.Entry<String,String> me = itr.next();
//arg1: 交换机的名字,arg2: 作为消息的routing key,arg3:如果exchange在将消息route到queue(s)时发现对应的queue上没有消费者,那么这条消息不会放入队列中
channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER_TOPIC, me.getKey(), true,null,me.getValue().getBytes());
}
//这里不能关掉
// channel.close();
//这里不能关掉
// connection.close();
}
}
https://github.com/ranhaoliu/rabbitmq-demo
生产者:
<!--===============================TTL 开始======================= -->
<rabbit:queue name="test_queue_ttl" id="test_queue_ttl">
<rabbit:queue-arguments>
<entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"></entry>
</rabbit:queue-arguments>
</rabbit:queue>
<rabbit:topic-exchange name="test_exchange_ttl">
<rabbit:bindings>
<rabbit:binding pattern="ttl.#" queue="test_queue_ttl"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>
@Test
public void testTtl(){
for(int i=0;i<10;i++){
rabbitTemplate.convertAndSend("test_exchange_ttl","ttl.baiqi","message ttl ...");
}
}
10秒后会消失
死信队列
生产者:
<!--
死信队列:
1.声明正常得队列(test_queue_dlx)和交换机(test_exchange_dlx)
2.声明死信队列(queue_dlx)和死信交换机(exchange_dlx)
3.正常队列绑定死信交换机
设置两个参数:
* x-dead-letter-exchange: 死信交换机名称
* x-dead-letter-routing-key: 发送给死信交换机的 routingkey
-->
<!-- 1.1 声明正常得队列(test_queue_dlx)和交换机(test_exchange_dlx) -->
<rabbit:queue name="test_queue_dlx" id="test_queue_dlx">
<rabbit:queue-arguments>
<!--3.1: x-dead-letter-exchange 死信交换机名称; value 的值是 2.2 中的声明的死信交换机名称 -->
<entry key="x-dead-letter-exchange" value="exchange_dlx"></entry>
<!--3.2: x-dead-letter-routing-key: 发送给死信交换机的routingkey-->
<entry key="x-dead-letter-routing-key" value="dlx.hehe"></entry>
<!--4.1 设置队列的过期时间 ttl-->
<entry key="x-message-ttl" value="1000" value-type="java.lang.Integer"></entry>
<!--4.2 设置队列的长度限制 max-length-->
<entry key="x-max-length" value="10" value-type="java.lang.Integer"></entry>
</rabbit:queue-arguments>
</rabbit:queue>
<!--1.2 声明正常的交换机 -->
<rabbit:topic-exchange name="test_exchange_dlx">
<rabbit:bindings>
<rabbit:binding pattern="test.dlx.#" queue="test_queue_dlx"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>
<!--2.1 声明死信队列(queue_dlx) -->
<rabbit:queue name="queue_dlx" id="queue_dlx"></rabbit:queue>
<!--2.2 死信交换机(exchange_dlx)-->
<rabbit:topic-exchange name="exchange_dlx">
<rabbit:bindings>
<rabbit:binding pattern="dlx.#" queue="queue_dlx"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>
<!--=====================死信队列结束=======================
/*
* 发送测试死信消息:
* 1.过期时间
* 2.长度限制
* 3.消息拒收
* */
@Test
public void testDlx(){
rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.hehe","死信消息...");
}
一些细节:
生产者:
<!--=======================延迟队列=============================-->
<!--
延迟队列:
1.声明正常得队列(order_queue)和交换机(order_exchange)
2.声明死信队列(order_queue_dlx)和死信交换机(order_exchange_dlx)
3.绑定,设置正常队列过期时间为30分钟
正常队列绑定死信交换机
设置两个参数:
* x-dead-letter-exchange: 死信交换机名称
* x-dead-letter-routing-key: 发送给死信交换机的 routingkey
-->
<!-- 1.1 声明正常得队列(order_queue)和交换机(order_exchange) -->
<rabbit:queue name="order_queue" id="order_queue">
<rabbit:queue-arguments>
<!--3.绑定,设置正常队列过期时间为30分钟,此处设置10秒钟演示来用 -->
<!--3.1: x-dead-letter-exchange 死信交换机名称; value 的值是 2.2 中的声明的死信交换机名称 -->
<entry key="x-dead-letter-exchange" value="order_exchange_dlx"></entry>
<!--3.2: x-dead-letter-routing-key: 发送给死信交换机的routingkey-->
<entry key="x-dead-letter-routing-key" value="dlx.order.cannel"></entry>
<!--4.1 设置队列的过期时间 ttl-->
<entry key="x-message-ttl" value="1000" value-type="java.lang.Integer"></entry>
</rabbit:queue-arguments>
</rabbit:queue>
<!--1.2 声明正常的交换机 -->
<rabbit:topic-exchange name="order_exchange">
<rabbit:bindings>
<rabbit:binding pattern="order.#" queue="order_queue"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>
<!--2.1 声明死信队列(order_queue_dlx) -->
<rabbit:queue name="order_queue_dlx" id="order_queue_dlx"></rabbit:queue>
<!--2.2 死信交换机(exchange_dlx)-->
<rabbit:topic-exchange name="order_exchange_dlx">
<rabbit:bindings>
<rabbit:binding pattern="dlx.order.#" queue="order_queue_dlx"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>
消费者:
package org.example.rabbitmq.listener;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;
@Component
public class OrderListener implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try{
//1.接收转化消息
System.out.println("message: "+new String(message.getBody()));
//2.进行业务处理
System.out.println("进行业务逻辑处理...");
System.out.println("根据订单id查询其状态...");
System.out.println("判断状态是否为支付成功...");
System.out.println("取消订单,回滚库存..");
//3.手动签收
channel.basicAck(deliveryTag,true);
}catch (Exception e){
//拒绝签收
/*第三个参数:requeue:重回队列。如果设置为true,则消息从新回到queue,broker会重新发送该消息给消费端,
如果为false则拒绝签收
* */
channel.basicNack(deliveryTag,true,true);
// channel.basicNack(deliveryTag,true,false);
}
}
@Override
public void onMessage(Message message) {
}
@Override
public void containerAckMode(AcknowledgeMode mode) {
}
}
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="2">
<rabbit:listener ref="orderListener" queue-names="order_queue_dlx"></rabbit:listener>
</rabbit:listener-container>
先运行消费者,再运行生产者测试代码发现 10秒钟后,消费者开始消费
== rabbitmq 集群:==
https://cloud.tencent.com/developer/article/1631148
https://www.leeks.info/zh_CN/latest/Linux_Notes/rabbitmq/RabbitMQ.html#id2
rabbitmq高可用集群搭建踩坑
systemctl start rabbitmq-server.service
1.环境准备(需重启客户端)
hostnamectl set-hostname m1
hostnamectl set-hostname m2
2.统一 erlang.cookie 文件中 cookie值 将m1 中的 .erlang.cookie 同步到 m2 中(在m1机器操作)
scp /var/lib/rabbitmq/.erlang.cookie m2:/var/lib/rabbitmq/.erlang.cookie
或者 scp /var/lib/rabbitmq/.erlang.cookie m2的ip:/var/lib/rabbitmq/.erlang.cookie
3.Rabbitmq 集群添加节点(在m2机器上操作)
#重启 m2机器中 rabbitmq 的服务
rabbitmqctl stop_app
rabbitmqctl join_cluster --ram rabbit@m1 (m1上操作)
rabbitmqctl start_app
#启用管理界面
rabbitmq-plugins enable rabbitmq_management
systemctl restart rabbitmq-server.service
#4, 查看集群信息
rabbitmqctl cluster_status
可能出现的问题:
unable to connect to epmd (port 4369) on m1: nxdomain (non-existing domain)
最总解决: rabbitmq高可用集群搭建踩坑
1.5.1
#1.安装
yum install haproxy
#2.配置haproxy.cfg 文件 具体参考 如下 1.5.2 配置HAProxy
vim /etc/haproxy/haproxy.cfg
#3.启动 haproxy
systemctl start haproxy
#4.查看haproxy 进程状态
systemctl status haproxy.service
#状态如下说明 已经启动成功 Active: active(running)
#访问如下地址对mq 结点进行监控
http://服务器Ip:1080/haproxy_stats
#代码中访问mq的地址则变为haproxy的地址:5672
1.5.2
#对mq集群进行监听
listen rabbitmq_cluster
bind 0.0.0.0:5672
option tcplog
mode tcp
option clitcpka
timeout connect 1s
timeout client 10s
timeout server 10s
balance roundrobin
server node1 192.168.1.9:5672 check inter 5s rise 2 fall 3
server node2 192.168.1.10:5672 check inter 5s rise 2 fall 3
#开启haproxy 监控服务
listen http_front
bind 0.0.0.0:1090
stats refresh 30s
stats uri /haproxy_stats
stats auth admin:admin
效果