一文看懂C#异步 Linq

dotNET全栈开发

共 10019字,需浏览 21分钟

 ·

2021-09-28 08:30

用不好异步 LINQ,基本上就等于用不好 LINQ 了。

LINQ 这个东西,出来很早了,写过几年代码的兄弟们,或多或少都用过一些。

早期的 LINQ,主要是同步的,直到 C# 8.0 加入 IAsyncEnumerable,LINQ 才真正转向异步。这本来是个非常好的改变,配合 System.Linq.Async 库提供的扩展,可以在诸如 Where、Select、GroupBy 等各种地方用到异步。

但事实上,在我 Review 代码时,见了很多人的代码,并没有按异步的规则去使用,出现了很多的坑。

举个简单的例子:

static async Task<List<T>> Where<T>(this IAsyncEnumerable<T> source, Func<T, bool> predicate)
{
    var filteredItems = new List<T>();
    await foreach (var item in source)
    
{
        if (predicate(item))
        {
            filteredItems.Add(item);
        }
    }

    return filteredItems;
}

这样的写法,看着是用到了 async / await 对,但实际上并没有实现异步,程序依然是按照同步在运行。换句话说,这只是一个样子上的异步,实际没有任何延迟执行的效果。

1. 延迟执行

其实,这儿正确的写法也挺简单,用到的就是个异步的迭代器(关于异步迭代器,如果需要了解,可以看我的另一篇推文):

static async IAsyncEnumerable<T> Where<T>(this IAsyncEnumerable<T> source, Func<T, bool> predicate)
{
    await foreach (var item in source)
    
{
        if (predicate(item))
        {
            yield return item;
        }
    }
}

这种写法下,编译器会将方法转了状态机,并在实际调用时,才通过枚举器返回异步枚举项。

看看调用过程:

IAsyncEnumerable<User> users = ...
IAsyncEnumerable<User> filteredUsers = users.Where(User => User.Name == "WangPlus");

await foreach (User user in filteredUsers)
{
    Console.WriteLine(user.Age);
}

在这个调用的例子中,在 Where 时,实际方法并不会马上开始。只有在下面 foreach 时,才真正开始执行 Where 方法。

延迟执行,这是异步 LINQ 的第一个优势。

2. 流执行

流执行,依托的也是异步迭代器。

所谓流执行,其实就是根据调用的要求,一次返回一个对象。通过使用异步迭代器,可以不用一次返回所有的对象,而是一个一个地返回单个的对象,直到枚举完所有的对象。

流执行需要做个技巧性的代码,需要用到一个 C# 8.0 的新特性:局部方法。

看代码:

static IAsyncEnumerable<T> Where<T>(this IAsyncEnumerable<T> source, Func<T, bool> predicate)
{
    return Core();

    async IAsyncEnumerable<T> Core()
    {
        await foreach (var item in source)
        
{
            if (predicate(item))
            {
                yield return item;
            }
        }
    }
}

3. 取消异步 LINQ

前面两个小节,写的是异步 LINQ 的执行。

通常使用异步 LINQ 的原因,就是因为执行时间长,一般需要一段时间来完成。因此,取消异步 LINQ 就很重要。想象一下,一个长的 DB 查询已经超时了的情况,该怎么处理?

为了支持取消,IAsyncEnumerable.GetEnumerator 本身接受一个 CancellationToken 参数来中止任务,并用一个扩展方法挂接到 foreach 调用:

CancellationToken cancellationToken = ...
IAsyncEnumerable<User> users = ...
IAsyncEnumerable<User> filteredUsers = users.Where(User => User.Name == "WangPlus");

await foreach (var User in filteredUsers.WithCancellation(cancellationToken))
{
    Console.WriteLine(User.Age);
}

同时,在上面的 Where 定义中,也要响应 CancellationToken 参数:

static IAsyncEnumerable<T> Where<T>(this IAsyncEnumerable<T> source, Func<T, bool> predicate)
{
    return Core();

    async IAsyncEnumerable<T> Core([EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        await foreach (var item in source.WithCancellation(cancellationToken))
        
{
            if (predicate(item))
            {
                yield return item;
            }
        }
    }
}

多解释一下:在 Where 方法中,CancellationToken 只能加到局部函数 Core 中,一个简单的原因是 Where 本身并不是异步方法,而且,我们也不希望从 Where 往里传递。想象一下:

Users.Where(xxx, cancellationToken).Select(xxx, cancellationToken).OrderBy(xxx, cancellationToken);

这样的代码会让人晕死。

所以,我们会采用上面的方式,允许消费者在枚举数据时传递 CancellationToken 来达到取消异步操作的目的。

4. 处理ConfigureAwait(false)

这是另一个异步必须要注意的部分,其实就是上下文。

通常大多数的方法,我们不需要关注上下文,但总有一些需要,在等待的异步操作恢复后,需要返回到某个上下文的情况。这种情况在 UI 线程编码时通常都需要考虑。很多人提到的异步死锁,就是这个原因。

处理也很简单:

static IAsyncEnumerable<T> Where<T>(this IAsyncEnumerable<T> source, Func<T, bool> predicate)
{
    return Core();

    async IAsyncEnumerable<T> Core([EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        await foreach (var item in source.WithCancellation(cancellationToken).ConfigureAwait(false))
        
{
            if (predicate(item))
            {
                yield return item;
            }
        }
    }
}

这儿也多说两句:按微软的说法,await foreach 本身是基于模式的,WithCancellation 和 ConfigureAwait 返回同样的结构体 ConfiguredCancelableAsyncEnumerable。这个结构体没有实现 IAsyncEnumerable 接口,而是做了一个 GetAsyncEnumerator 方法,返回一个具有 MoveNextAsync、Current、DisposeAsync 的枚举器,因此可以 await foreach 。

5. 方法扩展

上面 4 个小节,我们完成了一个 Where 异步 LINQ 的全部内容。

不过,这个方法有一些限制和不足。熟悉异步的兄弟们应该已经看出来了,里面用了一个委托 predicate 来做数据过滤,而这个委托,是个同步的方法。

事实上,根据微软对异步 LINQ 的约定,每个操作符应该是三种重载:

  • 同步委托的实现,就是上面的 Where 方法;

  • 异步委托的实现,这个是指具有异步返回类型的实现,通常这种方法名称会用一个 Await 做后缀,例如:WhereAwait;

  • 可以接受取消的异步委托的实现,通常这种方法会用 AwaitWithCancellation 做后缀,例如:WhereAwaitWithCancellation。

参考微软的异步方法,基本上都是以这种结构来命名方法名称的。

下面,我们也按这个方式,来做一个 Where 方法的几个重载。

WhereAwait 方法

上面说了,这会是一个异步实现。所以,条件部分就不能用 Func<T, bool> 这样的同步委托了,而需要改为 Func<T, ValueTask<bool>>。这里的 ValueTask 倒不是必须,用 Task 也可以,只不过我更习惯用 ValueTask。两个的区别:Task 是类,有上下文,而 ValueTask 是结构。

代码是这样:

static IAsyncEnumerable<T> WhereAwait<T>(this IAsyncEnumerable<T> source, Func<T, ValueTask<bool>> predicate)
{
    return Core();

    async IAsyncEnumerable<T> Core([EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        await foreach (var item in source.WithCancellation(cancellationToken).ConfigureAwait(false))
        
{
            if (await predicate(item).ConfigureAwait(false))
            {
                yield return item;
            }
        }
    }
}

调用时是这样:

IAsyncEnumerable<User> filteredUsers = users.WhereAwait(async user => await someIfFunction());

WhereAwaitWithCancellation方法

在上面的基础上,又加了一个取消操作。

看代码:

static IAsyncEnumerable<T> WhereAwaitWithCancellation<T>(this IAsyncEnumerable<T> source, Func<T, CancellationToken, ValueTask<bool>> predicate)
{
    return Core();

    async IAsyncEnumerable<T> Core([EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        await foreach (var item in source.WithCancellation(cancellationToken).ConfigureAwait(false))
        
{
            if (await predicate(item, cancellationToken).ConfigureAwait(false))
            {
                yield return item;
            }
        }
    }
}

调用时是这样:

IAsyncEnumerable<User> filteredUsers = users.WhereAwaitWithCancellation(async (user, token) => await someIfFunction(user, token));

6. 总结

异步 LINQ,多数是在 LINQ 的扩展方法中使用,而不是我们通常习惯的 LINQ 直写。

事实上,异步 LINQ 的扩展,对 LINQ 本身是有比较大的强化作用的,不管从性能,还是可读性上,用多了,只会更爽。

喜欢就来个三连,让更多人因你而受益


浏览 16
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报