Node.js+Redis实现消息队列的最佳实践

微信扫一扫,分享到朋友圈

Node.js+Redis实现消息队列的最佳实践

问题来由

最近在开发一个前端监控项目,由于技术栈中使用到了 Node + Redis 作为消息队列实现,因此这里记录下在 Node 中通过 Redis 来实现消息队列时的 使用方法注意事项

什么是消息队列

消息队列,是一种存放 消息 是队列结构,可以用来解决 分布式系统通信 从而解耦系统模块异步任务处理请求消峰限流 的问题。

既然叫做队列,那它一般是从一侧推入消息,从另一侧消费消息;大概是如下的流程。

在我的需求当中,我用消息队列来做异步的入库处理。

我通过 Node 做了一个对外的日志接收层(即图中 Koa Server)用于接收上报日志,当Koa Server接收完成会立即给用户响应 OK,因为用户是没必要去感知后端日志的入库结果的。

因此 Koa Server 收到日志后,将消息放入 Redis 消息队列即可。另外一端,我启动了一个 消费 程序(即上图中的日志入库模块,它也是一个 Node 脚本)来对MQ消息进行读取并进行入库操作。

Redis 如何做消息队列

消息队列,其实有 2种类型。一种是基于 队列模型 的,一种是基于 订阅发布模式 的。

对于 订阅发布模式 来说,是指的多个消费者都可以订阅某一个 channel 的消息,当channel中来了消息,所有的订阅者都会收到通知,并且所有的订阅者都可以对同一个消息进行处理(消费)。

对于 队列模型 来说,当消息入队后,在另一端只出队一次,如果有多个消费者在等待这个队列,那么只有一个消费者能拿到这个消息进行处理。

在 Redis 中,以上 2 种模型,分别通过 pub/sub 功能和 list 结构可以来实现。

对于我的日志接收场景来说,我期望的是无论我后端有多少个 入库消费者 ,我希望同一条上报只能入库一次。因此对我来说,我需要使用 队列模型 来实现消息队列,即使用 Redis 的 List 结构。

CLI 简单实验

我们通过 redis-cli 来简单实验下 list 结构是如何当做消息队列的。

首先,通过 lpush 命令往 redis 中某个队列的左侧推入一条消息:

lpush my_mq abc

这样,我们就往 my_mq 这个队列推入了一条内容为 abc 的消息。由于此时并没有消费者,所以这条消息依然存在于队列当中。我们甚至可以再次往里推入第2条 def 消息,并通过 llen 命令来查看当前队列的长度。

接下来,我们在另外一个命令行窗口输入:

rpop my_mq

意思是从 my_mq 队列的右侧拿出一条消息。结果:

阻塞模式实验

Redis 的 List 结构,为了方便大家当做消息队列。提供了一种阻塞模式。 阻塞和非阻塞有什么区别呢?

我们用一个新命令行窗口,去执行 阻塞等待消息 :

brpop my_mq 0

注意后面要加一个 超时时间 ,0就表示一直阻塞等待。然后,我们看到 redis 命令行就阻塞在这里了,处于等待消息的状态:

而如果使用 rpop 非阻塞命令的话,则会返回空并直接退出等待:

因此,可以发现,阻塞非阻塞模式,最大的区别:是在于当消息队列为空的时候,阻塞模式不会退出等待,而非阻塞模式则会直接返回空并退出等待。

brpop 正在等待的时候,我们往队列中 push 一个消息:

lpush my_mq 123

可以看到,阻塞模式的消费端,收到了 123 这个消息,同时自己也退出了等待:

这说明:阻塞模式并不是会 循环等待 ,而是 等到一条消息就退出等待 。所谓的阻塞,只是 当还未等到1条消息时,则阻塞等待;当等到1条消息,即立刻退出 。 这将给我们编写 Node 代码提供一些启发。

Node 如何使用

到了重点了。我们在 Node 中编码来使用 redis 消息队列,跟在 cli 界面使用的方式是一样的。但是需要我们考虑如何编写 消费者 端的代码,才能实现所谓的 持续监听队列 。毕竟,我们的 消费者 是需要常驻进程,持续监听队列消息的。并不是说 收到一个消息就退出进程

因此,我们需要编写一个

  • 能常驻的Node进程,能够持续的等待 redis 队列消息
  • 当收到1条消息,便由 Node 脚本处理;处理完要继续等待队列中下一条消息。如此循环往复。

首先,我们可以这样编写代码来在 Node 中创建 redis 客户端:

const redis = require('promise-redis-client')
let client = redis.createClient(...options)
client.on('error', err => {
console.log('redis链接出错')
})
client.on('ready', () => {
console.log('redis ready')
})

为了实现 当redis客户端创建完毕,再开启消息队列监听 ,我们把上面的代码,封装成一个模块,用 promise 方式导出:

// redis.js
const redis = require('promise-redis-client')
exports.createClient = function() {
return new Promise((resolve, reject) => {
let client = redis.createClient(...options)
client.on('error', err => {
console.log('redis 连接出错')
reject(err)
})
client.on('ready', () => {
console.log('redis ready')
resolve(client)
})
})
}

OK,接下来,我们可以去 app.js 中编写队列的消费者代码。为了更优雅的使用 async/await,我们可以这样来编写一个 startWait 函数:

async function startWaitMsg(client) {
...
}

然后,在 client ready 的时候,去启动它:

const { createClient } = require('./redis.js')
const c = createClient()
client.then(async c => {
await startWaitMsg(c)
})

最难的地方在于,startWaitMsg 函数该如何编写。由于我们使用了 promise 版本的 redis库。因此,我们可以像这样去读取一个消息:

async function startWaitMsg(client) {
await client.rpop('my_mq')
}

但这样写的话,redis返回消息后,node继续往后执行,最终 startWaitMsg 函数就执行结束了。尽管整个 Node 进程会因为 redis 连接未断开而不会退出,但 node 此时已经无法再次去执行 client.rpop 这句代码了,也因此无法再次从消息队列中获取新来的消息。

循环实现持续等待

我们想到,可以使用循环来实现 持续监听队列 。于是,把代码改成:

async function startWaitMsg(client) {
while(true) {
await client.rpop('my_mq')
}
}

如此便实现了 持续执行 rpop 指令 。然而,如果你在 rpop 代码后面加一行日志打印的话,会观察到 client.rpop 在持续打印 null。

这是因为,rpop 指令是 非阻塞的 ,因此当队列没有消息,他便返回一个 null,由此触发你的 while 循环在不断执行。这会导致我们程序占用过多的 cpu时间片,且对 redis 网络IO有过多的没必要的消耗。

整个while循环不停的执行,只有执行rpop这一行的时候会短暂释放一下EventLoop给其他代码,这对脚本性能影响也会较大。国家提倡节能减排,这显然不是最优雅的。

使用阻塞模式

让我们来用上 redis 队列的阻塞模式试试。

async function startWaitMsg(c) {
while(true) {
const res = await c.brpop('my_mq', 0)
console.log('收到消息', res)
}
}

通过 brpop 指令,可以让 brpop 代码阻塞在这里。这里所谓的 阻塞 并不是对 Node 程序的阻塞,而是 redis 客户端自身的阻塞。实际上对 Node 进程来说,无论是 rpop 还是 brpop 都是 非阻塞 的异步 IO操作,只是在消息队列为空时 rpop 底层会立刻返回null,从而node进程会 resolve一个空,而 brpop 会在底层redis阻塞等待消息,消息到达后再给 Node 进程通知 resolve。

因此,brpop 对 Node 来说,可以避免自己实现队列的内容轮询,可以在等待IO回调期间将cpu留给其他任务。从而大大减少 Node 进程的 CPU 消耗。

redis断开无法继续消费的问题

在代码运行过程中,出现了一个新的问题: redis 客户端会在某些情况下断开连接(可能由于网络等原因)。而通过分析日志发现:一旦发生连接异常,我们的消费者脚本就无法继续接收新的消息了(我的日志入库功能失效)。

经过分析,发现问题原因依然在于我们的 while 语句 和 brpop 的配合问题。

当 redis client 对象发生连接异常时,会向当前正在等待的 brpop 代码抛出一个 reject 异常。我们回看上述代码的 startWait 函数:

async function startWaitMsg(c) {
while(true) {
const res = await c.brpop('my_mq', 0)
console.log('收到消息', res)
}
}

如果 await brpop 这一行抛出 reject 异常,由于我们未捕获该异常,则异常会抛出 startWaitMsg 函数,结果就是 while 循环被退出了。

思考如何解决

事实上,当连接出现问题,我们需要对 client 进行重连。不过,这个重连机制,redisclient 会自动进行,因此我们的代码要做的仅仅只需要 保证while循环能在异常时恢复 。于是,我们在发生异常时,continue 一下:

async function startWaitMsg(c) {
while(true) {
let res = null
try {
res = await c.brpop('my_mq', 0)
console.log('收到消息', res)
}
catch(err) {
console.log('brpop 出错,重新brpop')
continue
}
// ... 消息处理任务
}
}

由于 redis 客户端内部的重连过程不会再触发 reject (只是断开连接的时候触发一次),因此 continue 之后的 brpop 又会重新 “阻塞” 等待,由此,我们的 消费者 便可以正常活着了。

最终代码

  • 客户端连接代码文件:redis.js
const redis = require('promise-redis-client')
exports.createClient = function() {
return new Promise((resolve, reject) => {
let client = redis.createClient(...options)
client.on('error', err => {
console.log('redis 连接出错')
reject(err)
})
client.on('ready', () => {
console.log('redis ready')
resolve(client)
})
})
}

app.js

const { createClient } = require('./redis.js')
const c = createClient()
client.then(async c => {
await startWaitMsg(c) // 启动消息监听
})
async function startWaitMsg(c) {
while(true) {
let res = null
try {
res = await c.brpop('my_mq', 0)
console.log('收到消息', res)
}
catch(err) {
console.log('brpop 出错,重新brpop')
continue
}
// ... 消息处理任务
}
}

总结

  • redis 的 list 数据结构,可以用作实现 队列模式消息队列
  • Node 中可以通过 while(true) 实现队列的持续循环监听
  • 通过 brpop 阻塞指令的使用,可以避免 cpu 空转来监听队列
  • Node 中要注意 redis 连接断开时的错误处理,以避免因出错导致无法重新监听队列

苹果中国力度空前:大量无版号游戏被封 半日下架逾2.6万款

上一篇

概念回顾:JS执行上下文

下一篇

你也可能喜欢

Node.js+Redis实现消息队列的最佳实践

长按储存图像,分享给朋友