登录
首页 >  Golang >  Go问答

无法停用Golang Kafka消费者的融合

来源:stackoverflow

时间:2024-02-14 16:09:22 195浏览 收藏

今天golang学习网给大家带来了《无法停用Golang Kafka消费者的融合》,其中涉及到的知识点包括等等,无论你是小白还是老手,都适合看一看哦~有好的建议也欢迎大家在评论留言,若是看完有所收获,也希望大家能多多点赞支持呀!一起加油学习~

问题内容

我在程序执行结束时遇到了有关处理 kafka 消费者的问题。这是负责关闭消费者的代码

func(kc *kafkaconsumer) dispose() {
    sugar.info("disposing of consumer")
    kc.mu.lock()
    kc.consumer.close();
    sugar.info("disposed of consumer")
    kc.mu.unlock()
}

正如您可能已经注意到的,我正在使用sync.mutex,因为消费者是由多个goroutine访问的。下面是另一个负责从 kafka 读取消息的代码片段

  func (kc *KafkaConsumer) Consume(signalChan chan os.Signal, ctx context.Context) {
    for{
        select{
        case sig := <-signalChan:
            Sugar.Info("Caught signal %v", sig)
            break
        case <-ctx.Done():
            Sugar.Info("Got context done message. Closing consumer...")
            kc.Dispose()
            break
        default:
            for{
                message, err := kc.Consumer.ReadMessage(-1); if err != nil{
                    Log.Error(err.Error())
                    return
                }
                Sugar.Infof("Got a new message %v",message)
                resp := make(chan *KafkaResponseEntity)
                go router.UseMessage(*message, resp, ctx)
                //Potential deadlock
                response := <-resp
                /*
                    Explicit commit of an offset in order to ensure 
                    that request has been successfully processed
                */
                kc.Consumer.Commit()
                Sugar.Info("Successfully commited an offset")
                Sugar.Infof("Just got a response %v", response)
                go producer.KP.Produce(response.PaymentId, response.Bytes, "some_random_topic")
            }
        }
    }
}

问题是,当关闭消费者时,程序执行就会停止。 有什么问题吗?我应该将 cond 与互斥体一起使用吗?如果您对我的代码中可能出现的问题提供详尽的解释,我将非常高兴。 提前致谢。


解决方案


我怀疑这是挂起的原因:

kc.consumer.readmessage(-1)

其中 the documentation 状态将无限期阻止,因此它没有关闭。最简单的方法是将该值设置为正的持续时间(例如 1 * time.second),但是如果在超时时间内未消耗消息,则可能会出现超时错误。超时错误通常是无害的,但需要考虑链接文档中的问题:

Timeout is returned as (nil, err) where err is
`err.(kafka.Error).Code() == kafka.ErrTimedOut`

我还不确定有什么好方法来利用无限期阻塞并允许它被中断。如果有人知道请发布调查结果!

终于介绍完啦!小伙伴们,这篇关于《无法停用Golang Kafka消费者的融合》的介绍应该让你收获多多了吧!欢迎大家收藏或分享给更多需要学习的朋友吧~golang学习网公众号也会发布Golang相关知识,快来关注吧!

声明:本文转载于:stackoverflow 如有侵犯,请联系study_golang@163.com删除
相关阅读
更多>
最新阅读
更多>
课程推荐
更多>