Java线程池

news2024/12/28 2:10:59

自定义线程池

1. 简介

1.1 引入原因

1. 一个任务过来,一个线程去做。如果每次过来都创建新线程,性能低且比较耗费内存
2. 线程数多于cpu核心,线程切换,要保存原来线程的状态,运行现在的线程,势必会更加耗费资源
   线程数少于cpu核心,不能很好的利用多线程的性能
   
3. 充分利用已有线程,去处理原来的任务

1.2. 线程池组件

1. 消费者(线程池):                 保存一定数量线程来处理任务
2. 生产者:                        客户端源源不断产生的新任务
3. 阻塞队列(blocking queue):      平衡消费者和生产者之间,用来保存任务 的一个等待队列

- 生产任务速度较快,多余的任务要等
- 生产任务速度慢,那么线程池中存活的线程等

image-20221012105847884

2. 自定义线程池

2.1 不带超时

阻塞队列

package com.erick.multithread.d6;

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class BlockingQueue<T> {

    /*阻塞队列的上限*/
    private int capacity;
    /*保存具体任务: 也就是Runnable的池子*/
    private Deque<T> blockingQueue = new ArrayDeque<>();

    /*锁:从池子中拿或取时需要*/
    private ReentrantLock lock = new ReentrantLock(true);

    /*池子满时,生产者线程等待*/
    private Condition producerRoom = lock.newCondition();

    /*池子空时,消费者线程等待*/
    private Condition consumerRoom = lock.newCondition();

    public BlockingQueue(int capacity) {
        this.capacity = capacity;
    }

    public T getTask() {
        try {
            lock.lock();

            while (blockingQueue.isEmpty()) {
                System.out.println("阻塞队列为空,消费者等待");
                try {
                    consumerRoom.await();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }

            T task = blockingQueue.removeLast();
            producerRoom.signal();
            return task;
        } finally {
            lock.unlock();
        }
    }

    /*生产任务:一直等待*/
    public void addTask(T t) {
        try {
            lock.lock();

            while (blockingQueue.size() == capacity) {
                System.out.println("阻塞队列已满,生产者等待");
                try {
                    producerRoom.await();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
            blockingQueue.addFirst(t);
            consumerRoom.signal();
        } finally {
            lock.unlock();
        }
    }

    /*获取队列大小*/
    public int getSize() {
        try {
            lock.lock();
            return blockingQueue.size();
        } finally {
            lock.unlock();
        }
    }
}

线程池

package com.erick.multithread.d6;

import java.util.HashSet;
import java.util.Set;

/*自定义线程池*/
public class ErickThreadPool<T> {
    /*阻塞队列*/
    private BlockingQueue<T> blockingQueue;

    /*装工作线程的池子*/
    private final Set<Worker> pool = new HashSet<>();
    /*核心线程数*/
    private int coreThreadSize;

    public ErickThreadPool(int blockQueueCapacity, int coreThreadSize) {
        blockingQueue = new BlockingQueue<>(blockQueueCapacity);
        this.coreThreadSize = coreThreadSize;
    }

    /**
     * 任务具体执行流程: 外界接口的任务(Thread) 来了
     * 1. 当前池子没满,则新建一个线程并加入到池子中
     * 2. 如果池子已经满了,当前任务进入到阻塞队列中等待
     */
    public synchronized void executeTask(Runnable task) {
        if (pool.size() < coreThreadSize) {
            Worker worker = new Worker(task);
            pool.add(worker);
            System.out.println("创建新的线程来执行任务");
            worker.start();
        } else {
            System.out.println("线程池已满,生产者暂时等待");
            blockingQueue.addTask((T) task);
        }
    }

    /*线程池中具体干活的线程*/
    class Worker extends Thread {
        private Runnable task;

        public Worker(Runnable task) {
            this.task = task;
        }

        /*阻塞获取,一直等待*/
        @Override
        public void run() {
          // 获取任务的时候,是一直等待,死等,因此线程一直不会结束
            while (task != null || (task = (Runnable) blockingQueue.getTask()) != null) {
                try {
                    task.run();
                } catch (Exception e) {
                    System.out.println("线程执行任务出错");
                } finally {
                    task = null;
                }
            }
            /*任务执行完毕后,将该线程从池子中移除*/
            synchronized (pool) {
                System.out.println("线程销毁");
                pool.remove(this);
            }
        }
    }
}

测试代码

package com.erick.multithread.d6;

import java.util.Date;

public class Test {
    public static void main(String[] args) {
        ErickThreadPool pool = new ErickThreadPool(10, 3);

        for (int i = 0; i < 5; i++) {
            pool.executeTask(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName() + ":" + new Date());
                }
            });
        }
    }
}

2.2 超时等待

  • 上面线程池中的worker线程获取blockingqueue的时候,即使阻塞队列中没有任务,也会一直死等,并不会结束

阻塞队列

package com.erick.multithread.d6;

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class BlockingQueue<T> {

    /*阻塞队列的上限*/
    private int capacity;
    /*保存具体任务: 也就是Runnable的池子*/
    private Deque<T> blockingQueue = new ArrayDeque<>();

    /*锁:从池子中拿或取时需要*/
    private ReentrantLock lock = new ReentrantLock(true);

    /*池子满时,生产者线程等待*/
    private Condition producerRoom = lock.newCondition();

    /*池子空时,消费者线程等待*/
    private Condition consumerRoom = lock.newCondition();

    public BlockingQueue(int capacity) {
        this.capacity = capacity;
    }

    /*获取任务:一直等待*/
    public T getTask() {
        try {
            lock.lock();

            while (blockingQueue.isEmpty()) {
                System.out.println("阻塞队列为空,消费者等待");
                try {
                    consumerRoom.await();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }

            T task = blockingQueue.removeLast();
            producerRoom.signal();
            return task;
        } finally {
            lock.unlock();
        }
    }

    /*获取任务: 超时不侯*/
    public T getTask(long timeout, TimeUnit timeUnit) {
        try {
            lock.lock();
            long nanos = timeUnit.toNanos(timeout);

            while (blockingQueue.isEmpty()) {
                if (nanos < 0) {
                    return null;
                }

                try {
                    nanos = consumerRoom.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
            T task = blockingQueue.removeLast();
            producerRoom.signal();
            return task;
        } finally {
            lock.unlock();
        }
    }


    /*生产任务:一直等待*/
    public void addTask(T t) {
        try {
            lock.lock();

            while (blockingQueue.size() == capacity) {
                System.out.println("阻塞队列已满,生产者等待");
                try {
                    producerRoom.await();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
            blockingQueue.addFirst(t);
            consumerRoom.signal();
        } finally {
            lock.unlock();
        }
    }

    /*获取队列大小*/
    public int getSize() {
        try {
            lock.lock();
            return blockingQueue.size();
        } finally {
            lock.unlock();
        }
    }
}

线程池

package com.erick.multithread.d6;

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;

/*自定义线程池*/
public class ErickThreadPool<T> {
    /*阻塞队列*/
    private BlockingQueue<T> blockingQueue;

    /*装工作线程的池子*/
    private final Set<Worker> pool = new HashSet<>();
    /*核心线程数*/
    private int coreThreadSize;

    /*线程池子中的线程,等待获取的任务的时候,如果超时,则线程kill掉*/
    private long timeout;
    private TimeUnit timeUnit;

    public ErickThreadPool(int blockQueueCapacity, int coreThreadSize) {
        blockingQueue = new BlockingQueue<>(blockQueueCapacity);
        this.coreThreadSize = coreThreadSize;
    }

    public ErickThreadPool(int blockQueueCapacity, int coreThreadSize, long timeout, TimeUnit timeUnit) {
        this(blockQueueCapacity, coreThreadSize);
        this.timeUnit = timeUnit;
        this.timeout = timeout;
    }

    /**
     * 任务具体执行流程: 外界接口的任务(Thread) 来了
     * 1. 当前池子没满,则新建一个线程并加入到池子中
     * 2. 如果池子已经满了,当前任务进入到阻塞队列中等待
     */
    public synchronized void executeTask(Runnable task) {
        if (pool.size() < coreThreadSize) {
            Worker worker = new Worker(task);
            pool.add(worker);
            System.out.println("创建新的线程来执行任务");
            worker.start();
        } else {
            System.out.println("线程池已满,生产者暂时等待");
            blockingQueue.addTask((T) task);
        }
    }

    /*线程池中具体干活的线程*/
    class Worker extends Thread {
        private Runnable task;

        public Worker(Runnable task) {
            this.task = task;
        }

        /*阻塞获取,一直等待*/
        @Override
        public void run() {
            while (task != null || (task = (Runnable) blockingQueue.getTask(timeout, timeUnit)) != null) {
                try {
                    task.run();
                } catch (Exception e) {
                    System.out.println("线程执行任务出错");
                } finally {
                    task = null;
                }
            }
            /*任务执行完毕后,将该线程从池子中移除*/
            synchronized (pool) {
                System.out.println("线程销毁");
                pool.remove(this);
            }
        }
    }
}

测试代码

package com.erick.multithread.d6;

import java.util.Date;
import java.util.concurrent.TimeUnit;

public class Test {
    public static void main(String[] args) {
        ErickThreadPool pool = new ErickThreadPool(10, 3, 5, TimeUnit.SECONDS);

        for (int i = 0; i < 5; i++) {
            pool.executeTask(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName() + ":" + new Date());
                }
            });
        }
    }
}

2.3 生产者-超时设置

  • 当阻塞队列中已满,并且核心线程都在工作的时候,生产者线程提供的任务就会进行等待
  • 让任务生产者自己决定该如何执行
# 拒绝策略
- 死等
- 带超时等待
- 让调用者放弃执行任务
- 让调用者抛出异常
- 让调用者自己执行任务

阻塞队列

package com.erick.multithread.d6;

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class BlockingQueue<T> {

    /*阻塞队列的上限*/
    private int capacity;
    /*保存具体任务: 也就是Runnable的池子*/
    private Deque<T> blockingQueue = new ArrayDeque<>();

    /*锁:从池子中拿或取时需要*/
    private ReentrantLock lock = new ReentrantLock(true);

    /*池子满时,生产者线程等待*/
    private Condition producerRoom = lock.newCondition();

    /*池子空时,消费者线程等待*/
    private Condition consumerRoom = lock.newCondition();

    public BlockingQueue(int capacity) {
        this.capacity = capacity;
    }

    /*获取任务:一直等待*/
    public T getTask() {
        try {
            lock.lock();

            while (blockingQueue.isEmpty()) {
                System.out.println("阻塞队列为空,消费者等待");
                try {
                    consumerRoom.await();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }

            T task = blockingQueue.removeLast();
            producerRoom.signal();
            return task;
        } finally {
            lock.unlock();
        }
    }

    /*获取任务: 超时不侯*/
    public T getTask(long timeout, TimeUnit timeUnit) {
        try {
            lock.lock();
            long nanos = timeUnit.toNanos(timeout);

            while (blockingQueue.isEmpty()) {
                if (nanos < 0) {
                    return null;
                }

                try {
                    nanos = consumerRoom.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
            T task = blockingQueue.removeLast();
            producerRoom.signal();
            return task;
        } finally {
            lock.unlock();
        }
    }


    /*生产任务:一直等待*/
    public void addTask(T t) {
        try {
            lock.lock();

            while (blockingQueue.size() == capacity) {
                System.out.println("阻塞队列已满,生产者等待");
                try {
                    producerRoom.await();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
            blockingQueue.addFirst(t);
            consumerRoom.signal();
        } finally {
            lock.unlock();
        }
    }

    /*带超时的添加*/
    public boolean addTask(T t, long timeout, TimeUnit timeUnit) {
        try {
            lock.lock();
            long nanos = timeUnit.toNanos(timeout);

            while (blockingQueue.size() == capacity) {
                if (nanos < 0) {
                    return false;
                }
                try {
                    nanos = producerRoom.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }

            blockingQueue.addFirst(t);
            consumerRoom.signal();
            return true;
        } finally {
            lock.unlock();
        }
    }

    public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
        try {
            lock.lock();
            if (blockingQueue.size() == capacity) {
                /*具体操作的权利:下放给对应的consumer*/
                rejectPolicy.reject(this, (Runnable) task);
                return;
            }
            System.out.println("加入阻塞队列");
            blockingQueue.addFirst(task);
            consumerRoom.signal();
        } finally {
            lock.unlock();
        }
    }

    /*获取队列大小*/
    public int getSize() {
        try {
            lock.lock();
            return blockingQueue.size();
        } finally {
            lock.unlock();
        }
    }
}

interface RejectPolicy<T> {
    void reject(BlockingQueue<T> blockingQueue, Runnable task);
}

线程池

package com.erick.multithread.d6;

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;

/*自定义线程池*/
public class ErickThreadPool<T> {
    /*阻塞队列*/
    private BlockingQueue<T> blockingQueue;

    /*装工作线程的池子*/
    private final Set<Worker> pool = new HashSet<>();
    /*核心线程数*/
    private int coreThreadSize;

    /*线程池子中的线程,等待获取的任务的时候,如果超时,则线程kill掉*/
    private long timeout;
    private TimeUnit timeUnit;

    /*拒绝策略*/
    private RejectPolicy<T> rejectPolicy;

    public ErickThreadPool(int blockQueueCapacity, int coreThreadSize) {
        blockingQueue = new BlockingQueue<>(blockQueueCapacity);
        this.coreThreadSize = coreThreadSize;
    }

    public ErickThreadPool(int blockQueueCapacity, int coreThreadSize, long timeout, TimeUnit timeUnit, RejectPolicy<T> rejectPolicy) {
        this(blockQueueCapacity, coreThreadSize);
        this.timeUnit = timeUnit;
        this.timeout = timeout;
        this.rejectPolicy = rejectPolicy;
    }

    /**
     * 任务具体执行流程: 外界接口的任务(Thread) 来了
     * 1. 当前池子没满,则新建一个线程并加入到池子中
     * 2. 如果池子已经满了,当前任务进入到阻塞队列中等待
     */
    public synchronized void executeTask(Runnable task) {
        if (pool.size() < coreThreadSize) {
            Worker worker = new Worker(task);
            pool.add(worker);
            System.out.println("创建新的线程来执行任务");
            worker.start();
        } else {
            System.out.println("线程池已满,生产者???");
            blockingQueue.tryPut(rejectPolicy, (T) task);
        }
    }

    /*线程池中具体干活的线程*/
    class Worker extends Thread {
        private Runnable task;

        public Worker(Runnable task) {
            this.task = task;
        }

        /*阻塞获取,一直等待*/
        @Override
        public void run() {
            while (task != null || (task = (Runnable) blockingQueue.getTask(timeout, timeUnit)) != null) {
                try {
                    task.run();
                } catch (Exception e) {
                    System.out.println("线程执行任务出错");
                } finally {
                    task = null;
                }
            }
            /*任务执行完毕后,将该线程从池子中移除*/
            synchronized (pool) {
                System.out.println("线程销毁");
                pool.remove(this);
            }
        }
    }
}

测试代码

package com.erick.multithread.d6;

import java.util.Date;
import java.util.concurrent.TimeUnit;

public class Test {
    public static void main(String[] args) {
        ErickThreadPool pool = new ErickThreadPool(1, 2, 5, TimeUnit.SECONDS, new ProducerException());

        for (int i = 0; i < 10; i++) {
            pool.executeTask(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName() + ":" + new Date());
                    try {
                        TimeUnit.SECONDS.sleep(5);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }
            });
        }
    }
}

/*死等的逻辑*/
class StillWait implements RejectPolicy {
    @Override
    public void reject(BlockingQueue blockingQueue, Runnable task) {

        blockingQueue.addTask(task);
    }
}

/*超时等待的逻辑*/
class WaitWithTimeOut implements RejectPolicy {

    @Override
    public void reject(BlockingQueue blockingQueue, Runnable task) {
        blockingQueue.addTask(task, 1, TimeUnit.SECONDS);
    }
}

/*调用者放弃任务*/
class ProducerGiveUp implements RejectPolicy {

    @Override
    public void reject(BlockingQueue blockingQueue, Runnable task) {
        System.out.println("调用者抛弃任务");
    }
}

class ProducerExecute implements RejectPolicy {

    @Override
    public void reject(BlockingQueue blockingQueue, Runnable task) {
        System.out.println("调用者自己执行任务");
        new Thread(task).start();
    }
}

class ProducerException implements RejectPolicy {

    /*后续其他任务就不会进来执行*/
    @Override
    public void reject(BlockingQueue blockingQueue, Runnable task) {
        throw new RuntimeException("核心线程已在工作,阻塞队列已满");
    }
}

JDK线程池

1. 类图

image-20221018074250288

2. 线程状态

  • ThreadPoolExecutor 使用int的高3位来表示线程池状态,低29位表示线程数量

image-20221018074517365

3. ThreadPoolExecutor

3.1 构造方法

int corePoolSize:                     // 核心线程数
int maximumPoolSize:                 // 最大线程数
long keepAliveTime:                  // 救急线程数执行任务完后存活时间
TimeUnit unit:                       // 救急线程数执行任务完后存活时间
BlockingQueue<Runnable> workQueue:   // 阻塞队列
ThreadFactory threadFactory:         // 线程生产工厂,为线程起名字
RejectedExecutionHandler handler:    // 拒绝策略 

 public ThreadPoolExecutor(int corePoolSize,
                           int maximumPoolSize,
                           long keepAliveTime,
                           TimeUnit unit,
                           BlockingQueue<Runnable> workQueue,
                           ThreadFactory threadFactory,
                           RejectedExecutionHandler handler)

3.2 核心线程和救急线程

1. 核心线程: 执行完任务后,会继续保留在线程池中

2. 救急线程:如果阻塞队列已满,并且没有空余的核心线程。那么会创建救急线程来执行任务
  2.1 任务执行完毕后,这个线程就会被销毁(临时工)
  2.2 必须是有界阻塞,如果是无界队列,则不需要创建救急线程

3. 拒绝策略: 有界队列,核心线程满负荷,阻塞队列已满,无空余救急线程,才会执行拒绝

3.3 JDK拒绝策略

  • 如果线程达到最大线程数,救急线程也满负荷,且有界队列也满了,JDK 提供了4种拒绝策略
AbortPolicy:           调用者抛出RejectedExecutionException,  默认策略
CallerRunsPolicy:      调用者运行任务
DiscardPolicy:         放弃本次任务
DiscardOldestPolicy:   放弃阻塞队列中最早的任务,本任务取而代之

# 第三方框架的技术实现
- Dubbo: 在抛出异常之前,记录日志,并dump线程栈信息,方便定位问题
- Netty: 创建一个新的线程来执行任务
- ActiveMQ: 带超时等待(60s), 尝试放入阻塞队列

image-20221018075430425

4. Executors类工厂方法

  • 默认的构造方法来创建线程池,参数过多,JDK提供了工厂方法,来创建线程池

4.1 固定大小

  • 核心线程数 = 最大线程数,救急线程数为0
  • 阻塞队列:无界,可以存放任意数量的任务
# 应用场景
任务量已知,但是线程执行时间较长
执行任务后,线程并不会结束
public static ExecutorService newFixedThreadPool(int nThreads) {
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
package com.erick.multithread.d7;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class Demo01 {
    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2, new ThreadFactory() {
            private AtomicInteger num = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                // 给线程起一个名字
                return new Thread(r, "erick-pool" + num.getAndIncrement());
            }
        });

        pool.execute(() -> System.out.println(Thread.currentThread().getName() + " running"));
        pool.execute(() -> System.out.println(Thread.currentThread().getName() + " running"));
    }
}

4.2 带缓冲

  • 核心线程数为0, 最大线程数为Integer的无限大
  • 全部是救急线程,等待时间是60s,60s后就会消亡
  • SynchronousQueue: 没有容量,没有线程来取的时候是放不进去的
  • 整个线程池数会随着任务数目增长,1分钟后没有其他活动会消亡
# 应用场景
1. 时间较短的线程
2. 数量大,任务执行时间长,会造成  OutOfMmeory问题
 public static ExecutorService newCachedThreadPool() {
     return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                   60L, TimeUnit.SECONDS,
                                   new SynchronousQueue<Runnable>());
 }

4.3. 单线程

  • 线程池大小始终为1个,不能改变线程数
  • 相比自定义一个线程来执行,线程池可以保证前面任务的失败,不会影响到后续任务
# 1. 和自定义线程的区别
自定义线程:  执行多个任务时,一个出错,后续都能不能执行了
单线程池:    一个任务失败后,会结束出错线程。重新new一个线程来执行下面的任务

# 2. 执行顺序
单线程池: 保证所有任务都是串行

# 3. 和newFixedThreadPool的区别
newFixedThreadPool:          初始化后,还可以修改线程大小
newSingleThreadExecutor:     不可以修改
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
package com.nike.erick.d07;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Demo01 {
    public static void main(String[] args) {
        method03();
    }

    private static void method01() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        /*pool-1-thread-1   pool-1-thread-2  pool-1-thread-1*/
        pool.execute(() -> System.out.println(Thread.currentThread().getName() + " working"));
        pool.execute(() -> System.out.println(Thread.currentThread().getName() + " working"));
        pool.execute(() -> System.out.println(Thread.currentThread().getName() + " working"));
    }

    private static void method02() {
        ExecutorService pool = Executors.newCachedThreadPool();
        /*pool-1-thread-1  pool-1-thread-2  pool-1-thread-3*/
        pool.execute(() -> System.out.println(Thread.currentThread().getName() + " working"));
        pool.execute(() -> System.out.println(Thread.currentThread().getName() + " working"));
        pool.execute(() -> System.out.println(Thread.currentThread().getName() + " working"));
    }

    private static void method03() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        /*第一个任务执行失败后,后续任务会继续执行*/
        pool.execute(() -> {
            int i = 1 / 0;
            System.out.println(Thread.currentThread().getName() + " running");
        });

        pool.execute(() -> {
            System.out.println(Thread.currentThread().getName() + " running");
        });

        pool.execute(() -> {
            System.out.println(Thread.currentThread().getName() + " running");
        });
    }
}

5. 提交任务

5.1. execute

void execute(Runnable command)

5.2. submit

Future<?> submit(Runnable task);

// 可以从 Future 对象中获取一些执行任务的最终结果
<T> Future<T> submit(Runnable task, T result);

<T> Future<T> submit(Callable<T> task);

3. invokeAll

  • 执行集合中的所有的任务
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
    throws InterruptedException;

4. invokeAny

  • 集合中之要有一个任务执行完毕,其他任务就不再执行
package com.nike.erick.d07;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class Demo02 {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(10);
        method05(pool);
    }

    /* void execute(Runnable command) */
    public static void method01(ExecutorService pool) {
        pool.execute(() -> System.out.println(Thread.currentThread().getName() + " running"));
    }

    /*  <T> Future<T> submit(Runnable task, T result)
     * Future<?> submit(Runnable task) */
    public static void method02(ExecutorService pool) throws InterruptedException {
        Future<?> result = pool.submit(new Thread(() -> System.out.println(Thread.currentThread().getName() + " running")));
        TimeUnit.SECONDS.sleep(1);
        System.out.println(result.isDone());
        System.out.println(result.isCancelled());
    }

    /*
     * <T> Future<T> submit(Callable<T> task)*/
    public static void method03(ExecutorService pool) throws InterruptedException, ExecutionException {
        Future<String> submit = pool.submit(() -> "success");
        TimeUnit.SECONDS.sleep(1);
        System.out.println(submit.isDone());
        System.out.println(submit.isCancelled());
        System.out.println(submit.get()); // 返回结果是success
    }

    /* <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;*/
    public static void method04(ExecutorService pool) throws InterruptedException {
        Collection tasks = new ArrayList();
        for (int i = 0; i < 10; i++) {
            int round = i;
            tasks.add((Callable) () -> {
                System.out.println(Thread.currentThread().getName() + " running");
                return "success:" + round;
            });
        }
        List results = pool.invokeAll(tasks);

        TimeUnit.SECONDS.sleep(1);
        System.out.println(results);
    }

    /*
     *     <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;
     * */
    public static void method05(ExecutorService pool) throws InterruptedException, ExecutionException {
        ExecutorService service = Executors.newFixedThreadPool(1);
        Collection<Callable<String>> tasks = new ArrayList<>();

        tasks.add(() -> {
            System.out.println("first task");
            TimeUnit.SECONDS.sleep(1);
            return "success";
        });

        tasks.add(() -> {
            System.out.println("second task");
            TimeUnit.SECONDS.sleep(2);
            return "success";
        });


        tasks.add(() -> {
            System.out.println("third task");
            TimeUnit.SECONDS.sleep(3);
            return "success";
        });
        // 任何一个任务执行完后,就会返回结果
        String result = pool.invokeAny(tasks);
        System.out.println(result);
    }
}

6. 关闭线程池

6.1 shutdown

  • 将线程池的状态改变为SHUTDOWN状态
  • 不会接受新任务,已经提交的任务不会停止
  • 不会阻塞调用线程的执行
void shutdown();
package com.dreamer.multithread.day09;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class Demo04 {
    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        pool.submit(() -> {
            try {
                TimeUnit.SECONDS.sleep(2);
                System.out.println(Thread.currentThread().getName() + " first running");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        pool.submit(() -> {
            try {
                TimeUnit.SECONDS.sleep(2);
                System.out.println(Thread.currentThread().getName() + " second running");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        pool.submit(() -> {
            try {
                TimeUnit.SECONDS.sleep(2);
                System.out.println(Thread.currentThread().getName() + " third running");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        // 不会阻塞主线程的执行
        pool.shutdown();
        System.out.println("main thread ending");

    }
}

6.2. shutdownNow

  • 不会接受新任务
  • 没执行的任务会打断
  • 将等待队列中的任务返回
List<Runnable> shutdownNow();
package com.dreamer.multithread.day09;

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class Demo04 {
    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        pool.submit(() -> {
            try {
                TimeUnit.SECONDS.sleep(2);
                System.out.println(Thread.currentThread().getName() + " first running");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        pool.submit(() -> {
            try {
                TimeUnit.SECONDS.sleep(2);
                System.out.println(Thread.currentThread().getName() + " second running");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        pool.submit(() -> {
            try {
                TimeUnit.SECONDS.sleep(2);
                System.out.println(Thread.currentThread().getName() + " third running");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        // 不会阻塞主线程的执行
        List<Runnable> leftOver = pool.shutdownNow();
        System.out.println(leftOver.size()); // 2
        System.out.println("main thread ending"); 
    }
}

线程池拓展

1. 异步模式之工作线程

1.1 Worker Thread

  • 让有限的工作线程来轮流异步处理无限多的任务
  • 分类:不同的任务类型应该使用不同的线程池

1.2 饥饿现象

  • 固定大小线程池会有饥饿现象
- 两个工人是同一个线程池中的两个线程, 为客人点餐和后厨做菜,这是两个阶段的工作
- 客人点餐:必须先点餐,等菜做好,上菜,在此期间,处理点餐的工人必须等待
- A工人处理了点餐任务,B工人把菜做好,然后上菜,配合正常
- 同时来了两个客人,A和B工人都去处理点餐了,没人做饭了,出现线程数不足导致的资源饥饿

正常

package com.erick.multithread.d7;

import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.*;

public class Demo02 {
    private static List<String> MENU = Arrays.asList("宫保鸡丁", "地三鲜", "辣子鸡丁", "红烧肉");

    private static Random random = new Random();

    private static String cooking() {
        return MENU.get(random.nextInt(MENU.size()));
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        pool.execute(new Runnable() {
            @Override
            public void run() {
                System.out.println("开始处理点餐");
                Future<String> cook = pool.submit(new Callable<String>() {
                    @Override
                    public String call() throws Exception {
                        System.out.println("开始做菜");
                        return cooking();
                    }
                });

                try {
                    String result = cook.get();
                    System.out.println("上菜:" + result);
                } catch (InterruptedException | ExecutionException e) {
                    throw new RuntimeException(e);
                }
            }
        });
    }
}

线程池饥饿

package com.erick.multithread.d7;

import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.*;

public class Demo02 {
    private static List<String> MENU = Arrays.asList("宫保鸡丁", "地三鲜", "辣子鸡丁", "红烧肉");

    private static Random random = new Random();

    private static String cooking() {
        return MENU.get(random.nextInt(MENU.size()));
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        pool.execute(new Runnable() {
            @Override
            public void run() {
                System.out.println("开始处理点餐");
                Future<String> cook = pool.submit(new Callable<String>() {
                    @Override
                    public String call() throws Exception {
                        System.out.println("开始做菜");
                        return cooking();
                    }
                });

                try {
                    String result = cook.get();
                    System.out.println("上菜:" + result);
                } catch (InterruptedException | ExecutionException e) {
                    throw new RuntimeException(e);
                }
            }
        });

        pool.execute(new Runnable() {
            @Override
            public void run() {
                System.out.println("开始处理点餐");
                Future<String> cook = pool.submit(new Callable<String>() {
                    @Override
                    public String call() throws Exception {
                        System.out.println("开始做菜");
                        return cooking();
                    }
                });

                try {
                    String result = cook.get();
                    System.out.println("上菜:" + result);
                } catch (InterruptedException | ExecutionException e) {
                    throw new RuntimeException(e);
                }
            }
        });
    }
}

解决方法

  • 最简单的方法: 增加线程池的线程数量,但是不能从根本解决问题
  • 解决方法:不同的任务类型,使用不同的线程池

2. 线程数量

  • 过小,导致cpu资源不能充分利用,浪费性能
  • 过大,线程上下文切换浪费性能,每个线程也要占用内存导致占用内存过多

2.1 CPU密集型

  • 如果线程的任务主要是和cpu资源打交道,比如大数据运算,称为CPU密集型
  • 线程数量: 核心数+1
  • +1: 保证某线程由于某些原因(操作系统方面)导致暂停时,额外线程可以启动,不浪费CPU资源

2.2. IO密集型

  • IO操作,RPC调用,数据库访问时,CPU是空闲的,称为IO密集型
  • 更加常见: IO操作,远程RPC调用,数据库操作
  • 线程数 = 核数 * 期望cpu利用率 * (CPU计算时间 + CPU等待时间) / CPU 计算时间

image-20221018104629282

3. 调度功能

3.1 延时执行

  • 如果希望线程延时执行任务
package com.dreamer.multithread.day09;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class Demo05 {
    public static void main(String[] args) {
        // 两个线程,分别可以延时执行不同的任务
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);

        // 延时1s后执行
        pool.schedule(new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName() + " first running");
            }
        }, 1, TimeUnit.SECONDS);

        // 延时5s后执行
        pool.schedule(new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName() + " second running");
            }
        }, 5, TimeUnit.SECONDS);
    }
}
package com.dreamer.multithread.day09;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class Demo06 {
    public static void main(String[] args) {
        // 如果是单个线程,则延时的任务是串行执行的
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);

        // 如果一个线程出错,则会再次创建一个线程来执行任务
        pool.schedule(new Runnable() {
            @Override
            public void run() {
                int i = 1 / 0;
                System.out.println(Thread.currentThread().getName() + " first running");
            }
        }, 1, TimeUnit.SECONDS);

        pool.schedule(new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName() + " second running");
            }
        }, 2, TimeUnit.SECONDS);
    }
}

3.2 定时执行

# scheduleAtFixedRate
- 如果任务的执行时间大于时间间隔,就会紧接着立刻执行

# scheduleWithFixedDelay
- 上一个任务执行完毕后,再延迟一定的时间才会执行
package com.dreamer.multithread.day09;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class Demo07 {
    public static void main(String[] args) {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);

        // 定时执行任务
        pool.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                try {
                    TimeUnit.SECONDS.sleep(2);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("task is running");
            }
        }, 3, 2, TimeUnit.SECONDS);
        //  初始延时,   任务间隔时间,    任务间隔时间单位
    }
}

4. 异常处理

4.1 不处理异常

  • 任务执行过程中,业务中的异常并不会抛出
package com.dreamer.multithread.day09;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Demo08 {
    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        pool.submit(new Runnable() {
            @Override
            public void run() {
                int i = 1 / 0;
                System.out.println(Thread.currentThread().getName() + " task running");
            }
        });
    }
}

4.2 任务执行者处理

package com.dreamer.multithread.day09;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Demo08 {
    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        pool.submit(new Runnable() {
            @Override
            public void run() {
                try {
                    int i = 1 / 0;
                    System.out.println(Thread.currentThread().getName() + " task running");
                } catch (Exception e) {
                    e.printStackTrace();
                    return;
                }
            }
        });
    }
}

4.3 线程池处理

package com.dreamer.multithread.day09;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class Demo08 {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        Future<?> result = pool.submit(new Runnable() {
            @Override
            public void run() {
                int i = 1 / 0;
                System.out.println(Thread.currentThread().getName() + " task running");
            }
        });

        TimeUnit.SECONDS.sleep(1);
        // 获取结果的时候,就可以把线程执行任务过程中的异常报出来
        System.out.println(result.get());
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/20911.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

idea创建spring boot工程及配置

目录 一、dea 创建spring boot工程 二、打包 三、启动配置文件 一、dea 创建spring boot工程 new project 选择Spring Initializr ,Type&#xff1a;选择Maven&#xff0c;Java 8, Packagin 选择Jar。然后点击next 添加依赖&#xff1a; 选择Sprint Boot版本&#xff0c;选…

差分约束算法

差分约束是为了解决这样一组不等式问题&#xff1a; 这个咋解决》我们来看 对于某个下标k而言&#xff0c;提取出关于其的所有不等式&#xff0c;&#xff08;其中xk在第一个),也就是 xk-x1<m1 xk-x2<m2 xk-x3<m3....对于这些不等式相当于是 xk取min(x1m1,x2m2,x3m3…

面试常问:HTTPS的加密过程 ----- 光明和黑暗的恩怨情仇

目录 关于运营商劫持 &#xff1a; 什么是运营商劫持?? 什么是运营商? 为什么要劫持? 如何劫持? 劫持的危害 互联网公司怎么办? HTTPS 什么是HTTPS 一些概念&#xff1a; HTTPS加密 1. 对称加密&#xff1a; 2. 非对称加密 3. 非对称加密对称加密 4. 加密…

基于java+ssm购物商城网站系统-计算机毕业设计

项目介绍 乐优购物商城是商业贸易中的一条非常重要的道路&#xff0c;可以把其从传统的实体模式中解放中来&#xff0c;网上购物可以为消费者提供巨大的便利。通过乐优购物商城这个平台&#xff0c;可以使用户足不出户就可以了解现今的流行趋势和丰富的商品信息&#xff0c;为…

请求跨域问题

在前端请求接口时&#xff0c;出现跨域是很常见的问题&#xff0c;跨域的解决方法也很多&#xff0c;但是目前通用的是以下两种方式&#xff1a; 开发环境生产环境在服务端配置 CORS在服务端配置 CORS配置开发服务器代理&#xff0c;比如 vite-server.proxy配置生产服务器代理…

Debezium的增量快照

GreatSQL社区原创内容未经授权不得随意使用&#xff0c;转载请联系小编并注明来源。GreatSQL是MySQL的国产分支版本&#xff0c;使用上与MySQL一致。作者&#xff1a; 如常 Debezium Incremental snapshotting Introduction CDC&#xff08;Change-Data-Capture&#xff09;正…

Java之反射相关知识补充

Java之反射一、概述1、静态语言和动态语言1.1 静态语言1.2 动态语言2、Reflection(反射)2.1 介绍2.2 流程2.3 Java反射机制提供的功能2.4 优缺点&#xff08;1&#xff09;优点&#xff08;2&#xff09;缺点2.5 反射相关主要API2.6 示例二、反射相关操作1、获取Class类的实例1…

第十二节:String类【java】

目录 &#x1f4d8;1.1 字符串构造方法 &#x1f4d2;1.2 String对象怎样比较 &#x1f4dc;1.2.1 引用类型 比较的是引用中的地址。 &#x1f4c4;1.2.2 boolean equals(Object anObject) 方法&#xff1a;比较怕两个引用所指的对象当中的内容是否一致 &#x1f4d1;1.2…

企业级nginx使用

企业级nginx使用 nginx实现平滑升级 [rootlnmp nginx-1.16.0]# cd /usr/local/nginx/sbin/ [rootlnmp sbin]# ls nginx nginx.old [rootlnmp sbin]# ./nginx -v nginx version: nginx/1.16.0 [rootlnmp sbin]# ./nginx.old -v nginx version: nginx/1.14.2 [rootlnmp sbin]#操…

物联网开发笔记(49)- 使用Micropython开发ESP32开发板之控制RGB全彩LED模块

一、目的 这一节我们学习如何使用我们的ESP32开发板来控制RGB全彩LED模块。 二、环境 ESP32 RGB全彩LED模块 Thonny IDE 几根杜邦线 1&#xff0c;共阴极接线方法 2&#xff0c;共阳极接线方法 三、代码 1&#xff0c;亮指定的颜色&#xff0c;比如百度蓝。 我们使用取色…

万字长文,精美插图,带你玩转redis

资料下载&#xff1a; 链接: https://pan.baidu.com/s/1ue4Bnv4b4ifp0S0I_yOJ_w?pwdd9x9 提取码: d9x9 Redis学习笔记 1. 认识Redis 1.1 什么是NoSQL&#xff1f; SQL是关系型数据库&#xff0c;NoSQL是非关系型数据库 下面是几种非关系型数据库及其优缺点和应用场景。 分类…

[附源码]java毕业设计网上书店系统

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM mybatis Maven Vue 等等组成&#xff0c;B/S模式 M…

【分隔结构】动宾分离

动宾分离 动词 宾语 状语&#xff1a;如果宾语较长&#xff0c;状语较短&#xff0c;会转化为 动词 状语 宾语 While I disapprove of what you say, I would defend to the death your right to say it. 名词 引导词 主语 及物动词 You are the man that I will marry…

【K8S】学习笔记(一)

K8S学习笔记一、Kubernetes1.1、K8S功能1.2、K8S架构组件1.2.1、架构细节1.3、K8S核心概念1.3.1、Pod1.3.2、Volume1.3.3、Controller1.3.4、Deployment1.3.5、Service1.3.6、Label1.3.7、Namespace1.3.8、API二、搭建K8S2.1、K8S搭建规划2.1.1、单master集群2.1.2、多master集…

Html5的新增特性

Html5的新增特性主要是针对以前的不足&#xff0c;增加了一些新的标签&#xff0c;新的表单和新的表单属性等。 这些新特性都有兼容性问题&#xff0c;基本是IE9以上版本的浏览器才支持&#xff0c;如果不考虑兼容性问题&#xff0c;可以大量使用这些新特性。 声明&#xff1…

m基于迫零ZF准则的通信均衡器的matlab仿真

目录 1.算法概述 2.仿真效果预览 3.MATLAB部分代码预览 4.完整MATLAB程序 1.算法概述 在数字通信系统中&#xff0c;码间串扰和加性噪声是造成信号传输失真的主要因素&#xff0c;为克服码间串扰&#xff0c;在接收滤波器和抽样判决器之间附加一个可调滤波器&#xff0c;用…

STM32CubeMX:串口DMA

DMA&#xff1a;直接储存区访问&#xff0c;DMA传输将数据从一个地址空间复制到另一个空间。DMA传输方式无需CPU直接控制传输&#xff0c;也没有中断处理方式那样保留现场和恢复现场过程&#xff0c;通过硬件为RAM何IO设备开辟一条直接传输数据的通道&#xff0c;从而可以提高C…

WSL下安装ubuntu 18.04 +meep进行FDTD仿真计算

WSL下安装ubuntu 18.04 meep进行FDTD仿真计算前言WSL安装过程打开虚拟环境下载Ubuntu并修改安装路径更改软件源MeepVScode远程访问测试程序前言 使用meep进行FDTD开发&#xff0c;开源。这里记录一下自己的安装过程&#xff0c;可以不安装在C盘&#xff0c;有助于后面进行修改…

【JVM】java的jvm类加载器和类加载子系统

JVM类加载器和类加载子系统一、JVM体系结构二、ClassLoader类介绍三、类加载子系统3.1 加载阶段3.1.1 引导类加载器&#xff08;Bootstrap ClassLoader&#xff09;3.1.2 扩展类加载器&#xff08;Extension ClassLoader&#xff09;3.1.3 应用程序类加载器&#xff08;Applica…

Android入门第32天-Android中的Alert Dialog的使用大全

写在我的第200篇博客中的“前言” 这篇是我的第200篇博客。落笔写正文前我感触彼深。自从一个小P孩那时写博客只是为了一时的好玩&#xff0c;到逐步发觉只有让越来越多的人理解了技术&#xff0c;把技术普及到门槛越来越低&#xff0c;才会反推技术人员的处镜越来越好。因为必…