登录
首页 >  Golang >  Go问答

读取 Kafka 中 AVRO 消息并解码与 kafka 密钥相关联的方法指南

来源:stackoverflow

时间:2024-02-17 13:18:13 467浏览 收藏

Golang小白一枚,正在不断学习积累知识,现将学习到的知识记录一下,也是将我的所得分享给大家!而今天这篇文章《读取 Kafka 中 AVRO 消息并解码与 kafka 密钥相关联的方法指南》带大家来了解一下##content_title##,希望对大家的知识积累有所帮助,从而弥补自己的不足,助力实战开发!


问题内容

我正在使用 Benthos 从 Kafka 读取 AVRO 编码的消息,其中 kafka_key 元数据字段设置为还包含 AVRO 编码的有效负载。这些 AVRO 编码的有效负载的模式存储在模式注册表中,Benthos 有一个 schema_registry_decode 处理器来解码它们。我希望为每个包含两个字段的 Kafka 消息生成一条输出 JSON 消息,一个名为 content 包含解码的 AVRO 消息,另一个名为 metadata 包含 Benthos 收集的各种元数据字段,包括解码的 kafka_key 有效负载。


正确答案


事实证明,可以使用 branch 处理器来实现这一目标,如下所示:

input:
  kafka:
    addresses:
      - localhost:9092
    consumer_group: benthos_consumer_group
    topics:
      - benthos_input

pipeline:
  processors:
    # Decode the message
    - schema_registry_decode:
        url: http://localhost:8081

    # Populate output content field
    - bloblang: |
        root.content = this

    # Decode kafka_key metadata payload and populate output metadata field
    - branch:
        request_map: |
          root = meta("kafka_key")

        processors:
          - schema_registry_decode:
              url: http://localhost:8081

        result_map: |
          root.metadata = meta()
          root.metadata.kafka_key = this

output:
  stdout: {}

到这里,我们也就讲完了《读取 Kafka 中 AVRO 消息并解码与 kafka 密钥相关联的方法指南》的内容了。个人认为,基础知识的学习和巩固,是为了更好的将其运用到项目中,欢迎关注golang学习网公众号,带你了解更多关于的知识点!

声明:本文转载于:stackoverflow 如有侵犯,请联系study_golang@163.com删除
相关阅读
更多>
最新阅读
更多>
课程推荐
更多>