using
public static <T,D> Flux<T> using(Callable<? extends D> resourceSupplier, Function<? super D,? extends Publisher<? extends T>> sourceSupplier, Consumer<? super D> resourceCleanup)
Uses a resource, generated by a supplier for each individual Subscriber, while streaming the values from a Publisher derived from the same resource and makes sure the resource is released if the sequence terminates or the Subscriber cancels.
使用一个资源,该资源由供应者为每个订阅者生成,同时从与该资源相关的 Publisher 中流式传输值,并确保在序列结束或订阅者取消时释放该资源。
Eager resource cleanup happens just before the source termination and exceptions raised by the cleanup Consumer may override the terminal event.
在源终止之前,资源会被提前清理。如果清理过程中 Consumer 抛出异常,这些异常可能会覆盖终止事件。
For an asynchronous version of the cleanup, with distinct path for onComplete, onError and cancel terminations, see usingWhen(Publisher, Function, Function, BiFunction, Function).
对于清理的异步版本,具有不同的处理路径用于 onComplete、onError 和取消终止,可以参见 usingWhen(Publisher, Function, Function, BiFunction, Function)。
Type Parameters:
T - emitted type
D - resource type
Parameters:
resourceSupplier - a Callable that is called on subscribe to generate the resource
sourceSupplier - a factory to derive a Publisher from the supplied resource
resourceCleanup - a resource cleanup callback invoked on completion
Returns:
a new Flux built around a disposable resource
See Also:
usingWhen(Publisher, Function, Function, BiFunction, Function), usingWhen(Publisher, Function, Function)
类型参数:
T - 发出的类型
D - 资源类型
参数:
resourceSupplier - 一个在订阅时调用的 Callable,用于生成资源
sourceSupplier - 从提供的资源派生 Publisher 的工厂
resourceCleanup - 在完成时调用的资源清理回调
返回:
一个围绕可处置资源构建的新 Flux
另请参见:
usingWhen(Publisher, Function, Function, BiFunction, Function)
usingWhen(Publisher, Function, Function)
using 是一个高级操作符,专门用于管理资源的使用和清理。
它可以确保当资源使用完毕后,执行相应的清理操作,这在处理文件、数据库连接等需要手动管理生命周期的资源时尤为有用。
1. 方法简介
java public static <T,D> Flux<T> using(Callable<? extends D> resourceSupplier, Function<? super D,? extends Publisher<? extends T>> sourceSupplier, Consumer<? super D> resourceCleanup)
- using 是一个 Flux 的工厂方法,创建一个基于资源的 Flux。它负责在资源开始使用时提供资源,并确保在流结束时进行资源的清理操作。
2. 参数说明
- resourceSupplier:用于提供资源的 Callable。当 Flux 被订阅时,它会调用该函数来生成资源。
- sourceSupplier:这是一个函数,它将资源传入并返回一个 Publisher。该 Publisher 将发出实际的数据流。
- resourceCleanup:当 Publisher 完成、取消或者发生错误时,调用此函数进行资源的清理。
3. 返回值
- Flux<T>:返回一个包含流数据的 Flux,并且会自动管理资源的生命周期(包括创建和清理)。
4. 使用场景
- 自动资源管理:当你处理像文件、网络连接、数据库连接等需要管理生命周期的资源时,using 可以帮助你在确保流数据处理完后,自动清理这些资源,防止资源泄露。
- 复杂资源流:如果数据流依赖于某个外部资源,比如从文件读取数据或者从数据库中获取数据,using 操作符可以确保这些资源的正确管理。
5. 示例代码
示例 1:文件读取并自动关闭资源
假设我们需要读取文件内容,using 操作符可以确保文件在读取完后自动关闭:
java import reactor.core.publisher.Flux; import java.io.BufferedReader; import java.io.FileReader; import java.io.IOException; import java.util.concurrent.Callable; public class UsingExample { public static void main(String[] args) { Flux<String> fileFlux = Flux.using( // 提供资源:打开文件 (Callable<BufferedReader>) () -> new BufferedReader(new FileReader("example.txt")), // 使用资源:读取文件内容 reader -> Flux.fromStream(reader.lines()), // 清理资源:关闭文件 reader -> { try { reader.close(); } catch (IOException e) { e.printStackTrace(); } } ); // 订阅并打印文件内容 fileFlux.subscribe( line -> System.out.println("Line: " + line), error -> System.err.println("Error: " + error), () -> System.out.println("File reading completed") ); } }
输出:
Line: This is line 1 Line: This is line 2 Line: This is line 3 File reading completed
在这个示例中,using 操作符确保 BufferedReader 在文件读取完毕后能够自动关闭,即使读取过程中发生错误,也能够正常关闭资源。
6. 资源清理时机
- onComplete:当 Flux 正常结束时,调用 resourceCleanup。
- onError:当 Flux 抛出错误时,仍然会调用 resourceCleanup,这可以确保即使在异常情况下,资源也能够得到清理。
- onCancel:当 Flux 被取消时,也会调用 resourceCleanup。
7. 错误处理
当 resourceSupplier 或 sourceSupplier 抛出异常时,using 操作符会立即终止并调用 resourceCleanup,以确保资源的正确释放。
示例 2:资源抛出异常的处理
java Flux<String> flux = Flux.using( () -> { // 抛出异常的资源 throw new IOException("Failed to open resource"); }, resource -> Flux.just("Hello"), resource -> System.out.println("Resource closed") ); flux.subscribe( System.out::println, error -> System.err.println("Error: " + error) );
输出:
Error: java.io.IOException: Failed to open resource
在这个示例中,IOException 导致流无法创建,但 using 仍然能够保证资源的释放。
8. 注意事项
- 懒加载资源:resourceSupplier 只有在 Flux 被订阅时才会被调用,这意味着资源不会被过早创建。这一点在处理昂贵的资源(如数据库连接)时尤为重要。
- 多次订阅的行为:每次订阅时,都会重新调用 resourceSupplier 和 sourceSupplier,因此每个订阅都有各自独立的资源和数据流。
9. 总结
using 是一个用于管理资源生命周期的关键操作符,它确保在资源使用完后,能够自动进行清理,无论是流正常完成还是发生错误。通过 using,你可以避免许多常见的资源泄露问题,尤其是在处理文件、网络连接等需要手动管理的资源时。
这个操作符在流式编程中非常实用,因为它将资源管理和数据流控制整合在一起,确保资源的安全和高效使用。
using
public static <T,D> Flux<T> using(Callable<? extends D> resourceSupplier, Function<? super D,? extends Publisher<? extends T>> sourceSupplier, Consumer<? super D> resourceCleanup, boolean eager)
Uses a resource, generated by a supplier for each individual Subscriber, while streaming the values from a Publisher derived from the same resource and makes sure the resource is released if the sequence terminates or the Subscriber cancels.Eager resource cleanup happens just before the source termination and exceptions raised by the cleanup Consumer may override the terminal event.
为每个单独的订阅者使用由供应商生成的资源,同时从基于相同资源派生的 Publisher 中流式传输值,并确保在序列终止或订阅者取消时释放该资源。积极的资源清理发生在源终止之前,清理消费者引发的异常可能会覆盖终止事件。
Non-eager cleanup will drop any exception.
非积极的清理将会忽略任何异常。
For an asynchronous version of the cleanup, with distinct path for onComplete, onError and cancel terminations, see usingWhen(Publisher, Function, Function, BiFunction, Function).
有关异步版本的清理,其中对 onComplete、onError 和 cancel 终止有不同的处理路径,请参见 usingWhen(Publisher, Function, Function, BiFunction, Function)。
Type Parameters:
T - emitted type
D - resource type
Parameters:
resourceSupplier - a Callable that is called on subscribe to generate the resource
sourceSupplier - a factory to derive a Publisher from the supplied resource
resourceCleanup - a resource cleanup callback invoked on completion
eager - true to clean before terminating downstream subscribers
Returns:
a new Flux built around a disposable resource
See Also:
usingWhen(Publisher, Function, Function, BiFunction, Function), usingWhen(Publisher, Function, Function)
类型参数:
T - 发出的类型
D - 资源类型
参数:
resourceSupplier - 在订阅时调用的 Callable,用于生成资源
sourceSupplier - 从提供的资源派生 Publisher 的工厂
resourceCleanup - 在完成时调用的资源清理回调
eager - 为 true 时在终止下游订阅者之前进行清理
返回:
一个围绕可丢弃资源构建的新 Flux
另见:
usingWhen(Publisher, Function, Function, BiFunction, Function)
usingWhen(Publisher, Function, Function)
using 操作符的这一变体增加了一个新的参数 eager,用于控制资源的清理时机。
相比于之前的版本,这个参数决定了资源是在终止信号(onComplete 或 onError)发出前还是之后进行清理。
方法签名
java public static <T,D> Flux<T> using(Callable<? extends D> resourceSupplier, Function<? super D, ? extends Publisher<? extends T>> sourceSupplier, Consumer<? super D> resourceCleanup, boolean eager)
参数说明
- resourceSupplier: 提供资源的 Callable,该资源将在流被订阅时创建。
- sourceSupplier: 一个函数,它接受资源作为参数,并返回一个 Publisher,该 Publisher 将发出实际的数据流。
- resourceCleanup: 当流终止时调用的资源清理逻辑。无论是因为流完成、发生错误还是取消,都会执行这个清理逻辑。
- eager: 一个布尔值,表示资源清理的时机。如果设置为 true,则资源会在发出终止信号(onComplete 或 onError)之前清理;如果设置为 false,则资源会在终止信号发出之后才清理。
返回值
- Flux<T>: 返回一个包含流数据的 Flux,并会根据 eager 参数来确定何时清理资源。
资源清理时机(eager 参数的作用)
- eager == true: 当 eager 设置为 true 时,资源会在 onComplete 或 onError 信号发送之前进行清理。这意味着,在订阅者接收到终止信号之前,资源已经被释放了。
- eager == false: 当 eager 设置为 false 时,资源将在终止信号发送之后才被清理。订阅者将先接收到 onComplete 或 onError 信号,然后资源才会被清理。
使用场景
这个版本的 using 方法在某些特定场景下非常有用:
- 需要严格的资源管理时:当资源清理需要在终止信号之前完成时,eager 模式可以确保资源提前释放,以便避免资源占用或其他副作用。
- 松散管理资源时:如果资源清理的时机并不严格,或者希望资源在信号之后清理,非 eager 模式可以稍微延迟资源的释放。
例子
示例 1:eager 模式下的资源清理
java import reactor.core.publisher.Flux; import java.util.concurrent.Callable; public class UsingExample { public static void main(String[] args) { Flux<String> flux = Flux.using( // 提供资源 (Callable<String>) () -> "Resource", // 使用资源resource -> Flux.just("Using resource: " + resource), // 清理资源resource -> System.out.println("Cleaning resource: " + resource), // eager == true,清理将在终止信号前进行true ); flux.subscribe( System.out::println, error -> System.err.println("Error: " + error), () -> System.out.println("Completed") ); } }
输出:
Using resource: Resource Cleaning resource: Resource Completed
在这个例子中,资源清理操作在流发出终止信号之前执行,资源被提前释放。
示例 2:非 eager 模式的资源清理
java Flux<String> flux = Flux.using( () -> "Resource", resource -> Flux.just("Using resource: " + resource), resource -> System.out.println("Cleaning resource: " + resource), false // eager == false,清理将在终止信号发出后进行 ); flux.subscribe( System.out::println, error -> System.err.println("Error: " + error), () -> System.out.println("Completed") );
输出:
Using resource: Resource Completed Cleaning resource: Resource
在这个例子中,资源清理操作在流发出 onComplete 信号之后执行。
资源清理的时机差异
使用 eager 模式时,清理资源的时机会比流的终止信号更早。这对于需要在 onComplete 或 onError 信号发出前释放资源的情况非常重要,比如在高并发场景下,立即释放资源可以降低系统的资源占用。
而非 eager 模式更适合资源释放时机不那么关键的场景,延迟清理操作不会对系统带来负担。
总结
using 方法中的 eager 参数为我们提供了对资源管理更细粒度的控制。当需要在终止信号之前释放资源时,可以启用 eager 模式;否则,可以选择稍微延迟资源释放操作。根据不同的业务需求,可以灵活使用这两种模式来管理资源的生命周期。
using
public static <T,D extends AutoCloseable> Flux<T> using(Callable<? extends D> resourceSupplier, Function<? super D,? extends Publisher<? extends T>> sourceSupplier)
Uses an AutoCloseable resource, generated by a supplier for each individual Subscriber, while streaming the values from a Publisher derived from the same resource and makes sure the resource is released if the sequence terminates or the Subscriber cancels.
使用可自动关闭的资源,该资源由供应者为每个单独的订阅者生成,同时从派生自同一资源的 Publisher 中流式传输值,并确保在序列终止或订阅者取消时释放该资源。
Eager AutoCloseable resource cleanup happens just before the source termination and exceptions raised by the cleanup Consumer may override the terminal event.
在源终止之前,主动的 AutoCloseable 资源清理会发生,并且由清理消费者引发的异常可能会覆盖终止事件。
For an asynchronous version of the cleanup, with distinct path for onComplete, onError and cancel terminations, see usingWhen(Publisher, Function, Function, BiFunction, Function).
有关异步版本的清理,其中 onComplete、onError 和取消终止有不同的处理路径,请参见 usingWhen(Publisher, Function, Function, BiFunction, Function)。
Type Parameters:
T - emitted type
D - resource type
Parameters:
resourceSupplier - a Callable that is called on subscribe to generate the resource
sourceSupplier - a factory to derive a Publisher from the supplied resource
Returns:
a new Flux built around a disposable resource
See Also:
usingWhen(Publisher, Function, Function, BiFunction, Function), usingWhen(Publisher, Function, Function)
类型参数:
T - 发出的类型
D - 资源类型
参数:
resourceSupplier - 在订阅时调用的 Callable,用于生成资源
sourceSupplier - 用于从提供的资源派生 Publisher 的工厂
返回:
一个围绕可丢弃资源构建的新 Flux
参见:
usingWhen(Publisher, Function, Function, BiFunction, Function)
usingWhen(Publisher, Function, Function)
这个 using 方法的重载版本与前面类似,但它专门处理实现了 AutoCloseable 接口的资源(通常用于带有关闭行为的资源,如文件、数据库连接等)。
因为它依赖于 AutoCloseable,所以不需要显式提供 resourceCleanup 操作,资源的关闭由 AutoCloseable 的 close() 方法自动处理。
方法签名
java Copy code public static <T, D extends AutoCloseable> Flux<T> using( Callable<? extends D> resourceSupplier, Function<? super D, ? extends Publisher<? extends T>> sourceSupplier )
参数说明
- resourceSupplier: 提供资源的 Callable,该资源会在 Flux 被订阅时创建。该资源必须实现 AutoCloseable,以确保它具备自动关闭的能力。
- sourceSupplier: 一个函数,接受资源作为参数并返回一个 Publisher,该 Publisher 发出实际的数据流。
返回值
- Flux<T>: 返回一个包含流数据的 Flux,并会在流终止时自动关闭资源。
功能和原理
这个方法的核心功能是,利用 AutoCloseable 接口自动关闭资源,而不需要你显式地指定清理逻辑。它与普通的 using 方法类似,只是简化了资源清理的过程。
在流的生命周期结束后,无论是正常完成还是因为错误终止,using 都会调用资源的 close() 方法来释放资源。这使得它非常适合那些实现了 AutoCloseable 的资源,比如 InputStream、BufferedReader、Connection 等。
适用场景
- 自动管理资源的关闭: 当你使用的资源(如文件或数据库连接)需要在操作结束后关闭时,using 方法是个不错的选择。由于该资源实现了 AutoCloseable,你不需要显式处理关闭逻辑,using 方法会在适当的时机调用 close() 方法。
- 简化代码: 相较于显式提供 resourceCleanup 的版本,这个方法减少了一部分手动关闭资源的工作量,代码更加简洁。
示例
示例 1:使用 AutoCloseable 资源(如 BufferedReader)
java import reactor.core.publisher.Flux; import java.io.BufferedReader; import java.io.FileReader; import java.io.IOException; import java.util.concurrent.Callable; public class UsingExample { public static void main(String[] args) { Callable<BufferedReader> resourceSupplier = () -> new BufferedReader(new FileReader("example.txt")); Flux<String> flux = Flux.using( resourceSupplier, reader -> Flux.fromStream(reader.lines()), true // eager, 确保资源在终止信号之前关闭 ); flux.subscribe( System.out::println, error -> System.err.println("Error: " + error), () -> System.out.println("Completed") ); } }
示例 2:处理数据库连接
java import reactor.core.publisher.Flux; import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; import java.util.concurrent.Callable; public class UsingExample { public static void main(String[] args) { Callable<Connection> resourceSupplier = () -> DriverManager.getConnection("jdbc:example", "user", "password"); Flux<String> flux = Flux.using( resourceSupplier, connection -> Flux.just("Using connection: " + connection), true ); flux.subscribe( System.out::println, error -> System.err.println("Error: " + error), () -> System.out.println("Completed") ); } }
在这两个例子中,资源(BufferedReader 或 Connection)的关闭由 AutoCloseable 接口的 close() 方法处理,Flux 会在流终止时自动释放这些资源。
总结
这个 using 方法适合那些需要自动管理资源关闭的场景,特别是资源已经实现了 AutoCloseable 接口的情况。它通过简化资源清理的逻辑,使代码更加简洁,同时确保资源能够及时、安全地关闭。这种机制非常适合处理文件、数据库连接等需要严格管理生命周期的资源。
using
public static <T,D extends AutoCloseable> Flux<T> using(Callable<? extends D> resourceSupplier, Function<? super D,? extends Publisher<? extends T>> sourceSupplier, boolean eager)
Uses an AutoCloseable resource, generated by a supplier for each individual Subscriber, while streaming the values from a Publisher derived from the same resource and makes sure the resource is released if the sequence terminates or the Subscriber cancels.Eager AutoCloseable resource cleanup happens just before the source termination and exceptions raised by the cleanup Consumer may override the terminal event.
使用由供应商为每个独立的订阅者生成的可自动关闭资源,同时从派生自同一资源的 Publisher 中流式传输值,并确保在序列终止或订阅者取消时释放该资源。急切的可自动关闭资源清理在源终止之前发生,清理消费者引发的异常可能会覆盖终端事件。
Non-eager cleanup will drop any exception.
非急切清理将丢弃任何异常。
For an asynchronous version of the cleanup, with distinct path for onComplete, onError and cancel terminations, see usingWhen(Publisher, Function, Function, BiFunction, Function).
有关异步清理的版本,以及对 onComplete、onError 和取消终止的不同处理,请参见 usingWhen(Publisher, Function, Function, BiFunction, Function)。
Type Parameters:
T - emitted type
D - resource type
Parameters:
resourceSupplier - a Callable that is called on subscribe to generate the resource
sourceSupplier - a factory to derive a Publisher from the supplied resource
eager - true to clean before terminating downstream subscribers
Returns:
a new Flux built around a disposable resource
See Also:
usingWhen(Publisher, Function, Function, BiFunction, Function), usingWhen(Publisher, Function, Function)
类型参数:
T - 发出的类型
D - 资源类型
参数:
resourceSupplier - 一个 Callable,在订阅时调用以生成资源
sourceSupplier - 一个工厂,用于从提供的资源派生 Publisher
eager - 如果为 true,则在终止下游订阅者之前进行清理
返回:
一个围绕可处理资源构建的新 Flux
参见:
usingWhen(Publisher, Function, Function, BiFunction, Function)、usingWhen(Publisher, Function, Function)
这个 using 方法重载版本与之前类似,只是在资源关闭的行为上有一些细微的差别。
它允许通过 eager 参数来控制资源何时关闭。
方法签名
java public static <T, D extends AutoCloseable> Flux<T> using( Callable<? extends D> resourceSupplier, Function<? super D, ? extends Publisher<? extends T>> sourceSupplier, boolean eager )
参数说明
- resourceSupplier: 一个 Callable,用于提供资源,该资源必须实现 AutoCloseable 接口。Flux 在订阅时会调用这个 Callable 来获取资源。
- sourceSupplier: 接收资源作为参数的函数,返回一个 Publisher,该 Publisher 发出要处理的数据流。
- eager: 这是一个布尔值,表示是否“提前”关闭资源:
- 如果设置为 true,则资源会在 onComplete 或 onError 信号发出后立即关闭。
- 如果设置为 false,资源会在 onComplete 或 onError 信号处理完毕后才关闭。
返回值
- Flux<T>: 返回一个 Flux,当流终止时自动处理资源的关闭(依据 AutoCloseable 的 close() 方法)。
关键功能
这个重载版本的关键在于 eager 参数:
- eager = true:
- 资源会在流发送 onComplete 或 onError 信号时立即关闭。这意味着资源会在流终止之前就被清理。
- eager = false:
- 资源的关闭会延迟到 onComplete 或 onError 信号被完全处理后才进行。换句话说,资源的生命周期会更长一些。
适用场景
- eager = true:如果你希望资源在流终止时立即被释放,这种情况下适合使用 eager = true,比如你需要尽快释放一些昂贵的资源(如数据库连接或文件)。
- eager = false:在某些情况下,你可能希望处理完所有终止信号后再释放资源,这时可以使用 eager = false。例如,资源的关闭顺序对后续的操作没有太大影响,或者资源关闭的时机不那么紧迫。
示例
示例 1:eager = true 情况下的文件处理
java import reactor.core.publisher.Flux; import java.io.BufferedReader; import java.io.FileReader; import java.io.IOException; import java.util.concurrent.Callable; public class UsingExample { public static void main(String[] args) { Callable<BufferedReader> resourceSupplier = () -> new BufferedReader(new FileReader("example.txt")); Flux<String> flux = Flux.using( resourceSupplier, reader -> Flux.fromStream(reader.lines()), true // eager, 在发送 onComplete/onError 时立即关闭资源 ); flux.subscribe( System.out::println, error -> System.err.println("Error: " + error), () -> System.out.println("Completed") ); } }
在这个例子中,BufferedReader 会在流终止时立即关闭,因为 eager = true。
示例 2:eager = false 情况下的数据库连接处理
java import reactor.core.publisher.Flux; import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; import java.util.concurrent.Callable; public class UsingExample { public static void main(String[] args) { Callable<Connection> resourceSupplier = () -> DriverManager.getConnection("jdbc:example", "user", "password"); Flux<String> flux = Flux.using( resourceSupplier, connection -> Flux.just("Using connection: " + connection), false // non-eager, 在 onComplete/onError 处理完后再关闭资源 ); flux.subscribe( System.out::println, error -> System.err.println("Error: " + error), () -> System.out.println("Completed") ); } }
在这个例子中,Connection 会在处理完终止信号后再关闭,因为 eager = false。
总结
这个重载版本的 using 方法允许通过 eager 参数控制资源何时关闭。根据不同的场景,你可以选择资源是提前关闭还是延迟关闭。
- 如果资源非常宝贵,建议使用 eager = true 以尽快释放它。
- 如果资源的关闭顺序不那么敏感,可以选择 eager = false,这样资源会在所有信号处理完毕后再关闭。
合理使用 eager 参数可以优化资源管理,确保在不同场景下的最佳资源释放时机。
usingWhen
public static <T,D> Flux<T> usingWhen(Publisher<D> resourceSupplier, Function<? super D,? extends Publisher<? extends T>> resourceClosure, Function<? super D,? extends Publisher<?>> asyncCleanup)
Uses a resource, generated by a Publisher for each individual Subscriber, while streaming the values from a Publisher derived from the same resource. Whenever the resulting sequence terminates, a provided Function generates a "cleanup" Publisher that is invoked but doesn't change the content of the main sequence. Instead it just defers the termination (unless it errors, in which case the error suppresses the original termination signal).
使用由 Publisher 为每个独立的 Subscriber 生成的资源,同时从派生自相同资源的 Publisher 中流式传输值。每当结果序列终止时,提供的 Function 会生成一个“清理” Publisher,该 Publisher 会被调用,但不会改变主序列的内容。相反,它只是推迟了终止(除非它出错,在这种情况下,错误会抑制原始终止信号)。
Note that if the resource supplying Publisher emits more than one resource, the subsequent resources are dropped ( Operators.onNextDropped(Object, Context)). If the publisher errors AFTER having emitted one resource, the error is also silently dropped ( Operators.onErrorDropped(Throwable, Context)). An empty completion or error without at least one onNext signal triggers a short-circuit of the main sequence with the same terminal signal (no resource is established, no cleanup is invoked).
请注意,如果提供资源的 Publisher 发出多个资源,则后续资源将被丢弃(Operators.onNextDropped(Object, Context))。如果 Publisher 在发出一个资源后发生错误,该错误也会被静默丢弃(Operators.onErrorDropped(Throwable, Context))。如果没有至少一个 onNext 信号就发出了空的完成或错误,则主序列会以相同的终止信号短路(不建立资源,也不会调用清理)。
Type Parameters:
T - the type of elements emitted by the resource closure, and thus the main sequence
D - the type of the resource object
Parameters:
resourceSupplier - a Publisher that "generates" the resource, subscribed for each subscription to the main sequence
resourceClosure - a factory to derive a Publisher from the supplied resource
asyncCleanup - an asynchronous resource cleanup invoked when the resource closure terminates (with onComplete, onError or cancel)
Returns:
a new Flux built around a "transactional" resource, with asynchronous cleanup on all terminations (onComplete, onError, cancel)
类型参数:
T - 由资源闭包发出的元素类型,因此是主序列的类型
D - 资源对象的类型
参数:
resourceSupplier - 一个 Publisher,它为每个主序列的订阅生成资源
resourceClosure - 一个工厂,用于从提供的资源派生出一个 Publisher
asyncCleanup - 当资源闭包终止时(包括 onComplete、onError 或取消)调用的异步资源清理
返回:
一个新的 Flux,构建在“事务性”资源之上,并在所有终止事件(onComplete、onError、取消)时进行异步清理。
usingWhen 是 Reactor 中一个非常强大的资源管理工具,允许在响应式流中动态管理资源的生命周期,特别是那些需要异步清理的资源。
与 using 不同,usingWhen 提供了更灵活的异步资源管理方式,使得资源可以在非阻塞的情况下被安全关闭。
方法签名
java public static <T, D> Flux<T> usingWhen( Publisher<D> resourceSupplier, Function<? super D, ? extends Publisher<? extends T>> resourceClosure, Function<? super D, ? extends Publisher<?>> asyncCleanup )
参数说明
- resourceSupplier: 这是一个 Publisher<D>,用于提供所需的资源(类型为 D)。每次订阅时,Flux 会通过 resourceSupplier 提供的流来获取该资源。
- resourceClosure: 这是一个函数,接收资源 D 作为参数,并返回一个 Publisher<? extends T>,该 Publisher 发出所需的数据流。资源在该函数内被使用。
- asyncCleanup: 这是一个函数,负责清理资源。它接收资源 D 作为参数,返回一个 Publisher<?>,用来异步执行清理操作(例如关闭资源或其他清理工作)。
返回值
- Flux<T>: 返回一个 Flux,当资源使用完毕时,自动调用 asyncCleanup 来异步清理资源。这个清理过程是非阻塞的。
工作原理
- usingWhen 的核心思想是在 resourceSupplier 中动态创建资源,然后通过 resourceClosure 来使用资源,最后在资源使用完毕后,通过 asyncCleanup 异步地清理资源。
- 资源的关闭是异步的,由 asyncCleanup 控制。可以确保即使是耗时的资源清理,也不会阻塞主线程或响应式流。
适用场景
该方法非常适合以下场景:
- 数据库连接:在一个响应式操作中,动态打开数据库连接,完成操作后异步关闭连接。
- 文件处理:打开一个文件流,在流操作结束后,异步关闭文件句柄。
- 网络连接:处理网络资源时,在流结束时安全释放连接,避免阻塞主线程。
示例
示例 1:处理文件,并异步关闭文件流
java import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.nio.file.StandardOpenOption; public class UsingWhenExample { public static void main(String[] args) { Path path = Paths.get("example.txt"); // resourceSupplier:提供资源(打开文件) Mono<Path> resourceSupplier = Mono.just(path); // resourceClosure:使用资源(读取文件行) Function<Path, Flux<String>> resourceClosure = p -> { try { return Flux.fromStream(Files.lines(p)); } catch (Exception e) { return Flux.error(e); } }; // asyncCleanup:清理资源(关闭文件,无需关闭文件句柄,但这里可以模拟其他清理操作) Function<Path, Mono<Void>> asyncCleanup = p -> Mono.fromRunnable(() -> { System.out.println("Cleanup done for file: " + p); }); // 使用 usingWhen 管理资源 Flux<String> flux = Flux.usingWhen( resourceSupplier, resourceClosure, asyncCleanup ); flux.subscribe( System.out::println, error -> System.err.println("Error: " + error), () -> System.out.println("Completed") ); } }
这个例子展示了如何在响应式流中读取文件,并在流结束后通过 asyncCleanup 模拟异步清理操作。
示例 2:数据库连接管理
java import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; public class UsingWhenExample { public static void main(String[] args) { // resourceSupplier:异步获取数据库连接 Mono<Connection> resourceSupplier = Mono.fromCallable(() -> DriverManager.getConnection("jdbc:example", "user", "password")); // resourceClosure:使用数据库连接 Function<Connection, Flux<String>> resourceClosure = connection -> { // 模拟数据库操作 return Flux.just("Record 1", "Record 2"); }; // asyncCleanup:异步关闭数据库连接 Function<Connection, Mono<Void>> asyncCleanup = connection -> Mono.fromRunnable(() -> { try { connection.close(); System.out.println("Connection closed"); } catch (SQLException e) { e.printStackTrace(); } }); // 使用 usingWhen 管理数据库连接的异步生命周期 Flux<String> flux = Flux.usingWhen( resourceSupplier, resourceClosure, asyncCleanup ); flux.subscribe( System.out::println, error -> System.err.println("Error: " + error), () -> System.out.println("Completed") ); } }
在这个示例中,我们模拟了异步管理数据库连接。资源(Connection)会在流结束后被异步关闭,从而保证了非阻塞操作。
总结
usingWhen 提供了灵活的方式来异步管理资源生命周期。无论是数据库连接、文件句柄还是其他耗时的清理操作,都可以通过 usingWhen 进行安全、高效的管理。这在处理复杂的异步操作时非常有用,因为它允许我们将资源的获取、使用和清理完全纳入响应式流程中,同时避免阻塞操作。
usingWhen
public static <T,D> Flux<T> usingWhen(Publisher<D> resourceSupplier, Function<? super D,? extends Publisher<? extends T>> resourceClosure, Function<? super D,? extends Publisher<?>> asyncComplete, BiFunction<? super D,? super Throwable,? extends Publisher<?>> asyncError, Function<? super D,? extends Publisher<?>> asyncCancel)
Uses a resource, generated by a Publisher for each individual Subscriber, while streaming the values from a Publisher derived from the same resource. Note that all steps of the operator chain that would need the resource to be in an open stable state need to be described inside the resourceClosure Function.
使用由 Publisher 为每个单独的订阅者生成的资源,同时从派生自同一资源的 Publisher 中流式传输值。请注意,操作符链中所有需要资源处于打开稳定状态的步骤都需要在 resourceClosure 函数内描述。
Whenever the resulting sequence terminates, the relevant Function generates a "cleanup" Publisher that is invoked but doesn't change the content of the main sequence. Instead it just defers the termination (unless it errors, in which case the error suppresses the original termination signal).
每当结果序列终止时,相关的函数会生成一个“清理” Publisher,该 Publisher 会被调用,但不会更改主序列的内容。相反,它只是推迟终止(除非发生错误,在这种情况下错误会抑制原始的终止信号)。
Individual cleanups can also be associated with main sequence cancellation and error terminations:
个别的清理操作也可以与主序列的取消和错误终止关联:
Note that if the resource supplying Publisher emits more than one resource, the subsequent resources are dropped ( Operators.onNextDropped(Object, Context)). If the publisher errors AFTER having emitted one resource, the error is also silently dropped ( Operators.onErrorDropped(Throwable, Context)). An empty completion or error without at least one onNext signal triggers a short-circuit of the main sequence with the same terminal signal (no resource is established, no cleanup is invoked).
请注意,如果提供资源的 Publisher 发出多个资源,则后续的资源会被丢弃(Operators.onNextDropped(Object, Context))。如果该 Publisher 在发出一个资源后发生错误,则错误也会被静默丢弃(Operators.onErrorDropped(Throwable, Context))。如果在没有至少一个 onNext 信号的情况下发生空的完成或错误,会触发主序列的短路,使用相同的终止信号(不建立资源,不执行清理操作)。
Additionally, the terminal signal is replaced by any error that might have happened in the terminating Publisher:
此外,终止信号会被终止 Publisher 中可能发生的任何错误替代。
Finally, early cancellations will cancel the resource supplying Publisher:
最后,提前取消会取消资源提供 Publisher。
Type Parameters:
T - the type of elements emitted by the resource closure, and thus the main sequence
D - the type of the resource object
Parameters:
resourceSupplier - a Publisher that "generates" the resource, subscribed for each subscription to the main sequence
resourceClosure - a factory to derive a Publisher from the supplied resource
asyncComplete - an asynchronous resource cleanup invoked if the resource closure terminates with onComplete
asyncError - an asynchronous resource cleanup invoked if the resource closure terminates with onError. The terminating error is provided to the BiFunction
asyncCancel - an asynchronous resource cleanup invoked if the resource closure is cancelled. When null, the asyncComplete path is used instead.
Returns:
a new Flux built around a "transactional" resource, with several termination path triggering asynchronous cleanup sequences
See Also:
usingWhen(Publisher, Function, Function)
类型参数:
T - 由资源闭包发出的元素类型,因此也是主序列的类型。
D - 资源对象的类型。
参数:
resourceSupplier - 一个 Publisher,用于“生成”资源,针对每个主序列的订阅进行订阅。
resourceClosure - 用于从提供的资源派生 Publisher 的工厂。
asyncComplete - 如果资源闭包以 onComplete 终止,则调用的异步资源清理。
asyncError - 如果资源闭包以 onError 终止,则调用的异步资源清理。终止错误将提供给 BiFunction。
asyncCancel - 如果资源闭包被取消,则调用的异步资源清理。如果为 null,则使用 asyncComplete 路径。
返回:
一个新的 Flux,围绕一个“事务性”资源构建,具有多个终止路径触发异步清理序列。
另请参见:
usingWhen(Publisher, Function, Function)。
usingWhen 方法的重载版本添加了对资源处理过程中可能发生的异常、任务取消等事件的处理机制,使得在响应式流中更灵活地控制资源的释放。
这种高级管理方式非常适合需要处理资源释放时的异常、任务取消等场景。
方法签名
java public static <T, D> Flux<T> usingWhen( Publisher<D> resourceSupplier, Function<? super D, ? extends Publisher<? extends T>> resourceClosure, Function<? super D, ? extends Publisher<?>> asyncComplete, BiFunction<? super D, ? super Throwable, ? extends Publisher<?>> asyncError, Function<? super D, ? extends Publisher<?>> asyncCancel )
参数说明
- resourceSupplier:提供资源的 Publisher,每次订阅时动态创建资源。
- resourceClosure:使用资源的函数,返回一个发出数据的 Publisher,基于资源执行操作。
- asyncComplete:当资源正常使用完毕后执行的异步清理操作(如关闭连接、文件句柄等),返回一个 Publisher<?>。
- asyncError:处理发生异常时的资源清理逻辑,接收资源和异常,返回一个 Publisher<?> 进行异步错误清理。
- asyncCancel:处理任务取消时的资源清理逻辑,返回一个 Publisher<?>,异步处理资源清理。
返回值
- Flux<T>:返回一个 Flux,可以动态处理资源生命周期,包括正常结束、错误、取消的情况,并根据相应的处理逻辑异步释放资源。
工作原理
- 资源供应:resourceSupplier 提供资源,并在 resourceClosure 中使用。
- 资源的异步释放:根据流的结束状态,触发不同的资源清理逻辑:
- asyncComplete:流正常完成时执行。
- asyncError:流发生异常时执行,处理异常并清理资源。
- asyncCancel:流被取消时执行,异步释放资源。
适用场景
此方法非常适合复杂的资源管理场景,尤其是需要处理各种资源释放情形(如正常结束、异常或取消)的场景。适用于数据库连接、网络连接、文件操作等场景中需要高度定制化的清理策略。
示例
示例 1:处理数据库连接,包括错误与取消场景
java import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; import java.util.function.BiFunction; import java.util.function.Function; public class UsingWhenExample { public static void main(String[] args) { // resourceSupplier:提供数据库连接 Mono<Connection> resourceSupplier = Mono.fromCallable(() -> DriverManager.getConnection("jdbc:example", "user", "password")); // resourceClosure:使用数据库连接执行操作 Function<Connection, Flux<String>> resourceClosure = connection -> { return Flux.just("Data 1", "Data 2"); }; // asyncComplete:正常完成时关闭连接 Function<Connection, Mono<Void>> asyncComplete = connection -> Mono.fromRunnable(() -> { try { connection.close(); System.out.println("Connection closed on complete"); } catch (SQLException e) { e.printStackTrace(); } }); // asyncError:出错时关闭连接 BiFunction<Connection, Throwable, Mono<Void>> asyncError = (connection, error) -> Mono.fromRunnable(() -> { try { connection.close(); System.out.println("Connection closed on error: " + error.getMessage()); } catch (SQLException e) { e.printStackTrace(); } }); // asyncCancel:任务取消时关闭连接 Function<Connection, Mono<Void>> asyncCancel = connection -> Mono.fromRunnable(() -> { try { connection.close(); System.out.println("Connection closed on cancel"); } catch (SQLException e) { e.printStackTrace(); } }); // 使用 usingWhen 管理数据库连接的整个生命周期 Flux<String> flux = Flux.usingWhen( resourceSupplier, resourceClosure, asyncComplete, asyncError, asyncCancel ); flux.subscribe( System.out::println, error -> System.err.println("Error: " + error), () -> System.out.println("Completed") ); } }
示例 2:处理文件,包括取消和错误场景
java import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.nio.file.StandardOpenOption; import java.util.function.BiFunction; import java.util.function.Function; public class UsingWhenFileExample { public static void main(String[] args) { Path path = Paths.get("example.txt"); // resourceSupplier:打开文件资源 Mono<Path> resourceSupplier = Mono.just(path); // resourceClosure:使用文件进行操作 Function<Path, Flux<String>> resourceClosure = p -> { try { return Flux.fromStream(Files.lines(p)); } catch (Exception e) { return Flux.error(e); } }; // asyncComplete:文件读取完成后的异步关闭操作 Function<Path, Mono<Void>> asyncComplete = p -> Mono.fromRunnable(() -> { System.out.println("File closed on complete: " + p); }); // asyncError:文件读取出错时的异步关闭操作 BiFunction<Path, Throwable, Mono<Void>> asyncError = (p, e) -> Mono.fromRunnable(() -> { System.out.println("File closed on error: " + p + " with error: " + e.getMessage()); }); // asyncCancel:任务取消时的异步关闭操作 Function<Path, Mono<Void>> asyncCancel = p -> Mono.fromRunnable(() -> { System.out.println("File closed on cancel: " + p); }); // 使用 usingWhen 管理文件资源的生命周期 Flux<String> flux = Flux.usingWhen( resourceSupplier, resourceClosure, asyncComplete, asyncError, asyncCancel ); flux.subscribe( System.out::println, error -> System.err.println("Error: " + error), () -> System.out.println("Completed") ); } }
总结
- usingWhen 是一个高度灵活的工具,允许在响应式流中以异步的方式动态管理资源。
- 支持对资源正常完成、发生错误或任务取消时进行不同的资源清理策略。
- 适合复杂的场景,例如数据库、文件、网络连接等的异步管理。