前提条件:生产者生产包子,但是包子总数不得超过10个,消费者有包子就购买
第一种:通过wait()和notify()来实现
在这个实例中,生产者和消费者通过wait()和notify()来进行通讯,再通过synchronized 块来确保线程安全,当包子数量达到最大容量时生产者线程会进入等待状态,直到有消费者购买包子释放空间。当包子数量为 0 时消费者线程会进入等待状态,直到有生产者生产包子。
public class ThreadTest {
public static void main(String[] args) {
Bakery bakery = new Bakery();
Thread producerThread = new Thread(() -> {
try {
while (true) {
bakery.produce();
Thread.sleep(1000); // 模拟生产时间
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
Thread consumerThread = new Thread(() -> {
try {
while (true) {
bakery.consume();
Thread.sleep(2000); // 模拟购买时间
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
producerThread.start();
consumerThread.start();
}
}
class Bakery {
private int numOfBuns = 0;
private final int capacity = 10;
public synchronized void produce() throws InterruptedException {
while (numOfBuns == capacity) {
wait(); // 等待直到有消费者买走包子,释放空间
}
numOfBuns++;
System.out.println("包子数量+++++++++++++++++" + numOfBuns);
notify(); // 通知等待的消费者可以买包子了
}
public synchronized void consume() throws InterruptedException {
while (numOfBuns == 0) {
wait(); // 等待直到有包子可买
}
numOfBuns--;
System.out.println("包子数量—————————————————" + numOfBuns);
notify(); // 通知等待的生产者可以继续生产包子了
}
}
运行一下
可以看到因为生产速度比购买速度快,包子最后一直维持在10个
第二种:通过BlockingQueue来实现
在这个示例中,BlockingQueue 可以自动处理线程安全和阻塞的问题,生产者调用 put() 方法向队列中放入包子,如果队列已满则会阻塞。消费者调用 take() 方法从队列中取出包子,如果队列为空则会阻塞。
这种方式简化了代码,提高了可读性和可维护性。
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class ThreadTest {
public static void main(String[] args) {
Bakery bakery = new Bakery();
Thread producerThread = new Thread(() -> {
try {
while (true) {
bakery.produce();
Thread.sleep(1000); // 模拟生产时间
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
Thread consumerThread = new Thread(() -> {
try {
while (true) {
bakery.consume();
Thread.sleep(2000); // 模拟购买时间
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
producerThread.start();
consumerThread.start();
}
}
class Bakery {
private BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
public void produce() throws InterruptedException {
queue.put(1); // 生产一个包子,如果队列已满,则阻塞
System.out.println("包子数量+++++++++++++++++" + queue.size());
}
public void consume() throws InterruptedException {
queue.take(); // 消费一个包子,如果队列为空,则阻塞
System.out.println("包子数量—————————————————" + queue.size());
}
}
运行一下
可以发现第一个数据有些奇怪,这是因为生产者生产了一个包子放入队列马上就被消费者取走了,这个过程发生在System.out.println之前,所以打印出来的包子数量变成了0,最后包子数量还是维持在10个,然后买一个生产一个
第三种:通过 Object 类型的锁对象来实现
在这个示例中,使用一个 Object 类型的锁对象 lock 来进行同步。生产者在生产包子时获取 lock 锁,如果缓冲区已满,则调用 wait() 方法进入等待状态,直到有消费者购买了包子并释放了锁。消费者在购买包子时也获取 lock 锁,如果缓冲区为空,则调用 wait() 方法进入等待状态,直到有生产者生产了包子并释放了锁。当生产者生产了包子或消费者购买了包子后,调用 notify() 方法通知等待中的线程。
public class ThreadTest {
public static void main(String[] args) {
Bakery bakery = new Bakery();
Thread producerThread = new Thread(() -> {
try {
while (true) {
bakery.produce();
Thread.sleep(1000); // 模拟生产时间
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
Thread consumerThread = new Thread(() -> {
try {
while (true) {
bakery.consume();
Thread.sleep(2000); // 模拟购买时间
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
producerThread.start();
consumerThread.start();
}
}
class Bakery {
private final Object lock = new Object();
private int numOfBuns = 0;
private final int capacity = 10;
public void produce() throws InterruptedException {
synchronized (lock) {
while (numOfBuns == capacity) {
lock.wait(); // 缓冲区已满,生产者等待
}
numOfBuns++;
System.out.println("包子数量+++++++++++++++++" + numOfBuns);
lock.notify(); // 通知消费者可以购买包子
}
}
public void consume() throws InterruptedException {
synchronized (lock) {
while (numOfBuns == 0) {
lock.wait(); // 缓冲区为空,消费者等待
}
numOfBuns--;
System.out.println("包子数量—————————————————" + numOfBuns);
lock.notify(); // 通知生产者可以继续生产包子
}
}
}
运行一下
数据和第一种方式差不多
第四种:通过Semaphore来实现
在这个示例中,使用信号量Semaphore来控制包子的数量,初始设置了10个许可证。生产者在生产包子时先获取 spaceAvailable的许可证,然后生产一个包子,最多可以获取10个,消费者在购买包子时先释放 spaceAvailable的许可证,然后消费一个包子。
import java.util.concurrent.Semaphore;
public class ThreadTest {
public static void main(String[] args) {
Bakery bakery = new Bakery();
Thread producerThread = new Thread(() -> {
try {
while (true) {
bakery.produce();
Thread.sleep(1000); // 模拟生产时间
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
Thread consumerThread = new Thread(() -> {
try {
while (true) {
bakery.consume();
Thread.sleep(2000); // 模拟购买时间
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
producerThread.start();
consumerThread.start();
}
}
class Bakery {
private Semaphore spaceAvailable = new Semaphore(10); // 初始为 10,表示有 10 个空位
private int numOfBuns = 0;
public void produce() throws InterruptedException {
spaceAvailable.acquire(); // 获取一个空位的信号量
numOfBuns++;
System.out.println("包子数量+++++++++++++++++" + numOfBuns);
}
public void consume() throws InterruptedException {
spaceAvailable.release(); // 释放一个空位的信号量
numOfBuns--;
System.out.println("包子数量—————————————————" + numOfBuns);
}
}
运行一下
和第一种方式差不多