文章目录
- JUC并发编程
- 1、什么是JUC?
- 2、线程和进程
- 并发、并行
- 线程有几个状态? 6个
- wait/sleep区别?
- 3、Lock锁
- 传统sychronized锁卖票实例
- 使用Lock锁卖票实例
- sychnorized 和 lock锁区别
- 4、生产者和消费者问题
- synorchized版生产者和消费者问题
- 虚假唤醒问题
- JUC版生产者和消费者问题
- 线程ABCD随机无序交替打印1/0
- 线程ABCD精准有序交替打印1/0
- 5、如何判断锁的是谁?
- 6、集合类不安全
- List
- Map
- Set
- 7、Callable
- 8、常用辅助类(必会:加减法计数器、信号量)
- CountDownLatch
- CyclicBarrier
- Semaphore
- 9、读写锁
- 10、阻塞队列
- 同步队列
- 11、线程池(重点)
- 三大方法
- 七大参数
- 结合业务理解线程池
- 四种拒绝策略
- 代码
- 最大线程如何设置?
- 12、四大函数式接口(必会)
- 函数式接口
- Function 函数型接口
- Predicate 断定型接口
- Custmer 消费型接口
- Supplier 供给型接口
- 13、Stream流式计算
- 14、ForkJoin
- 15、异步回调
- 16、JMM
- 17、Volatile
- 验证1:保证可见性
- 验证2:不保证原子性
- 验证3:禁止指令重排
- 18、单例模式
- 饿汉式
- 懒汉式
- 静态内部类
- 枚举类
- 19、CAS
- CSA的ABA问题
- 20、各种锁的理解
- 公平锁、非公平锁
- 可重入锁
- 自旋锁
- 死锁
- 实例代码
JUC并发编程
1、什么是JUC?
java util concurrent 简称, java并发编程
源码+官网文档 jdk1.8在线文档
先回顾点基础知识:
2、线程和进程
进程:大, 一个程序,QQ.exe , MUSIC.exe 程序的集成
一个进程可以包含多个线程,至少包含一个!!!
java 默认有几个线程?2个 main、GC
线程:开了一个Typora进程,写字,自动保存(线程负责的)
Thread、Runnable、Callable
java真的可以开启线程吗? 开不了 new Thread().start() 源码中是调用native start0()本地方法,底层中的C++, java 无法直接操作硬件
并发、并行
并发编程:并发、并行
并发:(多线程操作同一资源)
- CPU一核,模拟出来多线程,快速交替
并行:(多个人一起行走)
- CPU多核,多个线程可以同时执行;线程池
/**
* 代码获取CPU核数
*
* @author sdm
* @version 1.0
* @date 2023/6/19 15:03
*/
public class test1 {
public static void main(String[] args) {
// 获取CPU的核数
// CPU密集型、IO密集型
System.out.println(Runtime.getRuntime().availableProcessors()); //8核
}
}
并发编程本质 充分利用CPU的资源
线程有几个状态? 6个
public enum State {
// 1.新生
NEW,
// 2.运行
RUNNABLE,
// 3.阻塞
BLOCKED,
// 4.等待(死死的等)
WAITING,
// 5.超时等待(等3年,三年后等不下去了)
TIMED_WAITING,
// 6.终止
TERMINATED;
}
wait/sleep区别?
1.来自不同的类
wait => Object
sleep => Thread
2.关于锁的释放
wait:会释放锁,是醒着等
sleep:睡觉了,抱着锁睡,不会释放!!!
3.使用的范围不同
wait:必选在同步代码块中
sleep:可以在任何地方睡
4.是否需要捕获异常
wait:必须需要捕获异常
sleep:必须要捕获异常,
因为可能会有超时等待 throws InterruptedException
3、Lock锁
Java 8 中文版 - 在线API中文手册 - 码工具 (matools.com)
传统Sychronize锁
传统sychronized锁卖票实例
package com.kuang.demo01;
/**
* 基本的卖票例子 不使用JUC(java.util.concurrent.locks 三个接口: 1.Condition 2.Lock 3.ReadWriteLock)
*
* @author sdm
* @version 1.0
* @date 2023/6/19 17:27
*/
public class SaleTicketDemo01 {
// 1. 准备资源类(属性、方法(sychnorized加锁,本质:队列、锁),对共享资源加锁,控制并发)
// 2. 创建多线程
// 3. 将资源类抛进去
// 参考文章:Java线程池的正确使用方式——不要再new Thread了 http://t.csdn.cn/ytpMd
public static void main(String[] args) {
Ticket ticket = new Ticket(); // 并发:多线程操作同一资源类,将资源类丢入线程
new Thread().start();
new Thread(new Runnable() { // @FunctionalInterface 函数式接口
public void run() {
// 匿名内部类,new Runnable(重写run方法)
}
}).start();
new Thread(()->{ }, "线程名").start(); // jdk1.8之后,简化为Lambda表达式
// 三个线程操作同一资源,三个线程同时卖票
new Thread(()->{ for (int i = 0; i < 10; i++) { ticket.sale();}}, "A").start(); // 线程A卖60次
new Thread(()->{ for (int i = 0; i < 10; i++) { ticket.sale();}}, "B").start(); // 线程B卖60次
new Thread(()->{ for (int i = 0; i < 10; i++) { ticket.sale();}}, "C").start(); // 线程C卖60次
// 期望出现结果:
/*
A卖出了第30票,剩余:29
A卖出了第29票,剩余:28
A卖出了第28票,剩余:27
A卖出了第27票,剩余:26
A卖出了第26票,剩余:25
A卖出了第25票,剩余:24
A卖出了第24票,剩余:23
A卖出了第23票,剩余:22
A卖出了第22票,剩余:21
A卖出了第21票,剩余:20
B卖出了第20票,剩余:19
B卖出了第19票,剩余:18
B卖出了第18票,剩余:17
B卖出了第17票,剩余:16
B卖出了第16票,剩余:15
B卖出了第15票,剩余:14
B卖出了第14票,剩余:13
B卖出了第13票,剩余:12
B卖出了第12票,剩余:11
B卖出了第11票,剩余:10
B卖出了第10票,剩余:9
B卖出了第9票,剩余:8
B卖出了第8票,剩余:7
B卖出了第7票,剩余:6
B卖出了第6票,剩余:5
C卖出了第5票,剩余:4
C卖出了第4票,剩余:3
C卖出了第3票,剩余:2
C卖出了第2票,剩余:1
C卖出了第1票,剩余:0
*/
// 实际出现结果, 买票方法需要加锁,排队 public synchronized void sale()
/*
A卖出了第30票,剩余:29
A卖出了第29票,剩余:28
A卖出了第28票,剩余:27
B卖出了第27票,剩余:26
A卖出了第26票,剩余:25
B卖出了第25票,剩余:24
A卖出了第24票,剩余:23
B卖出了第23票,剩余:22
A卖出了第22票,剩余:21
B卖出了第21票,剩余:20
A卖出了第20票,剩余:19
B卖出了第19票,剩余:18
A卖出了第18票,剩余:17
B卖出了第17票,剩余:16
A卖出了第16票,剩余:15
C卖出了第14票,剩余:13
C卖出了第12票,剩余:11
C卖出了第11票,剩余:10
C卖出了第10票,剩余:9
C卖出了第9票,剩余:8
C卖出了第8票,剩余:7
C卖出了第7票,剩余:6
C卖出了第6票,剩余:5
C卖出了第5票,剩余:4
B卖出了第15票,剩余:14
C卖出了第4票,剩余:3
A卖出了第13票,剩余:12
B卖出了第3票,剩余:2
B卖出了第2票,剩余:1
B卖出了第1票,剩余:0
*/
}
}
// 资源类OOP 属性+方法
class Ticket {
// 票数
private int number = 30;
// 卖票:每卖一次,票数减一
public synchronized void sale(){
if (number > 0){
System.out.println(Thread.currentThread().getName() + "卖出了第" + (number--) + "票,剩余:" + number);
}
}
}
Lock接口
公平锁、非公平锁区别:
公平锁:先来后到
非公平锁:可以插队(默认非公平锁)
使用Lock锁卖票实例
package com.kuang.demo01;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* 基本的卖票例子 使用JUC(java.util.concurrent.locks 三个接口: 1.Condition 2.Lock 3.ReadWriteLock,使用Lock)
*
* @author sdm
* @version 1.0
* @date 2023/6/19 17:27
*/
public class SaleTicketDemo02 {
/*
公平锁、非公平锁区别:
公平锁:先来后到
非公平锁:可以插队(默认非公平锁)
public ReentrantLock() {
sync = new ReentrantLock.NonfairSync(); // 默认非公平锁
}
public ReentrantLock(boolean fair) {
sync = fair ? new ReentrantLock.FairSync() : new ReentrantLock.NonfairSync();
}
sychnorized 和 lock锁使用区别:
sychnorized:手动挡,需要手动创建线程
lock锁:自动挡,走lock三部曲就行
1. new ReentrantLock(), new可重入锁
2. Lock.lock()加锁,
3. finally {Lock.unlock()} 解锁
*/
public static void main(String[] args) {
// 并发:多线程操作同一资源类,将资源类丢入线程
Ticket1 ticket1 = new Ticket1();
new Thread(()->{for (int i = 0; i < 15; i++) ticket1.sale();}, "线程A").start(); // A线程卖15张票
new Thread(()->{for (int i = 0; i < 15; i++) ticket1.sale();}, "线程B").start(); // B线程卖15张票
new Thread(()->{for (int i = 0; i < 15; i++) ticket1.sale();}, "线程C").start(); // C线程卖15张票
/*
线程A卖出了第30票,剩余:29
线程A卖出了第29票,剩余:28
线程C卖出了第28票,剩余:27
线程C卖出了第27票,剩余:26
线程C卖出了第26票,剩余:25
线程C卖出了第25票,剩余:24
线程C卖出了第24票,剩余:23
线程C卖出了第23票,剩余:22
线程C卖出了第22票,剩余:21
线程C卖出了第21票,剩余:20
线程C卖出了第20票,剩余:19
线程C卖出了第19票,剩余:18
线程C卖出了第18票,剩余:17
线程C卖出了第17票,剩余:16
线程C卖出了第16票,剩余:15
线程C卖出了第15票,剩余:14
线程C卖出了第14票,剩余:13
线程B卖出了第13票,剩余:12
线程B卖出了第12票,剩余:11
线程B卖出了第11票,剩余:10
线程B卖出了第10票,剩余:9
线程B卖出了第9票,剩余:8
线程B卖出了第8票,剩余:7
线程B卖出了第7票,剩余:6
线程B卖出了第6票,剩余:5
线程B卖出了第5票,剩余:4
线程B卖出了第4票,剩余:3
线程B卖出了第3票,剩余:2
线程B卖出了第2票,剩余:1
线程B卖出了第1票,剩余:0
*/
}
}
class Ticket1 {
// new一个可重入锁
private final ReentrantLock lock = new ReentrantLock();
private int number = 30;
public void sale(){
lock.lock(); //加锁
try {
// 业务代码
if (number > 0){
System.out.println(Thread.currentThread().getName() + "卖出了第" + (number--) + "票,剩余:" + number);
}
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock(); // 释放锁
}
}
}
sychnorized 和 lock锁区别
sychnorized:自动挡,不需要手动创建锁、释放锁
lock锁:手动挡,需要手动创建锁,释放锁,走lock三部曲就行
- new ReentrantLock(), new可重入锁
- Lock.lock()加锁,
- finally {Lock.unlock()} 解锁
4、生产者和消费者问题
synorchized版生产者和消费者问题
生产者和消费者 synorchized版 实现效果:交替打印A=>1、B=>0
package com.kuang.demo02;
/**
* 线程之间的通信问题:生产者消费者 (一个窗口,一次放一碗饭饭,阿姨这边打饭,同学这边拿饭)
* 线程交替执行 A B 线程操作同一个变量 等待唤醒 wait,通知唤醒 notify
* A num+1
* B num-1
*
* @author sdm
* @version 1.0
* @date 2023/6/24 11:12
*/
public class Test {
public static void main(String[] args) {
// 1.准备资源类
Goods goods = new Goods();
// 2.创建多线程, 3.将资源类抛进去
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
goods.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "A").start(); // A线程+1
new Thread(() -> {
for (int i = 0; i < 5; i++) {
try {
goods.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "B").start(); // B线程-1
// 预期结果:A=>1 B=>0 A=>1 B=>0 ...
/*
为什么出现交替打印结果?
A=>1
B=>0
A=>1
B=>0
A=>1
B=>0
A=>1
B=>0
A=>1
B=>0
A=>1
线程A,判断number是否等于0,等于0,干活+1,然后唤醒B,不等于0,等着
线程B,判断number是否等于1,等于1,干活-1,然后唤醒A,不等于0,等着
*/
}
}
/*
资源类:饭 一个窗口
生产者消费者问题:六字真言: (判断)等待、业务、通知
*/
class Goods{
private int number = 0;
//+1 生产了一件物品
public synchronized void increment() throws InterruptedException {
if (number != 0){ //O的时候,干活,不是0的时候,等待
// 等待 窗口有一碗饭时,等会
this.wait();
}
// 业务
number ++;
System.out.println(Thread.currentThread().getName() + "=>" + number);
// 通知其他线程,我+1完毕了
this.notifyAll(); // 通知唤醒其他线程
}
//-1 消费了一件物品
public synchronized void decrement() throws InterruptedException {
if (number == 0){ //1的时候,干活,不是1的时候,等待
// 等待 窗口没有饭时,等会
this.wait();
}
// 业务
number --;
System.out.println(Thread.currentThread().getName() + "=>" + number);
// 通知其他线程,我-1完毕了
this.notifyAll(); // 通知唤醒其他线程
}
}
虚假唤醒问题
问题存在,A B C D 4个线程,还安全吗?
实现效果:交替打印 1、0 ,但是A B C D 不会精准交替
答:不安全 存在if判断等待时,发生虚假唤醒,需要替换为while判断等待
package com.kuang.demo02;
/**
* A B C D 线程交替打印
*/
public class ThreadDemoABCD {
public static void main(String[] args) {
// 1.准备资源类
Goods1 goods = new Goods1();
// 2.创建多线程, 3.将资源类抛进去
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
goods.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "A").start(); // A线程+1
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
goods.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "B").start(); // B线程-1
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
goods.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "C").start(); // C线程+1
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
goods.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "D").start(); // D线程-1
/*
打印结果:
...
B=>0
C=>1
A=>2
C=>3
B=>2
B=>1
B=>0
...
存在这种打印结果,为什么?
因为if判断,判断一次,会造成虚假唤醒(线程被唤醒,不会接到通、中断或超时,即虚假唤醒,所以唤醒条件防范,如果条件不满足,继续等待,等待应该总是出现在循环汇总),
解决办法:while循环判断 替换后,打印结果:1 0 交替
*/
}
}
class Goods1{
private int number = 0;
//+1 生产了一件物品
public synchronized void increment() throws InterruptedException {
while (number != 0){ //O的时候,干活,不是0的时候,等待
// 等待 窗口有一碗饭时,等会
this.wait();
}
// 业务
number ++;
System.out.println(Thread.currentThread().getName() + "=>" + number);
// 通知其他线程,我+1完毕了
this.notifyAll(); // 通知唤醒其他线程
}
//-1 消费了一件物品
public synchronized void decrement() throws InterruptedException {
while (number == 0){ //1的时候,干活,不是1的时候,等待
// 等待 窗口没有饭时,等会
this.wait();
}
// 业务
number --;
System.out.println(Thread.currentThread().getName() + "=>" + number);
// 通知其他线程,我-1完毕了
this.notifyAll(); // 通知唤醒其他线程
}
}
JUC版生产者和消费者问题
Sychronized : Object
Lock:JUC
线程ABCD随机无序交替打印1/0
实现效果:A B C D 线程精准唤醒,交替打印
package com.kuang.demo02;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* A B C D 线程通信 JUC Lock+Condition
*
* @author sdm
* @version 1.0
* @date 2023/6/24 15:23
*/
public class ProductConsumerConditionDemoABCDJUC {
public static void main(String[] args) {
// 获取资源类
Goods2 goods2 = new Goods2();
// 创建多线程,将资源类抛进去
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
goods2.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"A").start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
goods2.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"B").start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
goods2.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"C").start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
goods2.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"D").start();
/* 打印结果:A B C D 线程无序交替打印1、0
A=>1
B=>0
A=>1
B=>0
A=>1
B=>0
A=>1
B=>0
A=>1
B=>0
A=>1
B=>0
A=>1
B=>0
A=>1
B=>0
A=>1
B=>0
A=>1
B=>0
C=>1
D=>0
C=>1
D=>0
C=>1
D=>0
C=>1
D=>0
C=>1
D=>0
C=>1
D=>0
C=>1
D=>0
C=>1
D=>0
C=>1
D=>0
C=>1
D=>0
*/
}
}
// 等待,业务,通知
class Goods2{
private int number = 0;
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
public void increment() throws InterruptedException {
lock.lock();
try {
// 判断等待
while (number != 0){
condition.await();
}
// 业务操作
number++;
System.out.println(Thread.currentThread().getName() + "=>" + number);
// 通知其他线程,我+1操作完成了
condition.signalAll();
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock();
}
}
public void decrement() throws InterruptedException {
lock.lock();
try {
// 判断等待
while (number == 0){
condition.await();
}
// 业务操作
number--;
System.out.println(Thread.currentThread().getName() + "=>" + number);
// 通知其他线程,我+1操作完成了
condition.signalAll();
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock();
}
}
}
线程ABCD精准有序交替打印1/0
package com.kuang.demo02;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.zip.DataFormatException;
/**
* A B C D 线程通信 JUC Lock+Condition 精准通知打印:循环执行A->B->C
*
* @author sdm
* @version 1.0
* @date 2023/6/24 15:23
*/
public class ProductConsumerConditionDemoABCDJUCOrderly {
public static void main(String[] args) {
// 获取资源类
Goods3 goods3 = new Goods3();
// 创建多线程,将资源类抛进去
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
goods3.printA();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"A").start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
goods3.printB();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"B").start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
goods3.printC();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"C").start();
/* 实现效果:
A=>线程A执行
B=>线程B执行
C=>线程C执行
A=>线程A执行
B=>线程B执行
C=>线程C执行
A=>线程A执行
B=>线程B执行
C=>线程C执行
A=>线程A执行
B=>线程B执行
C=>线程C执行
A=>线程A执行
B=>线程B执行
C=>线程C执行
A=>线程A执行
B=>线程B执行
C=>线程C执行
A=>线程A执行
B=>线程B执行
C=>线程C执行
A=>线程A执行
B=>线程B执行
C=>线程C执行
A=>线程A执行
B=>线程B执行
C=>线程C执行
A=>线程A执行
B=>线程B执行
C=>线程C执行
*/
}
}
// 等待,业务,通知
class Goods3{ //资源类
Lock lock = new ReentrantLock();
Condition condition1 = lock.newCondition(); // 同步监视器:一个监视器只监视一个
Condition condition2 = lock.newCondition(); // 同步监视器:一个监视器只监视一个
Condition condition3 = lock.newCondition(); // 同步监视器:一个监视器只监视一个
private int number = 1; // 1A 3B 5C
public void printA() throws InterruptedException {
lock.lock();
try {
// 业务 等待,执行,通知
while (number != 1){
condition1.await();
}
number = 2;
System.out.println(Thread.currentThread().getName() + "=>线程A执行");
// 唤醒指定的人,B
condition2.signal();
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock();
}
}
public void printB() throws InterruptedException {
lock.lock();
try {
// 业务 等待,执行,通知
while (number != 2){
condition2.await();
}
number = 3;
System.out.println(Thread.currentThread().getName() + "=>线程B执行");
// 唤醒指定的人,C
condition3.signal();
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock();
}
}
public void printC() throws InterruptedException {
lock.lock();
try {
// 业务 等待,执行,通知
while (number != 3){
condition3.await();
}
number = 1;
System.out.println(Thread.currentThread().getName() + "=>线程C执行");
// 唤醒指定的人,A
condition1.signal();
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock();
}
}
}
5、如何判断锁的是谁?
package com.kuang.lock8;
import java.util.concurrent.TimeUnit;
public class Test1 {
public static void main(String[] args) {
//Q1:无锁时,先发消息?先打电话?
//Phone1 phone1 = new Phone1();
//
//new Thread(phone1::sendMessage, "A").start();
//new Thread(phone1::call, "B").start();
/*
sendMessage
call
*/
//Q2: sychronized锁类中的方法,先发消息?先打电话?
// 有锁,锁了对象的调用者 phone2 ,所以肯定先 发短信,再打电话,
// 验证:发短信里面延迟4s,依然是先 发短信,再 打电话,因为 phone2 在4s内都被锁死了,B线程不能执行
// 两个方法用的同一把锁。谁先拿到谁先执行
//Phone2 phone2 = new Phone2();
//new Thread(phone2::sendMessage,"A").start();
延迟1s
//try {
// TimeUnit.SECONDS.sleep(1);
//} catch (InterruptedException e) {
// e.printStackTrace();
//}
//new Thread(phone2::call,"B").start();
/*
发短信
打电话
*/
//Q3:发短信有锁sychronized,hello无锁 先发消息?先hello?
//Phone2 phone2 = new Phone2();
//new Thread(phone2::sendMessage,"A").start();
//new Thread(phone2::hello,"B").start();
/*
hello
发短信
*/
//Q4:两个对象分别调用sychronized方法 哪个先发消息?
//Phone2 phone21 = new Phone2();
//Phone2 phone22 = new Phone2();
//new Thread(()->{
// for (int i = 0; i < 10; i++) {
// phone21.sendMessage();
// }
//},"A").start();
//new Thread(()->{
// for (int i = 0; i < 10; i++) {
// phone22.sendMessage();
// }
//},"B").start();
/*
...
发短信-A 没有规则顺序,因为调用的不是一个对象,所以锁没有什么作用
发短信-B
发短信-B
发短信-A
发短信-B
...
*/
//Q5:静态synchronized方法,先发消息?先打电话? 坑:线程中间需要加睡眠,否则锁没有效果
//Phone3 phone3 = new Phone3();
//new Thread(()->{phone3.sendMessage();},"A").start();
//new Thread(()->{phone3.call();},"B").start();
/*
发短信-A
打电话-B
*/
//Phone3 phone31 = new Phone3();
//Phone3 phone32 = new Phone3();
//new Thread(()->{phone31.sendMessage();},"A").start();
//new Thread(()->{phone32.sendMessage();},"B").start();
/*
发短信-A
打电话-B
*/
//new Thread(()->{
// for (int i = 0; i < 100; i++) {
// phone31.sendMessage();
// }
//},"A").start();
//new Thread(()->{
// for (int i = 0; i < 100; i++) {
// phone32.sendMessage();
// }
//},"B").start();
/*
...
发短信-A
发短信-B
发短信-A
发短信-A
...
为什么存在这种现象?既然静态锁的对象是类的模板,不应该是 发完消息,再打电话?
猜测:
会不会是执行完方法后就释放模板锁,在这个间隙,B线程获取到了模板锁,然后执行了?
原因:
参考文章:http://t.csdn.cn/K8e8e
近期在使用多线程开发时遇到一些有意思的东西—在线程run方法中是否需要当前线程睡眠一段时间。
要了解sleep方法,那么首先得了解到它的原理及其使用方法。
使用简介:
在这里以JAVA的API为例(当然其它语言亦可以),JAVA的API中是这么描述的,
public static void sleep(long millis,
int nanos)
throws InterruptedException
在指定的毫秒数加指定的纳秒数内让当前正在执行的线程休眠(暂停执行),
此操作受到系统计时器和调度程序精度和准确性的影响。
参数: millis - 以毫秒为单位的休眠时间。 nanos - 要休眠的另外 0-999999 纳秒。 抛出: IllegalArgumentException - 如果 millis 值为负或 nanos 值不在 0-999999 范围内。 InterruptedException - 如果任何线程中断了当前线程。当抛出该异常时,当前线程的中断状态 被清除 API中说的很明确,这个方法的目的就是让线程休眠,并且这个操作其实是没有释放锁的。
那么问题来了,sleep到底什么时候用呢?
我们在使用多线程的时候会发现,有时候需要程序运行时间特别长了会经常出现一些问题,或者当前我们开启了多个线程它们分别执行几个任务,但是因为执行的任务时间非常短,有时候cpu切换时候会出现一系列的问题,那么这时候可能的原因就有是否因为cpu一直在执行一个线程或者其他的原因呢。
当我们设置sleep时,等于告诉cpu,当前的线程不再运行,持有当前对象的锁。那么这个时候cpu就会切换到另外的线程了。这种操作有些时候是非常好的。
那么回归标题,究竟应不应该使用sleep呢,LZ认为还是具体业务具体分析,看是否需要添加此方法。
*/
//new Thread(()->{
// for (int i = 0; i < 100; i++) {
// phone31.sendMessage();
// }
//},"A").start();
//try {
// TimeUnit.SECONDS.sleep(5); // 这里为什么要睡一下,才能打印出预想结果?
// // 猜测:
// // A调用方法的时候,持有锁,循环下一次的间隙中,CPU会想要切换执行线程B
// // 如果不加睡眠,线程B会在这个间隙获取锁、获取CPU执行,这样就会有问题,锁失去了意义
// // 所以,在想开启线程B的时候加个睡眠,等待A下次循环调用发消息方法时唤醒A线程,且让其获取锁,这样保证A继续持有CPU,避免B线程执行
// // A执行完后,睡5秒,5秒内A没有循环调用持有锁了,就开始执行线程B,B获取锁
// // 原因:
// // 因为锁在for循环里面,所以锁会被释放,可以把锁放在for循环外面,如Test2,这样就不会有这个问题了
//} catch (InterruptedException e) {
// e.printStackTrace();
//}
//new Thread(()->{
// for (int i = 0; i < 100; i++) {
// phone32.sendMessage();
// }
//},"B").start();
//Q5:1对象分别调用static sychronized发消息, sychronized打电话,哪个先执行?
//Phone4 phone4 = new Phone4();
//new Thread(Phone4::sendMessage,"A").start();// 锁Class对象
//new Thread(phone4::call,"B").start();// 锁调用者
/*
打电话-B 锁的不是一个东西,发消息睡眠1s,就先打印出打电话了
发短信-A
*/
//Q6:2对象分别调用static sychronized发消息, sychronized打电话,哪个先执行?
Phone4 phone5 = new Phone4();
Phone4 phone6 = new Phone4();
new Thread(Phone4::sendMessage,"A").start();// 锁Class对象
new Thread(phone6::call,"B").start();// 锁调用者
/*
打电话-B 锁的不是一个东西,发消息睡眠1s,就先打印出打电话了
发短信-A
*/
}
}
/*
无锁
*/
class Phone1{
public void sendMessage(){
System.out.println("sendMessage");
}
public void call(){
System.out.println("call");
}
}
/*
sychronized锁
锁的对象是方法的调用者
*/
class Phone2{
public synchronized void sendMessage() {
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("发短信-"+Thread.currentThread().getName());
}
public synchronized void call(){
System.out.println("打电话-"+Thread.currentThread().getName());
}
public void hello(){
System.out.println("hello-"+Thread.currentThread().getName());
}
}
/*
static sychronized锁
锁的对象是类的模板
类一加载就有了Class对象
Phone3只有唯一的Class对象 Class<Phone3> phone3Class = Phone3.class;
sendMessage()和call()方法都被static修饰时,用的是同一个锁
*/
class Phone3{
public static synchronized void sendMessage(){
System.out.println("发短信-"+Thread.currentThread().getName());
}
public static synchronized void call(){
System.out.println("打电话-"+Thread.currentThread().getName());
}
}
/*
1个static sychronized锁
1个sychronized锁
*/
class Phone4{
// 锁的Class类模板
public static synchronized void sendMessage(){
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("发短信-"+Thread.currentThread().getName());
}
// 锁的调用者
public synchronized void call(){
System.out.println("打电话-"+Thread.currentThread().getName());
}
}
package com.kuang.lock8;
import javax.swing.plaf.synth.SynthCheckBoxMenuItemUI;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
/**
* 实现Runnable接口,重写run方法
*
* Java多线程实现-Runnable接口 http://t.csdn.cn/pJhXV
*/
class Person implements Runnable {
private String id;
private String name;
private String loginType;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getLoginType() {
return loginType;
}
public void setLoginType(String loginType) {
this.loginType = loginType;
}
@Override
public void run() {
// 类锁,锁类的Class对象(模板)
synchronized (Person.class){
for (int i = 0; i < 1000; i++) {
System.out.println(Thread.currentThread().getName()+"===========");
}
}
// for循环放在锁外面,线程之间就需要加睡眠,否则会出现CPU多次切换线程的情况
//synchronized (Person.class){
// System.out.println(Thread.currentThread().getName()+"===========");
//}
}
@Override
public boolean equals(Object obj) {
if (obj == null){
return false;
}
if (obj == this || getClass() == obj.getClass()){
return true;
}
if (obj instanceof Person){
Person person = (Person) obj;
return this.id.equals(person.getId()) && this.name.equals(person.getName());
}
return false;
}
@Override
public int hashCode() {
return Objects.hash(id,name);
}
@Override
public String toString() {
return "Person{" +
"id='" + id + '\'' +
", name='" + name + '\'' +
", loginType='" + loginType + '\'' +
'}';
}
public static void main(String[] args) {
Person personA = new Person();
Person personB = new Person();
new Thread(personA).start();
new Thread(personB).start();
//for循环放在锁外面,线程之间就需要加睡眠,否则会出现CPU多次切换线程的情况
//new Thread(()->{
// for (int i = 0; i < 1000; i++) {
// personA.run();
// }
//}).start();
//try {
// TimeUnit.SECONDS.sleep(1);
//} catch (InterruptedException e) {
// e.printStackTrace();
//}
//new Thread(()->{
// for (int i = 0; i < 1000; i++) {
// personB.run();
// }
//}).start();
}
}
6、集合类不安全
List、单线程下是安全的,并发情况下不安全。
并发情况下,有些集合类会报错:并发修改异常 ConcurrentModificationException
List
package com.kuang.unsafe;
import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
/**
* List 单线程下是安全的,并发情况下不安全。
* 并发情况下,有些集合类会报错:并发修改异常 ConcurrentModificationException
*/
public class ListTest {
public static void main(String[] args) {
/**
* 并发下ArrayList不安全
*
* 解决方案:
* 1. Vector List<String> list = new Vector<>();
* 2. Collections.synchronizedList List<String> list = Collections.synchronizedList(new ArrayList<>());
* 3. CopyOnWriteArrayList() List<String> list = new CopyOnWriteArrayList<>();
* 写入时复制,cow,计算机程序设计领域的一种优化策略
* 多个线程调用的时候,list, 读取的时候,固定的,写入(覆盖)
* 在写入的时候避免重复,造成数据问题!
* 读写分类
* CopyOnWriteArrayList比 Vector 好在哪里? vector 用的 sychronized,效率低一点,CopyOnWriteArrayList比 用了lock锁,效率搞一点
*
*/
// 单线程没有问题
//List<String> list = new ArrayList<>();
//for (int i = 0; i < 1000; i++) {
// list.add(UUID.randomUUID().toString().substring(0,5));
// System.out.println(list);
//}
//出现并发修改异常
//List<String> list = new ArrayList<>();
//for (int i = 0; i < 1000; i++) {
// new Thread(()->{
// list.add(UUID.randomUUID().toString().substring(0,5));
// System.out.println(list);
// }).start();
//}
// 方式1 Vector
//List<String> list = new Vector<>();
//for (int i = 0; i < 1000; i++) {
// new Thread(()->{
// list.add(UUID.randomUUID().toString().substring(0,5));
// System.out.println(list);
// }).start();
//}
// 方式2 将list抛入Collections.synchronizedList()
//List<String> list = Collections.synchronizedList(new ArrayList<>());
//for (int i = 1; i <= 10; i++) {
// new Thread(()->{
// list.add(UUID.randomUUID().toString().substring(0,5));
// System.out.println(list);
// }, String.valueOf(i)).start();
//}
// 方式3 new一个写入时复制list, CopyOnWriteArrayList()
List<String> list = new CopyOnWriteArrayList<>();
for (int i = 1; i <= 10; i++) {
new Thread(()->{
list.add(UUID.randomUUID().toString().substring(0,5));
System.out.println(list);
}, String.valueOf(i)).start();
}
}
}
Map
package com.kuang.unsafe;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
/**
* Map 单线程下是安全的,并发情况下不安全。
* 并发情况下,有些集合类会报错:并发修改异常 ConcurrentModificationException
*
* 解决办法:
* 方式1 集合工具类+同步锁 Collections.synchronizedMap
* 方式2 并发哈希Map ConcurrentHashMap
*/
public class MapTest {
public static void main(String[] args) {
// 单线程下安全
//Map<String, String> map = new HashMap<>();
加载因子,初始化容量 new HashMap<>(16, 0.75);
//for (int i = 0; i < 1000; i++) {
// map.put(Thread.currentThread().getName(), UUID.randomUUID().toString().substring(0,5));
// System.out.println(map);
//}
// 多线程下,发生并发修改异常
//Map<String, String> map = new HashMap<>();
加载因子,初始化容量 new HashMap<>(16, 0.75);
//for (int i = 0; i < 100; i++) {
// new Thread(()->{
// map.put(Thread.currentThread().getName(), UUID.randomUUID().toString().substring(0,5));
// System.out.println(map);
// }).start();
//}
// 方式1 集合工具类+同步锁 Collections.synchronizedMap
//Map<String, String> map = Collections.synchronizedMap(new HashMap<>());
加载因子,初始化容量 new HashMap<>(16, 0.75);
//for (int i = 0; i < 100; i++) {
// new Thread(()->{
// map.put(Thread.currentThread().getName(), UUID.randomUUID().toString().substring(0,5));
// System.out.println(map);
// }).start();
//}
// 方式2 并发哈希Map ConcurrentHashMap
Map<String, String> map = new ConcurrentHashMap<>();
// 加载因子,初始化容量 new HashMap<>(16, 0.75);
for (int i = 0; i < 100; i++) {
new Thread(()->{
map.put(Thread.currentThread().getName(), UUID.randomUUID().toString().substring(0,5));
System.out.println(map);
}).start();
}
}
}
Set
package com.kuang.unsafe;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArraySet;
/**
* Set 单线程下是安全的,并发情况下不安全。
* 并发情况下,有些集合类会报错:并发修改异常 ConcurrentModificationException
*
* 解决办法:
* 方式1 工具类Collections.synchronizedSet
* 方式2 写入时复制 new CopyOnWriteArraySet<>()
*
*
* hashSet底层时什么?
* public HashSet() {
* map = new HashMap<>();
* }
* // set.add set本质是map,key无法重复!
* public boolean add(E e) {
* return map.put(e, PRESENT)==null;
* }
* // 不变的值
* private static final Object PRESENT = new Object();
*/
public class SetTest {
public static void main(String[] args) {
// 单线程下,没有问题
//HashSet<Object> set = new HashSet<>();
//for (int i = 0; i < 1000; i++) {
// set.add(UUID.randomUUID().toString().substring(0,5));
// System.out.println(set);
//}
// 多线程下,并发修改异常
//Set<Object> set = new HashSet<>();
//for (int i = 0; i < 1000; i++) {
// new Thread(()->{
// set.add(UUID.randomUUID().toString().substring(0,5));
// System.out.println(set);
// }, String.valueOf(i)).start();
//}
// 方式1 工具类Collections.synchronizedSet
//Set<Object> set = Collections.synchronizedSet(new HashSet<>());
//for (int i = 0; i < 1000; i++) {
// new Thread(()->{
// set.add(UUID.randomUUID().toString().substring(0,5));
// System.out.println(set);
// }, String.valueOf(i)).start();
//}
// 方式2 写入时复制 new CopyOnWriteArraySet<>()
Set<Object> set = new CopyOnWriteArraySet<>();
for (int i = 0; i < 1000; i++) {
new Thread(()->{
set.add(UUID.randomUUID().toString().substring(0,5));
System.out.println(set);
}, String.valueOf(i)).start();
}
}
}
7、Callable
-
与Runnable区别:
Callable有返回值(类型就是参数类型);Runnable无返回值
- Callable通过Runnable开启线程
package com.kuang.callable;
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;
/**
* Callable实现模板
*
*/
public class CallableTest {
public static void main(String[] args) throws Exception {
//启动callable
MyThread thread = new MyThread();
//适配类:FutureTask
FutureTask futureTask = new FutureTask(thread);
new Thread(futureTask,"A").start();
new Thread(futureTask,"B").start();
/*
这里两个线程的时候,不会打印2个call
这里测试过了,实际上不是缓存,是只执行了一次,也就是两个线程后面进入的会覆盖前面的,也有可能是后面进入的被抛弃了,暂时没有更细化的研究
这个并不是缓存,是由于JVM第二次再调用FutrueTask对象所持有的线程,此时FutrueTask的state此时已非NEW状态(各个状态,这边不做详细解释)
Possible state transitions:
NEW -> COMPLETING -> NORMAL
NEW -> COMPLETING -> EXCEPTIONAL
NEW -> CANCELLED
NEW -> INTERRUPTING -> INTERRUPTED
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
从这些中间状态到最终状态的转换使用更便宜的有序/延迟写入,因为值是唯一的,不能进一步修改。可能的状态转换:新建->完成->正常新建->完成->异常新建->取消新建->中断->中断
*/
//获取Callable的返回结果
Integer o = (Integer) futureTask.get();//这个get方法可能会产生阻塞!
//或者使用异步通信来处理!
System.out.println(o);
}
}
class MyThread implements Callable<Integer>{
@Override
public Integer call() throws Exception {
System.out.println("call");
//耗时的操作
return 1024;
}
}
8、常用辅助类(必会:加减法计数器、信号量)
CountDownLatch
减法计数器
package com.kuang.helperclass;
import java.util.concurrent.CountDownLatch;
/**
* 倒计时计数器(减法计数器)
*
* 理解:
* 就像保安锁门,需要等所有人都走完再锁门,如果还有人,就不能锁门
*
* 原理:
* countDownLatch.countDown();数量减一
* countDownLatch.await();等待计数器归0
* 每次有线程调用 countDown()数量-1,假设计数器变为0,countDownLatch.await()就会被唤醒,继续执行
*
*/
public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
// 总数是6
CountDownLatch countDownLatch = new CountDownLatch(6);
for (int i = 1; i <= 6; i++) {
final int finalI = i;
new Thread(()->{
System.out.println(Thread.currentThread().getName()+ "线程走了,i="+ finalI);
countDownLatch.countDown();//每走一个线程,数量-1
}).start();
}
countDownLatch.await(); //等待计数器归0,再向下执行,本质理解:await()执行时,会判断上面线程是否执行完,没有完的话,不会切换CPU给主线程
System.out.println(Thread.currentThread().getName()+"保安要等6个线程都执行完了,主线程关门");
/*
Thread-0线程走了,i=1
Thread-4线程走了,i=5
Thread-5线程走了,i=6
Thread-3线程走了,i=4
Thread-2线程走了,i=3
Thread-1线程走了,i=2
main保安要等6个线程都执行完了,主线程关门
*/
}
}
CyclicBarrier
加法计数器
package com.kuang.helperclass;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
/**
* 加法计数器
*
* 理解:
* 就相当于集齐7个龙珠,才能召唤神龙!
*
*/
public class CyclicBarrierDemo {
public static void main(String[] args) {
// 召唤神龙主线程
CyclicBarrier cyclicBarrier = new CyclicBarrier(7, () -> { // 如果这里需要收集8颗龙珠,那线程就会一直等待
System.out.println("7颗龙珠收集成功,神龙出世!!!");
});
// 收集7龙珠子线程
for (int i = 1; i <= 7; i++) {
final int finalI = i;
new Thread(()->{
System.out.println(Thread.currentThread().getName()+"收集龙珠"+finalI+"号");
try {
cyclicBarrier.await(); //等待,等这个线程走完,才能收集下一个, 直到收集到第7个,唤醒主线程
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}).start();
}
/*
Thread-0收集龙珠1号
Thread-4收集龙珠5号
Thread-3收集龙珠4号
Thread-2收集龙珠3号
Thread-1收集龙珠2号
Thread-6收集龙珠7号
Thread-5收集龙珠6号
7颗龙珠收集成功,神龙出世!!!
*/
}
}
Semaphore
信号量
package com.kuang.helperclass;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
/**
* Semaphore 信号量
* 限流的时候使用!流量入口就这么大,不可能一次进来很多!
* Semaphore: 同一时间只能有指定数量个得到线程
* CyclicBarrier : 指定个数线程执行完毕再执行操作
*
*
* 理解:
* 抢车位!6个车-3个停车位
* 123 3走了,5进来 -> 125 2走了,6进来 -> 165
*
*
* 原理:
* semaphore.acquire(); 获得,假设如果已经满了,等待,等待被释放为止!
* semaphore.release(); 释放,会将当前的信号量释放+1,然后唤醒等待的线程!
* 作用:
* 多个共享资源互斥的使用!
* 并发限流,控制最大的线程数!
*/
public class SemaphoreDemo {
public static void main(String[] args) {
// 线程数量入口限流!停车位3
Semaphore semaphore = new Semaphore(3);
// 6个线程访问!6个车抢停车位
for (int i = 1; i <= 6; i++) {
new Thread(()->{
try {
// acquire() 得到
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + "抢到车位");
TimeUnit.SECONDS.sleep(5);
System.out.println(Thread.currentThread().getName() + "离开车位");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release(); // 释放
}
},String.valueOf(i)).start();
}
/*
1抢到车位
2抢到车位
4抢到车位
(========停顿5s========)
1离开车位
4离开车位
2离开车位
5抢到车位
3抢到车位
6抢到车位
(========停顿5s========)
6离开车位
3离开车位
5离开车位
*/
}
}
9、读写锁
ReadWriteLock
package com.kuang.rwlock;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* ReadWriteLock 读写锁
* 读-读 可以共存!
* 读-写 可以共存!
* 写-写 不能共存!
* 独占锁(写锁):一次只能被一个线程占有!
* 共享锁(读锁):多个线程可以同时占用!
*
* 需求:
* 想要写的时候,只有一个线程写,读的时候,可以多个线程读(写的时候,希望是原子性操作)
* 使用 sychronized 锁的是方法的调用者, 使用 static sychronized 锁的是Class模板,调用这些方法都会锁住,
* 但是我们想要更细力度控制:写的时候,只有一个线程写,读的时候,可以多个线程读,
* 所以,使用读写锁
*
*
*
*
*/
public class ReadWriteLockDemo {
public static void main(String[] args) {
// 无锁时
//MyCache myCache = new MyCache();
写入
//for (int i = 1; i <= 5; i++) {
// final int finalI = i;
// new Thread(()->{
// myCache.put(finalI+"", "值");
// },"写线程-"+i).start();
//}
读取
//for (int i = 1; i <= 5; i++) {
// final int finalI = i;
// new Thread(()->{
// myCache.get(finalI+"");
// },"读线程-"+i).start();
//}
// 读写锁时
MyCacheWithReadWriteLock myCache = new MyCacheWithReadWriteLock();
// 写入
for (int i = 1; i <= 5; i++) {
final int finalI = i;
new Thread(()->{
myCache.put(finalI+"", "值");
},"写线程-"+i).start();
}
// 读取
for (int i = 1; i <= 5; i++) {
final int finalI = i;
new Thread(()->{
myCache.get(finalI+"");
},"读线程-"+i).start();
}
/*
实现效果:写线程原子性操作,读线程随意读取,所有人都可以读,可以随意插队
写线程-1写入key=1,value=值
写线程-1写入ok
写线程-4写入key=4,value=值
写线程-4写入ok
写线程-5写入key=5,value=值
写线程-5写入ok
写线程-2写入key=2,value=值
写线程-2写入ok
写线程-3写入key=3,value=值
写线程-3写入ok
读线程-2读取2
读线程-2读取ok值
读线程-3读取3
读线程-1读取1
读线程-5读取5
读线程-1读取ok值
读线程-3读取ok值
读线程-4读取4
读线程-5读取ok值
读线程-4读取ok值
*/
}
}
/*
自定义缓存 读写锁
*/
class MyCacheWithReadWriteLock{
private volatile Map<String, Object> map = new HashMap<>();
//private Lock lock = new ReentrantLock();
// 读写锁:更细粒度控制加锁
private ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
// 存,写入的时候,只希望同时只有一个线程写!!!
public void put(String key, Object value){
// 写锁加锁
readWriteLock.writeLock().lock();
try {
System.out.println(Thread.currentThread().getName() + "写入key=" + key + ",value=" + value);
map.put(key, value);
System.out.println(Thread.currentThread().getName() + "写入ok");
}catch (Exception e){
e.printStackTrace();
}finally {
// 读写锁释放锁
readWriteLock.writeLock().unlock();
}
}
// 取,读的时候,希望所有人都能读!!!
public void get(String key){
// 读锁加锁
readWriteLock.readLock().lock();
try {
System.out.println(Thread.currentThread().getName() + "读取" + key);
Object o = map.get(key);
System.out.println(Thread.currentThread().getName() + "读取ok"+o);
}catch (Exception e){
e.printStackTrace();
}finally {
readWriteLock.readLock().unlock();
}
}
}
/*
自定义缓存 无锁的
*/
class MyCache{
private volatile Map<String, Object> map = new HashMap<>();
// 存,写
public void put(String key, Object value){
System.out.println(Thread.currentThread().getName() + "写入key=" + key + ",value=" + value);
map.put(key, value);
System.out.println(Thread.currentThread().getName() + "写入ok");
}
// 取,读
public void get(String key){
System.out.println(Thread.currentThread().getName() + "读取" + key);
Object o = map.get(key);
System.out.println(Thread.currentThread().getName() + "读取ok"+o);
}
/*
预期出现:1写入,1写入完成, 2写入,2写入完成,...
出现结果:1还没有写入oK,5写入,写的时候会被插队
这样会有什么问题吗? 同时写入,CPU切换过于频繁,你想要的效果是,一个写入完成后写入下一个,所以没有达到预期效果
写线程-1写入key=1value=值
写线程-5写入key=5value=值
写线程-5写入ok
写线程-4写入key=4value=值
写线程-3写入key=3value=值
写线程-2写入key=2value=值
写线程-3写入ok
写线程-4写入ok
写线程-1写入ok
写线程-2写入ok
读线程-1读取1
读线程-2读取2
读线程-1读取ok
读线程-2读取ok
读线程-3读取3
读线程-3读取ok
读线程-4读取4
读线程-5读取5
读线程-4读取ok
读线程-5读取ok
*/
}
10、阻塞队列
方式 | 抛出异常 | 有返回值,不抛出异常 | 阻塞,等待 | 超时等待 |
---|---|---|---|---|
添加 | add() | offer() | put() | offer(,) |
移除 | remove() | poll() | take() | poll(,) |
检测队首元素 | element | peek | - | - |
package com.kuang.blockqueue;
import com.sun.media.sound.SoftTuning;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
/**
* 阻塞队列: BlockingQueue
* 下面有:
* ArrayBlockingQueue (这个)
* LinkedBlockingDeque
* 什么情况下对列发生阻塞?
* FIFO先进先出
* 写入:如果队列满了,就必须阻塞等待
* 取:如果是队列为空,必须阻塞等待生产
* 什么情况下使用阻塞队列?
* 多线程并发处理,线程池!
* 阻塞队列有哪些操作?
* 添加、移除两个操作
* 四组API,实现不同效果:
* 1.抛出异常
* 检查队首元素:element()
* add() remove() 组合处理添加、移除操作
* 插入成功返回true,队列满了,插入失败返回异常
* 非法状态异常:IllegalStateException Exception:Queue full
* 2.不会抛出异常
* 检查队首元素:peek()
* offer() poll() 组合处理添加、移除操作
* 插入成功返回true,队列满了,插入失败返回false
* 如果队列满了,再添加时,不会抛出异常
* 3.阻塞等待
* put() task() 组合处理添加、移除操作
* 无返回值
* 如果队列满了,再添加时,会一直阻塞等待,不结束
* 4.超时等待
* offer(,,) poll(,) 组合处理添加、移除操作
* 如果队列满了,再添加时,会一直阻塞等待,直到超多一定时间,就不等待了
*/
public class Test {
public static void main(String[] args) throws InterruptedException {
//test1();
//test2();
//test3();
test4();
}
/*
1、抛出异常
*/
public static void test1(){
// 队列大小3
ArrayBlockingQueue<Object> blockingQueue = new ArrayBlockingQueue<>(3);
// add 存,返回true|false
System.out.println(blockingQueue.add("a"));
System.out.println(blockingQueue.add("b"));
System.out.println(blockingQueue.add("c"));
// 查看队首元素
System.out.println(blockingQueue.element());
System.out.println("========多余的开始进入=======");
System.out.println(blockingQueue.add("d"));
// remove 取,返回true|false
System.out.println(blockingQueue.remove("a"));
System.out.println(blockingQueue.remove("b"));
System.out.println(blockingQueue.remove("c"));
/*
true
true
true
a
========多余的开始进入=======
Exception:IllegalStateException非法状态异常 Queue full
*/
}
/*
2、不会抛出异常
*/
public static void test2(){
// 队列大小3
ArrayBlockingQueue<Object> blockingQueue = new ArrayBlockingQueue<>(3);
// offer 存,返回true|false
System.out.println(blockingQueue.offer("a"));
System.out.println(blockingQueue.offer("b"));
System.out.println(blockingQueue.offer("c"));
// 查看队首元素
System.out.println(blockingQueue.peek());
System.out.println("========多余的开始进入=======");
System.out.println(blockingQueue.offer("d"));
// poll 取,返回true|false
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
/*
true
true
true
a
========多余的开始进入=======
false
a
b
c
*/
}
/*
3、阻塞等待
*/
public static void test3() throws InterruptedException {
// 队列大小3
ArrayBlockingQueue<Object> blockingQueue = new ArrayBlockingQueue<>(3);
// offer 存,无返回值
blockingQueue.put("a");
blockingQueue.put("b");
blockingQueue.put("c");
System.out.println("========多余的开始进入=======");
blockingQueue.put("d");
// poll 取,无返回值
blockingQueue.take();
blockingQueue.take();
blockingQueue.take();
/*
========多余的开始进入=======
一直阻塞,不结束
*/
}
/*
4、超时等待
*/
public static void test4() throws InterruptedException {
// 队列大小3
ArrayBlockingQueue<Object> blockingQueue = new ArrayBlockingQueue<>(3);
// offer 存,无返回值
blockingQueue.offer("a");
System.out.println("放入a");
blockingQueue.offer("b");
System.out.println("放入b");
blockingQueue.offer("c");
System.out.println("放入c");
System.out.println("========多余的开始进入=======");
blockingQueue.offer("d",5, TimeUnit.SECONDS); // 等待超过5秒,还没有位置,就结束等待
// poll 取,无返回值
blockingQueue.poll();
System.out.println("取出a");
blockingQueue.poll();
System.out.println("取出b");
blockingQueue.poll();
System.out.println("取出c");
System.out.println("========开始取出队列已经没有的=======");
blockingQueue.poll(5,TimeUnit.SECONDS); // 等待超过2秒,还没有东西可以拿,就结束等待,不拿了,下面没有执行语句了,就退出
/*
放入a
放入b
放入c
========多余的开始进入=======
取出a
取出b
取出c
========开始取出队列已经没有的=======
等待5s后,没有进入的可以取出,退出
*/
}
}
同步队列
package com.kuang.blockqueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
/**
* 同步队列
* 和其他 BlockingQueue 不一样,SychronousQueue 不存储元素
* 没有容量,进去一个元素,必须等待取出来之后,才能再往里放一个元素!
* put、take
*/
public class SynchronousQueueTest {
public static void main(String[] args) {
// 同步队列 SynchronousQueue mplements BlockingQueue
//SynchronousQueue<Object> synchronousQueue = new SynchronousQueue<>();
BlockingQueue<Object> synchronousQueue = new SynchronousQueue<>();
new Thread(()->{
try {
System.out.println(Thread.currentThread().getName()+"put 1");
synchronousQueue.put("1");
System.out.println(Thread.currentThread().getName()+"put 2");
synchronousQueue.put("2");
System.out.println(Thread.currentThread().getName()+"put 3");
synchronousQueue.put("3");
} catch (InterruptedException e) {
e.printStackTrace();
}
},"T1:").start();
new Thread(()->{
try {
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName()+"=>"+synchronousQueue.take());
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName()+"=>"+synchronousQueue.take());
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName()+"=>"+synchronousQueue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
},"T2:").start();
}
/*
T1:put 1
T2:=>1
T1:put 2
T2:=>2
T1:put 3
T2:=>3
*/
}
11、线程池(重点)
三大方法
七大参数
结合业务理解线程池
核心线程数2;最大线程数5;阻塞队列:候客区;拒绝进入:拒绝策略
超时释放:
四种拒绝策略
代码实例:
第一个拒绝策略:
1个人进去
5个人进去(3个人在候客厅)
8个人进去(5个窗口都开放+3个在候客厅)
9个人进去(多1个人,被拒绝进入,抛出异常)
第二个拒绝策略:
第三种拒绝策略:
第四种拒绝策略:
池的最大线程的大小如何设置?
代码
package com.kuang.threadpool;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* 如果创建线程池?
* 线程池不允许使用Executors直接创建,需要通过ThreadPoolExecutor创建,避免资源耗尽;
* 原因1:固定线程池(FixedThreadPool)和单例线程池(SingleThreadPool)允许的 请求队列 长度为Integer.MAX_VALU,约21亿,可能会堆积大量请求,导致OOM内存溢出
* 原因2:可变线程池(CachedThreadPool)和调度线程池(ScheduledThreadPool)允许的 创建线程数量 为Integer.MAX_VALUE,可能会创建大量线程,导致OOM
* Executors 工具类,3大方法
* 1.Executors.newSingleThreadExecutor(); //单个线城池
* 2.Executors.newFixedThreadPool(5); //固定的线程池大小
* 3.Executors.newCachedThreadPool(); //可伸缩的
*/
public class Demo01 {
public static void main(String[] args) {
// 1.单个线城池
ExecutorService singleThread = Executors.newSingleThreadExecutor();
// 2.创建一个固定的线程池大小
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);
// 3.可伸缩的,遇强则强,遇弱则弱
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
// 1.单线程池;创建100个线程
//try{
// for (int i = 1; i < 100; i++) {
// singleThread.execute(()->{
// System.out.println(Thread.currentThread().getName() + ":ok");
// });
// }
//}catch (Exception e){
// e.printStackTrace();
//}finally {
// // 线程池用完,程序结束,关闭线程池
// singleThread.shutdown();
//}
/*
...
pool-1-thread-1:ok 使用的都是线程池1中的线程1
pool-1-thread-1:ok
pool-1-thread-1:ok
...
*/
// 2.固定的线程池;创建100个线程
//try{
// for (int i = 1; i < 100; i++) {
// fixedThreadPool.execute(()->{
// System.out.println(Thread.currentThread().getName() + ":ok");
// });
// }
//}catch (Exception e){
// e.printStackTrace();
//}finally {
// // 线程池用完,程序结束,关闭线程池
// singleThread.shutdown();
//}
/*
...
pool-2-thread-2:ok 使用线程池2中的线程,最大5
pool-2-thread-5:ok
pool-2-thread-4:ok
pool-2-thread-1:ok
pool-2-thread-3:ok
pool-2-thread-1:ok
pool-2-thread-4:ok
pool-2-thread-2:ok
pool-2-thread-5:ok
...
*/
// 3.可伸缩的线程池;创建100个线程
try{
for (int i = 1; i < 100; i++) {
cachedThreadPool.execute(()->{
System.out.println(Thread.currentThread().getName() + ":ok");
});
}
}catch (Exception e){
e.printStackTrace();
}finally {
// 线程池用完,程序结束,关闭线程池
singleThread.shutdown();
}
/*
...
pool-3-thread-12:ok 使用线程池3中的线程
pool-3-thread-16:ok
pool-3-thread-18:ok
pool-3-thread-16:ok
pool-3-thread-8:ok
pool-3-thread-14:ok
pool-3-thread-9:ok
pool-3-thread-8:ok
pool-3-thread-11:ok
...
*/
}
}
package com.kuang.threadpool;
import java.util.concurrent.*;
/**
* 创建线程池的3大方法,都有7大参数,参数含义是什么?
* corePoolSize, // 核心线程池大小
* maximumPoolSize, // 最大核心线程池大小
* keepAliveTime, // 保持存活时长
* unit, // 超时释放单位
* workQueue, // 阻塞队列:默认最大21亿
* Executors.defaultThreadFactory(), // 线程工厂,创建线程的,一般不动
* defaultHandler); // 拒绝策略:默认AbortPolicy()堕胎政策
*
* 结合业务理解线程池及参数含义:
* 银行办理业务:
* 银行大厅 --> 线程池
* 柜台:5个柜台:2个开设柜台,3个备用柜台 --> 最大核心线程数:5,核心线程数:2,
* 候客区:3个座位 --> 阻塞队列
* 1个人进去,直接办理
* 5个人进去(3个人在候客厅)
* 8个人进去(5个窗口都开放+3个在候客厅)
* 9个人进去(多1个人,被拒绝进入,抛出异常)
* 1个小时后,只剩2个窗口在服务,3个备用柜台就关闭释放
*/
public class Demo02 {
// 1.单个线城池
ExecutorService singleThread = Executors.newSingleThreadExecutor();
// 2.创建一个固定的线程池大小
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);
// 3.可伸缩的,遇强则强,遇弱则弱
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
// 本质:ThreadPoolExecutor 源码分析
//public ThreadPoolExecutor(int corePoolSize, // 核心线程池大小
// int maximumPoolSize, // 最大核心线程池大小
// long keepAliveTime, // 超时了没有人调用就会释放
// TimeUnit unit, // 超时释放单位
// BlockingQueue<Runnable> workQueue) { // 阻塞队列 链接阻塞队列LinkedBlockingQueue
// 返回内容:
// this(corePoolSize, // 核心线程池大小
// maximumPoolSize, // 最大核心线程池大小
// keepAliveTime, // 保持存活时长
// unit, // 超时释放单位
// workQueue, // 阻塞队列:默认最大21亿
// Executors.defaultThreadFactory(), // 线程工厂,创建线程的,一般不动
// defaultHandler); // 拒绝策略:默认AbortPolicy()堕胎政策
//
//public ThreadPoolExecutor(int corePoolSize,
// int maximumPoolSize,
// long keepAliveTime,
// TimeUnit unit,
// BlockingQueue<Runnable> workQueue,
// ThreadFactory threadFactory,
// RejectedExecutionHandler handler)
//}
public static void main(String[] args) {
// 创建线程池
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
2,
5,
3,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
ThreadPoolExecutor threadPool1 = new ThreadPoolExecutor(
2,
5,
3,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(3));
try {
// 使用线程池
for (int i = 1; i < 100; i++) {
threadPool.execute(()->{
System.out.println(Thread.currentThread().getName() + ":ok");
});
}
}catch (Exception e){
e.printStackTrace();
}finally {
// 线程池用完,线程结束,关闭线程池
threadPool.shutdown();
}
}
}
package com.kuang.threadpool;
import java.util.concurrent.*;
/**
* 结合业务理解线程池及参数含义:
* 银行办理业务:
* 银行大厅 --> 线程池
* 柜台:5个柜台:2个开设柜台,3个备用柜台 --> 最大核心线程数:5,核心线程数:2,
* 候客区:3个座位 --> 阻塞队列
* 1个人进去,直接办理
* 5个人进去(3个人在候客厅)
* 8个人进去(5个窗口都开放+3个在候客厅)
* 9个人进去(多1个人,被拒绝进入,抛出异常)
* 1个小时后,只剩2个窗口在服务,3个备用柜台就关闭释放
*
* 四种拒绝策略: RejectedExecutionHandler
* AbortPolicy() 默认 堕胎策略 第九个进来时,抛出异常
* CallerRunsPolicy() 调用者运行策略 第九个进来时,不抛出异常,调用者执行(main主线程执行)
* DiscardPolicy() 丢弃策略 第九个进来时,不抛出异常,丢弃任务
* DiscardOldestPolicy() 丢弃最老的策略 第九个进来时,不抛出异常,尝试和最早的竞争
*
*/
public class Demo03 {
public static void main(String[] args) {
// 1.AbortPolicy() 默认 堕胎策略,抛出异常
// 创建线程池
//ThreadPoolExecutor threadPool1 = new ThreadPoolExecutor(
// 2,
// 5,
// 3,
// TimeUnit.SECONDS,
// new LinkedBlockingQueue<>(3),
// Executors.defaultThreadFactory(),
// new ThreadPoolExecutor.AbortPolicy()); //拒绝策略1
//try {
// // 使用线程池
// for (int i = 1; i <= 100; i++) {
// threadPool1.execute(()->{
// System.out.println(Thread.currentThread().getName() + ":ok");
// });
// }
//}catch (Exception e){
// e.printStackTrace();
//}finally {
// // 线程池用完,线程结束,关闭线程池
// threadPool1.shutdown();
//}
/*
pool-1-thread-3:ok
pool-1-thread-5:ok
pool-1-thread-4:ok
pool-1-thread-1:ok
pool-1-thread-2:ok
pool-1-thread-4:ok
pool-1-thread-5:ok
pool-1-thread-3:ok
第九个进来时,抛出RejectedExecutionException
*/
// 2.CallerRunsPolicy() 调用者运行策略,不抛出异常
// 创建线程池
//ThreadPoolExecutor threadPool2 = new ThreadPoolExecutor(
// 2,
// 5,
// 3,
// TimeUnit.SECONDS,
// new LinkedBlockingQueue<>(3),
// Executors.defaultThreadFactory(),
// new ThreadPoolExecutor.CallerRunsPolicy()); //拒绝策略2
//try {
// // 使用线程池
// for (int i = 1; i <= 9; i++) {
// threadPool2.execute(()->{
// System.out.println(Thread.currentThread().getName() + ":ok");
// });
// }
//}catch (Exception e){
// e.printStackTrace();
//}finally {
// // 线程池用完,线程结束,关闭线程池
// threadPool2.shutdown();
//}
/*
pool-1-thread-4:ok
pool-1-thread-3:ok
pool-1-thread-1:ok
main:ok 第9个人,主线程来处理,谁让他进入线程池的,就回到原来地方处理,就像公司让你去银行办理,银行拒绝,让你去公司办理,公司可以代办
pool-1-thread-4:ok
pool-1-thread-4:ok
pool-1-thread-2:ok
pool-1-thread-1:ok
pool-1-thread-5:ok
*/
// 3.DiscardPolicy() 丢弃策略,不抛出异常
// 创建线程池
//ThreadPoolExecutor threadPool3 = new ThreadPoolExecutor(
// 2,
// 5,
// 3,
// TimeUnit.SECONDS,
// new LinkedBlockingQueue<>(3),
// Executors.defaultThreadFactory(),
// new ThreadPoolExecutor.DiscardPolicy()); //拒绝策略3
//try {
// // 使用线程池
// for (int i = 1; i <= 9; i++) {
// threadPool3.execute(()->{
// System.out.println(Thread.currentThread().getName() + ":ok");
// });
// }
//}catch (Exception e){
// e.printStackTrace();
//}finally {
// // 线程池用完,线程结束,关闭线程池
// threadPool3.shutdown();
//}
/*
pool-1-thread-1:ok
pool-1-thread-5:ok
pool-1-thread-1:ok
pool-1-thread-1:ok
pool-1-thread-4:ok
pool-1-thread-3:ok
pool-1-thread-2:ok
pool-1-thread-5:ok 不要第9个人
*/
// 4.DiscardOldestPolicy() 抛弃保单策略,不抛出异常
// 创建线程池
ThreadPoolExecutor threadPool4 = new ThreadPoolExecutor(
2,
5,
3,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.DiscardOldestPolicy()); //拒绝策略4
try {
// 使用线程池
for (int i = 1; i <= 9; i++) {
threadPool4.execute(()->{
System.out.println(Thread.currentThread().getName() + ":ok");
});
}
}catch (Exception e){
e.printStackTrace();
}finally {
// 线程池用完,线程结束,关闭线程池
threadPool4.shutdown();
}
/*
pool-1-thread-1:ok
pool-1-thread-4:ok
pool-1-thread-1:ok
pool-1-thread-3:ok
pool-1-thread-2:ok
pool-1-thread-1:ok
pool-1-thread-4:ok
pool-1-thread-5:ok 第9个进来,尝试和最早 的竞争,不抛出异常,这里没有竞争到,就抛弃了
*/
}
}
最大线程如何设置?
1.CPU密集型,几核,就是几,可以保证CPU效率最高
获取CPU数量:注意:不要写死,代码获取CPU核数,because运维CPU设备比你牛时,你写死了,会被打死!
①任务管理器查看
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ckEi7fG8-1689140798570)(C:\Users\admin\AppData\Roaming\Typora\typora-user-images\image-20230703170543989.png)]
②设备管理器查看
③代码查看
Runtime.getRuntime().availableProcessors();
2.IO密集型(调优)
假设有15个大型任务IO十分占用资源,这时设计时,最好设置为 > 2*15 ~ 30核
12、四大函数式接口(必会)
新时代程序员:lambda表达式、链式编程、函数式接口、Stream流式计算
函数式接口
java.util.function
只有一个方法的接口,如:
@FunctionalInterface
public interface Runnable {
public abstract void run();
}
//简化编程模型
//foreach() 消费者类型的函数式接口
Function 函数型接口
只有一行时,lambda简化:
Predicate 断定型接口
Custmer 消费型接口
只有输入,没有返回,饕餮!
Supplier 供给型接口
只有返回,乜有输入,不用吃东西,只产出是啥!爷爷的爱
package com.kuang.FunctionalInterface;
import java.util.function.Function;
/**
* Function 函数型接口:
* 一个输入,一个输出
* T:传入参数 R:返回类型
* @FunctionalInterface
* public interface Function<T, R> {
* R apply(T t);
* }
*/
public class Test1 {
public static void main(String[] args) {
Function function = new Function<String, String>() {
@Override
public String apply(String s) {
return s;
}
};
System.out.println(function.apply("输出s"));
// lambda简化
Function function1 = (str)->{return str;};
System.out.println(function.apply("简化s"));
}
}
package com.kuang.FunctionalInterface;
import java.util.function.Function;
import java.util.function.Predicate;
/**
* Predicate 断定型接口
* 一个输入,boolean输出
*
*/
public class Test2 {
public static void main(String[] args) {
Predicate predicate = new Predicate<String>() {
@Override
public boolean test(String s) {
return s.isEmpty();
}
};
System.out.println(predicate.test("输出s"));
System.out.println(predicate.test(""));
// lambda简化
Predicate<String> predicate1 = (str)->{return str.isEmpty();};
Predicate<String> predicate11 = str->{return str.isEmpty();};
Predicate<String> predicate2 = String::isEmpty;
System.out.println(predicate1.test("简化"));
System.out.println(predicate11.test(""));
System.out.println(predicate2.test("简化"));
System.out.println(predicate2.test(""));
}
}
package com.kuang.FunctionalInterface;
import java.util.function.Consumer;
import java.util.function.Predicate;
/**
* Cunsumer 消费型接口
* 只有输入,没有返回,饕餮!
*/
public class Test3 {
public static void main(String[] args) {
Consumer<String> consumer = new Consumer<String>() {
@Override
public void accept(String s) {
System.out.println(s);
}
};
consumer.accept("测试");
// lambda简化
Consumer<String> consumer1 = (str) -> {System.out.println(str);};
Consumer<String> consumer11 = System.out::println;
consumer1.accept("简化测试");
consumer11.accept("简化测试");
}
}
package com.kuang.FunctionalInterface;
import java.util.function.Supplier;
/**
* Supplier 供给型接口
* 只有返回,乜有输入,不用吃东西,只产出是啥!爷爷的爱
*/
public class Test4 {
public static void main(String[] args) {
Supplier<Integer> supplier = new Supplier<Integer>(){
@Override
public Integer get() {
return 1024;
}
};
System.out.println(supplier.get());
// lambda简化
Supplier<String> supplier1 = ()->{return "test";};
System.out.println(supplier1.get());
}
}
13、Stream流式计算
14、ForkJoin
在JDK1.7,作用:并行情况下,提高效率,大数据量情况下使用!
特点:工作窃取
B线程执行的快,就会窃取A线程的任务,
里面维护的是双端队列
操作:
package com.kuang.forkjoin;
import java.util.concurrent.RecursiveTask;
/**
* ForkJoin
* 分支合并,在JDK1.7
* 作用:并行情况下,提高效率,大数据量情况下使用!一个线程并发为多个线程使用
* 特点:工作窃取 B线程执行的快,就会窃取A线程的任务,里面维护的是双端队列(两边都可以操作)
*
* 实例:1+2+...10亿求和
* 1.一般操作 fori,sum+=i;
* 2.使用ForkJoin(更快)
* 3.使用Stream并行流(更快)
*
* 如何使用ForkJoin?
* 1.通过 ForkJoinPool()来执行
* 2.计算任务 forkJoinPool.execute(ForkJoinTask task)
* submit() 异步 提交任务,有结果
* execute() 同步 执行任务,没有结果
* 3.计算类继承 extends ForkJoinTask
*
*
*/
public class ForkJoinDemo extends RecursiveTask<Long> { //继承JUC递归任务
private Long start; //1
private Long end; //1990900000
private Long num = 1_0000_0000L; //临界值:小于10w个数相加时,一般方法,大于10w个数相加时,使用ForkJoin/Stream流
public ForkJoinDemo(Long start, Long end) {
this.start = start;
this.end = end;
}
// 计算方法
@Override
protected Long compute() {
// 定义一个临界值,如果 <临界值,就用普通求和方式,如果 >临界值,就用ForkJoin
if ((end - start) < num){
Long sum = 0L;
for (Long i = start; i < end; i++) {
sum += i;
}
return sum;
}else {
Long middle = (start + end)/2;
System.out.println("forkjoin middle="+middle);
ForkJoinDemo task1 = new ForkJoinDemo(start,middle);
task1.fork(); //拆分任务,把任务压入线程队列
ForkJoinDemo task2 = new ForkJoinDemo(middle,end);
task2.fork(); //拆分任务,把任务压入线程队列
return task1.join()+task2.join(); //分解任务,最终每个分支都走了if里面
}
}
}
15、异步回调
CompletableFuture
package com.kuang.feture;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
/**
* 异步回调
* Feture:未来,设计初中:对将来的某个事件进行建模
* 异步执行
* 成功回调
* 失败回调
* 理解:
* AJAX,客户端发异步请求到服务器
* Feture,多个线程同时执行时,异步执行线程请求,可以获取请求结果(or Void不获取请求结果)
* 使用:
* 没有返回值的 runAsync 异步回调,可以通过get()获取
* 有返回值的 supplyAsync 异步回调,返回类型为泛型的类型
*/
//没有返回值的 runAsync 异步回调
public class Test1 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 发起一个异步请求
CompletableFuture<Void> voidCompletableFuture = CompletableFuture.runAsync(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "runAsync=>Void");
});
// 主线程
System.out.println("main:1111");
// 获取阻塞执行结果:异步请求默认没有返回,可以通过get()来获取结果
voidCompletableFuture.get();
/*执行结果:
main:1111
ForkJoinPool.commonPool-worker-1runAsync=>Void
*/
}
}
package com.kuang.feture;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
//有返回值的 supplyAsync 异步回调
// ajax,成功和失败的回调
// 返回的是错误信息
public class Test2 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("========start========");
// 异步任务
CompletableFuture<Integer> integerCompletableFuture = CompletableFuture.supplyAsync(() -> { //Supplier 供给型接口,前面四大函数式接口讲过
int i = 10/0;
System.out.println(Thread.currentThread().getName() + "supplyAsync=>Integer");
return 1024;
});
// 完成异步任务后,打印输出成功回到、或者失败回调
System.out.println(integerCompletableFuture.whenComplete((t, u) -> { //BiConsumer<T, U> 增强版消费型接口
// success
System.out.println("t=>" + t); // t:正常返回结果
System.out.println("u=>" + u); // u:错误信息:没有就输出null
}).exceptionally((e) -> {
// error
System.out.println(e.getMessage()); // 输出异常信息
return 233; // 可以获取到错误的返回结果
}).get());
System.out.println("========end========");
/*
异步执行内容中没有错误时:
========start========
ForkJoinPool.commonPool-worker-1supplyAsync=>Integer
t=>1024
u=>null
1024
========end========
异步执行内容中有错误时:
========start========
t=>null
u=>java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
java.lang.ArithmeticException: / by zero
233
========end========
*/
}
}
16、JMM
问题:线程A正在用flag = true,进行工作,还没有写入主内存时,线程B修改flag = false,并写入了内容中,这时候就会有线程A工作内容中的flag与主存中值不一致。
参考文章:Java内存模型(JMM)总结 - Doing的文章 - 知乎 https://zhuanlan.zhihu.com/p/29881777
JMM模型下的线程间通信:
线程间通信必须要经过主内存。
如下,如果线程A与线程B之间要通信的话,必须要经历下面2个步骤:
1)线程A把本地内存A中更新过的共享变量刷新到主内存中去。
2)线程B到主内存中去读取线程A之前已更新过的共享变量。
关于主内存与工作内存之间的具体交互协议,即一个变量如何从主内存拷贝到工作内存、如何从工作内存同步到主内存之间的实现细节,Java内存模型定义了以下八种操作来完成:
- lock(锁定):作用于主内存的变量,把一个变量标识为一条线程独占状态。
- unlock(解锁):作用于主内存变量,把一个处于锁定状态的变量释放出来,释放后的变量才可以被其他线程锁定。
- read(读取):作用于主内存变量,把一个变量值从主内存传输到线程的工作内存中,以便随后的load动作使用
- load(载入):作用于工作内存的变量,它把read操作从主内存中得到的变量值放入工作内存的变量副本中。
- use(使用):作用于工作内存的变量,把工作内存中的一个变量值传递给执行引擎,每当虚拟机遇到一个需要使用变量的值的字节码指令时将会执行这个操作。
- assign(赋值):作用于工作内存的变量,它把一个从执行引擎接收到的值赋值给工作内存的变量,每当虚拟机遇到一个给变量赋值的字节码指令时执行这个操作。
- store(存储):作用于工作内存的变量,把工作内存中的一个变量的值传送到主内存中,以便随后的write的操作。
- write(写入):作用于主内存的变量,它把store操作从工作内存中一个变量的值传送到主内存的变量中。
Java内存模型还规定了在执行上述八种基本操作时,必须满足如下规则:
- 如果要把一个变量从主内存中复制到工作内存,就需要按顺寻地执行read和load操作, 如果把变量从工作内存中同步回主内存中,就要按顺序地执行store和write操作。但Java内存模型只要求上述操作必须按顺序执行,而没有保证必须是连续执行。
- 不允许read和load、store和write操作之一单独出现
- 不允许一个线程丢弃它的最近assign的操作,即变量在工作内存中改变了之后必须同步到主内存中。
- 不允许一个线程无原因地(没有发生过任何assign操作)把数据从工作内存同步回主内存中。
- 一个新的变量只能在主内存中诞生,不允许在工作内存中直接使用一个未被初始化(load或assign)的变量。即就是对一个变量实施use和store操作之前,必须先执行过了assign和load操作。
- 一个变量在同一时刻只允许一条线程对其进行lock操作,但lock操作可以被同一条线程重复执行多次,多次执行lock后,只有执行相同次数的unlock操作,变量才会被解锁。lock和unlock必须成对出现
- 如果对一个变量执行lock操作,将会清空工作内存中此变量的值,在执行引擎使用这个变量前需要重新执行load或assign操作初始化变量的值
- 如果一个变量事先没有被lock操作锁定,则不允许对它执行unlock操作;也不允许去unlock一个被其他线程锁定的变量。
- 对一个变量执行unlock操作之前,必须先把此变量同步到主内存中(执行store和write操作)。
17、Volatile
验证1:保证可见性
参考文章:C语言丨深入理解volatile关键字 - 沐歌爱编程的文章 - 知乎 https://zhuanlan.zhihu.com/p/343688629
问题:线程A正在用flag = true,进行工作,还没有写入主内存时,线程B修改flag = false,并写入了内容中,这时候就会有线程A工作内存中的flag与主存中值不一致。
处理思路:
解决办法:
对共享变量用 volatile进行修饰,说明这个变量是“易变的”,提醒编译器编译后的程序每次需要对这个变量操作的时候,不要做编译优化(不要读取工作内存的值),直接从变量内存地址中读取数据,从而解决A线程不可见问题。
测试:
package com.kuang.volatiledemo;
import java.util.concurrent.TimeUnit;
/**
* volatile :
* 1.保证可见性
* 2.不保证原子性
* 3.禁止指令重排
*
* 验证1:保证可见性
* 参考文章:
* C语言丨深入理解volatile关键字 - 沐歌爱编程的文章 - 知乎 https://zhuanlan.zhihu.com/p/343688629
* 多线程操作共享变量时,可见性问题:
* 线程A正在用flag = true,进行工作,还没有写入主内存时,线程B修改flag = false,并写入了内容中,
* 这时候就会有线程A工作内存中的flag与主存中值不一致。
* 处理思路:
* 需要线程A知道主内存中的值发生了变化
* 解决办法:
* 对共享变量用 volatile进行修饰,说明这个变量是“易变的”,
* 提醒编译器,编译后的程序每次需要对这个变量操作的时候,不要做编译优化(不要读取工作内存的值),
* 直接从变量内存地址中读取数据,从而解决对主内存共享变量变化时对A线程不可见现象。
*/
public class Demo1 {
// 共享变量
// 当没有volatile修饰时,打印出1,还继续陷入死循环,程序不中断
// 当有volatile修饰时,打印出1,立即中断
//private static int num = 0;
private volatile static int num = 0;
public static void main(String[] args) {
// 线程1 当num=0时,死循环
new Thread(()->{
while (num == 0){
}
},"线程1").start();
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
// main线程 修改num=1
num = 1;
System.out.println(num);
}
}
验证2:不保证原子性
num++; 不是一个原子操作,所以多线程操作共享变量num时,会出现问题。
使用原子包装类解决这个问题
package com.kuang.volatiledemo;
import java.util.concurrent.ForkJoinPool;
/**
* volatile :
* 1.保证可见性
* 2.不保证原子性
* 3.禁止指令重排
*
* 验证2:不保证原子性
* 原子性:不可分割,线程A在执行任务的时候,不能被打扰,也不能被分割,要么同时成功,要么同时失败。
* sychronized、Lock锁可以保证原子性
* 问题:
* 如果不加sychronized、Lock锁,怎么可以保证原子性?
* 使用原子类解决 AtomicInteger num Demo3
*
*/
public class Demo2 {
//private static int num = 0;
private static volatile int num = 0;
// volatile 不保证原子性
public static void add(){
num ++; //不是一个原子性操作
// 底层多步骤操作,多个线程进来时,就会有是否原子性操作问题
// 打开字节码文件,cmd进入,javap -c 分析
// 1. getstatic 获取这个值
// 2. iconst_l 常量i
// 3. i+1
// 4. putstatic 写回这个值
}
// synchronized 保证原子性
public static synchronized void add1(){
num ++;
}
public static void main(String[] args) {
// 创建20个线程,每个线程执行1000次,预期得到结果2w
for (int i = 0; i < 20; i++) {
new Thread(()->{
for (int j = 0; j < 1000; j++) {
add(); // main 19383
//add1(); //main 20000
}
}).start();
}
// 主线程
// 判定执行:如果活跃的线程数>2,主线程和垃圾回收线程礼让,让其余线程先执行
// 默认执行线程:main gc
while (Thread.activeCount() > 2){
Thread.yield();
}
System.out.println(Thread.currentThread().getName() + " " +num);
}
}
package com.kuang.volatiledemo;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 问题:
* 如果不加sychronized、Lock锁,怎么可以保证原子性?
* 使用原子包装类解决 AtomicInteger num Demo3
*/
public class Demo3 {
// volatile 保证可见性,不保证原子性
// AtomicInteger 原子包装类声明变量,保证原子性
private static volatile AtomicInteger num = new AtomicInteger();
public static void add(){
num.incrementAndGet();
}
public static void main(String[] args) {
// 20个线程,每个线程加到1000,预计num=2w
for (int i = 0; i < 20; i++) {
new Thread(()->{
for (int j = 0; j < 1000; j++) {
add();
}
}).start();
}
// 打印出20个线程执行完的num
while (Thread.activeCount() > 2){ // main,gc
Thread.yield();
}
System.out.println("num=" + num);
}
}
验证3:禁止指令重排
内存屏障
内存屏障在单例模式里面使用最多!!!!!!!!
18、单例模式
实现方式:饿汉式、懒汉式、静态内部类实现
饿汉式
DCL饿汉式:一上来就加载对象,分配内存空间。
单例模式:构造器私有
懒汉式
懒汉式:用的时候再分配。
这样写有问题,多线程并发拿到实例,拿到多个实例;
所以加入双重检测锁模式,这样拿到的是一个单例
这样写还可能有个问题,指令重排
双重检测锁+volatile禁止指令重排操作:保证避免指令重排,来确保原子性操作
静态内部类
静态内部类:
饿汉、懒汉、静态内部类都是不安全的!因为有反射~
反射会破坏单例!!!
怎么办呢?
解决办法:
1.构造器锁住,构造的时候判断下,是否为空,如果存在了,就抛出异常
上面这种情况解决了,但是如果通过构造器拿到两个实例,是否是相同的? 不同
说明上面的解决办法,没有处理这种情况
解决办法:
通过一个标志位,默认false,通过反射newInstance后,设置为true,这时候再次通过反射 newInstance时候,抛出异常
这样是不是没有问题了?
不,可以破坏
拿到标志位字段,破坏私有权限,把标志位的值又改为 false, 单例又就被破坏了
那这个如何解决?
源码 xxx.newInstance() 进去
如果类是一个枚举类型,就不能通过反射创建对象!
枚举类
枚举是什么?
为什么枚举能避免单例被破坏?
反射不能破坏枚举,我们来试一下
报错:没有空参构造方法,不是预期的报错,枚举类不能通过反射创建对象
反编译class文件,查看也不是想要的错误
使用专业工具,jad.exe 复制到 EnumSingle.class 同级目录下,生成java文件,打开查看源码
写入参数,抛出想要的异常
饿汉实例:
package com.kuang.single;
/**
* 单例模式:构造器私有 ,只能创建一个任务管理器
* 实现方式:
* 1.饿汉式 一会吃拿过来就加载对象,分配内存空间,但是如果里面声明了固定空间变量,就会造成资源浪费
* 2.懒汉式 用的时候再分配
* 3.静态内部类 static修饰获取实例方法
* 存在问题:
* 饿汉、懒汉、静态内部类都是不安全的!因为有反射~
* 反射会破坏单例!!!
* 解决办法:
* 枚举类
* 1.构造器锁住,构造的时候判断下,是否为空,如果存在了,就抛出异常
* 但是如果通过构造器拿到两个实例,是否是相同的? 不同
* 2.通过一个标志位,默认false,通过反射newInstance后,设置为true,这时候再次通过反射 newInstance时候,抛出异常
* 这样是不是没有问题了?
* 不,可以破坏,拿到标志位字段,破坏私有权限,把标志位的值又改为 false, 单例又就被破坏了
* 那这个如何解决?
* 源码 xxx.newInstance() 进去
* 如果类是一个枚举类型,就不能通过反射创建对象!
* 为什么枚举能避免单例被破坏?
* 3.枚举能避免单例被反射破坏,枚举会抛出异常,不能通过反射创建枚举对象
* 结论:
* 实现单例最保险的方式:枚举类
*/
// 饿汉式单例
public class HungryMan {
// 可能会资源浪费
private byte[] date1 = new byte[1024*1024];
private byte[] date2 = new byte[1024*1024];
private byte[] date3 = new byte[1024*1024];
private byte[] date4 = new byte[1024*1024];
private byte[] date5 = new byte[1024*1024];
private final static HungryMan hungryMan = new HungryMan();
public HungryMan() {
}
public static HungryMan getInstance(){
return hungryMan;
}
}
懒汉实例:
package com.kuang.single;
// 懒汉模式
public class LazyMan {
private static LazyMan LazyMan;
public LazyMan() {
System.out.println(Thread.currentThread().getName() + "ok");
}
public static LazyMan lazyMan;
// 双重检测锁+volatile禁止指令重排操作
public static volatile LazyMan lazyMan1;
public static LazyMan getInstance1(){
if (lazyMan == null){
lazyMan = new LazyMan();
}
return lazyMan;
}
// 双重检测锁模式 懒汉单例模式 DCL懒汉式
// 双层锁模式,实现效果:多线程并发情况下,只有一个线程获取到实例
public static LazyMan getInstance2(){
if (lazyMan == null){
synchronized (LazyMan.class){
if (lazyMan == null){
lazyMan = new LazyMan();
}
}
}
return lazyMan;
}
// 双重检测锁+volatile禁止指令重排操作:保证避免指令重排,来确保原子性操作
public static LazyMan getInstance3(){
if (lazyMan1 == null){
synchronized (LazyMan.class){
if (lazyMan1 == null){
lazyMan1 = new LazyMan();
}
}
}
return lazyMan1;
}
// 多线程并发测试
public static void main(String[] args) {
// 多个线程获取实例,判断获取到的实例是否同一个
for (int i = 0; i < 1000; i++) {
new Thread(()->{
LazyMan.getInstance3();
}).start();
}
}
/*
Thread-0ok 打印输出了多个线程,构造了多个实例,所以加双层检测模式
Thread-4ok
Thread-3ok
Thread-6ok
Thread-1ok
*/
/*
Thread-0ok 双层检测模式,实现效果:多线程并发情况下,只有一个线程获取到实例
但是这样可能会有指令重排问题,因为 new LazyMan() 不是一个原子操作
里面操作多个步骤:
1.分配内存空间
2.执行构造器房费,初始化对象
3.把这个对象指向这个空间
双重检测锁+volatile禁止指令重排操作:保证避免指令重排,来确保原子性操作
*/
/*
Thread-0ok 双重检测锁+volatile禁止指令重排操作 最终实现了单例模式
*/
}
静态内部类实例:
package com.kuang.single;
// 静态内部类
public class Holder {
private Holder(){}
public static Holder getInstance(){
return InnerClass.HOLDER;
}
public static class InnerClass{
private static final Holder HOLDER = new Holder();
}
}
懒汉模式实现单例,反射不安全测试:
package com.kuang.single;
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
// 懒汉模式实现单例,反射不安全测试
public class LazyManReflectTest {
//public LazyManReflectTest() {
// System.out.println(Thread.currentThread().getName() + "ok");
//}
// 构造器加锁:这样多线程通过类获取实例时,会先判断下,如果实例已存在,就抛出异常,从而实现单例
//public LazyManReflectTest() {
// synchronized (LazyManReflectTest.class){
// if (lazyMan != null){
// throw new RuntimeException("不要试图使用反射破坏单例");
// }
// }
//}
// 标志位
private static boolean song = false;
// 构造器加锁+标志位 避免通过构造器创建2个实例时,获取到不同实例,实现单例模式
public LazyManReflectTest() {
synchronized (LazyManReflectTest.class){
if (!song){
song = true;
}else {
throw new RuntimeException("不要试图使用反射破坏单例");
}
}
}
// 双重检测锁模式:禁止指令重排,确保原子性操作
private volatile static LazyManReflectTest lazyMan;
public static LazyManReflectTest getInstance(){
if (lazyMan == null){
synchronized (LazyMan.class){
if (lazyMan == null){
lazyMan = new LazyManReflectTest();
}
}
}
return lazyMan;
}
// 反射破坏单例测试
public static void main(String[] args) throws NoSuchMethodException, IllegalAccessException, InvocationTargetException, InstantiationException, NoSuchFieldException {
// 通过反射获取1个实例、通过构造器获取1个实例,判断是否单例
//LazyManReflectTest instance1 = LazyManReflectTest.getInstance();
//Constructor<LazyManReflectTest> declaredConstructor = LazyManReflectTest.class.getDeclaredConstructor(null);
//declaredConstructor.setAccessible(true);
//LazyManReflectTest instance2 = declaredConstructor.newInstance();
//LazyManReflectTest instance3 = declaredConstructor.newInstance(); // 通过构造器获取2个实例,判断是否单例
// 通过反射获取到的实例和构造器获取到实例不是一个,获取到2个实例,破坏了单例模式
// 解决办法:1.构造器锁住,构造的时候判断下,是否为空,如果存在了,就抛出异常 reflect.InvocationTargetException 不要试图使用反射破坏单例
// 通过构造器获取2个实例,单例被破坏
// 解决办法:通过一个标志位,默认false,通过反射newInstance后,设置为true,这时候再次通过反射 newInstance时候,抛出异常
//System.out.println(instance1); //com.kuang.single.LazyManReflectTest@7f31245a
//System.out.println(instance2); //com.kuang.single.LazyManReflectTest@6d6f6e28
//System.out.println(instance3); //com.kuang.single.LazyManReflectTest@135fbaa4 单例被破坏
// 拿到标志位字段,破坏私有权限,把标志位的值又改为 false, 单例又就被破坏了
Field song = LazyManReflectTest.class.getDeclaredField("song");
song.setAccessible(true); //设置属性为可访问
Constructor<LazyManReflectTest> constructor = LazyManReflectTest.class.getDeclaredConstructor();
constructor.setAccessible(true); //设置构造器为可访问
LazyManReflectTest newInstance1 = constructor.newInstance();
song.set(newInstance1,false); //将标志位设为未生成实例时的false
LazyManReflectTest newInstance2 = constructor.newInstance();
System.out.println(newInstance1); //com.kuang.single.LazyManReflectTest@6d6f6e28
System.out.println(newInstance2); //com.kuang.single.LazyManReflectTest@135fbaa4 单例被破坏
// 那这个如何解决? 源码 xxx.newInstance() 进去,如果类是一个枚举类型,就不能通过反射创建对象!
}
}
枚举类实例:
package com.kuang.single;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
// 枚举避免单例被反射破坏 enum本身也是一个Class类
public enum EnumSingle {
// 实例
INSTANCE;
// 获取实例方法
public static EnumSingle getInstance(){
return INSTANCE;
}
// 枚举私有构造器
private EnumSingle(){}
}
class Test{
public static void main(String[] args) throws NoSuchMethodException, IllegalAccessException, InvocationTargetException, InstantiationException {
//EnumSingle instance1 = EnumSingle.INSTANCE;
//EnumSingle instance2 = EnumSingle.INSTANCE;
//EnumSingle instance3 = EnumSingle.getInstance();
//System.out.println(instance1);//INSTANCE
//System.out.println(instance2);//INSTANCE
//System.out.println(instance3);//INSTANCE
// 测试反射是否能破坏枚举单例,预期:如果通过反射创建多个实例,会抛出异常
EnumSingle instance1 = EnumSingle.INSTANCE;
//Constructor<EnumSingle> declaredConstructor = EnumSingle.class.getDeclaredConstructor(); //报错:EnumSingle.<init>() NoSuchMethodException,不能抛出想要的异常
Constructor<EnumSingle> declaredConstructor = EnumSingle.class.getDeclaredConstructor(String.class, int.class); //IllegalArgumentException: Cannot reflectively create enum objects
declaredConstructor.setAccessible(true); // 设置为可访问
EnumSingle instance2 = declaredConstructor.newInstance();
System.out.println(instance1);
System.out.println(instance2); //不能反射地创建枚举对象
}
}
19、CAS
cas:
原子类自增操作:点进去
unsafe类
CSA的ABA问题
模拟ABA出现场景:
如何解决?
原子引用,带个版本号
这里有个坑:Integer
cas测试实例:
package com.kuang.cas;
import java.util.concurrent.atomic.AtomicInteger;
/**
* CAS是什么?
* compareAndSet(int expect, int update):比较并交换
* 入参:期望,更新;
* 如果我期望的结果达到预期,就去更新,如果没有达到预期,就不更新
* 因为 num++ 操作在底层不是原子操作,所以多线程并发会有问题,所以使用原子包装类
* 但是原子包装类CAS有个问题(ABA问题)
* 就是主内存:num=0,线程A,修改num=1,然后又修改回去0,这时候操作对线程B来说,线程B拿到的是0,B是不知道A改过
* 如何解决CAS的ABA问题?
* ABA问题:有线程修改了主内存,又修改回去,其他线程感知不到
* 使用原子引用,带版本号
* CAS缺点?
* CAS比较当前工作内存中的值和主内存中的值,如果这个值和期望值相同,就执行更新操作,否则就一直循环!
* 1.循环会耗时
* 2.一次性只能保证一个共享变量的原子性
* 3.ABA问题
*
*/
// CAS简单实例:比较并交换
public class Test1 {
public static void main(String[] args) {
// 创建一个原子包装类,设置初识值2020
AtomicInteger atomicInteger = new AtomicInteger(2020);
// public final boolean compareAndSet(int expect, int update)
// atomicInteger进行判断,如果期望值达到2020,就更新为2021
System.out.println(atomicInteger.compareAndSet(2020, 2021));
System.out.println(atomicInteger.get()); // 获取原子包装类的值
System.out.println(atomicInteger.compareAndSet(2020, 2021));
System.out.println(atomicInteger.get());
/*
true
2021
false
2021
*/
}
}
package com.kuang.cas;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
// 模拟CAS出现的ABA问题
public class Test2 {
public static void main(String[] args) throws InterruptedException {
AtomicInteger atomicInteger = new AtomicInteger(1);
// 线程A:捣乱的线程,修改下,又修改回去
new Thread(()->{
// 如果原子Integer类值和预期值一样,就更新
System.out.println(atomicInteger.compareAndSet(1, 2));
System.out.println(atomicInteger.get());
System.out.println(atomicInteger.compareAndSet(2, 1));
System.out.println(atomicInteger.get());
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
// 线程B
new Thread(()->{
System.out.println(atomicInteger.compareAndSet(1, 6));
System.out.println(atomicInteger.get());
}).start();
/*
true
2
true
6
停顿5s后
true
6
*/
}
}
package com.kuang.cas;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicStampedReference;
// 如何解决CAS的ABA问题:原子戳引用,带个版本号
// 注意包装类的坑:
// 包装类数据范围导致获取到的值不是预期内存中的值
// Integer使用了对象缓存机制,默认范围-128-127,超过区间外数据会在堆上,不会复用已有对象
public class Test3 {
// 初始化原子戳引用 初识引用:2022, 初识版本:1
static AtomicStampedReference<Integer> integerAtomicStampedReference = new AtomicStampedReference<>(1, 1);
public static void main(String[] args) {
// 模拟两个线程并发执行,获取版本号,只要有线程对值进行修改,就修改版本号,避免ABA问题
new Thread(()->{
int stamp = integerAtomicStampedReference.getStamp(); // 先获取版本号,睡5秒
System.out.println("a1=>" + stamp);
try {
TimeUnit.SECONDS.sleep(2); // 等2s
} catch (InterruptedException e) {
e.printStackTrace();
}
// 线程a要把1改成6
System.out.println(Thread.currentThread().getName()+"-1=>6->"+integerAtomicStampedReference.compareAndSet(
1,
6,
integerAtomicStampedReference.getStamp(),
integerAtomicStampedReference.getStamp() + 1));//获取是否达到期望修改
System.out.println("a2=>" + integerAtomicStampedReference.getStamp());
// 再把6改成1
System.out.println(Thread.currentThread().getName()+"-6=>1->"+integerAtomicStampedReference.compareAndSet(
6,
1,
integerAtomicStampedReference.getStamp(),
integerAtomicStampedReference.getStamp() + 1));
System.out.println("a3=>" + integerAtomicStampedReference.getStamp());
},"a").start();
new Thread(()->{
int stamp = integerAtomicStampedReference.getStamp(); // 先获取版本号,睡5秒
System.out.println("b1=>" + stamp);
try {
TimeUnit.SECONDS.sleep(5);// 等5s
} catch (InterruptedException e) {
e.printStackTrace();
}
// 好人线程要把1改成6,改不了,因为预期版本可能不是1,是已经被A改过的2
System.out.println(Thread.currentThread().getName()+"-1=>6-"+integerAtomicStampedReference.compareAndSet(
1,
6,
stamp,
stamp + 1));
System.out.println("b2=>" + integerAtomicStampedReference.getStamp());
},"b").start();
/*
a1=>1
b1=>1
a-1=>6->true
a2=>2
a-6=>1->true
a3=>3
b-1=>6-false b线程等5s,所以a会先修改完,这时候,版本为3,b修改时,与预期版本1不一致,就不修改
b2=>3
*/
}
}
20、各种锁的理解
公平锁、非公平锁
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-0P20M1kB-1689140798596)(C:\Users\admin\AppData\Roaming\Typora\typora-user-images\image-20230712090320300.png)]
可重入锁
测试传统版Synchronized锁:
测试结果:A拿到外面的锁,执行sms后,不会释放锁,再拿到call里面的锁,知道执行完后一起释放锁,B再拿到锁
测试可重复锁(lock版锁):lock锁必须配对
自旋锁
不断尝试获取锁,直到成功为止
实现自旋锁:实现方式CAS
测试自旋锁:线程1、线程2,
线程1加锁后,线程2想加锁,得等T1解锁后,T2才能加锁,T2等T1解锁的时候,T2在自旋等待T1释放锁
死锁
死锁测试:两个线程互相争抢资源
死锁时,程序卡住,怎么排查哪里死锁?
java bin 目录下,
-
使用 jps -l定位进程号 查看当前或者的java进程
-
使用 jstack 进程号,查看信息,找到死锁
找到一个死锁:
实例代码
synchronized可重入锁测试:
package com.kuang.lock;
import java.util.concurrent.Callable;
/**
* 1.公平锁、非公平锁:
* 公平锁:非常公平,不能插队,必须先来后到!
* 非公平锁:非常不公平,可以插队(默认都是非公平锁) public ReentrantLock(){sync = new NonfairSync();}
* 2.可重入锁
* 拿到了外面的锁之后,就可以拿到里面的锁
* 测试传统版Synchronized锁
* 测试结果:A拿到外面的锁,执行sms后,不会释放锁,再拿到call里面的锁,知道执行完后一起释放锁,B再拿到锁
* 测试可重复锁(lock版锁):lock锁必须配对
* 3.自旋锁
* 不断尝试获取锁,直到成功为止
* 实现自旋锁:实现方式CAS
* 测试自旋锁:线程1、线程2,
* 线程1加锁后,线程2想加锁,得等T1解锁后,T2才能加锁,T2等T1解锁的时候,T2在自旋等待T1释放锁
* 4.死锁
* 死锁测试:两个线程互相争抢资源
* 死锁时,程序卡住,怎么排查哪里死锁?java bin 目录下,
* 1)使用 jps -l定位进程号 查看当前或者的java进程
* 2)使用 jstack 进程号,查看死锁信息,找到死锁
*/
// 测试可重入锁(传统版Synchronized版)
// sms一层锁,call一层锁,sms调用call,这时候,两个线程A、B同时sms,
// 假设两个锁不一样,A线程先获取sms的锁,再获取里面call的锁,B线程想获取sms的锁,需要等A释放sms的锁,那A什么时候释放?
// 是在获取call的锁之前,还是释放call锁之后?
// 如果A在获取call之前释放sms的锁,B就可以获取,这时候打印顺序应该是 A:sms, B:sms, A:call, B:call
// 如果A在释放call锁之后释放sms的锁,B就可以获取,这时候打印顺序应该是 A:sms, A:call,B:sms, B:call (√)
// 结论:sms锁包裹call锁,拿到sms锁,就拿到call锁(可重入锁)
public class Test1 {
public static void main(String[] args) {
Phone phone = new Phone();
new Thread(phone::sms,"A").start();
new Thread(phone::sms,"B").start();
/*
Asms
Acall
Bsms
Bcall
*/
}
}
class Phone{
public synchronized void sms(){ // 大锁包小锁,大肠包小肠, 获取大锁,就自动获取小锁,其他线程需要等当前线程释放大锁、小锁后才能获取锁执行
System.out.println(Thread.currentThread().getName()+"sms");
call();
}
public synchronized void call(){
System.out.println(Thread.currentThread().getName()+"call");
}
}
lock可重入锁测试:
package com.kuang.lock;
import java.util.concurrent.locks.ReentrantLock;
//测试可重复锁(lock版锁):lock锁必须配对,否则会死在里面
public class Test2 {
public static void main(String[] args) {
Phone1 phone = new Phone1();
new Thread(phone::sms,"A").start();
new Thread(phone::sms,"B").start();
/*
Asms 结果:A获取sms锁,再获取call锁,然后释放call锁,再释放sms锁,然后B开始获取
Acall
Bsms
Bcall
*/
}
}
class Phone1{
ReentrantLock lock = new ReentrantLock();
public void sms(){
lock.lock();
lock.lock();
try {
System.out.println(Thread.currentThread().getName()+"sms");
call();//call lock锁
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock();
lock.unlock();
}
}
public void call(){
lock.lock();
try {
System.out.println(Thread.currentThread().getName()+"call");
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock();
}
}
}
自旋锁:
package com.kuang.lock;
import jdk.internal.org.objectweb.asm.tree.TryCatchBlockNode;
import sun.awt.geom.AreaOp;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
// 测试自旋锁
// 不断尝试获取锁加锁,直到成功为止
// 实现自旋锁:实现方式CAS 比较并交换,达到预期时更新,否则死循环
// 测试自旋锁:线程1、线程2,
// 线程1加锁后,线程2想加锁,得等T1解锁后,T2才能加锁,T2等T1解锁的时候,T2在自旋等待T1释放锁
public class MySpinLock {
// 加锁对象是一个个线程,所以泛型里面是<Thread>
AtomicReference<Thread> atomicReference = new AtomicReference(); // 原子引用:避免多线程并发指令重排
// 本质:控制一次原子引用里面有一个线程
// 加锁
public void myLock(){
// 获取当前线程,如果原子引用里面是Null,就把当前线程放入,如果有线程,进入死循环,一直等待,直到其他线程释放原子引用
Thread currentThread = Thread.currentThread();
System.out.println(Thread.currentThread().getName() + "==> mylock");
// 自旋锁
while (!atomicReference.compareAndSet(null,currentThread)){
//System.out.println(Thread.currentThread().getName() + "==> 在自旋等待锁被释放");
// 返回false,代表没有达到预期更新,就说明原子引用不为null,陷入死循环,相当于加锁
}
}
// 释放锁
public void myUnLock(){
// 如果原子引用里面线程是当前线程,就释放原子引用为null,结束死循环,相当于解锁
Thread currentThread = Thread.currentThread();
System.out.println(Thread.currentThread().getName() + "==> myUnlock");
atomicReference.compareAndSet(currentThread,null);
}
}
// 测试自己写的自旋锁,测试并发
class TestMySpinLock{
public static void main(String[] args) {
MySpinLock mySpinLock = new MySpinLock();
new Thread(()->{
mySpinLock.myLock();
try {
// 等待,业务,通知
TimeUnit.SECONDS.sleep(3);
} catch (Exception e){
e.printStackTrace();
}finally {
mySpinLock.myUnLock();
}
},"A").start();
new Thread(()->{
mySpinLock.myLock();
try {
// 等待,业务,通知
TimeUnit.SECONDS.sleep(1);
} catch (Exception e){
e.printStackTrace();
}finally {
mySpinLock.myUnLock();
}
},"B").start();
}
/*
A==> mylock
B==> mylock A先获取锁,B自旋等待,A释放锁,B获取锁,再释放锁
A==> myUnlock
B==> myUnlock
*/
}
模拟死锁:
package com.kuang.lock;
import java.util.concurrent.TimeUnit;
// 模拟死锁
// 线程1先锁资源A,再锁资源B,线程2先锁资源B,再锁资源A
// 模拟:线程1先获取资源A并锁住、线程2先获取资源B并锁住,
// 这时候,
// 线程1等线程2释放资源B才能继续执行释放资源A,供线程2获取
// 线程2等线程1释放资源A才能继续执行释放资源B,供线程1获取
// 两个线程都在等,但是都获取不到,就发生了死锁
/*
使用 jps -l定位进程号 查看当前或者的java进程
使用 jstack 进程号,查看信息,找到死锁
"T2":
at com.kuang.lock.MyThread.run(deadLock.java:54)
- waiting to lock <0x00000000d6c06348> (a java.lang.String)
- locked <0x00000000d6c06378> (a java.lang.String)
at java.lang.Thread.run(Thread.java:748)
"T1":
at com.kuang.lock.MyThread.run(deadLock.java:54)
- waiting to lock <0x00000000d6c06378> (a java.lang.String)
- locked <0x00000000d6c06348> (a java.lang.String)
at java.lang.Thread.run(Thread.java:748)
*/
public class deadLock {
public static void main(String[] args) {
String A = "A";
String B = "B";
new Thread(new MyThread(A,B),"T1").start();
new Thread(new MyThread(B,A),"T2").start();
}
/*
T1lock:A=>to getB 线程都在死等对方释放锁
T2lock:B=>to getA
*/
}
class MyThread implements Runnable{
private final String lockA;
private final String lockB;
public MyThread(String lockA, String lockB) {
this.lockA = lockA;
this.lockB = lockB;
}
@Override
public void run() {
synchronized (lockA){
System.out.println(Thread.currentThread().getName()+"lock:"+lockA+"=>to get"+ lockB);
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (lockB){
System.out.println(Thread.currentThread().getName()+"lock:"+lockB+"=> to get"+lockA);
}
}
}
}