.NET并发编程-任务函数并行
被门夹过的核桃还能补脑吗
本系列学习在.NET中的并发并行编程模式,实战技巧
本小节开始学习基于任务的函数式并行,以及在.NET中数据并行的实现方式。本系列保证最少代码呈现量,虽然talk is cheap, show me the code被奉为圭臬,我的学习习惯是,只学习知识点,代码不在当下立马要用的时候不会认真去读的,更何况在大多时候在手机阅读更不顺畅。
本小节介绍一种简单的函数组合来并行执行任务方式,达到不阻塞程序提高性能的目的。
1、任务并行
回顾下什么是任务并行,任务并行是在相同或不同的数据集上用时执行多个不同的函数,区别于数据并行是在数据集的元素之间同时执行同一个函数。
生产中可能会涉及不同的任务函数,处理不同的复杂的结构数据,通过利用在.NET提供的一些模型工具箱我们可以较为简便的任务并行跑起来。
2、.NET中的任务并行化支持
由浅入深,.NET1.0开始就提供线程的访问控制。System.Thread,可以代码控制创建启动销毁现场。但线程的创建开销比较大,后面有提供了ThreadPool类,线程池有助于克服性能问题。在初始化期间就加载了一组线程,然后重用这些线程,避免了频繁创建销毁线程的开销。
Action<string> downloadSite = url => {
var content = new WebClient().DownloadString(url);
Console.WriteLine($"The size of the web site {url} is {content.Length}");
};
var threadA = new Thread(() => downloadSite("http://www.nasdaq.com"));
var threadB = new Thread(() => downloadSite("http://www.bbc.com"));
threadA.Start();
threadB.Start();
threadA.Join();
threadB.Join();
ThreadPool.QueueUserWorkItem(o => downloadSite("http://www.nasdaq.com"));
ThreadPool.QueueUserWorkItem(o => downloadSite("http://www.bbc.com"));
像上面所示传统的方式也很繁琐,而且有很多弊端,比如无法获取结果,没有内置通知等。因为又提供了TPL任务并行库。
3、.NET任务并行库
TPL在ThreadPool上实现了很多优化,简化了添加并行的过程,通过Task对象提供支持,以取消和管理状态,处理和传播异常,以及控制工程线程的执行。
TPL提供很多种调度任务的方式,Invoke是最简单的一种。类似的还有Parallel.ForEach
System.Threading.Tasks.Parallel.Invoke(
Action(fun () -> convertImageTo3D (pathCombine "MonaLisa.jpg") (pathCombine "MonaLisa3D.jpg")),
Action(fun () -> setGrayscale (pathCombine "LadyErmine.jpg") (pathCombine "LadyErmineRed.jpg")),
Action(fun () -> setRedscale (pathCombine "GinevraBenci.jpg") (pathCombine "GinevraBenciGray.jpg")))
此方法接收任意数量的action委托参数,并为每一个委托创建任务。但是,action委托没有输入参数,并且返回void,这样的函数会有副作用。当所有任务终止时,Invoke方法将控制权交回给主线程以继续执行后续流程。在并行执行独立的异构任务时,就是针对不同的结构数据,此方法挺有效的。
弊端也很明显,没有输入类型,返回为Void,也就限制了组合使用,执行顺序也无法保障。
4、C#void问题
和Null类似,Void也是一个头疼的问题。函数式编程语言中每一个函数都有返回值,包括与void类似情况的unit类型,但是与void不同的是该值被视为一个值,概念上与bool和int没多大区别。
unit是缺少其他特定值的表达式的类型,像打印日志到控制台,写入文件等,没有特定的内容需要返回,因为函数需要返回unit。unit就是C#的void在F#中的等价产物。
在FP的函数就是一个映射,一个输入映射一个输出,这样函数才是无副作用的。在命令式编程语言中丢失了这个概念。
可以参考F#unit自定义个C#中的unit
public struct Unit : IEquatable<Unit>
{
public static readonly Unit Default = new Unit();
public override int GetHashCode() => 0;
public override bool Equals(object obj) => obj is Unit;
public override string ToString() => "()";
public bool Equals(Unit other) => true;
public static bool operator ==(Unit lhs, Unit rhs) => true;
public static bool operator !=(Unit lhs, Unit rhs) => false;
}
这样可以让每个函数都有返回值来确认函数已完成,并且任何使用action委托的地方都可以使用func代替,只需要给func执行返回值为unit即可。return Unit.Default;
5、延续传递风格CPS
一种更新更好的机制是将剩余的计算传递给(在线程完成执行后运行的)回调函数以继续工作。这种技术在FP中被称为延续传递风格Continuation-Passing Style CPS。通过将当前函数的结果传递给下一个函数,以延续的形式为你提供执行控制。
.NET中Task类提供比Thread更高级别的抽象,以便于控制每个每个任务操作的生命周期。
Task monaLisaTask = Task.Factory.StartNew(() => convertImageTo3D("MonaLisa.jpg", "MonaLisa3D.jpg"));
Task ladyErineTask = new Task(() => setGrayscale("ladyErine.jpg", "ladyErine3D.jpg"));
ladyErineTask.Start();
Task ginevraBenciTask = Task.Run(() => setRedscale("ginevraBenci.jpg", "ginevraBenci3D.jpg"));
Task提供三种直接创建任务的方式,new Task方式可以控制在何处Start启动任务。
通过Task的ContinueWith可以延续任务。FromCurrentSynchronizationContext捕获当前不同上下文中运行,如果需要同步UI请使用,会自动选择合适的上下文去更新。
Task ginevraBenciTask = Task.Run<Bitmap>(() => setRedscale("ginevraBenci.jpg", "ginevraBenci3D.jpg"));
ginevraBenciTask.ContinueWith(bitmap => {
var bitmapImage = bitmap.Result;
}, TaskScheduler.FromCurrentSynchronizationContext());
6、组合策略
使用ContinueWith可以延续任务,但较多的延续,代码将比较繁琐,而且如果要添加错误处理或取消支持就不好添加了。所以要使用到函数闭包中说到的函数组合。
C#实现组合Compose函数如下
Func<A,C> Compose<A,B,C>(this Func<A.B> f ,Func<B,C> g)=>(n)=>g(f(n))
在并行Task中,f,g应该是独立运行的,当做两个任务,f任务返回Task(B),g任务返回Task(C),所以改造如下
Func<A,Task<C>> Compose<A,B,C>(this Func<A.Task<B>> f ,Func<B,Task<C>> g)=>(n)=>g(f(n))
但是有问题的,f(n)返回类型Task(B),无法直接给函数g使用,输入类型不一致。
这个使用需要使用FP中常见的一种模式,单子Monad。对于命令式编程语言的程序员来说,压根没听过啊。其实也是一种设计模式,就像装饰器和适配器一样。单子是一种数学模式,它通过封装程序逻辑,保持函数式的纯粹性以及提供一个强大的组合工具以组合使用提供类型的计算来控制副作用的执行。比较晦涩难懂,还需要多看看官方文档才行。
我们定义一个Bind来提升类型,包装B,然后组合就像下面这样了。
static Task<C> Bind<B, C>(Task<B> b, Func<B, Task<C g)
{
return g(b.Result);
}
Func<A,Task<C>> Compose<A,B,C>(this Func<A.Task<B>> f ,Func<B,Task<C>> g)=>(n)=>bind(f(n),g)
被门夹过的核桃还能补脑吗