20240803
- 一、 如何解决消息堆积问题?
- 一般认为单条队列消息差值>=10w时 算堆积问题
- 生产太快了
- 线程数量的设置
- 挤压问题
- 消费者消费出现问题
- 如果堆积的消息不想要了,可以直接跳过堆积
- 二、 信息丢失问题
- 为什么会丢失
- 解决思路1 记录下来
- 解决思路2 使用rocketmq中的轨迹
- 如何确保消息不丢失?(总结)
- 三、安全
- 四、时间复杂度
一、 如何解决消息堆积问题?
一般认为单条队列消息差值>=10w时 算堆积问题
一般认为单条队列消息差值>=10w时 算堆积问题
生产太快了
-
生产方可以做业务限流
-
增加消费者数量,但是消费者数量<=队列数量,适当的设置最大的消费线程数量(根据IO(2n)/CPU(n+1))
-
动态扩容队列数量,从而增加消费者数量
线程数量的设置
// 最大线程数写几个? 根据电脑来看,比如我现在是20个处理器,那么如果是IO密集型,那么我的线程数为2*n=40个
// 如果我是CPU密集型,那么我的线程数为n + 1 = 21个
/* public ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(
10,
Runtime.getRuntime().availableProcessors()*2, //这个是获取核心线程数
// 最大线程数写几个? 根据电脑来看,比如我现在是20个处理器,那么如果是IO密集型,那么我的线程数为2*n=40个
// 如果我是CPU密集型,那么我的线程数为n + 1 = 21个
);*/
因为我们的mq一般都是增删改查,所以线程数一般设置为40(IO密集型)
所以:
/**
* CLUSTERING 集群模式下,队列会被消费者分摊,队列数量>=消费者数量,消息的消费位点 mq服务器回记录处理,用的比较多
* BROADCASTING 广播模式下,队列会被消费者全部消费(每个消费者都处理一次),mq服务器不会记录消费位点,也不会重试。
*/
@Component
@RocketMQMessageListener(topic = "modeTopic",
consumerGroup = "mode-consumer-group-a",
messageModel = MessageModel.CLUSTERING ,//集群模式 负载均衡
consumeThreadNumber = 40 //线程数 因为我们的mq一般都是增删改查,所以线程数一般设置为40(IO密集型)
//这个值不能写太大,不然cpu会频繁的做切换,导致性能下降
)
public class DC1 implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println("我是mode-consumer-group-a组的第一个消费者"+message);
}
}
挤压问题
生产者
//积压问题
@Test
void jydelTest() throws Exception{
for (int i=1;i<=10000;i++){
Thread.sleep(500L);
rocketMQTemplate.syncSend("jyTopic","我是第"+i+"个消息");
}
}
先不设置消费者,看可视化界面
首先是四个线程:
换到八线程
重平衡
停止程序,开始消费:(两份)
@Component
@RocketMQMessageListener(topic = "jyTopic",
consumerGroup = "jy-consumer-group",
consumeThreadNumber = 40,
consumeMode = ConsumeMode.CONCURRENTLY //并发消费
)
public class EJyListener1 implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println("我是第一个消费者:"+message);
}
}
恢复成4线程
缩容,重分配:可以看到把新增的删除了
消费者消费出现问题
排查消费者程序的问题
如果堆积的消息不想要了,可以直接跳过堆积
二、 信息丢失问题
为什么会丢失
解决思路1 记录下来
解决思路2 使用rocketmq中的轨迹
- 可以开启mq的trace机制,消息跟踪机制。在broker.conf中开启消息追踪
关闭的时候,不要kill -9 因为有可能里面还有很多数据在执行,可以直接 kill 这样会等里面的线程数据走完,才会关闭进程
配置(在最下面加入traceTopicEnable=true),重启broke.conf
然后在yml中配置:
rocketmq:
name-server: 192.168.88.128:9876
producer:
group: boot-producer-group
enable-msg-trace: true #开启发送方的消息轨迹
生产者:
//消息跟踪
@Test
void tranceTest() throws Exception{
rocketMQTemplate.syncSend("traceTopic","我是消息");
}
开始消费
@Component
@RocketMQMessageListener(topic = "traceTopic",
consumerGroup = "trace-consumer-group",
consumeThreadNumber = 40,
consumeMode = ConsumeMode.CONCURRENTLY ,//并发消费
enableMsgTrace = true//开启消费者方轨迹,一般不开,开了会影响性能
)
public class GTranceListener implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println("我是消费者:"+message);
}
}
如何确保消息不丢失?(总结)
1. 生产者使用同步发送模式 ,收到mq的返回确认以后 顺便往自己的数据库里面写
msgId status(0) time
2. 消费者消费以后 修改数据这条消息的状态 = 1
3. 写一个定时任务 间隔两天去查询数据 如果有status = 0 and time < day-2
4. 将mq的刷盘机制设置为同步刷盘
5. 使用集群模式 ,搞主备模式,将消息持久化在不同的硬件上
6. 可以开启mq的trace机制,消息跟踪机制
1.在broker.conf中开启消息追踪
traceTopicEnable=true
2.重启broker即可
3.生产者配置文件开启消息轨迹
enable-msg-trace: true
1. 消费者开启消息轨迹功能,可以给单独的某一个消费者开启
enableMsgTrace = true
在rocketmq的面板中可以查看消息轨迹
默认会将消息轨迹的数据存在 RMQ_SYS_TRACE_TOPIC 主题里面
三、安全
- 开启acl的控制 在broker.conf中开启aclEnable=true
- 配置账号密码 修改plain_acl.yml
- 修改控制面板的配置文件 放开52/53行 把49行改为true 上传到服务器的jar包平级目录下即可
四、时间复杂度
速度排序: 数组>Hash>B+tree