使用 better-queue 管理复杂的任务
SegmentFault
共 12626字,需浏览 26分钟
·
2021-08-30 09:40
并行化处理
持久(和可扩展)存储
批处理
优先队列
合并、过滤任务
任务统计
使用方法
代码风格
import BetterQueue from "better-queue";
// 创建队列并且提供任务处理的回调函数
// 当调用回调意味该数据已经从队列中删除
// 然后从队列中取出下一条数据继续处理
const q = new BetterQueue(function (input, cb) {
// 从队列中取出数据并进行处理...
const result = 'xxxx'
try {
// 如果成功则调用回调并且返回结果
cb(null, result);
} catch (err) {
// 否则返回错误
cb(err)
}
})
q.push(1)
q.push({x: 1})
fs.readFile(filePath, (err, data) => {
if (err) {
console.log(err)
return
}
console.log(data)
})
队列生成与使用
// 任务数据
interface Job<T> {
// 任务的唯一值,唯一确定当前任务
id: string;
// 当前任务的状态:等待中,已成功,已失败
status: 'waiting' | 'succeeded' | 'failed';
// 任务的请求参数,可以是 id,也可以是其他数据
queryArgs?: any;
// 任务的返回结果
result: T;
// 任务错误信息
err: Error;
}
// 异步处理逻辑
async function asyncProcess<T>(job: Job<T>, cb: Function) {
const req = job.queryArgs || job.id
try {
// await 异步请求处理,数据库访问,或者生成文件等耗时任务
const result = await query('/xxx/xxx', req)
cb(null, result)
} catch (error) {
// 生成错误
cb(error)
}
}
// 创建队列
const betterQueue = new BetterQueue(asyncProcess)
// 对象存储,因为队列只会进行任务处理,并不包括数据的存储
// 也可以使用 map
const jobById = {}
// 创建队列数据
for (let i = 0; i < 10000; i++) {
// 建立 job
const asyncJob: Job = {
id: `${id}`,
queryArgs: {},
status: 'waiting'
}
// 存储 job,通过 id 追踪数据
jobById[asyncJob.id] = asyncJob
betterQueue.push(asyncJob)
// 取出数据并且完成请求后调用 cb(null, result) 会进入这里
.on('finish', (result) => {
// 修改任务状态,并存储任务结果
job.status = 'succeeded'
job.result = result
})
// 失败调用 cb(err) 会进入这里
.on('failed', (error: Error) => {
// 修改任务状态,并存储错误信息
job.status = 'failed'
job.err = error
})
}
// 获取任务,如果队列没有处理,会返回 wait 状态
// 队列已经处理,会返回 succeeded 或者 failed
function getJob(id: string) {
return jobById[id]
}
// 创建队列
const betterQueue = new BetterQueue(asyncProcess, {
concurrent: 10
})
interface QueueStats {
total: number; // 处理的任务总数
average: number; // 平均处理时间
successRate: number; // 成功率,在 0 和 1 之间
peak: number; // 大多数任务在任何给定时间点排队
}
function cb() {
// 获取当前队列的状态并打印完成数据对比。
// 如: 1/10 2/10
const stats = betterQueue.getStats()
console.log(`${stats.total}/10000`)
}
betterQueue.push(asyncJob)
.on('finish', (result) => {
// ...
// 完成时候进行回调
cb()
})
.on('failed', (error: Error) => {
// ...
// 完成时候进行回调
cb()
})
// 直接取消任务
betterQueue.cancel(jobId)
// 创建队列
const betterQueue = new BetterQueue(asyncProcess, {
cancelIfRunning: true
})
betterQueue.push({id: 'xxx'});
// 如果之前的 id 在队列中,取消前一个任务,执行后一个任务
betterQueue.push({id: 'xxx'});
// 暂停队列运行
betterQueue.pause()
// 恢复队列运行
betterQueue.resume()
// 销毁队列,清理数据
betterQueue.destroy()
const betterQueue = new BetterQueue(function (file: File, cb: Function) {
var worker = someLongProcess(file);
return {
cancel: function () {
// 取消文件上传
},
pause: function () {
// 暂停文件处理
},
resume: function () {
// 恢复文件上传
}
}
})
betterQueue.push('/path/to/file.pdf')
betterQueue.pause()
betterQueue.resume()
重试与超时
const betterQueue = new BetterQueue(asyncProcess, {
// 当前任务失败了可以重新请求,最大为 10 次,超过 10 次宣告任务失败
maxRetries: 10,
// 重试等待时间 1s
retryDelay: 1000,
// 超时时间 5s,当前异步任务处理超过 5s 则认为任务失败
maxTimeout: 5000,
})
持久化
// 此时队列的插入和删除都会和数据库进行交互
const betterQueue = new BetterQueue(asyncProcess, {
store: {
type: 'sql',
dialect: 'sqlite',
path: '/path/to/sqlite/file'
}
})
// 或者使用 use
betterQueue.use({
type: 'sql',
dialect: 'sqlite',
path: '/path/to/sqlite/file'
})
betterQueue.use({
connect: function (cb) {
// 连接你的数据库
},
getTask: function (taskId, cb) {
// 查询任务
},
putTask: function (taskId, task, priority, cb) {
// 保存任务同时携带优先级
},
takeFirstN: function (n, cb) {
// 删除前 n 项(根据优先级和传入顺序排序)
},
takeLastN: function (n, cb) {
// 删除后 n 项(根据优先级和传入顺序排序)
}
})
// 创建队列
const betterQueue = new BetterQueue(asyncProcess, {
filo: true
})
任务过滤、合并以及调整优先级
const betterQueue = new BetterQueue(asyncProcess, {
// 在推送任务前执行过滤
filter: async function (job: Job, cb: Function) {
// 在执行业务处理前预处理,验证数据,数据库查找等较为有用
// 异步处理验证失败
if (filterFail) {
cb('not_allowed')
return
}
// 为 job 前置处理
cb(null, job)
}
})
const betterQueue = new BetterQueue(function (task, cb) {
console.log("I have %d %ss.", task.count, task.id);
cb();
}, {
merge: function (oldTask, newTask, cb) {
oldTask.count += newTask.count;
cb(null, oldTask);
}
})
betterQueue.push({ id: 'apple', count: 2 })
betterQueue.push({ id: 'apple', count: 1 })
betterQueue.push({ id: 'orange', count: 1 })
betterQueue.push({ id: 'orange', count: 1 })
// 这时候会打印出
// I have 3 apples.
// I have 2 oranges.
// 而不是
// I have 1 apples.
// I have 1 oranges.
const betterQueue = new BetterQueue(asyncProcess, {
// 决定先处理那些任务
priority: function (job: Job, cb: Function) {
if (job.queryArgs === 'xxxxx') {
cb(null, 10)
return
}
if (job.queryArgs === 'xxx'){
cb(null, 5)
return
}
cb(null, 1);
}
})
批处理与批处理前置
const betterQueue = new BetterQueue<(function (batch, cb) {
// batch 中是一个数组,最多为 3 个
// [job1, job2, job3]
cb()
}, {
// 批处理大小
batchSize: 3,
// 5 秒内等待队列拥有 3 个项目,或者 3 秒内没有添加新的任务
// 直接处理队列
batchDelay: 5000,
batchDelayTimeout: 3000
})
// 当前也会触发,不过要等 3 秒没有添加新任务
// 如开始时放入 1 条数据,等待 2.5 s 后放入第二条数据,则在 5s 后也会执行
betterQueue.push(job1)
betterQueue.push(job2)
// 在 1s 内推入第三条数据到队列中
// 队列数据达到 3 了,开始处理
betterQueue.push(job3)
const betterQueue = new BetterQueue<(function (batch, cb) {
// batch 中是一个数组,最多为 3 个
// [job1, job2, job3]
cb()
}, {
precondition: function (cb) {
// 当前是否是联网状态
isOnline(function (err, ok) {
if (ok) {
// 返回 true,进行下一次批处理
cb(null, true);
} else {
// 继续执行直到为 true
cb(null, false);
}
})
},
// 每 10 秒执行一次 precondition 函数
preconditionRetryTimeout: 10 * 1000
})
参考资料
better-queue - npm (npmjs.com)
评论