登录
首页 >  Golang >  Go问答

使用 Kafka-Go,为什么我会看到似乎正在批量读取/写入的内容?有我缺少的配置吗?

来源:stackoverflow

时间:2024-04-17 21:27:31 201浏览 收藏

各位小伙伴们,大家好呀!看看今天我又给各位带来了什么文章?本文标题《使用 Kafka-Go,为什么我会看到似乎正在批量读取/写入的内容?有我缺少的配置吗?》,很明显是关于Golang的文章哈哈哈,其中内容主要会涉及到等等,如果能帮到你,觉得很不错的话,欢迎各位多多点评和分享!

问题内容

我将从 RabbitMQ 切换到 Kafka。这只是一个简单的快速试验(spike),看看 Kafka 是如何运作的。我不确定是否缺少某些设置,是否是我的代码,是否是 kafka-go,或者这是否是预期的 Kafka 行为。

我尝试调整 batchsize 以及 batchtimeout 但都没有产生影响。

下面的代码创建一个具有 6 个分区且复制因子为 3 的主题。然后,它每隔 100ms 生成一条递增消息。它启动 6 个消费者,每个分区一个。读取和写入都是在 go 例程中执行的。

在下面的日志中,它在 7 秒内没有收到消息,然后收到突发消息。我正在使用 Confluent 的云平台,因此我认识到会有一些网络延迟,但没有达到我所看到的程度。

package main

import (
    "context"
    "crypto/tls"
    "crypto/x509"
    "fmt"
    "log"
    "net"
    "strconv"
    "time"

    kafka "github.com/segmentio/kafka-go"
    "github.com/segmentio/kafka-go/sasl/plain"
)

// newdialer builds a kafka.Dialer that authenticates with SASL/PLAIN
// over TLS, trusting the system certificate pool.
func newdialer(clientid, username, password string) *kafka.Dialer {
    mechanism := plain.Mechanism{
        Username: username,
        Password: password,
    }

    // Fall back to an empty pool when the system pool is unavailable
    // (the SystemCertPool error is deliberately ignored for this spike).
    rootCAs, _ := x509.SystemCertPool()
    if rootCAs == nil {
        rootCAs = x509.NewCertPool()
    }

    return &kafka.Dialer{
        Timeout:       10 * time.Second,
        DualStack:     true,
        ClientID:      clientid,
        SASLMechanism: mechanism,
        TLS: &tls.Config{
            InsecureSkipVerify: false,
            RootCAs:            rootCAs,
        },
    }
}

// createtopic creates the topic with 6 partitions and replication factor 3.
// It dials the bootstrap broker first to locate the cluster controller, since
// topic creation must be sent to the controller broker. Panics on any error
// (acceptable for this throwaway spike).
func createtopic(url string, topic string, dialer *kafka.Dialer) {
    conn, err := dialer.Dial("tcp", url)
    if err != nil {
        panic(err.Error())
    }
    defer conn.Close()

    controller, err := conn.Controller()
    if err != nil {
        panic(err.Error())
    }

    // Topic creation has to go to the controller broker, not an arbitrary one.
    controllerConn, err := dialer.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
    if err != nil {
        panic(err.Error())
    }
    defer controllerConn.Close()

    topicConfigs := []kafka.TopicConfig{
        {
            Topic:             topic,
            NumPartitions:     6,
            ReplicationFactor: 3,
        },
    }

    err = controllerConn.CreateTopics(topicConfigs...)
    if err != nil {
        panic(err.Error())
    }
}

// newwriter builds a kafka.Writer that partitions messages by a CRC32 hash
// of the key. BatchSize/BatchTimeout are tuned low so each message is
// flushed almost immediately instead of being held for producer batching.
func newwriter(url string, topic string, dialer *kafka.Dialer) *kafka.Writer {
    return kafka.NewWriter(kafka.WriterConfig{
        Brokers:      []string{url},
        Topic:        topic,
        Balancer:     &kafka.CRC32Balancer{},
        Dialer:       dialer,
        BatchSize:    10,
        BatchTimeout: 1 * time.Millisecond,
    })
}

// newreader builds a kafka.Reader pinned to a single partition.
//
// NOTE(review): MinBytes/MaxBytes are left unset here. As discussed in the
// answer below, kafka-go then applies a large MinBytes default, so the broker
// waits for data to accumulate before answering fetches — the cause of the
// bursty delivery seen in the logs. Set MinBytes: 1 to receive immediately.
func newreader(url string, topic string, partition int, dialer *kafka.Dialer) *kafka.Reader {

    return kafka.NewReader(kafka.ReaderConfig{
        Brokers:   []string{url},
        Topic:     topic,
        Dialer:    dialer,
        Partition: partition,
    })
}

// read consumes one partition forever, logging each message value prefixed
// with the partition number. Runs as a goroutine; panics on read errors.
func read(url string, topic string, dialer *kafka.Dialer, partition int) {

    reader := newreader(url, topic, partition, dialer)
    defer reader.Close()
    for {
        msg, err := reader.ReadMessage(context.Background())
        if err != nil {
            panic(err)
        }
        log.Printf("rec%d:\t%s\n", partition, msg.Value)
    }
}
// write produces an incrementing message ("v0", "v1", …) every 100ms,
// forever. The value doubles as the key so the CRC32 balancer spreads
// messages across partitions. Runs as a goroutine.
func write(url string, topic string, dialer *kafka.Dialer) {
    writer := newwriter(url, topic, dialer)
    defer writer.Close()
    for i := 0; ; i++ {
        v := []byte("v" + strconv.Itoa(i))
        log.Printf("send:\t%s\n", v)
        msg := kafka.Message{Key: v, Value: v}
        err := writer.WriteMessages(context.Background(), msg)
        if err != nil {
            fmt.Println(err)
        }
        time.Sleep(100 * time.Millisecond)
    }
}

// main creates the topic, starts one consumer goroutine per partition and
// one producer goroutine, then blocks forever (the background context is
// never cancelled, so <-ctx.Done() never returns).
func main() {
    url := "_______.______.___.confluent.cloud:9092"
    topic := "test"
    username := "________________"
    password := "________________"
    clientid := "________________"
    dialer := newdialer(clientid, username, password)
    ctx := context.Background()
    createtopic(url, topic, dialer)
    // One dedicated reader per partition (topic is created with 6).
    for i := 0; i < 6; i++ {
        go read(url, topic, dialer, i)
    }

    go write(url, topic, dialer)
    <-ctx.Done()
}

正在记录以下内容。

2020/11/02 23:19:22 send:       v0
2020/11/02 23:19:23 send:       v1
2020/11/02 23:19:23 send:       v2
2020/11/02 23:19:23 send:       v3
2020/11/02 23:19:24 send:       v4
2020/11/02 23:19:24 send:       v5
2020/11/02 23:19:24 send:       v6
2020/11/02 23:19:25 send:       v7
2020/11/02 23:19:25 send:       v8
2020/11/02 23:19:25 send:       v9
2020/11/02 23:19:25 send:       v10
2020/11/02 23:19:26 send:       v11
2020/11/02 23:19:26 send:       v12
2020/11/02 23:19:26 send:       v13
2020/11/02 23:19:26 send:       v14
2020/11/02 23:19:26 send:       v15
2020/11/02 23:19:27 send:       v16
2020/11/02 23:19:27 send:       v17
2020/11/02 23:19:27 send:       v18
2020/11/02 23:19:27 send:       v19
2020/11/02 23:19:28 send:       v20
2020/11/02 23:19:29 send:       v21
2020/11/02 23:19:29 send:       v22
2020/11/02 23:19:29 send:       v23
2020/11/02 23:19:29 send:       v24
2020/11/02 23:19:29 send:       v25
2020/11/02 23:19:30 send:       v26
2020/11/02 23:19:30 send:       v27
2020/11/02 23:19:30 send:       v28
2020/11/02 23:19:30 send:       v29
2020/11/02 23:19:31 send:       v30
2020/11/02 23:19:31 send:       v31
2020/11/02 23:19:31 send:       v32
2020/11/02 23:19:32 send:       v33
2020/11/02 23:19:32 send:       v34
2020/11/02 23:19:32 rec3:       v8
2020/11/02 23:19:32 rec3:       v14
2020/11/02 23:19:32 rec3:       v15
2020/11/02 23:19:32 rec3:       v16
2020/11/02 23:19:32 rec3:       v17
2020/11/02 23:19:32 rec3:       v20
2020/11/02 23:19:32 rec3:       v21
2020/11/02 23:19:32 rec3:       v23
2020/11/02 23:19:32 rec3:       v29
2020/11/02 23:19:32 rec1:       v0
2020/11/02 23:19:32 rec1:       v9
2020/11/02 23:19:32 rec1:       v22
2020/11/02 23:19:32 rec1:       v28
2020/11/02 23:19:32 rec4:       v4
2020/11/02 23:19:32 rec4:       v5
2020/11/02 23:19:32 rec4:       v7
2020/11/02 23:19:32 rec4:       v10
2020/11/02 23:19:32 rec4:       v11
2020/11/02 23:19:32 rec4:       v12
2020/11/02 23:19:32 rec4:       v18
2020/11/02 23:19:32 rec4:       v24
2020/11/02 23:19:32 rec4:       v25
2020/11/02 23:19:32 rec4:       v30
2020/11/02 23:19:32 rec4:       v31
2020/11/02 23:19:32 send:       v35
2020/11/02 23:19:32 rec5:       v1
2020/11/02 23:19:32 rec5:       v2
2020/11/02 23:19:32 rec5:       v3
2020/11/02 23:19:32 rec5:       v34
2020/11/02 23:19:32 rec2:       v6
2020/11/02 23:19:32 rec2:       v13
2020/11/02 23:19:32 rec2:       v26
2020/11/02 23:19:32 rec2:       v33
2020/11/02 23:19:32 send:       v36
2020/11/02 23:19:33 send:       v37
2020/11/02 23:19:33 send:       v38
2020/11/02 23:19:33 send:       v39
2020/11/02 23:19:33 send:       v40
2020/11/02 23:19:33 send:       v41
2020/11/02 23:19:33 rec0:       v19
2020/11/02 23:19:33 rec0:       v27
2020/11/02 23:19:33 rec0:       v32
2020/11/02 23:19:34 send:       v42
2020/11/02 23:19:34 send:       v43
2020/11/02 23:19:34 send:       v44
2020/11/02 23:19:34 send:       v45
2020/11/02 23:19:34 send:       v46
2020/11/02 23:19:35 send:       v47
2020/11/02 23:19:35 send:       v48
2020/11/02 23:19:35 send:       v49
2020/11/02 23:19:35 send:       v50
2020/11/02 23:19:35 send:       v51
2020/11/02 23:19:35 send:       v52
2020/11/02 23:19:36 send:       v53
2020/11/02 23:19:36 send:       v54
2020/11/02 23:19:36 send:       v55
2020/11/02 23:19:36 send:       v56
2020/11/02 23:19:36 send:       v57
2020/11/02 23:19:37 send:       v58
2020/11/02 23:19:37 send:       v59
2020/11/02 23:19:37 send:       v60
2020/11/02 23:19:38 send:       v61
2020/11/02 23:19:38 send:       v62
2020/11/02 23:19:38 send:       v63
2020/11/02 23:19:38 send:       v64
2020/11/02 23:19:38 send:       v65
2020/11/02 23:19:39 send:       v66
2020/11/02 23:19:39 send:       v67
2020/11/02 23:19:39 send:       v68
2020/11/02 23:19:40 send:       v69
2020/11/02 23:19:40 send:       v70
2020/11/02 23:19:40 send:       v71
2020/11/02 23:19:40 send:       v72
2020/11/02 23:19:40 send:       v73
2020/11/02 23:19:40 send:       v74
2020/11/02 23:19:41 send:       v75
2020/11/02 23:19:41 send:       v76
2020/11/02 23:19:41 rec1:       v41
2020/11/02 23:19:41 rec1:       v56
2020/11/02 23:19:41 rec1:       v68
2020/11/02 23:19:41 rec1:       v74
2020/11/02 23:19:41 rec1:       v75
2020/11/02 23:19:41 rec1:       v76
2020/11/02 23:19:41 rec3:       v37
2020/11/02 23:19:41 rec3:       v40
2020/11/02 23:19:41 rec3:       v42
2020/11/02 23:19:41 rec3:       v48
2020/11/02 23:19:41 rec3:       v55
2020/11/02 23:19:41 rec3:       v57
2020/11/02 23:19:41 rec3:       v60
2020/11/02 23:19:41 rec3:       v61
2020/11/02 23:19:41 rec3:       v62
2020/11/02 23:19:41 send:       v77
2020/11/02 23:19:41 rec4:       v38
2020/11/02 23:19:41 rec4:       v39
2020/11/02 23:19:41 rec4:       v45
2020/11/02 23:19:41 rec4:       v46
2020/11/02 23:19:41 rec4:       v47
2020/11/02 23:19:41 rec4:       v53
2020/11/02 23:19:41 rec4:       v59
2020/11/02 23:19:41 rec4:       v70
2020/11/02 23:19:41 rec4:       v71
2020/11/02 23:19:41 rec4:       v73
2020/11/02 23:19:41 rec5:       v35
2020/11/02 23:19:41 rec5:       v36
2020/11/02 23:19:41 rec5:       v43
2020/11/02 23:19:41 rec5:       v49
2020/11/02 23:19:41 rec5:       v54
2020/11/02 23:19:41 rec5:       v63
2020/11/02 23:19:41 rec5:       v69
2020/11/02 23:19:41 rec5:       v77
2020/11/02 23:19:41 send:       v78
2020/11/02 23:19:41 rec2:       v44
2020/11/02 23:19:41 rec2:       v50
2020/11/02 23:19:41 rec2:       v51
2020/11/02 23:19:41 rec2:       v64
2020/11/02 23:19:41 rec2:       v65
2020/11/02 23:19:41 rec2:       v66
2020/11/02 23:19:41 rec2:       v72
2020/11/02 23:19:41 send:       v79
2020/11/02 23:19:42 send:       v80
2020/11/02 23:19:42 send:       v81
2020/11/02 23:19:42 send:       v82
2020/11/02 23:19:42 send:       v83
2020/11/02 23:19:42 send:       v84
2020/11/02 23:19:43 send:       v85
2020/11/02 23:19:43 rec0:       v52
2020/11/02 23:19:43 rec0:       v58
2020/11/02 23:19:43 rec0:       v67
2020/11/02 23:19:43 send:       v86

如有任何建议,我们将不胜感激。谢谢!

编辑:

缓冲肯定是在 kafka-go 中发生的。 sarama 没有遇到相同的行为:

package main

import (
    "context"
    "fmt"

    "github.com/shopify/sarama"

    "crypto/tls"
    "crypto/x509"

    "log"
    "strings"
    "time"
)

var (
    broker   = "___-_____.us-east1.gcp.confluent.cloud:9092"
    brokers  = []string{broker}
    clientid = "___________"
    username = "___________"
    password = "___________"
    topic    = "sarama"
)

func main() {

    log.printf("kafka brokers: %s", strings.join(brokers, ", "))
    ctx := context.background()
    sync := newsyncproducer()
    // accesslog := newasyncproducer()

    createtopic(topic)

    go func() {
        for i := 0; ; i++ {
            v := sarama.stringencoder(fmt.sprintf("v%d", i))

            p, o, err := sync.sendmessage(&sarama.producermessage{
                topic: topic,
                value: v,
            })
            if err != nil {
                panic(err)
            }
            fmt.printf("sent\t\t%v\tp: %d\toffset: %d\t\n", v, p, o)
            time.sleep(100 * time.millisecond)
        }
    }()
    ps := []sarama.partitionconsumer{}
    offset := int64(0)

    loop := func(msgs <-chan *sarama.consumermessage) {
        for msg := range msgs {
            fmt.printf("recv:\t\t%s\tp: %d\toffset: %d\n", msg.value, msg.partition, msg.offset)
        }
    }

    for i := 0; i < 6; i++ {
        ps = append(ps, createpartitionconsumer(topic, int32(i), offset))
    }

    for _, p := range ps {
        go loop(p.messages())
    }
    <-ctx.done()

}

// createpartitionconsumer builds a new consumer and attaches it to a single
// partition starting at the given offset. Note each call creates its own
// underlying sarama.Consumer (fine for a spike, wasteful in production).
// Panics on any error.
func createpartitionconsumer(topic string, partition int32, offset int64) sarama.PartitionConsumer {
    config := baseconfig()
    c, err := sarama.NewConsumer(brokers, config)

    if err != nil {
        panic(err)
    }
    p, err := c.ConsumePartition(topic, partition, offset)
    if err != nil {
        panic(err)
    }
    return p
}

// createtopic creates the topic with 6 partitions and replication factor 3
// via the cluster admin API. A creation error (e.g. topic already exists)
// is only logged, so reruns of the spike keep working.
func createtopic(topic string) {
    config := baseconfig()
    admin, err := sarama.NewClusterAdmin(brokers, config)
    if err != nil {
        log.Fatal("error while creating cluster admin: ", err.Error())
    }
    defer func() { _ = admin.Close() }()
    err = admin.CreateTopic(topic, &sarama.TopicDetail{
        NumPartitions:     6,
        ReplicationFactor: 3,
    }, false)
    if err != nil {
        log.Println("error while creating topic: ", err.Error())
    }
}

// baseconfig returns a sarama config with TLS (system cert pool) and
// SASL/PLAIN credentials set, shared by the producer, consumer and admin.
func baseconfig() *sarama.Config {
    // Fall back to an empty pool when the system pool is unavailable
    // (the SystemCertPool error is deliberately ignored for this spike).
    rootCAs, _ := x509.SystemCertPool()
    if rootCAs == nil {
        rootCAs = x509.NewCertPool()
    }

    config := sarama.NewConfig()
    config.Version = sarama.MaxVersion
    config.Net.TLS.Enable = true
    config.Net.TLS.Config = &tls.Config{
        RootCAs:            rootCAs,
        InsecureSkipVerify: false,
    }

    config.ClientID = clientid
    config.Net.SASL.Enable = true
    config.Net.SASL.Password = password
    config.Net.SASL.User = username
    return config
}

func newsyncproducer() sarama.syncproducer {

    config := baseconfig()
    config.producer.requiredacks = sarama.waitforall
    config.producer.retry.max = 10
    config.producer.return.successes = true

    producer, err := sarama.newsyncproducer(brokers, config)
    if err != nil {
        log.fatalln("failed to start sarama producer:", err)
    }

    return producer
}

事实上,在某些情况下,它实际上在发送被确认之前就收到了,这让我想知道是否发生了内部消息传递,如果是这样,我是否应该关心......

recv:           V1176   p: 1    offset: 355
recv:           V1177   p: 2    offset: 363
send:           V1177   p: 2    offset: 363
send:           V1178   p: 5    offset: 377
recv:           V1178   p: 5    offset: 377
recv:           V1179   p: 1    offset: 356
send:           V1179   p: 1    offset: 356
send:           V1180   p: 1    offset: 357
recv:           V1180   p: 1    offset: 357
recv:           V1181   p: 1    offset: 358
send:           V1181   p: 1    offset: 358
send:           V1182   p: 4    offset: 393
recv:           V1182   p: 4    offset: 393
send:           V1183   p: 4    offset: 394
recv:           V1183   p: 4    offset: 394
send:           V1184   p: 3    offset: 358
recv:           V1184   p: 3    offset: 358
send:           V1185   p: 2    offset: 364
recv:           V1185   p: 2    offset: 364
send:           V1186   p: 3    offset: 359
recv:           V1186   p: 3    offset: 359
recv:           V1187   p: 3    offset: 360
send:           V1187   p: 3    offset: 360
send:           V1188   p: 5    offset: 378
recv:           V1188   p: 5    offset: 378
send:           V1189   p: 2    offset: 365
recv:           V1189   p: 2    offset: 365
recv:           V1190   p: 4    offset: 395
send:           V1190   p: 4    offset: 395
send:           V1191   p: 1    offset: 359
recv:           V1191   p: 1    offset: 359
send:           V1192   p: 4    offset: 396
recv:           V1192   p: 4    offset: 396
send:           V1193   p: 0    offset: 431
recv:           V1193   p: 0    offset: 431
send:           V1194   p: 4    offset: 397
recv:           V1194   p: 4    offset: 397
recv:           V1195   p: 2    offset: 366
send:           V1195   p: 2    offset: 366
send:           V1196   p: 3    offset: 361
recv:           V1196   p: 3    offset: 361

解决方案


您需要更改 ReaderConfig.MinBytes,否则 segmentio/kafka-go 会将其设置为 1e6 = 1 MB,在这种情况下,Kafka 将等待那么多数据积累后再响应请求。

// newReader builds a single-partition reader with MinBytes set explicitly.
// With MinBytes at its low value the broker responds to fetch requests as
// soon as any data is available instead of waiting for a large batch to
// accumulate — eliminating the multi-second delivery bursts.
func newReader(url string, topic string, partition int, dialer *kafka.Dialer) *kafka.Reader {

    return kafka.NewReader(kafka.ReaderConfig{
        Brokers:   []string{url},
        Topic:     topic,
        Dialer:    dialer,
        Partition: partition,
        MinBytes:  1,         // same value of Shopify/sarama 
        MaxBytes:  57671680,  // upper bound per fetch (~55 MiB)
    })
}

另一方面,shopify/sarama 的默认值是 1 个字节。

参考文献:

到这里,我们也就讲完了《使用 Kafka-Go,为什么我会看到似乎正在批量读取/写入的内容?有我缺少的配置吗?》的内容了。个人认为,基础知识的学习和巩固,是为了更好的将其运用到项目中,欢迎关注golang学习网公众号,带你了解更多关于的知识点!

声明:本文转载于:stackoverflow 如有侵犯,请联系study_golang@163.com删除
相关阅读
更多>
最新阅读
更多>
课程推荐
更多>