6.Java并发编程—深入剖析Java Executors:探索创建线程的5种神奇方式

news2024/11/20 14:28:01

Executors快速创建线程池的方法

Java通过Executors 工厂提供了5种创建线程池的方法,具体方法如下

方法名描述
newSingleThreadExecutor()创建一个单线程的线程池,该线程池中只有一个工作线程。所有任务按照提交的顺序依次执行,保证任务的顺序性。当工作线程意外终止时,会创建一个新的线程来替代它。适用于需要顺序执行任务且保证任务安全性的场景。
newFixedThreadPool(int nThreads)创建一个固定大小的线程池,该线程池中的线程数量固定为指定的数量。当有新任务提交时,如果线程池中有空闲线程,则立即使用空闲线程执行任务;如果没有空闲线程,则任务将被放入任务队列等待执行。
newCachedThreadPool()创建一个缓存线程池,该线程池中的线程数量不固定,可以根据任务的需求动态调整线程数量。空闲线程会被保留一段时间,如果在保留时间内没有任务执行,则这些线程将被终止并从线程池中删除。适用于执行大量短期任务的场景
newScheduledThreadPool(int corePoolSize)创建一个可调度的线程池,该线程池能够按照一定的调度策略执行任务。除了执行任务外,还可以按照指定的延迟时间或周期性地执行任务。适用于需要按照计划执行任务、定时任务或周期性任务的场景。
newWorkStealingPool(int parallelism) newWorkStealingPool(int parallelism)方法用于创建一个工作窃取线程池。工作窃取线程池是一种特殊的线程池,它根据一定的调度策略执行任务。除了执行任务外,工作窃取线程池还可以按照指定的延迟时间或周期性地执行任务。

ThreadFactory

在学习Executor创建线程池之前,我们先来学习一下

ThreadFactory是一个接口,用于创建线程对象的工厂。它定义了一个方法newThread,用于创建新的线程。

在Java中,线程的创建通常通过Thread类的构造函数进行,但是使用ThreadFactory可以将线程的创建过程与线程的执行逻辑分离开来。通过自定义的ThreadFactory,我们可以对线程进行更加灵活的配置和管理,例如指定线程名称、设置线程优先级、设置线程是否为守护线程等。

ThreadFactory接口只有一个方法:

Thread newThread(Runnable runnable);

该方法接受一个Runnable对象作为参数,并返回一个新的Thread对象。

一般情况下,我们可以通过实现ThreadFactory接口来自定义线程的创建。以下是一个示例的自定义ThreadFactory实现:

public class MyThreadFactory implements ThreadFactory {
    // 自定义线程的名称
    private final String namePrefix = "test-async-thread";
    private final AtomicInteger threadNumber = new AtomicInteger(1);

    public MyThreadFactory(String namePrefix) {
        this.namePrefix = namePrefix;
    }

    @Override
    public Thread newThread(Runnable runnable) {
        Thread thread = new Thread(runnable);
        thread.setName(namePrefix + "-" + threadNumber.getAndIncrement());
        thread.setPriority(Thread.NORM_PRIORITY);
        thread.setDaemon(false);
        return thread;
    }
}

在上面的示例中,MyThreadFactory实现了ThreadFactory接口,并通过构造函数传入一个namePrefix参数,用于指定线程的名称前缀。

newThread方法中,首先创建一个新的Thread对象,并设置线程的名称为namePrefix加上一个递增的数字。然后,可以根据需要设置线程的优先级、是否为守护线程等属性。

newSingleThreadExecutor() 创建单线程化线程池

该方法用于创建一个单线程化的线程池,也就是只有一个线程的线程池。该线程池中只有一个工作线程,它负责按照任务的提交顺序依次执行任务。当有任务提交时,会创建一个新的线程来执行任务。如果工作线程意外终止,线程池会创建一个新的线程来替代它,确保线程池中始终有一个可用的线程。

newSingleThreadExecutor()方法返回的线程池实例实现了ExecutorService接口,因此可以使用submit()方法提交任务并获取Future对象,或使用execute()方法提交任务。

该线程池适用于需要顺序执行任务且保证任务之间不会发生并发冲突的场景。由于只有一个工作线程,所以不存在线程间的竞争和并发问题,可以确保任务的安全性。

此外,newSingleThreadExecutor()方法创建的线程池还可以用于任务的异常处理。当任务抛出异常时,线程池会捕获异常并记录或处理异常,避免异常导致整个应用程序崩溃。

需要注意的是,由于该线程池只有一个线程,如果任务执行时间过长任务量过大可能会导致任务队列堆积,造成应用程序的性能问题。所以在使用该线程池时,需要根据任务的特性和需求进行适当的评估和调优。

下面我们使用Executors中newSingleThreadExecutor()方法创建一个单线程线程池

	/**
	 * 这里创建的线程是 是Executors.newSingleThreadExecutor() 一样 保证只有一个线程来进行执行 并且按照提交的顺序进行执行
	 * <pre>
	 *     {@code
	 *  public static ExecutorService newSingleThreadExecutor() {
	 * 			return new Executors.FinalizableDelegatedExecutorService (
	 * 		    	new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));
	 *          }
	 *     }
	 * </pre>
	 */
	@Test
	public void test10() {
		// 如果有多个任务提交到线程池中,那么这个线程池中的线程会依次执行任务 和
		ExecutorService executorService = Executors.newSingleThreadExecutor();

		// 创建线程 1
		Thread t1 = new Thread(() -> {
			logger.error("t1 ----> 开始执行了~");
		}, "t1");

		// 创建线程 2
		Thread t2 = new Thread(() -> {
			logger.error("t2 ----> 开始执行了~");
		}, "t2");


		// 创建线程 3
		Thread t3 = new Thread(() -> {
			logger.error("t3 ----> 开始执行了~");
		}, "t3");


		// Executor 这个接口定义的功能很有限,同时也只支持 Runnale 形式的异步任务
		// 向线程池提交任务
		executorService.submit(t1);
		executorService.submit(t2);
		executorService.submit(t3);


		// 关闭线程池
		executorService.shutdown();

	}

首先,通过Executors.newSingleThreadExecutor();创建了一个单线程化的线程池。

然后,创建了三个线程t1、t2和t3,分别用于执行不同的任务。

接着,通过executorService.submit(t1)将t1线程提交到线程池中进行执行。同样地,也将t2和t3线程提交到线程池中。

最后,通过executorService.shutdown()关闭线程池。

执行时,可以观察到日志输出的顺序。由于线程池中只有一个线程,所以任务会依次按照提交的顺序进行执行。

需要注意的是,通过线程池执行任务后,线程的名称不再是我们自定义的线程名称,而是线程池的名称(如pool-2-thread-1)。这是因为具体的执行任务是交给线程池来管理和执行的。

从输出中我们可以看出,该线程池有以下特点:

  • 单线程化的线程池中的任务,都是按照提交的顺序来进行执行的。

  • 该线程池中的唯一线程存活时间是无限的

  • 当线程池中唯一的线程正在繁忙时,新提交的任务会进入到其内部的阻塞队列中,而且阻塞队列的容量是无限的

  • // 这是   newSingleThreadExecutor 一个无参的构造方法  
    public static ExecutorService newSingleThreadExecutor() {
          // 创建一个FinalizableDelegatedExecutorService实例,该实例是ExecutorService接口的一个包装类
            // 将上面创建的ThreadPoolExecutor实例作为参数传入
            // 这样就得到了一个单线程化的线程池
            return new FinalizableDelegatedExecutorService
                (   
                 	// 创建一个ThreadPoolExecutor实例,指定参数如下:
                    // corePoolSize: 1,线程池中核心线程的数量为1
                    // maximumPoolSize: 1,线程池中最大线程的数量为1
                    // keepAliveTime: 0L,空闲线程的存活时间为0毫秒,即空闲线程立即被回收
                    // unit: TimeUnit.MILLISECONDS,存活时间的时间单位是毫秒
                    // workQueue: new LinkedBlockingQueue<Runnable>(),使用无界阻塞队列作为任务队列
                	new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>()));
        }
    
    // 其中 阻塞队列大小,如果不传容量,默认是整形的最大值
    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }
    
  • 总体来说:ewSingleThreadExecutor();适用于按照任务提交次序,一个接一个的执行场景

image-20240313074456452

newFixedThreadPool 创建固定数量的线程池

该方法用于创建一个固定数量的线程池,其中唯一的参数是用于设置线程池中线程的数量

newFixedThreadPoolExecutors类提供的一个静态方法,用于创建一个固定大小的线程池。

方法具体如下

public static ExecutorService newFixedThreadPool(int nThreads)

参数nThreads表示线程池中的线程数量,即固定的线程数量。线程池中的线程数不会根据任务的多少进行动态调整,即使有空闲线程也不会销毁,除非调用了线程池的shutdown方法。

newFixedThreadPool方法返回一个ExecutorService对象,它是Executor接口的子接口,提供了更加丰富的任务提交和管理方法。

通过创建固定大小的线程池,可以在任务并发量较高且预期的任务数量固定的情况下,提供一定程度的线程复用和线程调度控制。线程池会根据固定的线程数量来创建对应数量的线程,并将任务分配给这些线程进行执行。

线程池的工作原理如下:

  • 当有任务提交到线程池时,线程池中的某个线程会被唤醒来执行任务。
  • 如果所有线程都在执行任务,新的任务会被放入一个任务队列中等待执行。
  • 当任务队列已满时,线程池会根据配置的拒绝策略来处理无法执行的任务。

需要注意的是,由于线程池的大小是固定的,如果任务数量超过线程池的容量,任务会在任务队列中等待执行。这可能会导致任务等待时间增加或任务堆积,进而影响系统的响应性能。因此,在选择线程池大小时,需要根据系统的负载情况和任务特点进行合理的配置。

下面我们通过代码来了解一下 newFixedThreadPool

@Test
	public void test18() {
		ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(2);
		executor.submit(new Runnable() {
			@Override
			public void run() {
				// 随机睡觉
				timeSleep();
				printPoolInfo(executor, "*[耗时线程-1]");
			}
		});
		executor.submit(new Runnable() {
			@Override
			public void run() {
				// 随机睡觉
				timeSleep();
				printPoolInfo(executor, "*[耗时线程-2]");
			}
		});
		executor.submit(new Runnable() {
			@Override
			public void run() {
				timeSleep();
				printPoolInfo(executor, "*[耗时线程-3]");
			}
		});
		for (int i = 0; i < 5; i++) {
			int finalI = i;
			executor.submit(new Runnable() {
				@Override
				public void run() {
					printPoolInfo(executor, "普通线程");
				}
			});
		}

		// 优雅关闭线程池
		executor.shutdown();

		waitPoolExecutedEnd(executor);
	}


	// 这个方法用于等待线程全部执行结束
	public void waitPoolExecutedEnd(ThreadPoolExecutor executor) {
		// 确保主线程完全等待子线程执行完毕
		try {
			// 等待线程池中的任务执行完毕,最多等待1天
			if (!executor.awaitTermination(5, TimeUnit.MINUTES)) {
				logger.error("线程池中的任务还未全部执行完毕~");
			}
		} catch (InterruptedException e) {
			logger.error("等待线程池中的任务被中断~");
		}
	}

这段代码创建了一个固定大小为2的线程池 executor,并向其中提交了多个任务。其中:

  1. 通过 Executors.newFixedThreadPool(2) 创建了一个固定大小为2的线程池。
  2. 向线程池中提交了多个任务,包括两个耗时任务和五个普通任务。
  3. 每个耗时任务都会执行 timeSleep() 方法进行一段随机时间的睡眠,然后执行 printPoolInfo() 方法输出当前线程池的信息。
  4. 普通任务直接执行 printPoolInfo() 方法输出当前线程池的信息。
  5. 在所有任务提交完毕后,调用 executor.shutdown() 方法优雅地关闭线程池,并调用 waitPoolExecutedEnd(executor) 方法等待线程池中的任务执行完毕。

image-20240313210333249

根据输出的结果 我们可以观察到

  • 当第一个耗时任务执行时,线程池中有2个核心线程在工作。
  • 当第三个任务执行时,线程池中的任务数量已经达到5个,但是活跃线程数量仍然为2,说明任务正在等待空闲线程来执行。
  • 所有任务执行完成后,线程池中的线程数量仍然保持为2,这是因为线程池是一个固定大小的线程池。
  • 在任务执行的过程中,线程池的核心线程数量一直为2,线程数量也一直保持在2个,因为线程池的大小是固定的。

固定大小线程池适用于以下场景:

  1. 资源受限场景:当系统资源受限,无法创建过多线程时,固定大小线程池能够限制线程数量,防止系统资源被耗尽。
  2. 稳定的任务处理:适用于稳定的任务处理场景,例如批量数据处理、定时任务等,因为固定大小线程池能够保持一定数量的线程长时间运行,避免线程的频繁创建和销毁。
  3. 控制并发数量:在需要限制并发数量的场景下,固定大小线程池能够控制同时执行的任务数量,防止系统过载。
  4. 避免资源竞争:固定大小线程池可以避免多个任务争夺系统资源而导致的竞争和性能下降。
  5. 稳定的性能预测:在需要稳定的性能预测下,固定大小线程池能够提供一致的性能表现,因为线程数量是固定的,可以更好地进行性能测试和预测。

newCachedThreadPool 创建可以缓存的线程池

newCachedThreadPool 用于创建一个可以缓存的线程池,如果线程内的某些线程无事可干,那么就会成为空线程,可缓存线程池,可以灵活回收这些空闲线程

newCachedThreadPoolExecutors类提供的一个静态方法,用于创建一个缓存型线程池。

方法定义如下

public static ExecutorService newCachedThreadPool()

newCachedThreadPool方法返回一个ExecutorService对象,它是Executor接口的子接口,提供了更加丰富的任务提交和管理方法。

缓存型线程池会根据需要自动创建和回收线程,线程池的大小可以根据任务的数量自动调整。如果当前没有可用的空闲线程,会创建新的线程来执行任务;如果有空闲线程并且它们在指定的时间内没有执行任务,那么这些空闲线程将会被回收。

使用缓存型线程池的优点是可以根据任务的数量动态调整线程池的大小,以适应不同的负载情况。当任务数量较少时,线程池会减少线程的数量以节省资源;当任务数量增加时,线程池会增加线程的数量以提高并发性。

需要注意的是,由于缓存型线程池的大小是不限制的,它可能会创建大量的线程,如果任务的提交速度超过了线程执行任务的速度,可能会导致系统资源消耗过多,甚至造成系统崩溃。因此,在使用缓存型线程池时,需要根据任务特点和系统资源情况进行合理的配置。

下面我们通过一个案例,来了解一下newCachedThreadPool,但是了解newCachedThreadPool之前,我们先来熟悉一个阻塞队列,这个会在后面的阻塞队列专题中详细介绍,这里只是作为了解

SynchronousQueue
  1. 无内部存储容量:与其他阻塞队列不同,SynchronousQueue 不存储元素,其容量为零。换句话说,它是一个零容量的队列,用于在线程之间同步传输数据。
  2. 阻塞队列:作为 BlockingQueue 接口的一个实现,SynchronousQueue 提供了阻塞操作,允许线程在队列的插入和移除操作上进行阻塞等待。
  3. 匹配插入和移除操作:在 SynchronousQueue 中,每个插入操作必须等待另一个线程的移除操作,反之亦然。换句话说,发送线程必须等待接收线程,而接收线程也必须等待发送线程,才能够完成操作。这样的特性保证了数据的可靠传输,只有在有线程与之匹配时,才会进行数据传输。
  4. 不支持 peek 操作:由于 SynchronousQueue 内部没有存储元素,因此不能调用 peek 操作。只有在移除元素时才会有元素可供操作。
  5. 支持公平和非公平模式:SynchronousQueue 可以在构造时指定为公平或非公平模式。在公平模式下,队列会按照线程的到达顺序进行操作;而在非公平模式下,则不保证操作的顺序。

SynchronousQueue 在多线程并发编程中常用于一些特定场景,例如生产者-消费者模式中,用于传输数据的场景,以及一些任务执行器中用于任务的传递等。其特殊的同步机制保证了线程之间数据的可靠传输和同步操作。

基于SynchronousQueue 一个小案例

	/**
	 * 理解SynchronousQueue
	 * SynchronousQueue,实际上它不是一个真正的队列,因为SynchronousQueue没有容量。与其他BlockingQueue(阻塞队列)不同,
	 * SynchronousQueue是一个不存储元素的BlockingQueue。只是它维护一组线程,这些线程在等待着把元素加入或移出队列。
	 * 我们简单分为以下几种特点:
	 * 内部没有存储(容量为0)
	 * 阻塞队列(也是blockingqueue的一个实现)
	 * 发送或者消费线程会阻塞,只有有一对消费和发送线程匹配上,才同时退出。
	 * (其中每个插入操作必须等待另一个线程的移除操作,同样任何一个移除操作都等待另一个线程的插入操作。因此此队列内部其 实没有任何一个元素,因此不能调用peek操作,因为只有移除元素时才有元素。)
	 * 配对有公平模式和非公平模式(默认)
	 */
	@Test
	public void test19() throws InterruptedException {
		SynchronousQueue<String> queue = new SynchronousQueue<>();
		// 我们通过线程内 入队 和 出队 了解下 SynchronousQueue的特性
		new Thread(new Runnable() {
			@Override
			public void run() {
				try {
					// 入队
					logger.error("---- 喜羊羊进锅,沐浴~ ----");
					queue.put("喜羊羊!");

					logger.error("---- 懒羊羊进锅,沐浴~ ----");
					queue.put("懒羊羊!");


					logger.error("---- 美羊羊进锅,沐浴~ ----");
					queue.put("美羊羊!");

				} catch (InterruptedException e) {
					throw new RuntimeException(e);
				}
			}
		}, "t1").start();


		new Thread(new Runnable() {
			@Override
			public void run() {
				try {
					// 入队
					Thread.sleep(5000);
					String node1 = queue.take();
					logger.error("---- {} 出锅~ ----", node1);
					Thread.sleep(10000);
					String node2 = queue.take();
					logger.error("---- {} 出锅~ ----", node2);
					Thread.sleep(5000);
					String node3 = queue.take();
					logger.error("---- {} 出锅~ ----", node3);
				} catch (InterruptedException e) {
					throw new RuntimeException(e);
				}
			}
		}, "t2").start();

		Thread.sleep(Integer.MAX_VALUE);
	}

image-20240313211349801

	// 根据 结果可以发现, 三只羊同时进锅,但是一个锅只能容纳一只羊,所以只有一只羊能进锅,其他的羊只能等待,直到锅里的羊出锅,才能进锅
	// 也就是说,SynchronousQueue是一个不存储元素的BlockingQueue。只是它维护一组线程,这些线程在等待着把元素加入或移出队列。
	// 喜羊羊进锅,然后等待 5s后   喜羊羊出锅,此时美羊羊开始进锅

了解这个阻塞队列后,我们再来了解一下newCachedThreadPool这个线程池,还是通过一个案例来进行了解一下具体用法

	/**
	 * newCachedThreadPool 创建可以缓存的线程池
	 */
	@Test
	public void test17() {
		// 创建可以缓存的线程池
		// 创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
		// keepAliveTime为60S,意味着线程空闲时间超过60S就会被杀死(60L)
		ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) Executors.newCachedThreadPool();
		// 这里使用两个线程,异步执行
		// 此处 羊羊线程是不休眠的,直接放入线程池
		new Thread(() -> {
			for (int i = 0; i < 2; i++) {
				int finalI = i;
				threadPoolExecutor.submit(() -> {
					printPoolInfo(threadPoolExecutor, "羊羊线程" + finalI + "-正在执行");
				});
			}
		}, "任务线程-1").start();

		// 狼狼线程 此时第一个线程也是不会进行阻塞,此时应该是  三个线程 (两个羊羊+一个狼狼 同时进线程池)
		new Thread(() -> {
			for (int i = 0; i < 5; i++) {
				if (i == 1 || i == 2) {
					try {
						Thread.sleep(TimeUnit.SECONDS.toMillis(5));
					} catch (InterruptedException e) {
						throw new RuntimeException(e);
					}
				}
				int finalI = i;
				threadPoolExecutor.submit(() -> {
					printPoolInfo(threadPoolExecutor, "狼狼线程" + finalI + "-正在执行");
				});
			}
		}, "任务线程-2").start();


		// 等待全部线程都执行完毕
		waitPoolExecutedEnd(threadPoolExecutor);

		threadPoolExecutor.shutdown();

	}

这段代码的主要逻辑如下

  1. 使用 Executors.newCachedThreadPool() 创建了一个可缓存的线程池 threadPoolExecutor。这种线程池的特点是,如果线程池长度超过当前任务需求,它会灵活地回收空闲线程;若没有可回收的线程,则会新建线程来处理任务。线程的空闲时间超过60秒后,就会被回收。
  2. 创建了两个新的线程,分别用于提交任务到线程池中执行。其中,一个线程负责提交羊羊线程,另一个线程负责提交狼狼线程。
  3. 羊羊线程的任务不会进行阻塞,直接提交到线程池中执行。
  4. 狼狼线程的任务中,当 i 等于1或2时,会进行5秒的睡眠,模拟任务的耗时操作,然后再提交到线程池中执行。
  5. 使用 waitPoolExecutedEnd(threadPoolExecutor) 方法等待线程池中的任务执行完毕。
  6. 在所有任务执行完毕后,调用 threadPoolExecutor.shutdown() 方法关闭线程池。

image-20240313211756425

  1. 通过结果可以看到每个任务的执行都是间隔5秒执行一次。
  2. 线程池信息中的线程数量始终为3,这是因为 Executors.newCachedThreadPool() 创建的是一个可缓存的线程池,其最大线程数量为 Integer.MAX_VALUE,因此当任务提交到线程池时,如果没有空闲线程可用,则会新建线程来处理任务,线程数量会一直增加,直到达到设定的最大值。
  3. 在任务提交时,活跃线程数量始终为3,这是因为每次提交任务时,都有空闲线程可用,所以不需要新建线程,而是直接使用已存在的线程执行任务。
  4. 狼狼线程的任务会进行5秒的睡眠操作,模拟耗时操作,因此在执行任务期间,线程池中的活跃线程数量会减少,直到任务执行完毕后,线程池会继续维持3个活跃线程数量。
  5. 在执行完所有任务后,线程池并不会立即关闭,因为线程池是可缓存的,会等待一段时间后空闲线程自动被回收。

应用场景:

  1. 短期任务处理:适用于处理大量短期任务的场景,因为它能够根据需要动态地创建线程,处理任务,处理完毕后又自动回收线程,避免了线程过多占用资源的问题。
  2. 任务处理时间不确定:适用于任务处理时间不确定的场景,因为它能够根据实际情况动态调整线程数量,保证任务能够及时得到处理,提高系统的响应速度。
  3. 需要快速响应的任务:适用于需要快速响应的任务,因为它能够快速地创建线程来处理任务,缩短任务等待的时间,提高任务的处理效率。
  4. 任务负载波动大:适用于任务负载波动大的场景,因为它能够根据负载情况动态调整线程数量,使系统能够更好地适应负载的变化。

缺点:

  1. 线程数量不受限制:由于 newCachedThreadPool 的最大线程数量为 Integer.MAX_VALUE,因此在大量任务提交的情况下,可能会导致线程数量过多,占用大量系统资源,导致系统负载过高,甚至引发系统崩溃。
  2. 不适用于长时间任务:由于它的线程数量不受限制,适用于处理短期任务,但不适用于长时间任务,因为长时间任务可能会导致线程数量过多,占用大量系统资源。
  3. 可能导致频繁创建和销毁线程:由于 newCachedThreadPool 是一个动态的线程池类型,可能会频繁地创建和销毁线程,这种线程的创建和销毁操作会带来一定的性能开销。

综上所述,newCachedThreadPool 适用于任务处理时间不确定、负载波动大、需要快速响应的场景,但在大量长时间任务的情况下,需要慎重选择以避免占用过多系统资源。

newScheduledThreadPool 创建可调度的线程池

项目中经常会遇到一些非分布式的调度任务,需要在未来的某个时刻周期性执行。实现这样的功能,我们有多种方式可以选择:

  1. Timer类, jdk1.3引入,不推荐
    1. 它所有任务都是串行执行的,同一时间只能有一个任务在执行,而且前一个任务的延迟或异常都将会影响到之后的任务。
  2. Spring的@Scheduled注解,不是很推荐
    1. 这种方式底层虽然是用线程池实现,但是有个最大的问题,所有的任务都使用的同一个线程池,可能会导致长周期的任务运行影响短周期任务运行,造成线程池"饥饿",更加推荐的做法是同种类型的任务使用同一个线程池。
  3. 自定义ScheduledThreadPoolExecutor实现调度任务
    这也是下面重点讲解的方式,通过自定义ScheduledThreadPoolExecutor调度线程池,提交调度任务才是最优解。

newScheduledThreadPool 用于创建一个可调度的线程newScheduledThreadPoolExecutors类提供的一个静态方法

方法如下

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)

newScheduledThreadPool方法返回一个ScheduledExecutorService对象,它是ExecutorService接口的子接口,提供了任务调度的能力。

可调度的线程池可以用于延时执行任务和周期性执行任务。它可以根据需要自动创建和回收线程,并且可以在指定的延时时间后执行任务,或在指定的时间间隔内重复执行任务。

使用newScheduledThreadPool创建的可调度线程池有以下特点:

  • corePoolSize参数指定了线程池的核心线程数,即线程池中同时执行任务的最大线程数。
  • 当任务的延时时间到达时,线程池会创建新的线程来执行任务。
  • 如果线程池中的线程数量超过核心线程数,空闲的线程会在指定的时间内被回收。
  • 可以使用schedule方法来延时执行任务,也可以使用scheduleAtFixedRate方法或scheduleWithFixedDelay方法来周期性执行任务。

使用示例:

ScheduledExecutorService executor = Executors.newScheduledThreadPool(3);
executor.schedule(() -> {
    // 延时执行的任务逻辑
}, 5, TimeUnit.SECONDS);

executor.scheduleAtFixedRate(() -> {
    // 周期性执行的任务逻辑
}, 1, 3, TimeUnit.SECONDS);

executor.scheduleWithFixedDelay(() -> {
    // 周期性执行的任务逻辑
}, 2, 4, TimeUnit.SECONDS);

在示例中,schedule方法用于延时执行任务,它接受一个任务和延时时间,表示在指定的延时时间后执行任务。

scheduleAtFixedRate方法用于周期性执行任务,它接受一个任务、初始延时时间和周期时间,表示在初始延时时间后开始执行任务,并以指定的周期时间重复执行任务。

scheduleWithFixedDelay方法也用于周期性执行任务,它接受一个任务、初始延时时间和周期时间,表示在初始延时时间后开始执行任务,并在任务执行完成后等待指定的周期时间,然后再执行下一个任务。

总之,newScheduledThreadPool方法用于创建一个可调度的线程池,可以用于延时执行任务和周期性执行任务。通过合理配置延时时间和周期时间,可以满足不同场景下的任务调度需求。

下面我们通过一个案例来了解如何创建延时线程 和 定时线程

@Test
	public void test20() {
		// 使用Executors.newScheduledThreadPool 创建线程池
		ScheduledThreadPoolExecutor scheduledThreadPool = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(5, new ThreadFactory() {
			@Override
			public Thread newThread(Runnable r) {
				// 对线程名称进行自定义
				return new Thread(r, "my-scheduled-job-" + r.hashCode());
			}
		});

		// 准备工作
		DelayedThread delayedThread = new DelayedThread();
		LoopThread loopThread = new LoopThread();


		// 执行延时线程 (延时 10s开始执行 )
		logger.error("延时线程工作准备结束!");
		scheduledThreadPool.schedule(delayedThread, 10, TimeUnit.SECONDS);


		// 执行循环线程

		// scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit)
		// command: 执行的任务
		// initialDelay: 初始延迟的时间
		// delay: 上次执行结束,延迟多久执行
		// unit:单位
		logger.error("循环线程工作准备结束!");
		scheduledThreadPool.scheduleAtFixedRate(loopThread, 10, 5, TimeUnit.SECONDS);

		waitPoolExecutedEnd(scheduledThreadPool);

		scheduledThreadPool.shutdown();

	}

	static class DelayedThread implements Runnable {
		@Override
		public void run() {
			logger.error("延时线程正在开始执行~~");
		}
	}

	static class LoopThread implements Runnable {

		@Override
		public void run() {
			logger.error("循环线程开始执行~~");
		}
	}

image-20240313213718558

首先,通过Executors.newScheduledThreadPool方法创建了一个可调度的线程池ScheduledThreadPoolExecutor,并指定了线程池的核心线程数为5。同时,通过自定义的ThreadFactory来创建线程,并给线程指定了自定义的名称。

接下来,定义了两个任务类DelayedThreadLoopThread,分别实现Runnable接口。

在测试方法中,首先创建了DelayedThreadLoopThread的实例。

然后,在测试方法中,首先创建了DelayedThreadLoopThread的实例。

然后,通过调用scheduledThreadPool.schedule方法,将DelayedThread任务提交给线程池,并指定延时时间为10秒。这意味着DelayedThread任务将在10秒后执行。

接着,通过调用scheduledThreadPool.scheduleAtFixedRate方法,将LoopThread任务提交给线程池,并指定初始延时时间为10秒、周期时间为5秒。这意味着LoopThread任务将在初始延时时间后开始执行,并且每隔5秒重复执行一次。

最后,调用waitPoolExecutedEnd方法等待线程池中的任务执行完毕,并调用线程池的shutdown方法关闭线程池。

总结起来,这段代码演示了使用Executors.newScheduledThreadPool创建可调度的线程池,并展示了延时执行和周期性执行的例子。通过合理配置延时时间和周期时间,可以实现在指定的时间点或时间间隔内执行任务。

newWorkStealingPool 创建一个可窃取任务的线程池

newWorkStealingPool(int parallelism)方法用于创建一个工作窃取线程池。工作窃取线程池是一种特殊的线程池,它根据一定的调度策略执行任务。除了执行任务外,工作窃取线程池还可以按照指定的延迟时间或周期性地执行任务。

工作窃取线程池最早由美国计算机科学家 Charles E. Leiserson 和 John C. Bains 发明,他们在 1994 年的论文《Scheduling Multithreaded Computations by Work Stealing》中首次提出了这一概念。

工作窃取线程池的设计初衷是为了解决并行计算中独立任务的负载均衡问题。在并行计算中,通常存在大量的独立任务需要并行执行,而这些独立任务的执行时间往往不一致。如果简单地将任务平均分配给每个线程,那些执行时间较短的任务将会导致线程空闲,而执行时间较长的任务则可能导致线程被阻塞,从而降低整体的执行效率。

为了解决这个问题,工作窃取线程池引入了工作窃取算法。该算法允许空闲线程从其他线程的任务队列末尾窃取任务来执行,以实现负载均衡。每个线程都维护一个自己的任务队列,当线程自己的任务执行完毕后,它会尝试从其他线程的任务队列末尾窃取任务执行。这样,任务的分配和执行可以更加均衡,避免线程之间出现明显的负载不均衡。

我们了解 newWorkStealingPool 先来了解一下Fork/Join

Fork/Join框架是Java提供的一种并行执行任务的框架,它基于工作窃取算法实现任务的自动调度和负载均衡。在Fork/Join框架中,工作窃取线程池是其中的核心组件。

Fork/Join框架的工作原理如下:

  1. 每个任务被划分为更小的子任务,这个过程通常被称为"fork"。
  2. 当一个线程执行"fork"操作时,它会将子任务放入自己的工作队列中。
  3. 当一个线程完成自己的任务后,它会从其他线程的工作队列中"steal"(窃取)任务来执行。
  4. 窃取的任务通常是其他线程工作队列的末尾的任务,这样可以减少线程之间的竞争。

工作窃取线程池在Fork/Join框架中的应用主要体现在以下几个方面:

  1. 任务分割: 在Fork/Join框架中,任务被递归地分割成更小的子任务,直到达到某个终止条件。工作窃取线程池中的线程负责执行这些任务。每个线程都有自己的任务队列,当一个线程执行完自己的任务后,会从自己的队列中获取新的任务来执行。
  2. 负载均衡: 工作窃取线程池通过工作窃取算法实现负载均衡当一个线程的任务队列为空时,它会从其他线程的任务队列中窃取任务来执行,以保持各个线程的工作量相对均衡。这种负载均衡策略可以避免线程之间出现明显的负载不均衡,提高整体的执行效率。
  3. 递归任务执行: Fork/Join框架中的任务通常是递归执行的。当一个任务被分割成多个子任务时,每个子任务会被提交到工作窃取线程池中执行。如果子任务还可以进一步分割,线程会继续执行这个过程,直到任务不能再分割为止。这种递归的任务执行方式能够充分利用线程池中的线程资源,提高并行任务的执行效率。
  4. Join操作: 在Fork/Join框架中,一个任务可以等待其子任务执行完成后再继续执行,这个操作被称为"join"。工作窃取线程池在执行任务时会自动进行join操作,确保任务的执行顺序满足依赖关系。这样可以避免线程之间的竞争和冲突,保证任务的正确性。

总的来说,工作窃取线程池在Fork/Join框架中扮演着重要的角色。它通过工作窃取算法和负载均衡策略,实现了并行任务的自动调度和执行。通过递归任务执行和join操作,工作窃取线程池能够高效地处理大量的并行任务,并充分利用系统的并行计算能力。

newWorkStealingPool 是一个创建工作窃取线程池的方法,它使用了ForkJoinPool,并根据CPU核心数动态调整线程数量。这种线程池适用于CPU密集型的任务。

与其他四种线程池不同,newWorkStealingPool 使用了ForkJoinPool。它的优势在于将一个任务拆分成多个小任务,并将这些小任务分发给多个线程并行执行。当所有小任务都执行完成后,再将它们的结果合并。

相较于之前的线程池,newWorkStealingPool 中的每个线程都拥有自己的任务队列,而不是多个线程共享一个阻塞队列。

当一个线程发现自己的任务队列为空时,它会去其他线程的队列中窃取任务来执行。可以将这个过程简单理解为"窃取"。为了降低冲突,一般情况下,自己的本地队列采用后进先出(LIFO)的顺序,而窃取时则采用先进先出(FIFO)的顺序。由于窃取的动作非常快速,这种冲突会大大降低,从而提高了性能。这也是一种优化方式。

下面我们通过一个案例 来了解一下 newWorkStealingPool

	@Test
	public void test21() {
		// 返回可用的计算资源
		int core = Runtime.getRuntime().availableProcessors();
		logger.error("cpu 可以计算机资源 :{}", core);
		// 无参数的话,会根据cpu当前核心数据 动态分配
		// ExecutorService executorService = Executors.newWorkStealingPool();
		// 当传入参数,就可以指定cpu的一个并行数量
		ForkJoinPool forkJoinPool = (ForkJoinPool) Executors.newWorkStealingPool(8);

		for (int i = 1; i <= core * 2; i++) {
			WorkStealThread workStealThread = new WorkStealThread(i, forkJoinPool);
			forkJoinPool.submit(workStealThread);

		}


		// 优雅关闭
		forkJoinPool.shutdown();

		// 为了防止 主线程 没有完全执行结束
		try {
			Thread.sleep(Integer.MAX_VALUE);
		} catch (InterruptedException e) {
			throw new RuntimeException(e);
		}


	}


	static class WorkStealThread implements Runnable {
		private int i;
		private ForkJoinPool forkJoinPool;

		WorkStealThread() {

		}

		WorkStealThread(int i, ForkJoinPool forkJoinPool) {
			this.i = i;
			this.forkJoinPool = forkJoinPool;
		}

		@Override
		public void run() {
			try {
				// 随机休眠
				Thread.sleep(TimeUnit.SECONDS.toMillis(new Random().nextInt(10)));
			} catch (InterruptedException e) {
				throw new RuntimeException(e);
			}
			logger.error("工作窃取线程-{},线程池的大小:[{}],活动线程数:[{}],总任务窃取次数:[{}]",
					i, forkJoinPool.getPoolSize(), forkJoinPool.getActiveThreadCount(), forkJoinPool.getStealCount());
		}
	}

当代码运行时,首先通过Runtime.getRuntime().availableProcessors()获取可用的计算资源,即CPU核心数,并将其记录在日志中。

接下来,使用Executors.newWorkStealingPool(8)创建一个工作窃取线程池,其中参数8表示线程池的并行度,即同时执行的线程数。这个线程池是基于ForkJoinPool实现的,具有任务窃取的特性。

然后,通过一个循环将一些工作窃取线程提交到线程池中进行并行执行。每个工作窃取线程都有一个编号,从1到核心数的两倍。这些线程使用WorkStealThread类实现了Runnable接口。

WorkStealThread类中的run方法定义了线程的执行逻辑。首先,线程会随机休眠一段时间,模拟执行一些耗时的任务。然后,它会输出一些关于线程池状态的信息,包括线程池的大小、活动线程数和总任务窃取次数。这些信息会记录在日志中。

最后,线程池会被优雅地关闭,确保所有任务都能执行完毕。为了防止主线程提前结束,使用Thread.sleep(Integer.MAX_VALUE)使主线程休眠,直到被中断或抛出异常。

通过以上代码,我们可以了解到如何使用newWorkStealingPool方法创建工作窃取线程池,并通过工作窃取线程实现并行执行。同时,通过获取线程池的状态信息,我们可以了解线程池的工作情况,包括活动线程数和任务窃取次数。

通过观察结果,我们可以发现,当前线程执行完毕,如果空闲的话,会去执行别的线程任务

image-20240313215049387

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1514132.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Mybatis-Plus实现常规增删改操作

文章目录 3.1 MP实现插入操作3.1.1 BaseMapper定义操作方法3.1.2 代码测试 3.2 MP主键字段注解-TableId3.2.1 注解TableId介绍3.2.2 MP主键生成策略介绍3.2.3 MP常用主键生成策略3.2.4 雪花算法(了解) 3.3 普通列注解-TableField3.3.1 注解TableField作用3.3.2 代码示例 3.4.MP…

自适应差分进化算法(SaDE)和差分进化算法(DE)优化BP神经网络

自适应差分进化算法(SaDE)和差分进化算法(DE)优化BP神经网络 自适应差分进化算法(SaDE)和差分进化算法(DE)可以用于优化神经网络中的参数&#xff0c;包括神经网络的权重和偏置。在优化BP神经网络中&#xff0c;DE和SaDE可以帮助找到更好的权重和偏置的组合&#xff0c;以提高…

基于YOLOv8/YOLOv7/YOLOv6/YOLOv5的交通标志识别系统详解(深度学习模型+UI界面代码+训练数据集)

摘要&#xff1a;本篇博客详细介绍了利用深度学习构建交通标志识别系统的过程&#xff0c;并提供了完整的实现代码。该系统采用了先进的YOLOv8算法&#xff0c;并与YOLOv7、YOLOv6、YOLOv5等早期版本进行了性能评估对比&#xff0c;分析了性能指标如mAP、F1 Score等。文章深入探…

4、设计模式之建造者模式(Builder)

一、什么是建造者模式 建造者模式是一种创建型设计模式&#xff0c;也叫生成器模式。 定义&#xff1a;封装一个复杂对象构造过程&#xff0c;并允许按步骤构造。 解释&#xff1a;就是将复杂对象的创建过程拆分成多个简单对象的创建过程&#xff0c;并将这些简单对象组合起来…

吴恩达机器学习-可选实验室:逻辑回归,决策边界(Logistic Regression,Decision Boundary))

文章目录 目标数据集图数据逻辑回归模型复习逻辑回归和决策边界绘图决策边界恭喜 目标 在本实验中&#xff0c;你将:绘制逻辑回归模型的决策边界。这会让你更好地理解模型的预测。 import numpy as np %matplotlib widget import matplotlib.pyplot as plt from lab_utils_co…

Python逆向:pyc字节码转py文件

一、 工具准备 反编译工具&#xff1a;pycdc.exe 十六进制编辑器&#xff1a;010editor 二、字节码文件转换 在CTF中&#xff0c;有时候会得到一串十六进制文件&#xff0c;通过010editor使用查看后&#xff0c;怀疑可能是python的字节码文件。 三、逆向反编译 将010editor得到…

【网络工程师进阶之路】BFD技术

个人名片&#xff1a;&#x1faaa; &#x1f43c;作者简介&#xff1a;一名大三在校生&#xff0c;喜欢AI编程&#x1f38b; &#x1f43b;‍❄️个人主页&#x1f947;&#xff1a;落798. &#x1f43c;个人WeChat&#xff1a;hmmwx53 &#x1f54a;️系列专栏&#xff1a;&a…

第十四届蓝桥杯蜗牛

蜗牛 线性dp 目录 蜗牛 线性dp 先求到达竹竿底部的状态转移方程 求蜗牛到达第i根竹竿的传送门入口的最短时间​编辑 题目链接&#xff1a;蓝桥杯2023年第十四届省赛真题-蜗牛 - C语言网 关键在于建立数组将竹竿上的每个状态量表示出来&#xff0c;并分析出状态转移方程 in…

《详解:鸿蒙NEXT开发核心技术》

我们现在都知道鸿蒙作为一个国产的全栈自研系统&#xff0c;经过国家主推后。已经引起人们很大的关注&#xff0c;其中作为开发者来说&#xff1b;许多一线大厂已经与其华为鸿蒙展开原生应用的合作了&#xff0c;目前了解到已经有200家。而之后出现了很多的高薪鸿蒙开发岗位&am…

Unity制作马赛克效果

大家好&#xff0c;我是阿赵。   之前在玩怒之铁拳4里面&#xff0c;看到了马赛克场景转换的效果&#xff0c;觉得很有趣&#xff0c;于是也来做一下。 一、2D版本的马赛克转场效果 先看看视频效果&#xff1a; 马赛克转场 这里我是直接写shader实现的&#xff0c;我这里是把…

sqlite3——数据库——day2

今天学习了sqlite3数据库 sqlite3_open sqlite3_openint sqlite3_open(const char *filename, /* Database filename (UTF-8) */sqlite3 **ppDb /* OUT: SQLite db handle */); 功能:打开数据库文件(创建一个数据库连接) 参数:filename:数据库文件路径 ppDb:操作数…

SpringCloud Gateway 新一代网关

一、前言 接下来是开展一系列的 SpringCloud 的学习之旅&#xff0c;从传统的模块之间调用&#xff0c;一步步的升级为 SpringCloud 模块之间的调用&#xff0c;此篇文章为第六篇&#xff0c;即介绍 Gateway 新一代网关。 二、概述 2.1 Gateway 是什么 Gateway 是在 Spring 生…

前端请求到 SpringMVC 的处理流程

1. 发起请求 客户端通过 HTTP 协议向服务器发起请求。 2. 前端控制器&#xff08;DispatcherServlet&#xff09; 这个请求会先到前端控制器 DispatcherServlet&#xff0c;它是整个流程的入口点&#xff0c;负责接收请求并将其分发给相应的处理器。 3. 处理器映射&#xf…

安卓项目:app注册/登录界面设计

目录 第一步&#xff1a;设计视图xml 第二步&#xff1a;编写登录和注册逻辑代码 运行效果展示&#xff1a; 总结&#xff1a; 提前展示项目结构&#xff1a; 第一步&#xff1a;设计视图xml 在layout目录下面创建activity_login.xml和activity_main.xml文件 activity_lo…

rust学习(手动写一个线程池)

哈哈&#xff0c;主要是为了练习一下rust的语法&#xff0c;不喜勿喷。 一.Executor申明 struct AExecutor<T> {results:Arc<Mutex<HashMap<u32,T>>>, //1functions:Arc<Mutex<Vec<ATask<T>>>> //2 } 1.results&#xff1a…

docker-compose up -d使用遇到问题no configuration file provided: not found

docker-compose up -d使用遇到问题&#xff0c;因为你文件名称没指定&#xff0c; 又找不到默认的文件名称&#xff1b;如果该目录下有个文件叫docker-compose.yml时&#xff0c;那么可以直接使用docker-compose up -d;否则就要使用docker-compose -f mysql up -d

IP数据报格式

每一行都由32位比特&#xff0c;即4个字节组成&#xff0c;每个格子称为字段或者域。IP数据报由20字节的固定部分和最大40字节的可变部分组成。 总长度 总长度为16个比特&#xff0c;该字段的取值以字节为单位&#xff0c;用来表示IPv4数据报的长度(首部长度数据载荷长度)最大…

【阿里云系列】-基于云效构建部署Springboot项目到ACK

介绍 为了提高项目迭代的速度加速交付产品给客户&#xff0c;我们通常会选择CICD工具来减少人力投入产生的成本&#xff0c;开源的工具比如有成熟的Jenkins&#xff0c;但是本文讲的是阿里云提高的解决方案云效平台&#xff0c;通过配置流水线的形式实现项目的快速部署到服务器…

(第73天)DBUA 升级:单机 11GR2 升级到 19C

前言 Oracle 11GR2 版本是上一个长期稳定版本,但是官方已与 2020 年停止服务,官方建议升级到最新长期稳定版 19C。 参考官方文档:当前数据库版本的发行时间表 (Doc ID 1626244.1) Database Upgrade Assistant (DBUA) 交互式的引导我们完成升级数据库的步骤,它会自动执行为…

前端去除网页水印

按F12&#xff0c;打开开发者工具面板&#xff0c;然后直接在样式搜索backgroud 然后直接取消backgroud 的复选框即可。