文章目录
- 概述
- 事件驱动架构的基本概念
- 工程结构
- Code
- 创建事件和事件处理器
- 创建事件总线
- 创建消息通道和发送逻辑
- 创建事件处理器
- 消息持久化
- 创建消息发送事件
- 配置 Spring Boot 启动类
- 测试
- 消息消费
- 运行项目

概述
在微服务架构和大规模分布式系统中,事件驱动架构(EDA)成为了非常重要的设计模式。通过事件驱动,我们可以解耦系统的各个组件,提高系统的可扩展性、可维护性和响应能力。
接下来,我们将演示一下如何在 Spring Boot 中实现一个基于事件驱动的消息发送和接收流程,从消息的发送、事件的发布到事件的监听。
事件驱动架构的基本概念
在事件驱动架构中,系统的各个组件通过事件进行通信。每个事件代表一个特定的行为或状态变化,当事件发布时,系统的其他部分可以响应这些事件并做出相应的处理。消息发送和接收的流程正是通过发布和监听事件来实现的。
接下来我们使用 Spring Boot 来实现一个基于事件驱动的消息系统。、
系统包含以下几个部分:
- 消息发送: 消息将通过一个
MessageEventProcessor
进行处理,并且在处理完成后会发布一个事件。 - 事件发布: 消息成功发送后,通过
ApplicationEventPublisher
发布一个MessageSentEvent
。 - 事件监听: 一个监听器会接收到发布的事件并进行相应的处理(比如记录日志、通知其他组件等)
工程结构
EventBus
:事件总线,负责发布事件。MessageEventProcessor
:处理消息事件的处理器。Event
、MessageEvent
、MessageSentEvent
:事件类,MessageEvent
和MessageSentEvent
继承自Event
。MessageChannel
:消息通道接口,EmailMessageChannel
是其具体实现。MessageRepository
:消息存储库,用于保存消息事件。MessageChannelConfig
:消息通道配置,配置了消息通道的Bean。MessageController
:消息控制器,处理发送消息的请求。MessageSentEventListener
:监听消息发送事件的监听器。
Code
创建事件和事件处理器
Event.java - 定义基础事件
package com.artisan.booteventbus.domain;
public abstract class Event {
// 事件的基本字段
}
MessageEvent.java - 定义具体的消息事件
package com.artisan.booteventbus.domain;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import java.util.Map;
@EqualsAndHashCode(callSuper = true)
@Data
@Slf4j
@NoArgsConstructor
@AllArgsConstructor
public class MessageEvent extends Event {
private String message;
private String channel;
private Map<String, Object> metadata;
}
EventHandler.java - 定义事件处理器接口
package com.artisan.booteventbus.bus;
import com.artisan.booteventbus.domain.Event;
public interface EventHandler<T extends Event> {
void handle(T event);
}
创建事件总线
EventBus.java - 用于发布事件
package com.artisan.booteventbus.bus;
import com.artisan.booteventbus.domain.Event;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Component;
@Component
public class EventBus {
private final ApplicationEventPublisher publisher;
public EventBus(ApplicationEventPublisher publisher) {
this.publisher = publisher;
}
public void publish(Event event) {
publisher.publishEvent(event);
}
}
创建消息通道和发送逻辑
MessageChannel.java - 定义消息通道接口
package com.artisan.booteventbus.service;
import com.artisan.booteventbus.domain.MessageEvent;
import java.util.concurrent.CompletableFuture;
public interface MessageChannel {
boolean supports(MessageEvent event);
CompletableFuture<Void> sendAsync(MessageEvent event);
}
MessageChannelConfig.java - 初始化channel
package com.artisan.booteventbus.config;
import com.artisan.booteventbus.service.MessageChannel;
import com.artisan.booteventbus.service.impl.EmailMessageChannel;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.ArrayList;
import java.util.List;
@Configuration
public class MessageChannelConfig {
@Bean
public List<MessageChannel> messageChannels() {
List<MessageChannel> channels = new ArrayList<>();
channels.add(new EmailMessageChannel());
// 可以继续添加其他类型的通道
return channels;
}
}
EmailMessageChannel.java - 实现邮件发送通道
package com.artisan.booteventbus.service.impl;
import com.artisan.booteventbus.domain.MessageEvent;
import com.artisan.booteventbus.service.MessageChannel;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CompletableFuture;
@Slf4j
public class EmailMessageChannel implements MessageChannel {
@Override
public boolean supports(MessageEvent event) {
return "email".equals(event.getChannel());
}
@Override
public CompletableFuture<Void> sendAsync(MessageEvent event) {
return CompletableFuture.runAsync(() -> {
// 模拟邮件发送
System.out.println(Thread.currentThread().getName() + "- Sending email: " + event.getMessage());
log.info("Sending email: {}", event.getMessage());
});
}
}
创建事件处理器
MessageEventProcessor.java - 处理消息事件,保存事件并发送
package com.artisan.booteventbus.bus;
import com.artisan.booteventbus.dao.MessageRepository;
import com.artisan.booteventbus.domain.MessageEvent;
import com.artisan.booteventbus.domain.MessageSentEvent;
import com.artisan.booteventbus.service.MessageChannel;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.scheduling.annotation.Async;
import java.util.List;
@Component
public class MessageEventProcessor implements EventHandler<MessageEvent> {
private final EventBus eventBus;
private final MessageRepository messageRepository;
private final List<MessageChannel> channels;
@Autowired
public MessageEventProcessor(EventBus eventBus, MessageRepository messageRepository, List<MessageChannel> channels) {
this.eventBus = eventBus;
this.messageRepository = messageRepository;
this.channels = channels;
}
/**
* @param event
* Asyn 请使用自定义线程池,这里仅仅是 为了演示异步
*/
@Async
@Override
public void handle(MessageEvent event) {
// 1. 消息持久化
messageRepository.save(event);
// 2. 通道路由
MessageChannel channel = channels.stream()
.filter(ch -> ch.supports(event))
.findFirst()
.orElseThrow();
// 3. 异步发送
channel.sendAsync(event)
.thenRun(() -> eventBus.publish(new MessageSentEvent(event)));
}
}
消息持久化
MessageRepository.java - 用于消息的持久化(可以使用内存或数据库)
package com.artisan.booteventbus.dao;
import com.artisan.booteventbus.domain.MessageEvent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Repository;
import java.util.ArrayList;
import java.util.List;
@Slf4j
@Repository
public class MessageRepository {
private final List<MessageEvent> messageStore = new ArrayList<>();
public void save(MessageEvent event) {
// 模拟存储
messageStore.add(event);
System.out.println(Thread.currentThread().getName() + " - Message saved: " + event.getMessage());
log.info("Message saved {}", event.getMessage());
}
}
创建消息发送事件
MessageSentEvent.java - 定义发送后的事件
package com.artisan.booteventbus.domain;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
@EqualsAndHashCode(callSuper = true)
@Data
@NoArgsConstructor
@AllArgsConstructor
public class MessageSentEvent extends Event {
private MessageEvent originalEvent;
}
配置 Spring Boot 启动类
package com.artisan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;
@EnableAsync(proxyTargetClass=true)
@SpringBootApplication
public class BootEventBusApplication {
public static void main(String[] args) {
SpringApplication.run(BootEventBusApplication.class, args);
}
}
测试
为了测试整个架构,创建一个控制器来模拟发送消息。
package com.artisan.booteventbus.controller;
import com.artisan.booteventbus.domain.MessageEvent;
import com.artisan.booteventbus.bus.EventBus;
import com.artisan.booteventbus.bus.MessageEventProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import java.util.HashMap;
@RestController
@RequestMapping("/messages")
public class MessageController {
private final EventBus eventBus;
private final MessageEventProcessor eventProcessor;
@Autowired
public MessageController(EventBus eventBus, MessageEventProcessor eventProcessor) {
this.eventBus = eventBus;
this.eventProcessor = eventProcessor;
}
@RequestMapping("/send")
public String sendMessage(@RequestParam String message, @RequestParam String channel) {
MessageEvent event = new MessageEvent(message, channel, new HashMap<>());
eventProcessor.handle(event); // 异步处理消息
return "Message is being processed";
}
}
消息消费
package com.artisan.booteventbus.listeners;
import com.artisan.booteventbus.domain.MessageSentEvent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class MessageSentEventListener {
@Async
@EventListener
public void handleMessageSentEvent(MessageSentEvent event) {
// 模拟处理事件
System.out.println(Thread.currentThread().getName() + " - Received MessageSentEvent: " + event.getOriginalEvent().getMessage());
log.info("Sending email: {}", event.getOriginalEvent().getMessage());
}
}
运行项目
http://localhost:8080/messages/send?message=artisan&channel=email
当然了,你也可以基于此种模式,使用kafka