线程池应用
- 线程池
- 线程池应用
- 多线程应用
- 同步和异步
- 1. 需要等待结果
- 1. join 实现(同步)
- 2. Future 实现(同步)
- 3.CompletableFuture 实现(异步)
- 4. BlockingQueue 实现(异步)
- 2. 不需等待结果
- 1. 普通线程实现
- 2. 线程池实现
- 3. CompletableFuture 实现
线程池
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>30.0-jre</version>
</dependency>
import com.google.common.util.concurrent.ThreadFactoryBuilder;
// 自定义线程池
static ExecutorService executorService = new ThreadPoolExecutor(3, 5, 100, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(20),
new ThreadFactoryBuilder().setNameFormat("log-thread-pool-%d").build(),
new ThreadPoolExecutor.AbortPolicy());
线程池应用
public class ThreadPoolExecutorController {
@GetMapping("/")
public ResultVO testTheadPool() throws ExecutionException, InterruptedException {
ThreadPoolExecutor threadPoolExecutor = ThreadPoolExecutorUtils.getThreadPoolExecutor();
ArrayList<BuildXmlTask> list = Lists.newArrayList();
for (int i = 0; i < 10; i++) {
BuildXmlTask buildXmlTask = new BuildXmlTask();
list.add(buildXmlTask);
}
List<Future<BuildXmlDTO>> futures = threadPoolExecutor.invokeAll(list);
ArrayList<BuildXmlDTO> buildXmlDTOS = Lists.newArrayList();
for (Future<BuildXmlDTO> future : futures) {
if (future != null) {
buildXmlDTOS.add(future.get());
}
}
return ResultVO.ok(buildXmlDTOS);
}
}
@Data
class BuildXmlDTO {
private String code;
private String msg;
}
@Slf4j
class BuildXmlTask implements Callable<BuildXmlDTO> {
@Override
public BuildXmlDTO call() throws Exception {
log.info("{}",Thread.currentThread().getName());
BuildXmlDTO buildXmlDTO = new BuildXmlDTO();
buildXmlDTO.setCode("200");
buildXmlDTO.setMsg("发送成功");
return buildXmlDTO;
}
}
多线程应用
同步和异步
同步和异步通常来形容一次方法调用,同步方法调用一旦开始,调用者必须等到方法调用返回后,才能继续后续的行为。异步方法调用更像一个消息传递,一旦开始,方法调用就会立即返回,调用者就可以继续后续的操作。而异步方法通常会在另外一个线程中“真实”地执行。整个过程,不会阻碍调用者的工作。
1. 需要等待结果
这时既可以使用同步处理,也可以使用异步来处理
1. join 实现(同步)
@Slf4j
public class demo01 {
static int result = 0;
public static void main(String[] args) throws InterruptedException {
log.info("main开始");
Thread thread = new Thread(() -> {
log.info("子线程开始");
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
result = 10;
log.info("子线程结束");
},"join方法同步线程");
thread.start();
thread.join();
log.info("结果为:{}", result);
}
}
输出
缺点:
1、需要外部共享变量,不符合面向对象封装的思想
2、必须等待线程结束,不能配合线程池使用
2. Future 实现(同步)
@Slf4j
public class demo02 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
log.info("main线程开始");
FutureTask<Integer> result = new FutureTask<>(() -> {
log.info("任务线程开始");
TimeUnit.SECONDS.sleep(1);
log.info("任务线程结束");
return 10;
});
new Thread(result, "Future实现(同步)").start();
log.info("结果为:{}", result.get());
log.info("main线程结束");
}
}
-
规避了使用 join 之前的缺点
-
可以方便配合线程池使用
private static void userThreadPool() throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(1);
log.info("main线程开始");
// execute(): 无返回值, submit(): 有返回值
Future<?> result = executorService.submit(() -> {
log.info("开始");
TimeUnit.SECONDS.sleep(1);
log.info("结束");
return 10;
});
log.info("结果为:{}, result 的类型:{}", result.get(), result.getClass());
executorService.shutdown();
log.info("main线程结束");
}
-
仍然是 main 线程接收结果
-
get 方法是让调用线程同步等待
3.CompletableFuture 实现(异步)
@Slf4j
public class demo03 {
public static void main(String[] args) {
// 进行计算的线程池
ExecutorService computeService = Executors.newFixedThreadPool(1);
// 接收结果的线程池
ExecutorService resultService = Executors.newFixedThreadPool(1);
log.info("main线程开始");
CompletableFuture.supplyAsync(() -> {
log.info("子线程开始");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("子线程结束");
return 10;
}, computeService).thenAcceptAsync((result) -> {
log.info("结果为:{}", result);
}, resultService);
log.info("main线程结束");
}
}
1、可以让调用线程异步处理结果,实际是其他线程去同步等待
2、可以方便地分离不同职责的线程池
3、以任务为中心,而不是以线程为中心
4. BlockingQueue 实现(异步)
@Slf4j
public class demo04 {
public static void main(String[] args) throws InterruptedException {
ExecutorService consumer = Executors.newFixedThreadPool(1);
ExecutorService producer = Executors.newFixedThreadPool(1);
BlockingQueue<Integer> queue = new SynchronousQueue<>();
log.info("main线程开始");
producer.submit(() -> {
log.info("生产出添加任务开始");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("生产出添加任务结束");
try {
queue.put(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
consumer.submit(() -> {
log.info("消费者消费 开始");
try {
Integer result = queue.take();
log.info("结果为:{}", result);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
log.info("main线程结束");
}
}
2. 不需等待结果
这时最好是使用异步来处理
1. 普通线程实现
@Slf4j
public class demo05 {
public static void main(String[] args) throws InterruptedException {
read();
log.info("do other things ...");
}
public static void read() throws InterruptedException {
String shortName = "a.txt";
long start = System.currentTimeMillis();
log.info("read [{}] start ...", shortName);
TimeUnit.SECONDS.sleep(4);
long end = System.currentTimeMillis();
log.info("read [{}] end ... cost: {} ms", shortName, end - start);
}
}
没有用线程时,方法的调用是同步的:
使用了线程后,方法的调用是异步的:
// 1、普通线程
new Thread(()->{
try {
read();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
log.info("do other things ...");
输出
2. 线程池实现
// 2、线程池实现
ExecutorService service = Executors.newFixedThreadPool(1);
service.execute(()->{
try {
read();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
log.info("do other things ...");
service.shutdown();
3. CompletableFuture 实现
// 3、CompletableFuture 实现
CompletableFuture.runAsync(() -> {
try {
read();
} catch (InterruptedException e) {
e.printStackTrace();
}
},service);
service.shutdown();
log.info("do other things ...");