更丝滑的并发编程模式
如果说之前的 JDK17你还觉得没必要折腾,那 JDK21确实有必要关注一下了。因为 JDK21 引入了一种新型的并发编程模式。
当前 Java 中的多线程并发编程绝对是另我们都非常头疼的一部分,感觉就是学起来难啃,用起来难用。但是转头看看使用其他语言的朋友们,根本就没有这个烦恼嘛,比如 GoLang,感觉人家用起来就很丝滑呢。
JDK21 中就在这方面做了很大的改进,让Java并发编程变得更简单一点,更丝滑一点。确切的说,在 JDK19或JDK20中就有这些改进了。
那具体是什么呢?让我们来具体来看一下。下面是JDK21的 Feature。
其中Virtual Threads、Scoped Values、Structured Concurrency就是针对多线程并发编程的几个功能。我们今天也主要来说一下他们。
虚拟线程(Virtual Threads)
虚拟线程是基于协程的线程,它们与其他语言中的协程具有相似之处,但也存在一些不同之处。
虚拟线程是依附于主线程的,如果主线程销毁了,那虚拟线程也不复存在。
相同之处:
- 虚拟线程和协程都是轻量级的线程,它们的创建和销毁的开销都比传统的操作系统线程要小。
- 虚拟线程和协程都可以通过暂停和恢复来实现线程之间的切换,从而避免了线程上下文切换的开销。
- 虚拟线程和协程都可以使用异步和非阻塞的方式来处理任务,提高应用程序的性能和响应速度。
不同之处:
- 虚拟线程是在 JVM 层面实现的,而协程则是在语言层面实现的。因此,虚拟线程的实现可以与任何支持 JVM 的语言一起使用,而协程的实现则需要特定的编程语言支持。
- 虚拟线程是一种基于线程的协程实现,因此它们可以使用线程相关的 API,如 ThreadLocal、Lock 和 Semaphore。而协程则不依赖于线程,通常需要使用特定的异步编程框架和 API。
- 虚拟线程的调度是由 JVM 管理的,而协程的调度是由编程语言或异步编程框架管理的。因此,虚拟线程可以更好地与其他线程进行协作,而协程则更适合处理异步任务。
总的来说,虚拟线程是一种新的线程类型,它可以提高应用程序的性能和资源利用率,同时也可以使用传统线程相关的 API。虚拟线程与协程有很多相似之处,但也存在一些不同之处。
虚拟线程确实可以让多线程编程变得更简单和更高效。相比于传统的操作系统线程,虚拟线程的创建和销毁的开销更小,线程上下文切换的开销也更小,因此可以大大减少多线程编程中的资源消耗和性能瓶颈。
使用虚拟线程,开发者可以像编写传统的线程代码一样编写代码,而无需担心线程的数量和调度,因为 JVM 会自动管理虚拟线程的数量和调度。此外,虚拟线程还支持传统线程相关的 API,如 ThreadLocal、Lock 和 Semaphore,这使得开发者可以更轻松地迁移传统线程代码到虚拟线程。
虚拟线程的引入,使得多线程编程变得更加高效、简单和安全,使得开发者能够更加专注于业务逻辑,而不必过多地关注底层的线程管理。
结构化并发(Structured Concurrency)
结构化并发是一种编程范式,旨在通过提供结构化和易于遵循的方法来简化并发编程。使用结构化并发,开发人员可以创建更容易理解和调试的并发代码,并且不容易出现竞争条件和其他与并发有关的错误。在结构化并发中,所有并发代码都被结构化为称为任务的定义良好的工作单元。任务以结构化方式创建、执行和完成,任务的执行总是保证在其父任务完成之前完成。
Structured Concurrency(结构化并发)可以让多线程编程更加简单和可靠。在传统的多线程编程中,线程的启动、执行和结束是由开发者手动管理的,因此容易出现线程泄露、死锁和异常处理不当等问题。
使用结构化并发,开发者可以更加自然地组织并发任务,使得任务之间的依赖关系更加清晰,代码逻辑更加简洁。结构化并发还提供了一些异常处理机制,可以更好地管理并发任务中的异常,避免因为异常而导致程序崩溃或数据不一致的情况。
除此之外,结构化并发还可以通过限制并发任务的数量和优先级,防止资源竞争和饥饿等问题的发生。这些特性使得开发者能够更加方便地实现高效、可靠的并发程序,而无需过多关注底层的线程管理。
作用域值(Scoped Values)
作用域值是JDK 20中的一项功能,允许开发人员创建作用域限定的值,这些值限定于特定的线程或任务。作用域值类似于线程本地变量,但是设计为与虚拟线程和结构化并发配合使用。它们允许开发人员以结构化的方式在任务和虚拟线程之间传递值,无需复杂的同步或锁定机制。作用域值可用于在应用程序的不同部分之间传递上下文信息,例如用户身份验证或请求特定数据。
试验一下
进行下面的探索之前,你要下载至少 JDK19或者直接下载 JDK20,JDK 20 目前(截止到2023年9月份)是正式发布的最高版本,如果你用 JDK 19的话,没办法体验到Scoped Values的功能。
或者是直接下载 JDK 21 的 Early-Access Builds(早期访问版本)。在这个地址下载 「jdk.java.net/21/」,下载对应的版…
如果你用的是 IDEA ,那你的IDEA 版本最起码是2022.3 这个版本或者之后的,否则不支持这么新的 JDK 版本。
如果你用的是 JDK19或者 JDK20的话,要在你的项目设置中将 language level设置为19或20的 Preview 级别,否则编译的时候会提示你无法使用预览版的功能,虚拟线程就是预览版的功能。
如果你用的是 JDK21的话,将 language level 设置为 X -Experimental Features,另外,因为 JDK21不属于正式版本,所以需要到 IDEA 的设置中(注意是 IDEA 的设置,不是项目的设置了),将这个项目的 Target bytecode version手动修改为21,目前可选的最高就是20,也就是JDK20。设置为21之后,就可以使用 JDK21中的这些功能了。
虚拟线程的例子
我们现在启动线程是怎么做的呢?
先声明一个线程类,implements 自 Runnable,并实现 run方法。
public class SimpleThread implements Runnable{
@Override
public void run() {
System.out.println("当前线程名称:" + Thread.currentThread().getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
然后就可以使用这个线程类,然后启动线程了。
Thread thread = new Thread(new SimpleThread());
thread.start();
中规中矩,没毛病。
有了虚拟线程之后呢,怎么实现呢?
Thread.ofPlatform().name("thread-test").start(new SimpleThread());
下面是几种使用虚拟线程的方式。
1、直接启动一个虚拟线程
Thread thread = Thread.startVirtualThread(new SimpleThread());
2、使用 ofVirtual(),builder 方式启动虚拟线程,可以设置线程名称、优先级、异常处理等配置
Thread.ofVirtual()
.name("thread-test")
.start(new SimpleThread());
//或者
Thread thread = Thread.ofVirtual()
.name("thread-test")
.uncaughtExceptionHandler((t, e) -> {
System.out.println(t.getName() + e.getMessage());
})
.unstarted(new SimpleThread());
thread.start();
3、使用 Factory 创建线程
ThreadFactory factory = Thread.ofVirtual().factory();
Thread thread = factory.newThread(new SimpleThread());
thread.setName("thread-test");
thread.start();
4、使用 Executors 方式
ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor();
Future<?> submit = executorService.submit(new SimpleThread());
Object o = submit.get();
结构化编程的例子
想一下下面这个场景,假设你有三个任务要同时进行,只要任意一个任务执行完成并返回结果了,那就可以直接用这个结果了,其他的两个任务就可以停止了。比如说一个天气服务,通过三个渠道获取天气情况,只要有一个渠道返回就可以了。
这种场景下, 在 Java 8 下应该怎么做呢,当然也可以了。
// 执行任务并返回 Future 对象列表
List<Future<String>> futures = executor.invokeAll(tasks);
// 等待任一任务完成并获取结果
String result = executor.invokeAny(tasks);
使用 ExecutorService的invokeAll和invokeAny实现,但是会有一些额外的工作,在拿到第一个结果后,要手动关闭另外的线程。
而 JDK21中呢,可以用结构化编程实现。
ShutdownOnSuccess捕获第一个结果并关闭任务范围以中断未完成的线程并唤醒调用线程。 适用于任意子任务的结果都可以直接使用,并且无需等待其他未完成任务的结果的情况。 它定义了获取第一个结果或在所有子任务失败时抛出异常的方法
public static void main(String[] args) throws IOException {
try (var scope = new StructuredTaskScope.ShutdownOnSuccess<String>()) {
Future<String> res1 = scope.fork(() -> runTask(1));
Future<String> res2 = scope.fork(() -> runTask(2));
Future<String> res3 = scope.fork(() -> runTask(3));
scope.join();
System.out.println("scope:" + scope.result());
} catch (ExecutionException | InterruptedException e) {
throw new RuntimeException(e);
}
}
public static String runTask(int i) throws InterruptedException {
Thread.sleep(1000);
long l = new Random().nextLong();
String s = String.valueOf(l);
System.out.println("第" + i + "个任务:" + s);
return s;
}
ShutdownOnFailure
执行多个任务,只要有一个失败(出现异常或其他主动抛出异常情况),就停止其他未执行完的任务,使用scope.throwIfFailed捕捉并抛出异常。 如果所有任务均正常,则使用 Feture.get() 或*Feture.resultNow() 获取结果
public static void main(String[] args) throws IOException {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
Future<String> res1 = scope.fork(() -> runTaskWithException(1));
Future<String> res2 = scope.fork(() -> runTaskWithException(2));
Future<String> res3 = scope.fork(() -> runTaskWithException(3));
scope.join();
scope.throwIfFailed(Exception::new);
String s = res1.resultNow(); //或 res1.get()
System.out.println(s);
String result = Stream.of(res1, res2,res3)
.map(Future::resultNow)
.collect(Collectors.joining());
System.out.println("直接结果:" + result);
} catch (Exception e) {
e.printStackTrace();
//throw new RuntimeException(e);
}
}
// 有一定几率发生异常
public static String runTaskWithException(int i) throws InterruptedException {
Thread.sleep(1000);
long l = new Random().nextLong(3);
if (l == 0) {
throw new InterruptedException();
}
String s = String.valueOf(l);
System.out.println("第" + i + "个任务:" + s);
return s;
}
Scoped Values 的例子
我们肯定都用过 ThreadLocal,它是线程本地变量,只要这个线程没销毁,可以随时获取 ThredLocal 中的变量值。Scoped Values 也可以在线程内部随时获取变量,只不过它有个作用域的概念,超出作用域就会销毁。
public class ScopedValueExample {
final static ScopedValue<String> LoginUser = ScopedValue.newInstance();
public static void main(String[] args) throws InterruptedException {
ScopedValue.where(LoginUser, "张三")
.run(() -> {
new Service().login();
});
Thread.sleep(2000);
}
static class Service {
void login(){
System.out.println("当前登录用户是:" + LoginUser.get());
}
}
}
上面的例子模拟一个用户登录的过程,使用 ScopedValue.newInstance()声明了一个 ScopedValue,用 ScopedValue.where给 ScopedValue设置值,并且使用 run 方法执行接下来要做的事儿,这样一来,ScopedValue就在 run() 的内部随时可获取了,在run方法中,模拟调用了一个service 的login方法,不用传递LoginUser这个参数,就可以直接通过LoginUser.get方法获取当前登录用户的值了。