EDA - Spring Boot构建基于事件驱动的消息系统

news2025/2/23 2:41:22

文章目录

  • 概述
  • 事件驱动架构的基本概念
  • 工程结构
  • Code
    • 创建事件和事件处理器
    • 创建事件总线
    • 创建消息通道和发送逻辑
    • 创建事件处理器
    • 消息持久化
    • 创建消息发送事件
    • 配置 Spring Boot 启动类
    • 测试
    • 消息消费
    • 运行项目

在这里插入图片描述


概述

在微服务架构和大规模分布式系统中,事件驱动架构(EDA)成为了非常重要的设计模式。通过事件驱动,我们可以解耦系统的各个组件,提高系统的可扩展性、可维护性和响应能力。

接下来,我们将演示一下如何在 Spring Boot 中实现一个基于事件驱动的消息发送和接收流程,从消息的发送、事件的发布到事件的监听。


事件驱动架构的基本概念

在事件驱动架构中,系统的各个组件通过事件进行通信。每个事件代表一个特定的行为或状态变化,当事件发布时,系统的其他部分可以响应这些事件并做出相应的处理。消息发送和接收的流程正是通过发布和监听事件来实现的。

接下来我们使用 Spring Boot 来实现一个基于事件驱动的消息系统。、

系统包含以下几个部分:

  • 消息发送: 消息将通过一个 MessageEventProcessor 进行处理,并且在处理完成后会发布一个事件。
  • 事件发布: 消息成功发送后,通过 ApplicationEventPublisher 发布一个 MessageSentEvent
  • 事件监听: 一个监听器会接收到发布的事件并进行相应的处理(比如记录日志、通知其他组件等)

工程结构

在这里插入图片描述

  • EventBus:事件总线,负责发布事件。
  • MessageEventProcessor:处理消息事件的处理器。
  • EventMessageEventMessageSentEvent:事件类,MessageEventMessageSentEvent继承自Event
  • MessageChannel:消息通道接口,EmailMessageChannel是其具体实现。
  • MessageRepository:消息存储库,用于保存消息事件。
  • MessageChannelConfig:消息通道配置,配置了消息通道的Bean。
  • MessageController:消息控制器,处理发送消息的请求。
  • MessageSentEventListener:监听消息发送事件的监听器。

Code

创建事件和事件处理器

Event.java - 定义基础事件

package com.artisan.booteventbus.domain;

public abstract class Event {
    // 事件的基本字段
}

MessageEvent.java - 定义具体的消息事件

package com.artisan.booteventbus.domain;


import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import java.util.Map;

@EqualsAndHashCode(callSuper = true)
@Data
@Slf4j
@NoArgsConstructor
@AllArgsConstructor
public class MessageEvent extends Event {
    private String message;
    private String channel;
    private Map<String, Object> metadata;


}

EventHandler.java - 定义事件处理器接口

package com.artisan.booteventbus.bus;

import com.artisan.booteventbus.domain.Event;

public interface EventHandler<T extends Event> {
    void handle(T event);
}

创建事件总线

EventBus.java - 用于发布事件

package com.artisan.booteventbus.bus;


import com.artisan.booteventbus.domain.Event;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Component;

@Component
public class EventBus {

    private final ApplicationEventPublisher publisher;

    public EventBus(ApplicationEventPublisher publisher) {
        this.publisher = publisher;
    }

    public void publish(Event event) {
        publisher.publishEvent(event);
    }
}

创建消息通道和发送逻辑

MessageChannel.java - 定义消息通道接口

package com.artisan.booteventbus.service;

import com.artisan.booteventbus.domain.MessageEvent;

import java.util.concurrent.CompletableFuture;

public interface MessageChannel {

    boolean supports(MessageEvent event);

    CompletableFuture<Void> sendAsync(MessageEvent event);
}

MessageChannelConfig.java - 初始化channel

package com.artisan.booteventbus.config;

import com.artisan.booteventbus.service.MessageChannel;
import com.artisan.booteventbus.service.impl.EmailMessageChannel;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.ArrayList;
import java.util.List;

@Configuration
public class MessageChannelConfig {

    @Bean
    public List<MessageChannel> messageChannels() {
        List<MessageChannel> channels = new ArrayList<>();
        channels.add(new EmailMessageChannel());
        // 可以继续添加其他类型的通道
        return channels;
    }
}

EmailMessageChannel.java - 实现邮件发送通道

package com.artisan.booteventbus.service.impl;

import com.artisan.booteventbus.domain.MessageEvent;
import com.artisan.booteventbus.service.MessageChannel;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CompletableFuture;

@Slf4j
public class EmailMessageChannel implements MessageChannel {
    @Override
    public boolean supports(MessageEvent event) {
        return "email".equals(event.getChannel());
    }

    @Override
    public CompletableFuture<Void> sendAsync(MessageEvent event) {
        return CompletableFuture.runAsync(() -> {
            // 模拟邮件发送
            System.out.println(Thread.currentThread().getName() + "- Sending email: " + event.getMessage());
            log.info("Sending email: {}", event.getMessage());
        });
    }
}

创建事件处理器

MessageEventProcessor.java - 处理消息事件,保存事件并发送

package com.artisan.booteventbus.bus;

import com.artisan.booteventbus.dao.MessageRepository;
import com.artisan.booteventbus.domain.MessageEvent;
import com.artisan.booteventbus.domain.MessageSentEvent;

import com.artisan.booteventbus.service.MessageChannel;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.scheduling.annotation.Async;

import java.util.List;

@Component
public class MessageEventProcessor implements EventHandler<MessageEvent> {

    private final EventBus eventBus;
    private final MessageRepository messageRepository;
    private final List<MessageChannel> channels;

    @Autowired
    public MessageEventProcessor(EventBus eventBus, MessageRepository messageRepository, List<MessageChannel> channels) {
        this.eventBus = eventBus;
        this.messageRepository = messageRepository;
        this.channels = channels;
    }

    /**
     * @param event
     * Asyn 请使用自定义线程池,这里仅仅是 为了演示异步
     */
    @Async
    @Override
    public void handle(MessageEvent event) {
        // 1. 消息持久化
        messageRepository.save(event);

        // 2. 通道路由
        MessageChannel channel = channels.stream()
                .filter(ch -> ch.supports(event))
                .findFirst()
                .orElseThrow();

        // 3. 异步发送
        channel.sendAsync(event)
                .thenRun(() -> eventBus.publish(new MessageSentEvent(event)));
    }

}

消息持久化

MessageRepository.java - 用于消息的持久化(可以使用内存或数据库)

package com.artisan.booteventbus.dao;

import com.artisan.booteventbus.domain.MessageEvent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Repository;

import java.util.ArrayList;
import java.util.List;

@Slf4j
@Repository
public class MessageRepository {

    private final List<MessageEvent> messageStore = new ArrayList<>();

    public void save(MessageEvent event) {
        // 模拟存储
        messageStore.add(event);
        System.out.println(Thread.currentThread().getName() + " - Message saved: " + event.getMessage());
        log.info("Message saved {}", event.getMessage());
    }
}

创建消息发送事件

MessageSentEvent.java - 定义发送后的事件

package com.artisan.booteventbus.domain;


import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;

@EqualsAndHashCode(callSuper = true)
@Data
@NoArgsConstructor
@AllArgsConstructor
public class MessageSentEvent extends Event {
    private MessageEvent originalEvent;


}

配置 Spring Boot 启动类

package com.artisan;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;

@EnableAsync(proxyTargetClass=true)
@SpringBootApplication
public class BootEventBusApplication {

    public static void main(String[] args) {
        SpringApplication.run(BootEventBusApplication.class, args);
    }

}

测试

为了测试整个架构,创建一个控制器来模拟发送消息。

package com.artisan.booteventbus.controller;


import com.artisan.booteventbus.domain.MessageEvent;
import com.artisan.booteventbus.bus.EventBus;
import com.artisan.booteventbus.bus.MessageEventProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

import java.util.HashMap;

@RestController
@RequestMapping("/messages")
public class MessageController {

    private final EventBus eventBus;
    private final MessageEventProcessor eventProcessor;

    @Autowired
    public MessageController(EventBus eventBus, MessageEventProcessor eventProcessor) {
        this.eventBus = eventBus;
        this.eventProcessor = eventProcessor;
    }

    @RequestMapping("/send")
    public String sendMessage(@RequestParam String message, @RequestParam String channel) {
        MessageEvent event = new MessageEvent(message, channel, new HashMap<>());
        eventProcessor.handle(event); // 异步处理消息
        return "Message is being processed";
    }
}

消息消费

package com.artisan.booteventbus.listeners;

import com.artisan.booteventbus.domain.MessageSentEvent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class MessageSentEventListener {

    @Async
    @EventListener
    public void handleMessageSentEvent(MessageSentEvent event) {
        // 模拟处理事件
        System.out.println(Thread.currentThread().getName() + " - Received MessageSentEvent: " + event.getOriginalEvent().getMessage());
        log.info("Sending email: {}", event.getOriginalEvent().getMessage());

    }
}

运行项目

http://localhost:8080/messages/send?message=artisan&channel=email

在这里插入图片描述
在这里插入图片描述

当然了,你也可以基于此种模式,使用kafka
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2258971.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

仿iOS日历、飞书日历、Google日历的日模式

仿iOS日历、飞书日历、Google日历的日模式&#xff0c;24H内事件可自由上下拖动、自由拉伸。 以下是效果图&#xff1a; 具体实现比较简单&#xff0c;代码如下&#xff1a; import android.content.Context; import android.graphics.Canvas; import android.graphics.Color;…

软考高级架构 - 10.5 软件架构演化评估方法

10.4 软件架构演化原则总结 本节提出了18条架构演化的核心原则&#xff0c;并为每条原则设计了简单而有效的度量方法&#xff0c;用于从系统整体层面提供实用信息&#xff0c;帮助评估和指导架构演化。 演化成本控制&#xff1a;成本小于重新开发成本&#xff0c;经济高效。进…

DocFlow票据AI自动化处理工具:出色的文档解析+抽取能力,提升企业文档数字化管理效能

目录 财务应付 金融信贷业务 近期&#xff0c;DocFlow票据自动化产品正式上线。DocFlow是一款票据AI自动化处理工具&#xff0c;支持不同版式单据智能分类扩展&#xff0c;可选功能插件配置流程&#xff0c;满足多样业务场景。 随着全球化与信息化进程&#xff0c;企业的文件…

C# 探险之旅:第二节 - 定义变量与变量赋值

欢迎再次踏上我们的C#学习之旅。今天&#xff0c;我们要聊一个超级重要又好玩的话题——定义变量与变量赋值。想象一下&#xff0c;你正站在一个魔法森林里&#xff0c;手里拿着一本空白的魔法书&#xff08;其实就是你的代码编辑器&#xff09;&#xff0c;准备记录下各种神奇…

有道云笔记批量导出

前言 最近使用有道云笔记遇到打开过慢&#xff0c;导致笔记丢失&#xff0c;需要会员才能找回之前笔记问题。 决定改用思源&#xff0c;程序中的格式比较难于通过复制保留&#xff0c;即使导出成word 或者pdf&#xff0c;需要一个专门工具导出成Markdown格式&#xff0c;批量…

离线无网环境中基于OpenEuler的everything ISO安装软件

文章目录 1.创建挂载点 2.挂载 ISO 文件: 3.配置 YUM 源 4.清理 YUM 缓存并生成新的缓存: 5.使用 YUM 安装软件包 要在 OpenEuler 系统中挂载ISO &#xff08;下载地址&#xff1a;https://repo.openeuler.openatom.cn/openEuler-20.03-LTS/ISO/x86_64/&#xff09;并使用…

2024最新树莓派4b安装ubuntu20.04.5-server版本全流程解决方案:从烧录到配置桌面到联网!!!

准备工作 硬件工具 树莓派4b&#xff0c;32GSD卡&#xff0c;读卡器 软件工具 ubuntu20.04.5镜像&#xff0c;SD卡格式化工具&#xff0c;烧录软件&#xff0c;远程连接工具。 下面是我通过百度网盘分享的文件&#xff1a;树莓派4bubuntu20.04链接&#xff1a;https://pan…

STM32 OLED屏幕驱动详解

一、介绍 OLED是有机发光二极管&#xff0c;又称为有机电激光显示&#xff08;Organic Electroluminescence Display&#xff0c; OLED&#xff09;。OLED由于同时具备自发光&#xff0c;不需背光源、对比度高、厚度薄、视角广、反应速度快、可用于挠曲性面板、使用温度范围广…

商业银行基于容器云的分布式数据库架构设计与创新实践

导读 本文介绍了某商业银行基于 TiDB 和 Kubernetes(简称 K8s) 构建的云化分布式数据库平台&#xff0c;重点解决了传统私有部署模式下的高成本、低资源利用率及运维复杂等问题。 通过引入 TiDB Operator 自动化管理与容器化技术&#xff0c;银行能够实现多个业务系统的高可用…

项目组件框架介绍[etcd]

文章目录 前言etcd安装Ubuntu 上通过包管理器安装通过源码安装配置 客户端开发包开发包的安装接口介绍添加一个键值对获取一个键值对租约保活机制监听 封装服务注册与发现服务注册服务发现 前言 Etcd 是一个 golang 编写的分布式、高可用的一致性键值存储系统&#xff0c;用于配…

网页爬虫技术全解析:从基础到实战

引言 在当今信息爆炸的时代&#xff0c;互联网上的数据量每天都在以惊人的速度增长。网页爬虫&#xff08;Web Scraping&#xff09;&#xff0c;作为数据采集的重要手段之一&#xff0c;已经成为数据科学家、研究人员和开发者不可或缺的工具。本文将全面解析网页爬虫技术&…

Jmeter如何对UDP协议进行测试?

Jmeter如何对UDP协议进行测试&#xff1f; 1 jmeter-plugins安装2 UDP-Protocol Support安装3 UDP协议测试 1 jmeter-plugins安装 jmeter-plugins是Jmeter的插件管理器&#xff1b;可以组织和管理Jmeter的所有插件&#xff1b;直接进入到如下页面&#xff0c;选择如图的选项进…

计算机网络之网络层超详细讲解

个人主页&#xff1a;C忠实粉丝 欢迎 点赞&#x1f44d; 收藏✨ 留言✉ 加关注&#x1f493;本文由 C忠实粉丝 原创 计算机网络之网络层超详细讲解 收录于专栏【计算机网络】 本专栏旨在分享学习计算机网络的一点学习笔记&#xff0c;欢迎大家在评论区交流讨论&#x1f48c; …

微信小程序:实现节点进度条的效果;正在完成的节点有动态循环效果;横向,纵向排列

参考说明 微信小程序实现流程进度功能 - 知乎 上面的为一个节点进度条的例子&#xff0c;但并不完整&#xff0c;根据上述代码&#xff0c;进行修改完善&#xff0c;实现其效果 横向效果 代码 wxml <view classorder_process><view classprocess_wrap wx:for&quo…

如何不重启修改K8S containerd容器的内存限制(Cgroup方法)

1. 使用crictl 查看容器ID crictl ps2. 查看Cgroup位置 crictl inspect 容器ID3. 到容器Cgroup的目录下 使用上个命令就能找到CgroupPath 4 . 到cgroup目录下 正确目录是 : /sys/fs/cgroup/memory/kubepods.slice/kubepods-burstable.slice/kubepods-burstable-podf68e18…

海康威视摄像头RTSP使用nginx推流到服务器直播教程

思路&#xff1a; 之前2020年在本科的时候&#xff0c;由于项目的需求需要将海康威视的摄像头使用推流服务器到网页进行直播。这里将自己半个月琢磨出来的步骤给大家发一些。切勿转载&#xff01;&#xff01;&#xff01;&#xff01; 使用网络摄像头中的rtsp协议---------通…

智简模型,边缘智能:AI 轻量化与边缘计算的最佳实践

文章目录 摘要引言模型轻量化与优化方法模型量化模型剪枝知识蒸馏合理使用边缘计算硬件 轻量化图像分类实战1. 模型量化2. 知识蒸馏3. 学生模型的创建与训练 QA环节总结参考资料 摘要 边缘计算与 AI 模型的结合&#xff0c;能够在资源受限的环境中提供实时智能服务。通过模型轻…

flink sink kafka的事务提交现象猜想

现象 查看flink源码时 sink kafka有事务提交机制&#xff0c;查看源码发现是使用两阶段提交策略&#xff0c;而事务提交是checkpoint完成后才执行&#xff0c;那么如果checkpoint设置间隔时间比较长时&#xff0c;事务未提交之前&#xff0c;后端应该消费不到数据&#xff0c…

推送(push)项目到gitlab

文章目录 1、git init1.1、在当前目录中显示隐藏文件&#xff1a;1.2、查看已有的远程仓库1.3、确保你的本地机器已经生成了 SSH 密钥&#xff1a;1.4、将生成的公钥文件&#xff08;通常位于 ~/.ssh/id_rsa.pub&#xff09;复制到 GitLab 的 SSH 设置中&#xff1a;1.5、测试 …

7.Vue------$refs与$el详解 ------vue知识积累

$refs 与 $el是什么&#xff1f; 作用是什么? ref&#xff0c;$refs&#xff0c;$el &#xff0c;三者之间的关系是什么&#xff1f; ref (给元素或者子组件注册引用信息) 就像你要给元素设置样式&#xff0c;就需要先给元素设定一个 class 一样&#xff0c;同理&#xff0c;…