(多线程)并发编程的三大基础应用——阻塞队列、定时器、线程池【手搓源码】

news2024/12/29 1:08:06

9.2 阻塞式队列

在这里插入图片描述
在这里插入图片描述在这里插入图片描述

BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<Integer>();
BlockingQueue<String> queue = new LinkedBlockingQueue<>();
// 入队列
queue.put("abc");
// 出队列. 如果没有 put 直接 take, 就会阻塞. 
String elem = queue.take();
public static void main(String[] args) throws InterruptedException {
    BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<Integer>();
    Thread customer = new Thread(() -> {
        while (true) {
            try {
                int value = blockingQueue.take();
                System.out.println("消费元素: " + value);
           } catch (InterruptedException e) {
                e.printStackTrace();
           }
       }
   }, "消费者");
    customer.start();
    Thread producer = new Thread(() -> {
        Random random = new Random();
        while (true) {
            try {
                int num = random.nextInt(1000);
                System.out.println("生产元素: " + num);
                blockingQueue.put(num);
                Thread.sleep(1000);
           } catch (InterruptedException e) {
                e.printStackTrace();
           }
       }
   }, "生产者");
    producer.start();
    customer.join();
    producer.join();
}
阻塞队列实现
// 名字还是不要和标准库的混淆
class MyBlockingQueue {
    private int[] items = new int[1000];
    private volatile int head = 0;
    private volatile int tail = 0;
    private volatile int size = 0;

    // 入队列
    public void put(int elem) throws InterruptedException {
        synchronized (this) {
            // 判定队列是否满了, 满了则不能插入.
            while (size >= items.length) {
                this.wait();
            }
            // 进行插入操作, 把 elem 放到 items 里, 放到 tail 指向的位置.
            items[tail] = elem;
            tail++;
            if (tail >= items.length) {
                tail = 0;
            }
            size++;
            this.notify();
        }
    }

    // 出队列, 返回删除的元素内容
    public Integer take() throws InterruptedException {
        synchronized (this) {
            // 判定队列是否空, 如果空了, 则不能出队列
            while (size == 0) {
                this.wait();
            }
            // 进行取元素操作.
            int ret = items[head];
            head++;
            if (head >= items.length) {
                head = 0;
            }
            size--;
            this.notify();
            return ret;
        }
    }
}

public class TestMyBlockingQueue {
    public static void main(String[] args) {
        MyBlockingQueue queue = new MyBlockingQueue();

        Thread producer = new Thread(() -> {
            int n = 1;
            while (true) {
                try {
                    queue.put(n);
                    System.out.println("生产元素 " + n);
                    n++;
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        Thread customer = new Thread(() -> {
            while (true) {
                try {
                    int n = queue.take();
                    System.out.println("消费元素 " + n);
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        producer.start();
        customer.start();
    }
}

9.3 定时器

定时器的使用样例
public class ThreadTimer {
    public static void main(String[] args) {
        // 标准库的定时器.
        Timer timer = new Timer();
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                System.out.println("时间到, 快起床!");
            }
        }, 3000);

        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                System.out.println("时间到2!");
            }
        }, 4000);

        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                System.out.println("时间到3!");
            }
        }, 5000);

        System.out.println("开始计时!");
    }
}
Timer timer = new Timer();
timer.schedule(new TimerTask() {
    @Override
    public void run() {
        System.out.println("hello");
   }
}, 3000);
实现定时器
定时器的构成:
  • 一个带优先级的阻塞队列

为啥要带优先级呢?
因为阻塞队列中的任务都有各自的执行时刻 (delay). 最先执行的任务一定是 delay 最小的. 使用带优先级的队列就可以高效的把这个 delay 最小的任务找出来.

  • 队列中的每个元素是一个 Task 对象.
  • Task 中带有一个时间属性, 队首元素就是即将
  • 同时有一个 worker 线程一直扫描队首元素, 看队首元素是否需要执行
定时器完整代码
package threading;

import java.util.ArrayDeque;
import java.util.PriorityQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;

// 这个类表示一个任务
class MyTask implements Comparable<MyTask> {
    // 要执行的任务
    private Runnable runnable;
    // 什么时间来执行任务. (是一个时间戳)
    private long time;

    public MyTask(Runnable runnable, long delay) {
        this.runnable = runnable;
        this.time = System.currentTimeMillis() + delay;
    }

    public Runnable getRunnable() {
        return runnable;
    }

    public long getTime() {
        return time;
    }

    @Override
    public int compareTo(MyTask o) {
        return (int) (this.time - o.time);
    }
}

class MyTimer {
    private BlockingQueue<MyTask> queue = new PriorityBlockingQueue<>();	

    private Object locker = new Object();

    public MyTimer() {
        // 创建一个扫描线程.
        Thread t = new Thread(() -> {
            while (true) {
                try {
                    synchronized (locker) {
                        // 取出队首元素
                        MyTask task = queue.take();
                        // 假设当前时间是 2:30, 任务设定的时间是 2:30, 显然就要执行任务了.
                        // 假设当前时间是 2:30, 任务设定的时间是 2:29, 也是到点了, 也要执行任务.
                        long curTime = System.currentTimeMillis();
                        if (curTime >= task.getTime()) {
                            // 到点了, 改执行任务了!!
                            task.getRunnable().run();
                        } else {
                            // 还没到点
                            queue.put(task);
                            // 没到点, 就等待
                            locker.wait(task.getTime() - curTime);
                        }
                    }	
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        t.start();
    }

    public void schedule(Runnable runnable, long after) throws InterruptedException {
        synchronized (locker) {
            MyTask myTask = new MyTask(runnable, after);
            queue.put(myTask);
            locker.notify();
        }
    }

}

public class ThreadMyTimer {
    public static void main(String[] args) throws InterruptedException {
        MyTimer timer = new MyTimer();
        timer.schedule(new Runnable() {
            @Override
            public void run() {
                System.out.println("时间到1!");
            }
        }, 3000);
        timer.schedule(new Runnable() {
            @Override
            public void run() {
                System.out.println("时间到2!");
            }
        }, 4000);
        timer.schedule(new Runnable() {
            @Override
            public void run() {
                System.out.println("时间到3!");
            }
        }, 5000);
        System.out.println("开始计时");

        ArrayDeque<String> a = new ArrayDeque<>();
        a.peekLast();
    }
}

9.4 线程池

线程池是什么
虽然创建线程 / 销毁线程 的开销

想象这么一个场景:
在学校附近新开了一家快递店,老板很精明,想到一个与众不同的办法来经营。店里没有雇人,
而是每次有业务来了,就现场找一名同学过来把快递送了,然后解雇同学。这个类比我们平时来
一个任务,起一个线程进行处理的模式。
很快老板发现问题来了,每次招聘 + 解雇同学的成本还是非常高的。老板还是很善于变通的,知道了为什么大家都要雇人了,所以指定了一个指标,公司业务人员会扩张到 3 个人,但还是随着业务逐步雇人。于是再有业务来了,老板就看,如果现在公司还没 3 个人,就雇一个人去送快递,否则只是把业务放到一个本本上,等着 3 个快递人员空闲的时候去处理。这个就是我们要带出的线程池的模式。

线程池最大的好处就是减少每次启动、销毁线程的损耗

标准库中的线程池

  • 使用 Executors.newFixedThreadPool(10) 能创建出固定包含 10 个线程的线程池.
  • 返回值类型为 ExecutorService
  • 通过 ExecutorService.submit 可以注册一个任务到线程池中
ExecutorService pool = Executors.newFixedThreadPool(10);
pool.submit(new Runnable() {
    @Override
    public void run() {
        System.out.println("hello");
   }
});

Executors 创建线程池的几种方式

  • newFixedThreadPool: 创建固定线程数的线程池
  • newCachedThreadPool: 创建线程数目动态增长的线程池.
  • newSingleThreadExecutor: 创建只包含单个线程的线程池.
  • newScheduledThreadPool: 设定 延迟时间后执行命令,或者定期执行命令. 是进阶版的 Timer.

Executors 本质上是 ThreadPoolExecutor 类的封装.
ThreadPoolExecutor 提供了更多的可选参数, 可以进一步细化线程池行为的设定. (后面再介绍)

实现线程池

  • 核心操作为 submit, 将任务加入线程池中
  • 使用 Worker 类描述一个工作线程. 使用 Runnable 描述一个任务.
  • 使用一个 BlockingQueue 组织所有的任务
  • 每个 worker 线程要做的事情: 不停的从 BlockingQueue 中取任务并执行.
  • 指定一下线程池中的最大线程数 maxWorkerCount; 当当前线程数超过这个最大值时, 就不再新增线程了.
线程池的实现
class Worker extends Thread {
    private LinkedBlockingQueue<Runnable> queue = null;
    public Worker(LinkedBlockingQueue<Runnable> queue) {
        super("worker");
        this.queue = queue;
   }
    @Override
    public void run() {
        // try 必须放在 while 外头, 或者 while 里头应该影响不大
        try {
            while (!Thread.interrupted()) {
                Runnable runnable = queue.take();
                runnable.run();
           }
       } catch (InterruptedException e) {
       }
   }
}

public class MyThreadPool {
    private int maxWorkerCount = 10;
    private LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue();
    public void submit(Runnable command) {
        if (workerList.size() < maxWorkerCount) {
            // 当前 worker 数不足, 就继续创建 worker
            Worker worker = new Worker(queue);
            worker.start();
       }
        // 将任务添加到任务队列中
        queue.put(command);
   }
    public static void main(String[] args) throws InterruptedException {
        MyThreadPool myThreadPool = new MyThreadPool();
        myThreadPool.execute(new Runnable() {
            @Override
            public void run() {
                System.out.println("吃饭");
           }
       });
        Thread.sleep(1000);
   }
}
package threading;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class MyThreadPool {
    private BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();

    public void submit(Runnable runnable) throws InterruptedException {
        queue.put(runnable);
    }

    public MyThreadPool(int m) {
        // 在构造方法中, 创建出 M 个线程. 负责完成工作.
        for (int i = 0; i < m; i++) {
            Thread t = new Thread(() -> {
                while (true) {
                    try {
                        Runnable runnable = queue.take();
                        runnable.run();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
            t.start();
        }
    }
}

public class Demo28 {
    public static void main(String[] args) throws InterruptedException {
        MyThreadPool pool = new MyThreadPool(10);
        for (int i = 0; i < 1000; i++) {
            int taskId = i;
            pool.submit(new Runnable() {
                @Override
                public void run() {
                    System.out.println("执行当前任务: " + taskId + " 当前线程: " + Thread.currentThread().getName());
                }
            });
        }
    }
}
线程池的使用
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Demo26 {
    public static void main(String[] args) {
        ExecutorService pool = Executors.newCachedThreadPool();
        pool.submit(new Runnable() {
            @Override
            public void run() {
                System.out.println("这是任务");
            }
        });
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1133085.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

IDEA 删除一次性删除所有断点

Ctrl Shift F8 &#xff08;打开“断点”对话框&#xff09; Ctrl A &#xff08;选择所有断点&#xff09; Alt Delete &#xff08;删除选定的断点&#xff09; Enter &#xff08;确认&#xff09;

数字孪生技术:工业数字化转型的引擎

数字孪生是一种将物理实体数字化为虚拟模型的技术&#xff0c;这些虚拟模型与其物理对应物相互关联。这种虚拟模型通常是在数字平台上创建的&#xff0c;它们复制了实际设备、工厂、甚至整个供应链的运作方式。这使工业企业能够实现以下益处&#xff1a; 1. 实时监测和分析 数…

(Java)中的数据类型和变量

文章目录 一、字面常量二、数据类型三、变量1.变量的概念2.语法的格式3.整型变量4.长整型变量5.短整型变量6.字节型变量 四、浮点型变量1.双精度浮点数2.单精度浮点数 五、字符型常量六、布尔型变量七、类型转换1.自动类型转换&#xff08;隐式&#xff09;2.强制类型转换(显式…

【数据结构】数组和字符串(四):特殊矩阵的压缩存储:稀疏矩阵——三元组表

文章目录 4.2.1 矩阵的数组表示4.2.2 特殊矩阵的压缩存储a. 对角矩阵的压缩存储b~c. 三角、对称矩阵的压缩存储d. 稀疏矩阵的压缩存储——三元组表结构体初始化元素设置打印矩阵主函数输出结果代码整合 4.2.1 矩阵的数组表示 【数据结构】数组和字符串&#xff08;一&#xff…

一篇教你学会Ansible

前言 Ansible首次发布于2012年&#xff0c;是一款基于Python开发的自动化运维工具&#xff0c;核心是通过ssh将命令发送执行&#xff0c;它可以帮助管理员在多服务器上进行配置管理和部署。它的工作形式依托模块实现&#xff0c;自己没有批量部署的能力。真正具备批量部署的是…

生产管理中,如何做好生产进度控制?

在生产管理中&#xff0c;我们常常会遇到以下问题&#xff1a; 由于计划不清或者无计划&#xff0c;导致物料进度无法保障&#xff0c;经常出现停工待料的情况。 停工待料导致了生产时间不足&#xff0c;为了赶交货期&#xff0c;只能加班加点。 生产计划并未发挥实际作用&am…

14、Python -- 列表推导式(for表达式)与控制循环

目录 for表达式&#xff08;列表推导式&#xff09;列表推导式的说明使用break跳出循环使用continue忽略本次循环使用return结束函数 列表推导式 使用break跳出循环 使用continue忽略本次循环 for表达式&#xff08;列表推导式&#xff09; for表达式用于利用其他区间、元组、…

哪些车企是前向雷达大客户?国产突围/4D升级进展如何

可穿透尘雾、雨雪、不受恶劣天气影响&#xff0c;唯一能够“全天候全天时”工作&#xff0c;同时在中远距离的物体识别能力&#xff0c;毫米波雷达成为二十几年前豪华车ACC功能的必备传感器。 此后&#xff0c;随着视觉感知技术的不断成熟&#xff0c;尤其是Mobileye、特斯拉等…

强化学习代码实战(3) --- 寻找真我

前言 本文内容来自于南京大学郭宪老师在博文视点学院录制的视频&#xff0c;课程仅9元地址&#xff0c;配套书籍为深入浅出强化学习 编程实战 郭宪地址。 正文 我们发现多臂赌博机执行一个动作之后&#xff0c;无论是选择摇臂1&#xff0c;摇臂2&#xff0c;还是摇臂3之后都会返…

MySQL Join 类型

文章目录 1 Join 类型有哪些2 Inner Join3 Left Join4 Right Join5 Full Join 1 Join 类型有哪些 SQL Join 类型的区别 Inner Join: 左,右表都有的数据Left Join: 左表返回所有的行, 右表没有的补充为 NULLRight Loin: 右表返回所有的行, 左表没有的补充为 NULLFull Outer J…

【会员管理系统】篇二之项目搭建、初始化、安装第三方库

一、项目搭建 1.全局安装vue-cli npm install -g vue/cli查看版本信息 vue -V 2.创建项目 vue create 项目名称 回车 回车 剩余选择如下 之后等待项目创建 最后npm run serve 二、初始化配置 1.更改标题 打开public下的index&#xff0c;将title标签里的改成想要设置的…

【模式识别】贝叶斯决策模型理论总结

贝叶斯决策模型理论 一、引言二、贝叶斯定理三、先验概率和后验概率3.1 先验概率3.2 后验概率 四、最大后验准则五、最小错误率六、最小化风险七、最小最大决策八、贝叶斯决策建模参考 一、引言 在概率计算中&#xff0c;我们常常遇到这样的一类问题&#xff0c;某事件的发生可…

【Redis】redis 十大数据类型 概述

个人简介&#xff1a;Java领域新星创作者&#xff1b;阿里云技术博主、星级博主、专家博主&#xff1b;正在Java学习的路上摸爬滚打&#xff0c;记录学习的过程~ 个人主页&#xff1a;.29.的博客 学习社区&#xff1a;进去逛一逛~ redis十大数据类型 一、redis字符串&#xff0…

【Elasticsearch】es脚本编程使用详解

目录 一、es脚本语言介绍 1.1 什么是es脚本 1.2 es脚本支持的语言 1.3 es脚本语言特点 1.4 es脚本使用场景 二、环境准备 2.1 docker搭建es过程 2.1.1 拉取es镜像 2.1.2 启动容器 2.1.3 配置es参数 2.1.4 重启es容器并访问 2.2 docker搭建kibana过程 2.2.1 拉取ki…

crossover23.6闪亮登场发布啦,2023最新功能解析

CrossOver刚刚更新了23.6版本&#xff0c;新增了多款游戏的支持&#xff0c;快来看看你想玩的游戏在不在里面吧。点击这里立即下载最新版CrossOver。 软件介绍 CrossOver 23.6 让Mac可以运行Windows程序的工具 已通过小编安装运行测试 100%可以使用。 CrossOver for Mac 23.…

手把手教你玩转单目摄像头(OpenCv+Python)

目录 ​编辑 一&#xff0c;单目应用前景 二&#xff0c;打开摄像头 三&#xff0c;设置分辨率 四&#xff0c;摄像头拍照 五&#xff0c;录制视频 六&#xff0c;单目结合OpenCV的实际应用 一&#xff0c;单目应用前景 单目视觉&#xff08;monocular vision&#xff0…

C++入门03——程序流程结构

C/C支持最基本的三种程序运行结构&#xff1a;顺序结构、选择结构、循环结构 顺序结构&#xff1a;程序按顺序执行&#xff0c;不发生跳转 选择结构&#xff1a;依据条件是否满足&#xff0c;有选择的执行相应功能 循环结构&#xff1a;依据条件是否满足&#xff0c;循环多次…

Leetcode刷题详解——寻找旋转排序数组中的最小值

1. 题目链接&#xff1a;153. 寻找旋转排序数组中的最小值 2. 题目描述&#xff1a; 已知一个长度为 n 的数组&#xff0c;预先按照升序排列&#xff0c;经由 1 到 n 次 旋转 后&#xff0c;得到输入数组。例如&#xff0c;原数组 nums [0,1,2,4,5,6,7] 在变化后可能得到&…

通俗介绍:什么是 Redis ?

刚接触 Redis 的伙伴们可能会因为不熟悉而感到困惑。本文简述 Redis 是什么、有哪些作用的问题&#xff0c;是一篇短浅而入门级别的文章。 Redis官网&#xff1a;Redis 打开 Redis 官网可以看到&#xff0c;官方对 Redis 的介绍是这样的&#xff1a;The open source, in-memo…