使用 SSE + WebFlux 推送日志信息到前端

news2025/7/14 13:57:24

为什么使用 SSE 而不使用 WebSocket, 请看 SEE 对比 Websocket 的优缺点。

特性SSEWebSocket
通信方向单向(服务器→客户端)双向(全双工)
协议基于 HTTP独立协议(需 ws:// 前缀)
兼容性现代浏览器(IE 不支持)广泛支持
复杂度简单(服务器只需返回流数据)较复杂(需处理握手、帧协议等)
适用场景实时推送(日志、通知、新闻)双向交互(聊天、实时协作)

我的架构如下:

前端
后端中间件
后端服务

[前端] → 打开模态框 → 发起SSE连接 → [后端中间件] → 转发请求 → [后端服务] → 查询日志信息
← 实时日志推送 ← (SSE流) ← 捕获进程输出流 ← 返回实时日志流

前端html代码:

前端代码使用 bootstrap + jquery

<a href='#' data-bs-toggle='modal' data-bs-target='#logModal'>日志</a>

<!-- Modal -->
<div class="modal fade" id="logModal" tabindex="-1" aria-labelledby="logModalLabel" aria-hidden="true">
    <div class="modal-dialog modal-fullscreen">
        <div class="modal-content">
            <div class="modal-header">
                <h1 class="modal-title fs-5" id="logModalLabel">Modal title</h1>
                <button type="button" class="btn-close" data-bs-dismiss="modal" aria-label="Close"></button>
            </div>
            <div class="modal-body" id="logContent">
                ...
            </div>
            <div class="modal-footer">
                <button type="button" class="btn btn-secondary" data-bs-dismiss="modal">Close</button>
                <button type="button" class="btn btn-primary">Save changes</button>
            </div>
        </div>
    </div>
</div>

前端js代码:

$("#logModal").on('shown.bs.modal', function (){
    const eventSource = new EventSource("http://localhost:9998/middle/logs");
    const logContent = $('#logContent');
    eventSource.onmessage = function (event){
        const logLine = $('<div>').text(event.data)
        logContent.append(logLine);
        logContent[0].scrollTop = logContent[0].scrollHeight; // 自动滚动到底部
    };
    eventSource.onerror = function (error ) {
        console.log('日志连接失败: ', error);
        eventSource.close();
        logContent.text('日志获取失败,请检查容器常或网络连接')
    }
    $("#logModal").on('hide.bs.modal', function (){
        eventSource.close();
        logContent.empty(); // 清空日志
    })
})

后端中间件需要引入 webflux 的依赖:

<dependency>
    	<groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

后端中间件代码:

controller:

@GetMapping(value = "/middle/logs", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter getDockerLogs(){
    return publishServerService.getDockerLogs();
}

service:

private final WebClient webClient = WebClient.builder().clientConnector(new ReactorClientHttpConnector(HttpClient.create())).build();
private final ExecutorService executorService = Executors.newSingleThreadExecutor();
    
public SseEmitter getDockerLogs() {
     String url = "http://127.0.0.1:9999/service/logs";
     SseEmitter emitter = new SseEmitter();
     executorService.submit(() -> {
         try {
             webClient.get().uri(url)
                     .retrieve()
                     .bodyToFlux(String.class)
                     .subscribe(
                             log -> {
                                 try {
                                     emitter.send(SseEmitter.event().data(log)); // 发送消息到前端
                                 } catch (IOException e) {
                                     emitter.completeWithError(e);
                                 }
                             },
                             error -> emitter.completeWithError(error),
                             emitter::complete
                     );
         } catch (Exception e) {
             emitter.completeWithError(e);
         }
     });
     return emitter;
}

后端服务代码:

controller:

    @GetMapping(value = "/service/logs", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public SseEmitter getDockerLogs() {
        return publishServerService.getDockerLogs();
    }

service:

private final ExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
    
public SseEmitter getDockerLogs() {
    SseEmitter emitter = new SseEmitter();
    executorService.submit(() -> {
        try {
            int i = 0;
            while (i < 100000000) {
                try {
                    emitter.send(SseEmitter.event().data(i)); // 发送消息到后端中间件
                    Thread.sleep(1000);
                    i++;
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
            emitter.complete();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    });
    return emitter;
}

提示:你可以把 i 换成真实的应用程序日志,如下:

public SseEmitter getDockerLogs() {
    SseEmitteremitter = new SseEmitter();
    executorService.submit(() -> {
        try {
            Process process = Runtime.getRuntime().exec("docker logs -f nginx");
            InputStreamReader inputStreamReader = new InputStreamReader(process.getInputStream());
            BufferedReader reader = new BufferedReader(inputStreamReader);
            String line;
            while ((line = reader.readLine()) != null) {
                emitter.send(SseEmitter.event().data(line));
            }
            emitter.complete();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    });
    return emitter;
}

最终前端显示效果如下:

在这里插入图片描述

到此前端就可以实时的获取后端日志在页面中显示了。

但是你会发现后端控制台会时不时报错,错误信息如下:

2025-04-24T11:01:12.488Z  WARN 26616 --- [nio-9990-exec-2] .w.s.m.s.DefaultHandlerExceptionResolver : Ignoring exception, response committed. : org.springframework.web.context.request.async.AsyncRequestTimeoutException
2025-04-24T11:01:12.489Z  WARN 26616 --- [nio-9990-exec-2] .w.s.m.s.DefaultHandlerExceptionResolver : Resolved [org.springframework.web.context.request.async.AsyncRequestTimeoutException]







org.springframework.web.context.request.async.AsyncRequestTimeoutException
	at org.springframework.web.context.request.async.TimeoutDeferredResultProcessingInterceptor.handleTimeout(TimeoutDeferredResultProcessingInterceptor.java:42)
	at org.springframework.web.context.request.async.DeferredResultInterceptorChain.triggerAfterTimeout(DeferredResultInterceptorChain.java:81)
	at org.springframework.web.context.request.async.WebAsyncManager.lambda$startDeferredResultProcessing$5(WebAsyncManager.java:442)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
	at org.springframework.web.context.request.async.StandardServletAsyncWebRequest.onTimeout(StandardServletAsyncWebRequest.java:154)
	at org.apache.catalina.core.AsyncListenerWrapper.fireOnTimeout(AsyncListenerWrapper.java:44)
	at org.apache.catalina.core.AsyncContextImpl.timeout(AsyncContextImpl.java:135)
	at org.apache.catalina.connector.CoyoteAdapter.asyncDispatch(CoyoteAdapter.java:135)
	at org.apache.coyote.AbstractProcessor.dispatch(AbstractProcessor.java:243)
	at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:57)
	at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:896)
	at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1744)
	at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:52)
	at org.apache.tomcat.util.threads.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1191)
	at org.apache.tomcat.util.threads.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:659)
	at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)

javaweb 报错如下:
2025-04-24T11:02:41.538Z  WARN 26653 --- [nio-9991-exec-1] .w.s.m.s.DefaultHandlerExceptionResolver : Ignoring exception, response committed. : org.springframework.web.context.request.async.AsyncRequestTimeoutException
2025-04-24T11:02:41.538Z  WARN 26653 --- [nio-9991-exec-1] .w.s.m.s.DefaultHandlerExceptionResolver : Resolved [org.springframework.web.context.request.async.AsyncRequestTimeoutException]
2025-04-24T11:02:42.480Z ERROR 26653 --- [or-http-epoll-2] reactor.core.publisher.Operators         : Operator called default onErrorDropped

java.lang.IllegalStateException: The response object has been recycled and is no longer associated with this facade
	at org.apache.catalina.connector.ResponseFacade.checkFacade(ResponseFacade.java:478) ~[tomcat-embed-core-10.1.16.jar!/:na]
	at org.apache.catalina.connector.ResponseFacade.isFinished(ResponseFacade.java:154) ~[tomcat-embed-core-10.1.16.jar!/:na]
	at org.apache.catalina.connector.ResponseFacade.flushBuffer(ResponseFacade.java:240) ~[tomcat-embed-core-10.1.16.jar!/:na]
	at org.springframework.http.server.ServletServerHttpResponse.flush(ServletServerHttpResponse.java:104) ~[spring-web-6.0.14.jar!/:6.0.14]
	at org.springframework.http.server.DelegatingServerHttpResponse.flush(DelegatingServerHttpResponse.java:61) ~[spring-web-6.0.14.jar!/:6.0.14]
	at org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitterReturnValueHandler$HttpMessageConvertingHandler.complete(ResponseBodyEmitterReturnValueHandler.java:231) ~[spring-webmvc-6.0.14.jar!/:6.0.14]
	at org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitter.complete(ResponseBodyEmitter.java:266) ~[spring-webmvc-6.0.14.jar!/:6.0.14]
	at reactor.core.publisher.LambdaSubscriber.onComplete(LambdaSubscriber.java:132) ~[reactor-core-3.5.12.jar!/:3.5.12]
	at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onComplete(MonoFlatMapMany.java:260) ~[reactor-core-3.5.12.jar!/:3.5.12]
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2205) ~[reactor-core-3.5.12.jar!/:3.5.12]
	at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onComplete(FluxOnAssembly.java:549) ~[reactor-core-3.5.12.jar!/:3.5.12]
	at reactor.core.publisher.FluxConcatMapNoPrefetch$FluxConcatMapNoPrefetchSubscriber.onComplete(FluxConcatMapNoPrefetch.java:240) ~[reactor-core-3.5.12.jar!/:3.5.12]
	at reactor.core.publisher.DrainUtils.postComplete(DrainUtils.java:183) ~[reactor-core-3.5.12.jar!/:3.5.12]
	at reactor.core.publisher.FluxBufferPredicate$BufferPredicateSubscriber.onComplete(FluxBufferPredicate.java:356) ~[reactor-core-3.5.12.jar!/:3.5.12]
	at reactor.core.publisher.FluxPeekFuseable$PeekFuseableConditionalSubscriber.onComplete(FluxPeekFuseable.java:595) ~[reactor-core-3.5.12.jar!/:3.5.12]
	at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onComplete(FluxMapFuseable.java:350) ~[reactor-core-3.5.12.jar!/:3.5.12]
	at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onComplete(FluxContextWrite.java:126) ~[reactor-core-3.5.12.jar!/:3.5.12]
	at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onComplete(FluxDoFinally.java:128) ~[reactor-core-3.5.12.jar!/:3.5.12]
	at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onComplete(FluxConcatArray.java:230) ~[reactor-core-3.5.12.jar!/:3.5.12]
	at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drainAsync(FluxFlattenIterable.java:371) ~[reactor-core-3.5.12.jar!/:3.5.12]
	at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drain(FluxFlattenIterable.java:724) ~[reactor-core-3.5.12.jar!/:3.5.12]
	at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.onComplete(FluxFlattenIterable.java:273) ~[reactor-core-3.5.12.jar!/:3.5.12]
	at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:144) ~[reactor-core-3.5.12.jar!/:3.5.12]
	at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260) ~[reactor-core-3.5.12.jar!/:3.5.12]
	at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:144) ~[reactor-core-3.5.12.jar!/:3.5.12]
	at reactor.netty.channel.FluxReceive.terminateReceiver(FluxReceive.java:483) ~[reactor-netty-core-1.1.13.jar!/:1.1.13]
	at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:275) ~[reactor-netty-core-1.1.13.jar!/:1.1.13]
	at reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:419) ~[reactor-netty-core-1.1.13.jar!/:1.1.13]
	at reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:446) ~[reactor-netty-core-1.1.13.jar!/:1.1.13]
	at reactor.netty.channel.ChannelOperations.terminate(ChannelOperations.java:500) ~[reactor-netty-core-1.1.13.jar!/:1.1.13]
	at reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:768) ~[reactor-netty-http-1.1.13.jar!/:1.1.13]
	at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:114) ~[reactor-netty-core-1.1.13.jar!/:1.1.13]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.101.Final.jar!/:4.1.101.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.101.Final.jar!/:4.1.101.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.101.Final.jar!/:4.1.101.Final]
	at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436) ~[netty-transport-4.1.101.Final.jar!/:4.1.101.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346) ~[netty-codec-4.1.101.Final.jar!/:4.1.101.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318) ~[netty-codec-4.1.101.Final.jar!/:4.1.101.Final]
	at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251) ~[netty-transport-4.1.101.Final.jar!/:4.1.101.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) ~[netty-transport-4.1.101.Final.jar!/:4.1.101.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.101.Final.jar!/:4.1.101.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.101.Final.jar!/:4.1.101.Final]
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.101.Final.jar!/:4.1.101.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) ~[netty-transport-4.1.101.Final.jar!/:4.1.101.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.101.Final.jar!/:4.1.101.Final]
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.101.Final.jar!/:4.1.101.Final]
	at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:800) ~[netty-transport-classes-epoll-4.1.101.Final.jar!/:4.1.101.Final]
	at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:509) ~[netty-transport-classes-epoll-4.1.101.Final.jar!/:4.1.101.Final]
	at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:407) ~[netty-transport-classes-epoll-4.1.101.Final.jar!/:4.1.101.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[netty-common-4.1.101.Final.jar!/:4.1.101.Final]
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.101.Final.jar!/:4.1.101.Final]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.101.Final.jar!/:4.1.101.Final]
	at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]

这是因为 SpringMVC 默认设置了 30 秒超时时间,你需要修改的长一点,具体多少由你自己定义。后端中间件和后端服务都需要配置。

错误分析

1. AsyncRequestTimeoutException

此错误表明异步请求超时。在 Spring 里,当一个异步请求在规定的时间内没有完成时,就会抛出该异常。默认情况下,Spring 的异步请求超时时间是 30 秒。在 SSE 场景中,因为要保持长连接以实现实时数据推送,所以很容易超出这个时间限制。

2. IllegalStateException

该错误显示响应对象已被回收,不再和当前的请求关联。这通常是在异步请求超时后,响应对象被关闭或者回收,而代码还尝试对其进行操作时发生的。

解决办法

1. 增加异步请求超时时间

你可以通过配置 WebMvcConfigurer 来增加异步请求的超时时间,避免因超时引发异常。

在 后端中间件 和 后端服务 中添加如下配置类:

import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.AsyncSupportConfigurer;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;

@Configuration
class LogWebConfig implements WebMvcConfigurer {

    @Override
    public void configureAsyncSupport(AsyncSupportConfigurer configurer) {
        // 设置异步请求超时时间为 3600 秒(1 小时)
        configurer.setDefaultTimeout(3600 * 1000); 
    }
}

扩展:

我通过前端传递参数(容器id)给后端执行,我发现了一个奇怪的问题。

我第一次打开模态框,可以正常的实时显示日志。但是我点击 close 的时候,这段代码确实执行了。

$("#logModal").on('hide.bs.modal', function (){
    eventSource.close();
    logContent.empty(); // 清空日志
})

但是我再次打开模态框,发现没有日志显示了,我甚至刷新页面,然后再次打开模态框依然没有日志显示,只有我把后端服务重启之后,再次打开模态框,才有日志显示,这是为什么?

这个问题可能是由于EventSource 实例未正确释放或重复创建导致的资源冲突。以下是具体分析和解决方案:

问题原因分析

1. EventSource 作用域问题:

在当前代码中,eventSource变量是在shown.bs.modal事件处理函数的闭包中创建的。当模态框关闭时,虽然通过hide.bs.modal事件调用了eventSource.close(),但闭包中的变量可能未被正确释放,导致再次打开模态框时,旧的eventSource实例残留或引用错误。

2. 事件绑定重复问题:

每次打开模态框时,shown.bs.modal事件处理函数会重复绑定hide.bs.modal事件,导致多个处理函数共存,可能混淆不同实例的eventSource。

3. 后端连接未正确断开:

若后端未正确处理EventSource的断开(如未关闭响应流),可能导致端口或连接资源被占用,后续请求无法建立新连接。

解决方案:

使用模态框数据存储 EventSource 实例

修改代码,将eventSource实例存储在模态框元素的data属性中,确保每次关闭时正确释放当前实例:

// 修改后的模态框显示事件处理函数
$("#logModal").on('shown.bs.modal', function (event) {
    const trigger = $(event.relatedTarget);
    const itemStr = trigger.data('item');
    if (!itemStr) return;

    const item = JSON.parse(itemStr);
    const modal = $(this); // 获取当前模态框实例
    
    // 先关闭可能存在的旧连接
    const oldEventSource = modal.data('eventSource');
    if (oldEventSource) {
        oldEventSource.close();
        modal.removeData('eventSource');
    }

    // 创建新的EventSource并存储到模态框数据中
    const eventSource = new EventSource(`http://localhost:9998/middle/logs?containerId=${item.containerId}&ip=${item.ip}`);
    modal.data('eventSource', eventSource);

    const logContent = $('#logContent');
    logContent.empty();

    eventSource.onmessage = function (event) {
        const logLine = $('<div>').text(event.data);
        logContent.append(logLine);
        logContent[0].scrollTop = logContent[0].scrollHeight;
    };

    eventSource.onerror = function (error) {
        console.log('日志连接失败: ', error);
        eventSource.close();
        logContent.text('日志获取失败,请检查容器状态或网络连接');
        modal.removeData('eventSource'); // 清除错误状态的实例
    };
});

// 修改后的模态框隐藏事件处理函数
$("#logModal").on('hide.bs.modal', function () {
    const modal = $(this);
    const eventSource = modal.data('eventSource');
    if (eventSource) {
        eventSource.close(); // 关闭当前实例
        modal.removeData('eventSource'); // 清除数据引用
    }
    $('#logContent').empty(); // 清空日志
});

其实不是上面这个问题,经过再次排查,我发现了问题所在,当我的后端服务的代码这样写的时候,就会出现前面说的问题。

public SseEmitter getDockerLogs(PublishServerDto publishServerDto) {
        SseEmitter emitter = new SseEmitter();
        executorService.submit(() -> {
            try {
                Process process = Runtime.getRuntime().exec("docker logs -f " + publishServerDto.getContainerId());
                InputStreamReader inputStreamReader = new InputStreamReader(process.getInputStream());
                BufferedReader reader = new BufferedReader(inputStreamReader);
                String line;
                while ((line = reader.readLine()) != null) {
                    emitter.send(SseEmitter.event().data(line));
                }
                emitter.complete();
            } catch (IOException e) {
                emitter.completeWithError(e);
            }
        });
        return emitter;
    }

当我的后端服务的代码修改成这样的时候。

public SseEmitter getDockerLogs(PublishServerDto publishServerDto) {
        SseEmitter emitter = new SseEmitter();
        executorService.submit(() -> {
            try {
                int i = 0;
                while (i < 100000000) {
                    try {
                        emitter.send(SseEmitter.event().data(i)); // 发送消息到后端中间件
                        Thread.sleep(1000);
                        i++;
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }
                emitter.complete();
            } catch (IOException e) {
                emitter.completeWithError(e);
            }
        });
        return emitter;
    }

前端关闭再打开模态框也能正常显示数据。为什么?

这是因为 docker logs -f 命令会持续阻塞直到进程被终止,而我的后端代码在 EventSource 关闭时未正确终止正在运行的 Process 对象,导致资源占用和连接冲突。

docker logs -f 命令导致线程阻塞,无法响应 SseEmitter 的关闭事件。当前端关闭 EventSource 时,后端的 reader.readLine() 处于阻塞状态,无法感知 SseEmitter 的异常或完成信号,导致 finally 块和异常处理代码无法执行。

另外,

以下是详细分析和解决方案:

问题根源:未终止后台进程

1. docker logs -f 的阻塞特性

  • docker logs -f 会持续读取容器日志并阻塞当前线程,直到容器停止或命令被中断(如按下 Ctrl+C)。

  • 当前端关闭 EventSource 时,后端的 SseEmitter 会触发 completeWithError,但 Process 对象(docker logs 进程)仍在后台运行,其输入流被占用,导致:

    • 再次打开模态框时,新的 Process 无法创建(端口 / 资源被占用)。
    • 旧的 Process 残留数据可能干扰新连接。

2. 模拟数据与真实命令的差异

模拟数据场景: 模拟数据的循环中使用了 Thread.sleep(1000),该方法会响应线程中断(InterruptedException)。当 SseEmitter 关闭时,会抛出异常并中断线程,而 docker logs -f 的阻塞式读取无法响应中断,导致清理逻辑失效。

真实命令场景: docker logs -f 是外部进程,不受 Java 线程控制,EventSource 关闭时未终止该进程,导致资源泄漏。

3. 阻塞式读取的局限性

BufferedReader.readLine() 是阻塞式方法,当 docker logs -f 没有新日志时,线程会一直阻塞在此处,无法处理 SseEmitter 的关闭事件(如 emitter.completeWithError() 或客户端断开连接引发的异常)。可以使用非阻塞式读取或中断机制 解决这个问题。

解决方案:

分离读取逻辑

将日志读取放到独立线程中,避免主线程被 docker logs -f 阻塞,确保能响应 SseEmitter 的关闭事件。

监听 SseEmitter 事件

使用 emitter.onCompletion() 和 emitter.onError() 回调,在连接关闭或出错时执行清理逻辑。

修改后的代码:

service :

// 使用原子引用存储进程对象,确保多线程环境下的可见性和原子性
    private final AtomicReference<Process> processHolder = new AtomicReference<>();
    // 使用原子布尔控制日志读取循环,确保线程安全的状态变更
    private final AtomicBoolean isRunning = new AtomicBoolean(true);

    public SseEmitter getDockerLogs(PublishServerDto publishServerDto) {
        SseEmitter emitter = new SseEmitter();
        
        // 提交异步任务处理日志流
        executorService.submit(() -> {
            try {
                // 1. 执行docker logs命令,获取容器实时日志
                String containerId = publishServerDto.getContainerId();
                Process process = Runtime.getRuntime().exec("docker logs -f " + containerId);
                processHolder.set(process); // 保存进程引用到原子容器中

                // 2. 启动独立线程读取日志输入流(避免主线程阻塞)
                new Thread(() -> {
                    try ( // 使用try-with-resources自动关闭流
                        BufferedReader reader = new BufferedReader(
                            new InputStreamReader(process.getInputStream())
                        )
                    ) {
                        String line;
                        // 循环读取日志,直到isRunning为false或读取到null(流结束)
                        while (isRunning.get() && (line = reader.readLine()) != null) {
                            // 通过SSE发送日志数据到前端
                            emitter.send(SseEmitter.event().data(line));
                        }
                    } catch (IOException e) {
                        // 读取流时发生异常(如管道关闭),终止进程
                        Process p = processHolder.get();
                        if (p != null) {
                            p.destroy(); // 尝试正常终止进程
                            System.out.println("taskagent - 读取日志流异常: " + e.getMessage());
                        }
                    }
                }).start();

                // 3. 监听SSE连接的生命周期事件
                // 当前端正常关闭连接时触发(如关闭模态框)
                emitter.onCompletion(() -> {
                    handleShutdown("正常关闭", processHolder.get()); // 统一处理关闭逻辑
                });

                // 当SSE连接发生错误时触发(如网络中断)
                emitter.onError(ex -> {
                    handleShutdown("错误关闭: " + ex.getMessage(), processHolder.get());
                });

                // 当SSE连接超时时触发(需配置超时时间,默认30秒)
                emitter.onTimeout(() -> {
                    handleShutdown("超时关闭", processHolder.get());
                });

            } catch (Exception e) {
                // 初始化进程时发生异常(如命令格式错误)
                isRunning.set(false);
                Process p = processHolder.get();
                if (p != null) p.destroy();
                System.out.println("taskagent - 初始化docker进程失败: " + e.getMessage());
                emitter.completeWithError(e); // 通知前端连接失败
            }
        });

        return emitter;
    }

    /**
     * 统一处理进程关闭逻辑
     * @param reason 关闭原因(用于日志输出)
     * @param process 待关闭的进程
     */
    private void handleShutdown(String reason, Process process) {
        isRunning.set(false); // 停止日志读取循环
        if (process != null) {
            process.destroy(); // 发送中断信号(等效于Ctrl+C)
            System.out.println("taskagent - " + reason + ", 已终止docker进程"); // 关键日志输出点
        }
    }

现在我发现另一个问题。我的页面有很多容器,首先我打开nginx容器的日志,模态框被打开,实时日志能正常显示在页面上,当我关闭模态框,然后再次打开 nginx的日志,模态框被打开,依然能正常显示实时日志。 关键问题来了,然后我关闭模态框,接着打开 redis 的容器日志,日志没有显示在页面上了。

问题分析:

1. isRunning 标志的重置问题 - 可能

在 handleShutdown 方法里,isRunning 被设置为 false,不过在开启新的日志流时,没有将其重置为 true。这会让后续的日志读取循环无法正常运行。

解决办法:

在开启新的日志流之前,把 isRunning 重置为 true。

2. processHolder 未正确更新 - 可能

当切换容器查看日志时,processHolder 可能还保存着之前的 Process 对象,这会对新的日志读取产生影响。

解决办法:
在开启新的日志流之前,确保 processHolder 被清空。

3. webClient 连接问题(taskweb 端) - 可能

taskweb 端的 webClient 在处理多个连接时,可能会出现连接复用或者资源未正确释放的情况。

解决办法:

确保 webClient 在每次请求结束后都能正确释放资源。可以考虑为每个请求创建独立的 webClient 实例。

4. 前端 EventSource 管理问题 - 可能

前端在切换容器时,EventSource 可能没有正确关闭或者重新创建。

解决办法:

确保在切换容器时,EventSource 被正确关闭并重新创建。你的前端代码已经有了关闭逻辑,不过可以添加一些调试日志来确认是否正常执行。

最终修改的代码如下:

前端js代码:

// 显示实时日志
    $("#logModal").on('shown.bs.modal', function (event){
        // 从触发模态框的元素中获取 containerId
        const trigger = $(event.relatedTarget);
        const itemStr = trigger.data('item');
        if (!itemStr) {
            return;
        }

        const modal = $(this);
        let oldEventSource = modal.data('eventSource');
        if (oldEventSource) {
            oldEventSource.close();
            oldEventSource = null;
            modal.removeData('eventSource');
        }

        const item = JSON.parse(itemStr);
        let eventSource = new EventSource(`http://localhost:9998/middle/logs?containerId=${item.containerId}&ip=${item.ip}`);
        modal.data('eventSource', eventSource);

        const logContent = $('#logContent');
        logContent.empty(); // 清空日志
        eventSource.onmessage = function (event){
            const logLine = $('<div>').text(event.data)
            logContent.append(logLine);
            logContent[0].scrollTop = logContent[0].scrollHeight;
        };
        eventSource.onerror = function (error ) {
            console.log('日志连接失败: ', error);
            eventSource.close();
            eventSource = null; // 确保引用被清除
            logContent.text('日志获取失败,请检查容器常或网络连接')
        }
    })

    // 监听模态框关闭逻辑
    $("#logModal").on('hide.bs.modal', function (){
        const modal = $(this);
        let eventSource = modal.data('eventSource');
        if (eventSource) {
            eventSource.close(); // 关闭当前实例
            eventSource = null;
            modal.removeData('eventSource');  // 清除数据引用
        }
        $("#logContent").empty(); // 清空日志
    })

后端中间件代码:

service:

public SseEmitter getDockerLogs(PublishServer publishServer) {
        String url = taskagentConfig.getPrefixAddress(publishServer.getIp(), taskagentConfig.getGetDockerLogsUrl());
        SseEmitter emitter = new SseEmitter();
        executorService.submit(() -> {
            try {
                WebClient webClient = WebClient.builder().clientConnector(new ReactorClientHttpConnector(HttpClient.create())).build();
                webClient.get().uri(url, publishServer.getContainerId())
                        .retrieve()
                        .bodyToFlux(String.class)
                        .subscribe(
                                log -> {
                                    try {
                                        emitter.send(SseEmitter.event().data(log));
                                    } catch (IOException e) {
                                        emitter.completeWithError(e);
                                    }
                                },
                                emitter::completeWithError,
                                emitter::complete
                        );
            } catch (Exception e) {
                emitter.completeWithError(e);
            }
        });
        return emitter;
    }

后端服务代码:

// 使用原子应用存储进程对象,确保多线程环境下的可见性和原子性
    private final AtomicReference<Process> processHolder = new AtomicReference<>();
    // 使用原子布尔值控制日志读取循环,确保线程安全的状态变更
    private final AtomicBoolean isRunning = new AtomicBoolean(true);

    public SseEmitter getDockerLogs(PublishServerDto publishServerDto) {
        SseEmitter emitter = new SseEmitter();
        isRunning.set(true);
        processHolder.set(null); // 清空进程引用

        // 提交异步任务处理日志流
        executorService.submit(() -> {
            try {
                // 1. 执行 docker logs 命令,获取容器实时日志
                Process process = Runtime.getRuntime().exec("docker logs -f "+publishServerDto.getContainerId());
                processHolder.set(process); // 保存进程引用到原子容器中

                // 2. 启动独立线程读取日志输入流(避免主线程阻塞)
                new Thread(() -> {
                    // 使用 try-with-resources 自动关闭流资源
                    try (BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()))){
                        String line;
                        // 循环读取日志,直到isRunning为false或读取到null(流结束)
                        while (isRunning.get() && (line = reader.readLine()) != null) {
                            // 通过 SSE 发送日志数据到taskweb
                            emitter.send(SseEmitter.event().data(line));
                        }
                    } catch (IOException e) {
                        // 读取流时发生异常(如管道关闭),终止进程
                        Process p = processHolder.get();
                        if (p != null) {
                            p.destroy(); // 尝试正常终止进程
                            log.error("taskagent - 读取流时发生异常", e.getMessage());
                        }
                    }
                }).start();

                // 3. 监听SSE连接的生命周期事件
                // 当前端正常关闭连接时触发
                emitter.onCompletion(() -> {
                    // 统一处理关闭逻辑
                    handleShutdown("正常关闭", processHolder.get());
                });

                // 当SSE连接发生错误时触发(如网络中断)
                emitter.onError(ex -> {
                    handleShutdown("错误关闭: "+ex.getMessage(), processHolder.get());
                });

                // 当SSE连接超时时触发
                emitter.onTimeout(() -> {
                    handleShutdown("超时关闭", processHolder.get());
                });
            } catch (IOException e) {
                // 初始化进程时发生异常
                isRunning.set(false);
                Process p = processHolder.get();
                if (p != null) {
                    p.destroy();
                }
                log.error("taskagent - 初始化 docker 进程失败:", e.getMessage());
                emitter.completeWithError(e); // 通知taskweb连接失败
            }
        });
        return emitter;
    }

    /**
     * 统一处理进程关闭逻辑
     * @param reason 关闭原因(用于日志输出)
     * @param process 待关闭的进程
     */
    private void handleShutdown(String reason, Process process) {
        isRunning.set(false); // 停止日志读取循环
        if (process != null) {
            process.destroy(); // 发送中断信号(等效于Ctrl+C)
            log.info("taskagent - " + reason + ", 已终止docker进程");
        }
    }

修改成上面之后,前端实时显示日志的效果就全部正常了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2343150.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

二叉树的遍历(广度优先搜索)

二叉树的第二种遍历方式&#xff0c;层序遍历&#xff0c;本质是运用队列对二叉树进行搜索。 层序遍历是指将二叉树的每一层按顺序遍历&#xff0c;通过队列实现就是先将根节点push入队&#xff0c;统计此时的队列中的元素数量size&#xff0c;将size元素全部pop出去&#xff0…

2025年计算机视觉与智能通信国际会议(ICCVIC 2025)

2025 International Conference on Computer Vision and Intelligent Communication 一、大会信息 会议简称&#xff1a;ICCVIC 2025 大会地点&#xff1a;中国杭州 收录检索&#xff1a;提交Ei Compendex,CPCI,CNKI,Google Scholar等 二、会议简介 2025年计算机视觉与智能通…

手工收集统计信息

有时想对某些表收集统计信息 CREATE OR REPLACE PROCEDURE GATHER_STATS ASDECLAREV_SQL1 VARCHAR(1000);--表游标CURSOR C1 ISSELECT (SELECT USER) AS TABLE_OWNER,TABLE_NAMEFROM USER_TABLES; --可以在这里加过滤条件--索引游标CURSOR C2 ISSELECT TABLE_OWNER,INDEX_NAM…

flume整合Kafka和spark-streaming核心编程

flume整合Kafka 需求1&#xff1a;利用flume监控某目录中新生成的文件&#xff0c;将监控到的变更数据发送给kafka&#xff0c;kafka将收到的数据打印到控制台&#xff1a; 1.查看topic 2.编辑flume-Kafka.conf&#xff0c;并启动flume 3.启动Kafka消费者 4.新增测试数据 5.查…

EDI 如何与 ERP,CRM,WMS等系统集成

在数字化浪潮下&#xff0c;与制造供应链相关产业正加速向智能化供应链转型。传统人工处理订单、库存和物流的方式已难以满足下单客户对响应速度和数据准确性的严苛要求。EDI技术作为企业间数据交换的核心枢纽&#xff0c;其与ERP、CRM、WMS等业务系统的深度集成&#xff0c;成…

面试踩过的坑

1、 “”和equals 的区别 “”是运算符&#xff0c;如果是基本数据类型&#xff0c;则比较存储的值&#xff1b;如果是引用数据类型&#xff0c;则比较所指向对象的地址值。equals是Object的方法&#xff0c;比较的是所指向的对象的地址值&#xff0c;一般情况下&#xff0c;重…

多物理场耦合低温等离子体装置求解器PASSKEy2

文章目录 PASSKEy2简介PASSKEY2计算流程PASSKEy2 中求解的物理方程电路模型等离子体模型燃烧模型 PASSKEy2的使用 PASSKEy2简介 PASSKEy2 是在 PASSKEy1 的基础上重新编写的等离子体数值模拟程序。 相较于 PASSKEy1&#xff0c; PASSKEy2 在具备解决低温等离子体模拟问题的能力…

视频噪点多,如何去除画面噪点?

你是否遇到过这样的困扰&#xff1f;辛辛苦苦拍摄的视频&#xff0c;导出后却满屏 “雪花”&#xff0c;夜景变 “噪点盛宴”&#xff0c;低光环境秒变 “马赛克现场”&#xff1f; 无论是日常拍摄的vlog、珍贵的家庭录像&#xff0c;还是专业制作的影视作品&#xff0c;噪点问…

09前端项目----分页功能

分页功能 分页器的优点实现分页功能自定义分页器先实现静态分页器调试分页器动态数据/交互 Element UI组件 分页器的优点 电商平台同时展示的数据很多&#xff0c;所以采用分页功能实现分页功能 Element UI已经有封装好的组件&#xff0c;但是也要掌握原理&#xff0c;以及自定…

第十二届蓝桥杯 2021 C/C++组 直线

目录 题目&#xff1a; 题目描述&#xff1a; 题目链接&#xff1a; 思路&#xff1a; 核心思路&#xff1a; 两点确定一条直线&#xff1a; 思路详解&#xff1a; 代码&#xff1a; 第一种方式代码详解&#xff1a; 第二种方式代码详解&#xff1a; 题目&#xff1a;…

《Piper》皮克斯技术解析:RIS系统与云渲染如何创造奥斯卡级动画短片

本文由专业专栏作家 Mike Seymour 撰写&#xff0c;内容包含非常有价值的行业资讯。 译者注 《Piper》是皮克斯动画工作室的一部技术突破性的短片&#xff0c;讲述了一只小鸟在海滩上寻找食物并面对自然挑战的故事。它不仅凭借其精美的视觉效果和细腻的情感表达赢得了2017年奥…

Java在excel中导出动态曲线图DEMO

1、环境 JDK8 POI 5.2.3 Springboot2.7 2、DEMO pom <dependency><groupId>org.apache.poi</groupId><artifactId>poi-ooxml</artifactId><version>5.2.3</version></dependency><dependency><groupId>commons…

Kotlin Multiplatform--02:项目结构进阶

Kotlin Multiplatform--02&#xff1a;项目结构进阶 引言正文 引言 在上一章中&#xff0c;我们对 Kotlin Multiplatform 项目有了基本的了解&#xff0c;已经可以进行开发了。但我们只是使用了系统默认的项目结构。本章介绍了如何进行更复杂的项目结构管理。 正文 在上一章中&…

代码随想录算法训练营第五十八天 | 1.拓扑排序精讲 2.dijkstra(朴素版)精讲 卡码网117.网站构建 卡码网47.参加科学大会

1.拓扑排序精讲 题目链接&#xff1a;117. 软件构建 文章讲解&#xff1a;代码随想录 思路&#xff1a; 把有向无环图进行线性排序的算法都可以叫做拓扑排序。 实现拓扑排序的算法有两种&#xff1a;卡恩算法&#xff08;BFS&#xff09;和DFS&#xff0c;以下BFS的实现思…

linux ptrace 图文详解(七) gdb、strace跟踪系统调用

目录 一、gdb/strace 跟踪程序系统调用 二、实现原理 三、代码实现 四、总结 &#xff08;代码&#xff1a;linux 6.3.1&#xff0c;架构&#xff1a;arm64&#xff09; One look is worth a thousand words. —— Tess Flanders 相关链接&#xff1a; linux ptrace 图…

【前端】ES6 引入的异步编程解决方案Promise 详解

Promise 详解 1. 基本概念 定义&#xff1a;Promise 是 ES6 引入的异步编程解决方案&#xff0c;表示一个异步操作的最终完成&#xff08;或失败&#xff09;及其结果值。核心作用&#xff1a;替代回调函数&#xff0c;解决“回调地狱”问题&#xff0c;提供更清晰的异步流程控…

const(C++)

打印出来的结果是 a是12 *p是200 const修饰指针 const修饰引用

python21-循环小作业

课程&#xff1a;B站大学 记录python学习&#xff0c;直到学会基本的爬虫&#xff0c;使用python搭建接口自动化测试就算学会了&#xff0c;在进阶webui自动化&#xff0c;app自动化 循环语句小作业 for-in作业斐波那契 for 固定数值计算素数字符统计数字序列range 函数 水仙花…

小白电路设计-设计11-恒功率充电电路设计

介绍 作为电子信息工程的我&#xff0c;电路学习是一定要学习的&#xff0c;可惜目前作为EMC测试工程师&#xff0c;无法兼顾太多&#xff0c;索性不如直接将所学的知识进行运用&#xff0c;并且也可以作为契机&#xff0c;进行我本人的个人提升。祝大家与我一起进行提升。1.本…

Spring AI 快速入门:从环境搭建到核心组件集成

Spring AI 快速入门&#xff1a;从环境搭建到核心组件集成 一、前言&#xff1a;Java开发者的AI开发捷径 对于Java生态的开发者来说&#xff0c;将人工智能技术融入企业级应用往往面临技术栈割裂、依赖管理复杂、多模型适配困难等挑战。Spring AI的出现彻底改变了这一局面——…