支持多种推送方式的消息推送设计方案

news2025/1/24 11:48:51

文章目录

    • 1 摘要
    • 2 消息推送架构
    • 3 数据库设计脚本
      • 4 核心代码
      • 4.1 消息推送业务实现
      • 4.2 消息重试推送
    • 5 可优化思路
    • 6 Github 源码

1 摘要

通过邮件、短信等方式向用户发送通知是一项很常见的业务场景。如何设计一套好用、简洁消息推送架构?这是一个问题。本文将介绍一种支持多种推送方式的消息推送设计模型,该模型可以满足以下功能:

  • 保存推送消息本身
  • 一套消息文本支持多种类型消息推送
  • 消息推送记录占用空间小
  • 支持消息重试推送机制

2 消息推送架构

multi-type-message-push-v1.0

3 数据库设计脚本

./doc/sql/multi-push-type-demo.sql
drop table if exists message_push_result;

drop table if exists user_message;

drop table if exists user_push_type;

/*==============================================================*/
/* Table: message_push_result                                   */
/*==============================================================*/
create table message_push_result
(
   id                   bigint unsigned not null comment 'id',
   message_id           bigint unsigned comment '消息id',
   push_type            tinyint comment '推送方式,1-短信;2-邮件;3-app;4-wechat',
   push_result          tinyint comment '推送结果,0-失败,1-成功,2-未推送',
   push_record          varchar(32) comment '推送记录值,部分推送方式可根据记录值查询实际推送结果',
   retry_time           tinyint default 0 comment '消息发送失败重试次数',
   create_time          datetime default current_timestamp comment '创建时间',
   update_time          datetime default current_timestamp on update current_timestamp comment '更新时间',
   primary key (id)
)
engine = innodb default
charset = utf8mb4;

alter table message_push_result comment '消息推送结果';

/*==============================================================*/
/* Table: user_message                                          */
/*==============================================================*/
create table user_message
(
   id                   bigint not null comment 'id',
   user_id              bigint comment '用户信息',
   push_count           tinyint not null default 0 comment '实际消息推送次数',
   push_total           tinyint not null default 0 comment '总共消息所需推送次数',
   message_type         tinyint comment '消息类型;1-登录通知;2-费用通知;3-服务器报警',
   title                varchar(64) comment '消息标题',
   content              varchar(256) comment '消息内容',
   create_time          datetime default current_timestamp comment '创建时间',
   update_time          datetime default current_timestamp on update current_timestamp comment '更新时间',
   primary key (id)
)
engine = innodb default
charset = utf8;

alter table user_message comment '用户消息';

/*==============================================================*/
/* Table: user_push_type                                        */
/*==============================================================*/
create table user_push_type
(
   id                   bigint not null comment 'id',
   user_id              bigint comment '用户id',
   push_type            tinyint comment '推送方式,1-短信;2-邮件;3-app;4-wechat',
   receive_address      varchar(128) comment '通知推送接收地址',
   enable               tinyint comment '是否启用,0-未启用,1-启用',
   create_time          datetime default current_timestamp comment '创建时间',
   update_time          datetime default current_timestamp on update current_timestamp comment '更新时间',
   primary key (id)
)
engine = innodb default
charset = utf8mb4;

alter table user_push_type comment '用户消息推送方式';

4 核心代码

用户推送方式、用户消息的基本 CRUD 这里就不做具体展示,有需要的可以查看 Github 源码。

这里重点看消息推送以及重试模块的代码逻辑

4.1 消息推送业务实现

消息推送功能请求参数:

demo-knife4j-openapi3/src/main/java/com/ljq/demo/springboot/knife4j/model/param/messagepush/MessagePushParam.java
package com.ljq.demo.springboot.knife4j.model.param.messagepush;

import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;

import java.io.Serializable;

/**
 * @Description: 消息推送请求参数
 * @Author: junqiang.lu
 * @Date: 2023/8/16
 */
@Data
@Schema(description = "消息推送请求参数")
public class MessagePushParam implements Serializable {

    private static final long serialVersionUID = -9163471284052059262L;

    @Schema(description = "消息id", requiredMode = Schema.RequiredMode.REQUIRED)
    private Long messageId;

}

推送功能业务实现:

demo-knife4j-openapi3/src/main/java/com/ljq/demo/springboot/knife4j/service/impl/UserMessageServiceImpl.java
	/**
	 * 推送消息
	 *
	 * @param pushParam
	 * @return
	 */
	@Override
	public ApiResult push(MessagePushParam pushParam) {
		// 参数校验
		UserMessageEntity userMessageDb = super.getById(pushParam.getMessageId());
		// 校验消息是否存在
		if (Objects.isNull(userMessageDb)) {
			return ApiResult.fail(ApiMsgEnum.USER_MESSAGE_NOT_EXIST);
		}
		// 校验消息是否重复推送
		if (userMessageDb.getPushTotal() > 0) {
			return ApiResult.fail(ApiMsgEnum.USER_MESSAGE_PUSH_REPEAT);
		}

		// 查询用户支持的推送方式
		List<UserPushTypeEntity> userPushTypeDbList = userPushTypeService.list(Wrappers
				.lambdaQuery(UserPushTypeEntity.class)
				.eq(UserPushTypeEntity::getUserId, userMessageDb.getUserId()));
		// 设置推送总次数
		userMessageDb.setPushCount(0);
		userMessageDb.setPushTotal(userPushTypeDbList.size());
		super.updateById(userMessageDb);
		// TODO 升级思路: 异步推送

		// 根据支持类型推送
		for (UserPushTypeEntity pushTypeEntity: userPushTypeDbList) {
			switch (pushTypeEntity.getPushType()) {
				case MessagePushConst.USER_PUSH_TYPE_SMS:
					// 短信推送
					messagePushBySms(userMessageDb, pushTypeEntity.getReceiveAddress());
					continue;
				case MessagePushConst.USER_PUSH_TYPE_EMAIL:
					// 邮件推送
					messagePushByEmail(userMessageDb, pushTypeEntity.getReceiveAddress());
					continue;
				case MessagePushConst.USER_PUSH_TYPE_APP:
					// APP推送
					messagePushByApp(userMessageDb, pushTypeEntity.getReceiveAddress());
					continue;
				case MessagePushConst.USER_PUSH_TYPE_WECHAT:
					// 微信推送
					messagePushByWechat(userMessageDb, pushTypeEntity.getReceiveAddress());
					continue;
				default:
					log.warn("user push type is invalid. pushType:{}", pushTypeEntity.getPushType());
			}
		}

		return ApiResult.success();
	}

短信推送方法:

	/**
	 * 短信推送消息
	 *
	 * @param userMessage
	 * @param receiveAddress
	 */
	private void messagePushBySms(UserMessageEntity userMessage, String receiveAddress) {
		log.info("message pushed by sms. messageId:{},receiveAddress:{},messageTitle:{}",
				userMessage.getId(), receiveAddress, userMessage.getTitle());
		// 查询推送结果
		LambdaQueryWrapper<MessagePushResultEntity> wrapper = Wrappers.lambdaQuery(MessagePushResultEntity.class)
				.eq(MessagePushResultEntity::getMessageId, userMessage.getId())
				.eq(MessagePushResultEntity::getPushType, MessagePushConst.USER_PUSH_TYPE_SMS);
		MessagePushResultEntity pushResultDb = pushResultService.getOne(wrapper);
		// 预先创建推送结果
		MessagePushResultEntity pushResult = new MessagePushResultEntity();
		pushResult.setMessageId(userMessage.getId());
		pushResult.setPushType(MessagePushConst.USER_PUSH_TYPE_SMS);
		try {
			// 模拟消息发送
			Thread.sleep(100L);

			// 更新推送结果
			pushResult.setPushResult(MessagePushConst.MESSAGE_SEND_SUCCESS);
			if (Objects.isNull(pushResultDb)) {
				// 初次推送
				pushResult.setRetryTime(0);
				pushResultService.save(pushResult);
			} else {
				// 重试推送
				pushResultService.update(pushResult, wrapper);
			}
			// 更新推送次数
			userMessage.setPushCount(userMessage.getPushCount() + 1);
			super.updateById(userMessage);
		} catch (Exception e) {
			log.error("sms message push error", e);
			// 设置重试次数
			pushResult.setPushResult(MessagePushConst.MESSAGE_SEND_FAIL);
			if (Objects.isNull(pushResultDb)) {
				// 初次推送
				pushResult.setRetryTime(1);
				pushResultService.save(pushResult);
			} else {
				// 重试推送
				pushResult.setRetryTime(pushResultDb.getRetryTime());
				pushResultService.update(pushResult, wrapper);
			}
		}
	}

该方法中适配了初次推送以及重试推送

其他邮件、APP、微信推送的结构与此一样,代码不再重复贴出

4.2 消息重试推送

重试机制这里使用的通过时定时任务扫描失败的消息,然后进行消息重推

查询推送失败的消息:

demo-knife4j-openapi3/src/main/resources/mybatis/UserMessageMapper.xml
    <!-- 查询未推送成功的消息 -->
    <select id="queryPageFailMessage" resultMap="userMessageMap" >
        SELECT
        <include refid="user_message_base_field" />
        FROM `user_message` m
        WHERE m.push_count &lt; m.push_total
            AND (DATEDIFF(NOW(),m.create_time) &lt; 1)
    </select>

消息重试方法:

demo-knife4j-openapi3/src/main/java/com/ljq/demo/springboot/knife4j/service/impl/UserMessageServiceImpl.java
	/**
	 * 重新推送消息
	 *
	 * @param userMessage
	 * @return
	 */
	@Override
	public void repush(UserMessageEntity userMessage) {
		// 查询用户支持的推送方式
		List<UserPushTypeEntity> userPushTypeDbList = userPushTypeService.list(Wrappers
				.lambdaQuery(UserPushTypeEntity.class)
				.eq(UserPushTypeEntity::getUserId, userMessage.getUserId()));
		Map<Integer, String> pushTypeMap = userPushTypeDbList.stream()
				.collect(Collectors.toMap(UserPushTypeEntity::getPushType, UserPushTypeEntity::getReceiveAddress));
		// 查询推送失败的结果
		List<MessagePushResultEntity> pushResultList = pushResultService.list(Wrappers
				.lambdaQuery(MessagePushResultEntity.class)
				.eq(MessagePushResultEntity::getMessageId, userMessage.getId())
				.eq(MessagePushResultEntity::getPushResult, MessagePushConst.MESSAGE_SEND_FAIL));
		// TODO 升级思路: 异步推送

		// 根据支持类型推送
		for (MessagePushResultEntity pushResult: pushResultList) {
			switch (pushResult.getPushType()) {
				case MessagePushConst.USER_PUSH_TYPE_SMS:
					// 短信推送
					messagePushBySms(userMessage, pushTypeMap.get(pushResult.getPushType()));
					continue;
				case MessagePushConst.USER_PUSH_TYPE_EMAIL:
					// 邮件推送
					messagePushByEmail(userMessage, pushTypeMap.get(pushResult.getPushType()));
					continue;
				case MessagePushConst.USER_PUSH_TYPE_APP:
					// APP推送
					messagePushByApp(userMessage, pushTypeMap.get(pushResult.getPushType()));
					continue;
				case MessagePushConst.USER_PUSH_TYPE_WECHAT:
					// 微信推送
					messagePushByWechat(userMessage, pushTypeMap.get(pushResult.getPushType()));
					continue;
				default:
					log.warn("user push type is invalid. pushType:{}", pushResult.getPushType());
			}
		}
	}

定时任务扫描:

demo-knife4j-openapi3/src/main/java/com/ljq/demo/springboot/knife4j/job/MessagePushSchedule.java
package com.ljq.demo.springboot.knife4j.job;

import cn.hutool.core.collection.CollUtil;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.ljq.demo.springboot.knife4j.model.BasePageParam;
import com.ljq.demo.springboot.knife4j.model.entity.UserMessageEntity;
import com.ljq.demo.springboot.knife4j.service.UserMessageService;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
 * @Description: 消息推送定时任务
 * @Author: junqiang.lu
 * @Date: 2023/8/17
 */
@Component
public class MessagePushSchedule {

    @Resource
    private UserMessageService userMessageService;


    /**
     * 消息重试
     * 300 秒(5分钟) 1 次
     */
    @Scheduled(fixedDelay = 300000L, initialDelay = 10000L)
    public void checkAndRepush() {
        // 统计所有当天发送失败的消息
        int pageSize = 1000;
        BasePageParam pageParam = new BasePageParam();
        pageParam.setCurrentPage(1);
        pageParam.setPageSize(pageSize);
        IPage<UserMessageEntity> pageResult = userMessageService.queryPageFailMessage(pageParam);
        if (pageResult.getTotal() < 1) {
            return;
        }
        long countAll = pageResult.getTotal();
        long times = countAll % pageSize == 0 ? countAll / pageSize : (countAll / pageSize) + 1;
        pageResult.getRecords().forEach(userMessage -> userMessageService.repush(userMessage));
        for (int i = 2; i < times + 1; i++) {
            pageParam.setCurrentPage(i);
            pageResult = userMessageService.queryPageFailMessage(pageParam);
            if (CollUtil.isEmpty(pageResult.getRecords())) {
                continue;
            }
            pageResult.getRecords().forEach(userMessage -> userMessageService.repush(userMessage));
        }
    }


}

5 可优化思路

以上代码主要为多类型消息推送实现的基本方案,如果要应对大流量,可以有以下升级优化思路:

  • 将要推送的消息放到消息队列中
  • 使用线程池做专门的消息推送处理
  • 将推送失败的消息放到延时队列中,不使用定期扫表的方式

6 Github 源码

Gtihub 源码地址 : https://github.com/Flying9001/springBootDemo/tree/master/demo-knife4j-openapi3

个人公众号:404Code,分享半个互联网人的技术与思考,感兴趣的可以关注.
404Code

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/894723.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

AI绘画小程序源码:创业的新机遇

随着AI绘画的兴起&#xff0c;人们对于这项技术的需求与日俱增。现有的小程序和AI绘图软件已经无法满足用户的画图需求&#xff0c;经常出现排队等待的情况&#xff0c;甚至有些人需要等上数小时&#xff0c;或者面对长达数万人的排队队伍。 因此&#xff0c;我们迫切需要新的、…

排污口水质在线监测,实时掌握排口水质助力生态治理

水是生命之源&#xff0c;良好的水生态环境是社会发展的必然要求。然而随着工业化和城市化的发展&#xff0c;人类面临空气和水环境污染等严峻挑战&#xff0c;其中水环境问题尤为突出。排污成为城市和工业生产过程中不可避免的环保问题。 为加快解决生态环境突出问题&#xff…

SparkSQL源码分析系列02-编译环境准备

本文主要描述一些阅读Spark源码环境的准备工作&#xff0c;会涉及到源码编译&#xff0c;插件安装等。 1. 克隆代码。 打开IDEA&#xff0c;在Git下的Clone中&#xff0c;输入 https://github.com/apache/spark&#xff0c;克隆代码到本地&#xff0c;CheckOut到目标版本Spar…

音视频FAQ(一):视频直播卡顿

一、摘要 本文介绍了视频直播卡顿的四个主要原因&#xff0c;用户网络问题、用户设备性能问题、技术路线的选择和实现问题。因本文主要阐述视频直播的卡顿&#xff0c;故技术路线的实现指的是&#xff1a;CDN供应商的实现问题&#xff0c;包含CDN性能不足、CDN地区覆盖不足。对…

修复由于找不到vcruntime140.dll,无法继续执行代码的问题

前段时间时间Office软件出现故障&#xff0c;但是由于一个项目没有做完&#xff0c;就没有卸载重新安装。刚抽时间卸载了Office2019&#xff0c;然后发现CAD、NX、Comsol等软件打开的时候报错&#xff1a;“由于找不到vcruntime140.dll 无法继续执行”。 于是我推测是刚才卸载…

数据集成中的关键挑战与策略:数据安全与隐私保护

在互联网的普及和信息技术的发展下&#xff0c;数据集成成为了现代社会中各个领域中不可或缺的任务。数据集成是指将来自不同数据源的数据整合到一个统一的数据集中&#xff0c;以便进行进一步的分析和利用。然而&#xff0c;由于数据的多样性和复杂性&#xff0c;数据集成过程…

git 开发环境配置

系统&#xff1a;Mac OS 1、下载git&#xff0c;官网已经推荐使用命令下载。 /bin/zsh -c "$(curl -fsSL https://gitee.com/cunkai/HomebrewCN/raw/master/Homebrew.sh) 2、验证git是否安装成功 git --version 3、配置本地git全局变量 git config --global user.n…

面试题-React(三):什么是JSX?它与常规JavaScript有什么不同?

在React的世界中&#xff0c;JSX是一项引人注目的技术&#xff0c;它允许开发者在JavaScript中嵌套类似HTML的标签&#xff0c;用于描述UI组件的结构。本篇博客将通过丰富的代码示例&#xff0c;深入探索JSX语法&#xff0c;解析其在React中的用法和优势。 一、JSX基础语法 在…

决策准则之赫维兹准则、萨维奇遗憾准则、拉普拉斯不充分理由准则、沃尔德准则

一、Hurwicz criteria(赫维兹准则) 赫维兹准则是一种决策准则&#xff0c;用于在不确定条件下进行决策。考虑决策者对不同结果的态度&#xff0c;通过调整“乐观度参数”(optimism parameter)来权衡最优和最坏结果的可能性。 “乐观度参数”在0到1之间取值&#xff1a; 当乐…

推特群推王:引爆您的产品

作为出海市场的营销平台&#xff0c;Twitter的流量不断攀升&#xff0c;已然成为跨境贸易企业的一部分。当前&#xff0c;Twitter已不再是一个简单的社交平台&#xff0c;而是一个强大的营销平台&#xff0c;使企业能够与受众实时互动。然而&#xff0c;与其他社交媒体一样&…

springboot数据库密码加密的配置方法_Java

前言 由于系统安全的考虑&#xff0c;配置文件中不能出现明文密码的问题&#xff0c;本文就给大家详细介绍下springboot配置数据库密码加密的方法&#xff0c;下面话不多说了&#xff0c;来一起看看详细的介绍吧 1.导入依赖 <!--数据库密码加密--> <dependency>&…

密码学学习笔记(十九):密码学关键术语的解释1

数据加密标准(DES) 数据加密标准是使用最广泛的加密体制&#xff0c;它于1977年被美国国家标准和技术研究所(NIST)采纳为联邦信息处理标准FIPS PUB 46。 DES3DESAES明文分组长度&#xff08;位&#xff09;6464128密文分组长度&#xff08;位&#xff09;6464128密钥长度&…

Spring-4-掌握Spring事务传播机制

今日目标 能够掌握Spring事务配置 Spring事务管理 1 Spring事务简介【重点】 1.1 Spring事务作用 事务作用&#xff1a;在数据层保障一系列的数据库操作同成功同失败 Spring事务作用&#xff1a;在数据层或业务层保障一系列的数据库操作同成功同失败 1.2 案例分析Spring…

Nginx前后端服务器部署

Nginx作为正反向代理的中转站&#xff0c;是连接前后端网络服务的媒介 Nginx下载&#xff1a;http://nginx.org/download/http://nginx.org/download/ 一、上传到服务器固定路径下并解压 上传到/opt/software/nginx-1.19.0.tar.gz cd /opt/software/ tar -zxvf nginx-1.19.0.…

C++学习一STL

文章目录 一、STL基本概念1.泛型程序设计2.STL中的基本的概念 二、容器概述1.简介2.顺序容器3.关联容器4.容器适配器5.成员函数 三、迭代器1.概念2.双向迭代器3.随机访问迭代器4.容器上的迭代器类别 四、算法1.概念2.不变序列算法2.变值算法4.删除算法5.变序算法6.排序算法7. 堆…

使用神卓互联内网穿透搭建远程访问公司ERP系统

神卓互联是一款企业级内网穿透软件&#xff0c;可以将内网中的服务映射到公网上&#xff0c;实现内网服务的访问。通过神卓互联&#xff0c;您可以远程访问ERP系统。在使用神卓互联进行内网穿透时&#xff0c;您只需要在生成的公网地址后面加上ERP系统的端口号&#xff0c;即可…

未来公文的智能化进程

随着技术的飞速发展&#xff0c;公文——这个有着悠久历史的官方沟通方式&#xff0c;也正逐步走向智能化的未来。自动化、人工智能、区块链...这些现代科技正重塑我们的公文制度&#xff0c;让其变得更加高效、安全和智慧。 1.语义理解与自动生成 通过深度学习和NLP&#xff…

爬虫代理一分钟请求数量升级

Hello&#xff0c;各位爬中高手&#xff01;你是否曾经遇到过爬虫代理一分钟请求数量过少的问题&#xff1f;别急&#xff0c;今天我来分享一些方法&#xff0c;让你的爬虫代理请求数量快速飙升&#xff01;这些技巧简单易行&#xff0c;让你的爬虫工作更加高效。 在进行爬虫工…

TikTok连续12个季度跻身全球下载量排行第一

据报道&#xff0c;美国数据公司SensorTower发布了《2023年第二季度全球移动应用下载报告》&#xff0c;数据统计了全球范围内以及各地区下载量最高的App&#xff0c;以及购物类App下载量最高的市场。数据显示&#xff0c;TikTok再次荣登全球下载量最高的应用程序榜首&#xff…

vue项目使用qrcodejs2遇到Cannot read property ‘appendChild‘ of null

这个问题是节点还没创建渲染完就读取节点&#xff0c;这个时候应该先让节点渲染完成在生成&#xff0c;解决方法有以下两种 1、使用$nextTick&#xff08;&#xff09;方法进行&#xff0c;这个方法是用来在节点创建渲染完成后进行的操作 that.$nextTick(() > {let qrcode …