一、多线程的术语
在学习多线程之前需要先理解有关多线程的术语。
- CPU(中央处理器)或内核/核心是实际执行程序的硬件单元。许多现代CPU都支持同时多线程(Intel称之为超线程),即使一个CPU能表现为多个「虚拟」CPU。
- 进程(process)是某个程序当前正在执行的实例。操作系统的一项基本功能就是管理进程。每个进程都包含一个或多个线程。
- 线程(thread)是操作系统能够进行运算调度的最小单位,也是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流。
- 单线程程序的进程仅包含一个线程;多线程程序则包含多个。
- 在多线程程序中运行具有正确行为的代码,就说代码是线程安全的。代码的线程处理模型是指代码向调用者提出的一系列要求,只有满足这些要求才能保障线程安全。
- 任务是可能出现高延迟的工作单元,作用是产生结果值或希望的副作用。它与线程的区别是:任务代表需要执行的一件工作,而线程代表做这件工作的工作者。
- 线程池是多个线程的集合,通过一定的逻辑决定如何为线程分配工作。当有任务要执行时,它分配池中的一个线程执行任务,任务结束后解除分配,从而使该线程在下次请求额外工作时可用。
二、多线程的实现
如果核心数量足够,每个线程都能分配到一个,那么每个线程都相当于在一台单独的机器上运行。但可惜大多数时候都是线程多、核心少。
为了解决这一矛盾,操作系统通过时间分片机制来模拟多个线程并发运行。即操作系统以极快的速度从一个线程切换到另一个线程,给人留下所有线程都在同时执行的错觉。处理器执行一个线程的时间周期称为时间片或量子。在某个核心上更改执行线程的行动称为上下文切换。
无论是真正的多核运行还是通过时间分片技术模拟,我们说一起进行的两个操作就是并发。实现这种并发操作需要以异步方式调用,被调用操作的执行和完成都独立于调用它的控制流。异步分配的工作与当前控制流并行执行,就实现了并发性。并行编程是指将一个问题分解成较小的部分,异步发起对每一部分的处理,最终使它们全部并发执行。
三、线程处理问题
多线程意味着原本在单一线程中成立的假设在多线程程序中不再成立,这就导致了如下所示的一系列问题。
非原子性
如果一个操作是原子操作,那意味着它要么尚未开始,要么已经完成。然而我们平时编程中的大多数操作都不是原子性的。比如下面这个购票程序
if (tickets > 0)
{
tickets--;
// 出票
// ...
}
如果假定有多个线程同时运行,他们恰好都通过了tickets > 0
的校验,又恰巧系统中只剩一张余票,那么就会只有一个人拿到真正的门票,而其他人拿到的都是虚假的门票。因为部分完成的非原子操作而造成了不一致状态,这是竞态条件的一种特例。
竞态条件造成的不确定性
在缺少线程同步的情况下,操作系统会在它认为合适的任何时间在任何两个线程之间切换上下文。结果就是当两个线程访问同一个对象时,无法预测哪个线程“竞争胜出”并抢先运行。
对于包含竞态条件的代码,其行为取决于上下文切换的时机。这就造成了程序执行的不确定性。有可能1000次执行里面只有1次异常状况。这使得竞态条件难以重现,所以需要依赖长期的压力测试、专业的代码分析工具及专家对代码进行大量分析和检查。此外,在多线程编程中,“越简单越好”也是一条重要的原则。
内存模型的复杂性
现代处理器不会在每次要用到一个变量时都去访问主存,而是在处理器的高速缓存中生成本地拷贝。该缓存定时与主存同步。这意味着如果两个线程在两个不同的处理器上,但都要访问同一个对象中的字段时,它们实际读取的可能不是对方那个位置的实时更新,从而得到了不一样的结果。这就是处理器同步缓存的时机产生了竞态条件。
死锁
为了对线程进行调度以防止竞态条件,开发者有必要对一部分代码进行加锁。即一次只能有一个线程执行这段代码,其他同时到达的线程将被挂起。但锁本身也存在问题,最容易发生的是死锁。当有两个线程都在等待对方释放它们的锁,只有在对方释放了锁之后才能继续,此时就发生了线程阻塞,造成代码彻底死锁。
四、线程类
4.1 Thread类
创建线程
要创建并启动一个线程,需要首先实例化Thread对象并调用Start方法。Thread的最简单的构造器接收一个ThreadStart委托:一个无参数的方法,表示执行的起始位置。
下面代码通过Thread
创建了一个线程,并在该线程中打印“A”。同时在主线程中打印“B”。
public static void ThreadPracticeMain()
{
Thread thread = new Thread(Print);
thread.Start();
for (int i = 0; i < 1000; i++)
{
Console.Write("B");
}
}
private static void Print()
{
for (int i = 0; i < 1000; i++)
{
Console.Write("A");
}
}
运行结果如下
阻塞线程
通过Thread.Sleep()
方法可以使当前线程暂停指定的时间
public static void ThreadPracticeMain()
{
Thread thread = new Thread(SleepTest);
thread.Start();
// 主线程睡眠1000毫秒
Thread.Sleep(1000);
Console.WriteLine("Main Thread Wake");
}
private static void SleepTest()
{
Console.WriteLine("Child Thread Start");
// 子线程睡眠3000毫秒
Thread.Sleep(3000);
Console.WriteLine("Child Thread Wake");
}
运行结果如下
需要注意的是,如果调用的是Thread.Sleep(0)
,会导致当前线程立即放弃自己的时间片,将CPU交给其他线程。这一点与Thread.Yield()
是相同的,只不过Thread.Yield()
只会将资源交给同一个处理器上运行的线程。
通过Thread.Join()
方法可以使当前线程阻塞,等待调用该方法的线程执行完毕后再唤醒。比如下面的例子中,Thread1线程调用了Thread2线程的Join()
方法,那么Thread1就会被阻塞,直到Thread2执行完毕。
private static Thread thread1, thread2;
public static void ThreadPracticeMain()
{
thread1 = new Thread(JointTest1);
thread1.Start();
thread2 = new Thread(JointTest2);
thread2.Start();
}
private static void JointTest1()
{
Console.WriteLine("Child Thread 1 Start");
Thread.Sleep(3000);
// 阻塞当前线程,直到被调用线程执行完毕
thread2.Join();
Console.WriteLine("Child Thread 1 Awake");
}
private static void JointTest2()
{
Console.WriteLine("Child Thread 2 Start");
Thread.Sleep(5000);
Console.WriteLine("Child Thread 2 Awake");
}
运行结果如下
本地状态与共享状态
CLR为每一个线程分配了独立的内存栈,从而保证了局部变量的隔离。比如下面的示例中,定义了一个拥有局部变量的方法,并在主线程与子线程中同时调用。
private static void Test4()
{
Thread thread = new Thread(Print2);
thread.Start();
Print2();
}
private static void Print2()
{
for (int i = 0; i < 5; i++)
{
Console.Write("A");
}
}
每个线程的内存栈上都会有一个独立的i变量副本,因此两个线程的输出不会受到影响。输出的结果是10个A。
如果不同的线程拥有同一个对象的引用,则这些线程之间就共享了数据
private bool _flag;
public static void ThreadPracticeMain()
{
// 类实例
ThreadPractice obj = new ThreadPractice();
Thread thread = new Thread(obj.SharedStateTest);
thread.Start();
obj.SharedStateTest();
}
private void SharedStateTest()
{
if (!_flag)
{
_flag = true;
Console.WriteLine("flag is false");
}
}
因为两个线程均在同一个实例上调用了SharedStateTest()
方法,因此它们共享_flag
字段,结果是只会打印一次“flag is false”。
事实上,上面的例子有极小的概率会打印两次“flag is false”。这是因为SharedStateTest()
方法并不是原子操作,同时又存在共享状态,这就导致了线程安全问题。如果将_flag = true;
与Console.WriteLine("flag is false");
调换一下位置,打印两次的概率就会大大增加。
锁与线程安全
为了解决上面这种线程安全问题,我们可以在读写共享字段时首先获取一个排他锁。使用lock
语句即可实现
private bool _flag;
private readonly object _locker = new();
public static void Test5()
{
// 类实例
ThreadPractice obj = new ThreadPractice();
Thread thread = new Thread(obj.SharedStateTest);
thread.Start();
obj.SharedStateTest();
}
private void SharedStateTest()
{
// 加锁
lock (_locker)
{
if (!_flag)
{
Console.WriteLine("flag is false");
_flag = true;
}
}
}
当两个线程同时竞争一个锁时,一个线程会进行等待(阻塞),直到锁被释放。这样就保证了一次只有一个线程能够进入代码块,因此“flag is false”只会打印一次。采用这种方式进行保护的代码称为线程安全代码。
向线程传递数据
如果我们要给线程的启动方法传递参数,最简单的方式是通过Lambda表达式,并在其中使用指定的参数调用相应方法
public static void ThreadPracticeMain()
{
Thread thread = new Thread(() => PrintParam("Hello world"));
thread.Start();
}
private static void PrintParam(string param)
{
Console.WriteLine(param);
}
另一种则是通过Start()
方法传递参数,但缺点是线程启动方法的参数只能是object类型,因此需要进行类型转换
public static void ThreadPracticeMain()
{
Thread thread = new Thread(PrintParam);
thread.Start("Hello world");
}
private static void PrintParam(object? param)
{
Console.WriteLine(param as string);
}
异常处理
线程执行和线程创建时所处的try/catch/finally语句块无关。下面的示例中,子线程中抛出的异常并不会被主线程的try/catch语句捕获,也就不会输出“发生异常”。
try
{
new Thread(() => throw new NullReferenceException()).Start();
}
catch (Exception e)
{
Console.WriteLine("发生异常");
}
如果将try/catch语句移到线程方法中,则会正常捕获
new Thread(() =>
{
try
{
throw new NullReferenceException();
}
catch (Exception e)
{
Console.WriteLine("发生异常");
}
}).Start();
前台线程与后台线程
一般情况下,显式创建的线程称为前台线程。只要有一个前台线程还在运行,应用程序就仍然保持运行状态。而后台线程则不然。当所有前台线程结束时,应用程序就会停止,且所有运行的后台线程也会随之终止。
我们可以使用IsBackground
属性查询或修改线程的前后台状态
Thread thread = new Thread(() => Console.ReadLine());
thread.IsBackground = false;
thread.Start();
如果IsBackground
设置为false(默认为false),则该线程为前台线程。当主线程结束时,前台线程仍会运行,因此程序会一直等待用户输入。如果将IsBackground
设置为true,则主线程结束后,整个应用程序也会结束,并不会等待用户输入。
线程的优先级
通过线程的Priority
属性可以修改当前线程的优先级,优先级决定了线程在操作系统中分配的执行时间的长短。
Priority
属性的类型是枚举类型ThreadPriority
,它包括以下几个枚举
public enum ThreadPriority
{
Lowest = 0,
BelowNormal = 1,
Normal = 2,
AboveNormal = 3,
Highest = 4
}
如果你希望一个线程比其他进程中的线程有更高的优先级,可以使用System.Diagnostics命名空间下的Process类提高进程本身的优先级
Process p = Process.GetCurrentProcess();
p.PriorityClass = ProcessPriorityClass.High;
信号发送
有时一个线程需要等待来自其他线程的通知,即所谓的信号发送。最简单的信号发送结构是ManualResetEvent
。调用ManualResetEvent.WaitOne()
方法可以阻塞当前线程,直到其他线程调用Set()
“打开”了信号。
// 信号
var signal = new ManualResetEvent(false);
new Thread(() =>
{
Console.WriteLine("等待信号....");
// 等待信号
signal.WaitOne();
// 释放资源
signal.Dispose();
Console.WriteLine("已取得信号....");
}).Start();
Thread.Sleep(2000);
// 打开信号
signal.Set();
上面这段代码中,子线程会等待主线程打开信号,并在2秒后取得信号。
在Set()
调用后,信号发送结构仍然会保持“打开”状态,可以调用Reset()
方法再次将其“关闭”。
线程池
每当启动一个线程时,都需要一定的时间(几百微秒)来创建新的局部变量栈。而线程池通过预先创建一个可回收线程的池子来降低这个开销。
使用线程池需要注意如下几个问题:
- 线程池中的线程都是后台线程。
- 阻塞线程池中的线程将影响性能。
- 我们可以任意设置线程池中线程的优先级,而当将线程归还线程池时其优先级会恢复为普通级别。
通过调用ThreadPool.QueueUserWorkItem()
方法来在线程池上运行代码
ThreadPool.QueueUserWorkItem(e => Console.WriteLine("Hello World"));
Thread.Sleep(2000);
通过Thread.CurrentThread.IsThreadPoolThread
属性可用于确认当前运行的线程是否是一个线程池线程。
4.2 Task类
Thread
是创建并发的底层工具,因而存在一些局限性:
- 无法直接从线程返回结果,需要借助共享字段。
- 线程启动后无法取消。
- 无法在父函数中捕获异常。
- 线程一次只能运行一个任务。
在.NET Framework 4.0后引入的Task
类解决了这些问题。与线程相比,Task是一个更高级的抽象概念,它代表了一个并发操作,而该操作并不一定依赖线程来完成。
启动任务
启动一个基于线程的Task的最简单方式是使用Task.Run(Task类位于System.Threading.Tasks命名空间)静态方法。调用时只需传入一个Action委托
Task.Run(() => Console.WriteLine("Hello World"));
任务默认使用线程池中的线程(它们都是后台线程)。这意味着当主线程结束时,所有的任务也会随之停止。所以上面的代码要正确运行需要阻塞主线程。
等待任务
调用Task.Wait()
方法可以使当前方法阻塞,直到任务完成。这一点与线程类中的Join()
方法类似。
var task = Task.Run(() =>
{
Thread.Sleep(2000);
Console.WriteLine("Hello World");
});
task.Wait();
上面这段代码可以阻塞主线程,直到任务结束后,主线程才继续执行。
长任务
默认情况下,CLR会将任务运行在线程池线程上,这种线程非常适合执行短小的计算密集的任务。如果要执行长时间阻塞的操作,则可以按照以下方式避免使用线程池线程:
Task.Factory.StartNew(() => Thread.Sleep(2000),
TaskCreationOptions.LongRunning);
返回值
Task有一个泛型子类Task<TResult>,它允许任务返回一个值。如果在调用Task.Run时传入一个Func<TResult>委托(或者兼容的Lambda表达式)替代Action就可以获得一个Task<TResult>对象
Task<int> task = Task.Run(() =>
{
Thread.Sleep(2000);
return 3;
});
Console.WriteLine("等待任务返回值....");
Console.WriteLine(task.Result);
如果任务并没有执行完,则调用任务的返回值会阻塞当前线程,直到任务结束。
异常处理
相比于线程,任务处理异常的方式就方便很多。如果任务中的代码抛出一个未处理异常,那么调用Wait()或者访问Task<TResult>的Result属性时,该异常就会被重新抛出。也就是说我们可以在主线程中捕获子线程的异常
var task = Task.Run(() => throw new NullReferenceException());
try
{
task.Wait();
}
catch (Exception e)
{
Console.WriteLine("发生异常");
}
使用Task
的IsFaulted
和IsCanceled
属性可以在不抛出异常的情况下检测出错的任务。如果两个属性都返回了false,则说明没有错误发生;如果IsCanceled
为true,则说明任务抛出了OperationCanceledException;如果IsFaulted
为true,则说明任务抛出了其他类型的异常。
任务延续
延续会告知任务在完成后继续执行后续的操作。延续通常由回调方法实现,该方法会在操作完成后执行。实现延续的方式有两种:
第一种是调用任务的GetAwaiter方法并返回一个awaiter对象。这个对象的OnCompleted方法告知先导任务,当它执行完毕(或者出现错误)时调用一个委托。
var task = Task.Run(() =>
{
Console.WriteLine("Hello World");
Thread.Sleep(2000);
return "Goodbye World";
});
var taskAwaiter = task.GetAwaiter();
taskAwaiter.OnCompleted(() =>
{
Console.WriteLine(taskAwaiter.GetResult());
});
另一种附加延续的方式是调用任务对象的ContinueWith方法
var task = Task.Run(() =>
{
Console.WriteLine("Hello World");
Thread.Sleep(2000);
return "Goodbye World";
});
task.ContinueWith(e =>
{
Console.WriteLine(e.Result);
});
TaskCompletionSource类
TaskCompletionSource
可以创建一个任务,但是这种任务并非那种需要执行启动操作并在随后停止的任务;而是在操作结束或出错时手动创建的“附属”任务。这非常适用于I/O密集型的工作:它不但可以利用任务所有的优点(能够传递返回值、异常或延续),而且不需要在操作执行期间阻塞线程。
它的用法如下
var tcs = new TaskCompletionSource<int>();
new Thread(() =>
{
Thread.Sleep(3000);
tcs.SetResult(123);
}){IsBackground = true}.Start();
var task = tcs.Task;
Console.WriteLine(task.Result);
TaskCompletionSource
的真正作用是创建不绑定线程的任务。例如,假设一个任务需要等待5秒钟,之后返回数字42。我们可以使用Timer
类,由CLR(进而由操作系统)在x毫秒后触发一个事件,而无须使用线程
public static Task<int> GetAnswer()
{
var tcs = new TaskCompletionSource<int>();
var timer = new Timer(5000){AutoReset = false};
timer.Elapsed += delegate
{
timer.Dispose();
tcs.SetResult(42);
};
timer.Start();
return tcs.Task;
}
以上代码会返回一个在5秒钟后完成的任务,其结果为42。通过给任务附加延续,就可以在不阻塞任何线程的情况下打印出这个结果
var taskAwaiter = GetAnswer().GetAwaiter();
taskAwaiter.OnCompleted(()=> Console.WriteLine(taskAwaiter.GetResult()));
Delay()方法
Task.Delay()
方法可以创建一个任务,并在延后指定时间后完成
Task.Delay(3000).ContinueWith(e => Console.WriteLine("Hello World"));
Task.Delay(3000).GetAwaiter().OnCompleted(() => Console.WriteLine("Hello World"));
Task.Delay()
相当于是Thread.Sleep()
的异步版本。
五、参考资料
[1].《C# 8.0本质论》
[2].《C# 8.0核心技术指南》