—— 24.10.8
一、问题提出
目前队列存在的问题
1.很多场景要求分离生产者、消费者两个角色、它们需要由不同的线程来担当,而之前的实现根本没有考虑线程安全问题
2.poll方法,队列为空,那么在之前的实现里会返回null,如果就是硬要拿到一个元素呢?以现在的实现只能不断循环尝试
3.offer方法,队列为满,那么在之前的实现里会返回false,如果就是硬要塞入一个元素呢?以现在的实现只能不断循环尝试
4.指令交错,多个线程会造成混乱效果
二、解决方法
为解决线程不安全问题,需要给线程加锁,使线程局部阻塞
用条件变量让 poll 或 offer 线程进入等待 状态,而不是不断循环尝试,让CPU空转
三、单锁实现
Java中两种锁的选择
① synchronized:关键字,功能少
② ReentrantLock:可重入锁,功能丰富
lock() 加锁
unlock() 解锁
lockInterruptibly() 加锁(可在阻塞时打断,提前唤醒)
offer方法实现
if判断
问题
从 tailWaits 中唤醒的线程,会与新来的 offer 的线程争抢锁,谁能抢到是不一定的,如果后者先抢到,就会导致条件又发生变化
这种情况称之为虚假唤醒,唤醒后应该重新检查条件,看是不是得重新进入等待
public void offer(String e) throws InterruptedException {
// 加锁,可重入锁阻塞时可打断方法(可被强制唤醒)
lock.lockInterruptibly();
try {
// 判断是否为满
if (isFull()){
// 队列满时,使offer线程阻塞,直到poll线程取走后,有位置时再恢复运行
// tail.signal() 唤醒线程
tailWaits.await();
}
array[tail] = e;
if (++tail == array.length){
tail = 0;
}
size++;
}finally {
// 解锁
lock.unlock();
}
}
import java.util.Arrays;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class TestThreadUnsafe {
private final String[] array = new String[10];
// 尾指针
private int tail = 0;
// 元素个数
private int size = 0;
// 创建一个可重入锁对象
ReentrantLock lock = new ReentrantLock();
// 条件变量对象(集合线程)
Condition tailWaits = lock.newCondition();
public void offer(String e) throws InterruptedException {
// 加锁,可重入锁阻塞时可打断方法(可被强制唤醒)
lock.lockInterruptibly();
try {
// 判断是否为满
if (isFull()){
// 队列满时,使offer线程阻塞,直到poll线程取走后,有位置时再恢复运行
// tail.signal() 唤醒线程
tailWaits.await();
}
array[tail] = e;
if (++tail == array.length){
tail = 0;
}
size++;
}finally {
// 解锁
lock.unlock();
}
}
public void poll(String e) throws InterruptedException {
}
@Override
public String toString() {
return Arrays.toString(array);
}
private boolean isFull(){
return size == array.length;
}
private boolean isEmpty(){
return size == 0;
}
public static void main(String[] args) throws InterruptedException{
TestThreadUnsafe queue = new TestThreadUnsafe();
for (int i = 0; i < 10; i++) {
queue.offer("e"+i);
}
new Thread(()->{
try {
System.out.println(Thread.currentThread().getName()+"添加元素之前");
queue.offer("e10");
System.out.println(Thread.currentThread().getName()+"添加元素成功");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
},"t1").start();
new Thread(()->{
System.out.println("开始唤醒");
try{
queue.lock.lockInterruptibly();
queue.tailWaits.signal();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}finally {
queue.lock.unlock();
}
},"t2").start();
}
}
while判断
解决了虚假唤醒的问题
@Override
public void offer(E e) throws InterruptedException { // poll 等待队列非空
lock.lockInterruptibly();
try{
while (isFull()){
// 放在条件变量等待
tailWaits.await();
}
array[tail] = e;
if (++tail == array.length){
tail = 0;
}
size++;
// 唤醒等待线程
headWaits.signal();
}finally {
lock.unlock();
}
}
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class BlockingQueue1<E> implements BlockingQueue<E> {
private final E[] array;
private int head;
private int tail;
private int size;
// 根据容量创造一个数组
public BlockingQueue1(int capacity) {
array = (E[]) new Object[capacity];
}
// 加可重入锁
private ReentrantLock lock = new ReentrantLock();
// 配合poll方法条件变量,在队列头部删除
private Condition headWaits = lock.newCondition();
// 配合offer方法条件变量,在队列尾部加入
private Condition tailWaits = lock.newCondition();
// 判空
private boolean isEmpty(){
return head == tail;
}
// 判满
private boolean isFull(){
return size == array.length;
}
@Override
public String toString() {
return "array=" + Arrays.toString(array);
}
@Override
public void offer(E e) throws InterruptedException { // poll 等待队列非空
lock.lockInterruptibly();
try{
while (isFull()){
// 放在条件变量等待
tailWaits.await();
}
array[tail] = e;
if (++tail == array.length){
tail = 0;
}
size++;
// 唤醒等待线程
headWaits.signal();
}finally {
lock.unlock();
}
}
@Override
public boolean offer(E e, long timeout) throws InterruptedException {
lock.lockInterruptibly();
try{
// 将毫秒时间转换为纳秒时间
long t = TimeUnit.MILLISECONDS.toNanos(timeout);
while (isFull()){
if (t<=0){
return false;
}
// 最多等待多少纳秒
tailWaits.awaitNanos(t);
}
array[tail] = e;
if (++tail == array.length){
tail = 0;
}
size++;
// 唤醒等待线程
headWaits.signal();
return true;
}finally {
lock.unlock();
}
}
@Override
public E poll() throws InterruptedException {
lock.lockInterruptibly();
try {
while (isEmpty()){
headWaits.await();
}
E e = array[head];
array[head] = null;
if (++head == array.length){
head = 0;
}
size--;
tailWaits.signal();
return e;
}finally {
lock.unlock();
}
}
}
main函数
public class TestBlockingQueue1 {
public static void main(String[] args) throws InterruptedException {
BlockingQueue1<String> queue = new BlockingQueue1<>(3);
Thread t1 = new Thread(() -> {
try {
System.out.println(System.currentTimeMillis()+" begin ");
queue.offer("任务1");
System.out.println(queue);
queue.offer("任务2");
System.out.println(queue);
queue.offer("任务3");
System.out.println(queue);
queue.offer("任务4",5000);
System.out.println(queue);
System.out.println(System.currentTimeMillis() + " end ");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
},"生产者");
t1.start();
Thread.sleep(2000);
queue.poll();
}
}
四、双锁实现
单锁问题:
单锁实现的缺陷:两个线程用了同一把锁,一个执行时,另一个就需阻塞,而offer方法添加元素和poll方法取走元素使用了同一把锁,这样两个线程不能同时执行,两方法相互阻塞
解决方法:
offer方法主要操作尾指针,poll方法主要操作头指针,将offer方法和poll方法分别添加一个锁,用两把锁分别保护头指针和尾指针,从而分别保护offer和poll两个方法
代码实现
import java.util.Arrays;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class BlockingQueue2<E> implements BlockingQueue<E> {
private final E[] array;
private int head;
private int tail;
private int size;
// tailLock给offer方法入队用,Condition分别创建两个等待的条件变量
private ReentrantLock tailLock = new ReentrantLock();
private Condition notEmpty = tailLock.newCondition();
// headLock给poll方法出队用
private ReentrantLock headLock = new ReentrantLock();
private Condition notFull = headLock.newCondition();
public BlockingQueue2(int capacity) {
this.array = (E[]) new Object[capacity];
}
// 判空
private boolean isEmpty(){
return size == 0;
}
// 判满
private boolean isFull(){
return size == array.length;
}
@Override
public String toString() {
return "array=" + Arrays.toString(array);
}
@Override
public void offer(E e) throws InterruptedException {
// 加锁
tailLock.lockInterruptibly();
try{
while (isFull()) {
notEmpty.await();
}
array[tail] = e;
if (++tail == array.length) {
tail = 0;
}
size++;
}finally {
tailLock.unlock();
}
}
@Override
public boolean offer(E e, long timeout) throws InterruptedException {
return false;
}
@Override
public E poll() throws InterruptedException {
// 加锁
headLock.lockInterruptibly();
try{
while (isEmpty()){
notEmpty.await();
}
E e = array[head];
array[head] = null;
if (++head == array.length) {
head = 0;
}
size--;
return e;
}finally {
headLock.unlock();
}
}
public static void main(String[] args) throws InterruptedException {
BlockingQueue2<String> queue = new BlockingQueue2<>(3);
queue.offer("任务1");
new Thread(()->{
try {
queue.offer("任务2");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
},"offer").start();
new Thread(()->{
try {
System.out.println(queue.poll());
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
},"poll").start();
}
}
size自增/自减问题
size的自增自减不能保障安全,size自增自减在多个线程同时执行时可能遇到冲突
解决方法
用原子变量AtomicInteger类型保证安全
getAndIncrement 自增方法,能保证线程安全
getAndDecrement 自减方法,能保证线程安全