第十六章Java多线程常见模式

news2025/1/13 17:05:23

文章目录

  • 同步模式之保护性暂停
    • 带超时版 GuardedObject
    • join 原理
    • 多任务版 GuardedObject
  • 异步模式之生产者/消费者模式
    • 标准库中的阻塞队列
    • 阻塞队列的实现
      • 加锁实现
    • 生产者消费者模型的作用是什么

同步模式之保护性暂停

定义

  • 即 Guarded Suspension,用在一个线程等待另一个线程的执行结果

要点

  • 有一个结果需要从一个线程传递到另一个线程,让他们关联同一个 GuardedObject
  • 如果有结果不断从一个线程到另一个线程那么可以使用消息队列(见生产者/消费者)
  • JDK 中,join 的实现、Future 的实现,采用的就是此模式
  • 因为要等待另一方的结果,因此归类到同步模式

image-20230625105247377

带超时版 GuardedObject

public class GuardedObject {
    private Object response;
    private final Object lock = new Object();
    public Object get(long time){
        //1记录最初时间
        long startTime = System.currentTimeMillis();
        //2记录还需要等待的时间
        long passedTime = 0;
        synchronized (lock){
            while (response==null){
                //4计算当前还需要等待多久时间
                long waitTime=time-passedTime;
                if (waitTime <= 0) {
                    System.out.println("break");
                    break;
                }
                try {
                    lock.wait(waitTime);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                //如果提前被唤醒(以为存在被虚假唤醒的情况)
               passedTime = System.currentTimeMillis() - startTime;
                System.out.println("已经过去了"+passedTime);
            }
            return response;
        }
    }
    public void complete(Object response){
        synchronized (lock){
            this.response=response;
            System.out.println("notify");
            lock.notifyAll();
        }
    }
}

测试

public class Demo1 {
    public static void main(String[] args) {
        GuardedObject v1 = new GuardedObject();
        new Thread(()->{
            try {
                TimeUnit.SECONDS.sleep(1);
                v1.complete(null);

            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"t1").start();
        Object o = v1.get(2500);
        if(o!=null){
            System.out.println(o);
        }else {
            System.out.println("不能获取response");
        }
    }
}
notify
已经过去了1002
已经过去了2504
break
不能获取response

join 原理

public final synchronized void join(long millis) throws InterruptedException {
        long base = System.currentTimeMillis();
        long now = 0;

        if (millis < 0) {
            throw new IllegalArgumentException("timeout value is negative");
        }

        if (millis == 0) {
            while (isAlive()) {
                wait(0);
            }
        } else {
            while (isAlive()) {
                long delay = millis - now;
                if (delay <= 0) {
                    break;
                }
                wait(delay);
                now = System.currentTimeMillis() - base;
            }
        }
    }
  • join(0)表示一直等待,因为wait(0)表示一直等待
  • 这里的等待的结果其实就是对应线程是否存活,因为我们调用这个方法是采用对象.join()
  • delay就是对应还需要等待多久时间 now就是已经过去多久时间

多任务版 GuardedObject

image-20230625115446903

图中 Futures 就好比居民楼一层的信箱(每个信箱有房间编号),左侧的 t0,t2,t4 就好比等待邮件的居民,右侧的 t1,t3,t5 就好比邮递员

  • 如果需要在多个类之间使用 GuardedObject 对象,作为参数传递不是很方便,因此设计一个用来解耦的中间类,这样不仅能够解耦【结果等待者】和【结果生产者】,还能够同时支持多个任务的管理
public class GuardedObjects {
    // 标识 Guarded Object
    private int id;

    public GuardedObjects(int id) {
        this.id = id;
    }
    public int getId() {
        return id;
    }
    //结果
    private Object response;
    //获取结果
    //timeOut表示要等待多久
    public Object get(long timeout){
        synchronized (this){
            //1开始时间
            long startTime = System.currentTimeMillis();
            long passedTime= 0;
            while (response==null){
                long waitTime = timeout - passedTime;
                if(waitTime<=0){
                    break;
                }
                try {
                    this.wait(waitTime);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                passedTime = System.currentTimeMillis() - startTime;
            }
            return response;
        }
    }
    public void complete(Object object){
        synchronized (this){
            this.response=object;
            this.notifyAll();
        }
    }

}

邮箱结构

public class Mailboxes {
    private static Map<Integer,GuardedObjects> boxes = new Hashtable<>();
    private static int id = 1;
    //产生唯一id
    private static synchronized int generateId(){
        return id++;
    }
    public static GuardedObjects getGuardedObject(int id) {
        return boxes.remove(id);
    }
    public static GuardedObjects createGuardedObject() {
        GuardedObjects go = new GuardedObjects(generateId());
        boxes.put(go.getId(), go);
        return go;
    }
    public static Set<Integer> getIds() {
        return boxes.keySet();
    }
}

测试

class People extends Thread{
    @Override
    public void run() {
        // 收信
        GuardedObjects guardedObject = Mailboxes.createGuardedObject();
        System.out.println("开始收信 id:"+guardedObject.getId());
        Object mail = guardedObject.get(5000);
        System.out.println("收到信 id:"+guardedObject.getId() + "内容为"+ mail);
    }
}
class Postman extends Thread {
    private int id;
    private String mail;
    public Postman(int id, String mail) {
        this.id = id;
        this.mail = mail;
    }
    @Override
    public void run() {
        GuardedObjects guardedObject = Mailboxes.getGuardedObject(id);
        System.out.println("送信 id为"+id+"内容为"+mail);
        guardedObject.complete(mail);
    }
}
public class Demo2 {
    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 3; i++) {
            new People().start();
        }
       TimeUnit.SECONDS.sleep(1);
        for (Integer id : Mailboxes.getIds()) {
            new Postman(id,"你好"+id).start();
        }
    }
}

异步模式之生产者/消费者模式

生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题
生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等 待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取.

  • 阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力.

    • 比如在 " 秒杀 " 场景下 , 服务器同一时刻可能会收到大量的支付请求 . 如果直接处理这些支付请求 , 服务器可能扛不住( 每个支付请求的处理都需要比较复杂的流程 ). 这个时候就可以把这些请求都放 到一个阻塞队列中, 然后再由消费者线程慢慢的来处理每个支付请求 .这样做可以有效进行 " 削峰 ", 防止服务器被突然到来的一波请求直接冲垮 .
  • 阻塞队列也能使生产者和消费者之间 解耦.

    • 比如过年一家人一起包饺子 . 一般都是有明确分工 , 比如一个人负责擀饺子皮 , 其他人负责包 . 擀饺子皮的人就是 " 生产者 ", 包饺子的人就是 " 消费者 ". 擀饺子皮的人不关心包饺子的人是谁( 能包就行 , 无论是手工包 , 借助工具 , 还是机器包 ), 包饺子的人 也不关心擀饺子皮的人是谁( 有饺子皮就行 , 无论是用擀面杖擀的 , 还是拿罐头瓶擀 , 还是直接从超 市买的).

    阻塞队列的大小

标准库中的阻塞队列

在 Java 标准库中内置了阻塞队列 . 如果我们需要在一些程序中使用阻塞队列 , 直接使用标准库中的即可 .
BlockingQueue 是一个接口 .真正实现的类是 LinkedBlockingQueue和ArrayBlockingQueue等

  • put 方法用于阻塞式的入队列, take 用于阻塞式的出队列.
  • BlockingQueue 也有 offer, poll, peek 等方法, 但是这些方法不带有阻塞特性.
  • 必须先生产 ,再消费,不然会阻塞

通过构造方法来确定阻塞队列的大小
若没有声明数字,就表示无界

阻塞队列的实现

  • 通过 “循环队列” 的方式来实现.
  • 使用 synchronized 进行加锁控制.
    • put 插入元素的时候, 判定如果队列满了, 就进行 wait. (注意, 要在循环中进行 wait. 被唤醒时不一 定队列就不满了, 因为同时可能是唤醒了多个线程).
    • take 取出元素的时候, 判定如果队列为空, 就进行 wait. (也是循环 wait)

加锁实现

要点

  • 与前面的保护性暂停中的 GuardObject 不同,不需要产生结果和消费结果的线程一一对应
  • 消费队列可以用来平衡生产和消费的线程资源
  • 生产者仅负责产生结果数据,不关心数据该如何处理,而消费者专心处理结果数据
  • 消息队列是有容量限制的,满时不会再加入数据,空时不会再消耗数据
  • JDK 中各种阻塞队列,采用的就是这种模式

image-20230625160404007

简单实现线程间阻塞队列

class Message {
    private int id;
    private Object message;

    public Message(int id, Object message) {
        this.id = id;
        this.message = message;
    }

    public int getId() {
        return id;
    }

    public Object getMessage() {
        return message;
    }
}
class BlockingQueue {
    private LinkedList<Message> queue;
    private int capacity;
    public BlockingQueue(int capacity){
        this.capacity=capacity;
        queue = new LinkedList<>();
    }
    public Message take(){
        synchronized (queue){
            while (queue.isEmpty()){
                System.out.println("没货了,wait");
                try {
                    queue.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            Message message = queue.removeFirst();
            System.out.println("输出一个资源");
            queue.notifyAll();
            return message;
        }
    }
    public void put(Message message){
        synchronized (queue){
            while (queue.size()==capacity){
                System.out.println("队列满了,wait");
                try {
                    TimeUnit.SECONDS.sleep(1);
                    queue.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            queue.push(message);
            System.out.println("进入一个资源");
            queue.notifyAll();
        }
    }
}
  • put 插入元素的时候, 判定如果队列满了, 就进行 wait. (注意, 要在循环中进行 wait. 被唤醒时不一定队列就不满了, 因为同时可能是唤醒了多个线程).
  • take 取出元素的时候, 判定如果队列为空, 就进行 wait. (也是循环 wait)

测试

public class BlockingQueueDemo{
    public static void main(String[] args) {
        BlockingQueue blockingQueue = new BlockingQueue(2);
        // 4 个生产者线程
        for (int i = 0; i < 4; i++) {
            int id = i;
            new Thread(()->{
                blockingQueue.put(new Message(id,"资源"+id));
            },"t1").start();
        }
        // 1 个消费者线程, 处理结果
        new Thread(() -> {
            while (true) {
                Message message = blockingQueue.take();
                System.out.println("id为"+message.getId()+" 信息为"+message.getMessage());
            }
        }, "消费者").start();
    }
}
没货了,wait
进入一个资源
输出一个资源
进入一个资源
进入一个资源
id为2 信息为资源2
队列满了,wait
输出一个资源
id为0 信息为资源0
输出一个资源
id为3 信息为资源3
进入一个资源
输出一个资源
id为1 信息为资源1
没货了,wait

生产者消费者模型的作用是什么

这个问题很理论,但是很重要:

  • 1 通过平衡生产者的生产能力和消费者的消费能力来提升整个系统的运行效率,这是生产者消费者模型最重要的作用
  • 2解耦,这是生产者消费者模型附带的作用,解耦意味着生产者和消费者之间的联系少,联系越少越可以独自发展而不需要收到相互的制约

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/712179.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

LeetCode_链表_中等_445.两数相加 II

目录 1.题目2.思路3.代码实现&#xff08;Java&#xff09; 1.题目 给你两个非空链表来代表两个非负整数。数字最高位位于链表开始位置。它们的每个节点只存储一位数字。将这两数相加会返回一个新的链表。 你可以假设除了数字 0 之外&#xff0c;这两个数字都不会以零开头。 …

C#异常总结

C#异常总结 定义Try语句异常类创建用户自定义异常搜索调用栈的示例异常抛出 定义 程序中的运行时错误&#xff0c;它违反一个系统约束或应用程序约束&#xff0c;或出现了在正常操作时未预料的情形。 Try语句 指明被异常保护的代码块&#xff0c;并提供代码以处理异常。try由…

ICC2:fixed和locked有什么不同?

如题&#xff0c;physical status中locked与fixed有很多小伙伴会搞混&#xff0c;从实用性的角度来讲这两个并没有什么区别&#xff0c;一是工具都不会更改object这两种属性&#xff0c;二是工具都不会在优化过程中移除这两个属性的object。 所以&#xff0c;唯一的区别在于lo…

【JUC-4】线程池实战应用

线程池 线程池创建方式 Executors创建线程池(不推荐) JDK提供的工具类Execurtors创建线程池(不推荐), 列举几个Executors中创建线程池的方法; 查看Executors的源代码发现, 它创建线程池也是通过 new ThreadPoolExecutor() 来创建线程池的. 当然其中有一些特殊的线程池也不是…

Moka AI产品后观察:HR SaaS迈进AGI时代

在AI这条路上&#xff0c;Moka已经走了很远。如今的Moka Eva是在此前AI模型基础上的更进一步。未来AGI时代&#xff0c;HR SaaS会有更多可能性。 出品|产业家 在AI潮水里&#xff0c;Moka正在加速快跑。 在6月28日的2023夏季新品发布会上&#xff0c;国内首个AI原生HR Saa…

流量卡收货地就是归属地,这是什么意思呢?

我们在网上申请流量卡时&#xff0c;会比较关注流量卡归属地这一问题&#xff0c;据小编了解&#xff0c;目前网上的流量卡归属地有两种模式&#xff0c;接下来&#xff0c;小编一一为大家介绍一下&#xff0c;大家可以根据自己的情况来选择。 ​ 在中国的手机号码都有固定的区…

【MySQL】备份数据(导出数据 / 导入数据)

&#x1f3af;导出数据 1、使用 SELECT ... INTO OUTFILE 语句导出数据 SELECT...INTO OUTFILE 是 MySQL 用于导出数据的语句&#xff0c;它允许将查询结果保存到指定的文件中。 该语句的基本语法如下&#xff1a; SELECT column1, column2, ... INTO OUTFILE file_path …

Kotlin~Template模版方法模式

概念 定义算法骨架、代码模版 角色介绍 Abstract ClassConcrete Class UML 代码实现 abstract class Game {protected abstract fun initialize()protected abstract fun startPlay()protected abstract fun endPlay()// 模版fun play(){initialize()startPlay()endPlay()…

Spring面试题--Spring中事务失效的场景有哪些

Spring中事务失效的场景有哪些&#xff1f; 异常捕获处理 Transactional public void update(Integer from, Integer to, Double money) {try {//转账的用户不能为空Account fromAccount accountDao.selectById(from);//判断用户的钱是否够转账if (fromAccount.getMoney() - …

idea运行项目时右下角一直提示Lombok requires enabled annotation processing

出现这个错误是因为使用了Lombok插件的原因&#xff0c;可能是安装时候没有配置好 Lombok requires enabled annotation processing&#xff1a;翻译过来就是Lombok 需要启用注释处理 解决方案 File -> Settings ->Build,Execution,Deployment -> Compiler ->An…

35岁程序员现状,太真实!

“未来每年&#xff0c;我们将会为社会输送1000名工作10年以上的人才。” 这是之前马云在演讲中提到的关于阿里巴巴这样的大厂老员工的问题。总的来讲就是——“毕业”。 也经常能够看到在各个平台有程序员讲到自己35岁的焦虑。 之前&#xff0c;在某平台上就有一个有意思的…

Redis主从复制模式3

谋权篡位 假设在一个Redis集群中&#xff0c;有一台主机和两台从机构成一个Redis集群。此时因外部原因&#xff0c;导致主机宕机&#xff0c;俗话说 “国不可⼀一日无君&#xff0c;军不可一日无帅”&#xff0c;那么需要从剩余的两台从机中再次选出一台主机&#xff0c;从而来…

【小沐学Unity3d】Unity插件之绳索模拟Obi Rope

文章目录 1、简介2、安装3、示例测试3.1 Chains3.2 Crane3.3 ElectricalWires3.4 FreightLift3.5 Rocker3.6 RopeAndJoints3.7 RopeShowcase 4、简单测试结语 1、简介 https://assetstore.unity.com/packages/tools/physics/obi-rope-55579 Obi 是一款基于粒子的高级物理引擎…

本地部署 Chatbot UI,一个开源的 ChatGPT UI

openchat-ui 0. 什么是 Chatbot UI1. Github 地址2. 本地部署3. (参考)配置文件说明 0. 什么是 Chatbot UI Chatbot UI 是一个用于 AI 模型的开源聊天 UI。适用于 OpenChat 模型。 画面效果展示如下&#xff0c; 1. Github 地址 https://github.com/imoneoi/openchat-ui 2.…

Docker安装RabbitMQ docker安装RabbitMQ完整详细教程

Docker安装RabbitMQ docker安装RabbitMQ完整详细教程 Docker 上安装 RabbitMQ 3.12 的步骤&#xff1a;选择要安装的RabbitMQ 版本1、拉取 RabbitMQ 镜像2、创建并运行容器3、RabbitMQ 常用端口以及作用4、访问 管理页面测试&#xff0c;是否启动成功关闭容器启动容器 5、Docke…

OpenResty cosocket

cosocket 是各种 lua-resty-* 非阻塞库的基础 cosocket coroutine socket 需要 Lua 协程特性的支持&#xff0c;也需要 Nginx 事件机制的支持&#xff0c;两者结合在一起实现非阻塞网络 I/O。 遇到网络 I/O 时会交出控制权&#xff0c;把网络事件注册到 Nginx 监听列表中&a…

OpenSSH 用户枚举漏洞(CVE-2018-15473) 漏洞修复

OpenSSH 用户枚举漏洞&#xff08;CVE-2018-15473&#xff09;漏洞修复 1 漏洞说明2 漏洞修复3 相关问题 1 漏洞说明 2 漏洞修复 查看当前openssh版本&#xff1a; [rootizr0a05u4qferpr7yfhtotz ~]# ssh -V OpenSSH_7.4p1, OpenSSL 1.0.2k-fips 26 Jan 2017 [rootizr0a05u4…

ChatGLM-6B一键安装,马上使用(windows)!!

产品特点 双语&#xff1a; 同时支持中文和英文。 高精度&#xff08;英文&#xff09;&#xff1a; 在公开的英文自然语言榜单 LAMBADA、MMLU 和 Big-bench-lite 上优于 GPT-3 175B&#xff08;API: davinci&#xff0c;基座模型&#xff09;、OPT-175B 和 BLOOM-176B。 高…

MiniKube安装教程,简易版k8s,带你用最简单的方法体验k8s(学习环境通用)

Minikube安装教程 minikube 是本地 Kubernetes&#xff0c;专注于让 Kubernetes 易于学习和开发&#xff0c;这能很方便的在本地进行k8s学习&#xff0c;减轻初学者对于k8s的安装困难。&#xff08;信我&#xff0c;新手入门k8s就用这个够用了&#xff01;先别去折腾生产环境的…

[pyqt5]右键窗口弹出菜单并触发菜单点击事件

from PyQt5.Qt import * import sysclass MyWindow(QWidget):# 自定义窗体def contextMenuEvent(self, evt: QContextMenuEvent) -> None:menu QMenu(self)new_action QAction(打开, menu)new_action.triggered.connect(self.open)close_action QAction(关闭, menu)close…