Java并发编程(七)实践[生产者-消费者]

news2024/11/26 11:13:38

生产者-消费者 

概述

  • 生产者消费者问题,也称有限缓冲问题,是一个多线程同步问题的经典案例。该问题描述了共享固定大小缓冲区的两个线程
  • 在多线程开发中,如果生产者(生产数据的线程)处理速度很快,而消费者(消费数据的线程)处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这种生产消费能力不均衡的问题,便有了生产者和消费者模式

具体实现

生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通信。生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取。所以在并发场景下,多线程对临界区资源(即共享资源)的操作时候必须保证在读写中只能存在一个线程,所以需要设计锁的策略

synchronized+wait/notify实现

package com.bierce.multiThread;
import java.time.Instant;
import java.util.LinkedList;
import java.util.Random;
public class TestProducerConsumer1 {
    public static void main(String[] args) throws InterruptedException {
        //创建一个自定义的Sy阻塞队列,其中存储整型的个数为10
        MySnchronizedBlockingQueueS mySnchronizedBlockingQueueS = new MySnchronizedBlockingQueueS(10);
        int resourceCount = mySnchronizedBlockingQueueS.size(); //阻塞队列资源个数
        //创建生产者线程
        Runnable producer=()->{
            while (resourceCount < 1){
                try {
                    int random = new Random().nextInt(100); //生成一个0-100的两位整数
                    mySnchronizedBlockingQueueS.put(random);
                    System.out.println("北京时间:" + Instant.now() + ":" + Thread.currentThread().getName() + ":" + "生产了一个整型数据==>"+random + ",当前资源池有"+mySnchronizedBlockingQueueS.size()+"个资源");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }};
        for (int i = 1; i <= 5; i++) { //创建5个生产线程 线程名称从0-4
            new Thread(producer).start();
        }
        //创建消费者线程
        Runnable consumer=()->{
            while (true){
                try {
                    System.out.println("北京时间:" + Instant.now() + ":" + Thread.currentThread().getName() + ":" + "消费了一个整型数据==>"+mySnchronizedBlockingQueueS.take() + ",当前资源池有"+mySnchronizedBlockingQueueS.size()+"个资源");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }};
        for (int i = 1; i <= 5; i++) { //创建5个消费线程 线程名称从5-9
            new Thread(consumer).start();
        }
    }
}
/***
 *
 *  自定义阻塞队列: 通过Synchronized + wait/notifyAll实现
 * @author Bierce
 * @date 2023/08/15
 */
class MySnchronizedBlockingQueueS {
    private final int maxSize; //容器允许存放的最大数量
    private final LinkedList<Integer> container; //存储数据的容器
    public MySnchronizedBlockingQueueS(int maxSize ) {
        this.maxSize = maxSize;
        this.container = new LinkedList<>();
    }
    /**
     *  往队列添加元素,如果队列已满则阻塞线程
     */
    public  synchronized  void put(Integer data){
        //如果队列已满,则阻塞生产者线程
        while (container.size()==maxSize){
            try {
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        //队列未满则添加元素,并通知消费者消费数据
        container.add(data);
        notifyAll();
    }
    /**
     *  从队列取出数据,如果队列为空则阻塞
     * @return  队列元素
     */
    public synchronized  Integer take(){
        //如果队列为空,则消费者停止消费
        while (container.size()==0){
            try {
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        //队列不为空则消费数据,并通知生产者继续生产数据
        int data = container.poll();
        notifyAll();
        return data;
    }
    public int size(){
        return container.size();
    }
}

synchronized无法实现精确通知的效果,而Condition可以达到精确通知哪个线程要被唤醒 

Lock+Condition实现

package com.bierce.multiThread;
import java.time.Instant;
import java.util.LinkedList;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class TestProducerConsumer2 {
    public static void main(String[] args) throws InterruptedException {
        //创建一个自定义的阻塞队列,其中存储整型的个数为10
        MyBlockingQueue<Integer> myBlockingQueue = new MyBlockingQueue<>(10, false);
        int resourceCount = myBlockingQueue.size(); //阻塞队列资源个数
        //创建生产者线程
        Runnable producer=()->{
            while (resourceCount < 1){
                try {
                    int random = new Random().nextInt(100); //生成一个0-100的两位整数
                    myBlockingQueue.put(random);
                    System.out.println("北京时间:" + Instant.now() + ":" + Thread.currentThread().getName() + ":" + "生产了一个整型数据==>"+random + ",当前资源池有"+myBlockingQueue.size()+"个资源");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }};

        for (int i = 1; i <= 5; i++) { //创建5个生产线程 线程名称从0-4
            new Thread(producer).start();
        }
        //创建消费者线程
        Runnable consumer=()->{
            while (true){
                try {
                    System.out.println("北京时间:" + Instant.now() + ":" + Thread.currentThread().getName() + ":" + "消费了一个整型数据==>"+myBlockingQueue.take() + ",当前资源池有"+myBlockingQueue.size()+"个资源");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }};
        for (int i = 1; i <= 5; i++) { //创建5个消费线程 线程名称从5-9
            new Thread(consumer).start();
        }
    }
}
/**
 *自定义阻塞队列: Lock+Condition实现
 */
class MyBlockingQueue<E> {
    private final Queue queue; //队列容器
    private final int capacity; //队列容量
    final ReentrantLock lock; //对象锁
    private final Condition notEmpty; //等待取出数据条件
    private final Condition notFull; //等待添加数据条件
    /**
     * 初始化阻塞队列
     * @param capacity  队列容量
     * @param fair  是否公平锁
     */
    public MyBlockingQueue(int capacity, boolean fair) {
        this.queue = new LinkedList();
        this.capacity=capacity;
        this.lock = new ReentrantLock(fair);
        this.notEmpty = lock.newCondition();
        this.notFull =  lock.newCondition();
    }
    /**
     *   往队列插入元素,如果队列大小到达容量限制则阻塞
     * @param e 插入元素
     * @throws InterruptedException 中断异常
     */
    public  void put(E e) throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lock(); //上锁
        try{
            while (queue.size()==capacity){ //队列已满则阻塞
                notFull.await();
            }
            //队列未满则加入数据并唤醒消费者进行消费
            queue.add(e);
            notEmpty.signalAll();
        } finally {
            lock.unlock(); //必须释放锁
        }
    }
    /**
     *   从队列取出一个元素,如果队列为空则阻塞
     * @return 队列元素
     * @throws InterruptedException 中断异常
     */
    public  E take()throws  InterruptedException{
        final ReentrantLock lock = this.lock;
        lock.lock();
        try{
            while (queue.size()==0){ //队列为空则阻塞
                notEmpty.await();
            }
            //队列有数据则获取数据并唤醒生产者进行生产
            E element = (E) queue.remove();
            notFull.signalAll();
            return   element;
        } finally {
            lock.unlock(); //必须释放锁
        }
    }
    public int size(){
        return queue.size();
    }
}
自定义myBlockingQueue控制台输出

阻塞队列BlockingQueue实现

public void put(E e) throws InterruptedException {
	checkNotNull(e);
	final ReentrantLock lock = this.lock;
	lock.lockInterruptibly();
	try {
		while (count == items.length)
			notFull.await();
		enqueue(e);
	} finally {
		lock.unlock();
	}
}
public E take() throws InterruptedException {
	final ReentrantLock lock = this.lock;
	lock.lockInterruptibly();
	try {
		while (count == 0)
			notEmpty.await();
		return dequeue();
	} finally {
		lock.unlock();
	}
}
/**
 * Inserts element at current put position, advances, and signals.
 * Call only when holding lock.
 */
private void enqueue(E x) {
	// assert lock.getHoldCount() == 1;
	// assert items[putIndex] == null;
	final Object[] items = this.items;
	items[putIndex] = x;
	if (++putIndex == items.length)
		putIndex = 0;
	count++;
	notEmpty.signal();
}
/**
 * Extracts element at current take position, advances, and signals.
 * Call only when holding lock.
 */
private E dequeue() {
	// assert lock.getHoldCount() == 1;
	// assert items[takeIndex] != null;
	final Object[] items = this.items;
	@SuppressWarnings("unchecked")
	E x = (E) items[takeIndex];
	items[takeIndex] = null;
	if (++takeIndex == items.length)
		takeIndex = 0;
	count--;
	if (itrs != null)
		itrs.elementDequeued();
	notFull.signal();
	return x;
}

根据ArrayBlockingQueue的put和take方法源码可知其底层最终使用的仍是Lock+condition机制

package com.bierce.multiThread;
import java.time.Instant;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class TestProducerConsumer3 {
    public static void main(String[] args) throws InterruptedException {
        //创建一个阻塞队列,其中存储整型的个数为10
        BlockingQueue<Integer> queue= new ArrayBlockingQueue<>(10);
        int resourceCount = queue.size(); //阻塞队列资源个数
        //System.out.println("资源总数:" + resourceCount);
        //创建生产者线程
        Runnable producer=()->{
            while (resourceCount < 1){
                try {
                    int random = new Random().nextInt(100); //生成一个0-100的两位整数
                    queue.put(random);
                    System.out.println("北京时间:" + Instant.now() + ":" + Thread.currentThread().getName() + ":" + "生产了一个整型数据==>"+random + ",当前资源池有"+queue.size()+"个资源");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }};
        for (int i = 1; i <= 5; i++) { //创建5个生产线程 线程名称从0-4
            new Thread(producer).start();
        }
        //创建消费者线程
        Runnable consumer=()->{
            while (true){
                try {
                    System.out.println("北京时间:" + Instant.now() + ":" + Thread.currentThread().getName() + ":" + "消费了一个整型数据==>"+queue.take() + ",当前资源池有"+queue.size()+"个资源");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }};
        for (int i = 1; i <= 5; i++) { //创建5个消费线程 线程名称从5-9
            new Thread(consumer).start();
        }
    }
}
BlockingQueue控制台输出

扩展

  • 通过信号量semaphore实现
  • 通过PipedInputStream/PipedOutputStream实现

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/882686.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

CMake生成的VS项目之后运行弹出错误框:无法启动...\Debug\~.exe,找不到指定的文件夹

问题&#xff1a; CMake生成的VS项目之后运行弹出错误框&#xff1a;无法启动…\Debug~.exe&#xff0c;找不到指定的文件夹 首先确实Debug文件夹中没有.exe文件 问题的根本是项目缺东西&#xff0c;有问题&#xff0c;当所有问题解决了&#xff0c;也就不会出现这个问题的&am…

【Mariadb高可用MHA】

目录 一、概述 1.概念 2.组成 3.特点 4.工作原理 二、案例介绍 1.192.168.42.3 2.192.168.42.4 3.192.168.42.5 4.192.168.42.6 三、实际构建MHA 1.ssh免密登录 1.1 所有节点配置hosts 1.2 192.168.42.3 1.3 192.168.42.4 1.4 192.168.42.5 1.5 192.168.42.6 …

02-C++数据类型-高级

数据类型-高级 4、复合类型 4.4、结构简介 struct inflatable {char name[20];float vol;double price; };inflatable vincent; //C struct inflatable goose; //C例子 // structur.cpp -- a simple structure #include <iostream> struct inflatable // structu…

【数据库系统】--【2】DBMS架构

DBMS架构 01DBMS架构概述02 DBMS的物理架构03 DBMS的运行和数据架构DBMS的运行架构DBMS的数据架构PostgreSQL的体系结构RMDB的运行架构 04DBMS的逻辑和开发架构DBMS的层次结构DBMS的开发架构DBMS的代码架构 05小结 01DBMS架构概述 02 DBMS的物理架构 数据库系统的体系结构 数据…

(二)结构型模式:5、装饰器模式(Decorator Pattern)(C++实例)

目录 1、装饰器模式&#xff08;Decorator Pattern&#xff09;含义 2、装饰器模式的UML图学习 3、装饰器模式的应用场景 4、装饰器模式的优缺点 5、C实现装饰器模式的简单实例 1、装饰器模式&#xff08;Decorator Pattern&#xff09;含义 装饰模式&#xff08;Decorato…

学习 Iterator 迭代器

今天看到一个面试题&#xff0c; 让下面解构赋值成立。 let [a,b] {a:1,b:2} 如果我们直接在浏览器输出这行代码&#xff0c;会直接报错&#xff0c;说是 {a:1,b:2} 不能迭代。 看了es6文档后&#xff0c;具有迭代器的就一下几种类型&#xff0c;没有Object类型&#xff0c;…

探索Java中的静态变量与实例变量:存储区域、生命周期以及内存分配方式的区别

文章目录 静态变量实例变量不可变对象静态变量和实例变量有什么区别&#xff1f;静态变量实例变量 Object 类都有哪些公共方法&#xff1f;Java 创建对象有哪几种方式&#xff1f;ab 与 a.equals(b) 有什么区别&#xff1f;总结 &#x1f389;欢迎来到Java面试技巧专栏~探索Jav…

Nacos详解(springcloud+nacos实战)

Nacos详解 Nacos1.介绍2.Nacos专业术语2.1 服务 (Service)2.2 服务注册中心 (Service Registry)2.3服务提供方 (Service Provider)2.4服务消费方 (Service Consumer)2.5版本依赖关系 Nacos 注册中心1. 启动NacosServer2 使用 Nacos 做注册中心2.1 nacos-client-b2.2 nacos-clie…

部门用户权限应用的设计和创建(进行中)

数据库表设计 代码实现之前首先是表设计&#xff0c; 六个基本步骤 1.需求分析 (分析用户需求,包括数据、功能和性能需求&#xff09; 2.概念结构设计(主要采用 E-R图) 3.逻辑结构设计 (将ER图转换成表,实现从E-R模型到关系模型转换&#xff09; 4.数据库物理设计 (为设计的…

深度学习的“前世今生”

1、“感知机”的诞生 20世纪50年代&#xff0c;人工智能派生出了这样两个学派&#xff0c;分别是“符号学派”及“连接学派”。前者的领军学者有Marvin Minsky及John McCarthy&#xff0c;后者则是由Frank Rosenblatt所领导。 符号学派的人相信对机器从头编程&#xff0c;一个…

Vue-5.编译器idea

关闭 IDEA 自动更新 IDEA无法搜索插件 填写idea下载插件的官方地址点击ok测试成功则ok https://plugins.jetbrains.com/idea 全局内存配置&#xff08;重启后生效&#xff09; 部署 Alibaba Cloud toolkit&#xff08;部署代码的利器&#xff09; Git&#xff08;需要安装gi…

人工智能原理(4)

目录 一、确定性推理 1、推理方式 2、控制策略 二、推理的逻辑基础 1、永真和可满足性 2、等价性和永真蕴含 3、置换与合一 三、自然演绎推理 四、归结演绎推理 1、子句型 2、鲁滨逊归结原理 3、归结策略 一、确定性推理 推理&#xff1a;就是按照某种策略从已有事…

微机原理与接口技术 学习笔记(二) 存储器

文章目录 一&#xff0c;存储器1.1 概述1.1.1 半导体存储器的分类按制造工艺&#xff1a; 易失性或挥发性存储器 / 不易失性或不挥发性存储器按制造工艺&#xff1a; 1.1.2 半导体存储器的性能指标1.1.3 半导体存储器的一般结构及组成 1.2 随机存取存储器 RAM1.2.1 静态RAM1.2.…

操作符和表达式求值

目录 1.运算符的优先级和结合性 1.1运算符的优先级 1.2结合性 2.操作符的使用最终带来的是一个表达式的值 2.1.隐式类型转换&#xff08;整型提升&#xff09; 2.1.1整形提升的例子 2.2算术转换 1.运算符的优先级和结合性 运算符是编程语言中的基本元素之一&#xff0c;主…

临床试验三原则-对照、重复、随机

临床试验必须遵循三个基本原则&#xff1a;对照、重复、随机。 一、对照原则和对照的设置 核心观点&#xff1a;有比较才有鉴别。 对照组和试验组同质可比。 三臂试验 安慰剂&#xff1a;试验组&#xff1a;阳性对照组1&#xff1a;n&#xff1a;m&#xff08;n≥m&#xff…

论文略读:城市道路场景下车辆编队运动规划与控制算法研究

1. 一些观点&#xff1a; &#xff08;1&#xff09;我曾经认为不能复现的论文都是垃圾。我现在看到能够量产的论文之后发现&#xff0c;论文的复现实属难得&#xff0c;即使给你代码&#xff0c;反复钻研&#xff0c;一个月之久才敢说略微看懂&#xff0c;所以论文的复现实在是…

使用 `tailwindcss-patch@2` 来提取你的类名吧

使用 tailwindcss-patch2 来提取你的类名吧 使用 tailwindcss-patch2 来提取你的类名吧 安装使用方式 命令行 Cli 开始提取吧 Nodejs API 的方式来使用 配置 初始化 What’s next? tailwindcss-patch 是一个 tailwindcss 生态的扩展项目。也是 tailwindcss-mangle 项目重要…

高等数学教材重难点题型总结(二)导数与微分

本章重点题目较少&#xff0c;除了*标题页没什么特别难的&#xff0c;本帖出于总结性的角度考虑并未囊概全部的*标&#xff0c;最后会出一期*标题的全部内容整理&#xff0c;在攻克重难点的基础上更上一层楼。 1.根据定义求某点处的导数值 2.通过定义证明导数 3.左右导数的相关…

QT使用QML实现地图绘制虚线

QML提供了MapPolyline用于在地图上绘制线段&#xff0c;该线段是实线&#xff0c;因此我使用Canvas自定义绘制的方式在地图上绘制线段&#xff0c;如图&#xff1a; 鼠标在地图上点击后&#xff0c;在点击位置添加图标 &#xff0c;当有多个图标被添加到地图上后&#xff0c;计…

openGauss学习笔记-40 openGauss 高级数据管理-锁

文章目录 openGauss学习笔记-40 openGauss 高级数据管理-锁40.1 语法格式40.2 参数说明40.3 示例 openGauss学习笔记-40 openGauss 高级数据管理-锁 如果需要保持数据库数据的一致性&#xff0c;可以使用LOCK TABLE来阻止其他用户修改表。 例如&#xff0c;一个应用需要保证表…