个人项目:社交支付项目(小老板)
作者:三哥,https://j3code.cn
项目文档:https://www.yuque.com/g/j3code/dvnbr5/collaborator/join?token=CFMcFNwMdhpp6u2s&source=book_collaborator#
预览地址(未开发完):http://admire.j3code.cn/small-boss
- 内网穿透部署,第一次访问比较慢
我相信,打开一个带有社交类型的网站,你或多或少都可以看到如下的界面:
1)消息提示
2)消息列表
这样
这样
那,这就是我们今天要聊的【消息中心】。
1、设计
老规矩先来搞清楚消息中心的需求,再来代码实现。
我们知道在社交类项目中,有很多评论、点赞等数据的产生,而如果这些数据的产生不能让用户感知到,那你们想想这会带来什么影响?
用户A:太鸡肋了,发布的内容被人评论点赞了,我居然看不到,下次不用了…
用户B:还好没用这个系统…
所以,看到这些结果我们是不是能够意识到一个健全的社交功能,是不是少不了这种通知用户的机制啊!而这种机制我就把他定义为【消息中心】功能。
再来拆分一下这四个字:消息中心
- 消息
- 中心
消息:这个可以是由我们自己定义,如:把帖子被用户评论当作一条消息,把评论被用户点赞也可以当作一条消息,甚至系统发布的通知也是一条消息。
中心:这个就是字面意思,将上面所提到的所有消息,归拢到一个地方进行展示。
上面我们也提到消息基本就是这两种:
- 用户对用户:用户消息
- 平台对用户:系统消息
针对用户消息,就类似这样,用户 A 给用户 B 的一条评论进行了点赞,那这个点赞动作就会产生一条消息,并且通知到用户 B 的一个存储消息的地方,这里通常就指用户的收件箱。这个收件箱就是专门用来存储用户发给用户的消息,而这个点对点的模式是不是就是推送模式啊!(A 推送消息给 B)
接着针对系统消息,就类似这样,平台管理人员发布了一条通知,告诉大家平台有啥 XXX 活动。那这个活动通知肯定是要让平台的所有用户都知道把,所以这个通知就要存在一个发件箱中。这个发件箱就是专门存储平台的通知,所有用户都来这个发件箱中读取消息就行,而这个一对多的模式是不是就是拉取模式啊!(所有用户都来拉取平台消息)
这样一来,我们根据不同的消息场景就抽出了一个基本的消息推拉模型,模型图如下:
推:
拉:
针对这两种模式,不知道大家有没有看出区别,好像乍一看没啥区别,都是发消息,读消息,对吧!
没错,确实都是一个发,一个读,但是两者的读写频率确实有着巨大的差异。先来看推模型,一个普通用户发表了一条帖子,然后获得了寥寥无几的评论和赞,这好似也没啥特别之处,对吧!那如果这个普通用户发表的帖子成为了热门帖子呢,也即该贴子获得了上万的评论和赞。那,你们想想是不是发消息的频率非常高,而该普通用户肯定是不可能一下子读取这么多消息的,所以是不是一个写多读少的场景。再来看看拉模型,如果你的平台用户人数寥寥无几,那倒没啥特别之处,但如果用户人数几万甚至几十万。那,每个用户都过来拉取系统消息是不是就是一个读频率非常高,而发消息频率非常低(系统消息肯定不会发的很快),所以这是不是一个读多写少的场景。
1.1 推:写多读少
针对这个模式,我们肯定是要将写这个动作交给性能更高的中间件来处理,而不是 MySQL,所以此时我们的 RocketMQ 就出来了。
当系统中产生了评论、点赞类的高频消息,那就无脑的丢给 MQ 吧,让其在消息中间件中呆会,等待消费者慢慢的将消息进行消费并发到各个用户的收件箱中,就类似下面这张图的流程:
2.2 拉:读多写少
那对于这个模式,所实话,我觉得不用引入啥就可以实现,因为对于读多的话无非就是一个查,MySQL 肯定是能搞定的,即使你的用户几万、几十万都是 ok 的。
但咱们是不是可以这样想一下,一个系统的官方通知肯定是不多的,或者说几天或者几个星期一次,且一旦发送就不可更改。那是不是可以考虑缓存,让用户读取官方通知的时候走缓存,如果缓存没有再走 MySQL 这样应该是可以提高查询效率,提高响应速度。
具体流程如下图:
2.3 表结构设计
基本的业务流程已经分析的差不多了,现在可以把表字段抽一下了,先根据上面分析的,看看我们需要那些表:
- 用户收件箱表
- 系统发件箱表
看似好像就这两张表,但是应该还有第三张表:
- 用户读取系统消息记录表
我们看到页面是不是每次有一条新的消息都会有一个小标点记录新消息数量,而第三张表就是为了这个作用而设计的。
具体原理如下:
- 首先运营人员发布的消息都是存储在第二张表中,这肯定是没错的
- 那用户每次过来拉取系统消息时,将最近拉取的一条消息写入到第三种表中
- 这样等用户下次再来拉取的时候,就可以根据第三张表的读取记录,来确定他有几条系统消息未查看了
可能有人会发出疑问:那用户的收件箱为啥不出一个用户读取记录表呢!
这个很简单,因为收件箱中的数据已经表示这个用户需要都这些个消息了,只是不知道那些是已读的那些是未读的,我们只需要再收件箱表中加一个字段,这个字段的作用就是记录最新一次读取的消息 ID 就行,等下次要读消息时,找到上传读取读取消息的记录ID,往后读新消息即可。
好,现在来看看具体的表字段:
1)用户收件箱表(sb_user_inbox)
- id
- 消息数据唯一 id:MQ唯一消息凭证
- 消息类型:评论消息或者点赞消息
- 帖子id:业务id
- 业务数据id:业务id
- 内容:消息内容
- 业务数据类型:业务数据类型(商品评论、帖子、帖子一级评论、帖子二级评论)
- 发起方的用户ID:用户 A 对用户 B 进行点赞,那这就是用户 A 的ID
- 接收方的用户ID:用户 B 的 ID
- 用户最新读取位置ID:用户最近一次读取记录的 ID
SQL
CREATE TABLE `sb_user_inbox` (
`id` bigint(20) NOT NULL,
`uuid` varchar(128) COLLATE utf8mb4_german2_ci NOT NULL COMMENT '消息数据唯一id',
`message_type` tinyint(1) NOT NULL COMMENT '消息类型',
`post_id` bigint(20) DEFAULT NULL COMMENT '帖子id',
`item_id` bigint(20) NOT NULL COMMENT '业务数据id',
`content` varchar(1000) COLLATE utf8mb4_german2_ci DEFAULT NULL COMMENT '内容',
`service_message_type` tinyint(1) NOT NULL COMMENT '业务数据类型',
`from_user_id` bigint(20) NOT NULL COMMENT '发起方的用户ID',
`to_user_id` bigint(20) NOT NULL COMMENT '接收方的用户ID',
`read_position_id` bigint(20) DEFAULT '0' COMMENT '用户最新读取位置ID',
`create_time` datetime DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `un01` (`uuid`),
UNIQUE KEY `un02` (`item_id`,`service_message_type`,`to_user_id`),
KEY `key` (`to_user_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_german2_ci
可以看到,我加了很多业务相关的字段,这个主要是为了方便查询数据和展示数据。
2)系统发件箱表(sb_sys_outbox)
- id
- 内容
SQL
CREATE TABLE `sb_sys_outbox` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`content` varchar(2000) COLLATE utf8mb4_german2_ci NOT NULL COMMENT '内容',
`create_time` datetime DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_german2_ci
这个表就非常简单了,没啥业务字段冗余。
3)用户读取系统消息记录表(sb_user_read_sys_outbox)
- id
- 系统收件箱数据读取id
- 读取的用户id
SQL
CREATE TABLE `sb_user_read_sys_outbox` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`sys_outbox_id` bigint(20) NOT NULL COMMENT '系统收件箱数据读取id',
`user_id` bigint(20) NOT NULL COMMENT '读取的用户id',
PRIMARY KEY (`id`),
UNIQUE KEY `un` (`user_id`),
KEY `key` (`user_id`)
) ENGINE=InnoDB AUTO_INCREMENT=17 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_german2_ci
ok,这是消息中心所有分析阶段了,下面就开始实操。
2、实现
先来引入引入一下 RocketMQ 的依赖
<!--rocketmq-->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.1</version>
</dependency>
RocketMQ 的双主双从同步刷新集群搭建教程:https://blog.csdn.net/qq_40399646/article/details/128168466
MQ 配置:
2.1 生产者
先来实现生产者如何发送消息。
1)消息体对象:LikeAndCommentMessageDTO
位置:cn.j3code.config.dto.mq
@Data
public class LikeAndCommentMessageDTO {
/**
* 该消息的唯一id
* 业务方可以不设置,如果为空,代码会自动填充
*/
private String uuid;
/**
* 消息类型
*/
private UserCenterMessageTypeEnum messageType;
/**
* 冗余一个帖子id进来
*/
private Long postId;
/**
* 业务数据id
*/
private Long itemId;
/**
* 如果是评论消息,这个内容就是评论的内容
*/
private String content;
/**
* 业务数据类型
*/
private UserCenterServiceMessageTypeEnum serviceMessageType;
/**
* 发起方的用户ID
*/
private Long fromUserId;
/**
* 接收方的用户ID
*/
private Long toUserId;
/*
例子:
用户 A 发表了一个帖子,B 对这个帖子进行了点赞,那这个实体如下:
messageType = UserCenterMessageTypeEnum.LIKE
itemId = 帖子ID(对评论进行点赞,就是评论id,对评论进行回复,就是刚刚评论的id)
serviceMessageType = UserCenterServiceMessageTypeEnum.POST(这个就是说明 itemId 的 ID 是归于那个业务的,方便后续查询业务数据)
fromUserId = 用户B的ID
toUserId = 用户 A 的ID
*/
}
2)发送消息代码
位置:cn.j3code.community.mq.producer
@Slf4j
@Component
@AllArgsConstructor
public class LikeAndCommentMessageProducer {
private final RocketMQTemplate rocketMQTemplate;
/**
* 单个消息发送
*
* @param dto
*/
public void send(LikeAndCommentMessageDTO dto) {
if (Objects.isNull(dto.getUuid())) {
dto.setUuid(IdUtil.simpleUUID());
}
checkMessageDTO(dto);
Message<LikeAndCommentMessageDTO> message = MessageBuilder
.withPayload(dto)
.build();
rocketMQTemplate.send(RocketMQConstants.USER_MESSAGE_CENTER_TOPIC, message);
}
/**
* 批量消息发送
*
* @param dtos
*/
public void send(List<LikeAndCommentMessageDTO> dtos) {
/**
* 将 dtos 集合分割成 1MB 大小的集合
* MQ 批量推送的消息大小最大 1MB 左右
*/
ListSizeSplitUtil.split(1 * 1024 * 1024L, dtos).forEach(items -> {
List<Message<LikeAndCommentMessageDTO>> messageList = new ArrayList<>(items.size());
items.forEach(dto -> {
if (Objects.isNull(dto.getUuid())) {
dto.setUuid(IdUtil.simpleUUID());
}
checkMessageDTO(dto);
Message<LikeAndCommentMessageDTO> message = MessageBuilder
.withPayload(dto)
.build();
messageList.add(message);
});
rocketMQTemplate.syncSend(RocketMQConstants.USER_MESSAGE_CENTER_TOPIC, messageList);
});
}
private void checkMessageDTO(LikeAndCommentMessageDTO dto) {
AssertUtil.isTrue(Objects.isNull(dto.getMessageType()), "消息类型不为空!");
AssertUtil.isTrue(Objects.isNull(dto.getItemId()), "业务数据ID不为空!");
AssertUtil.isTrue(Objects.isNull(dto.getServiceMessageType()), "业务数据类型不为空!");
AssertUtil.isTrue(Objects.isNull(dto.getFromUserId()), "发起方用户ID不为空!");
AssertUtil.isTrue(Objects.isNull(dto.getToUserId()), "接收方用户ID不为空!");
}
/**
* 发送点赞消息
*
* @param messageType 消息类型
* @param serviceMessageType 业务类型
* @param itemToUserIdMap 业务ID对应的用户id
* @param saveLikeList 点赞数据
*/
public void sendLikeMQMessage(
UserCenterMessageTypeEnum messageType,
UserCenterServiceMessageTypeEnum serviceMessageType,
Map<Long, Long> itemToUserIdMap, List<Like> saveLikeList) {
if (CollectionUtils.isEmpty(saveLikeList)) {
return;
}
List<LikeAndCommentMessageDTO> dtos = new ArrayList<>();
for (Like like : saveLikeList) {
LikeAndCommentMessageDTO messageDTO = new LikeAndCommentMessageDTO();
messageDTO.setItemId(like.getItemId());
messageDTO.setMessageType(messageType);
messageDTO.setServiceMessageType(serviceMessageType);
messageDTO.setFromUserId(like.getUserId());
messageDTO.setToUserId(itemToUserIdMap.get(like.getItemId()));
dtos.add(messageDTO);
}
try {
send(dtos);
} catch (Exception e) {
//错误处理
log.error("发送MQ消息失败!", e);
}
}
}
注意:这里我用了 MQ 批量发送消息的一个功能,但是他有一个限制就是每次只能发送 1MB 大小的数据。所以我需要做一个功能工具类将业务方丢过来的批量数据进行分割。
工具类:ListSizeSplitUtil
位置:cn.j3code.config.util
public class ListSizeSplitUtil {
private static Long maxByteSize;
/**
* 根据传进来的 byte 大小限制,将 list 分割成对应大小的 list 集合数据
*
* @param byteSize 每个 list 数据最大大小
* @param list 待分割集合
* @param <T>
* @return
*/
public static <T> List<List<T>> split(Long byteSize, List<T> list) {
if (Objects.isNull(list) || list.size() == 0) {
return new ArrayList<>();
}
if (byteSize <= 100) {
throw new RuntimeException("参数 byteSize 值不小于 100 bytes!");
}
ListSizeSplitUtil.maxByteSize = byteSize;
if (isSurpass(List.of(list.get(0)))) {
throw new RuntimeException("List 中,单个对象都大于 byteSize 的值,分割失败");
}
List<List<T>> result = new ArrayList<>();
List<T> itemList = new ArrayList<>();
for (int i = 0; i < list.size(); i++) {
itemList.add(list.get(i));
if (isSurpass(itemList)) {
i = i - 1;
itemList.remove(itemList.size() - 1);
result.add(new ArrayList<>(itemList));
itemList = new ArrayList<>();
}
}
result.add(new ArrayList<>(itemList));
return result;
}
private static <T> Boolean isSurpass(List<T> obj) {
// 字节(byte)
long objSize = RamUsageEstimator.sizeOfAll(obj.toArray());
return objSize >= ListSizeSplitUtil.maxByteSize;
}
}
至此呢,生产者的逻辑就算是完成了,每次有消息的时候就调用这个方法即可。
2.2 消费者
位置:cn.j3code.user.mq.consumer
@Slf4j
@Component
@AllArgsConstructor
@RocketMQMessageListener(topic = RocketMQConstants.USER_MESSAGE_CENTER_TOPIC,
consumerGroup = RocketMQConstants.GROUP,
messageModel = MessageModel.CLUSTERING,
consumeMode = ConsumeMode.CONCURRENTLY
)
public class LikeAndCommentMessageConsumer implements RocketMQListener<LikeAndCommentMessageDTO> {
private final UserInboxService userInboxService;
@Override
public void onMessage(LikeAndCommentMessageDTO message) {
userInboxService.saveMessage(message);
}
}
saveMessage 方法的逻辑就是将消息保存到 MySQL 中,至此消息的产生和存储就算完成了,下面来看看用户如何查看吧!
2.3 用户消息查看
对于用户查看普通的消息就是访问一下 MySQL,并且更新一下最新读取的字段值即可,我贴一下关键代码就行了,代码如下:
public IPage<UserMessageVO> page(UserMessagePageRequest request) {
// 获取消息
IPage<UserMessageVO> page = getBaseMapper().page(new Page<UserMessageVO>(request.getCurrent(), request.getSize()), request);
if (CollectionUtils.isEmpty(page.getRecords())) {
return page;
}
// 记录一下消息读取位置,默认进来就把全部消息读完了,类似掘金
if (request.getCurrent() == 1) {
if (Objects.isNull(page.getRecords().get(0).getReadPositionId()) ||
page.getRecords().get(0).getReadPositionId() == 0) {
UserInbox userInbox = new UserInbox();
userInbox.setId(page.getRecords().get(0).getId());
userInbox.setReadPositionId(userInbox.getId());
updateById(userInbox);
}
}
return page;
}
2.4 系统消息查看
对于系统消息的查看也是,只贴出关键代码,查询和更新读取记录逻辑,代码如下:
@Override
public IPage<SysOutboxVO> lookSysPage(SysOutboxPageRequest request) {
Page<SysOutbox> page = lambdaQuery()
.orderByDesc(SysOutbox::getId)
.page(new Page<>(request.getCurrent(), request.getSize()));
IPage<SysOutboxVO> outboxVOIPage = page.convert(userInboxConverter::converter);
if (CollectionUtils.isEmpty(outboxVOIPage.getRecords())) {
return outboxVOIPage;
}
// 记录一下消息读取位置,默认进来就把全部消息读完了,类似掘金
if (request.getCurrent() == 1) {
userReadSysOutboxService.updateReadLog(page.getRecords().get(0).getId(), SecurityUtil.getUserId());
}
return outboxVOIPage;
}
这里,可能有人会发现,没有按照上面分析的那用从缓存中读,是的。这里的实现我没有用到 Redis,这里我偷了一下懒,如果有拿到我代码的同学可以试着优化一下这个逻辑。