一、线程的概述
1.1 线程的相关概念
1.1.1 进程(Process)
进程(Process)是计算机的程序关于某数据集合上的一次运行活动,是操作系统进行资源分配与调度的基本单位。
可以把进程简单的理解为操作系统中正在有运行的一个程序。
1.1.2 线程
线程(thread)是进程的一个执行单元。
一个线程就是进程中一个单一顺序的控制流,进程的一个执行分支。
进程是线程的容器,一个进程至少有一个线程。一个进程中也可以有多个线程。
在操作系统中是以进程为分配资源,如虚拟存储空间,文件描述符等,每个线程都有各自的线程栈。都有自己的寄存器环境,都有自己的线程本地存储。
1.1.3主线程与子线程
JVM 启动时会创建一个主线程,该主线程负责执行main方法,主线程就是运行main方法的线程。
java中的线程不是孤立的,线程之间也存在一些联系,如果在A线程中创建了B线程,称B线程为A线程的子线程,相应的A线程就是B线程的父线程。
1.1.4 串行,并发与并行
假设有三个任务:
任务A准备5分钟,等待10分钟。
任务B准备2分组,等待8分组。
任务C准备10分钟;
并发可以提高以事物的处理效率,即一段事件内可以处理或者完成更多的事情。
并行是一种更为严格,理想的并发。
从硬件角度来说,如果单核CPU,一个处理器一次只能执行一个线程的情况下,处理器可以使用时间片轮转技术,可以让CPU 快速的在各个线程之间进行切换,对于用户来说,感觉是三个线程在同时执行,如果是多核CPU,可以为不同的线程分配不同的CPU内核。
1.2 线程的创建与启用
在Java中,创建一个线程,就是创建一个Thread类(子类)的对象(实例)。Thread类有两个常用的构造方法:Thread()
与Thread(Runnable)
对应的创先线程的两种方式:
定义Thread类的子类
定义一个Runnable接口的实现类
这两种创建线程的方式没有本质的区别
1.2.1 定义Thread类的子类
package com.company.createthread.p1;
/**
* 1) 定义类继承Thread
*/
public class MyThread extends Thread{
// 2) 重写Thread 父类中的run()
// run() 方法体中的代码就是子线程要执行的任务。
@Override
public void run() {
System.out.println("这是子线程打印的内容");
}
}
package com.company;
import com.company.createthread.p1.MyThread;
public class Main {
public static void main(String[] args) {
System.out.println("JVM启动main线程,main线程执行main方法");
// 3) 创建子线程对象
MyThread thread = new MyThread();
//4 启动线程
thread.start();
/**
* 调用线程的start方法来启动线程,启动线程的实质就是请求JVM 运行相应的线程,这个线程具体在什么时候由线程调度器(Scheduler)决定。
* 注意:
* start() 方法调用结束并不意味着子线程开始运行,
* 新开启的线程会执行run()方法。
* 如果开启了多个线程,start()调用的顺序并不一定就是线程启动的线程。
* 多线程运行结果与代码执行顺序或调用顺序无关。
*/
System.out.println("main 线程后面其他的代码。。。");
}
}
运行结果
1.2.2 多线程运行结果是随机的
package com.company.createthread.p2;
public class MyThread2 extends Thread{
@Override
public void run() {
try {
for (int i = 1; i <=10 ; i++) {
System.out.println("sub thread:" + i);
int time = (int) (Math.random() * 1000);
Thread.sleep(time); // 线程睡眠,单位是毫秒,1秒=1000ms
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
package com.company.createthread.p2;
public class Test2 {
public static void main(String[] args) {
MyThread2 thread2 = new MyThread2();
thread2.start(); // 开启子线程
// 当前是main 线程
try {
for (int i = 1; i <=10 ; i++) {
System.out.println("main:" + i);
int time = (int) (Math.random() * 1000);
Thread.sleep(time); // 线程睡眠,单位是毫秒,1秒=1000ms
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
运行结果:
1.2.3 实现Runnable接口的形式创建线程
/**
* 当线程类已经有父类了,就不能用继承Thread 类的形式创建线程,可以使用Runnable 接口的形式
* 1) 定义类实现Runnable 接口
*/
public class MyRunnable implements Runnable{
//2) 重写Runnable 接口中的抽象方法run(), run 方法就是子线程要执行的代码
@Override
public void run() {
for (int i = 1; i <= 1000 ; i++) {
System.out.println("sub Thread-->" + i);
}
}
}
package com.company.createthread.p3;
/**
* 测试实现runnable接口的形式创建线程
*/
public class Test {
public static void main(String[] args) {
// 3) 创建Runnable 接口的实现类对象
MyRunnable runnable = new MyRunnable();
// 4) 创建线程对象
Thread thread = new Thread(runnable);
// 5) 开启线程
thread.start();
for (int i = 1; i <= 1000 ; i++) {
System.out.println("main-->" + i);
}
// 有时调用Thread(Runnable)构造方法时,实参也会传递匿名内部类对象。
Thread thread1 = new Thread(new Runnable() {
@Override
public void run() {
for (int i = 1; i <= 1000; i++) {
System.out.println("sub---------------->" + i);
}
}
});
thread1.start();
}
}
运行结果:
1.3 线程的常用方法
1.3.1 currentThread 方法
Thread.currentThread() 方法可以获得当前线程。
java中的任何一段代码都是执行在某个线程当中的,执行当前代码的线程就是当前线程。
同一段代码可能被不同的线程执行,因此当前线程是相对的Thread.currentThread()方法的返回值是在
代码实际运行时候的次线程对象
/**
* 定义线程类:
* 分别在构造方法中和run方法中打印当前线程。
*/
public class SubThread1 extends Thread{
public SubThread1() {
System.out.println("构造方法打印当前线程名称:" +Thread.currentThread().getName());
}
@Override
public void run() {
System.out.println("run方法打印:" +Thread.currentThread().getName());
}
}
public class Test01CurrentThread {
public static void main(String[] args) {
System.out.println("Main 方法中打印当前线程:" + Thread.currentThread().getName());
// 创建子线程 调用SubThread1() 构造方法,在main 线程中调用构造方法,所以构造方法中的当前线程就是mian线程
SubThread1 subThread1 = new SubThread1();
subThread1.start(); // 启动子线程,子线程会调用run 方法,所以在run方法中的当前线程就是Thread-0
// subThread1.run(); // 在main方法中直接调用run方法,没有开启新的线程,所以 在run方法中的当前线程是main线程
}
}
测试结果:
currentThread 复杂案例
public class SubThread2 extends Thread{
public SubThread2(){
System.out.println("构造方法中,Thread.currentThread().getName():" +Thread.currentThread().getName());
System.out.println("构造方法中,this.getName():" + this.getName());
}
@Override
public void run() {
System.out.println("run方法中,Thread.currentThread().getName():" +Thread.currentThread().getName());
System.out.println("run 方法中,this.getName():" + this.getName());
}
}
public class Test02CurrentThread {
public static void main(String[] args) throws InterruptedException {
// 创建子线程对象
SubThread2 thread2 = new SubThread2();
thread2.setName("t2"); // 设置线程的名称
thread2.start();
Thread.sleep(5000); // main 线程睡眠500毫秒
// Thread(Runnable)构造方法形参是Runnable接口,调用时传递的实参是接口的实现类对象
Thread t3 = new Thread(thread2);
t3.start();
}
}
运行结果
1.3.2 setName()/getName()
thread.setName(线程名称) // 设置线程名称
thread.getName() 返回线程名称
通过设置线程名称,有助于程序调试,提高程序的可读性,建议为每个线程都设置一个能够体现线程功能的名称
1.3.3 isAlive()
thread.isAlive() 判断当前线程是否处于活动状态
活动状态就是线程已启动并且尚未终止
public class SubThread3 extends Thread{
@Override
public void run() {
System.out.println("run 方法,isAlive=" + this.isAlive()); // true ,运行状态
}
}
/**
* 测试线程的活动状态
*/
public class Test {
public static void main(String[] args) {
SubThread3 thread3 = new SubThread3();
System.out.println("begin--" + thread3.isAlive()); // false ,在启动线程之前
thread3.start();
System.out.println("end--" +thread3.isAlive()); // 结果不一定,打印这一行时,如过thread3线程还没结束就返回true,如果t3线程已结束,返回false
}
}
运行结果
1.3.4 sleep()
Thread.sleep(millis);让当前线程休眠指定的毫秒数,
当前线程是指Thread.currentThread()返回的线程
/**
* 子线程休眠
*/
public class SubThread4 extends Thread{
@Override
public void run() {
try {
System.out.println("run, threadname=" +Thread.currentThread().getName() +", begin=" + System.currentTimeMillis());
Thread.sleep(2000); // 当前线程睡眠2000毫秒
System.out.println("run, threadname=" +Thread.currentThread().getName() +", end=" + System.currentTimeMillis());
} catch (InterruptedException e) {
// 在子线程的run方法中,如果有受检异常(编译时异常)需要处理,只有选择捕获处理,不能抛出异常
e.printStackTrace();
}
}
}
public class Test {
public static void main(String[] args) {
SubThread4 subThread4 = new SubThread4();
System.out.println("main_begin->" +System.currentTimeMillis());
subThread4.start(); //开启新的线程
// subThread4.run(); // 在main 线程中调用示例方法run() 没有开启新的线程。
System.out.println("main_end->" +System.currentTimeMillis());
}
}
运行结果
public class Test {
public static void main(String[] args) {
SubThread4 subThread4 = new SubThread4();
System.out.println("main_begin->" +System.currentTimeMillis());
// subThread4.start(); //开启新的线程
subThread4.run(); // 在main 线程中调用示例方法run() 没有开启新的线程。
System.out.println("main_end->" +System.currentTimeMillis());
}
}
运行结果
1.3.4.1 使用sleep 实现倒计时器
/**
* 使用线程休眠Thread.sleep 完成一个简单的计时器
*/
public class SimpleTimer {
public static void main(String[] args) {
int remaining = 60; // 从60秒开启计时
// 读取main方法的参数
if(args.length == 1) {
remaining = Integer.parseInt(args[0]);
}
while (true){
System.out.println("Remaining:" +remaining);
remaining--;
if(remaining <0){
break;
}
try {
Thread.sleep(1000); // 线程休眠
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
main方法的args 如何传参
运行结果
1.3.5 getId()
thread.getId() 可以获得线程的唯一标识
注意:某个编号的线程运行结束后·,该编号可能被后续创建的线程使用。
重启JVM 后,同一个线程的编号可能不一样,
public class SubThread5 extends Thread{
@Override
public void run() {
System.out.println("thread name =" + Thread.currentThread().getName()
+ ", id = " +this.getId());
}
}
public class Test {
public static void main(String[] args) throws InterruptedException {
System.out.println(Thread.currentThread().getName() +", id=" + Thread.currentThread().getId());
for (int i = 0; i < 20 ; i++) {
new SubThread5().start();
Thread.sleep(100);
}
}
}
1.3.6 yield()
Thread.yield()方法的作用是放弃当前CPU 资源
public class SubThread6 extends Thread {
@Override
public void run() {
long begin = System.currentTimeMillis();
long sum = 0;
for (int i = 0; i < 1000000 ; i++) {
sum += i;
Thread.yield(); // 线程让步,放弃CPU 执行权
}
long end = System.currentTimeMillis();
System.out.println("用时:" + (end-begin));
}
}
public class Test {
public static void main(String[] args) {
// 开启子线程,计算累加和
new SubThread6().start();
// 在main线程中计算累加和。
long begin = System.currentTimeMillis();
long sum = 0;
for (int i = 0; i < 1000000 ; i++) {
sum += i;
}
long end = System.currentTimeMillis();
System.out.println("main用时:" + (end-begin));
}
}
运行结果:
1.3.7 setPriority()
thread.setPriority(num); 设置线程的优先级
java线程的优先级取值范围是1~10,如果超出这个范围会抛出异常illegalArgumentExcetion。
在操作系统中,优先级较高的线程获得CPU的资源越多,
线程的优先级本质上只是给线程调度器一个提示信息,以便于调度器决定调度哪些线程,注意不能保证优先级高的线程先运行,
java 优先级设置不当或者滥用可能会导致某些线程永远无法得到运行,即产生了线程饥饿。
线程的优先级并不是设置的越高越好,一般情况下使用普通的优先级即可,`即在开发时不必设置线程的优先级`
线程的优先级具有继承性,在A线程中创建了B线程,则B线程的优先级与A线程是一样的,
public class ThreadA extends Thread{
@Override
public void run() {
long begin = System.currentTimeMillis();
long sum = 0;
for (long i = 0; i <= 10000000000L ; i++) {
sum +=i;
}
long end = System.currentTimeMillis();
System.out.println("threadA " +(end-begin));
}
}
public class ThreadB extends Thread{
@Override
public void run() {
long begin = System.currentTimeMillis();
long sum = 0;
for (long i = 0; i <= 10000000000L ; i++) {
sum +=i;
}
long end = System.currentTimeMillis();
System.out.println("threadB " +(end-begin));
}
}
public class Test {
public static void main(String[] args) {
ThreadA threadA = new ThreadA();
threadA.setPriority(1);
threadA.start();
ThreadB threadB = new ThreadB();
threadB.setPriority(10);
threadB.start();
}
}
运行结果:
1.3.8 interrupt()
中断线程
注意调用interupt() 方法仅仅是在当前线程打一个停止标志,并不是真正的停止线程。
public class SubThread2 extends Thread{
@Override
public void run() {
for (int i = 0; i <= 10000 ; i++) {
// 判断线程的中断标志,线程有isInterrupted()方法。该方法返回线程的中断标志
if(this.isInterrupted()){
System.out.println("当前线程的中断标志为true,我要退出了");
// break; // 中断循环,run() 方法体执行完毕,子线程运行完毕。
return;// 直接结束当前run() 方法的执行。
}
System.out.println("sub thread-->" + i);
}
}
}
public class Test02 {
public static void main(String[] args) {
SubThread2 subThread = new SubThread2();
subThread.start(); // 开启子线程
// 当前线程是main线程
for (int i = 0; i <=100 ; i++) {
System.out.println("main-->" +i);
}
// 中断子线程
subThread.interrupt(); // 仅仅是给子线程标记中断,子线程没有症状的中断
}
}
运行结果:
1.3.9 setDaemon()
java 中的线程分为用户线程与守护线程。
守护线程是为其他线程提供服务的线程,如垃圾回收器(GC)就是一个典型的守护线程。
守护线程不能单独运行,当JVM中没有其他用户线程,只有守护线程时,守护线程会自动销毁,JVM 会退出。
public class SumDeamonThread extends Thread {
@Override
public void run() {
while (true) {
// 这里是个死循环,如果线程没有设置为守护线程setDaemon,那么当main线程结束了,这里依旧接着打印
// 如果设置了守护线程,则在main线程结束了,子线程run也会跟着结束,不在打印
System.out.println("sub thread....");
}
}
}
/**
* 设置线程为守护线程
*/
public class Test {
public static void main(String[] args) {
SumDeamonThread sumDeamonThread = new SumDeamonThread();
//设置线程为守护线程
sumDeamonThread.setDaemon(true); // 设置守护线程的代码应该在线程启动前
sumDeamonThread.start();
// 当前线程为main线程
for (int i = 0; i <10 ; i++) {
System.out.println("main ==" + i);
}
// 当main 线程结束,守护线程thread也销毁了
}
}
运行结果
1.4 线程的生命周期
线程的生命周期是线程对象的生老病死,即线程的状态。
线程生命周期可以通过getState()方法获得
,线程的状态是Thread.state枚举类型定义的,
由一下几种:
NEW
: 新建状态,创建了线程对象,在调用start()启动之前的状态;
RUNNABLE
:可运行状态,它是一个复合状态,包含:READY 和RUNNING 两个状态,READY状态改线程可以被线程调度器进行调度使他处于RUNNING状态。RUNNING状态表示改线程正在执行。
Thread.yield ()方法可以把线程由RUNNING 状态转换为READY状态。
BLOCKED
阻塞状态,线程发起阻塞的I/O操作,或者申请由其他线程占用的独占资源,线程会转换为BLOCKED阻塞状态,处于阻塞状态的线程不会占用CPU资源,当阻塞I/O操作执行完,或者线程获得了其神奇的资源,,线程可以转换为RUNNABLE。
WAITING
:等待状态,线程执行了object.wait(),thread.join()方法会把线程转换为WAITING等待状态,执行object.notify()方法,或者加入的线程执行完毕,当前线程会转换为RUNNABLE状态。
TIMED_WAITING
状态,与WAITING 状态类似,都是等待状态,区别在于处于改状态的线程不会无线的等待,如果线程没有在指定的时间范围内完成期望的操作,该线程会自动转换为RUNNABLE。
TERMINATED
终止状态,线程结束处于终止状态
1.5 多线程编程的优势与存储的风险
多线程编程具有以下优势:
1)提高系统的吞吐率(Throughout)。多线程编程可以使一个进程有多个并发(concurrent,即同时进行的)的操作。
2) 提高响应性(Responsiveness)。Web 服务器会采用一些专门的线程负责用户的请求处理,缩短了用户的等待时间。
3)充分利用多核(Multicore)处理器资源,通过多线程可以充分的利用CPU资源
多线程编程存在的问题与风险:
1) 线程安全(Thread safe)问题:多线程共享数据时,如果没有采用正确的并发访问控制措施,就可能会产生数据一致性问题,如读取脏数据(过期的数据),如丢失数据更新。
2)线程活性(thread liveness)问题。由于程序自身的缺陷或者由资源稀缺性导致线程一直处于非RUNNABLE状态,这就是线程活性问题,常见的活性故障有以下几种
- 死锁(Deadlock)类似于鹬蚌相争。
- 锁死(Lockout)类似于睡美人,故事中王子挂了。
- 活锁(Livelock)类似于小猫咬自己尾巴
- 饥饿(Starvation)类似于健壮的雏鸟总是从母鸟嘴里抢到实物。
3)上下文切换(Context Switch)处理器从执行一个线程切换到执行另外一个线程
4)可靠性。可能会由一个线程导致JVM意外终止,其他的线程也无法执行。
二、线程安全问题
非线程安全主要是指多个线程对同一个对象的实例变量进行操作时,会出现值被更改,值不同步的情况
线程安全问题表现为三个方面: 原子性,可见性、有序性
2.1 原子性
原子(Atomic)就是不可分割的意思。原子操作的不可分割有两层含义:
1)访问(读、写)某个共享变量的操作从其他线程来看,该操作要么已经执行完毕,要么尚未发生,即其他线程年示到当前操作的中间结果
2)访问同一组共享变量的原子操作时不能够交错的。如现实生活中从ATM机取款,对于用户来说,要么操作成功,用户拿到钱,余额减少了,增加了一条交易记录;要么没拿到钱,相当于取款操作没有发生。
java有两种方式实现原子性:一种是使用锁;另一种利用处理器的CAS(Compare and Swap)指令。
锁具有排他性,保证共享变了在某一时刻只能被一个线程访问。
CAS指令直接在硬件(处理器和内存)层次上实现,看作是硬件锁。
public class Test01 {
public static void main(String[] args) {
MyInt myInt = new MyInt();
for (int i = 0; i <=2 ; i++) {
new Thread(new Runnable() {
@Override
public void run() {
while (true) {
System.out.println(Thread.currentThread().getName() +"-->" +myInt.getNum());
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}).start();
}
}
static class MyInt{
int num;
public int getNum(){
return num++;
/**
* ++自增操作实现步骤:
* 读取num的值,
* num 自增,
* 把自增后的值在赋值给num变量
*/
}
}
}
在java中提供了一个线程安全的AtomicInteger类,保证了操作的原子性
public class Test01 {
public static void main(String[] args) {
MyInt myInt = new MyInt();
for (int i = 0; i <=2 ; i++) {
new Thread(new Runnable() {
@Override
public void run() {
while (true) {
System.out.println(Thread.currentThread().getName() +"-->" +myInt.getNum());
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}).start();
}
}
// 在java中提供了一个线程安全的AtomicInteger类,保证了操作的原子性
static class MyInt{
AtomicInteger num = new AtomicInteger();
public int getNum(){
return num.getAndAccumulate();
}
}
}
2.2 可见性
在多线程环境中,一个线程对某个共享变量进行更新之后,后续其他线程可能无法立即读取到这个更新的结果,这个就是线程安全问题的另外一种形式:可见性(visibility)
如果一个线程对共享变量更新后,后续访问该变量的其他线程可以读到更新的结果,称这个线程对共享变量的更新对其他线程可见,否则称这个线程对共享变量的更新对其他线程不可见
多线程程序因为可见性问题可能会导致其他线称读取到了旧数据(脏数据)
public class Test02 {
public static void main(String[] args) throws InterruptedException {
MyTesk tesk = new MyTesk();
new Thread(tesk).start();
Thread.sleep(1000);
// 主线程1s后取消子线程
tesk.cancel();
/**
* 可能会出现以下情况
* 在main线程中调用task.cancel()方法,把task对象的toCancel变量修改为true
* 可鞥存在展现出看不到main线程对toCancel做的修改,在子线程中toCancel变量一直为false
* 导致子线程看不到main线程对tocancel 变量更新的原因:可能:
* 1)JIT即时编译器可能,会对run方法中的while循环进行优化为:
* if(!toCancel){
* while (toCancel) {
* if(doSomething()){
* }
* }
* }
* 2) 可能与计算机的存储系统有关,假设分别有两个cup内核运行main线程与子线程,运行子线程的cpu无法立即读取运行main线程的CPu中的数据
*
*/
}
static class MyTesk implements Runnable{
private boolean toCancel = false;
@Override
public void run() {
while (!toCancel) {
if(doSomething()){
}
}
if (toCancel){
System.out.println("任务被取消");
} else {
System.out.println("任务正常结束");
}
}
private boolean doSomething() {
System.out.println("执行某个任务。。。");
try {
Thread.sleep(new Random().nextInt(1000)); // 模拟执行任务时间
} catch (InterruptedException e) {
e.printStackTrace();
}
return true;
}
public void cancel() {
toCancel = true;
System.out.println("收到取消线程的消息");
}
}
}
运行结果:
2.3 有序性
有序性(Ordering)是指在什么情况下一个处理器上运行的一个线程所执行的,内存访问操作在另外一个处理器运行的其他线程来看是乱序的(Out of Order)。
乱序是指内存访问操作的顺序看起来发生了变化。
2.3.1 重排序
在多核处理器的环境下,编写的顺序结构,这种操作执行的顺序可能是没有保障的:
- 编译器可能会改变两个操作的顺序;
- 处理器也可能不会按照目标代码的顺序执行;
这种一个处理器上执行的多个操作,在其他处理器来看它的顺序与目标代码指定的顺序可能不一样,这种现象称为重排序。
重排序是对内存访问有序操作的一种优化,可以在不影响单线程程序正确的情况下提升程序的性能。但是,可能对多线程程序的正确性产生影响,即可能导致线程安全问题。
重排序与可见性问题类型,不是必然出现的。
与内存操作循序有关的几个概念:
- 源代码顺序,就是源码中指定的内存访问顺序。
- 程序顺序,处理器上运行的目标代码所指定的内存访问顺序。
- 执行顺序,内存访问操作在处理器上实际执行顺序。
- 感知顺序,给定处理器所感知到的该处理器及其他处理器的内存访问操作的顺序。
可以把重排序分为指令重排序与存储子系统重排序两种。
指令重排序主要是由JIT编译器,处理器引起的,指程序顺序与执行顺序不一样。
存储子系统重排序是由高速缓存,写缓冲器引起的,感知顺序与执行顺序不一致
2.3.2 指令重排序
在源码顺序与程序顺序不一致,或者程序顺序与执行顺序不一致的情况下,我们就说发生了指令重排序(Instruction Reorder)。
指令重排序是一种动作,确实对指令的顺序做了调整,重排序的对象指令。
javac编译器一般不会执行指令重排序,而JIT编译器可能执行指令重排序。
处理器也可能执行指令重排序,使得执行顺序与程序顺序不一致。
指令重排不会对单线程程序的结果正确性产生影响,可能导致多线程程序出现非预期的结果。
2.3.3 存储子系统重排序
存储子系统是指写缓冲器与高速缓存。
- 高速缓存(Cache)是CPU中为了匹配与主内存处理速度不匹配而设计的一个高速缓存。
- 写缓冲器(Store buffer,Write buffer)用来提高写高速缓存操作的效率。
即使处理器严格按照程序顺序执行两个内存访问操作,在存储子系统的作用下,其他处理器对这两个操作的感知顺序与程序顺序不一致,即这两个操作的顺序,顺序看起来像是发生了变化,这种现象为存储子系统重排序。
存储子系统重排序并没有真正的对指定执行顺序进行调整,而是造成一种指令执行顺序被调整的假象。
存储子系统重排序对象是内存操作的结果,
从处理器角度来看,读内存就是从指定的RAM地址中加载数据到寄存器,称为Load操作;写内存就是把数据存储到指定的地址表示的RAM存储单元中,称为Store操作,内存重排序有以下四种可能:
- LoadLoad重排序,一个处理器先后执行两个读操作L1和L2,其他处理器对两个内存操作的感知顺序可能L2->L1;
- StoreStore重排序,一个处理器先后执行两个写操作W1和W2,其他处理器对两个内存操作的感知顺序可能W2->W1;
- LoadStore重排序,一个处理器先执行读内存操作L1,再执行写内存操作W1,其他处理器对两个内存操作的感知顺序可能是W1->L1;
- StoreLoad重排序:一个处理器先执行写内存操作W1,在执行读内存操作L1,其他处理器对两个内存操作的感知顺序可能是L1->W1;
内存重排序与具体的处理器微架构有关,不同架构的处理器所允许的内存重排序不同。
内存重排序可能会导致线程安全问题。假设有两个共享变量,int data=0;boolean ready = false;
2.3.4 貌似串行语义
JIT编译器、处理器、存储子系统是按照一定的规则对指令,内存操作的结果进行重排序,给单线程程序造成一种假象------指令是按照源码的顺序执行的,这种假象称为貌似串行语义。并不能保证多线程环境程序的正确性,
为了保证貌似串行语义,有数据依赖关系的雨具不会被重排序,只有不存在数据依赖关系的雨具才会被重排序。如果两个操作(指令)访问同一个变量,且其中一个操作(指令)为写操作,那么这两个操作之间就存在数据依赖关系(data dependency)。
如:
x=1;y=x+1;后一条语句的操作数包含前一条语句的执行结果;
y=x;x=1;先读取X变量,再更新x变量的值;
x=1;x=2;两条语句同时对一个变量进行写操作;
如果不存在数据依赖关系则可能重排序,如:
double price = 45.8;//可能重排序
int quantity = 10;// 可能重排序 double sum =
price * quantiry;// 不可能重排序
存在控制依赖关系的语句允许重排,一条语句(指令)的执行结果会决定另一条语句(指令)能否被执行,这两条语句(指令)存在控制依赖关系(Control Dependency)。如在if语句中运行重排,可能存在处理器先执行if代码块,在判断if条件是否成立。
2.3.5 保证内存访问的顺序性
可以使用volatile关键字,synchronized关键字实现有序性。
2.4 JAVA内存模型
三. 线程同步
3.1 线程同步机制简介
线程同步机制是一套用于协调线程之间的数据访问的机制,该机制可以保障线程安全。
java 平台提供的线程同步机制包括:锁,volatile 关键字,final关键字,static 关键字,以及相关的API,如Object.wait()/Object.notify()等。
3.2 锁概述
线程安全问题的产生前提是多个线程并发访问共享数据。
将多个线程对共享数据的并发访问,转换为串行访问,即一个共享数据一次只能被一个线程访问,锁就是利用这种思路来保障线程安全带。
锁(Lock)可以理解为对共享数据进行保护的一个许可证。对于同一个许可证保护的共享数据来说,任何线程想要访问这些共享数据必须先持有该许可证,一个线程只有在持有许可证的情况下,才能对这些共享数据进行访问,并且一个许可证一次只能被一个线程持有;许可证线程结束在结束对共享数据的访问后必须释放其持有的许可证。
一线程在访问共享数据前必须先获得锁,获得锁的线程称为锁的持有线程;一个锁一次只能被一个线程持有,所得持有线程在获得锁之后和释放锁之前,这段时间锁执行的代码称为临界区(Critical Section)。
锁具有排他性(Exclusive),即一个锁一次只能被一个线程持有。这种锁称为排它锁或互斥锁(Mutex)。
JVM 把锁分为内部锁和显示锁两种,内部锁通过synchrinized关键字实现;显示锁通过java.concurrent.looks.Lock 接口的实现类实现的。
3.2.1 锁的作用
锁可以实现对共享数据的安全访问,保障线程的原子性,可见性与有序性。
锁是通过互斥保障原子性。一个锁只能被一个线程持有,这就保证临界区的代码一次只能被一个线程执行。使得临界区代码所执行的操作自然而然的具有不可分割的特性,即具备了原子性。
可见性的保障是通过写线程来冲刷处理器的缓存和读线程刷新处理器缓存这两个动作实现的,在java平台中,锁的获得隐含着刷新处理器缓存的动作,锁的释放隐含着冲刷处理器缓存的动作。
锁能够保障有序性,写线程在临界区所执行的在读线程所执行的临界区看来像是完全按照源码顺序执行的。
注意:使用锁保障线程的安全性,必须满足以下条件:
- 这些线程在访问共享数据时必须使用同一个锁。
- 即使是读取共享数据的线程也需要使用同步锁。
3.3.2 锁相关的概念
-
可重入性
可重入性(Reentrancy)描述这样一个问题:一个线程持有该锁的时候能再次(多次)申请该锁。
如果一个线程持有一个锁的时候,还能够继续成功申请该锁,称该锁是可重入的,否则就称为不可重入的。 -
锁的争用与调度
java 平台中内部锁属于非公平锁,显示Lock锁即支持公平锁又支持非公平锁。 -
锁的粒度
一个锁可以保护的共享数据的数量大小称为锁的粒度。
锁保护共享数据量大,称该锁的粒度粗,否则就称该锁的粒度细。
锁的粒度过粗,会导致线程在申请锁时会进行不必要的等待;锁的粒度过细会增加锁调度的开销。
3.3 内部锁:synchronized关键字
java中的每个对象都有一个与之关联的内部锁(Intrinsic lock),这种锁也称为监视器(Monitor),这种内部锁是一种排他锁,可以保障原子性,可见性与有序性。
内部锁时通过synchronized关键字实现的,synchronized关键字修饰代码块,修饰该方法。
修饰代码块的语法:
synchronized(对象锁) {
同步代码块,可以在同步代码块中访问共享数据
}
修饰示例方法就称为同步实例方法
修饰静态方法称为同步静态方法
3.3.1 synchronized同步代码块
3.3.1.1 同步代码块 this 锁对象
/**
* synchronized同步代码块
* this 锁对象
*/
public class Test01 {
public static void main(String[] args) {
// 创建两个线程,分别调用mm()方法,
// 先创建Test01对象,通过对象名调用mm()方法
Test01 obj = new Test01();
new Thread(new Runnable() {
@Override
public void run() {
obj.mm(); // 使用的锁对象就是obj对象
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
obj.mm(); // 使用的锁对象也是obj对象
}
}).start();
}
// 定义方法,打印100行字符串
public void mm() {
synchronized (this) { // 经常使用this 当前对象作为锁对象
for (int i = 1; i<=100; i++) {
System.out.println(Thread.currentThread().getName() + "--->" + i);
}
}
}
}
运行效果
3.3.1.2 锁对象不同,不能实现同步
3.3.1.3 使用常量作为锁对象
3.3.1.4 不管是实例方法还是静态方法,只要是同一个锁对象,就能实现同步
/**
* synchronized同步代码块
* 如果线程的锁不同,不能实现同步,
* 想要实现同步必须使用同一个锁对象。
*/
public class Test04 {
public static void main(String[] args) {
// 创建两个线程,分别调用mm()方法,
// 先创建Test01对象,通过对象名调用mm()方法
Test04 obj = new Test04();
Test04 obj2= new Test04();
new Thread(new Runnable() {
@Override
public void run() {
obj.mm(); // 使用的锁对象就是obj常量
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
obj2.mm(); // 使用的锁对象也是obj常量
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
sm(); // 使用的锁对象也是obj常量
}
}).start();
}
public static final Object OBJ = new Object(); // 定义一个常亮
// 定义方法,打印100行字符串
public void mm() {
synchronized (OBJ) { // 也可以使用常亮对象作为锁对象
for (int i = 1; i<=100; i++) {
System.out.println(Thread.currentThread().getName() + "--->" + i);
}
}
}
// 定义方法,打印100行字符串
public static void sm() {
synchronized (OBJ) { // 也可以使用常亮对象作为锁对象
for (int i = 1; i<=100; i++) {
System.out.println(Thread.currentThread().getName() + "--->" + i);
}
}
}
}
运行结果
3.3.2 同步方法
3.3.2.1 同步实例方法
/**
* synchronized同步示例方法
* 把整个方法体作为同步代码块
* 默认的锁对象是this对象
* this 锁对象
*/
public class Test05 {
public static void main(String[] args) {
// 先创建Test01对象,通过对象名调用mm()方法
Test05 obj = new Test05();
// 一个线程调用mm 方法
new Thread(new Runnable() {
@Override
public void run() {
obj.mm(); // 使用的锁对象就是obj对象
}
}).start();
// 另一个线程调用mm2方法
new Thread(new Runnable() {
@Override
public void run() {
obj.mm2(); // 使用的锁对象也是obj对象,可以同步
// new Test05().mm2(); // 使用的锁对象是刚刚new创建的一个新对象,不是同一个锁对象,不能同步
}
}).start();
}
// 定义方法,打印100行字符串
public void mm() {
synchronized (this) { // 经常使用this 当前对象作为锁对象
for (int i = 1; i<=100; i++) {
System.out.println(Thread.currentThread().getName() + "--->" + i);
}
}
}
// 使用 synchronized 修饰实例方法,同步实例方法,默认this作为锁对象
public synchronized void mm2() {
for (int i = 1; i<=100; i++) {
System.out.println(Thread.currentThread().getName() + "--->" + i);
}
}
}
运行结果
3.3.2.2 同步静态方法
/**
* synchronized同步示例方法
* 把整个方法体作为同步代码块
* 默认的锁对象是当前类的运行时类对象,Test06.class, 有人称它为类锁
* this 锁对象
*/
public class Test06 {
public static void main(String[] args) {
// 先创建Test01对象,通过对象名调用mm()方法
Test06 obj = new Test06();
// 一个线程调用mm 方法
new Thread(new Runnable() {
@Override
public void run() {
obj.m1(); // 使用的锁对象就是obj对象
}
}).start();
// 另一个线程调用mm2方法
new Thread(new Runnable() {
@Override
public void run() {
Test06.sm2(); // 使用的锁对象也是obj对象,可以同步
// new Test05().mm2(); // 使用的锁对象是刚刚new创建的一个新对象,不是同一个锁对象,不能同步
}
}).start();
}
// 定义方法,打印100行字符串
public void m1() {
// 使用当前类的运行时类对象作为锁对象,可以简单的理解为把Test06类的字节码文件作为锁对象。
synchronized (Test06.class) {
for (int i = 1; i<=100; i++) {
System.out.println(Thread.currentThread().getName() + "--->" + i);
}
}
}
// 使用 synchronized 修饰静态方法,同步静态方法,默认运行时类Test06.class作为锁对象
public synchronized static void sm2() {
for (int i = 1; i<=100; i++) {
System.out.println(Thread.currentThread().getName() + "--->" + i);
}
}
}
运行结果
3.3.2.3 同步方法 和 同步代码块哪个执行效率高
package com.company.intrinsiclock;
/**
* 同步方法与同步代码块如何选择
* 同步方法:锁的粒度粗,并发效率低,同步代码块执行效率高。
*/
public class Test07 {
public static void main(String[] args) {
Test07 obj = new Test07();
new Thread(new Runnable() {
@Override
public void run() {
obj.doLongTimeTask();
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
obj.doLongTimeTask();
}
}).start();
}
// 同步代码块,锁的粒度细,执行效率高
// public void doLongTimeTask() {
// try {
// System.out.println("Tesk Begin");
// Thread.sleep(3000); // 模拟这个任务需要准备3秒钟
// synchronized (this){
// System.out.println("开始同步");
// for (int i = 1; i <=100 ; i++) {
// System.out.println(Thread.currentThread().getName() +"-->"+ i);
// }
// }
// System.out.println("Task end");
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// }
// 同步方法,执行效率低
public synchronized void doLongTimeTask() {
try {
System.out.println("Tesk Begin");
Thread.sleep(3000); // 模拟这个任务需要准备3秒钟
System.out.println("开始同步");
for (int i = 1; i <=100 ; i++) {
System.out.println(Thread.currentThread().getName() +"-->"+ i);
}
System.out.println("Task end");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
3.3.2.4 脏读
/**
* 脏读
* 出现读取属性值出现了一些意外,读取的是中间值,而不是修改之后的值。
* 出现脏读的原因是 对共享数据的修改与对共享数据的去读,有可能出现脏读,
* 解决方法:
* 不仅对修改数据的代码块进行同步,还要对读取数据的代码块同步
*/
public class Test08 {
public static void main(String[] args) throws InterruptedException {
// 开启子线程设置用户名和密码
PublicValue publicValue = new PublicValue();
SubThread t1 = new SubThread(publicValue);
t1.start();
// 为了确定设置成功
Thread.sleep(100);
// 在main线程中读取用户名,密码
publicValue.getValue();
}
// 定义线程,设置用户名和密码
static class SubThread extends Thread{
private PublicValue publicValue;
public SubThread(PublicValue publicValue) {
this.publicValue = publicValue;
}
@Override
public void run() {
publicValue.setValue("bjpowernode","123");
}
}
static class PublicValue {
private String name = "wocto";
private String pwd = "666";
public void getValue(){
System.out.println(Thread.currentThread().getName() + ", getter -- name:" +name + ", -- pwd:" +pwd);
}
public void setValue(String name,String pwd) {
this.name = name; // 模拟操作name属性需要一定时间。
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
this.pwd = pwd;
System.out.println(Thread.currentThread().getName() + ", getter -- name:" +name + ", -- pwd:" +pwd);
}
}
}
运行结果:出现脏读
解决:
3.3.2.5 线程出现异常,会自动释放锁
package com.company.intrinsiclock;
/**
*
* 同步过程中出现异常
*/
public class Test09 {
public static void main(String[] args) {
// 先创建Test01对象,通过对象名调用mm()方法
Test09 obj = new Test09();
// 一个线程调用mm 方法
new Thread(new Runnable() {
@Override
public void run() {
obj.m1(); // 使用的锁对象就是obj对象
}
}).start();
// 另一个线程调用mm2方法
new Thread(new Runnable() {
@Override
public void run() {
Test09.sm2(); // 使用的锁对象也是obj对象,可以同步
// new Test05().mm2(); // 使用的锁对象是刚刚new创建的一个新对象,不是同一个锁对象,不能同步
}
}).start();
}
// 定义方法,打印100行字符串
public void m1() {
// 使用当前类的运行时类对象作为锁对象,可以简单的理解为把Test06类的字节码文件作为锁对象。
synchronized (Test09.class) {
for (int i = 1; i<=100; i++) {
System.out.println(Thread.currentThread().getName() + "--->" + i);
if(i==50){
Integer.parseInt("abc"); // 把字符串转换为int类型时,如果字符串不符合数字格式会产生异常
}
}
}
}
// 使用 synchronized 修饰静态方法,同步静态方法,默认运行时类Test06.class作为锁对象
public synchronized static void sm2() {
for (int i = 1; i<=100; i++) {
System.out.println(Thread.currentThread().getName() + "--->" + i);
}
}
}
运行结果
3.3 死锁
在多线程程序中,同步时可能需要使用多个锁,如果获得锁的顺序不一致,可能会导致死锁;
如何避免死锁?
当需要获得多个锁时,所有线程获得锁的顺序保持一致即可
/**
* 死锁
* 在多线程程序中,同步时可能需要使用多个锁,如果获得锁的顺序不一致,可能会导致死锁;
* 如何避免死锁?
* 当需要获得多个锁时,所有线程获得锁的顺序保持一致即可
*/
public class Test10 {
public static void main(String[] args) {
SubTread t1 = new SubTread();
t1.setName("a");
t1.start();
SubTread t2 = new SubTread();
t2.setName("b");
t2.start();
}
static class SubTread extends Thread {
private static final Object lock1= new Object();
private static final Object lock2= new Object();
@Override
public void run() {
if("a".equals(Thread.currentThread().getName())) {
synchronized (lock1){
System.out.println("a线程获得了Lock1锁,还需要获得lock2锁");
synchronized (lock2){
System.out.println("a线程获得lock1后又获得lock2,可以干任何想做的事");
}
}
}
if("b".equals(Thread.currentThread().getName())) {
synchronized (lock2){
System.out.println("a线程获得了Lock2锁,还需要获得lock1锁");
synchronized (lock1){
System.out.println("a线程获得lock2后又获得lock1,可以干任何想做的事");
}
}
}
}
}
}
3.4 轻量级同步机制:volative 关键字
3.4.1 volatile 关键字
volatile关键的作用使变量在多个线程之间可见,
为什么要使用volatile
package com.company.volatilekw;
public class Test01 {
public static void main(String[] args) {
// 创建PrintString对象
PrintString printString = new PrintString();
// 调用方法打印字符串
printString.printStringMethod();
//main 线程睡眠1000毫秒
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("在main线程中修改打印标志");
printString.setContinuePrint(false);
//修改完打印标志后,运行程序,查看程序运行结果
//程序根本不会停止,因为printString.printStringMethod();一直处于死循环状态,程序根本执行不到printString.setContinuePrint(false); 语句
// 解决方法:可以使用多线程技术
}
static class PrintString{
private boolean continuePrint = true;
public PrintString setContinuePrint(boolean continuePrint){
this.continuePrint = continuePrint;
return this;
}
public void printStringMethod() {
while (continuePrint) {
System.out.println(Thread.currentThread().getName() +"开始。。。");
while (continuePrint) {
System.out.println("sub thread...");
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName() +"结束。。。");
}
}
}
}
使用多线程案例:
public class Test02 {
public static void main(String[] args) {
// 创建PrintString对象
PrintString printString = new PrintString();
// 开启子线程,让子线程执行pringString对象的printStringMethod()方法
new Thread(new Runnable() {
@Override
public void run() {
// 调用方法打印字符串
printString.printStringMethod();
}
}).start();
//main 线程睡眠1000毫秒
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("在main线程中修改打印标志");
printString.setContinuePrint(false);
// 程序运行,查看main线程中修改了打印标志之后,子线程打印是否可以结束打印。
// 程序运行后,可能会出现死循环情况:
// 分析原因:main 线程修改了printString对象的打印标志后,子线程读取不到
// 解决办法:使用volatile关键字修饰printString对象的打印标志,
// volatile的作用可以强制线程从公共内存中读取变量的值,而不是从工作内存中读取。
}
static class PrintString{
// private boolean continuePrint = true;
private volatile boolean continuePrint = true;
public PrintString setContinuePrint(boolean continuePrint){
this.continuePrint = continuePrint;
return this;
}
public void printStringMethod() {
System.out.println(Thread.currentThread().getName() +"开始。。。");
while (continuePrint) {
// System.out.println("sub thread...");
// try {
// Thread.sleep(500);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
}
System.out.println(Thread.currentThread().getName() +"结束。。。");
}
}
}
volatile 与Synchronized 比较
- volatile关键字是线程同步的轻量级实现,所以volatile性能肯定比Synchronized要好,volatile只能修饰变量,而Synchronized可以修饰方法、代码块。随着JDK新版本的发布,synchronized的执行效率也有较大的提升,在开发使用Synchronized的比率还是很大的。
- 多线程访问volatile 变量不会发生阻塞,而Synchronized可能会阻塞
- volatile 能保证数据的可见性,但是不能保证原子性;而Synchronized可以保证原子性,也可以保证可见性。
- 关键字volatile解决的是变量在多个线程之间的可见性;而Synchronized关键字解决多个线程之间的访问公共资源同步性。
3.4.2 volatile 非原子特性
volatile 关键字增加了实例变量在多个线程之间的可见性,但是不具备原子性。
public class Test03 {
public static void main(String[] args) {
// 在main线程中创建10个子线程
for (int i = 0; i < 100 ; i++) {
new MyThread().start();
}
}
static class MyThread extends Thread {
// volatile 关键仅仅是表示所有线程从主内存读取count变量的值。
/*
// 这段代码运行后不是线程安全的,想要线程安全,需要使用Synchronized进行同步,如果使用Synchronized同时也就不需要volatile了
volatile public static int count;
public static void addCount() {
for (int i = 0; i <1000 ; i++) {
// count ++ 不是原子操作
count++;
}
System.out.println(Thread.currentThread().getName() +"count = " +count);
}
*/
/**
*
*/
public static int count;
public synchronized static void addCount() {
for (int i = 0; i <1000 ; i++) {
// count ++ 不是原子操作
count++;
}
System.out.println(Thread.currentThread().getName() +"count = " +count);
}
@Override
public void run() {
addCount();
}
}
}
3.4.3 常用原子类进行自增自减操作
我们知道i++ 操作不是原子操作,除了使用Synchronized进行同步外,也可以使用AtomicInteger/AtomicLong 原子类进行实现。
package com.company.volatilekw;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 使用原子类进行自增
*/
public class Test04 {
public static void main(String[] args) {
// 在main线程中创建10个子线程
for (int i = 0; i < 100 ; i++) {
new MyThread().start();
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(MyThread.count.get());
}
static class MyThread extends Thread {
// 使用AtomicInteger对象
private static AtomicInteger count = new AtomicInteger();
public synchronized static void addCount() {
for (int i = 0; i <1000 ; i++) {
// 自增的后缀形式
count.getAndIncrement();
}
System.out.println(Thread.currentThread().getName() +"count = " +count.get());
}
@Override
public void run() {
addCount();
}
}
}
运行结果
3.5 CAS
3.5.1 CAS概述
CAS(Compare And Swap)是由硬件实现的。
CSA可以将read-modify-write 这类操作转换为原子操作。
i++自增操作包括三个子操作:
从主内存读取i变量值,
对i的值加1
在把加1之后的值保存到主内存
CAS原理:在把数据更新到主内存时,再次读取主内存变量的值,如果现在变量的值与期望的值(操作起始时读取的值)一样就更新。
3.5.2 使用CAS实现线程安全的计数器:
package com.company.cas;
/**
* 使用CAS 实现一个线程安全的计数器
*/
public class CasTest {
public static void main(String[] args) {
CASCounter casCounter = new CASCounter();
for (int i = 0; i <1000 ; i++) {
new Thread(new Runnable() {
@Override
public void run() {
System.out.println(casCounter.incrementAndGet());
}
}).start();
}
}
}
class CASCounter {
volatile private long value;
public long getValue(){
return value;
}
// 定义comate and swap 方法
private boolean compareAndSwap(long expectedValue,long newValue){
// 如果当前value的值与期望的expectedValue值不一样,就把当前的value字典替换为newValue值
synchronized (this) {
if(value == expectedValue) {
value = newValue;
return true;
} else {
return false;
}
}
}
// 定义自增的方法
public long incrementAndGet(){
long oldValue;
long newValue;
do {
oldValue = value;
newValue = oldValue +1;
} while (!compareAndSwap(oldValue,newValue));
return value;
}
}
运行结果
3.5.3 CAS中ABA问题
CAS 实现原子操作背后有一个假设:共享变量的当前值与当前线程提供的期望值相同,就认为这个变量没有被其他线程修改过。
实际上这种假设不一定总是成立,如有共享变量:count = 0;
A线程对count修改为10;
B线程对count修改为20 ;
C线程对count 修改为10;
当前线程看到count变量的值现在是0,现在是否认为count变量的值没有被其他线程更新呢?这种结果是否能够接受?
这就是CAS中的ABA问题,即共享变量经历了A->B->A的更新。
是否能够接受ABA问题跟实现算法有关。
如果想要规避ABA问题,可以为共享变量引入一个修订号(时间戳),没次修改共享变量时,相应的修订号就会增加1。ABA变量的更新过程:[A,0]-》[B,1]-》[A,2],每次对共享变量的修改都会导致修订号的增加,通过修订号依然可以准确判断变量是否被其他线程修改过。AtomicStampedReference
类就是基于这种思想产生的。
3.6 原子变量类
原子变量类基于CSA实现的,当对共享变量进行read-modefy-write更新操作时,通过原子变量类可以保障操作的原子性与可见性,对变量的read-modefy-write 更新操作时指当前操作不是一个简单的赋值,而是变量的新值依赖变量的旧值,如自增操作i++;由于volatile 只能保证可见性,无法保障原子性,原子变量类的内部就是借助一个volatile变量,并且保障了该变量的read-modefy-write操作的原子性,有时把原子变量类看作增强的volatile变量,原子变量类有12个
如
分组 | 原子变量类 |
---|---|
基础数据型 | AtomicInteger,AtomicLong,AtomicBoolean |
数组型 | AtomicIntegerArray, AtomicLongArray,AtomicReferenceArray |
字段更新器 | AtomicIntegerFieldUpdater,AtomicLongFieldUpdaer,AtomicReferenceFieldUpdater |
引用型 | AtomicReference,AtomicStampedRefernce,AtomicMarkableReference |
3.6.1 AtomicLong
使用原子变量类定义一个计数器
该计数器,在整个程序中都能使用,并且所有的地方都使用这一个计数器,这个计数器可以设计为单例
import java.util.concurrent.atomic.AtomicLong;
/**
* 使用原子变量类定义一个计数器
* 该计数器,在整个程序中都能使用,并且所有的地方都使用这一个计数器,这个计数器可以设计为单例
*/
public class Indecator {
// 构造方法私有化
private Indecator(){}
//定义一个私有的本类静态的对象
private static final Indecator indecator= new Indecator();
// 提供一个公共静态方法返回该类唯一实例;
public static Indecator getInstance() {
return indecator;
}
// 使用原子变量类保存请求总数,成功数,失败数。
private final AtomicLong requestCount = new AtomicLong(0); // 记录请求总数
private final AtomicLong successCount = new AtomicLong(0); // 处理成功总数
private final AtomicLong fialureCount = new AtomicLong(0); // 处理失败总数
// 有新的请求
public void newRequestReceive(){
requestCount.incrementAndGet();
}
// 处理成功
public void requestProcessSuccess(){
successCount.incrementAndGet();
}
// 处理失败
public void requestProcessFailure() {
fialureCount.incrementAndGet();
}
// 查看总数,成功数,失败数
public long getRequestCount(){
return requestCount.get();
}
public long getSuccessCount(){
return successCount.get();
}
public long getFailureCount(){
return fialureCount.get();
}
}
/**
* 模拟服务器的请求总数,处理成功数,处理失败数
*/
public class Test {
public static void main(String[] args) {
// 通过线程模拟请求,在实际应用中可以通过ServletFilter中调用Indicator计数器的相关方法。
for (int i = 0; i < 10000; i++) {
new Thread(new Runnable() {
@Override
public void run() {
// 每个线程就是一个请求,请求总数要加1
Indecator.getInstance().newRequestReceive();
int num = new Random().nextInt();
if(num%2 == 0) {
//偶数模拟成功
Indecator.getInstance().requestProcessSuccess();
}else {
Indecator.getInstance().requestProcessFailure();
}
}
}).start();
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Indecator.getInstance().getRequestCount()); // 总请求数
System.out.println(Indecator.getInstance().getSuccessCount()); //成功数
System.out.println(Indecator.getInstance().getFailureCount()); // 失败数
}
}
运行结果:
3.6.2 AtomicIntegerArray
原子更新数组
3.6.2.1 AtomicIntegerArray 基本操作
/**
* AtomicIntegerArray的基本操作
* 原子更新数组
*/
public class Test {
public static void main(String[] args) {
// 创建一个指定长度的原子数组。
AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(10);
System.out.println(atomicIntegerArray); // [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
// 返回指定位置元素
System.out.println(atomicIntegerArray.get(0)); // 0
System.out.println(atomicIntegerArray.get(1)); // 0
// 设置指定位置元素
atomicIntegerArray.set(0,10);
// 在设置数组元素的新值时,同时返回数组元素的旧值
System.out.println(atomicIntegerArray.getAndSet(1,11)); // 0
System.out.println(atomicIntegerArray); // [10, 11, 0, 0, 0, 0, 0, 0, 0, 0]
// 修改数组元素的值,把数组元素加上某个值
System.out.println(atomicIntegerArray.addAndGet(0,22)); //32
System.out.println(atomicIntegerArray.getAndAdd(1,33)); // 11
System.out.println(atomicIntegerArray); //[32, 44, 0, 0, 0, 0, 0, 0, 0, 0]
//CAS操作
// 如果数组中索引值为0的元素值为32,就将值修改为222
System.out.println(atomicIntegerArray.compareAndSet(0,32,222)); //true
System.out.println(atomicIntegerArray); // [222, 44, 0, 0, 0, 0, 0, 0, 0, 0]
System.out.println(atomicIntegerArray.compareAndSet(1,11,333)); // false
System.out.println(atomicIntegerArray); //[222, 44, 0, 0, 0, 0, 0, 0, 0, 0]
// 自增/自减
System.out.println(atomicIntegerArray.incrementAndGet(0)); //223 先赋值后返回
System.out.println(atomicIntegerArray.getAndIncrement(1)); // 44 先返回,后新增
System.out.println(atomicIntegerArray); //[223, 45, 0, 0, 0, 0, 0, 0, 0, 0]
System.out.println(atomicIntegerArray.decrementAndGet(2)); // -1 先减一后返回
System.out.println(atomicIntegerArray.getAndDecrement(3)); // 0 先返回后减1
}
}
3.6.2.1 多线程中使用原子数组
把原子数组的每个元素自增1000次。
import java.util.concurrent.atomic.AtomicIntegerArray;
/**
* 在多线程中使用AtomicIntegerArray原子数组
*/
public class Test02 {
// 定义原子数组
static AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(10);
public static void main(String[] args) {
// 定义线程数组
Thread[] threads = new Thread[10];
// 给线程数组元素赋值
for (int i = 0; i < threads.length ; i++) {
threads[i] = new AddThread();
}
// 开启子线程
for (Thread thread: threads) {
thread.start();
}
// 把主线程中查看自增玩以后原子数组中的各个元素的值,在主线程中需要再所有子线程都执行完后再查看
// 把所有的子线程合并到当前主线程中。
for (Thread thread: threads){
try {
thread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(atomicIntegerArray);
}
// 定义一个线程类, 在线程类中修改原子数组
static class AddThread extends Thread {
@Override
public void run() {
// 把原子数组的每个元素自增1000次。
for (int i = 0; i < 1000 ; i++) {
for (int j=0; j<atomicIntegerArray.length();j++){
atomicIntegerArray.getAndIncrement(j%atomicIntegerArray.length());
}
}
}
}
}
运行结果:
3.6.3 AtomicIntegerFieldUpdater原子整数字段进行更新
要求:
1) 字符必须使用volatile修饰,使线程之间可见;
2)只能是实例变量,不能是静态变量,也不能是final变量。
/**
* 使用AtomicIntegerFieldUpdater更新的字段必须使用volaile修饰
*/
public class User {
int id;
volatile int age;
public User(int id,int age) {
this.id = id;
this.age = age;
}
@Override
public String toString() {
return "User{" +
"id=" + id +
", age=" + age +
'}';
}
}
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
/**
* 线程类
*/
public class SubThread extends Thread {
private User user; // 要更新的User对象
// 创建AtomicIntegerFieldUpdater更新器
private AtomicIntegerFieldUpdater<User> Updater= AtomicIntegerFieldUpdater.newUpdater(User.class,"age");
public SubThread(User user){
this.user = user;
}
@Override
public void run() {
// 在子线程中对User 对象的age字段自增10次
for (int i = 0; i < 10 ; i++) {
System.out.println(Updater.getAndIncrement(user));
}
}
}
public class Test {
public static void main(String[] args) {
User user = new User(1234,10);
// 开启10个线程
for (int i = 0; i < 10; i++) {
new SubThread(user).start();
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(user.toString());
}
}
运行结果:
3.6.4 AtomicReference 原子操作对象
3.6.4.1AtomicReference 创建100个线程修改字符串
import java.util.Random;
import java.util.concurrent.atomic.AtomicReference;
/**
* 使用AtomicReference 原子读写一个对象
*/
public class Test01 {
// 创建一个AtomicReference 对象
static AtomicReference<String> atomicReference = new AtomicReference<>("abc");
public static void main(String[] args) throws InterruptedException {
// 创建100个线程修改字符串
for (int i = 0; i < 100 ; i++) {
new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(new Random().nextInt(20));
} catch (InterruptedException e) {
e.printStackTrace();
}
if (atomicReference.compareAndSet("abc","def")) {
System.out.println(Thread.currentThread().getName() +"把字符串abc更改为def");
}
}
}).start();
}
// 在创建100个线程
for (int i = 0; i < 100 ; i++) {
new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(new Random().nextInt(20));
} catch (InterruptedException e) {
e.printStackTrace();
}
if (atomicReference.compareAndSet("def","abc")) {
System.out.println(Thread.currentThread().getName() +"把字符串def还原为abc");
}
}
}).start();
}
Thread.sleep(1000);
System.out.println(atomicReference.get());
}
}
运行结果:
3.6.4.2 AtomicReference 中的ABA问题演示
package com.company.atomics.atimicreference;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
/**
* 演示AtomicREference 可能出现CAS的ABA问题
*/
public class Test02 {
private static AtomicReference<String> atomicReference = new AtomicReference<>("abc");
public static void main(String[] args) throws InterruptedException {
// 创建第1个线程,先把abc字符串改为def,再把字符串还原为abc
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
atomicReference.compareAndSet("abc","def");
System.out.println(Thread.currentThread().getName() +"---" +atomicReference.get());
atomicReference.compareAndSet("def","abc");
}
});
Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(atomicReference.compareAndSet("abc","ghg"));
}
});
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println(atomicReference.get());
}
}
运行结果:
3.6.4.3 AtomicStampedReference 原子类可以解决CAS中的ABA问题
package com.company.atomics.atimicreference;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicStampedReference;
/**
* AtomicStampedReference 原子类可以解决CAS中的ABA问题
* 在AtomicStampedReference 原子类中有一个整数标记值stamp,每次执行CAS操作时,需要对比它的版本,即比较stamp的值,
*/
public class Test03 {
// private static AtomicReference<String> atomicReference = new AtomicReference<>("abc");
// 定义AtomicStampedReference 引用操作“abc”字符串,指定初始化版本号为0
public static AtomicStampedReference<String> atomicStampedReference = new AtomicStampedReference<>("abc",0);
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
atomicStampedReference.compareAndSet("abc","def",atomicStampedReference.getStamp(),atomicStampedReference.getStamp()+1);
System.out.println(Thread.currentThread().getName() +"---"+ atomicStampedReference.getReference());
atomicStampedReference.compareAndSet("def","abc",atomicStampedReference.getStamp(),atomicStampedReference.getStamp()+1);
}
});
Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
int stamp = atomicStampedReference.getStamp(); // 获得版本号
System.out.println(atomicStampedReference.compareAndSet("abc","ggg",stamp,stamp+1));
}
});
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println(atomicStampedReference.getReference());
}
}
运行结果:
四、线程间的通信
4.1 等待/通知机制
4.1.1 什么是等待通知机制
在单线程编程中,要执行的操作需要满足一定的条件才能执行,可以把这个操作放在if语句块中。
在多线程编程中,可能A线程的条件没有满足只是暂时的,稍后其他的线程B可能会更新条件,使得A线程的条件得到满足,可以将A线程暂停,直到它的条件得到满足后再将A线程唤醒。
它的伪代码:
atomics {
while(条件不成立){
等待
}
当前线程被唤醒,条件满足后,继续执行下面的操作
}
4.1.2 等待/通知机制的实现
Object类中的wait() 方法可以使执行当前代码的线程等待,暂停执行,直到接到通知或被中断为止。
注意:
1.wait()方法只能在同步代码块中由锁对象调用。
2.调用wait()方法,当前线程会释放锁。
它的伪代码:
// 在调用wait方法前获得对象的内部锁
synchronized(锁对象){
while(条件不成立){
// 通过锁对象调用wait()方法暂停线程,会释放锁对象
锁对象.wait();
}
// 线程的条件满足了,继续向下执行
}
Object类的notify() 可以唤醒线程,该方法也必须在同步代码块中由锁对象调用,没有使用锁对象调用wait()/notify()会抛出IIegaIMonitorStateException异常,如果有多个等待的线程,nodify()方法只能唤醒其中的一个。在同步代码块中调用notify()方法后,并不会立即释放锁对象。需要等当前同步代码块执行完后才会释放锁对象,一般将notify方法放在同步代码块的最后,
它的伪代码如下:
// 在调用wait方法前获得对象的内部锁
synchronized(锁对象){
// 执行修改保护条件的代码
// 唤醒其他线程
锁对象.notify();
}
实例代码
例1:演示wait 必须不放在同步代码块的效果
/**
* 演示wait()/notify()方法需要放在同步代码块中,否则会抛出java.lang.IllegalMonitorStateException 异常
* 任何对象都可以调用wait()/notify(),这两个方法是从Object类继承来的
*/
public class Test01 {
public static void main(String[] args) {
String test = "wkito";
try {
test.wait(); // Exception in thread "main" java.lang.IllegalMonitorStateException 异常
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
例2:wait()方法的正确使用
/**
* wait()会使线程等待
* 需要放在同步代码块中,通过锁对象调用
*/
public class Test02 {
public static void main(String[] args) {
try {
String text = "wkcto";
String another = "hello";
System.out.println("同步前的代码");
synchronized (text) {
System.out.println("同步代码块开始。。。");
text.wait(); // 调用wait()方法后,当前线程就会等待,释放锁对象。当前线程需要被唤醒,如果没有唤醒就会一直等待。
// another.wait(); //不是锁对象调用,会产生java.lang.IllegalMonitorStateException 异常
System.out.println("同步代码块结束。。。");
}
System.out.println("同步代码块后面的代码。。。");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
例3:wati()/notify()完整的实例
/**
* 需要通过notify() 唤醒等待的线程
*/
public class Test03 {
public static void main(String[] args) throws InterruptedException {
String lock = "wkcto"; // 定义一个字符串作为锁对象
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
synchronized (lock) {
System.out.println("线程1开始等待:" + System.currentTimeMillis());
try {
lock.wait(); // 线程等待,会释放锁对象,当前线程会转入blocked阻塞状态。
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("线程1结束等待:" + System.currentTimeMillis());
}
}
});
// 定义第2个线程,在第二个线程唤醒第1个线程
Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
// notify() 方法也需要在同步代码块中,由锁对象调用
synchronized (lock) {
System.out.println("线程2开始唤醒:" + System.currentTimeMillis());
lock.notify(); // 唤醒在lock 锁对象上等待的某一个线程
System.out.println("线程2结束唤醒:" + System.currentTimeMillis());
}
}
});
t1.start(); // 开启t1线程,t1线程等待
Thread.sleep(3000); // mian线程睡眠3秒,确保t1入睡
t2.start(); // t1线程开启3秒后,再开启t2线程唤醒t1线程
}
}
4.1.3 notify() 方法后不会立即释放锁对象
/**
* notify()不会立即释放锁对象
*/
public class Test04 {
public static void main(String[] args) throws InterruptedException {
// 定义一个list集合存储String数据
List<String> list = new ArrayList<>();
// 定义一个线程,当list集合中元素的数量不等于5时线程等待
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
synchronized (list) {
if (list.size() != 5) {
System.out.println("线程1开始等待:" + System.currentTimeMillis());
try {
list.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("线程1被唤醒:" + System.currentTimeMillis());
}
}
}
});
// 定义第二个线程,向list结合中添加元素
Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
synchronized (list) {
for (int i = 0; i < 10; i++) {
System.out.println("线程2添加第" + (i+1) + "个数据");
list.add("data==" + i);
//判断元素的数量是否满足唤醒线程1
if (list.size() == 5) {
list.notify(); // 唤醒线程,不会立即释放锁对象,需要等到当前同步代码块都执行完后才能是否锁对象
System.out.println("线程2已经发出唤醒通知");
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
});
t1.start();
//为了确保线程1成功进入睡眠,这里将主线程睡一会
Thread.sleep(500);
t2.start();
}
}
运行结果:
4.1.4 interrupt()方法会中断wait()
当线程处于wait()等待状态时,调用线程对象的interrupt()方法会中断线程的等待状态,会产生interruptedException异常。
/**
* Interrupt()会中断线程的wait()等待
*/
public class Test05 {
public static void main(String[] args) throws InterruptedException {
SubThread t = new SubThread();
t.start();
Thread.sleep(2000); // 主线程睡眠2秒,确保子线程处于wait等待状态
t.interrupt();
}
private static final Object Lock = new Object(); // 定义常量作为锁对象
static class SubThread extends Thread {
@Override
public void run() {
synchronized (Lock) {
try {
System.out.println("begin wait...");
Lock.wait();
System.out.println("end wait...");
} catch (InterruptedException e) {
System.out.println("wait()等待被中断");
}
}
}
}
}
运行结果:
4.1.5 notify() 与notifyAll()
notify()一次只能唤醒1个线程,如果有多个等待的线程,只能随机唤醒其中的某一个; 想要唤醒所以等待线程,需要调用notifyAll()
/**
* notify() 与notifyAll()
*/
public class Test06 {
public static void main(String[] args) throws InterruptedException {
Object lock = new Object(); // 定义一个对象作为子线程的锁对象
SubThread t1 = new SubThread(lock);
SubThread t2 = new SubThread(lock);
SubThread t3 = new SubThread(lock);
t1.setName("t1");
t2.setName("t2");
t3.setName("t3");
t1.start();
t2.start();
t3.start();
Thread.sleep(2000);
//调用notify() 唤醒子线程
synchronized (lock) {
// lock.notify(); // 调用1次notify()只能唤醒其中的一个线程,其他等待的线程依然处于等待状态,对于处于等待状态的线程
// 来说,错过了通知信号,这种现象也称为信号丢失。
lock.notifyAll(); // 唤醒所以线程
}
}
static class SubThread extends Thread{
private Object lock; // 定义实例变量作为锁对象
public SubThread (Object lock) {
this.lock = lock;
}
@Override
public void run() {
synchronized (lock){
try {
System.out.println(Thread.currentThread().getName() +"--begin wait ---");
lock.wait();
System.out.println(Thread.currentThread().getName() + "--end wait --");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
运行结果:
4.1.6 wait(long)的使用
wait(long)带有long类型参数的wait()等待,如果在参数指定的时间内没有被唤醒,超时后自动唤醒。
/**
* wait(long)
*/
public class Test07 {
public static void main(String[] args) {
final Object obj = new Object();
new Thread(new Runnable() {
@Override
public void run() {
synchronized (obj){
try {
System.out.println("thread begin wait");
obj.wait(5000); // 如果5000毫秒内没有被唤醒,会自动唤醒
System.out.println("end wait");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}).start();
}
}
运行结果
4.1.7 notify通知过早
线程wait()等待后,可以调用notify()唤醒线程,如果notify()唤醒的过早,在等待之前就调用了notify() 可能打乱程序正常的执行逻辑。
通知过早实例:
/**
* notify()通知过早
*/
public class Test08 {
public static void main(String[] args) {
final Object lock = new Object();// 定义对象作为锁对象
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
synchronized (lock) {
try {
System.out.println("begin wait");
lock.wait();
System.out.println("wait end。。。");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
});
Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
synchronized (lock) {
System.out.println("begin notify");
lock.notify();
System.out.println("end notify");
}
}
});
// 如果先开启t1 ,再开启t2线程,大多数情况下,t1先等待,t2在把t1唤醒
// t1.start();
// t2.start();
// 如果先开启t2 通知线程,再开启t1等待线程,可能会出现t1线程等待没有收到通知的情况。
t2.start();
t1.start();
}
}
实际上,调用start()就是高速线程调度器,当前线程准备就绪,
线程调度器在什么时候开启这个线程不确定,即调用start()方法的顺序,并不一定就是线程开启的顺序
在当前实例中,t1线程等待后,再让t2线程唤醒,如果t2线程先唤醒了,就不让t1线程等待了。
解决通知过早实例:
package com.company.wait;
/**
* notify()通知过早,就不让线程等待了
* // 实际上,调用start()就是高速线程调度器,当前线程准备就绪,
* // 线程调度器在什么时候开启这个线程不确定,即调用start()方法的顺序,并不一定就是线程开启的顺序
* // 在当前实例中,t1线程等待后,再让t2线程唤醒,如果t2线程先唤醒了,就不让t1线程等待了。
*/
public class Test09 {
static boolean isFirst = true; // 定义静态变量作为是否第一个运行线程的标志
public static void main(String[] args) {
final Object lock = new Object();// 定义对象作为锁对象
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
synchronized (lock) {
while (isFirst){ // 当线程是第一个开启的线程就等待
try {
System.out.println("begin wait");
lock.wait();
System.out.println("wait end。。。");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
});
Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
synchronized (lock) {
System.out.println("begin notify");
lock.notify();
System.out.println("end notify");
isFirst = false; // 通知后,就把第一个线程标志修改为false
}
}
});
// 如果先开启t1 ,再开启t2线程,大多数情况下,t1先等待,t2在把t1唤醒
// t1.start();
// t2.start();
// 如果先开启t2 通知线程,再开启t1等待线程,可能会出现t1线程等待没有收到通知的情况。
t2.start();
t1.start();
}
}
4.1.8 wait 等待条件发生了变化
在使用wait/nodify 模式时,注意wait条件发生了变化,也可能会造成程序逻辑的混乱。
package com.company.wait;
import java.util.ArrayList;
import java.util.List;
/**
* wait 条件发生变化
* 定义一个集合,
* 定义一个线程向集合中添加数据,添加完数据后,通知另外一个线程从集合中取数据。
* 定义一个线程从集合中取数据,如果集合中没有数据,就等待
*/
public class Test10 {
public static void main(String[] args) {
// 定义添加数据的线程对象
ThreadAdd threadAdd = new ThreadAdd();
// 定义取数据的线程对象
ThreadSubTract threadSubTract = new ThreadSubTract();
threadSubTract.setName("subtract 1");
// 测试1: 先开启添加数据的线程,再开启一个取数据的线程,大多数情况下会正常取数据
// threadAdd.start();
// threadSubTract.start();
// 测试2: 先开启取数据的线程,在开启添加数据的线程, 取数据的线程会先等待,等到数据添加之后,再取数据
// threadSubTract.start();
// threadAdd.start();
// 测试3: 开启两个取数据的线程,在开启添加数据的线程
ThreadSubTract threadSubTract2 = new ThreadSubTract();
threadSubTract2.setName("subtract 2");
threadSubTract.start();
threadSubTract2.start();
threadAdd.start();
/*
测试3运行结果:
subtract 1begin wait...
subtract 2begin wait...
subtract 2end wait...
subtract 2从集合中去了data后,集合中的数据:0
subtract 1end wait...
Exception in thread "subtract 1" java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
分析可能执行顺序:
threadSubTract 线程先启动,取数据时,集合中没有数据,wait()等待;
threadAdd 线程获得cpu执行权,添加数据,把threadSubTract 线程唤醒;
threadSubTract2 线程开启后获得CPU执行权,正常取数据;
threadSubTract 获得CPU执行权,打印 end wait..,然后在执行list.remove(0)取数据时,现在list集合中已经没有数据了。
这时会产生Exception in thread "subtract 1" java.lang.IndexOutOfBoundsException: Index: 0, Size: 0异常
出现异常的原因,向list集合中添加了1个数据,remove()了两次。
如何解决:
当等待的线程被唤醒后,在判断集合中是否有数据可取,即需要把sutrct()方法中的if判断改为while
*/
}
// 1 定义list 集合
static List list = new ArrayList<>();
// 2 定义方法从集合中取数据
public static void subtract() {
synchronized (list) {
// if (list.size() == 0) {
while (list.size() ==0){
try {
System.out.println(Thread.currentThread().getName() +"begin wait...");
list.wait(); // 等待
System.out.println(Thread.currentThread().getName() + "end wait...");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Object data = list.remove(0); // 从集合中取出一个数据
System.out.println(Thread.currentThread().getName() +"从集合中去了"+ data + "后,集合中的数据:" + list.size() );
}
}
// 3 定义方法向集合中添加数据后,通知等待的线程取数据,
public static void add(){
synchronized (list) {
list.add("data");
System.out.println(Thread.currentThread().getName() +"存储了一个数据");
list.notifyAll();
}
}
// 4) 定义线程类,add()取数据的方法
static class ThreadAdd extends Thread {
@Override
public void run() {
add();
}
}
// 定义线程类,调用subtract()取数据的方法
static class ThreadSubTract extends Thread{
@Override
public void run() {
subtract();
}
}
}
4.1.9 生产者/消费者模式
在java中,负责产生数据的模块是生产者,负责使用数据的模块是消费者。生产者消费者解决数据的平衡问题,即现有数据然后才能使用,没用数据时消费者需要等待。
- 生产-消:操作值
代码示例:
package com.company.producerdata;
/**
* 定义一个操作数据的类
*/
public class ValueOP {
private String value = "";
// 定义方法修改value的值
public void setValue() {
// 如果 Value的值不是空串就等待
synchronized (this){
while (!value.equalsIgnoreCase("")) {
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 如果value的值是空串,就设置value字段的值
String value = System.currentTimeMillis() + "--" + System.nanoTime();
System.out.println("set 设置的值是" + value);
this.value = value;
// this.notify(); // 在多生产多消费者环境中,notify()不能保证是生产者唤醒消费者,如果生产者唤醒的还是生产者的可能会出现假死的情况
this.notifyAll();
}
}
//
public void getValue() {
synchronized (this) {
// 如果value 是空串就等待
while (value.equalsIgnoreCase("")) {
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 不是空串,就读取字段值
System.out.println("get的值" + this.value);
this.value = "";
// this.notify(); // 在多生产多消费者环境中,notify()不能保证是生产者唤醒消费者,如果生产者唤醒的还是生产者的可能会出现假死的情况
this.notifyAll();
}
}
}
/**
* 定义线程类,模拟生产者
*/
public class ProducerThread extends Thread{
// 生产者生产数据就是调用ValueOP 类的setValue 方法给value字段。
private ValueOP valueOP;
ProducerThread(ValueOP valueOP) {
this.valueOP = valueOP;
}
@Override
public void run() {
while (true) {
valueOP.setValue();
}
}
}
/**
* 定义线程类,模拟消费者
*/
public class ConsumerThread extends Thread{
// 生产者使用数据就是使用ValueOP 的value字段值。
private ValueOP valueOP;
ConsumerThread(ValueOP valueOP) {
this.valueOP = valueOP;
}
@Override
public void run() {
while (true) {
valueOP.getValue();
}
}
}
测试 1生产1 消费
/**
* 操作一生产,一消费的情况
*/
public class Test {
public static void main(String[] args) {
ValueOP valueOP = new ValueOP();
ProducerThread producerThread = new ProducerThread(valueOP);
ConsumerThread consumerThread = new ConsumerThread(valueOP);
producerThread.start();
consumerThread.start();
// 生产与消费交替进行
}
}
测试多生产多消费
package com.company.producerdata;
/**
* 操作多生产,多消费的情况
*/
public class Test2 {
public static void main(String[] args) {
ValueOP valueOP = new ValueOP();
ProducerThread p1 = new ProducerThread(valueOP);
ProducerThread p2 = new ProducerThread(valueOP);
ProducerThread p3 = new ProducerThread(valueOP);
ConsumerThread c1 = new ConsumerThread(valueOP);
ConsumerThread c2= new ConsumerThread(valueOP);
ConsumerThread c3 = new ConsumerThread(valueOP);
p1.start();
p2.start();
p3.start();
c1.start();
c2.start();
c3.start();
// 生产与消费交替进行
}
}
运行结果:
- 操作栈
使生产者把数据存储到List集合中,消费者从list集合中取数据,使用list集合模拟栈。
实例:
/**
* 模拟栈
*/
public class MyStack {
private List list = new ArrayList(); // 定义集合模拟栈
private static final int MAX = 1; // 集合的最大容量
// 定义方法模拟入参
public synchronized void push() {
// 当栈中的数据已满,就等待
while (list.size() >= MAX) {
System.out.println(Thread.currentThread().getName() + "begin wait ...");
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
String data = "data--" + Math.random();
System.out.println(Thread.currentThread().getName() + "添加了数据" +data);
list.add(data);
// this.notify(); // 当多个生产多个消费者时,可能会出现假死现象,调用notifyAll 解决假死现象。
this.notifyAll();
}
// 定义方法模拟出栈
public synchronized void pop() {
// 如果没有数据就等待
while (list.size() == 0) {
try {
System.out.println(Thread.currentThread().getName() + "begin wait...");
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName() + "出栈数据" + list.remove(0));
this.notifyAll();
}
}
/**
* 生产者线程
*/
public class ProduerThread extends Thread {
private MyStack myStack;
public ProduerThread(MyStack myStack){
this.myStack = myStack;
}
@Override
public void run() {
while (true) {
myStack.push();
}
}
}
/**
* 消费者线程
*/
public class ConsumerThread extends Thread {
private MyStack myStack;
public ConsumerThread(MyStack myStack){
this.myStack = myStack;
}
@Override
public void run() {
while (true) {
myStack.pop();
}
}
}
/**
* 测试一生产一消费
*/
public class Test {
public static void main(String[] args) {
MyStack myStack = new MyStack();
ProduerThread p = new ProduerThread(myStack);
ConsumerThread c = new ConsumerThread(myStack);
c.start();
p.start();
/**
* 运行结果是两个线程交替执行,一个线程负责生产,通知另外一个线程负责消费
*/
}
}
/**
* 测试多生产多消费
*/
public class Test02 {
public static void main(String[] args) {
MyStack myStack = new MyStack();
ProduerThread p1 = new ProduerThread(myStack);
ProduerThread p2 = new ProduerThread(myStack);
ProduerThread p3 = new ProduerThread(myStack);
ConsumerThread c1 = new ConsumerThread(myStack);
ConsumerThread c2 = new ConsumerThread(myStack);
ConsumerThread c3 = new ConsumerThread(myStack);
p1.setName("生产者1号");
p2.setName("生产者2号");
p3.setName("生产者3号");
p1.start();
p2.start();
p3.start();
c1.setName("消费者1号");
c1.setName("消费者2号");
c1.setName("消费者3号");
c1.start();
c2.start();
c3.start();
/**
* 运行结果是两个线程交替执行,一个线程负责生产,通知另外一个线程负责消费
*/
}
}
4.2 通过管道实现线程间的通信
在Java.io 包中的PipeSteam 管道流用于在线程之间传送数据,一个线程发送数据到输出管道,另外一个线程从输入管道中读取数据,相关的类包括: PipedInportStream 和Piped OutPut Stream,pipedReader 和PopedWriter
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
/**
* 使用PipedStream 和PipedOutPutStream 管道字节流在线程之间传递数据
*/
public class Test {
public static void main(String[] args) {//
// 定义管道字节流
PipedInputStream inputStream = new PipedInputStream();
PipedOutputStream outputStream = new PipedOutputStream();
// 在输入管道流与输出管道流之间建立连接
try {
inputStream.connect(outputStream);
} catch (IOException e) {
e.printStackTrace();
}
// 创建线程向管道流中写入数据
new Thread(new Runnable() {
@Override
public void run() {
writeData(outputStream);
}
}).start();
// 定义一个线程从管道中读取数据
new Thread(new Runnable() {
@Override
public void run() {
readData(inputStream);
}
}).start();
}
// 定义一个方法想管道流中写入数据
public static void writeData(PipedOutputStream outputStream) {
try {
for (int i = 0; i < 100; i++) {
String data = ""+ i;
outputStream.write(data.getBytes());// 把字节数据写入到输出管道流中
}
outputStream.close(); // 关闭管道流
} catch (Exception e) {
e.printStackTrace();
}
}
// 定义一个方法从管道流中读取数据
public static void readData(PipedInputStream inputStream){
byte[] bytes = new byte[1024];
try {
// 从管道输入流中读取字节保存到字节数组中,
int len = inputStream.read(bytes); // 返回读到的字节数,如果没有读到任何数据返回-1
while (len != -1) {
// 把 byte 数组中从0开始将道德len个字节转换为字符串打印
System.out.println(new String (bytes, 0,len));
len = inputStream.read(bytes); // 继续从管道中读取数据。
}
} catch (IOException e) {
e.printStackTrace();
}
}
}