为什么使用 SSE 而不使用 WebSocket, 请看 SEE 对比 Websocket 的优缺点。
特性 | SSE | WebSocket |
---|---|---|
通信方向 | 单向(服务器→客户端) | 双向(全双工) |
协议 | 基于 HTTP | 独立协议(需 ws:// 前缀) |
兼容性 | 现代浏览器(IE 不支持) | 广泛支持 |
复杂度 | 简单(服务器只需返回流数据) | 较复杂(需处理握手、帧协议等) |
适用场景 | 实时推送(日志、通知、新闻) | 双向交互(聊天、实时协作) |
我的架构如下:
前端
后端中间件
后端服务
[前端] → 打开模态框 → 发起SSE连接 → [后端中间件] → 转发请求 → [后端服务] → 查询日志信息
← 实时日志推送 ← (SSE流) ← 捕获进程输出流 ← 返回实时日志流
前端html代码:
前端代码使用 bootstrap + jquery
<a href='#' data-bs-toggle='modal' data-bs-target='#logModal'>日志</a>
<!-- Modal -->
<div class="modal fade" id="logModal" tabindex="-1" aria-labelledby="logModalLabel" aria-hidden="true">
<div class="modal-dialog modal-fullscreen">
<div class="modal-content">
<div class="modal-header">
<h1 class="modal-title fs-5" id="logModalLabel">Modal title</h1>
<button type="button" class="btn-close" data-bs-dismiss="modal" aria-label="Close"></button>
</div>
<div class="modal-body" id="logContent">
...
</div>
<div class="modal-footer">
<button type="button" class="btn btn-secondary" data-bs-dismiss="modal">Close</button>
<button type="button" class="btn btn-primary">Save changes</button>
</div>
</div>
</div>
</div>
前端js代码:
$("#logModal").on('shown.bs.modal', function (){
const eventSource = new EventSource("http://localhost:9998/middle/logs");
const logContent = $('#logContent');
eventSource.onmessage = function (event){
const logLine = $('<div>').text(event.data)
logContent.append(logLine);
logContent[0].scrollTop = logContent[0].scrollHeight; // 自动滚动到底部
};
eventSource.onerror = function (error ) {
console.log('日志连接失败: ', error);
eventSource.close();
logContent.text('日志获取失败,请检查容器常或网络连接')
}
$("#logModal").on('hide.bs.modal', function (){
eventSource.close();
logContent.empty(); // 清空日志
})
})
后端中间件需要引入 webflux 的依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
后端中间件代码:
controller:
@GetMapping(value = "/middle/logs", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter getDockerLogs(){
return publishServerService.getDockerLogs();
}
service:
private final WebClient webClient = WebClient.builder().clientConnector(new ReactorClientHttpConnector(HttpClient.create())).build();
private final ExecutorService executorService = Executors.newSingleThreadExecutor();
public SseEmitter getDockerLogs() {
String url = "http://127.0.0.1:9999/service/logs";
SseEmitter emitter = new SseEmitter();
executorService.submit(() -> {
try {
webClient.get().uri(url)
.retrieve()
.bodyToFlux(String.class)
.subscribe(
log -> {
try {
emitter.send(SseEmitter.event().data(log)); // 发送消息到前端
} catch (IOException e) {
emitter.completeWithError(e);
}
},
error -> emitter.completeWithError(error),
emitter::complete
);
} catch (Exception e) {
emitter.completeWithError(e);
}
});
return emitter;
}
后端服务代码:
controller:
@GetMapping(value = "/service/logs", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter getDockerLogs() {
return publishServerService.getDockerLogs();
}
service:
private final ExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
public SseEmitter getDockerLogs() {
SseEmitter emitter = new SseEmitter();
executorService.submit(() -> {
try {
int i = 0;
while (i < 100000000) {
try {
emitter.send(SseEmitter.event().data(i)); // 发送消息到后端中间件
Thread.sleep(1000);
i++;
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
emitter.complete();
} catch (IOException e) {
throw new RuntimeException(e);
}
});
return emitter;
}
提示:你可以把 i 换成真实的应用程序日志,如下:
public SseEmitter getDockerLogs() {
SseEmitteremitter = new SseEmitter();
executorService.submit(() -> {
try {
Process process = Runtime.getRuntime().exec("docker logs -f nginx");
InputStreamReader inputStreamReader = new InputStreamReader(process.getInputStream());
BufferedReader reader = new BufferedReader(inputStreamReader);
String line;
while ((line = reader.readLine()) != null) {
emitter.send(SseEmitter.event().data(line));
}
emitter.complete();
} catch (IOException e) {
throw new RuntimeException(e);
}
});
return emitter;
}
最终前端显示效果如下:
到此前端就可以实时的获取后端日志在页面中显示了。
但是你会发现后端控制台会时不时报错,错误信息如下:
2025-04-24T11:01:12.488Z WARN 26616 --- [nio-9990-exec-2] .w.s.m.s.DefaultHandlerExceptionResolver : Ignoring exception, response committed. : org.springframework.web.context.request.async.AsyncRequestTimeoutException
2025-04-24T11:01:12.489Z WARN 26616 --- [nio-9990-exec-2] .w.s.m.s.DefaultHandlerExceptionResolver : Resolved [org.springframework.web.context.request.async.AsyncRequestTimeoutException]
org.springframework.web.context.request.async.AsyncRequestTimeoutException
at org.springframework.web.context.request.async.TimeoutDeferredResultProcessingInterceptor.handleTimeout(TimeoutDeferredResultProcessingInterceptor.java:42)
at org.springframework.web.context.request.async.DeferredResultInterceptorChain.triggerAfterTimeout(DeferredResultInterceptorChain.java:81)
at org.springframework.web.context.request.async.WebAsyncManager.lambda$startDeferredResultProcessing$5(WebAsyncManager.java:442)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
at org.springframework.web.context.request.async.StandardServletAsyncWebRequest.onTimeout(StandardServletAsyncWebRequest.java:154)
at org.apache.catalina.core.AsyncListenerWrapper.fireOnTimeout(AsyncListenerWrapper.java:44)
at org.apache.catalina.core.AsyncContextImpl.timeout(AsyncContextImpl.java:135)
at org.apache.catalina.connector.CoyoteAdapter.asyncDispatch(CoyoteAdapter.java:135)
at org.apache.coyote.AbstractProcessor.dispatch(AbstractProcessor.java:243)
at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:57)
at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:896)
at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1744)
at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:52)
at org.apache.tomcat.util.threads.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1191)
at org.apache.tomcat.util.threads.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:659)
at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
javaweb 报错如下:
2025-04-24T11:02:41.538Z WARN 26653 --- [nio-9991-exec-1] .w.s.m.s.DefaultHandlerExceptionResolver : Ignoring exception, response committed. : org.springframework.web.context.request.async.AsyncRequestTimeoutException
2025-04-24T11:02:41.538Z WARN 26653 --- [nio-9991-exec-1] .w.s.m.s.DefaultHandlerExceptionResolver : Resolved [org.springframework.web.context.request.async.AsyncRequestTimeoutException]
2025-04-24T11:02:42.480Z ERROR 26653 --- [or-http-epoll-2] reactor.core.publisher.Operators : Operator called default onErrorDropped
java.lang.IllegalStateException: The response object has been recycled and is no longer associated with this facade
at org.apache.catalina.connector.ResponseFacade.checkFacade(ResponseFacade.java:478) ~[tomcat-embed-core-10.1.16.jar!/:na]
at org.apache.catalina.connector.ResponseFacade.isFinished(ResponseFacade.java:154) ~[tomcat-embed-core-10.1.16.jar!/:na]
at org.apache.catalina.connector.ResponseFacade.flushBuffer(ResponseFacade.java:240) ~[tomcat-embed-core-10.1.16.jar!/:na]
at org.springframework.http.server.ServletServerHttpResponse.flush(ServletServerHttpResponse.java:104) ~[spring-web-6.0.14.jar!/:6.0.14]
at org.springframework.http.server.DelegatingServerHttpResponse.flush(DelegatingServerHttpResponse.java:61) ~[spring-web-6.0.14.jar!/:6.0.14]
at org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitterReturnValueHandler$HttpMessageConvertingHandler.complete(ResponseBodyEmitterReturnValueHandler.java:231) ~[spring-webmvc-6.0.14.jar!/:6.0.14]
at org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitter.complete(ResponseBodyEmitter.java:266) ~[spring-webmvc-6.0.14.jar!/:6.0.14]
at reactor.core.publisher.LambdaSubscriber.onComplete(LambdaSubscriber.java:132) ~[reactor-core-3.5.12.jar!/:3.5.12]
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onComplete(MonoFlatMapMany.java:260) ~[reactor-core-3.5.12.jar!/:3.5.12]
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2205) ~[reactor-core-3.5.12.jar!/:3.5.12]
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onComplete(FluxOnAssembly.java:549) ~[reactor-core-3.5.12.jar!/:3.5.12]
at reactor.core.publisher.FluxConcatMapNoPrefetch$FluxConcatMapNoPrefetchSubscriber.onComplete(FluxConcatMapNoPrefetch.java:240) ~[reactor-core-3.5.12.jar!/:3.5.12]
at reactor.core.publisher.DrainUtils.postComplete(DrainUtils.java:183) ~[reactor-core-3.5.12.jar!/:3.5.12]
at reactor.core.publisher.FluxBufferPredicate$BufferPredicateSubscriber.onComplete(FluxBufferPredicate.java:356) ~[reactor-core-3.5.12.jar!/:3.5.12]
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableConditionalSubscriber.onComplete(FluxPeekFuseable.java:595) ~[reactor-core-3.5.12.jar!/:3.5.12]
at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onComplete(FluxMapFuseable.java:350) ~[reactor-core-3.5.12.jar!/:3.5.12]
at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onComplete(FluxContextWrite.java:126) ~[reactor-core-3.5.12.jar!/:3.5.12]
at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onComplete(FluxDoFinally.java:128) ~[reactor-core-3.5.12.jar!/:3.5.12]
at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onComplete(FluxConcatArray.java:230) ~[reactor-core-3.5.12.jar!/:3.5.12]
at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drainAsync(FluxFlattenIterable.java:371) ~[reactor-core-3.5.12.jar!/:3.5.12]
at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drain(FluxFlattenIterable.java:724) ~[reactor-core-3.5.12.jar!/:3.5.12]
at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.onComplete(FluxFlattenIterable.java:273) ~[reactor-core-3.5.12.jar!/:3.5.12]
at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:144) ~[reactor-core-3.5.12.jar!/:3.5.12]
at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260) ~[reactor-core-3.5.12.jar!/:3.5.12]
at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:144) ~[reactor-core-3.5.12.jar!/:3.5.12]
at reactor.netty.channel.FluxReceive.terminateReceiver(FluxReceive.java:483) ~[reactor-netty-core-1.1.13.jar!/:1.1.13]
at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:275) ~[reactor-netty-core-1.1.13.jar!/:1.1.13]
at reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:419) ~[reactor-netty-core-1.1.13.jar!/:1.1.13]
at reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:446) ~[reactor-netty-core-1.1.13.jar!/:1.1.13]
at reactor.netty.channel.ChannelOperations.terminate(ChannelOperations.java:500) ~[reactor-netty-core-1.1.13.jar!/:1.1.13]
at reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:768) ~[reactor-netty-http-1.1.13.jar!/:1.1.13]
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:114) ~[reactor-netty-core-1.1.13.jar!/:1.1.13]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.101.Final.jar!/:4.1.101.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.101.Final.jar!/:4.1.101.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.101.Final.jar!/:4.1.101.Final]
at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436) ~[netty-transport-4.1.101.Final.jar!/:4.1.101.Final]
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346) ~[netty-codec-4.1.101.Final.jar!/:4.1.101.Final]
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318) ~[netty-codec-4.1.101.Final.jar!/:4.1.101.Final]
at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251) ~[netty-transport-4.1.101.Final.jar!/:4.1.101.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) ~[netty-transport-4.1.101.Final.jar!/:4.1.101.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.101.Final.jar!/:4.1.101.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.101.Final.jar!/:4.1.101.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.101.Final.jar!/:4.1.101.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) ~[netty-transport-4.1.101.Final.jar!/:4.1.101.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.101.Final.jar!/:4.1.101.Final]
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.101.Final.jar!/:4.1.101.Final]
at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:800) ~[netty-transport-classes-epoll-4.1.101.Final.jar!/:4.1.101.Final]
at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:509) ~[netty-transport-classes-epoll-4.1.101.Final.jar!/:4.1.101.Final]
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:407) ~[netty-transport-classes-epoll-4.1.101.Final.jar!/:4.1.101.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[netty-common-4.1.101.Final.jar!/:4.1.101.Final]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.101.Final.jar!/:4.1.101.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.101.Final.jar!/:4.1.101.Final]
at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
这是因为 SpringMVC 默认设置了 30 秒超时时间,你需要修改的长一点,具体多少由你自己定义。后端中间件和后端服务都需要配置。
错误分析
1. AsyncRequestTimeoutException
此错误表明异步请求超时。在 Spring 里,当一个异步请求在规定的时间内没有完成时,就会抛出该异常。默认情况下,Spring 的异步请求超时时间是 30 秒。在 SSE 场景中,因为要保持长连接以实现实时数据推送,所以很容易超出这个时间限制。
2. IllegalStateException
该错误显示响应对象已被回收,不再和当前的请求关联。这通常是在异步请求超时后,响应对象被关闭或者回收,而代码还尝试对其进行操作时发生的。
解决办法
1. 增加异步请求超时时间
你可以通过配置 WebMvcConfigurer 来增加异步请求的超时时间,避免因超时引发异常。
在 后端中间件 和 后端服务 中添加如下配置类:
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.AsyncSupportConfigurer;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
@Configuration
class LogWebConfig implements WebMvcConfigurer {
@Override
public void configureAsyncSupport(AsyncSupportConfigurer configurer) {
// 设置异步请求超时时间为 3600 秒(1 小时)
configurer.setDefaultTimeout(3600 * 1000);
}
}
扩展:
我通过前端传递参数(容器id)给后端执行,我发现了一个奇怪的问题。
我第一次打开模态框,可以正常的实时显示日志。但是我点击 close 的时候,这段代码确实执行了。
$("#logModal").on('hide.bs.modal', function (){
eventSource.close();
logContent.empty(); // 清空日志
})
但是我再次打开模态框,发现没有日志显示了,我甚至刷新页面,然后再次打开模态框依然没有日志显示,只有我把后端服务重启之后,再次打开模态框,才有日志显示,这是为什么?
这个问题可能是由于EventSource 实例未正确释放或重复创建导致的资源冲突。以下是具体分析和解决方案:
问题原因分析
1. EventSource 作用域问题:
在当前代码中,eventSource变量是在shown.bs.modal事件处理函数的闭包中创建的。当模态框关闭时,虽然通过hide.bs.modal事件调用了eventSource.close(),但闭包中的变量可能未被正确释放,导致再次打开模态框时,旧的eventSource实例残留或引用错误。
2. 事件绑定重复问题:
每次打开模态框时,shown.bs.modal事件处理函数会重复绑定hide.bs.modal事件,导致多个处理函数共存,可能混淆不同实例的eventSource。
3. 后端连接未正确断开:
若后端未正确处理EventSource的断开(如未关闭响应流),可能导致端口或连接资源被占用,后续请求无法建立新连接。
解决方案:
使用模态框数据存储 EventSource 实例
修改代码,将eventSource实例存储在模态框元素的data属性中,确保每次关闭时正确释放当前实例:
// 修改后的模态框显示事件处理函数
$("#logModal").on('shown.bs.modal', function (event) {
const trigger = $(event.relatedTarget);
const itemStr = trigger.data('item');
if (!itemStr) return;
const item = JSON.parse(itemStr);
const modal = $(this); // 获取当前模态框实例
// 先关闭可能存在的旧连接
const oldEventSource = modal.data('eventSource');
if (oldEventSource) {
oldEventSource.close();
modal.removeData('eventSource');
}
// 创建新的EventSource并存储到模态框数据中
const eventSource = new EventSource(`http://localhost:9998/middle/logs?containerId=${item.containerId}&ip=${item.ip}`);
modal.data('eventSource', eventSource);
const logContent = $('#logContent');
logContent.empty();
eventSource.onmessage = function (event) {
const logLine = $('<div>').text(event.data);
logContent.append(logLine);
logContent[0].scrollTop = logContent[0].scrollHeight;
};
eventSource.onerror = function (error) {
console.log('日志连接失败: ', error);
eventSource.close();
logContent.text('日志获取失败,请检查容器状态或网络连接');
modal.removeData('eventSource'); // 清除错误状态的实例
};
});
// 修改后的模态框隐藏事件处理函数
$("#logModal").on('hide.bs.modal', function () {
const modal = $(this);
const eventSource = modal.data('eventSource');
if (eventSource) {
eventSource.close(); // 关闭当前实例
modal.removeData('eventSource'); // 清除数据引用
}
$('#logContent').empty(); // 清空日志
});
其实不是上面这个问题,经过再次排查,我发现了问题所在,当我的后端服务的代码这样写的时候,就会出现前面说的问题。
public SseEmitter getDockerLogs(PublishServerDto publishServerDto) {
SseEmitter emitter = new SseEmitter();
executorService.submit(() -> {
try {
Process process = Runtime.getRuntime().exec("docker logs -f " + publishServerDto.getContainerId());
InputStreamReader inputStreamReader = new InputStreamReader(process.getInputStream());
BufferedReader reader = new BufferedReader(inputStreamReader);
String line;
while ((line = reader.readLine()) != null) {
emitter.send(SseEmitter.event().data(line));
}
emitter.complete();
} catch (IOException e) {
emitter.completeWithError(e);
}
});
return emitter;
}
当我的后端服务的代码修改成这样的时候。
public SseEmitter getDockerLogs(PublishServerDto publishServerDto) {
SseEmitter emitter = new SseEmitter();
executorService.submit(() -> {
try {
int i = 0;
while (i < 100000000) {
try {
emitter.send(SseEmitter.event().data(i)); // 发送消息到后端中间件
Thread.sleep(1000);
i++;
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
emitter.complete();
} catch (IOException e) {
emitter.completeWithError(e);
}
});
return emitter;
}
前端关闭再打开模态框也能正常显示数据。为什么?
这是因为 docker logs -f 命令会持续阻塞直到进程被终止,而我的后端代码在 EventSource 关闭时未正确终止正在运行的 Process 对象,导致资源占用和连接冲突。
docker logs -f 命令导致线程阻塞,无法响应 SseEmitter 的关闭事件。当前端关闭 EventSource 时,后端的 reader.readLine() 处于阻塞状态,无法感知 SseEmitter 的异常或完成信号,导致 finally 块和异常处理代码无法执行。
另外,
以下是详细分析和解决方案:
问题根源:未终止后台进程
1. docker logs -f 的阻塞特性
-
docker logs -f 会持续读取容器日志并阻塞当前线程,直到容器停止或命令被中断(如按下 Ctrl+C)。
-
当前端关闭 EventSource 时,后端的 SseEmitter 会触发 completeWithError,但 Process 对象(docker logs 进程)仍在后台运行,其输入流被占用,导致:
- 再次打开模态框时,新的 Process 无法创建(端口 / 资源被占用)。
- 旧的 Process 残留数据可能干扰新连接。
2. 模拟数据与真实命令的差异
模拟数据场景: 模拟数据的循环中使用了 Thread.sleep(1000),该方法会响应线程中断(InterruptedException)。当 SseEmitter 关闭时,会抛出异常并中断线程,而 docker logs -f 的阻塞式读取无法响应中断,导致清理逻辑失效。
真实命令场景: docker logs -f 是外部进程,不受 Java 线程控制,EventSource 关闭时未终止该进程,导致资源泄漏。
3. 阻塞式读取的局限性
BufferedReader.readLine() 是阻塞式方法,当 docker logs -f 没有新日志时,线程会一直阻塞在此处,无法处理 SseEmitter 的关闭事件(如 emitter.completeWithError() 或客户端断开连接引发的异常)。可以使用非阻塞式读取或中断机制 解决这个问题。
解决方案:
分离读取逻辑
将日志读取放到独立线程中,避免主线程被 docker logs -f 阻塞,确保能响应 SseEmitter 的关闭事件。
监听 SseEmitter 事件
使用 emitter.onCompletion() 和 emitter.onError() 回调,在连接关闭或出错时执行清理逻辑。
修改后的代码:
service :
// 使用原子引用存储进程对象,确保多线程环境下的可见性和原子性
private final AtomicReference<Process> processHolder = new AtomicReference<>();
// 使用原子布尔控制日志读取循环,确保线程安全的状态变更
private final AtomicBoolean isRunning = new AtomicBoolean(true);
public SseEmitter getDockerLogs(PublishServerDto publishServerDto) {
SseEmitter emitter = new SseEmitter();
// 提交异步任务处理日志流
executorService.submit(() -> {
try {
// 1. 执行docker logs命令,获取容器实时日志
String containerId = publishServerDto.getContainerId();
Process process = Runtime.getRuntime().exec("docker logs -f " + containerId);
processHolder.set(process); // 保存进程引用到原子容器中
// 2. 启动独立线程读取日志输入流(避免主线程阻塞)
new Thread(() -> {
try ( // 使用try-with-resources自动关闭流
BufferedReader reader = new BufferedReader(
new InputStreamReader(process.getInputStream())
)
) {
String line;
// 循环读取日志,直到isRunning为false或读取到null(流结束)
while (isRunning.get() && (line = reader.readLine()) != null) {
// 通过SSE发送日志数据到前端
emitter.send(SseEmitter.event().data(line));
}
} catch (IOException e) {
// 读取流时发生异常(如管道关闭),终止进程
Process p = processHolder.get();
if (p != null) {
p.destroy(); // 尝试正常终止进程
System.out.println("taskagent - 读取日志流异常: " + e.getMessage());
}
}
}).start();
// 3. 监听SSE连接的生命周期事件
// 当前端正常关闭连接时触发(如关闭模态框)
emitter.onCompletion(() -> {
handleShutdown("正常关闭", processHolder.get()); // 统一处理关闭逻辑
});
// 当SSE连接发生错误时触发(如网络中断)
emitter.onError(ex -> {
handleShutdown("错误关闭: " + ex.getMessage(), processHolder.get());
});
// 当SSE连接超时时触发(需配置超时时间,默认30秒)
emitter.onTimeout(() -> {
handleShutdown("超时关闭", processHolder.get());
});
} catch (Exception e) {
// 初始化进程时发生异常(如命令格式错误)
isRunning.set(false);
Process p = processHolder.get();
if (p != null) p.destroy();
System.out.println("taskagent - 初始化docker进程失败: " + e.getMessage());
emitter.completeWithError(e); // 通知前端连接失败
}
});
return emitter;
}
/**
* 统一处理进程关闭逻辑
* @param reason 关闭原因(用于日志输出)
* @param process 待关闭的进程
*/
private void handleShutdown(String reason, Process process) {
isRunning.set(false); // 停止日志读取循环
if (process != null) {
process.destroy(); // 发送中断信号(等效于Ctrl+C)
System.out.println("taskagent - " + reason + ", 已终止docker进程"); // 关键日志输出点
}
}
现在我发现另一个问题。我的页面有很多容器,首先我打开nginx容器的日志,模态框被打开,实时日志能正常显示在页面上,当我关闭模态框,然后再次打开 nginx的日志,模态框被打开,依然能正常显示实时日志。 关键问题来了,然后我关闭模态框,接着打开 redis 的容器日志,日志没有显示在页面上了。
问题分析:
1. isRunning 标志的重置问题 - 可能
在 handleShutdown 方法里,isRunning 被设置为 false,不过在开启新的日志流时,没有将其重置为 true。这会让后续的日志读取循环无法正常运行。
解决办法:
在开启新的日志流之前,把 isRunning 重置为 true。
2. processHolder 未正确更新 - 可能
当切换容器查看日志时,processHolder 可能还保存着之前的 Process 对象,这会对新的日志读取产生影响。
解决办法:
在开启新的日志流之前,确保 processHolder 被清空。
3. webClient 连接问题(taskweb 端) - 可能
taskweb 端的 webClient 在处理多个连接时,可能会出现连接复用或者资源未正确释放的情况。
解决办法:
确保 webClient 在每次请求结束后都能正确释放资源。可以考虑为每个请求创建独立的 webClient 实例。
4. 前端 EventSource 管理问题 - 可能
前端在切换容器时,EventSource 可能没有正确关闭或者重新创建。
解决办法:
确保在切换容器时,EventSource 被正确关闭并重新创建。你的前端代码已经有了关闭逻辑,不过可以添加一些调试日志来确认是否正常执行。
最终修改的代码如下:
前端js代码:
// 显示实时日志
$("#logModal").on('shown.bs.modal', function (event){
// 从触发模态框的元素中获取 containerId
const trigger = $(event.relatedTarget);
const itemStr = trigger.data('item');
if (!itemStr) {
return;
}
const modal = $(this);
let oldEventSource = modal.data('eventSource');
if (oldEventSource) {
oldEventSource.close();
oldEventSource = null;
modal.removeData('eventSource');
}
const item = JSON.parse(itemStr);
let eventSource = new EventSource(`http://localhost:9998/middle/logs?containerId=${item.containerId}&ip=${item.ip}`);
modal.data('eventSource', eventSource);
const logContent = $('#logContent');
logContent.empty(); // 清空日志
eventSource.onmessage = function (event){
const logLine = $('<div>').text(event.data)
logContent.append(logLine);
logContent[0].scrollTop = logContent[0].scrollHeight;
};
eventSource.onerror = function (error ) {
console.log('日志连接失败: ', error);
eventSource.close();
eventSource = null; // 确保引用被清除
logContent.text('日志获取失败,请检查容器常或网络连接')
}
})
// 监听模态框关闭逻辑
$("#logModal").on('hide.bs.modal', function (){
const modal = $(this);
let eventSource = modal.data('eventSource');
if (eventSource) {
eventSource.close(); // 关闭当前实例
eventSource = null;
modal.removeData('eventSource'); // 清除数据引用
}
$("#logContent").empty(); // 清空日志
})
后端中间件代码:
service:
public SseEmitter getDockerLogs(PublishServer publishServer) {
String url = taskagentConfig.getPrefixAddress(publishServer.getIp(), taskagentConfig.getGetDockerLogsUrl());
SseEmitter emitter = new SseEmitter();
executorService.submit(() -> {
try {
WebClient webClient = WebClient.builder().clientConnector(new ReactorClientHttpConnector(HttpClient.create())).build();
webClient.get().uri(url, publishServer.getContainerId())
.retrieve()
.bodyToFlux(String.class)
.subscribe(
log -> {
try {
emitter.send(SseEmitter.event().data(log));
} catch (IOException e) {
emitter.completeWithError(e);
}
},
emitter::completeWithError,
emitter::complete
);
} catch (Exception e) {
emitter.completeWithError(e);
}
});
return emitter;
}
后端服务代码:
// 使用原子应用存储进程对象,确保多线程环境下的可见性和原子性
private final AtomicReference<Process> processHolder = new AtomicReference<>();
// 使用原子布尔值控制日志读取循环,确保线程安全的状态变更
private final AtomicBoolean isRunning = new AtomicBoolean(true);
public SseEmitter getDockerLogs(PublishServerDto publishServerDto) {
SseEmitter emitter = new SseEmitter();
isRunning.set(true);
processHolder.set(null); // 清空进程引用
// 提交异步任务处理日志流
executorService.submit(() -> {
try {
// 1. 执行 docker logs 命令,获取容器实时日志
Process process = Runtime.getRuntime().exec("docker logs -f "+publishServerDto.getContainerId());
processHolder.set(process); // 保存进程引用到原子容器中
// 2. 启动独立线程读取日志输入流(避免主线程阻塞)
new Thread(() -> {
// 使用 try-with-resources 自动关闭流资源
try (BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()))){
String line;
// 循环读取日志,直到isRunning为false或读取到null(流结束)
while (isRunning.get() && (line = reader.readLine()) != null) {
// 通过 SSE 发送日志数据到taskweb
emitter.send(SseEmitter.event().data(line));
}
} catch (IOException e) {
// 读取流时发生异常(如管道关闭),终止进程
Process p = processHolder.get();
if (p != null) {
p.destroy(); // 尝试正常终止进程
log.error("taskagent - 读取流时发生异常", e.getMessage());
}
}
}).start();
// 3. 监听SSE连接的生命周期事件
// 当前端正常关闭连接时触发
emitter.onCompletion(() -> {
// 统一处理关闭逻辑
handleShutdown("正常关闭", processHolder.get());
});
// 当SSE连接发生错误时触发(如网络中断)
emitter.onError(ex -> {
handleShutdown("错误关闭: "+ex.getMessage(), processHolder.get());
});
// 当SSE连接超时时触发
emitter.onTimeout(() -> {
handleShutdown("超时关闭", processHolder.get());
});
} catch (IOException e) {
// 初始化进程时发生异常
isRunning.set(false);
Process p = processHolder.get();
if (p != null) {
p.destroy();
}
log.error("taskagent - 初始化 docker 进程失败:", e.getMessage());
emitter.completeWithError(e); // 通知taskweb连接失败
}
});
return emitter;
}
/**
* 统一处理进程关闭逻辑
* @param reason 关闭原因(用于日志输出)
* @param process 待关闭的进程
*/
private void handleShutdown(String reason, Process process) {
isRunning.set(false); // 停止日志读取循环
if (process != null) {
process.destroy(); // 发送中断信号(等效于Ctrl+C)
log.info("taskagent - " + reason + ", 已终止docker进程");
}
}
修改成上面之后,前端实时显示日志的效果就全部正常了。