登录
首页 >  Golang >  Go问答

使用Kafka-go Segmentio库创建Kafka主题的步骤

来源:stackoverflow

时间:2024-03-18 11:09:29 256浏览 收藏

使用 Segmentio 库的 Kafka-go 创建主题需要连接到 Kafka leader 节点。如果连接到非 leader 节点,则会收到“不是控制器”错误。要获取 leader 连接,可以使用 Dial 方法,并使用 Controller 方法获取控制器信息。然后,使用 controllerConn 变量连接到控制器,并使用 CreateTopics 方法创建主题。

问题内容

我可以获得使用segmentio的kafka-go创建主题的示例吗?

我尝试创建如下主题:

c, _ := kafka.Dial("tcp", "host:port")
kt := kafka.TopicConfig{Topic: "sometopic", NumPartitions: 1, ReplicationFactor: 1}
e := c.CreateTopics(kt)

但这仅在给定的主机:端口是 kafka leader 时才有效。如果 host:port 不是 kafka leader,那么我将收到此错误:

不是控制器:这不是该集群的正确控制器*

传递集群地址以创建主题的正确方法是什么?

kafka segmentio:github.com/segmentio/kafka-go


解决方案


就像 shmsr 所说 - 您需要获得领导者连接才能创建主题。您可以通过以下方式做到这一点:

conn, err := kafka.Dial("tcp", "host:port")
if err != nil {
    panic(err.Error())
}
defer conn.Close()

controller, err := conn.Controller()
if err != nil {
    panic(err.Error())
}
controllerConn, err := kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
if err != nil {
    panic(err.Error())
}
defer controllerConn.Close()

topicConfigs := []kafka.TopicConfig{{Topic: "sometopic", NumPartitions: 1, ReplicationFactor: 1}}

err = controllerConn.CreateTopics(topicConfigs...)
if err != nil {
    panic(err.Error())
}

到这里,我们也就讲完了《使用Kafka-go Segmentio库创建Kafka主题的步骤》的内容了。个人认为,基础知识的学习和巩固,是为了更好的将其运用到项目中,欢迎关注golang学习网公众号,带你了解更多关于的知识点!

声明:本文转载于:stackoverflow 如有侵犯,请联系study_golang@163.com删除
相关阅读
更多>
最新阅读
更多>
课程推荐
更多>