登录
首页 >  Golang >  Go教程

关于golang监听rabbitmq消息队列任务断线自动重连接的问题

来源:脚本之家

时间:2022-12-29 12:55:29 323浏览 收藏

来到golang学习网的大家,相信都是编程学习爱好者,希望在这里学习Golang相关编程知识。下面本篇文章就来带大家聊聊《关于golang监听rabbitmq消息队列任务断线自动重连接的问题》,介绍一下消息队列、RabbitMQ、断线自动重连,希望对大家的知识积累有所帮助,助力实战开发!

golang监听消息队列rabbitmq任务脚本,当rabbimq消息队列断开连接后自动重试,重新唤起协程执行任务

需求背景:

goalng常驻内存任务脚本监听rbmq执行任务

任务脚本由supervisor来管理

当rabbitmq长时间断开连接会出现如下图 进程处于fatal状态

假如因为不可抗拒因素,rabbitmq服务器内存满了或者其它原因导致rabbitmq消息队列服务停止了

如果是短时间的停止重启,supervisor是可以即时唤醒该程序。如果服务器长时间没有恢复正常运行,程序就会出现fatal进程启动失败的状态,此时可以通过告警来提醒开发人员

如果以上告警能时时通知运维人员此问题可以略过了。今天讨论的是如果在长时间断开连接还能在服务器恢复正常情况下自动实现重连。

代码实现一:

消费者:

package main
import (
    "fmt"
    "github.com/ichunt2019/golang-rbmq-sl/utils/rabbitmq"
)
type RecvPro struct {
}
//// 实现消费者 消费消息失败 自动进入延时尝试  尝试3次之后入库db
/*
返回值 error 为nil  则表示该消息消费成功
否则消息会进入ttl延时队列  重复尝试消费3次
3次后消息如果还是失败 消息就执行失败  进入告警 FailAction
 */
func (t *RecvPro) Consumer(dataByte []byte) error {
    //time.Sleep(500*time.Microsecond)
    //return errors.New("顶顶顶顶")
    fmt.Println(string(dataByte))
    //time.Sleep(1*time.Second)
    return nil
//消息已经消费3次 失败了 请进行处理
如果消息 消费3次后 仍然失败  此处可以根据情况 对消息进行告警提醒 或者 补偿  入库db  钉钉告警等等
func (t *RecvPro) FailAction(err error,dataByte []byte) error {
    fmt.Println(err)
    fmt.Println("任务处理失败了,我要进入db日志库了")
    fmt.Println("任务处理失败了,发送钉钉消息通知主人")
func main() {
    t := &RecvPro{}
    //rabbitmq.Recv(rabbitmq.QueueExchange{
    //    "a_test_0001",
    //    "",
    //    "amqp://guest:guest@192.168.2.232:5672/",
    //},t,5)
    /*
        runNums: 表示任务并发处理数量  一般建议 普通任务1-3    就可以了
     */
    err := rabbitmq.Recv(rabbitmq.QueueExchange{
        "a_test_0001",
        "hello_go",
        "direct",
        "amqp://guest:guest@192.168.1.169:5672/",
    },t,4)
    if(err != nil){
        fmt.Println(err)
    }

rabbitmq代码

package rabbitmq

import (
    "errors"
    "strconv"
    "time"
    //"errors"
    "fmt"
    "github.com/streadway/amqp"
    "log"
)
// 定义全局变量,指针类型
var mqConn *amqp.Connection
var mqChan *amqp.Channel
// 定义生产者接口
type Producer interface {
    MsgContent() string
}
type RetryProducer interface {
// 定义接收者接口
type Receiver interface {
    Consumer([]byte)    error
    FailAction(error , []byte)  error
// 定义RabbitMQ对象
type RabbitMQ struct {
    connection *amqp.Connection
    Channel *amqp.Channel
    dns string
    QueueName   string            // 队列名称
    RoutingKey  string            // key名称
    ExchangeName string           // 交换机名称
    ExchangeType string           // 交换机类型
    producerList []Producer
    retryProducerList []RetryProducer
    receiverList []Receiver
// 定义队列交换机对象
type QueueExchange struct {
    QuName  string           // 队列名称
    RtKey   string           // key值
    ExName  string           // 交换机名称
    ExType  string           // 交换机类型
    Dns     string              //链接地址
// 链接rabbitMQ
func (r *RabbitMQ)MqConnect() (err error){
    mqConn, err = amqp.Dial(r.dns)
    r.connection = mqConn   // 赋值给RabbitMQ对象
    if err != nil {
        fmt.Printf("rbmq链接失败  :%s \n", err)
    }
    return
// 关闭mq链接
func (r *RabbitMQ)CloseMqConnect() (err error){
    err = r.connection.Close()
    if err != nil{
        fmt.Printf("关闭mq链接失败  :%s \n", err)
func (r *RabbitMQ)MqOpenChannel() (err error){
    mqConn := r.connection
    r.Channel, err = mqConn.Channel()
    //defer mqChan.Close()
        fmt.Printf("MQ打开管道失败:%s \n", err)
    return err
func (r *RabbitMQ)CloseMqChannel() (err error){
    r.Channel.Close()
// 创建一个新的操作对象
func NewMq(q QueueExchange) RabbitMQ {
    return RabbitMQ{
        QueueName:q.QuName,
        RoutingKey:q.RtKey,
        ExchangeName: q.ExName,
        ExchangeType: q.ExType,
        dns:q.Dns,
func (mq *RabbitMQ) sendMsg (body string) (err error)  {
    err = mq.MqOpenChannel()
    ch := mq.Channel
        log.Printf("Channel err  :%s \n", err)
    defer mq.Channel.Close()
    if mq.ExchangeName != "" {
        if mq.ExchangeType == ""{
            mq.ExchangeType = "direct"
        }
        err =  ch.ExchangeDeclare(mq.ExchangeName, mq.ExchangeType, true, false, false, false, nil)
        if err != nil {
            log.Printf("ExchangeDeclare err  :%s \n", err)
    // 用于检查队列是否存在,已经存在不需要重复声明
    _, err = ch.QueueDeclare(mq.QueueName, true, false, false, false, nil)
        log.Printf("QueueDeclare err :%s \n", err)
    // 绑定任务
    if mq.RoutingKey != "" && mq.ExchangeName != "" {
        err = ch.QueueBind(mq.QueueName, mq.RoutingKey, mq.ExchangeName, false, nil)
            log.Printf("QueueBind err :%s \n", err)
    if mq.ExchangeName != "" && mq.RoutingKey != ""{
        err = mq.Channel.Publish(
            mq.ExchangeName,     // exchange
            mq.RoutingKey, // routing key
            false,  // mandatory
            false,  // immediate
            amqp.Publishing {
                ContentType: "text/plain",
                Body:        []byte(body),
            })
    }else{
            "",     // exchange
            mq.QueueName, // routing key
/*
发送延时消息
 */
func (mq *RabbitMQ)sendDelayMsg(body string,ttl int64) (err error){
    err =mq.MqOpenChannel()
            return
    if ttl 

实现重连方式很多,下面实现方式比较简单

1.Recv方法创建ampq链接

2.启动协程开始执行任务 

MqOpenChannel 打开一个channel通道处理amqp消息

拿到消息 处理任务

  3,协程中捕获异常发送消息到taskQuit

  4,主进程监听taskQuit管道 开始尝试重新链接amqp 直到链接成功

  5,重新链接成功后启动新的协程处理任务

主要代码分析:

/*
runNums  开启并发执行任务数量
 */
func Recv(queueExchange QueueExchange,receiver Receiver,runNums int) (err error){
    mq := NewMq(queueExchange)
    //链接rabbitMQ
    err = mq.MqConnect()
    if(err != nil){
        return
    }
    //rbmq断开链接后 协程退出释放信号
    taskQuit:= make(chan struct{}, 1)
    //尝试链接rbmq
    tryToLinkC := make(chan struct{}, 1)
    //开始执行任务
    for i:=1;i

到这里,我们也就讲完了《关于golang监听rabbitmq消息队列任务断线自动重连接的问题》的内容了。个人认为,基础知识的学习和巩固,是为了更好的将其运用到项目中,欢迎关注golang学习网公众号,带你了解更多关于golang的知识点!

声明:本文转载于:脚本之家 如有侵犯,请联系study_golang@163.com删除
相关阅读
更多>
最新阅读
更多>
课程推荐
更多>
评论列表