登录
首页 >  Golang >  Go问答

确保每条消息能够成功处理

来源:stackoverflow

时间:2024-02-19 21:06:26 217浏览 收藏

小伙伴们有没有觉得学习Golang很有意思?有意思就对了!今天就给大家带来《确保每条消息能够成功处理》,以下内容将会涉及到,若是在学习中对其中部分知识点有疑问,或许看了本文就能帮到你!

问题内容

下面是一个包含 3 个 Go 例程的服务,用于处理来自 Kafka 的消息:

Channel-1 和 Channel-2 是 Go 中的无缓冲数据通道。通道就像一个排队机制。

Goroutine-1 从 kafka 主题读取消息,在验证消息后将其消息负载扔到 Channel-1 上。

Goroutine-2 从 Channel-1 读取并处理有效负载,并将处理后的有效负载扔到 Channel-2 上。

Goroutine-3 从 Channel-2 读取并将处理后的有效负载封装到 http 数据包中,并向另一个服务执行 http 请求(使用 http 客户端)。

上述流程中的漏洞:在我们的例子中,由于服务之间的网络连接不良或远程服务尚未准备好接受来自 Go-routine3 的 http 请求(http 客户端超时)而导致处理失败,因此上述服务丢失该消息(已从 Kafka 主题中读取)。

Goroutine-1 当前订阅了来自 Kafka 的消息,没有发送到 Kafka 的确认(以通知 Goroutine-3 已成功处理特定消息)

正确性优先于性能。

如何确保每条消息都被成功处理?


正确答案


例如,通过新的 channel-3 添加从 goroutine-3 到 goroutine-1 的反馈。 goroutine-1 将阻塞,直到获得 channel-3 的确认。

// in gorouting 1
channel1 <- data
select {
    case <-channel3:
    case <-ctx.Done(): // or smth else to prevent deadlock 
}
...
// in gorouting 3
data := <-channel2
for {
    if err := sendData(data); err == nil {
        break
    }
}
channel3<-struct{}{}

为了确保正确性,您需要在处理成功完成后提交(=确认)消息。
对于处理没有成功完成的情况——一般情况下,你需要自己实现重试机制。
这应该特定于您的用例,但通常您将消息扔回到专用的 kafka 重试主题(您创建的),添加睡眠并再次处理消息。如果 x 次后处理失败 - 您将消息扔到 dlq(=死信队列)。
您可以在这里阅读更多内容:
https://eng.uber.com/reliable-reprocessing/
https://www.confluent.io/blog/error-handling-patterns-in-kafka/

文中关于的知识介绍,希望对你的学习有所帮助!若是受益匪浅,那就动动鼠标收藏这篇《确保每条消息能够成功处理》文章吧,也可关注golang学习网公众号了解相关技术文章。

声明:本文转载于:stackoverflow 如有侵犯,请联系study_golang@163.com删除
相关阅读
更多>
最新阅读
更多>
课程推荐
更多>