目录
原子操作类
实现原子性原理
保证原子性的方法
Fork/Join框架
分而治之
工作窃取算法
Fork/Join框架的设计
示例
原子操作类
线程A和线程B同时更新变量i进行操作i+1,最后的结果可能i不等于3而是等于2。这是线程不安全的更新操作,一般我们会使用Synchronized解决,但Java提供了更轻量级的选择——原子操作类:一种用法简单、性能高效、线程安全地更新一个变量的方法。
JUC下Atomic包一共提供了13个类,属于4中类型的原子更新方式,分别是原子更新基本类型、原子更新数组、原子更新引用和原子更新属性:
-
AtomicBoolean: 用于原子性地操作布尔值。
-
AtomicInteger: 用于原子性地操作整数值。
-
AtomicLong: 用于原子性地操作长整数值。
-
AtomicReference: 用于原子性地操作引用类型。
-
AtomicReferenceArray: 用于原子性地操作引用类型的数组。
-
AtomicMarkableReference: 用于同时操作引用类型和布尔标志的原子引用。
-
AtomicStampedReference: 用于同时操作引用类型和整数标志的原子引用。
-
AtomicIntegerArray: 用于原子性地操作整数数组。
-
AtomicLongArray: 用于原子性地操作长整数数组。
-
AtomicReferenceFieldUpdater: 用于原子性地更新指定类中的字段。
-
AtomicIntegerFieldUpdater: 用于原子性地更新指定类中的整数字段。
-
AtomicLongFieldUpdater: 用于原子性地更新指定类中的长整数字段。
-
AtomicAdder: Java 8 引入的类,用于原子性地执行加法操作。
实现原子性原理
一句话:通过Unsafe类使用CAS实现。
// AtomicInteger的添加方法 public final int getAndIncrement() { return unsafe.getAndAddInt(this, valueOffset, 1); }
public final int getAndAddInt(Object var1, long var2, int var4) { int var5; do { var5 = this.getIntVolatile(var1, var2); } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4)); return var5; }
CompareAndSwapInt是一个本地方法,基于CAS操作int类型变量。其他的原子操作基本都是大同小异。
保证原子性的方法
-
使用原子操作类,如AtomicInteger实现i++原子操作
-
使用JUC下的锁,如ReentrantLock,对i++操作加锁lock.loc()实现原子操作
-
使用Synchronized,对i++操作加锁
Fork/Join框架
Fork/Join 框架是 Java 并发编程中的一个重要工具,用于并行处理任务,特别适用于分治算法。其中有两个关键概念:分而治之和工作窃取。
分而治之
将大任务划分为小任务,然后并行地执行这些小任务,最后将它们的结果合并。
工作窃取算法
在执行小任务的过程中,线程可以从其他线程的任务队列中窃取任务,从而保持线程的利用率。
-
优点:充分利用线程进行并行计算,减少了线程间的竞争。
-
缺点::在某些情况下还是存在竞争,比如双端队列里只有一个任务时。并 且该算法会消耗了更多的系统资源,比如创建多个线程和多个双端队列。
Fork/Join框架的设计
-
分割任务。
首先我们需要有一个fork类来把大任务分割成子任务,有可能子任务还是很大,所以还需要不停地分割,直到分割出的子任务足够小。
-
执行任务并合并结果
分割的子任务分别放在双端队列里,然后几个启动线程分别从双端队列里获取任务执行。子任务执行完的结果都统一放在一个队列里,启动一个线程 从队列里拿数据,然后合并这些数据。
Fork/Join使用两个类来完成以上两件事情:
-
ForkJoinTask:我们要使用ForkJoin框架,必须首先创建一个ForkJoin任务。它提供在任务中执行fork()和join()操作的机制。通常情况下,我们不需要直接继承ForkJoinTask类,只需要继承它的子类,Fork/Join框架提供了以下两个子类。
-
RecursiveAction:用于没有返回结果的任务。
-
RecursiveTask:用于有返回结果的任务。
-
-
ForkJoinPool:ForkJoinTask需要通过ForkJoinPool来执行。
任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任 务。
示例
使用Fork/Join框架计算1+2+3+4+5+6+7+8+9+10的结果。
-
如何分割任务?暂定单个子任务最多执行两个数相加,设置分割的阈值是2,那就是3个子任务,最后join3个子任务的结果。因为有结果,所以使用RecursiveTask。
public class CountTask extends RecursiveTask<Integer> { // 设置阈值 private static final int THRESHOLD = 2; private int start; private int end; public CountTask(int start, int end){ this.start = start; this.end = end; } @Override protected Integer compute() { int sum = 0; // 如果任务足够小就计算任务 boolean canCompute = (end - start) <= THRESHOLD; if (canCompute) { for (int i = start; i <= end ; i++) { sum += i; } } else { // 如果任务大于阈值,就分裂为两个子任务计算 int middle = (start + end) / 2; CountTask leftTask = new CountTask(start, middle); CountTask rightTask = new CountTask(middle + 1, end); // 执行子任务 leftTask.fork(); rightTask.fork(); int leftResult = leftTask.join(); int rightResult = rightTask.join(); sum = leftResult + rightResult; } return sum; } public static void main(String[] args) { ForkJoinPool forkJoinPool = new ForkJoinPool(); CountTask task = new CountTask(1, 10); Future<Integer> result = forkJoinPool.submit(task); try { System.out.println(result.get()); } catch (ExecutionException e) { throw new RuntimeException(e); } catch (InterruptedException e) { throw new RuntimeException(e); } } }
ForkJoinTask与一般任务的主要区别在于它需要实现compute方法进行任务分割。使用join方法会等待子任务执行完并得到其结果。