JUC
一、Java JUC 简介
- 在 Java 5.0 提供了 java.util.concurrent (简称 JUC )包,在此包中增加了在并发编程中很常用 的实用工具类,用于定义类似于线程的自定义子 系统,包括线程池、异步 IO 和轻量级任务框架。 提供可调的、灵活的线程池。还提供了设计用于 多线程上下文中的 Collection 实现等
二、volatile关键字-内存可见性
1. 问题引入
public class volatile01 {
public static void main(String[] args) {
ThreadDao threadDao = new ThreadDao();
new Thread(threadDao).start();
while (true){
if(threadDao.isFlag()){
System.out.println("------------");
break;
}
}
}
static class ThreadDao implements Runnable{
//插件加了volatile关键字和不加的运行差别
private boolean flag = false;
@Override
public void run() {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
flag = true;
System.out.println("flag = "+flag);
}
public boolean isFlag() {
return flag;
}
public void setFlag(boolean flag) {
this.flag = flag;
}
}
}
2. 内存可见性
- 是指当某个线程正在使用对象状态 而另一个线程在同时修改该状态,需要确保当一个线程修改了对象 状态后,其他线程能够看到发生的状态变化。
- 可见性错误是指当读操作与写操作在不同的线程中执行时,我们无 法确保执行读操作的线程能适时地看到其他线程写入的值,有时甚 至是根本不可能的事情。
- 我们可以通过同步来保证对象被安全地发布。除此之外我们也可以 使用一种更加轻量级的 volatile 变量
3. volatile
-
关键字,当多个线程进行操作共享数据时,可以保证内存中的数据可见
-
volatile
不具备互斥性
-
volatile
不能保证变量的原子性
-
相较于
synchronized
是一种较为轻量级的同步策略
三、原子变量-CAS算法
1. 问题引入
- 不能保证数据安全
public static void main(String[] args) {
AtomicDemo atomicDemo = new AtomicDemo();
for (int i = 0; i < 10; i++) {
new Thread(atomicDemo).start();
}
}
static class AtomicDemo implements Runnable{
private volatile int serialNumber = 0;
public int getSerialNumber() {
return serialNumber++;
}
@Override
public void run() {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+":"+getSerialNumber());
}
}
2. 原子变量
- 类的小工具包,支持在单个变量上解除锁的线程安全编程
- jdk1.5后java.util.concurrent.atomic包下提供了常用的原子变量
- volatile保证内存可见性
- CAS(Compare-And-Swap)算法保证数据的原子性
3. CAS算法
- CAS算法是硬件对于并发操作共享数据的支持
- 包含了三个操作数
- 内存之V
- 预估值A
- 更新值B
- 当且仅当V==A时,V=B,否则,将不做任何操作
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-SKYxE1sz-1669130509612)(https://gitee.com/jerrygrj/img/raw/master/img/image-20210720220109739.png)]
4. 修改上述部分代码,解决问题
private AtomicInteger serialNumber = new AtomicInteger();
public int getSerialNumber() {
//以原子方式将当前值加 1
return serialNumber.getAndIncrement();
}
四、ConcurrentHashMap锁分段机制
1. 介绍
- ConcurrentHashMap 同步容器类是Java 5 增加的一个线程安全的哈希表。对 与多线程的操作,介于 HashMap 与 Hashtable 之间。内部采用“锁分段” 机制替代 Hashtable 的独占锁。进而提高性能。
2. 代码
static class HelloThread implements Runnable{
//
// private static List<String> list = Collections.synchronizedList(new ArrayList<>());
private static CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
static {
list.add("AA");
list.add("BB");
list.add("CC");
}
@Override
public void run() {
Iterator<String> it = list.iterator();
while (it.hasNext()){
System.out.println(it.next());
list.add("AA");
}
}
}
- 注意:添加操作时,效率低,因为每次添加时都会进行复制,开销大
五、CountDownLatch闭锁
1. 介绍
- CountDownLatch 一个同步辅助类,在完成一组正在其他线程中执行的操作 之前,它允许一个或多个线程一直等待。
2. CountDownLatch
- 闭锁,在完成某些运算时,只有其他所有线程的运算全部完成,当前运算才继续执行
3. 闭锁的案例
package juc04_CountDownLatch闭锁;
import java.util.concurrent.CountDownLatch;
public class TestCountDownLatch {
public static void main(String[] args) {
//闭锁
final CountDownLatch latch = new CountDownLatch(5);
LatchDemo latchDemo = new LatchDemo(latch);
long start = System.currentTimeMillis();
//与上面统一线程数
for (int i = 0; i < 5; i++) {
new Thread(latchDemo).start();
}
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
long end = System.currentTimeMillis();
System.out.println("耗费时间为"+(end-start));
}
static class LatchDemo implements Runnable{
private CountDownLatch latch;
public LatchDemo(CountDownLatch latch) {
this.latch = latch;
}
@Override
public void run() {
synchronized (this){
try {
for (int i = 0; i < 50000; i++) {
if(i%2==0){
System.out.println(i);
}
}
} catch (Exception exception) {
exception.printStackTrace();
}finally {
//递减1
latch.countDown();
}
}
}
}
}
六、实现Callable接口
1. 定义
- Callable 接口类似于 Runnable,两者都是为那些其实例可 能被另一个线程执行的类设计的。但是 Runnable 不会返 回结果,并且无法抛出经过检查的异常。
- Callable 需要依赖FutureTask ,FutureTask 也可以用作闭锁
2. 演示
public class TestCallable {
public static void main(String[] args) {
ThreadDemo td = new ThreadDemo();
//1.执行Callable方式,需要FutureTask实现类的支持,用于接收运算接口
FutureTask<Integer> result = new FutureTask<>(td);
new Thread(result).start();
//2.接收线程运算后的结果
try {
Integer sum = result.get();
System.out.println(sum);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
static class ThreadDemo implements Callable{
@Override
public Object call() throws Exception {
int sum = 0;
for (int i = 0; i < 100; i++) {
sum+=i;
}
return sum;
}
}
}
3. 解释
-
创建执行线程的方式三:实现Callable接口
-
相较于Runnable接口的方式,方法可以有返回值,并且可以抛出异常
-
上述代码,相当于从主线程分出来一个分支进行计算数值总和,然后通过result.get()方法进行数值和总线程的汇合。
七、Lock同步锁
1. 用于解决多线程安全问题的方式
1.1 同步代码块(synchronized隐式锁)
1.2 同步方法(synchronized隐式锁)
1.3 JDK1.5后的同步锁Lock。
- 注意:是一个显示锁,需要通过lock()方法上锁,必须通过unlock()方法进行释放锁
public class TestLock {
public static void main(String[] args) {
Ticket ticket = new Ticket();
new Thread(ticket, "1号窗口").start();
new Thread(ticket, "2号窗口").start();
new Thread(ticket, "3号窗口").start();
}
static class Ticket implements Runnable {
private int tick = 100;
private Lock lock = new ReentrantLock();
@Override
public void run() {
while (true) {
lock.lock(); //上锁
try {
if (tick > 0) {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
}
System.out.println(Thread.currentThread().getName() + " 完成售票,余票为:" + --tick);
}
} finally {
lock.unlock(); //释放锁
}
}
}
}
}
- 生产者和消费者问题
public class TestProductIssue {
public static void main(String[] args) {
Clerk clerk = new Clerk();
Productor pro = new Productor(clerk);
Consumer cus = new Consumer(clerk);
new Thread(pro,"生产者A").start();
new Thread(cus,"消费者B").start();
new Thread(pro,"生产者C").start();
new Thread(cus,"消费者D").start();
}
static class Clerk{
private int product = 0;
//进货
public synchronized void get(){
while (product>=1){//为了避免虚假唤醒问题,应该总是使用在循环中
System.out.println("产品已满");
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName()+":"+ ++product);
this.notifyAll();
}
//卖货
public synchronized void sale(){
while (product <=0){
System.out.println("缺货");
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName()+":"+ --product);
this.notifyAll();
}
}
static class Productor implements Runnable{
private Clerk clerk;
public Productor(Clerk clerk) {
this.clerk = clerk;
}
@Override
public void run() {
for (int i = 0; i < 20; i++) {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
clerk.get();
}
}
}
static class Consumer implements Runnable{
private Clerk clerk;
public Consumer(Clerk clerk){
this.clerk = clerk;
}
@Override
public void run() {
for (int i = 0; i < 20; i++) {
clerk.sale();
}
}
}
}
八、Condition控制线程通信
1. 定义
- Condition 接口描述了可能会与锁有关联的条件变量。这些变量在用 法上与使用 Object.wait 访问的隐式监视器类似,但提供了更强大的 功能。需要特别指出的是,单个 Lock 可能与多个 Condition 对象关 联。为了避免兼容性问题,Condition 方法的名称与对应的 Object 版 本中的不同。
- 在 Condition 对象中,与 wait、notify 和 notifyAll 方法对应的分别是 await、signal 和 signalAll。
- Condition 实例实质上被绑定到一个锁上。要为特定 Lock 实例获得 Condition 实例,请使用其 newCondition() 方法
2. 利用Condition解决生产者问题
九、线程按序交替
1. 问题
/**
* 编写一个程序,开启三歌线程,这三个线程的id分别为A、B、C,每个线程将自己的ID在屏幕上打印10遍,要求输出的结果必须按顺序显示
* 如ABCABCABC...依次递归
*/
public class TestABCAlternate {
public static void main(String[] args) {
AlternateDemo ad = new AlternateDemo();
new Thread(new Runnable() {
@Override
public void run() {
for (int i = 1; i <= 20; i++) {
ad.loopA(i);
}
}
},"A").start();
new Thread(new Runnable() {
@Override
public void run() {
for (int i = 1; i <= 20; i++) {
ad.loopB(i);
}
}
},"B").start();
new Thread(new Runnable() {
@Override
public void run() {
for (int i = 1; i <= 20; i++) {
ad.loopC(i);
System.out.println("---------------");
}
}
},"C").start();
}
static class AlternateDemo{
private int number = 1;//表示当前正在执行线程的标记
private Lock lock = new ReentrantLock();
private Condition condition1 = lock.newCondition();
private Condition condition2 = lock.newCondition();
private Condition condition3 = lock.newCondition();
/**
*
* @param totalLoop 循环几轮
*/
public void loopA(int totalLoop){
lock.lock();
try{
//1.判断
if(number!=1){
condition1.await();
}
//2.否则打印
for (int i = 1; i <= 1; i++) {
System.out.println(Thread.currentThread().getName()+"\t"+i+"\t"+totalLoop);
}
//3. 唤醒
number = 2;
condition2.signal();
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock();
}
}
public void loopB(int totalLoop){
lock.lock();
try{
//1.判断
if(number!=2){
condition2.await();
}
//2.否则打印
for (int i = 1; i <= 1; i++) {
System.out.println(Thread.currentThread().getName()+"\t"+i+"\t"+totalLoop);
}
//3. 唤醒
number = 3;
condition3.signal();
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock();
}
}
public void loopC(int totalLoop){
lock.lock();
try{
//1.判断
if(number!=3){
condition3.await();
}
//2.否则打印
for (int i = 1; i <= 1; i++) {
System.out.println(Thread.currentThread().getName()+"\t"+i+"\t"+totalLoop);
}
//3. 唤醒
number = 1;
condition1.signal();
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock();
}
}
}
}
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-x16eNOno-1669130509617)(https://gitee.com/jerrygrj/img/raw/master/img/image-20210721103927322.png)]
十、ReadWriteLock读写锁
1. 读-写锁 ReadWriteLock定义
- ReadWriteLock 维护了一对相关的锁,一个用于只读操作, 另一个用于写入操作。只要没有 writer,读取锁可以由 多个 reader 线程同时保持。写入锁是独占的。
- ReadWriteLock 读取操作通常不会改变共享资源,但执行 写入操作时,必须独占方式来获取锁。对于读取操作占 多数的数据结构。 ReadWriteLock 能提供比独占锁更高 的并发性。而对于只读的数据结构,其中包含的不变性 可以完全不需要考虑加锁操作
2. 注意
- 写写/读写 需要互斥
- 读读 不需要互斥
3. 程序
public class TestReadWriteLock {
public static void main(String[] args) {
ReadWriteLockDemo rw = new ReadWriteLockDemo();
new Thread(new Runnable() {
@Override
public void run() {
rw.write((int)Math.random()*101);
}
},"write").start();
for (int i = 0; i < 100; i++) {
new Thread(new Runnable() {
@Override
public void run() {
rw.read();
}
}).start();
}
}
static class ReadWriteLockDemo{
private int num = 0;
private ReadWriteLock lock = new ReentrantReadWriteLock();
public void read(){
lock.readLock().lock();//上锁
try{
System.out.println(Thread.currentThread().getName()+":"+num);
}finally {
lock.readLock().unlock();//释放锁
}
}
public void write(int num){
lock.writeLock().lock();
try{
System.out.println(Thread.currentThread().getName());
this.num = num;
}finally {
lock.writeLock().unlock();
}
}
}
}
十一、线程八锁
/*
* 题目:判断打印的 "one" or "two" ?
*
* 1. 两个普通同步方法,两个线程,标准打印, 打印? //one two
* 2. 新增 Thread.sleep() 给 getOne() ,打印? //one two
* 3. 新增普通方法 getThree() , 打印? //three one two
* 4. 两个普通同步方法,两个 Number 对象,打印? //two one
* 5. 修改 getOne() 为静态同步方法,打印? //two one
* 6. 修改两个方法均为静态同步方法,一个 Number 对象? //one two
* 7. 一个静态同步方法,一个非静态同步方法,两个 Number 对象? //two one
* 8. 两个静态同步方法,两个 Number 对象? //one two
*
* 线程八锁的关键:
* ①非静态方法的锁默认为 this, 静态方法的锁为 对应的 Class 实例
* ②某一个时刻内,只能有一个线程持有锁,无论几个方法。
*/
public class TestThread8Monitor {
public static void main(String[] args) {
Number number = new Number();
Number number2 = new Number();
new Thread(new Runnable() {
@Override
public void run() {
number.getOne();
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
// number.getTwo();
number2.getTwo();
}
}).start();
/*new Thread(new Runnable() {
@Override
public void run() {
number.getThree();
}
}).start();*/
}
}
class Number{
public static synchronized void getOne(){//Number.class
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
}
System.out.println("one");
}
public synchronized void getTwo(){//this
System.out.println("two");
}
public void getThree(){
System.out.println("three");
}
}
十二、线程池
1. 线程池定义
- 提供了一个线程队列,队列中保存所有等待状态的线程。避免了创建与销毁额外开销,提高了响应的速度
2. 体系结构
java.util.concurrent.Executor : 负责线程的使用与调度的根接口
|--**ExecutorService 子接口: 线程池的主要接口
|--ThreadPoolExecutor 线程池的实现类
|--ScheduledExecutorService 子接口:负责线程的调度
|--ScheduledThreadPoolExecutor :继承 ThreadPoolExecutor, 实现 ScheduledExecutorService,功能强大
3. 工具类 Executors
3.1 ExecutorService newFixedThreadPool()
- 创建固定大小的线程池
3.2 ExecutorService newCachedThreadPool()
- 缓存线程池,线程池的数量不固定,可以根据需求自动的更改数量。
3.3 ExecutorService newSingleThreadExecutor()
- 创建单个线程池。线程池中只有一个线程
3.4 ScheduledExecutorService newScheduledThreadPool()
- 创建固定大小的线程,可以延迟或定时的执行任务。
4. 创建过程
public class TestThreadPool {
public static void main(String[] args) {
//1. 创建线程池
ExecutorService pool = Executors.newFixedThreadPool(5);
ThreadPool tp = new ThreadPool();
//2. 为线程池中的线程分配任务
// pool.submit(tp);
// 提交10此相当于有十个线程
for (int i = 0; i < 10; i++) {
pool.submit(tp);
}
//3. 关闭线程池
pool.shutdown();
}
static class ThreadPool implements Runnable{
private int i =0;
@Override
public void run() {
while (i<=10000){
System.out.println(Thread.currentThread().getName()+":"+i++);
}
}
}
}
5. Callable接口创建线程池
public class TestThreadPoolAndCallable {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//创建一个线程池
ExecutorService pool = Executors.newFixedThreadPool(5);
List<Future<Integer>> list = new ArrayList<>();
for (int i = 0; i < 10; i++) {
//2. 为线程池中的线程分配任务
Future<Integer> future =
pool.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
int sum = 0;
for (int i = 0; i <= 100; i++) {
sum+=i;
}
return sum;
}
});
list.add(future);
}
for (Future<Integer> f :
list) {
System.out.println(f.get());
}
//3. 关闭线程池
pool.shutdown();
}
}
十三、线程调度
package juc11_线程池;
import java.util.Random;
import java.util.concurrent.*;
public class TestScheduledThreadPool {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ScheduledExecutorService pool = Executors.newScheduledThreadPool(5);
for (int i = 0; i < 5; i++) {
Future<Integer> result = pool.schedule(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
int num = new Random().nextInt(100);//生成随机数
System.out.println(Thread.currentThread().getName() + ":" + num);
return num;
}
}, 1, TimeUnit.SECONDS);
System.out.println(result.get());
}
pool.shutdown();
}
}
十四、ForkJoinPool分支/合并框架工作窃取
1. Fork/Join框架
public class TestForkJoinPool {
public static void main(String[] args) {
Instant start = Instant.now();
ForkJoinPool pool = new ForkJoinPool();
ForkJoinTask<Long> task = new ForkJoinSumCalculate(0L, 50000000000L);
Long sum = pool.invoke(task);
System.out.println(sum);
Instant end = Instant.now();
System.out.println("耗费时间为:" + Duration.between(start, end).toMillis());//166-1996-10590
}
@Test
public void test1(){
Instant start = Instant.now();
long sum = 0L;
for (long i = 0L; i <= 50000000000L; i++) {
sum += i;
}
System.out.println(sum);
Instant end = Instant.now();
System.out.println("耗费时间为:" + Duration.between(start, end).toMillis());//35-3142-15704
}
//java8 新特性
@Test
public void test2(){
Instant start = Instant.now();
Long sum = LongStream.rangeClosed(0L, 50000000000L)
.parallel()
.reduce(0L, Long::sum);
System.out.println(sum);
Instant end = Instant.now();
System.out.println("耗费时间为:" + Duration.between(start, end).toMillis());//1536-8118
}
}
class ForkJoinSumCalculate extends RecursiveTask<Long>{
/**
*
*/
private static final long serialVersionUID = -259195479995561737L;
private long start;
private long end;
private static final long THURSHOLD = 10000L; //临界值
public ForkJoinSumCalculate(long start, long end) {
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
long length = end - start;
if(length <= THURSHOLD){
long sum = 0L;
for (long i = start; i <= end; i++) {
sum += i;
}
return sum;
}else{
long middle = (start + end) / 2;
ForkJoinSumCalculate left = new ForkJoinSumCalculate(start, middle);
left.fork(); //进行拆分,同时压入线程队列
ForkJoinSumCalculate right = new ForkJoinSumCalculate(middle+1, end);
right.fork(); //
return left.join() + right.join();
}
}
}