关于golang监听rabbitmq消息队列任务断线自动重连接的问题
来源:脚本之家
时间:2022-12-29 12:55:29 323浏览 收藏
来到golang学习网的大家,相信都是编程学习爱好者,希望在这里学习Golang相关编程知识。下面本篇文章就来带大家聊聊《关于golang监听rabbitmq消息队列任务断线自动重连接的问题》,介绍一下消息队列、RabbitMQ、断线自动重连,希望对大家的知识积累有所帮助,助力实战开发!
golang监听消息队列rabbitmq任务脚本,当rabbimq消息队列断开连接后自动重试,重新唤起协程执行任务
需求背景:
goalng常驻内存任务脚本监听rbmq执行任务
任务脚本由supervisor来管理

当rabbitmq长时间断开连接会出现如下图 进程处于fatal状态


假如因为不可抗拒因素,rabbitmq服务器内存满了或者其它原因导致rabbitmq消息队列服务停止了
如果是短时间的停止重启,supervisor是可以即时唤醒该程序。如果服务器长时间没有恢复正常运行,程序就会出现fatal进程启动失败的状态,此时可以通过告警来提醒开发人员

如果以上告警能时时通知运维人员此问题可以略过了。今天讨论的是如果在长时间断开连接还能在服务器恢复正常情况下自动实现重连。

代码实现一:
消费者:
package main
import (
"fmt"
"github.com/ichunt2019/golang-rbmq-sl/utils/rabbitmq"
)
type RecvPro struct {
}
//// 实现消费者 消费消息失败 自动进入延时尝试 尝试3次之后入库db
/*
返回值 error 为nil 则表示该消息消费成功
否则消息会进入ttl延时队列 重复尝试消费3次
3次后消息如果还是失败 消息就执行失败 进入告警 FailAction
*/
func (t *RecvPro) Consumer(dataByte []byte) error {
//time.Sleep(500*time.Microsecond)
//return errors.New("顶顶顶顶")
fmt.Println(string(dataByte))
//time.Sleep(1*time.Second)
return nil
//消息已经消费3次 失败了 请进行处理
如果消息 消费3次后 仍然失败 此处可以根据情况 对消息进行告警提醒 或者 补偿 入库db 钉钉告警等等
func (t *RecvPro) FailAction(err error,dataByte []byte) error {
fmt.Println(err)
fmt.Println("任务处理失败了,我要进入db日志库了")
fmt.Println("任务处理失败了,发送钉钉消息通知主人")
func main() {
t := &RecvPro{}
//rabbitmq.Recv(rabbitmq.QueueExchange{
// "a_test_0001",
// "",
// "amqp://guest:guest@192.168.2.232:5672/",
//},t,5)
/*
runNums: 表示任务并发处理数量 一般建议 普通任务1-3 就可以了
*/
err := rabbitmq.Recv(rabbitmq.QueueExchange{
"a_test_0001",
"hello_go",
"direct",
"amqp://guest:guest@192.168.1.169:5672/",
},t,4)
if(err != nil){
fmt.Println(err)
}
rabbitmq代码
package rabbitmq
import (
"errors"
"strconv"
"time"
//"errors"
"fmt"
"github.com/streadway/amqp"
"log"
)
// 定义全局变量,指针类型
var mqConn *amqp.Connection
var mqChan *amqp.Channel
// 定义生产者接口
type Producer interface {
MsgContent() string
}
type RetryProducer interface {
// 定义接收者接口
type Receiver interface {
Consumer([]byte) error
FailAction(error , []byte) error
// 定义RabbitMQ对象
type RabbitMQ struct {
connection *amqp.Connection
Channel *amqp.Channel
dns string
QueueName string // 队列名称
RoutingKey string // key名称
ExchangeName string // 交换机名称
ExchangeType string // 交换机类型
producerList []Producer
retryProducerList []RetryProducer
receiverList []Receiver
// 定义队列交换机对象
type QueueExchange struct {
QuName string // 队列名称
RtKey string // key值
ExName string // 交换机名称
ExType string // 交换机类型
Dns string //链接地址
// 链接rabbitMQ
func (r *RabbitMQ)MqConnect() (err error){
mqConn, err = amqp.Dial(r.dns)
r.connection = mqConn // 赋值给RabbitMQ对象
if err != nil {
fmt.Printf("rbmq链接失败 :%s \n", err)
}
return
// 关闭mq链接
func (r *RabbitMQ)CloseMqConnect() (err error){
err = r.connection.Close()
if err != nil{
fmt.Printf("关闭mq链接失败 :%s \n", err)
func (r *RabbitMQ)MqOpenChannel() (err error){
mqConn := r.connection
r.Channel, err = mqConn.Channel()
//defer mqChan.Close()
fmt.Printf("MQ打开管道失败:%s \n", err)
return err
func (r *RabbitMQ)CloseMqChannel() (err error){
r.Channel.Close()
// 创建一个新的操作对象
func NewMq(q QueueExchange) RabbitMQ {
return RabbitMQ{
QueueName:q.QuName,
RoutingKey:q.RtKey,
ExchangeName: q.ExName,
ExchangeType: q.ExType,
dns:q.Dns,
func (mq *RabbitMQ) sendMsg (body string) (err error) {
err = mq.MqOpenChannel()
ch := mq.Channel
log.Printf("Channel err :%s \n", err)
defer mq.Channel.Close()
if mq.ExchangeName != "" {
if mq.ExchangeType == ""{
mq.ExchangeType = "direct"
}
err = ch.ExchangeDeclare(mq.ExchangeName, mq.ExchangeType, true, false, false, false, nil)
if err != nil {
log.Printf("ExchangeDeclare err :%s \n", err)
// 用于检查队列是否存在,已经存在不需要重复声明
_, err = ch.QueueDeclare(mq.QueueName, true, false, false, false, nil)
log.Printf("QueueDeclare err :%s \n", err)
// 绑定任务
if mq.RoutingKey != "" && mq.ExchangeName != "" {
err = ch.QueueBind(mq.QueueName, mq.RoutingKey, mq.ExchangeName, false, nil)
log.Printf("QueueBind err :%s \n", err)
if mq.ExchangeName != "" && mq.RoutingKey != ""{
err = mq.Channel.Publish(
mq.ExchangeName, // exchange
mq.RoutingKey, // routing key
false, // mandatory
false, // immediate
amqp.Publishing {
ContentType: "text/plain",
Body: []byte(body),
})
}else{
"", // exchange
mq.QueueName, // routing key
/*
发送延时消息
*/
func (mq *RabbitMQ)sendDelayMsg(body string,ttl int64) (err error){
err =mq.MqOpenChannel()
return
if ttl
<p>实现重连方式很多,下面实现方式比较简单</p>
<p style="text-align:center"><img alt="" src="/uploads/20221229/167229091663ad22644f2b1.png"></p>
<p>1.Recv方法创建ampq链接</p>
<p>2.启动协程开始执行任务 </p>
<p style="margin-left:40px">MqOpenChannel 打开一个channel通道处理amqp消息</p>
<p style="margin-left:40px">拿到消息 处理任务</p>
<p> 3,协程中捕获异常发送消息到taskQuit
</p><p> 4,主进程监听taskQuit管道 开始尝试重新链接amqp 直到链接成功</p>
<p> 5,重新链接成功后启动新的协程处理任务</p>
<p>主要代码分析:</p>
<pre class="brush:java;">/*
runNums 开启并发执行任务数量
*/
func Recv(queueExchange QueueExchange,receiver Receiver,runNums int) (err error){
mq := NewMq(queueExchange)
//链接rabbitMQ
err = mq.MqConnect()
if(err != nil){
return
}
//rbmq断开链接后 协程退出释放信号
taskQuit:= make(chan struct{}, 1)
//尝试链接rbmq
tryToLinkC := make(chan struct{}, 1)
//开始执行任务
for i:=1;i
<p>到这里,我们也就讲完了《关于golang监听rabbitmq消息队列任务断线自动重连接的问题》的内容了。个人认为,基础知识的学习和巩固,是为了更好的将其运用到项目中,欢迎关注golang学习网公众号,带你了解更多关于golang的知识点!</p>
声明:本文转载于:脚本之家 如有侵犯,请联系study_golang@163.com删除
相关阅读
更多>
-
161 收藏
-
148 收藏
-
260 收藏
-
260 收藏
-
368 收藏
最新阅读
更多>
-
150 收藏
-
132 收藏
-
450 收藏
-
209 收藏
-
237 收藏
-
188 收藏
-
317 收藏
-
209 收藏
-
351 收藏
-
212 收藏
-
126 收藏
-
316 收藏
课程推荐
更多>
-
- 前端进阶之JavaScript设计模式
- 设计模式是开发人员在软件开发过程中面临一般问题时的解决方案,代表了最佳的实践。本课程的主打内容包括JS常见设计模式以及具体应用场景,打造一站式知识长龙服务,适合有JS基础的同学学习。
- 立即学习 543次学习
-
- GO语言核心编程课程
- 本课程采用真实案例,全面具体可落地,从理论到实践,一步一步将GO核心编程技术、编程思想、底层实现融会贯通,使学习者贴近时代脉搏,做IT互联网时代的弄潮儿。
- 立即学习 516次学习
-
- 简单聊聊mysql8与网络通信
- 如有问题加微信:Le-studyg;在课程中,我们将首先介绍MySQL8的新特性,包括性能优化、安全增强、新数据类型等,帮助学生快速熟悉MySQL8的最新功能。接着,我们将深入解析MySQL的网络通信机制,包括协议、连接管理、数据传输等,让
- 立即学习 500次学习
-
- JavaScript正则表达式基础与实战
- 在任何一门编程语言中,正则表达式,都是一项重要的知识,它提供了高效的字符串匹配与捕获机制,可以极大的简化程序设计。
- 立即学习 487次学习
-
- 从零制作响应式网站—Grid布局
- 本系列教程将展示从零制作一个假想的网络科技公司官网,分为导航,轮播,关于我们,成功案例,服务流程,团队介绍,数据部分,公司动态,底部信息等内容区块。网站整体采用CSSGrid布局,支持响应式,有流畅过渡和展现动画。
- 立即学习 485次学习