登录
首页 >  Golang >  Go问答

如何通过segmentio/kafka-go读取Kafka中的全部消息?

来源:stackoverflow

时间:2024-02-17 18:39:20 206浏览 收藏

一分耕耘,一分收获!既然都打开这篇《如何通过segmentio/kafka-go读取Kafka中的全部消息?》,就坚持看下去,学下去吧!本文主要会给大家讲到等等知识点,如果大家对本文有好的建议或者看到有不足之处,非常欢迎大家积极提出!在后续文章我会继续更新Golang相关的内容,希望对大家都有所帮助!

问题内容

我运行包文档 segmentio/kafka-go 中的示例,但在其中我一次收到 1 条消息。

有没有办法一次性读取kafka中积累的所有消息并立即解析成[]mytype

func main() {

    // to consume messages
    kafkaBrokerUrl := "localhost:9092"
    topic := "test"

    conn, err := kafka.DialLeader(context.Background(), "tcp", kafkaBrokerUrl, topic, 0)
    if err != nil {
        log.Fatal("failed to dial leader:", err)
    }

    batch := conn.ReadBatch(10e3, 1e6) // fetch 10KB min, 1MB max

    b := make([]byte, 10e3) // 10KB max per message
    for {
        n, err := batch.Read(b)
        if err != nil {
            break
        }
        fmt.Println(string(b[:n]))
    }
}

正确答案


kafka对批处理没有帮助,如果你的意思是滑动窗口机制在特定时间获取一批流,大小最好使用apache kafka和apache flink连接器,flink提供滑动窗口机制,hdfs存储和 flink 通过检查点提供了更好的容错能力。不幸的是,可用的 flink go sdk 不稳定。

您可以轻松提交“[]type”作为消息,而不是“type” 另外,将其解码为“[]type”。

var ts []Type //data

// encode the data 
var bu bytes.Buffer        
enc := gob.NewEncoder(&bu) 
err := enc.Encode(ts)
if err != nil {
    log.Fatal("encode error:", err)
}

//Submit the message or messages
msg := kafka.Message{
        Key:   key,
        Value: bu.Bytes(),
    }
kafkaWriter := &kafka.Writer{
    Addr:     kafka.TCP([]string{Broker1}...),
    Topic:    Topic,
    Balancer: &kafka.LeastBytes{},
}
err := kafkaWriter.WriteMessages(ctx,
    msg,
)
if err != nil {
    return err
}

文中关于的知识介绍,希望对你的学习有所帮助!若是受益匪浅,那就动动鼠标收藏这篇《如何通过segmentio/kafka-go读取Kafka中的全部消息?》文章吧,也可关注golang学习网公众号了解相关技术文章。

声明:本文转载于:stackoverflow 如有侵犯,请联系study_golang@163.com删除
相关阅读
更多>
最新阅读
更多>
课程推荐
更多>