使用 async_hooks 模块进行请求追踪

来源 | https://juejin.cn/post/6922274745622724616
async_hooks 提供了追踪异步资源的 API,这种异步资源是具有关联回调的对象。
简而言之,async_hooks 模块可以用来追踪异步回调。那么如何使用这种追踪能力,使用的过程中又有什么问题呢?
认识 async_hooks
v8.x.x 版本下的 async_hooks 主要有两部分组成,一个是 createHook 用以追踪生命周期,一个是 AsyncResource 用于创建异步资源。
const { createHook, AsyncResource, executionAsyncId } = require('async_hooks')const hook = createHook({init (asyncId, type, triggerAsyncId, resource) {},before (asyncId) {},after (asyncId) {},destroy (asyncId) {}})hook.enable()function fn () {console.log(executionAsyncId())}const asyncResource = new AsyncResource('demo')asyncResource.run(fn)asyncResource.run(fn)asyncResource.emitDestroy()
创建一个包含在每个异步操作的 init、before、after、destroy 声明周期执行的钩子函数的 hooks 实例。 启用这个 hooks 实例。 手动创建一个类型为 demo 的异步资源。此时触发了 init 钩子,异步资源 id 为 asyncId,类型为 type(即 demo),异步资源的创建上下文 id 为 triggerAsyncId,异步资源为 resource。 使用此异步资源执行 fn 函数两次,此时会触发 before 两次、after 两次,异步资源 id 为 asyncId,此 asyncId 与 fn 函数内通过 executionAsyncId 取到的值相同。 手动触发 destroy 生命周期钩子。 
请求追踪
通过 init 钩子使得在同一条调用链上的异步资源共用一个存储对象。 解析请求头中 request-id,添加到当前异步调用链对应的存储上。 改写 http、https 模块的 request 方法,在请求执行时获取当前当前的调用链对应存储中的 request-id。 
const http = require('http')const { createHook, executionAsyncId } = require('async_hooks')const fs = require('fs')// 追踪调用链并创建调用链存储对象const cache = {}const hook = createHook({init (asyncId, type, triggerAsyncId, resource) {if (type === 'TickObject') return// 由于在 Node.js 中 console.log 也是异步行为,会导致触发 init 钩子,所以我们只能通过同步方法记录日志fs.appendFileSync('log.out', `init ${type}(${asyncId}: trigger: ${triggerAsyncId})\n`);// 判断调用链存储对象是否已经初始化if (!cache[triggerAsyncId]) {cache[triggerAsyncId] = {}}// 将父节点的存储与当前异步资源通过引用共享cache[asyncId] = cache[triggerAsyncId]}})hook.enable()// 改写 httpconst httpRequest = http.requesthttp.request = (options, callback) => {const client = httpRequest(options, callback)// 获取当前请求所属异步资源对应存储的 request-id 写入 headerconst requestId = cache[executionAsyncId()].requestIdconsole.log('cache', cache[executionAsyncId()])client.setHeader('request-id', requestId)return client}function timeout () {return new Promise((resolve, reject) => {setTimeout(resolve, Math.random() * 1000)})}// 创建服务http.createServer(async (req, res) => {// 获取当前请求的 request-id 写入存储cache[executionAsyncId()].requestId = req.headers['request-id']// 模拟一些其他耗时操作await timeout()// 发送一个请求http.request('http://www.baidu.com', (res) => {})res.write('hello\n')res.end()}).listen(3000)
陷阱
同时,我们也需要注意到一点,init 是异步资源创建的钩子,不是异步回调函数创建的钩子,只会在异步资源创建的时候执行一次。
存储初始化部分将 triggerAsyncId 保存下来,方便观察异步调用的追踪关系:
if (!cache[triggerAsyncId]) {cache[triggerAsyncId] = {id: triggerAsyncId}}
timeout 函数改为先进行一次长耗时再进行一次短耗时操作:
function timeout () {return new Promise((resolve, reject) => {setTimeout(resolve, [1000, 5000].pop())})}
重启服务后,使用 postman (不用 curl 是因为 curl 每次请求结束会关闭连接,导致不能复现)连续的发送两次请求,可以观察到以下输出:
{ id: 1, requestId: '第二次请求的id' }{ id: 1, requestId: '第二次请求的id' }
即可发现在多并发且写读存储的操作之间有耗时不固定的其他操作情况下,先到达服务器的请求存储的值会被后到达服务器的请求执行复写掉,使得前一次请求读取到错误的值。
当然,你可以保证在写和读之间不插入其他的耗时操作,但在复杂的服务中这种靠脑力维护的保障方式明显是不可靠的。
此时,我们就需要使每次读写前,JS 都能进入一个全新的异步资源上下文,即获得一个全新的 asyncId,避免这种复用。需要将调用链存储的部分做以下几方面修改:
const http = require('http')const { createHook, executionAsyncId } = require('async_hooks')const fs = require('fs')const cache = {}const httpRequest = http.requesthttp.request = (options, callback) => {const client = httpRequest(options, callback)const requestId = cache[executionAsyncId()].requestIdconsole.log('cache', cache[executionAsyncId()])client.setHeader('request-id', requestId)return client}// 将存储的初始化提取为一个独立的方法async function cacheInit (callback) {// 利用 await 操作使得 await 后的代码进入一个全新的异步上下文await Promise.resolve()cache[executionAsyncId()] = {}// 使用 callback 执行的方式,使得后续操作都属于这个新的异步上下文return callback()}const hook = createHook({init (asyncId, type, triggerAsyncId, resource) {if (!cache[triggerAsyncId]) {// init hook 不再进行初始化return fs.appendFileSync('log.out', `未使用 cacheInit 方法进行初始化`)}cache[asyncId] = cache[triggerAsyncId]}})hook.enable()function timeout () {return new Promise((resolve, reject) => {setTimeout(resolve, [1000, 5000].pop())})}http.createServer(async (req, res) => {// 将后续操作作为 callback 传入 cacheInitawait cacheInit(async function fn() {cache[executionAsyncId()].requestId = req.headers['request-id']await timeout()http.request('http://www.baidu.com', (res) => {})res.write('hello\n')res.end()})}).listen(3000)
async function middleware (ctx, next) {await Promise.resolve()cache[executionAsyncId()] = {}return next()}
NodeJs v14
这种使用 await Promise.resolve() 创建全新异步上下文的方式看起来总有些 “歪门邪道” 的感觉。好在 NodeJs v9.x.x 版本中提供了创建异步上下文的官方实现方式 asyncResource.runInAsyncScope。
更好的是,NodeJs v14.x.x 版本直接提供了异步调用链数据存储的官方实现,它会直接帮你完成异步调用关系追踪、创建新的异步上线文、管理数据这三项工作!API 就不再详细介绍,我们直接使用新 API 改造之前的实现
const { AsyncLocalStorage } = require('async_hooks')// 直接创建一个 asyncLocalStorage 存储实例,不再需要管理 async 生命周期钩子const asyncLocalStorage = new AsyncLocalStorage()const storage = {enable (callback) {// 使用 run 方法创建全新的存储,且需要让后续操作作为 run 方法的回调执行,以使用全新的异步资源上下文asyncLocalStorage.run({}, callback)},get (key) {return asyncLocalStorage.getStore()[key]},set (key, value) {asyncLocalStorage.getStore()[key] = value}}// 改写 httpconst httpRequest = http.requesthttp.request = (options, callback) => {const client = httpRequest(options, callback)// 获取异步资源存储的 request-id 写入 headerclient.setHeader('request-id', storage.get('requestId'))return client}// 使用http.createServer((req, res) => {storage.enable(async function () {// 获取当前请求的 request-id 写入存储storage.set('requestId', req.headers['request-id'])http.request('http://www.baidu.com', (res) => {})res.write('hello\n')res.end()})}).listen(3000)
可以看到,官方实现的 asyncLocalStorage.run API 和我们的第二版实现在结构上也很一致。
于是,在 Node.js v14.x.x 版本下,使用 async_hooks 模块进行请求追踪的功能很轻易的就实现了。

