简单设计go-amqp

微信扫一扫,分享到朋友圈

简单设计go-amqp

仓库地址

go get -u github.com/lazychanger/go-amqp

注意事项

  • rabbitmq将连接与管道分开,即 connection
    channel
    connection
    是实体连接,而 channel
    是逻辑连接。所以当我们多线程使用时候,应该是单 connection
    +多 channel
  • connection
    闪断以后, channel
    也会失效。所以重连以后,还需要重新建立 channel
  • 所有的 推送
    消费
    都是基于 channel
    ,所以 channel
    重连以后,还需要重新开启 推送
    消费
  • connection
    闪断以后。 推送
    是无效的,这个时候再去推送,肯定会报错。所以我们重连机制,必须要保证, connection
    在闪断时候,也要保证 推送
    接口正常

channel
管理

思考

我们先定义, channel
到底代表什么?我认为单个 channel
操作单条队列的发送与消费,并且需要对主服务提供重启。既然要提供重启,那就要保证单个 channel
对象的消费与推送,不允许直接与 channel
操作,需要进行操作隔离,防止闪断与重启期间,操作无效的 channel
。要完成操作隔离,隔离 消费
比较简单,收集好相关操作的 handle
,重启以后,重新接入 handle
即可。但是隔离 推送
比较困难,因为对于用户来说, 推送
是实时的,而且我们也无法保证重连时间是多久,所以让用户等待重连是不可行的,那么该怎么操作呢?这里我选择的是内置一条缓存队列,用户 推送
数据,先进入缓存队列,再由缓存队列的消费者取出交由真正的队列,同时也要注意重连时间过长导致内存溢出、服务端重启导致交换队列还清空。所以我们设计的 channel
应该满足以下几点

  • 内置推送交换队列
  • 隔离消费
  • 提供重启机制

    channel
    消费
    推送
    
  • 提供优雅关闭(关机时候需要保障推送交换队列完全推送完毕)

代码案例

package go_amqp
import (
"encoding/json"
"errors"
"fmt"
"github.com/lazychanger/go-amqp/tools"
"github.com/streadway/amqp"
"log"
"sync"
"time"
)
type Queue struct {
sync.Mutex
// 队列名
name string
// 交换机名
exchangeName string
// 队列绑定交换机路由KEY
routingKey string
// 交换机模式
exchange string
// 最大内置队列长度
maxOverstock int64
conn    *amqp.Connection
channel *amqp.Channel
// 1.为什么不采用slice。因为golang的slice我们不方便作为队列操作,每次入出都会引发大量的内存操作。所以采用 map 类型
// 2.为什么采用sync.map。因为入与出是同时执行,异步操作,如果直接采用map类型,会导致脏读、幻读等问题,我们需要对map类型的读写需要锁,而golang有内置完整的读写锁map模型
// 3.采用map类型以后,需要我们自己维护头出尾入以及长度,所以这里有qMaxI、qMinI、ql进行数据记录
q *sync.Map
// 最大index
qMaxI int64
// 最小index
qMinI int64
// 内置队列长度
ql int64
// 是否启动释放库存
isReleaseStock bool
// 是否重启消费
isReloadConsume int
// 是否停止消费
isStopConsume bool
// 是否准备好
already int
close bool
// 消费者
cs []consume
}
type MessageStatus int8
const (
AlreadyStop    = 0
AlreadyReload  = 1
AlreadySucceed = 2
MessageStatusSucceed MessageStatus = 1
MessageStatusError   MessageStatus = 2
MessageStatusRequeue MessageStatus = 3
)
// 提供重启服务
func (q *Queue) Reload(conn *amqp.Connection) error {
// 先关闭状态
q.conn = conn
// 标识重启中
q.already = AlreadyReload
return q.init()
}
// 初始化操作
func (q *Queue) init() error {
var (
ch  *amqp.Channel
err error
)
log.Println("[amqp] queue init start")
q.Lock()
ch, err = q.conn.Channel()
if err != nil {
return err
}
// 创建交换机
if err = ch.ExchangeDeclare(q.exchangeName, q.exchange, false, false, false, false, nil); err != nil {
return errors.New(fmt.Sprintf("[amqp] exchange declare failed, err: %s", err))
}
// 创建队列
if _, err = ch.QueueDeclare(q.name, false, false, false, false, nil); err != nil {
return errors.New(fmt.Sprintf("[amqp] queue declare failed, err: %s", err))
}
// 交换机绑定队列
if err = ch.QueueBind(q.name, q.routingKey, q.exchangeName, false, nil); err != nil {
return errors.New(fmt.Sprintf("[amqp] queue bind failed, err: %s", err))
}
// 管道交换
q.channel = ch
// 告知已经准备完毕
if q.already == AlreadyReload {
q.already = AlreadySucceed
// 重新触发消费
q.reloadConsume()
if !q.isReleaseStock && q.ql > 0 {
log.Println("init release stock")
// 重新触发
go q.releaseStock()
}
}
q.Unlock()
log.Println("[amqp] queue init end")
return nil
}
// 对推送简单封装一下,使json对象推送更加简便
func (q *Queue) PublishJson(v interface{}) error {
body, err := json.Marshal(v)
if err != nil {
return err
}
return q.Publish(body)
}
// 原始推送,要求[]byte
func (q *Queue) Publish(data []byte) error {
// 检查库存,防止溢出
if q.maxOverstock > 0 && q.ql > q.maxOverstock {
return publishOverstock
}
if q.close {
return errors.New("service closing")
}
// 启动锁,防止释放库存时候,共同操作导致脏写
q.Lock()
// 防止并发map创建
if q.q == nil {
q.q = &sync.Map{}
}
// 增加最大下标
q.qMaxI++
// 增加最大长度
q.ql++
// 存储值
q.q.Store(q.qMaxI, data)
q.Unlock()
// 检查释放库存是否启动,未启动并且channel已经准备完毕,就启动库存释放
if !q.isReleaseStock && q.already == AlreadySucceed {
go q.releaseStock()
}
log.Printf("published,now %d", q.ql)
return nil
}
// 内置队列消费,库存释放
func (q *Queue) releaseStock() {
// 判断是否重复启动
q.Lock()
if q.isReleaseStock {
q.Unlock()
log.Println("[amqp] release stock already run")
return
}
// 标记服务已经启动
q.isReleaseStock = true
q.Unlock()
log.Println("[amqp] release stock")
for {
// 如果库存为空或者channel还未准备好就关闭循环
if q.ql == 0 || q.already != AlreadySucceed {
break
}
// 先将当前长度取出,防止循环时候修改,变成脏读
l := q.ql
// 实际启动当前轮次库存释放
for i := int64(0); i < l; i++ {
if q.already != AlreadySucceed {
break
}
// 对库存
q.Lock()
log.Printf("internal queues length: %d", q.ql)
// 对库存最小下标进行+1
q.qMinI++
// 减少库存最大数
q.ql--
// 锁期间,顶部索引不变
min := q.qMinI
q.Unlock()
// 读取内容
body, has := q.q.Load(min)
// 预防脏读
if has && body != nil {
// 推送
_ = q.publish(body.([]byte))
} else {
log.Println("[amqp] data error")
}
// 释放map空间
q.q.Delete(min)
}
// 本轮库存释放已经结束,延迟执行3秒后执行下一轮
ticker := time.NewTicker(time.Second * 3)
select {
case <-ticker.C:
ticker.Stop()
}
}
// 标记关闭
q.isReleaseStock = false
}
// 实际channel向队列发送数据
func (q *Queue) publish(data []byte) error {
return q.channel.Publish(q.exchangeName, q.routingKey, false, false, amqp.Publishing{
Body: data,
})
}
// 添加消费内容,先存储,等待服务启动以后触发
func (q *Queue) Consume(name string, consumeFunc ConsumeFunc, repeat int) error {
q.cs = append(q.cs, consume{
repeat:      tools.IF(repeat <= 0, 1, repeat).(int),
consumeFunc: consumeFunc,
name:        name,
})
// 尝试启动消费
q.reloadConsume()
return nil
}
// 暂停消费
func (q *Queue) StopConsume() {
q.isStopConsume = true
}
// 启动消费
func (q *Queue) StartConsume() {
q.isStopConsume = false
q.reloadConsume()
}
// 实际触发消费
func (q *Queue) reloadConsume() {
// 如果未启动,直接返回
if q.already != AlreadySucceed || q.isStopConsume {
return
}
// 推送重启消费
q.isReloadConsume++
// 记录当前消费重启值
reloadConsume := q.isReloadConsume
// 记录当前channel重启值
for i, c := range q.cs {
// 并发消费
for l := 0; l < c.repeat; l++ {
name := fmt.Sprintf("%s_%d-%d", c.name, i, l)
msgs, err := q.channel.Consume(q.name, name, false, false, false, false, nil)
if err != nil {
log.Fatalf("[AMQP] customer register err;name: %s, %s", name, err)
} else {
go func(c ConsumeFunc, consumeName string, reloadConsume int) {
for msg := range msgs {
switch c(msg.Body, consumeName) {
case MessageStatusSucceed:
case MessageStatusError:
_ = msg.Ack(true)
break
case MessageStatusRequeue:
_ = msg.Reject(true)
break
}
// 如果channel重启或者消费重启,都结束当前消费,防止溢出,或者正在关闭
if q.already != AlreadySucceed || q.isReloadConsume != reloadConsume || q.close || q.isStopConsume {
break
}
}
}(c.consumeFunc, name, reloadConsume)
}
}
}
}
// 优雅重启
func (q *Queue) Close() error {
// 先标记关闭
q.close = true
retry := 0
for {
if q.ql > 0 {
if q.already == AlreadySucceed {
if q.isReleaseStock == false {
q.releaseStock()
}
} else {
retry++
// 如果channel没有准备好,内置队列也没有释放完,则重试三次,三次还没有处理好,就放弃重试
if retry > 3 {
break
}
}
} else {
break
}
ticker := time.NewTicker(time.Second / 2)
select {
case <-ticker.C:
ticker.Stop()
}
}
return q.channel.Close()
}
type consume struct {
name        string
consumeFunc ConsumeFunc
repeat      int
}
type ConsumeFunc func(data []byte, name string) MessageStatus

channel
管理

思考

上面介绍了单个 channel
内部状态维护,那么现在就要开始对这些 channel
进行管理。管理内容如下:

channel

代码

package go_amqp
import (
"github.com/streadway/amqp"
"log"
"sync"
"time"
)
type Connection struct {
// 配置
config *config
// amqp连接
conn *amqp.Connection
// 连接状态
isConnected bool
// 关机提示
done chan bool
// amqp闪断通知
notifyClose chan *amqp.Error
// 多队列(channel),此处认为一个channel管理一条队列
qs []*Queue
}
type config struct {
// 连接AMQP DSN构建驱动
driver Driver
// 最大消息重发次数
maxSendRetries int
// 最大重连次数
maxReconnects int
// 重连延迟时间
reconnectDelay time.Duration
// 最大发送积压数
maxOverstock int64
}
func New(opts ...Option) (*Connection, error) {
// 建立默认连接对象
conn := &Connection{
// 生成默认配置
config: &config{
reconnectDelay: time.Second,
maxReconnects:  0,
maxSendRetries: 3,
},
}
for _, opt := range opts {
// 配置写入
opt(conn.config)
}
// 基础必须配置检查
if conn.config.driver == nil {
return nil, missingConnectionDsnDriver
}
// 断线重连
go conn.handleReconnect()
return conn, nil
}
func (c *Connection) handleReconnect() {
tryConnects := 0
for {
c.isConnected = false
if err := c.connect(); err != nil {
if c.config.maxReconnects > 0 && tryConnects > c.config.maxReconnects {
log.Fatalf("[AMQP] Reconnection times exceeded!(%d)", tryConnects)
return
}
tryConnects += 1
log.Printf("Failed to connect. %s Retrying...(%d)", err, tryConnects)
time.Sleep(c.config.reconnectDelay)
} else {
// clear try connect
tryConnects = 0
}
// 等待下一步信号通知
select {
case <-c.done:
return
case <-c.notifyClose:
}
}
}
//
func (c *Connection) connect() error {
url := c.config.driver.Url()
log.Printf("[amqp] connected. %s", url)
conn, err := amqp.Dial(url)
if err != nil {
return err
}
c.conn = conn
c.notifyClose = make(chan *amqp.Error)
c.conn.NotifyClose(c.notifyClose)
c.isConnected = true
// channel重连
c.queueReconnect()
return nil
}
// channel重连
// 当连接闪断以后,需要重新建立新的连接,所以,所有的channel也需要进行新的连接
func (c *Connection) queueReconnect() {
for _, q := range c.qs {
if err := q.Reload(c.conn); err != nil {
log.Println(err)
}
}
}
// 优雅关闭,
// 关闭channel
// 关闭闪断重连机制
// 关闭connection
func (c *Connection) Close() error {
if c.isConnected {
return alreadyClosed
}
var wg sync.WaitGroup
// 批量关闭
for i := 0; i < len(c.qs); i++ {
wg.Add(1)
go func(idx int, group *sync.WaitGroup) {
_ = c.qs[idx].Close()
group.Done()
}(i, &wg)
}
wg.Wait()
_ = c.conn.Close()
// 关闭
close(c.done)
return nil
}
// 生成单Channel管理,让一个channel管理一条队列的发送与消费
func (c *Connection) Queue(name, exchange, routingKey string) (*Queue, error) {
q := &Queue{
name:         name,
exchange:     amqp.ExchangeDirect,
exchangeName: exchange,
routingKey:   routingKey,
maxOverstock: c.config.maxOverstock,
}
if c.isConnected == true {
if err := q.Reload(c.conn); err != nil {
return nil, err
}
}
c.qs = append(c.qs, q)
return q, nil
}

刚开始写一些技术分享的,很多地方或者结构可能写的比较粗糙,欢迎各位执教、交流。如果有不了解或者新的想法,也可以评论留言沟通。thanks!

微信扫一扫,分享到朋友圈

简单设计go-amqp

充值30万还没排到网游第一 上海一男子抛下妻女离家出走

上一篇

梦工厂动画《疯狂原始人2》国内定档 11月27日院线上映

下一篇

你也可能喜欢

简单设计go-amqp

长按储存图像,分享给朋友