文章目录
- Synchronized
- synchronized解决
- Lock锁
- synchronized锁与Lock锁的区别
- 生成者消费者问题
- synchronized实现
- lock版实现- condition
- condition实现精准通知唤醒
- Callable
- JUC常用辅助类
- CountDownLatch(倒计时器)
- CyclicBarrier(循环栅栏)
- Semaphore 信号量 - 允许多个线程同时访问
- ReadWriteLock读写锁
- BlockingQueue阻塞队列
- SynchronousQueue同步队列
- 四大函数接口
- 功能 / 函数型
- 断言型
- 消费型
- 供给型
- 使用案例-Stream流式编程
- TreadPoolExecutor
- ForkJoin
Synchronized
1、并发就是多线程操作同一个资源。
2、在Java中,线程就是一个单独的资源类,没有任何附属操作。资源中包含并发操作的属性、方法。
并发案例——多线程买票的例子:
在我的例子中,公共资源类为Ticket,left为剩余票数,cnt是记录的卖出的票数,sale()方法在有余票时,打印谁买了一张票,并显示当前余票和共卖出的票。
class Ticket {
int left;
int cnt = 0;
public int getLeft() {
return left;
}
public void setLeft(int left) {
this.left = left;
}
public Ticket(int n) {
this.left = n;
}
public void sale() {
try {
Thread.sleep(100); // 模拟卖票耗时
} catch (InterruptedException e) {
e.printStackTrace();
}
if (left > 0) {
++cnt;
--left;
System.out.println("线程:" + Thread.currentThread().getName() + "剩余:" + left + ",共卖出:" + cnt);
}
}
}
在main函数中,设定票数,买票的人数,每人买票的机会尝试数:
public static void main(String[] args) {
int ticketNum = 8, people = 4, chance = 5;
Ticket t = new Ticket(ticketNum);
System.out.println("开售前:---------" + t.getLeft());
for (int i = 0; i < people; ++i) {
new Thread(() -> {
for (int j = 0; j < chance; ++j) {
t.sale();
}
}, Integer.toString(i)).start();
}
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
结果:
synchronized解决
synchronized实现同步的基础:Java中的每一个对象都可以作为锁。具体表现为以下3种形式:
1. 对于普通同步方法,锁是当前实例对象。
2. 对于静态同步方法,锁是当前类的Class对象。
3. 对于同步方法块,锁是Synchonized括号里配置的对象。
直接让sale()变成同步方法,即每个线程访问sale()方法时,会给当前方法所在实例(也就公共资源)对象加锁,那么一次只能有一个线程能够操作,其他线程必须等待:
Lock锁
lock锁使用三部曲:
- 创建锁: Lock lk = new ReentrantLock();
- 获取锁 :lk.lock
- try - catch - finally,其中try中写需要并发控制的业务逻辑,finally中释放锁,确保异常产生时,锁正常被释放lk.lock()。
class Ticket2 {
int left;
int cnt = 0;
Lock lk = new ReentrantLock();
public int getLeft() {
return left;
}
public void setLeft(int left) {
this.left = left;
}
public Ticket2(int left) {
this.left = left;
}
public void sale() {
lk.lock();
try {
if (left > 0) {
++cnt;
--left;
System.out.println("线程:" + Thread.currentThread().getName() + "剩余:" + left + ",共卖出:" + cnt);
}
}
catch (Exception e) {
e.printStackTrace();
}finally {
lk.unlock();
}
}
}
ReentrantLock也能保障并发安全。
synchronized锁与Lock锁的区别
- synchronized是Java内置的关键字而Lock锁是一个Java类。
- synchronized无法获取锁的状态,Lock可以判断是否获取到了锁。
- synchronized会自动释放锁,lock必须手动锁,如果不释放锁,会发生死锁。
- synchronized,没有获取到锁的线程会一直等待,Lock锁,则有尝试获取锁的机制,不一定会一直等待。
- synchronized 可重入,非公平;Lock锁,可重入,可以设置公平锁,即一个是内置关键字不可修改,一个是自己定制。
- synchronized适合锁少量的代码同步问题,Lock适合锁大量的同步代码。
生成者消费者问题
synchronized实现
线程 A、B操作同一个变量num,A让num + 1,
B让num - 1,两个交替使用。
这里A操作完了,需要通知B,B操作完了需要通知A,从而实现线程的同步,相当于A生产了后,交给B消费,B消费完毕,再通知A。
完成这种生产-消费模型编程三部曲:
- 等待:当不满足条件时,while循环等待
- 业务:当条件满足后,执行业务。
- 通知:业务完毕后,通知其他线程。
构建一个资源类Data,有一个成员变量num,构建两个同步方法,一个执行+1,一个执行-1,在main方法中,起两个线程A和B,分别尝试操作+1和-1:
class Data {
private int num = 0;
public synchronized void increase() {
while (num != 0) {
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
++num;
System.out.println(Thread.currentThread().getName() +":->"+ num);
this.notifyAll();
}
public synchronized void decrease() {
while (num != 1) {
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
--num;
System.out.println(Thread.currentThread().getName() +":->"+ num);
this.notifyAll();
}
}
public static void main(String[] args) {
Data d = new Data();
new Thread(() -> {
for (int i = 0; i < 5; i++) {
d.increase();
}
}, "A").start();
new Thread(() -> {
for (int i = 0; i < 5; i++) {
d.decrease();
}
}, "B").start();
}
}
lock版实现- condition
可以利用lock构建condition条件变量,condition提供await()方法和signal()和signalAll()方法,类似wait(),notify()和notifyAll;
要点步骤:
- 1、创建ReentrantLock() lk,并获取lk的condition
- 2、加锁:lk.lock()
- 3、try - catch - final
- try中编写业务逻辑:
- 等待:当不满足条件时,while循环等待:condition.await();
- 业务:当条件满足后,执行业务
- 通知:业务完毕后,通知其他线程:condition.signalAll();
- final中释放锁:lk.unlock();
- try中编写业务逻辑:
class Data2 {
private int num = 0;
Lock lk = new ReentrantLock();
Condition condition = lk.newCondition();
public void increase() {
lk.lock();
try {
while (num != 0) condition.await();
++num;
System.out.println(Thread.currentThread().getName() +":->"+ num);
condition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lk.unlock();
}
}
public void decrease() {
lk.lock();
try {
while (num != 1) condition.await();
--num;
System.out.println(Thread.currentThread().getName() +":->"+ num);
condition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lk.unlock();
}
}
}
condition实现精准通知唤醒
Callable
用于有返回值的异步请求,获取结果。
第一步,构建自己的Callable对象,实现Callable接口,它需要一个泛型参数,标识期望返回的结果类型。
class MyCall implements Callable<Integer> {
int a, b;
public MyCall(int a, int b) {
this.a = a;
this.b = b;
}
@Override
public Integer call() throws Exception {
TimeUnit.SECONDS.sleep(2);
return a + b;
}
}
Callable对象与FutureTask包装使用,即封装为一个未来将要执行的任务。
MyCall call = new MyCall(3, 4);
FutureTask future = new FutureTask(call);
FutureTask 中实现了RunnableFuture复合接口,即有Runnable的实现,因此可以放入Thead中启动:
new Thread(future).start();
Integer a = 0;
try {
a = (Integer) future.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
System.out.println(a.toString());
这个 future.get()会阻塞!
JUC常用辅助类
CountDownLatch(倒计时器)
CountDownLatch允许count个线程阻塞在一个地方,直至所有线程的任务都执行完毕。
模拟一个场景,教室中有6个学生,所有学生都离开后才可以关门!
public static void main(String[] args) {
// 1、统计num个线程的倒计时器
int num = 6;
CountDownLatch cn = new CountDownLatch(num);
for (int i = 0; i < num; ++i) {
new Thread(()->{
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "离开");
// 2、线程结束前,倒数一下
cn.countDown();
}, String.valueOf(i)).start();
}
try {
// 3、等待所有线程结束
cn.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("关门!");
}
CyclicBarrier(循环栅栏)
CyclicBarrier和CountDownLatch非常类似,它也可以实现线程间的技术等待,它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活。
现在的场景反过来,假设教师前到达的人数达到指定个数,才允许开门:
第一步:创建CyclicBarrier,指定满足的线程数和所有线程都到达后要执行的runnable对象。
CyclicBarrier cb = new CyclicBarrier(num, () -> {
System.out.println("开门!");
});
第二步:每个线程执行结束前,使用cb.await();
等待其他线程同步。
public static void main(String[] args) {
// 1、等待的人数到达num后,才开门!
int num = 6;
CyclicBarrier cb = new CyclicBarrier(num, () -> {
System.out.println("开门!");
});
for (int i = 0; i < num; ++i) {
new Thread(()->{
System.out.println(Thread.currentThread().getName() + "到达");
try {
// 2、线程结束前,需等待其他线程同步
cb.await();
} catch (Exception e) {
e.printStackTrace();
}
}, String.valueOf(i)).start();
}
}
Semaphore 信号量 - 允许多个线程同时访问
synchronized 和 ReentrantLock 都是一次只允许一个线程访问某个资源,Semaphore(信号量)可以指定多个线程同时访问某个资源。
Semaphore有两种模式:公平模式和非公平模式。
- 公平模式:调用acquire的顺序就是获取许可证的顺序,遵循FIFO
- 非公平模式:抢占式
构造方法:
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
这两个构造方法,都必须提供许可的数量,第二个构造方法可以指定是公平模式还是非公平模式,默认非公平模式。
最常用的场景就是在资源有限的情况下,只允许指定数量的线程,同时访问某个资源,比如模拟一个抢车位的场景:
第一步:
模拟资源情况:num个车位,而用户有10个
int num = 3, total = 6;
Semaphore semaphore = new Semaphore(num);
第二步:
try -catch -final :
try:semaphore.acquire(); // 获取到资源
finally:semaphore.release(); // 释放资源
public static void main(String[] args) {
// 1、num个车位,而用户有total 个
int num = 3, total = 6;
Semaphore sm = new Semaphore(num);
for (int i = 0; i < total; ++i) {
new Thread(()->{
try {
sm.acquire();
System.out.println(Thread.currentThread().getName() + "抢到车位");
TimeUnit.SECONDS.sleep(1);
System.out.println(Thread.currentThread().getName() + "离开车位");
} catch (Exception e) {
e.printStackTrace();
} finally {
sm.release();
}
}, String.valueOf(i)).start();
}
}
同一时刻只能有3个用户占用车位。
ReadWriteLock读写锁
利用读写锁实现自定义缓存,写的时候只允许有一个操作:
第一步:定义读写锁:
private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
第二步:定义读操作函数:
读前加读锁
readWriteLock.readLock().lock();
读完释放锁
第三步:定义写操作函数()
写时加写锁,写完释放锁。
class MyCache {
private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private Map<String, Object> mp = new HashMap<>();
public void put(String s, Object o) {
readWriteLock.writeLock().lock();
try {
mp.put(s, o);
System.out.println(Thread.currentThread().getName() + "插入:" + s);
} catch (Exception exception) {
exception.printStackTrace();
} finally {
readWriteLock.writeLock().unlock();
}
}
public Object get(String s) {
Object ans = null;
readWriteLock.readLock().lock();
try {
ans = mp.get(s);
System.out.println(Thread.currentThread().getName() + "查询:" + s);
} catch (Exception exception) {
exception.printStackTrace();
} finally {
readWriteLock.readLock().unlock();
}
return ans;
}
}
BlockingQueue阻塞队列
BlockingQueue是FIFO(先进先出)型队列,他很好的解决了多线程中,如何高效安全“传输”数据的问题。
阻塞队列有四种添加元素和移出元素的API:
- 非阻塞,返回布尔值
- 会抛出异常:add()、remove()、element()
- 不会抛出异常:offer()、poll()、peek()
- 阻塞
- 一直阻塞:put()、take()
- 可以设置等待时间,超时返回:offer(e, timeout, unit),poll(timeout, unit)
SynchronousQueue同步队列
SynchronousQueue同步队列不存储元素,只要向里边put一个元素,就需要取出一个元素,take。
SynchronousQueue<String> bq = new SynchronousQueue();
new Thread(() -> {
try {
bq.put("1");
System.out.println(Thread.currentThread().getName());
bq.put("1");
System.out.println(Thread.currentThread().getName());
bq.put("1");
System.out.println(Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "A").start();
new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(2);
bq.take();
System.out.println(Thread.currentThread().getName());
bq.take();
System.out.println(Thread.currentThread().getName());
bq.take();
System.out.println(Thread.currentThread().getName());
} catch (Exception e){
e.printStackTrace();
}
}, "B").start();
}
四大函数接口
函数式接口是指只定义了一个抽象方法的接口,或者加了 @FunctionalInterface 注解的接口。可以有默认方法。
四大函数式接口分别是:
- 功能型函数接口(Function):接口输入一个是函数的输入一个是函数的输出
- 消费型函数接口(Consumer):接口输入是函数的输入 函数的返回是一个布尔值
- 供给型函数式接口(Supplier)
- 断言型函数式接口(Predicate)
功能 / 函数型
函数型接口:接口输入一个是函数的输入一个是函数的输出
@FunctionalInterface
public interface Function<T, R> {
/**
* Applies this function to the given argument.
*
* @param t the function argument
* @return the function result
*/
R apply(T t);
// ... 两个默认函数和一个静态函数
}
断言型
判定型接口:接口输入是函数的输入 函数的返回是一个布尔值
public interface Predicate<T> {
/**
* Evaluates this predicate on the given argument.
*
* @param t the input argument
* @return {@code true} if the input argument matches the predicate,
* otherwise {@code false}
*/
boolean test(T t);
// ...三个默认函数和一个静态函数
}
消费型
消费型接口:只有输入,没有输出
@FunctionalInterface
public interface Consumer<T> {
/**
* Performs this operation on the given argument.
*
* @param t the input argument
*/
void accept(T t);
// ...一个默认方法
}
供给型
供给型接口:只有输出,没有输入
@FunctionalInterface
public interface Supplier<T> {
/**
* Gets a result.
*
* @return a result
*/
T get();
}
Consumer consumer = (str) -> {
System.out.println(str);
};
consumer.accept("Happy");
}
使用案例-Stream流式编程
/**
有5个用户,筛选
1、ID 必须是偶数
2、年龄必须大于23
3、用户名转大写字母
4、倒序排序
5、只需要一个用户
**/
public static void main(String[] args) {
List<User> list = new ArrayList<>();
Collections.addAll(list,
new User(0, 22, "lzy"),
new User(1, 20, "blzy"),
new User(2, 25, "azy"),
new User(3, 24, "czy"),
new User(4, 24, "dzy"),
new User(5, 24, "ezy"),
new User(6, 24, "fzy"),
new User(7, 24, "gsy"));
list.stream().filter(e -> {return e.getId() % 2 == 1;})
.filter(e -> {return e.getAge() > 23;})
.map(e -> {return e.getUsername().toUpperCase();})
.sorted(Comparator.reverseOrder())
.limit(1)
.forEach(System.out::println);
}