异步消息原理

news2025/1/24 5:11:31
作者简介:大家好,我是smart哥,前中兴通讯、美团架构师,现某互联网公司CTO

联系qq:184480602,加我进群,大家一起学习,一起进步,一起对抗互联网寒冬

在日常开发中,偶尔需要在主业务逻辑之外做一些附加操作,比如下单成功后通知商家、课程报名成功后通知老师、简历投递成功后通知HR。一般来讲,这些业务是不适合放在主线程中的:

@Slf4j
@SpringBootTest
public class AsyncNotifyTest {

    @Test
    public void testAsyncNotify() throws InterruptedException {

        long start = System.currentTimeMillis();

        // 投递简历,插入投递记录
        TimeUnit.SECONDS.sleep(2);
        log.info("插入投递记录完毕...");

        // 发送短信通知HR,并留存发送记录
        notifyHR("mx", "叉车师傅");
        writeMsg("mx", "叉车师傅");

        log.info("耗时:{}毫秒", System.currentTimeMillis() - start);
    }

    public void notifyHR(String username, String jobName) throws InterruptedException {
        TimeUnit.SECONDS.sleep(1);
        log.info("【发送消息】HR你好,用户:{}, 投递你的岗位:{}", username, jobName);
    }

    public void writeMsg(String username, String jobName) {
        // 留存消息发送记录
        log.info("【保存消息】保存到数据库, 用户:{}, 岗位:{}", username, jobName);
    }

}

结果

[main] INFO - 插入投递记录完毕...

[main] INFO - 【发送消息】HR你好,用户:mx, 投递你的岗位:叉车师傅

[main] INFO - 【保存消息】保存到数据库, 用户:mx, 岗位:叉车师傅

[main] INFO - com.bravo.happy.AsyncNotifyTest - 耗时:3019毫秒

消息通知等附加操作为什么不适合放在主流程呢?

  • 首先,消息通知相对没那么重要,即使发送失败了,一般还有发送记录,重新发送或者只要能追溯即可
  • 其次,在主流程中加入消息通知会减慢响应速度
  • 最后,万一消息发送失败,还可能导致事务回滚,但系统本身其实是没有问题的

多线程异步消息

一个解决办法是使用多线程,把消息发送的逻辑单独放在一个异步线程中执行,主流程处理完毕直接返回即可。为了尽可能简单,这里就不配置线程池或使用@Async了,换CompletableFuture做演示:

@Slf4j
@SpringBootTest
public class AsyncNotifyTest {

    @Test
    public void testAsyncNotify() throws InterruptedException {

        long start = System.currentTimeMillis();

        // 投递简历,插入投递记录
        TimeUnit.SECONDS.sleep(2);
        log.info("插入投递记录完毕...");

        // 异步发送短信通知HR,并留存发送记录
        CompletableFuture.runAsync(() -> {
            try {
                notifyHR("bravo1988", "叉车师傅");
                writeMsg("bravo1988", "叉车师傅");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        log.info("耗时:{}毫秒", System.currentTimeMillis() - start);

        // 为了观察到异步线程里的打印信息,主线程sleep一会儿
        TimeUnit.SECONDS.sleep(2);
    }

    public void notifyHR(String username, String jobName) throws InterruptedException {
        TimeUnit.SECONDS.sleep(1);
        log.info("【发送消息】HR你好,用户:{}, 投递你的岗位:{}", username, jobName);
    }

    public void writeMsg(String username, String jobName) {
        // 留存消息发送记录
        log.info("【保存消息】保存到数据库, 用户:{}, 岗位:{}", username, jobName);
    }

}

结果

[main] INFO - 插入投递记录完毕...

[main] INFO - 耗时:2145毫秒

[ForkJoinPool.commonPool-worker-9] INFO - 【发送消息】HR你好,用户:mx, 投递你的岗位:叉车师傅

[ForkJoinPool.commonPool-worker-9] INFO - 【保存消息】保存到数据库, 用户:mx, 岗位:叉车师傅

可以看到,主线程耗时变成了2秒,如此一来用户整个投递简历的响应时间缩短了。

Spring事件监听机制

具体请参考Spring事件监听机制,本质上和多线程异步消息是一样的。

Redis实现消息队列

上面多线程版本的异步消息其实已经挺不错了,但小概率的情况下可能会出现消息丢失(虽然当前情境下无所谓):

  • 情况1:消息过多,线程数不够触发拒绝策略
  • 情况2:异步线程宕机了,消息丢失(类似于消费者挂了)

此时可以考虑使用Redis做一个简单的消息队列,数据类型可以选择List。

对于Redis的List,如果使用lpush+rpop即可实现先进先出的简单队列,而如果配合brpop则可实现阻塞队列。所谓的brpop,其实就是blocking right pop,即阻塞等待队列中的消息,一旦有消息被push进队列就从右边取出消费。

注意,rpop和brpop的区别是,rpop发现队列为空直接返回null,不会等待,也不能设置等待时间 :

lpush和brpop反映到Java代码里,可以使用RedisTemplate或StringRedisTemplate实现。

public interface RedisService {

    /**
     * 向队列插入消息
     *
     * @param queue 自定义队列名称
     * @param obj   要存入的消息
     */
    void pushQueue(String queue, Object obj);

    /**
     * 从队列取出消息
     *
     * @param queue    自定义队列名称
     * @param timeout  最长阻塞等待时间
     * @param timeUnit 时间单位
     * @return
     */
    Object popQueue(String queue, long timeout, TimeUnit timeUnit);
}
@Component
public class RedisServiceImpl implements RedisService {

    @Autowired
    private StringRedisTemplate redisTemplate;
    @Autowired
    private ObjectMapper objectMapper;

    @Override
    public void pushQueue(String queue, Object obj) {
        try {
            redisTemplate.opsForList().leftPush(queue, objectMapper.writeValueAsString(obj));
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
    }

    @Override
    public Object popQueue(String queue, long timeout, TimeUnit timeUnit) {
        return redisTemplate.opsForList().rightPop(queue, timeout, timeUnit);
    }
}
Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class RedisQueueTest {

    @Autowired
    private RedisService redisService;

    public static final String ORDER_MESSAGE = "order_message";

    @Test
    public void testRedisBlockingQueue() throws InterruptedException {
        // 订单服务
        orderService("bravo1988", 10086L);

        // 启动消费者,取出消息,逐一发送
        new Thread(this::consumeMsg).start();

        // 10秒后再发一条消息,模拟第二次下单
        TimeUnit.SECONDS.sleep(10);
        orderService("bravo2020", 99999L);

        // 等待一会儿,观察第二条消息
        TimeUnit.SECONDS.sleep(10);
    }

    public void orderService(String username, Long orderId) {
        // 1.操作数据库,插入订单

        // 2.其他操作

        // 3.发送消息
        redisService.pushQueue(ORDER_MESSAGE, new Order(username, orderId));
    }

    public void consumeMsg() {
        for (; ; ) {
            Object order = redisService.popQueue(ORDER_MESSAGE, 5, TimeUnit.SECONDS);
            log.info("每隔5秒循环获取,期间for循环阻塞");
            if (order != null) {
                log.info("order:{}", order.toString());
            }
        }
    }

    @Data
    @AllArgsConstructor
    static class Order {
        private String username;
        private Long resumeId;
    }

}

Redis实现消息队列的好处是,可以把消息存起来慢慢消费,而且项目挂了不影响已经存入的消息,重新启动后仍可继续消费:

可能有同学不禁要问:如果消息还没发送到队列中就丢失了呢?发送方也无法感知(没有应答机制),所以Redis作为消息队列还是存在很多问题的。

那为什么要在这一章节安排Redis实现消息队列呢?

首先,就我个人的感受而言,入行后有很长一段时间我都对Redis很抵触、很畏惧,导致自己一直停滞不前,其实Redis并没有我们想的那么难,只要你敢动手去敲,就会迅速熟悉起来(其他技术也是如此)。

其次,实际开发中一些小项目还是有人会用,主要是从系统复杂性考虑,不轻易引入MQ,所以仍有学习的必要。尤其是对于一些消息通知,丢了就丢了,影响不是特别大,而订单操作就不适合用Redis这么简陋的消息队列了。

上面的demo仅仅用作学习,大家有兴趣可以自行拓展,Redis还提供了消息的发布/订阅模式:

用过MQ的同学会觉得上面的模式很熟悉!

小坑

我在Redis实现消息通知的代码中,留了一个小坑,是大家平时可能会不小心犯的。

提示:和SpringBoot定时任务故事中我同事遇到的类似的坑。

在执行上面的代码时可能遇到:

nested exception is io.lettuce.core.RedisException: io.lettuce.core.RedisException: Connection closed

这是因为主线程结束导致Redis断开,而两个for循环还在操作队列。实际生产环境一般不会有问题,因为线程是一直跑着的。

作者简介:大家好,我是smart哥,前中兴通讯、美团架构师,现某互联网公司CTO

进群,大家一起学习,一起进步,一起对抗互联网寒冬

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1330109.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

什么是递归

概述 递归是一种解决问题的方法,它通过将一个问题分解为同样类型的子问题来解决问题。在递归中,函数会调用自身,并向下逐步解决问题,直到到达问题的基本情况。 递归的示例可以是计算一个数的阶乘。阶乘的定义是对于正整数n&…

React学习计划-React16--React基础(五)脚手架创建项目、todoList案例、配置代理、消息订阅与发布

一、使用脚手架create-react-app创建项目 react脚手架 xxx脚手架:用来帮助程序员快速创建一个基于xxx库的模板项目 包含了所有需要的配置(语法检查、jsx编译、devServe…)下载好了所有相关的依赖可以直接运行一个简单的效果 react提供了一个…

Flink快速部署集群,体验炸了!

📢📢📢📣📣📣 哈喽!大家好,我是【IT邦德】,江湖人称jeames007,10余年DBA及大数据工作经验 一位上进心十足的【大数据领域博主】!😜&am…

【Element】el-table 使用 el-table-infinite-scroll 插件实现滚动加载

虽然 el 官方提供了 Infinite Scroll 无限滚动 组件 但是却不支持 el-table 组件,这就很难受了,还好已经有大佬写好了插件,并且支持 element-plus/infinite-scroll 组件的所有选项。 el-table-infinite-scroll el-table-infinite-scroll 看…

Linux目录切换相关命令@cd/pwd

目录 基础指令 cd命令原型命令的搭配以及效果命令本身cd cd 指定目录 基础指令 pwd命令原型pwd 总结: 基础指令 cd cd 取自英文 Change Directory 的首字母组成。 英文的中文翻译为:更改目录。 很明显该指令是用来更改目录的。 命令原型 cd [Linux路径…

企业“数据入表”之政策及业务模式解读

2023年8月21日,财政部重磅发布了《企业数据资源相关会计处理暂行规定》(以下简称“暂行规定”),该规定将于2024年1月1日正式施行。 “暂行规定”发布后,引起全社会的广泛关注,关注的焦点集中在数据入表概念…

Unity中Shader旋转矩阵(四维旋转矩阵)

文章目录 前言一、围绕X轴旋转1、可以使用上篇文章中,同样的方法推导得出围绕X轴旋转的点阵。2、求M~rotate~ 二、围绕Y轴旋转1、可以使用上篇文章中,同样的方法推导得出围绕Y轴旋转的点阵。2、求M~rotate~ 三、围绕Z轴旋转1、可以使用上篇文章中&#x…

【数据结构之单链表】

数据结构学习笔记---003 数据结构之单链表1、什么是单链表?1.1、概念及结构 2、单链表接口的实现2.1、单链表的SList.h2.1.1、定义单链表的结点存储结构2.1.2、声明单链表各个接口的函数 2.2、单链表的SList.c2.2.1、遍历打印链表2.2.2、销毁单链表2.2.3、打印单链表元素2.2.4…

2023 Intellij IDEA的热部署配置

第一步&#xff1a;导入依赖 <!--热部署--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-devtools</artifactId></dependency>第二步&#xff1a;配置idea

计算机网络——计算机网络的概述(一)

前言&#xff1a; 面对马上的期末考试&#xff0c;也为了以后找工作&#xff0c;需要掌握更多的知识&#xff0c;而且我们现实生活中也已经离不开计算机&#xff0c;更离不开计算机网络&#xff0c;今天开始我们就对计算机网络的知识进行一个简单的学习与记录。 目录 一、什么…

0.618算法和基于Armijo准则的线搜索回退法

0.618代码如下&#xff1a; import math # 定义函数h(t) t^3 - 2t 1 def h(t): return t**3 - 2*t 1 # 0.618算法 def golden_section_search(a, b, epsilon): ratio 0.618 while (b - a) > epsilon: x1 b - ratio * (b - a) x2 a ratio * (b - a) h_…

Python实现广义最小二乘法线性回归模型(GLS算法)项目实战

说明&#xff1a;这是一个机器学习实战项目&#xff08;附带数据代码文档视频讲解&#xff09;&#xff0c;如需数据代码文档视频讲解可以直接到文章最后获取。 1.项目背景 广义最小二乘法&#xff08;Generalized Least Squares&#xff09;是一种回归分析方法&#xff0c;适…

msyql 24day 数据库主从 主从复制 读写分离 master slave 有数据如何增加

目录 环境介绍读写分离纵向扩展横向扩展 数据库主从准备环境主库环境(master)从库配置(slave)状态分析重新配置问题分析 报错解决从库验证 有数据的情况下 去做主从清理环境环境准备数据库中的锁的机制主库配置从库配置最后给主库解锁常见错误 环境介绍 将一个数据库的数据 复…

数据库制作简易工人信息表

代码实现: #include <head.h>int add_msg(sqlite3* ppdb) {int id;char name[20];int age;double salary;//输入要插入的工人信息printf("请输入工号:");scanf("%d",&id);printf("请输入姓名:");scanf("%s",name);printf(&…

idea 注入mapper报错报红的几种解决方案

文章目录 前言方法1&#xff1a;为 Autowired 注解设置required false方法2&#xff1a;用 Resource 替换 Autowired方法3&#xff1a;在Mapper接口上加上Repository注解方法4&#xff1a;用Lombok方法5&#xff1a;把IDEA的警告关闭掉方法6&#xff1a;不用管他 前言 相信大…

2015年第四届数学建模国际赛小美赛A题飞机上的细长座椅解题全过程文档及程序

2015年第四届数学建模国际赛小美赛 A题 飞机上的细长座椅 原题再现&#xff1a; 航空公司座位是指在旅途中乘客可以乘坐的座位。一些航空公司现在推出了新的经济舱“超薄”座位。这些座椅除了重量较轻外&#xff0c;理论上还允许航空公司在不显著影响乘客舒适度的情况下增加运…

Zookeeper的学习笔记

Zookeeper概念 Zookeeper是一个树形目录服务&#xff0c;简称zk。 Zookeeper是一个分布式的、开源的分布式应用程序的协调服务 Zookeeper提供主要的功能包括&#xff1a;配置管理&#xff0c;分布式锁&#xff0c;集群管理 Zookeeper命令操作 zk数据模型 zk中的每一个节点…

初识大数据,一文掌握大数据必备知识文集(3)

&#x1f3c6;作者简介&#xff0c;普修罗双战士&#xff0c;一直追求不断学习和成长&#xff0c;在技术的道路上持续探索和实践。 &#x1f3c6;多年互联网行业从业经验&#xff0c;历任核心研发工程师&#xff0c;项目技术负责人。 &#x1f389;欢迎 &#x1f44d;点赞✍评论…

SpringMVC基础知识(持续更新中~)

笔记&#xff1a; https://gitee.com/zhengguangqq/ssm-md/blob/master/ssm%20md%E6%A0%BC%E5%BC%8F%E7%AC%94%E8%AE%B0/%E4%B8%89%E3%80%81SpringMVC.md 细节补充&#xff1a; ​​​​​​​

你还不会排序算法吗

>欢迎关注博主 Mindtechnist 或加入【智能科技社区】一起学习和分享Linux、C、C、Python、Matlab&#xff0c;机器人运动控制、多机器人协作&#xff0c;智能优化算法&#xff0c;滤波估计、多传感器信息融合&#xff0c;机器学习&#xff0c;人工智能等相关领域的知识和技术…