前面的文章我们讲过消息中心站内信的实现 【消息中心】
那么本章我们来说说异步导入导出完成后,如何使用消息中心站内信的功能进行通知用户业务处理完成了
在async-excel中异步逻辑处理完成后会调用一个callback方法进行回调,所以我们可以再对async-excel的handler接口再做一层封装,让所有的handler继承自我们封装的这个类
此处我就讲一个导出示例
public abstract class AbstractNoticeExportHandler<T> implements ExportHandler<T> {
private static final int COMPLETE = 2;
private static final int FAIL = 3;
@Override
public void callBack(ExcelContext ctx, DataParam param) {
callBack0(ctx, param);
Notice.NoticeBuilder noticeBuilder = Notice.builder()
.toUserCode(ctx.getTask().getCreateUserCode());
//导出成功
if (ctx.getTask().getStatus() == COMPLETE) {
//导出成功
if (StringUtils.isNotEmpty(ctx.getTask().getFileUrl())) {
noticeBuilder.contentType(NoticeContentType.DOWNLOAD.type)
.title("文件导出任务已完成,任务id="+ctx.getTask().getId())
.content(ctx.getTask().getFileUrl());
} else {
//导出成功,但是没有url代表是导出了0条的情况
noticeBuilder.contentType(NoticeContentType.TEXT.type)
.title("文件导出任务已完成,任务id="+ctx.getTask().getId())
.content(String.format("导出%s条", ctx.getSuccessCount()));
}
} else if (ctx.getTask().getStatus() == FAIL) {
//导出失败
noticeBuilder.contentType(NoticeContentType.TEXT.type)
.title("文件导出任务失败,任务id="+ctx.getTask().getId())
.content(ctx.getTask().getFailedMessage());
}
NoticeHelper.send(noticeBuilder.build());
}
public void callBack0(ExcelContext ctx, DataParam param) {
}
}
然后编写业务导出处理类继承自我们自定义的回调处理
@ExcelHandle
public class UserExportHandler implements AbstractNoticeExportHandler<UserExportModel> {
@Autowired
IUserService userService;
@Override
public void init(ExcelContext ctx, DataParam param) {
ExportContext context = (ExportContext) ctx;
//此处的sheetNo会被覆盖,为了兼容一个文件多sheet导出
WriteSheet sheet = EasyExcel.writerSheet(0, "第一个sheet").head(UserExportModel.class).build();
context.setWriteSheet(sheet);
}
@Override
public ExportPage<UserExportModel> exportData(int startPage, int limit, DataExportParam dataExportParam) {
IPage<User> iPage = new Page<>(startPage, limit);
IPage page = userService.page(iPage);
List<UserExportModel> list = ExportListUtil.transform(page.getRecords(), UserExportModel.class);
ExportPage<UserExportModel> result = new ExportPage<>();
result.setTotal(page.getTotal());
result.setCurrent(page.getCurrent());
result.setSize(page.getSize());
result.setRecords(list);
return result;
}
}
业务处理类还是该怎么写怎么写,所以我们来讲讲callback中做了什么事情,首先我们判断导出结果是成功还是失败,有没有异常,根据不同的类型我们包装不同的消息内容类型,如果出现异常了我们包装一个纯文本消息,如果导出正常并且有链接我们包装一个下载链接的消息最后通过消息工具类发送广播消息给mq。这个工具类有就如我们在消息中心的文章中写的那样,我们在发送前先保存下这条消息,并且异步操作,即使失败也不关业务什么事情。
public class NoticeHelper {
/**
* 同步日志
*
* @param notice
*/
public static void send(Notice notice) {
Assert.notNull(notice.getToUserCode(), "接收人不能为空");
Assert.notNull(notice.getTitle(), "消息标题不能为空");
Assert.notNull(notice.getContent(), "消息内容不能为空");
Assert.notNull(NoticeContentType.fromValue(notice.getContentType()), "内容类型不正确");
notice.setCode(BusinessCodeGenerator.getCode(BusinessCodeEnums.AUTO));
notice.setHasRead(0);
notice.setType(0);
if (StringUtils.isEmpty(notice.getFromUserCode())) {
notice.setFromUserCode("system");
notice.setFromUserName("system");
}
doSend(notice);
}
private static void doSend(Notice notice) {
Map<String, String> contextMap = SystemContext.getContextMap();
MqSenderPool.senderExecutor.execute(()->{
try {
SystemContext.setContextMap(contextMap);
NoticeSender sender = SpringContextUtil.getBean(NoticeSender.class);
sender.send(notice);
}finally {
SystemContext.clearAll();
}
});
}
}
public class NoticeSender {
private final static Logger log = LoggerFactory.getLogger(NoticeSender.class);
@Autowired
RabbitTemplate rabbitTemplate;
@Autowired(required = false)
NoticeProxy noticeProxy;
public void send(Notice notice) {
//发送之前保存数据异步处理
try {
if (noticeProxy != null) {
JsonResult<Void> result = noticeProxy.saveMessage(notice);
if (result.isSuccess()) {
rabbitTemplate.convertAndSend(QueueConst.NOTICE_FANOUT_EXCHANGE,
QueueConst.NOTICE_FANOUT_BIND_KEY, JSONUtil.toJsonStr(notice));
} else {
log.error("通知保存失败:{}", JSONUtil.toJsonStr(notice));
//throw new BizException("消息保存失败");
}
}
} catch (Exception e) {
log.error("发送通知异常:{}", JSONUtil.toJsonStr(notice));
}
}
}
NoticeProxy 不多说就是通过rest调用下服务接口保存下消息
在回到之前下消息中心我们写了个消费者,监听广播消息,我消费者可能部署多个节点,所以客户端可能连接在不同的节点上,所以消费者在收到消息的时候判断下目标客户端有没有连接在当前节点,如果不存在消息直接丢弃,如果存在则发送出去。
@Component
@Slf4j
public class NoticeReceiver {
private static ObjectMapper MAPPER = new ObjectMapper();
@RabbitListener(queues = QueueConst.NOTICE_FANOUT_QUEUE)
@RabbitHandler
public void receiveTopic(Message message) throws Exception {
try {
String receiveMsg = new String(message.getBody());
Notice notice = JSONUtil.toBean(receiveMsg, Notice.class);
List<Channel> channels = UserChannelContext.get(notice.getToUserCode());
if (channels != null && channels.size() > 0) {
for (Channel channel : channels) {
if (channel != null) {
channel.writeAndFlush(
new TextWebSocketFrame(MAPPER.writeValueAsString(notice)));
}
}
}
} catch (Exception e) {
log.error("notice接收处理错误:{}",e.getMessage());
}
// 补充:如果用户不在线则直接放弃;
// 补充:无论如何消息消费后需要返回ack,所以此处直接catch起来
}
最后的效果如下弹窗消息,并附带下载按钮
消息下拉框,圆点气泡需要联动推送的弹窗,当接收到消息时弹个窗红点数字加1,前端部分的代码也是相当简单,我就不多说了。UI用的是ant-design,前端根据不同的消息类型进行不同的展示,我们把消息分为
- 纯文本消息
- 路由消息
- 链接跳转消息
- 下载消息等等
当然下拉框的功能是通过调用单独接口实现,后端直接通知给前端的功能是触发上面那个弹窗,而下拉框是可以前端通过点击事件触发的。
消息中心站内信模块是个核心功能,后续可以依据这个功能还可以做什么呢?
- 工作台的功能实现
- 审批流的消息处理
- 程序的强制更新
- 任务的下发
- 简易的IM及时通讯
有了这个核心,系统的功能就能充满无限的遐想,并且用户体验感满满。