Redis + NodeJS 实现一个能处理海量数据的异步任务队列系统
一、引言
总耗时时间 = 数据量 × 单条数据处理时间 T = N * t (N = 100,000; t = 25s) 总耗时时间 = 2,500,000 秒 ≈ 695 小时 ≈ 29 天
二、异步任务队列原理

三、使用 NodeJS 操作 Redis
docker pull redis:latest
docker run -itd --name redis-local -p 6379:6379 redis

import * as Redis from 'redis'const client = Redis.createClient({ host: '127.0.0.1', port: 6379})export default client
import client from './mqClient'// 获取 Redis 中某个 key 的内容export const getRedisValue = (key: string): Promise<string | null> => new Promise(resolve => client.get(key, (err, reply) => resolve(reply)))// 设置 Redis 中某个 key 的内容export const setRedisValue = (key: string, value: string) => new Promise(resolve => client.set(key, value, resolve))// 删除 Redis 中某个 key 及其内容export const delRedisKey = (key: string) => new Promise(resolve => client.del(key, resolve))
import { TASK_NAME, TASK_AMOUNT, setRedisValue, delRedisKey } from './utils'import client from './mqClient'client.on('ready', async () => {await delRedisKey(TASK_NAME)for (let i = TASK_AMOUNT; i > 0 ; i--) {client.lpush(TASK_NAME, `task-${i}`)}client.lrange(TASK_NAME, 0, TASK_AMOUNT, async (err, reply) => {if (err) {console.error(err)return}console.log(reply)process.exit()})})


四、异步任务处理
import taskHandler from './tasksHandler'import client from './mqClient'client.on('connect', () => {console.log('Redis is connected!')})client.on('ready', async () => {console.log('Redis is ready!')await taskHandler()})client.on('error', (e) => {console.log('Redis error! ' + e)})
function handleTask(task: string) {return new Promise((resolve) => {setTimeout(async () => {console.log(`Handling task: ${task}...`)resolve()}, 2000)})}
import { popTask } from './utils'import client from './mqClient'function handleTask(task: string) { /* ... */}export default async function tasksHandler() {// 从队列中取出一个任务const task = await popTask()// 处理任务await handleTask(task)// 递归运行await tasksHandler()}
pm2 start ./dist/index.js -i 4 && pm2 logs


五、统计任务完成耗时
总耗时 = 最后一个任务的完成时间 - 首个任务被取得的时间

node-redlock 是 Redis 分布式锁 Redlock 算法的 JavaScript 实现,关于该算法的讲解可参考:https://redis.io/topics/distlock 值得注意的是,在 node-redlock 在使用的过程中,如果要锁一个已存在的 key,就必须为该 key 添加一个前缀 locks:,否则会报错。
export const setBeginTime = async (redlock: Redlock) => {// 读取标记值前先把它锁住const lock = await redlock.lock(`lock:${TASK_NAME}_SET_FIRST`, 1000)const setFirst = await getRedisValue(`${TASK_NAME}_SET_FIRST`)// 当且仅当标记值不等于 true 时,才设置起始时间if (setFirst !== 'true') {console.log(`${pm2tips} Get the first task!`)await setRedisValue(`${TASK_NAME}_SET_FIRST`, 'true')await setRedisValue(`${TASK_NAME}_BEGIN_TIME`, `${new Date().getTime()}`)}// 完成标记值的读写操作后,释放锁await lock.unlock().catch(e => e)}
export default async function tasksHandler() {+ // 获取第一个任务被取得的时间+ await setBeginTime(redlock)// 从队列中取出一个任务const task = await popTask()// 处理任务await handleTask(task)// 递归运行await tasksHandler()}

export default async function tasksHandler() {+ // 获取标识值和队列初始长度+ let curIndex = Number(await getRedisValue(`${TASK_NAME}_CUR_INDEX`))+ const taskAmount = Number(await getRedisValue(`${TASK_NAME}_TOTAL`))+ // 等待新任务+ if (taskAmount === 0) {+ console.log(`${pm2tips} Wating new tasks...`)+ await sleep(2000)+ await tasksHandler()+ return+ }+ // 判断所有任务已经完成+ if (curIndex === taskAmount) {+ const beginTime = await getRedisValue(`${TASK_NAME}_BEGIN_TIME`)+ // 获取总耗时+ const cost = new Date().getTime() - Number(beginTime)+ console.log(`${pm2tips} All tasks were completed! Time cost: ${cost}ms. ${beginTime}`)+ // 初始化 Redis 的一些标识值+ await setRedisValue(`${TASK_NAME}_TOTAL`, '0')+ await setRedisValue(`${TASK_NAME}_CUR_INDEX`, '0')+ await setRedisValue(`${TASK_NAME}_SET_FIRST`, 'false')+ await delRedisKey(`${TASK_NAME}_BEGIN_TIME`)+ await sleep(2000)+ await tasksHandler()}// 获取第一个任务被取得的时间await setBeginTime(redlock)// 从队列中取出一个任务const task = await popTask()// 处理任务await handleTask(task)+ // 任务完成后需要为标识位加一+ try {+ const lock = await redlock.lock(`lock:${TASK_NAME}_CUR_INDEX`, 1000)+ curIndex = await getCurIndex()+ await setCurIndex(curIndex + 1)+ await lock.unlock().catch((e) => e)+ } catch (e) {+ console.log(e)+ }+ // recursion+ await tasksHandler()+}// 递归运行await tasksHandler()}


六、结语
评论
