基于Parallel.ForEach的数据并行使用
1.数据非并行
var items = new int[] { 1, 2, 3, 4, 5, 6, 7, 8, 9 };
DateTime t1 = DateTime.Now;
foreach (var item in items)
{
Console.WriteLine("数据非并行输出:{0}", item);
}
2.数据并行,只要使用Parallel.ForEach
Parallel.ForEach(items, item => Console.WriteLine("数据并行输出:{0}", item));
3.非并行与并行耗时对比
var items = new int[] { 1, 2, 3, 4, 5, 6, 7, 8, 9 };
DateTime t1 = DateTime.Now;
foreach (var item in items)
{
Console.WriteLine("数据非并行输出:{0}", item);
}
DateTime t2 = DateTime.Now;
TimeSpan t3 = t2 - t1;
Console.WriteLine("非数据并行输出耗费时间:毫秒:{0},微秒:{1}", t3.Milliseconds, t3.Microseconds);
t1 = DateTime.Now;
Parallel.ForEach(items, item => Console.WriteLine("数据并行输出:{0}", item));
t2 = DateTime.Now;
t3 = t2 - t1;
Console.WriteLine("数据并行输出耗费时间:毫秒:{0},微秒:{1}", t3.Milliseconds, t3.Microseconds);
基于Parallel.For的数据并行使用
1. Parallel.For返回一个ParallelLoopResult结构
public struct ParallelLoopResult
{
internal bool _completed;
internal long? _lowestBreakIteration;
public bool IsCompleted => _completed;
public long? LowestBreakIteration => _lowestBreakIteration;
}
下面的result类型为 ParallelLoopResult结构
var result = Parallel.For(1, 101, (i, state) => {
});
完整示例:
//Parallel.For返回一个ParallelLoopResult结构
var result = Parallel.For(1, 101, (i, state) => {
int delay;
lock (rnd);
delay = rnd.Next(1, 1001);//随机生成1到1001之间随机数
Thread.Sleep(delay);//随机休眠线程
//循环调用了退出方法Break
if (state.ShouldExitCurrentIteration)
{
if (state.LowestBreakIteration < i)
{
Console.WriteLine("循环调用了退出方法Break()");
return;
}
}
if (i == breakIndex)
{
Console.WriteLine($"随机数与索引相同,将退出循环,索引: {i}");
state.Break();//退出循环
}
Console.WriteLine($"完成循环遍历,当前循环索引: {i}");
});
if (result.LowestBreakIteration.HasValue)
Console.WriteLine($"\nLowest Break Iteration: {result.LowestBreakIteration}");
else
Console.WriteLine($"\nNo lowest break iteration.");
基于Parallel.ForEach的数据并行操作数据并使用局部变量
//初始化数组
int[] input = { 4, 1, 6, 2, 9, 5, 10, 3 };
int sum = 0;
try
{
Parallel.ForEach(
input, //要并行遍历的集合
() => 0, //线程本地初始化
//n当前遍历元素,loopState:并行状态对象,localSum局部变量
(n, loopState, localSum) => //匿名函数,localSum会初始化为0
{
localSum += n;//累加元素
//输出当前线程ID,元素值,与累加后的元素值
Console.WriteLine("Thread={0}, n={1}, localSum={2}", Thread.CurrentThread.ManagedThreadId, n, localSum);
//返回累加后的值
return localSum;
},
//任务动作,传入线程本地变量localSum
(localSum) => Interlocked.Add(ref sum, localSum) //多线程元子性操作共享变量
);
//输出并行操作后的结果
Console.WriteLine("\nSum={0}", sum);
}
catch (AggregateException e) //捕获线程操作异常
{
Console.WriteLine("并行操作数据异常\n{0}", e.Message.ToString());
}
基于Parallel.Invoke的任务并行操作
//静态函数实现,供并行任务使用
static void BasicAction()
{
Console.WriteLine("静态方法BasicAction, Thread={0}", Thread.CurrentThread.ManagedThreadId);
}
try
{
Parallel.Invoke(
BasicAction, // 任务1 - 静态方法
() => // 任务2 - lambda表达式
{
Console.WriteLine($"lambda表达式, Thread:{Thread.CurrentThread.ManagedThreadId}");
},
delegate () // 任务3 - 肉联委托
{
Console.WriteLine($"肉联委托, Thread:{Thread.CurrentThread.ManagedThreadId}");
}
);
}
catch (AggregateException e) //捕获任务并行异常
{
Console.WriteLine($"抛出异常:{e.InnerException.ToString()}");
}
基于Parallel.ForEachAsync的异常并行操作,异步方法要添加await关键字
await Parallel.ForEachAsync(Enumerable.Range(1, 100),
async (_, _) => {
await Task.Delay(1000);
});
在任务中使用数据并行:
//创建任务线程
Task _t = new Task(() => {
Console.WriteLine($"这个是任务子线程");
Parallel.ForEach(items, (item, state) =>{
Console.WriteLine($"{item},{state}");
});
});
Console.WriteLine($"任务线程:{_t.Id}");
_t.Start();//开始任务
任务的运行及等待
Thread.CurrentThread.Name = "主线程";
//创建任务并运行
Task taskA = Task.Run(
() => {
Thread.CurrentThread.Name = "任务子线程";
Console.WriteLine($"当前线程名:-> '{Thread.CurrentThread.Name}',线程ID:{Thread.CurrentThread.ManagedThreadId}");
Console.WriteLine("这个是在任务中输出的信息"); //使用匿名函数
}
);
Console.WriteLine($"当前线程名: '{Thread.CurrentThread.Name}',线程ID:{Thread.CurrentThread.ManagedThreadId}");
taskA.Wait();//等待任务完成
任务分离:
//分离任务
var outer = Task.Factory.StartNew(() =>
{
Console.WriteLine("任务开始...");
var child = Task.Factory.StartNew(() =>
{
Thread.SpinWait(5000000);
Console.WriteLine("任务分离成功.");
});
});
outer.Wait();
Console.WriteLine("任务结束.");
任务阻塞:
//阻塞任务
Task[] tasks = new Task[3]
{
Task.Factory.StartNew(() => Console.WriteLine("任务1.")),
Task.Factory.StartNew(() => Console.WriteLine("任务2.")),
Task.Factory.StartNew(() => Console.WriteLine("任务3."))
};
Task.WaitAll(tasks);//阻塞直接所有任务完成
多任务使用:
//多任务使用
Task[] taskArray = new Task[10];
for (int i = 0; i < taskArray.Length; i++)
{
//使用任务工厂启动任务
taskArray[i] = Task.Factory.StartNew((Object obj) =>
{
CustomData data = obj as CustomData;//实例化类对象
if (data == null) return;
data.ThreadNum = Thread.CurrentThread.ManagedThreadId;//赋值当前线程ID
},
new CustomData() { Name = i, CreationTime = DateTime.Now.Ticks });
};
Task.WaitAll(taskArray);
//遍历任务
foreach (var task in taskArray)
{
var data = task.AsyncState as CustomData;
if (data != null)
Console.WriteLine("任务 #{0} 已创建于 {1}, 在线程 #{2}.",
data.Name, data.CreationTime, data.ThreadNum);
}
class CustomData
{
public long CreationTime;
public int Name;
public int ThreadNum;
}
完整Demo:
// See https://aka.ms/new-console-template for more information
using System;
using System.Diagnostics;
var items = new int[] { 1, 2, 3, 4, 5, 6, 7, 8, 9 };
DateTime t1 = DateTime.Now;
foreach (var item in items)
{
Console.WriteLine("数据非并行输出:{0}", item);
}
DateTime t2 = DateTime.Now;
TimeSpan t3 = t2 - t1;
Console.WriteLine("非数据并行输出耗费时间:毫秒:{0},微秒:{1}", t3.Milliseconds, t3.Microseconds);
t1 = DateTime.Now;
Parallel.ForEach(items, item => Console.WriteLine("数据并行输出:{0}", item));
t2 = DateTime.Now;
t3 = t2 - t1;
Console.WriteLine("数据并行输出耗费时间:毫秒:{0},微秒:{1}", t3.Milliseconds, t3.Microseconds);
var rnd = new Random();
Console.WriteLine("开始遍历...");
int breakIndex = rnd.Next(1, 11);
Console.WriteLine($"Will call Break at iteration {breakIndex}\n");
//Parallel.For返回一个ParallelLoopResult结构
var result = Parallel.For(1, 101, (i, state) => {
int delay;
lock (rnd);
delay = rnd.Next(1, 1001);//随机生成1到1001之间随机数
Thread.Sleep(delay);//随机休眠线程
//循环调用了退出方法Break
if (state.ShouldExitCurrentIteration)
{
if (state.LowestBreakIteration < i)
{
Console.WriteLine("循环调用了退出方法Break()");
return;
}
}
if (i == breakIndex)
{
Console.WriteLine($"随机数与索引相同,将退出循环,索引: {i}");
state.Break();//退出循环
}
Console.WriteLine($"完成循环遍历,当前循环索引: {i}");
});
if (result.LowestBreakIteration.HasValue)
Console.WriteLine($"\nLowest Break Iteration: {result.LowestBreakIteration}");
else
Console.WriteLine($"\nNo lowest break iteration.");
//初始化数组
int[] input = { 4, 1, 6, 2, 9, 5, 10, 3 };
int sum = 0;
try
{
Parallel.ForEach(
input, //要并行遍历的集合
() => 0, //线程本地初始化
//n当前遍历元素,loopState:并行状态对象,localSum局部变量
(n, loopState, localSum) => //匿名函数,localSum会初始化为0
{
localSum += n;//累加元素
//输出当前线程ID,元素值,与累加后的元素值
Console.WriteLine("Thread={0}, n={1}, localSum={2}", Thread.CurrentThread.ManagedThreadId, n, localSum);
//返回累加后的值
return localSum;
},
//任务动作,传入线程本地变量localSum
(localSum) => Interlocked.Add(ref sum, localSum) //多线程元子性操作共享变量
);
//输出并行操作后的结果
Console.WriteLine("\nSum={0}", sum);
}
catch (AggregateException e) //捕获线程操作异常
{
Console.WriteLine("并行操作数据异常\n{0}", e.Message.ToString());
}
//静态函数实现,供并行任务使用
static void BasicAction()
{
Console.WriteLine("静态方法BasicAction, Thread={0}", Thread.CurrentThread.ManagedThreadId);
}
try
{
Parallel.Invoke(
BasicAction, // 任务1 - 静态方法
() => // 任务2 - lambda表达式
{
Console.WriteLine($"lambda表达式, Thread:{Thread.CurrentThread.ManagedThreadId}");
},
delegate () // 任务3 - 肉联委托
{
Console.WriteLine($"肉联委托, Thread:{Thread.CurrentThread.ManagedThreadId}");
}
);
}
catch (AggregateException e) //捕获任务并行异常
{
Console.WriteLine($"抛出异常:{e.InnerException.ToString()}");
}
var watch = Stopwatch.StartNew();
Console.WriteLine(watch.ElapsedMilliseconds);
Console.WriteLine($"当前机器的CPU数量:{Environment.ProcessorCount}");
watch.Restart();
//使用异步并行方法
await Parallel.ForEachAsync(Enumerable.Range(1, 100),
async (_, _) => {
await Task.Delay(1000);
});
watch.Stop();
Console.WriteLine($"花费时间:{watch.ElapsedMilliseconds}");
watch.Restart();
Thread.CurrentThread.Name = "主线程";
//创建任务线程
Task _t = new Task(() => {
Console.WriteLine($"这个是任务子线程");
Parallel.ForEach(items, (item, state) =>{
Console.WriteLine($"{item},{state}");
});
});
Console.WriteLine($"任务线程:{_t.Id}");
_t.Start();//开始任务
Thread.CurrentThread.Name = "主线程";
//创建任务并运行
Task taskA = Task.Run(
() => {
Thread.CurrentThread.Name = "任务子线程";
Console.WriteLine($"当前线程名:-> '{Thread.CurrentThread.Name}',线程ID:{Thread.CurrentThread.ManagedThreadId}");
Console.WriteLine("这个是在任务中输出的信息"); //使用匿名函数
}
);
Console.WriteLine($"当前线程名: '{Thread.CurrentThread.Name}',线程ID:{Thread.CurrentThread.ManagedThreadId}");
taskA.Wait();//等待任务完成
//分离任务
var outer = Task.Factory.StartNew(() =>
{
Console.WriteLine("任务开始...");
var child = Task.Factory.StartNew(() =>
{
Thread.SpinWait(5000000);
Console.WriteLine("任务分离成功.");
});
});
outer.Wait();
Console.WriteLine("任务结束.");
//阻塞任务
Task[] tasks = new Task[3]
{
Task.Factory.StartNew(() => Console.WriteLine("任务1.")),
Task.Factory.StartNew(() => Console.WriteLine("任务2.")),
Task.Factory.StartNew(() => Console.WriteLine("任务3."))
};
Task.WaitAll(tasks);//阻塞直接所有任务完成
//多任务使用
Task[] taskArray = new Task[10];
for (int i = 0; i < taskArray.Length; i++)
{
//使用任务工厂启动任务
taskArray[i] = Task.Factory.StartNew((Object obj) =>
{
CustomData data = obj as CustomData;//实例化类对象
if (data == null) return;
data.ThreadNum = Thread.CurrentThread.ManagedThreadId;//赋值当前线程ID
},
new CustomData() { Name = i, CreationTime = DateTime.Now.Ticks });
};
Task.WaitAll(taskArray);
//遍历任务
foreach (var task in taskArray)
{
var data = task.AsyncState as CustomData;
if (data != null)
Console.WriteLine("任务 #{0} 已创建于 {1}, 在线程 #{2}.",
data.Name, data.CreationTime, data.ThreadNum);
}
class CustomData
{
public long CreationTime;
public int Name;
public int ThreadNum;
}