如何使用 C# 构建自己的 task scheduler

DotNetCore实战

共 5230字,需浏览 11分钟

 · 2020-12-04

译文链接:https://www.infoworld.com/article/3063560/how-to-build-your-own-task-scheduler-in-csharp.html

TPL(任务并行库)是 .NET Framework 最新版本中最有趣的新功能之一,在.NET Framework 4.0 中被引入。要使用TPL,您需要引用 System.Threading.Tasks 命名空间。

什么是 task schedulers?为什么需要它们?

如果我想问,Task 是如何被调度执行的?其实有一个组件叫做 task scheduler,它专门负责调度 task。从本质上来说,这是一个底层对象的抽象层,它可以将您的 task 通过排队调度到线程上。

.NET Framework 为您提供了两个 task schedulers。包括基于默认的 task scheduler 用于调度在 Thread Pool 之上,另一个 task scheduler 能够调度在 指定对象的同步上下文上。请注意,TPL的默认 task scheduler 利用了.NET Framework 线程池。该线程池的表示类 ThreadPool 是在 System.Threading.Tasks命名空间下。

尽管默认的 task scheduler 在大多数情况下已足够,但有时候你也许也想构建自定义的 task scheduler 去完成个性化的功能,比如那些默认的 task scheduler 没有提供的。此类功能可能包括 FIFO执行,并发度等。

在C#中扩展 TaskScheduler

要构建自定义的 task scheduler,您需要创建一个类并继承 System.Threading.Tasks.TaskScheduler 。因此,要构建自定义的 task scheduler,您需要扩展 TaskScheduler抽象类并重写以下方法。

  • QueueTask 返回void并接受Task对象作为参数,当一个 task 需要调度的时候就要调用这个方法

  • GetScheduledTasks 返回所有已被调用的 task list, (准确的说是IEnumerable)

  • TryExecuteTaskInline 用于内联方式(即在当前线程上)执行task。在这种情况下,无需排队即可执行 task

下面的代码段展示了如何继承 TaskScheduler 类来实现我们自定义的 scheduler。


    public class CustomTaskScheduler : TaskSchedulerIDisposable
    {
    }

正如在本文前面所讨论的,您需要在 CustomTaskScheduler 中重写下面3个方法:GetScheduledTasks,QueueTask和TryExecuteTaskInline。


public sealed class CustomTaskScheduler : TaskSchedulerIDisposable
  {
        protected override IEnumerable GetScheduledTasks()
        {
            //TODO
        }
        protected override void QueueTask(Task task)
        {
             //TODO
        }
        protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
        {
            //TODO
        }
        public void Dispose()
        {
            //TODO
        }
  }

使用 BlockingCollection 来存储 task对象集合

现在我们开始实现自定义的 task scheduler。以下代码段显示了如何利用 BlockingCollection 存储 task对象的集合。


 public sealed class CustomTaskScheduler : TaskSchedulerIDisposable
 {
        private BlockingCollection tasksCollection = new BlockingCollection();
        private readonly Thread mainThread = null;
        public CustomTaskScheduler()
        {
            mainThread = new Thread(new ThreadStart(Execute));
            if (!mainThread.IsAlive)
            {
                mainThread.Start();
            }
        }
        private void Execute()
        {
            foreach (var task in tasksCollection.GetConsumingEnumerable())
            {
                TryExecuteTask(task);
            }
        } 
      //Other methods
  }

可以着重看看 CustomTaskScheduler 类的构造函数,一个 thread 是如何被创建并且如何开始调度 Execute 方法的。

实现 GetScheduledTasks,QueueTask 和 TryExecuteTaskInline

接下来,我们需要在 CustomTaskScheduler 中实现三个自定义方法。这三种方法包括 GetScheduledTasks,QueueTask 和 TryExecuteTaskInline。

GetScheduledTasks方法返回 IEnumerable 类型的 task 集合,这样你就可以迭代这个 collection,就像上面 Execute 方法一样。QueueTask方法接受Task对象作为方法参数,然后将其存储在 task collection 中,TryExecuteTaskInline方法暂不实现, 我准备将它留给读者来实现。


        protected override IEnumerable GetScheduledTasks()
        {
            return tasksCollection.ToArray();
        }
        protected override void QueueTask(Task task)
        {
            if (task != null)
                tasksCollection.Add(task);
        }
        protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
        {
            return false;
        }

完整的 CustomTaskScheduler 代码如下

下面是最终版本的  CustomTaskScheduler  代码清单:


    public sealed class CustomTaskScheduler : TaskSchedulerIDisposable
    {
        private BlockingCollection tasksCollection = new BlockingCollection();
        private readonly Thread mainThread = null;
        public CustomTaskScheduler()
        {
            mainThread = new Thread(new ThreadStart(Execute));
            if (!mainThread.IsAlive)
            {
                mainThread.Start();
            }
        }
        private void Execute()
        {
            foreach (var task in tasksCollection.GetConsumingEnumerable())
            {
                TryExecuteTask(task);
            }
        }
        protected override IEnumerable GetScheduledTasks()
        {
            return tasksCollection.ToArray();
        }
        protected override void QueueTask(Task task)
        {
            if (task != null)
                tasksCollection.Add(task);           
        }
        protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
        {
            return false;
        }
        private void Dispose(bool disposing)
        {
            if (!disposing) return;
            tasksCollection.CompleteAdding();
            tasksCollection.Dispose();
        }
        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }
    }

要使用我们刚刚实现的 CustomTaskScheduler ,您可以使用以下代码段:


CustomTaskScheduler taskScheduler = new CustomTaskScheduler();
Task.Factory.StartNew(() => SomeMethod(), CancellationToken.None, TaskCreationOptions.None, taskScheduler);



往期精彩回顾




【推荐】.NET Core开发实战视频课程 ★★★

.NET Core实战项目之CMS 第一章 入门篇-开篇及总体规划

【.NET Core微服务实战-统一身份认证】开篇及目录索引

Redis基本使用及百亿数据量中的使用技巧分享(附视频地址及观看指南)

.NET Core中的一个接口多种实现的依赖注入与动态选择看这篇就够了

10个小技巧助您写出高性能的ASP.NET Core代码

用abp vNext快速开发Quartz.NET定时任务管理界面

在ASP.NET Core中创建基于Quartz.NET托管服务轻松实现作业调度

现身说法:实际业务出发分析百亿数据量下的多表查询优化

关于C#异步编程你应该了解的几点建议

C#异步编程看这篇就够了


浏览 6
点赞
评论
收藏
分享

手机扫一扫分享

举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

举报