需求:用户可以开启任务,暂停任务和中止任务。
用户开启任务后,可以随时暂停或者中止。暂停后又可以回到原进度继续运行。
这里写目录标题
- demo版-使用废弃的stop、suspend、resume实现
- 为什么废弃了?
- 不用stop,如何销毁线程呢?
- 正式版
- 延迟版:wait和notify、join和interrupt、LockSupport
- 非延迟版:无法实现
- 分布式集群最终版
demo版-使用废弃的stop、suspend、resume实现
一个MyTask类来实现线程,一个MyButton来模拟界面(最开始是想开线程监听控制台的,但是日志打印的频率不好控制,所以就出现了MyButton类)
package com.example.springbootproject.thread;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class MyTask extends Thread {
@Override
public void run() {
// 开启运行业务代码...
processBusiness();
}
private void processBusiness() {
/**
* 模拟业务运行,不用sleep,因为会抛出打断异常;
*/
for (int i = 0; i < 1000000; i++) {
log.info("business running...{}", i);
for (int j = 0; j < 1000000; j++) {
for (int k = 0; k < 100000; k++) {
for (int l = 0; l < 100000; l++) {
log.info("business running...l is" + l);
for (int m = 0; m < 100000; m++) {
for (int n = 0; n < 1000; n++) {
int aa = i +j +k+m+n;
int bb = aa *aa - m -n -i -j;
for (int o = 0; o < aa; o++) {
bb = aa+ bb;
}
}
}
}
}
}
}
}
public void mySuspend() {
this.suspend();
log.info("suspend success");
}
public void myReStart() {
this.resume();
log.info("resume success");
}
public void myStop() {
this.stop();
log.info("stop success");
}
}
package com.example.springbootproject.thread;
import lombok.extern.slf4j.Slf4j;
import javax.swing.*;
import java.awt.*;
import java.awt.event.*;
@Slf4j
public class MyButton {
public static void main(String[] args) {
// 创建一个 JFrame 窗口
JFrame frame = new JFrame("My Button");
frame.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);
// 创建一个 JPanel 面板
JPanel panel = new JPanel();
panel.setLayout(new FlowLayout());
MyTask myTask = new MyTask();
// 创建一个 JButton 按钮
JButton startbutton = new JButton("start");
startbutton.addActionListener(new ActionListener() {
public void actionPerformed(ActionEvent e) {
myTask.start();
log.info("start clicked!");
}
});
final boolean[] flag = {false};
// 创建一个 JButton 按钮
JButton suspendbutton = new JButton("suspend");
suspendbutton.addActionListener(new ActionListener() {
public void actionPerformed(ActionEvent e) {
if (flag[0] == false) {
myTask.mySuspend();
flag[0] =true;
} else {
myTask.myReStart();
flag[0] =false;
}
log.info("suspend clicked!");
}
});
// 创建一个 JButton 按钮
JButton stopButton = new JButton("stop");
stopButton.addActionListener(new ActionListener() {
public void actionPerformed(ActionEvent e) {
log.info("stop clicked!");
myTask.myStop();
}
});
// 将按钮添加到面板
panel.add(startbutton);
panel.add(stopButton);
panel.add(suspendbutton);
// 将面板添加到窗口
frame.getContentPane().add(panel);
// 设置窗口的大小和可见性
frame.setSize(300, 200);
frame.setVisible(true);
}
}
暂停后,输出是32,取消暂停后,又从33开始输出。最后停止线程
但是jdk自带的这三个方法已经废弃了,所以不用。
为什么废弃了?
比如可以随时调用,可能会破坏线程的状态;导致死锁等问题。
但具体的代码还没有分析,先占个位置吧。TODO。
不用stop,如何销毁线程呢?
resume和suspend我们有很多函数可以代替。但是stop呢?
没有好办法。只能让线程里面的代码运行完,自己去关闭。
实际中都是用线程池去提交任务。那线程池的任务cancel可以吗?不可以。因为还是需要我们自己去控制当被打断时的逻辑
futureTask的cancel原码如下。传入一个布尔值,用来控制是否需要去打断当前任务。
- 首先进行cas操作,失败直接返回false;
- 如果设置了可打断,就去打断该任务
- 最后完成任务:里面的代码就是调用LockSupport.unpark打断线程
所以如果我们没有处理该打断标志位或者没有处理好打断异常,代码还是会继续运行。
正式版
正式版是用线程池去提交任务,和实际使用保持一致。
延迟版:wait和notify、join和interrupt、LockSupport
- wait和notify原理: 这俩都是获得了monitor对象(synchronized锁对象)后才能使用。
获得了obj的对象锁,当前线程调用obj.wait(),然后当前线程在其monitor对象上去等待,直到被打断(调用wait()前被打断,调用后被打断,都会抛出异常并清除打断标志)或者 其他线程调用了obj.notify或notifyall才可能会醒来。为啥可能呢?
因为notify唤醒它之后,他还要竞争锁成功才能真正被唤醒,否则就进入阻塞状态。 - join和interrupt: join,他不需要锁。当前线程调用了obj.join(),是当前线程 陷入阻塞。除非obj线程运行完成,或者线程被打断
我觉得无法实时去响应用户的操作,因为你如何让正在运行的 业务线程 去调用wait、join、LockSupport方法呢?
延迟版可以实现,可以对任务进行分步,每一步都可以用一个标志位去判断,如果为true,表示被暂停。
下面贴一个wait和notify版的,其他的join和locksupport也都可以实现,不再赘述
package com.example.springbootproject.thread;
import lombok.extern.slf4j.Slf4j;
import javax.swing.*;
import java.awt.*;
import java.awt.event.ActionEvent;
import java.awt.event.ActionListener;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Slf4j
public class MyButton {
public static void main(String[] args) {
// 创建一个 JFrame 窗口
JFrame frame = new JFrame("My Button");
frame.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);
// 创建一个 JPanel 面板
JPanel panel = new JPanel();
panel.setLayout(new FlowLayout());
ExecutorService executorService = Executors.newSingleThreadExecutor();
final MyTask[] myTask = {null};
// 创建一个 JButton 按钮
JButton startbutton = new JButton("start");
startbutton.addActionListener(new ActionListener() {
public void actionPerformed(ActionEvent e) {
myTask[0] = new MyTask();
executorService.submit(myTask[0]);
log.info("start clicked!");
}
});
final boolean[] flag = {false};
// 创建一个 JButton 按钮
JButton suspendbutton = new JButton("suspend");
suspendbutton.addActionListener(new ActionListener() {
public void actionPerformed(ActionEvent e) {
if (flag[0] == false) {
myTask[0].mySuspend();
flag[0] =true;
} else {
myTask[0].myReStart();
flag[0] =false;
}
log.info("suspend clicked!");
}
});
// 创建一个 JButton 按钮
JButton stopButton = new JButton("stop");
stopButton.addActionListener(new ActionListener() {
public void actionPerformed(ActionEvent e) {
log.info("stop clicked!");
myTask[0].myStop();
}
});
// 将按钮添加到面板
panel.add(startbutton);
panel.add(stopButton);
panel.add(suspendbutton);
// 将面板添加到窗口
frame.getContentPane().add(panel);
// 设置窗口的大小和可见性
frame.setSize(300, 200);
frame.setVisible(true);
}
}
package com.example.springbootproject.thread;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class MyTask extends Thread{
private Thread currentRunTask;
@Override
public void run() {
// 注意:this和Thread.currentThread不一样。因为我们使用线程池提交的任务
// 前者是MyTask实例(state =new),后者是当前正在运行的线程(state=running)。
// 而this就是MyTask实例代表的线程,它的状态是new
currentRunTask = Thread.currentThread();
// 开启运行业务代码...
try {
processBusiness();
} catch (Exception e) {
log.info("业务线程终止");
}
}
private String processBusiness() throws InterruptedException {
/**
* 模拟业务运行,
*/
for (int i = 0; i < 1000000; i++) {
log.info("business running...{}", i);
for (int j = 0; j < 1000000; j++) {
for (int k = 0; k < 100000; k++) {
for (int l = 0; l < 100000; l++) {
while (flag) { // 不用if。避免虚假唤醒
synchronized (this) { // 获取
try {
log.info("业务线程 开始wait");
this.wait();
log.info("业务线程结束 wait");
} catch (InterruptedException e) {
log.info("处理业务过程中抛出一个异常");
throw e;
}
}
}
log.info("business running...l is" + l);
for (int m = 0; m < 100000; m++) {
for (int n = 0; n < 10000; n++) {
int aa = i + j + k + m + n;
int bb = aa * aa - m - n - i - j;
for (int o = 0; o < aa; o++) {
bb = aa + bb;
}
}
}
}
}
}
}
return "0";
}
private boolean flag = false;
public void mySuspend() {
flag = true;
log.info("suspend success");
}
public void myReStart() {
synchronized (this) {
flag = false;
this.notifyAll();
log.info("resume success");
}
}
public void myStop() {
currentRunTask.interrupt();
flag = true;
log.info("stop success");
}
}
非延迟版:无法实现
分布式集群最终版
- 如何保证两次请求都打到同一个服务器上呢?
- 上下文切换也消耗时间和内存。如果暂停的线程多了,如何处理呢?
- 暂停多长时间合适呢?如果用户一直暂停呢?
所以,最终只能用数据库来保存才能达到要求。
按照我的需求,用户给我数据A,我处理后返回给他数据B。而且按照正常情况,用户一般不会经常点击暂停。而且用户量不大,但不能保证用户多开。
- 把数据分组(或者叫程序分步骤)。比如用户传了1w数据,我就分每1k条就去判断用户是否点击暂停或者停止。
- 每一步处理完后的数据、进度需要存数据库。可以用redis缓存进度条
- 如果用户点击暂停,使用MQ广播,判断是哪一个服务器正在运行这个任务。然后就结束这个任务。
- 如果用户点击恢复,从服务器上面重新读取文件和数据库里面的匹配,重新运行暂停前没有跑的数据(或者最开始把所有数据都存到数据库,就不用从服务器上面获取文件、解析数据这些操作了。)这里不用MQ了,因为所有的数据都在数据库里面存着,直接继续跑就行了。
- 定时任务清理表。比如每15天,清理一直暂停的任务、已经跑完的数据等。