Redis + NodeJS 实现一个能处理海量数据的异步任务队列系统
前端技术江湖
共 8499字,需浏览 17分钟
·
2022-07-10 21:20
一、引言
总耗时时间 = 数据量 × 单条数据处理时间 T = N * t (N = 100,000; t = 25s) 总耗时时间 = 2,500,000 秒 ≈ 695 小时 ≈ 29 天
二、异步任务队列原理
三、使用 NodeJS 操作 Redis
docker pull redis:latest
docker run -itd --name redis-local -p 6379:6379 redis
import * as Redis from 'redis'const client = Redis.createClient({ host: '127.0.0.1', port: 6379})export default client
import client from './mqClient'
// 获取 Redis 中某个 key 的内容
export const getRedisValue = (key: string): Promise<string | null> => new Promise(resolve => client.get(key, (err, reply) => resolve(reply)))
// 设置 Redis 中某个 key 的内容
export const setRedisValue = (key: string, value: string) => new Promise(resolve => client.set(key, value, resolve))
// 删除 Redis 中某个 key 及其内容
export const delRedisKey = (key: string) => new Promise(resolve => client.del(key, resolve))
import { TASK_NAME, TASK_AMOUNT, setRedisValue, delRedisKey } from './utils'
import client from './mqClient'
client.on('ready', async () => {
await delRedisKey(TASK_NAME)
for (let i = TASK_AMOUNT; i > 0 ; i--) {
client.lpush(TASK_NAME, `task-${i}`)
}
client.lrange(TASK_NAME, 0, TASK_AMOUNT, async (err, reply) => {
if (err) {
console.error(err)
return
}
console.log(reply)
process.exit()
})
})
四、异步任务处理
import taskHandler from './tasksHandler'
import client from './mqClient'
client.on('connect', () => {
console.log('Redis is connected!')
})
client.on('ready', async () => {
console.log('Redis is ready!')
await taskHandler()
})
client.on('error', (e) => {
console.log('Redis error! ' + e)
})
function handleTask(task: string) {
return new Promise((resolve) => {
setTimeout(async () => {
console.log(`Handling task: ${task}...`)
resolve()
}, 2000)
})
}
import { popTask } from './utils'
import client from './mqClient'
function handleTask(task: string) { /* ... */}
export default async function tasksHandler() {
// 从队列中取出一个任务
const task = await popTask()
// 处理任务
await handleTask(task)
// 递归运行
await tasksHandler()
}
pm2 start ./dist/index.js -i 4 && pm2 logs
五、统计任务完成耗时
总耗时 = 最后一个任务的完成时间 - 首个任务被取得的时间
node-redlock 是 Redis 分布式锁 Redlock 算法的 JavaScript 实现,关于该算法的讲解可参考:https://redis.io/topics/distlock 值得注意的是,在 node-redlock 在使用的过程中,如果要锁一个已存在的 key,就必须为该 key 添加一个前缀 locks:,否则会报错。
export const setBeginTime = async (redlock: Redlock) => {
// 读取标记值前先把它锁住
const lock = await redlock.lock(`lock:${TASK_NAME}_SET_FIRST`, 1000)
const setFirst = await getRedisValue(`${TASK_NAME}_SET_FIRST`)
// 当且仅当标记值不等于 true 时,才设置起始时间
if (setFirst !== 'true') {
console.log(`${pm2tips} Get the first task!`)
await setRedisValue(`${TASK_NAME}_SET_FIRST`, 'true')
await setRedisValue(`${TASK_NAME}_BEGIN_TIME`, `${new Date().getTime()}`)
}
// 完成标记值的读写操作后,释放锁
await lock.unlock().catch(e => e)
}
export default async function tasksHandler() {
+ // 获取第一个任务被取得的时间
+ await setBeginTime(redlock)
// 从队列中取出一个任务
const task = await popTask()
// 处理任务
await handleTask(task)
// 递归运行
await tasksHandler()
}
export default async function tasksHandler() {
+ // 获取标识值和队列初始长度
+ let curIndex = Number(await getRedisValue(`${TASK_NAME}_CUR_INDEX`))
+ const taskAmount = Number(await getRedisValue(`${TASK_NAME}_TOTAL`))
+ // 等待新任务
+ if (taskAmount === 0) {
+ console.log(`${pm2tips} Wating new tasks...`)
+ await sleep(2000)
+ await tasksHandler()
+ return
+ }
+ // 判断所有任务已经完成
+ if (curIndex === taskAmount) {
+ const beginTime = await getRedisValue(`${TASK_NAME}_BEGIN_TIME`)
+ // 获取总耗时
+ const cost = new Date().getTime() - Number(beginTime)
+ console.log(`${pm2tips} All tasks were completed! Time cost: ${cost}ms. ${beginTime}`)
+ // 初始化 Redis 的一些标识值
+ await setRedisValue(`${TASK_NAME}_TOTAL`, '0')
+ await setRedisValue(`${TASK_NAME}_CUR_INDEX`, '0')
+ await setRedisValue(`${TASK_NAME}_SET_FIRST`, 'false')
+ await delRedisKey(`${TASK_NAME}_BEGIN_TIME`)
+ await sleep(2000)
+ await tasksHandler()
}
// 获取第一个任务被取得的时间
await setBeginTime(redlock)
// 从队列中取出一个任务
const task = await popTask()
// 处理任务
await handleTask(task)
+ // 任务完成后需要为标识位加一
+ try {
+ const lock = await redlock.lock(`lock:${TASK_NAME}_CUR_INDEX`, 1000)
+ curIndex = await getCurIndex()
+ await setCurIndex(curIndex + 1)
+ await lock.unlock().catch((e) => e)
+ } catch (e) {
+ console.log(e)
+ }
+ // recursion
+ await tasksHandler()
+}
// 递归运行
await tasksHandler()
}
六、结语
评论