目录
- 1 线程不安全
- 2 线程同步方式
- 2.1 简单的阻塞方法
- 2.2 锁
- 2.2.1 Lock使用
- 2.2.2 互斥体Mutex
- 2.2.3 信号量Semaphore
- 2.2.3 轻量级信号量SemaphoreSlim
- 2.2.4 读写锁ReaderWriterLockSlim
- 2.3 信号同步
- 2.3.1 AutoResetEvent
- 2.3.1.1 AutoResetEvent实现双向信号
- 2.3.2 ManualResetEvent
- 2.3.3 CountdownEvent
- 2.3 原子操作
1 线程不安全
class ThreadTest
{
bool done;
static void Main()
{
ThreadTest tt = new ThreadTest(); // 创建一个公共的实例
new Thread (tt.Go).Start();
tt.Go();
}
// 注意: Go现在是一个实例方法
void Go()
{
if (!done) { Console.WriteLine ("Done"); done = true; }
}
}
这个代码示例可能会输出两个Done ,也有可能输出一个Done。
这个问题是因为一个线程对if中的语句估值的时候,另一个线程正在执行WriteLine语句,这时done还没有被设置为true。所以程序的输出结果是不确定的。显然这在实际中开发是允许的。
当多个线程共享资源时,就会因为线程调度的不确定性导致线程不安全问题(即线程的执行没有正确的同步)。
修复这个问题需要在读写公共字段时,获得一个排它锁(互斥锁,exclusive lock )。C# 提供了lock来达到这个目的:
class ThreadSafe
{
static bool done;
static readonly object locker = new object();
static void Main()
{
new Thread (Go).Start();
Go();
}
static void Go()
{
lock (locker)
{
if (!done) { Console.WriteLine ("Done"); done = true; }
}
}
}
两个线程同时争夺一个锁的时候(例子中的locker),一个线程等待,或者说阻塞(释放cpu时间片),直到锁变为可用。这样就确保了在同一时刻只有一个线程能进入临界区(critical section,不允许并发执行的代码),所以 “ Done “ 只被打印了一次。像这种用来避免在多线程下的不确定性的方式被称为 线程安全(thread-safe)。根据上述分析可知,保证线程安全的方式其实就是 对共享对象的操作能够以正确的顺序执行,通常被称作为线程同步
2 线程同步方式
线程不安全的问题发生的主要原因是因为多个线程竞争共享的资源,导致问题发生的原因是多线程的执行并没有正确同步
当在同一时刻多个线程操作共享资源时就会导致数据的错误,但是如果在单一线程中按照顺序就不出现这样的问题,这也就引申出线程同步的内容,保证多个线程提升性能的前提下,也不会出现程式数据的错误,重点就是让多个线程按照一定的顺序同步的执行代码,就是线程同步的概念。
2.1 简单的阻塞方法
这些方法会使当前线程等待另一个线程结束或是自己等待一段时间。Sleep、Join与Task.Wait都是简单的阻塞方法。
使用上述阻塞方法后,处于阻塞状态,让出了CPU时间片。此时线程调度器会保存等待线程的状态,并切换到另一个线程,直到等待的线程重新获得CPU时间片。
这种模式下, 由于阻塞可以让线程按照一定的顺序执行代码,但是这也意味着至少会引入一次上下文切换,一定程度上耗费了资源。通常建议,当线程被挂起很长时间时,这种阻塞是值得的。
若线程只需要等待一小段时间,最好只是简单的等待,而不用将线程切换到阻塞状态。虽然线程等待会耗费CPU 时间,但是我们节省了上下文切换的CPU时间和资源。这种方式非常轻量,速度很快。
比如while(flag)
2.2 锁
锁构造能够限制每次可以执行某些动作或是执行某段代码的线程数量。排它锁构造是最常见的,它每次只允许一个线程执行,从而可以使得参与竞争的线程在访问公共数据时不会彼此干扰。标准的排它锁构造是lock(一种语法糖,本质上是调用Monitor.Enter/Monitor.Exit方法)、Mutex与 SpinLock(自旋锁)。非排它锁构造是Semaphore、SemaphoreSlim以及读写锁。
2.2.1 Lock使用
class ThreadSafe
{
static readonly object _locker = new object();
static int _val1, _val2;
static void Go()
{
lock (_locker)
{
if (_val2 != 0) Console.WriteLine (_val1 / _val2);
_val2 = 0;
}
}
}
lock关键字在C# 4.0编译器产生的代码为
bool lockTaken = false;
try
{
Monitor.Enter (_locker, ref lockTaken);
// 你的代码...
}
finally { if (lockTaken) Monitor.Exit (_locker); }
lock 排它锁的使用,确保了多个线程在访问竞态代码块时,只有一个线程是获得CPU时间片的,其他的线程处于阻塞中,并处于一个等待队列中。直到锁被释放,等待的线程属于先到先得的情形,依次等待获得锁去执行竞态代码块,保证了线程同步,因此可以保证线程的安全。
2.2.2 互斥体Mutex
/// <summary>
/// Mutex是一种原始同步的操作
/// 互斥量 只有一个线程能持有这个互斥量,并阻塞其他线程
/// 相较于lock关键字而言,虽然都能够构建同步代码
/// 其中lock更快,使用也更方便。而Mutex的优势是它可以跨进程的使用。
/// </summary>
public class MutexWork
{
Mutex mut = new Mutex();
public void Method3(object threadId) {
// 命名的 Mutex 是进程范围的,它的名称需要是唯一的
string mutexName = "Foxconn168!";
//为了正确的关闭锁,通常使用using代码块来包围互斥体锁
using (var mutex = new Mutex(false, mutexName))
{
// 使用mutex.WaitOne()方法来获得锁
// 可能其它程序实例正在关闭,所以可以等待几秒来让其它实例完成关闭
if (!mutex.WaitOne(TimeSpan.FromSeconds(3), false))
{
Console.WriteLine("Another app{0} instance is running. Bye!",threadId);
Console.WriteLine(DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"));
return;
}
RunProgram(threadId);
}
}
public void RunProgram(object threadId) {
Console.WriteLine("Running {0}. Press Enter to exit",threadId);
Console.WriteLine(DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"));
Console.ReadLine();
}
}
static void Main(string[] args) {
MutexWork work = new MutexWork();
//使用ParameterizedThreadStart来传递参数时,需要保证方法参数类型为object,参数有且仅有一个
Thread t1 = new Thread(work.Method3);
Thread t2 = new Thread(work.Method3);
t1.Start(1);
t2.Start(2);
}
这里使用两个线程来演示互斥体的用法。线程1获得mutex锁后,并执行RunProgram方法,需要等待控制台输入空格符。线程2在用户输入空格符前,等待3s以获得mutex锁,当没有获得锁后输出Another app2 instance is running. Bye!
2.2.3 信号量Semaphore
Semaphore限制了同时访问同一个资源的线程数量,信号量在有限并发的需求中有用,它可以阻止过多的线程同时执行特定的代码段。通过协调各个线程,以保证合理的使用资源。
可以用上厕所的行为来类比Semaphore。一个厕所的容量是一定的。一旦满员,就不允许其他人进入,其他人将在外面排队。当有一个人离开时,排在最前头的人便可以进入。
public class SeamphoreWork
{
//定义信号量,总容量为3,同时允许最多3个线程访问资源
//使用 Semaphore(int initialCount, int maximumCount, string name)构造函数初始化信号量
//initialCount 初始空闲容量 maximumCount 最大容量 name 信号量名称
Semaphore seamphore = new Semaphore(1,3, "Semaphore_One");
/// <summary>
/// 模拟上厕所
/// </summary>
public void EnterToilet(int threadId,int waitTime) {
Console.OutputEncoding = Encoding.Unicode;
Console.WriteLine("{0} wants to enter",threadId);
seamphore.WaitOne(); //线程调用WaitOne,信号空闲容量计数减一。当容量为零时,后续请求会阻塞,直到其他线程释放信号灯。
Console.WriteLine("{0} has entered the Toilet {1}",threadId,DateTime.Now.ToString("yyyy-mm-dd HH:mm:ss"));
Thread.Sleep(waitTime); //线程阻塞模拟上厕所的时耗费的时间
seamphore.Release(); //释放信号量,可用容量增加一
Console.WriteLine("{0} has left the Toilet {1}", threadId, DateTime.Now.ToString("yyyy-mm-dd HH:mm:ss"));
}
}
static void Main(string[] args)
{
SeamphoreWork seamphore = new SeamphoreWork();
for (int i = 0; i < 5; i++)
{
int tempName = i;
int waitTime = (i + 1) * 1000;
Thread t = new Thread(() => seamphore.EnterToilet(tempName, waitTime));
t.Start();
}
}
容量为 1 的信号量与Mutex和lock类似,所不同的是信号量没有“所有者”,它是线程无关(thread-agnostic)的。任何线程都可以在调用Semaphore上的Release方法,而对于Mutex和lock,只有获得锁的线程才可以释放。类似于Mutex,命名的Semaphore也可以跨进程使用
2.2.3 轻量级信号量SemaphoreSlim
SemaphoreSlim是 Framework 4.0 加入的轻量级的信号量,功能与Semaphore相似,不同之处是它对于并行编程的低延迟需求做了优化。在Semaphore上调用WaitOne或Release会产生大概 1 微秒的开销,而SemaphoreSlim产生的开销约是其四分之一。但它不能跨进程使用。
public class SeamaphoreSlimWork
{
//定义信号量,总容量为3,同时允许3个线程访问资源
SemaphoreSlim seamphore = new SemaphoreSlim(3);
/// <summary>
/// 模拟上厕所
/// </summary>
public void EnterToilet(int threadId, int waitTime)
{
Console.WriteLine("{0} wants to enter", threadId);
seamphore.Wait(); //进入信号量,有效容量减一
Console.WriteLine("{0} has entered the Toilet {1}", threadId, DateTime.Now.ToString("yyyy-mm-dd HH:mm:ss"));
Thread.Sleep(waitTime);
seamphore.Release(); //释放信号量,有效容量加一
Console.WriteLine("{0} has left the Toilet {1}", threadId, DateTime.Now.ToString("yyyy-mm-dd HH:mm:ss"));
}
}
2.2.4 读写锁ReaderWriterLockSlim
通常,一个类型的实例对于并发读操作是线程安全的,但对并发的更新操作却不是(并发读然后更新也不是)。尽管可以简单的对所有访问都使用排它锁来确保这种类型的实例是线程安全的,但对于有很多读操作而只有少量更新操作的情况,它就会过度限制并发能力。如浏览淘宝APP,更多的用户是在进行读操作而不是写操作。在这种情况下, R e a d e r W r i t e r L o c k S l i m \textcolor{red}{ReaderWriterLockSlim} ReaderWriterLockSlim类被设计用来提供高可用性的锁。
这个类有两种基本类型的锁,读锁和写锁:
- 写锁完全的排它。
- 读锁可以与其它的读锁相容。
所以,一个线程持有写锁会阻塞其它想要获取读锁或写锁的线程,如果没有线程持有写锁,任意数量的线程可以同时获取读锁。
ReaderWriterLockSlim定义了如下的方法来获取和释放读 / 写锁:
public void EnterReadLock();
public void ExitReadLock();
public void EnterWriteLock();
public void ExitWriteLock();
/// <summary>
/// ReaderWriterLockSlim 写锁阻塞所有的读写锁,在不持有写锁的情况下,所有的线程都可以持有读锁去写数据
/// </summary>
/// <param name="args"></param>
static void Main(string[] args)
{
Console.OutputEncoding = Encoding.Unicode;
Random _rand = new Random();
List<int> list = new List<int>();
ReaderWriterLockSlim _rw = new ReaderWriterLockSlim();//读写锁
//读数据
void Read() {
while (true)
{
Console.WriteLine(_rw.CurrentReadCount + " concurrent readers");
_rw.EnterReadLock();
foreach (int i in list) Thread.Sleep(10);
_rw.ExitReadLock();
}
}
//写数据
void Write(object threadID) {
while (true)
{
int newNumber = GetRandNum(100);
_rw.EnterWriteLock();
list.Add(newNumber);
_rw.ExitWriteLock();
Console.WriteLine("Thread " + threadID + " added " + newNumber);
Thread.Sleep(100);
}
}
int GetRandNum(int max) { lock (_rand) return _rand.Next(max); }
//3个线程读数据 ,2 个线程写数据(读线程和写线程均是后台线程)
new Thread(Read) { IsBackground=true}.Start();
new Thread(Read) { IsBackground = true }.Start();
new Thread(Read) { IsBackground = true }.Start();
new Thread(Write) { IsBackground = true }.Start("A");
new Thread(Write) { IsBackground = true }.Start("B");
//主线程休眠30s
Thread.Sleep(TimeSpan.FromSeconds(30));
}
通常需要添加try / finally块来确保抛出异常时锁能够被释放。
2.3 信号同步
信号同步就是一个线程进行等待,直到它收到其它线程的通知的过程。它们有三个成员:AutoResetEvent、ManualResetEvent以及CountdownEvent( Framework 4.0 中加入)。前两个的功能基本都是在它们的基类EventWaitHand
2.3.1 AutoResetEvent
AutoResetEvent就像验票闸机:插入一张票,就只允许一个人通过。多个用户(线程)等待闸机开放时,会阻塞等待。待人通过后,闸机会自动关闭。直到下一个人插入票。
在这个用户(线程)等待的过程,收到了另一个用户(线程)插入票的信号,阻塞态变为运行态。
在闸机处调用
W
a
i
t
O
n
e
\textcolor{red}{WaitOne}
WaitOne方法,等待这个闸机打开,线程就会进入等待或者说阻塞。如果有多个线程调用WaitOne,便会在闸机前排队(与锁同样,由于操作系统的差异,这个等待队列的先入先出顺序有时可能被破坏)。
票的插入则通过调用
S
e
t
\textcolor{red}{Set}
Set方法。票可以来自任意线程,换句话说,任何能够访问这个AutoResetEvent对象的(非阻塞)线程都可以调用Set方法来放行一个被阻塞的线程。
在接下来的例子中,一个线程开始等待直到收到另一个线程的信号。
static void Main(string[] args)
{
AutoResetEvent autoResetEvent = new AutoResetEvent(false);
Console.OutputEncoding = Encoding.Unicode;
//等待事件
void Waiter(int threadId)
{
Console.WriteLine("{0} Waiting...",threadId);
autoResetEvent.WaitOne(); // 等待通知
Console.WriteLine("{0} Notified", threadId);
}
Thread t1 = new Thread(()=>Waiter(1));
t1.Start();
Thread.Sleep(5000);//主线程休眠5s
Console.WriteLine("主线程发出唤醒信号");
//主线程发出信号,唤醒t1线程
autoResetEvent.Set();
}
2.3.1.1 AutoResetEvent实现双向信号
/// <summary>
/// 定义两个AutoResetEvent实例,其中一个是工作线程向主线程发信号,另一个实例是从主线程向工作线程发限号。
/// </summary>
/// <param name="args"></param>
static void Main(string[] args)
{
//主线程信号句柄,初始化等待工作线程
AutoResetEvent mainThreadSignal = new AutoResetEvent(false);
//工作线程句柄
AutoResetEvent workThreadSignal = new AutoResetEvent(false);
Console.OutputEncoding = Encoding.Unicode;
Thread t1 = new Thread(Process);
t1.Start();
void Process() {
Console.WriteLine("工作线程准备中");
Thread.Sleep(5_000); //模拟工作线程准备工作
mainThreadSignal.Set(); //通知主线程,工作线程已准备完毕
workThreadSignal.WaitOne();
Console.WriteLine("我是工作线程,我要处理工作业务了");
Thread.Sleep(5_000); //模拟工作线程处理业务
}
Console.WriteLine("主线程等待工作线程准备中");
mainThreadSignal.WaitOne();//主线程先等待
Console.WriteLine("工作线程准备完毕,主线程通知工作线程去完成任务");
workThreadSignal.Set(); //唤醒工作线程
}
2.3.2 ManualResetEvent
ManualResetEvent就像一个普通的门。调用 S e t \textcolor{red}{Set} Set 方法打开门,允许任意数量的线程调用 W a i t O n e \textcolor{red}{WaitOne} WaitOne方法来通过。调用 R e s e t \textcolor{red}{Reset} Reset方法关闭门。如果线程在一个关闭的门上调用WaitOne方法将会被阻塞,当门下次打开时,会被立即放行。除这些不同以外,ManualResetEvent就和AutoResetEvent差不多了。
M a n u a l R e s e t E v e n t 在需要让一个线程解除其它多个线程的阻塞时有用。 \textcolor{blue}{ManualResetEvent在需要让一个线程解除其它多个线程的阻塞时有用。} ManualResetEvent在需要让一个线程解除其它多个线程的阻塞时有用。
/// <summary>
/// 一个线程解除其它多个线程的阻塞态
/// </summary>
/// <param name="args"></param>
static void Main(string[] args)
{
//ManualResetEvent(bool initialState)
//初始态 门是关闭的
ManualResetEvent signal = new ManualResetEvent(false);
void EnterGate() {
string name = Thread.CurrentThread.Name;
Console.WriteLine(name + " starts and calls mre.WaitOne()");
signal.WaitOne();
Console.WriteLine(name + " ends.");
}
for (int i = 0; i < 3; i++) {
Thread t = new Thread(EnterGate);
t.Name = $"Thread_{0}";
t.Start();
}
Thread.Sleep(2_000);
//唤醒所有阻塞中的线程
signal.Set();
}
2.3.3 CountdownEvent
与ManualResetEvent让一个线程解除其它多个线程相反,CountdownEvent 可以让你等待 n 个线程,直到n个线程均发出信号后,解除等待线程的阻塞态。与Java多线程中的CountDownLatch功能类似。
/// <summary>
/// 等待多个线程
/// </summary>
/// <param name="args"></param>
static void Main(string[] args)
{
Console.OutputEncoding = Encoding.Unicode;
CountdownEvent countdownEvent = new CountdownEvent(3);
void DoWork() {
Thread.Sleep(2_000);//模拟单个线程执行任务的时间
countdownEvent.Signal();
}
for (int i = 0; i < 3; i++) {
new Thread(DoWork).Start();
}
countdownEvent.Wait();//主线程等待
Console.WriteLine("所有的工作线程发出信号后执行");
}
值得注意的是,如果调用Signal()没有达到指定的次数,那么Wait()将会一直等待。所有请确保使用CountDownEvent时,所有的线程完成后都要调用Signal()方法。
2.3 原子操作
所谓原子操作是指不会被线程调度机制打断的操作;这种操作一旦开始,就一直运行到结束,中间不会有任何的线程切换。在c#中提供了对int类型读写的原子操作类 I n t e r l o c k e d \textcolor{red}{Interlocked} Interlocked
/// <summary>
/// 提供了Interlocked类来实现原子操作,其方法有Add、Increment、Decrement、Exchange、CompareExchange等,
/// 可以使用原子操作进行加法、加一、减一、替换、比较替换等操作
/// </summary>
/// <param name="args"></param>
static void Main(string[] args)
{
//初始值
int a = 0;
int b = 0;
//+1 a++
void Increment() {
for (int i = 0; i < 20000; i++) {
a++;
}
}
//原子性+1
void IncrementAtomic()
{
for (int i = 0; i < 20000; i++)
{
Interlocked.Increment(ref b);
}
}
CountdownEvent countdown = new CountdownEvent(10);
for (int i = 0; i < 5; i++) {
new Thread(Increment).Start();
countdown.Signal();
}
for (int i = 0; i < 5; i++)
{
new Thread(IncrementAtomic).Start();
countdown.Signal();
}
countdown.Wait();
Console.WriteLine(a);
Console.WriteLine(b);
}
a++ 是线程不安全的操作,因为是非原子性的。在底层系统执行这个加一操作时分为3个步骤:
(1)从内存中将该变量加载带CPU寄存器中
(2)CPU对该变量进行加一操作
(3)将该变量从CPU寄存器返回内存中
在多线程同时操作a++操作时,会因为线程不同步的问题而造成线程不安全的问题
Interlocked类会将上述步骤合成一个动作,在没有执行完成的时候不会进行线程上下文的切换,所以保证了线程的安全。