SpringBoot整合SSE,实现后端主动推送DEMO
前些日子写了整合SSE得demo。但是SSE对象是存储在ConcurrentHashMap<String, SseEmitter>中。在正式环境明显就不行了,服务重启一下的话都没有了。
那么要持久化,第一选择放redis
1、写了一个redis操作组件
SseEmitterStore
/**
* 不考虑redis 连接异常问题
* @author cmy
* @date 2024/8/21 10:55
*/
@Component
public class SseEmitterStore {
private ConcurrentHashMap<String, SseEmitter> emitters = new ConcurrentHashMap<>();
@Resource
private RedisTemplate<String, Object> redisTemplate;
public void addEmitter(String key, SseEmitter emitter) {
emitters.put(key, emitter);
redisTemplate.opsForHash().put("sse-emitters", key, emitter);
}
public void removeEmitter(String key) {
emitters.remove(key);
redisTemplate.opsForHash().delete("sse-emitters", key);
}
@PostConstruct
private void init() {
Map<Object, Object> temp = redisTemplate.opsForHash().entries("sse-emitters");
temp.forEach((key, value) -> {
if (value instanceof SseEmitter) {
emitters.put(key.toString(), (SseEmitter) value);
}
});
}
public ConcurrentHashMap<String, SseEmitter> getEmitters() {
return emitters;
}
}
Controller修改
public class SseController {
@Resource
SseEmitterStore sseEmitterStore;
@GetMapping("/subscribe/{id}")
@CrossOrigin(origins = "*")
public SseEmitter subscribe(@PathVariable String id) {
SseEmitter emitter = new SseEmitter(Long.MAX_VALUE);
sseEmitterStore.addEmitter(id,emitter);
emitter.onCompletion(() -> sseEmitterStore.removeEmitter(id));
emitter.onError(e -> sseEmitterStore.removeEmitter(id));
return emitter;
}
@GetMapping("/unbind/{id}")
@CrossOrigin(origins = "*")
public ServerResponse deleteItem(@PathVariable String id) {
this.sseEmitterStore.removeEmitter(id);
return ServerResponse.success(true);
}
}
异步发送消息service
@Async
public void broadcastMessage(String message) {
List<String> keysToDelete = new ArrayList<>();
this.sseEmitterStore.getEmitters().forEach((k, v) -> {
try {
v.send(message);
} catch (Throwable t) {
keysToDelete.add(k);
}
});
keysToDelete.forEach(this.sseEmitterStore::removeEmitter);
}
2、无法序列化的问题
跑起来之后,结果报错
DefaultSerializer requires a Serializable payload but received an object of type [org.springframework.web.servlet.mvc.method.annotation.SseEmitter]
错误信息已经很明显了
因为 SseEmitter
并不是一个实现了 Serializable
接口的类,因此不能被默认的序列化器正确处理。
问了AI
3、解决无法序列化问题
3.1自定义redis自定义序列化器
public class CustomJackson2JsonRedisSerializer<T> implements RedisSerializer<T> {
private static final long serialVersionUID = -7649863253433761554L;
private final ObjectMapper objectMapper;
public CustomJackson2JsonRedisSerializer() {
this.objectMapper = new ObjectMapper();
this.objectMapper.registerModule(new JavaTimeModule());
this.objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
}
@Override
public byte[] serialize(T t) throws SerializationException {
if (t == null) {
return new byte[0];
}
try {
return objectMapper.writeValueAsBytes(t);
} catch (JsonProcessingException e) {
throw new SerializationException("Could not write JSON: " + e.getMessage(), e);
}
}
@Override
public T deserialize(byte[] bytes) throws SerializationException {
if (bytes == null || bytes.length == 0) {
return null;
}
try {
return (T) objectMapper.readValue(bytes, SseEmitter.class);
} catch (IOException e) {
throw new SerializationException("Could not read JSON: " + e.getMessage(), e);
}
}
}
3.2redis配置,使序列化器生效
@Configuration
public class RedisConfig {
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
RedisTemplate<String, Object> template = new RedisTemplate<>();
template.setConnectionFactory(factory);
StringRedisSerializer stringSerializer = new StringRedisSerializer();
CustomJackson2JsonRedisSerializer<Object> jacksonSerializer = new CustomJackson2JsonRedisSerializer<>();
// 根据实际情况,自行修改
template.setKeySerializer(stringSerializer);
template.setValueSerializer(jacksonSerializer);
template.setHashKeySerializer(stringSerializer);
template.setHashValueSerializer(jacksonSerializer);
template.afterPropertiesSet();
return template;
}
}
再次启动服务,即生效。