登录
首页 >  Golang >  Go问答

使用 confluence-kafka-go 进行 kafka 消费者的偏移量更改

来源:stackoverflow

时间:2024-02-08 10:15:22 352浏览 收藏

亲爱的编程学习爱好者,如果你点开了这篇文章,说明你对《使用 confluence-kafka-go 进行 kafka 消费者的偏移量更改》很感兴趣。本篇文章就来给大家详细解析一下,主要介绍一下,希望所有认真读完的童鞋们,都有实质性的提高。

问题内容

我使用此配置创建一个新的消费者:

c, err := kafka.newconsumer(&kafka.configmap{
                 "bootstrap.servers": addresses,
                 "group.id":          "my_group",
                 "auto.offset.reset": "earliest",
         })
topic := "testtopic"
if err = c.subscribetopics([]string{topic}, nil); err != nil {
    panic(err)
}

然后我根据以下代码生成事件并使用一个事件:

events := []map[string]string{
{                                     
        "name":       "Foo",
},                                                 
{                                                  
        "name":       "Bar",                                                       
},                                                 
}                                                          
                                                                                                    
err = p.ProduceEvent(events[0])//there is a wrapper to produce events       
err = p.ProduceEvent(events[1])                                    
                                                                                   
res, err := c.ReadMessage(100 * time.Second)                                              
time.Sleep(20 * time.Second)                                                       
                                                                                                              
c.Close()

当我描述这个团体时 观看 /home/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my_group --describe。每一步的结果为:

  1. 生成事件后:

  2. 当我消费一个事件时:

  3. 关闭消费者后:

我不明白为什么延迟最终为零!我刚刚消费了一项活动。这对我来说很奇怪, clos​​e 会改变偏移量。任何线索表示赞赏。


正确答案


ReadMessage 包装 PollPoll 获取一批消息并在本地缓冲。由于您已将使用者配置为自动提交偏移量,因此它将提交所有获取的消息,甚至是本地缓存且应用程序尚未处理的消息。这就是为什么您会看到关闭消费者后没有延迟。

librdkafka(因此 confluence-kafka-go)无法配置 max.pool.records 因此,如果您想准确控制提交哪些偏移量,则需要禁用自动提交偏移量并手动提交它们使用 StoreOffsetshttps://github.com/confluentinc/confluent-kafka-go/issues/380#issuecomment-539903016

好了,本文到此结束,带大家了解了《使用 confluence-kafka-go 进行 kafka 消费者的偏移量更改》,希望本文对你有所帮助!关注golang学习网公众号,给大家分享更多Golang知识!

声明:本文转载于:stackoverflow 如有侵犯,请联系study_golang@163.com删除
相关阅读
更多>
最新阅读
更多>
课程推荐
更多>