Using Kafka-Go, why am I seeing what appears to be batched reads/writes? Is there a configuration I'm missing?
Source: Stack Overflow
I am switching from RabbitMQ to Kafka. This is just a simple spike to see how Kafka works. I am not sure whether I am missing some setting, whether it is my code, whether it is kafka-go, or whether this is expected Kafka behavior.
I tried adjusting BatchSize as well as BatchTimeout, but neither made any difference.
The code below creates a topic with 6 partitions and a replication factor of 3. It then produces an incrementing message every 100ms and starts 6 consumers, one per partition. Both the reads and the writes are performed in goroutines.
In the log below, it receives no messages for 7 seconds and then gets a burst of them. I am using Confluent's platform, so I recognize there will be some network latency, but not to the extent I am seeing.
```go
package main

import (
	"context"
	"crypto/tls"
	"crypto/x509"
	"fmt"
	"log"
	"net"
	"strconv"
	"time"

	kafka "github.com/segmentio/kafka-go"
	"github.com/segmentio/kafka-go/sasl/plain"
)

// newDialer builds a SASL/PLAIN + TLS dialer for Confluent Cloud.
func newDialer(clientID, username, password string) *kafka.Dialer {
	mechanism := plain.Mechanism{
		Username: username,
		Password: password,
	}
	rootCAs, _ := x509.SystemCertPool()
	if rootCAs == nil {
		rootCAs = x509.NewCertPool()
	}
	return &kafka.Dialer{
		Timeout:       10 * time.Second,
		DualStack:     true,
		ClientID:      clientID,
		SASLMechanism: mechanism,
		TLS: &tls.Config{
			InsecureSkipVerify: false,
			RootCAs:            rootCAs,
		},
	}
}

// createTopic creates the topic on the cluster controller:
// 6 partitions, replication factor 3.
func createTopic(url string, topic string, dialer *kafka.Dialer) {
	conn, err := dialer.Dial("tcp", url)
	if err != nil {
		panic(err.Error())
	}
	defer conn.Close()

	controller, err := conn.Controller()
	if err != nil {
		panic(err.Error())
	}
	var controllerConn *kafka.Conn
	controllerConn, err = dialer.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
	if err != nil {
		panic(err.Error())
	}
	defer controllerConn.Close()

	topicConfigs := []kafka.TopicConfig{
		{
			Topic:             topic,
			NumPartitions:     6,
			ReplicationFactor: 3,
		},
	}
	if err := controllerConn.CreateTopics(topicConfigs...); err != nil {
		panic(err.Error())
	}
}

func newWriter(url string, topic string, dialer *kafka.Dialer) *kafka.Writer {
	return kafka.NewWriter(kafka.WriterConfig{
		Brokers:      []string{url},
		Topic:        topic,
		Balancer:     &kafka.CRC32Balancer{},
		Dialer:       dialer,
		BatchSize:    10,
		BatchTimeout: 1 * time.Millisecond,
	})
}

func newReader(url string, topic string, partition int, dialer *kafka.Dialer) *kafka.Reader {
	return kafka.NewReader(kafka.ReaderConfig{
		Brokers:   []string{url},
		Topic:     topic,
		Dialer:    dialer,
		Partition: partition,
	})
}

// read consumes a single partition and logs every message.
func read(url string, topic string, dialer *kafka.Dialer, partition int) {
	reader := newReader(url, topic, partition, dialer)
	defer reader.Close()
	for {
		msg, err := reader.ReadMessage(context.Background())
		if err != nil {
			panic(err)
		}
		log.Printf("rec%d:\t%s\n", partition, msg.Value)
	}
}

// write produces an incrementing message every 100ms.
func write(url string, topic string, dialer *kafka.Dialer) {
	writer := newWriter(url, topic, dialer)
	defer writer.Close()
	for i := 0; ; i++ {
		v := []byte("v" + strconv.Itoa(i))
		log.Printf("send:\t%s\n", v)
		msg := kafka.Message{Key: v, Value: v}
		if err := writer.WriteMessages(context.Background(), msg); err != nil {
			fmt.Println(err)
		}
		time.Sleep(100 * time.Millisecond)
	}
}

func main() {
	url := "_______.______.___.confluent.cloud:9092"
	topic := "test"
	username := "________________"
	password := "________________"
	clientID := "________________"

	dialer := newDialer(clientID, username, password)
	ctx := context.Background()

	createTopic(url, topic, dialer)

	for i := 0; i < 6; i++ {
		go read(url, topic, dialer, i)
	}
	go write(url, topic, dialer)

	<-ctx.Done()
}
```
The following is logged:
```
2020/11/02 23:19:22 send: v0
2020/11/02 23:19:23 send: v1
2020/11/02 23:19:23 send: v2
2020/11/02 23:19:23 send: v3
2020/11/02 23:19:24 send: v4
2020/11/02 23:19:24 send: v5
2020/11/02 23:19:24 send: v6
2020/11/02 23:19:25 send: v7
2020/11/02 23:19:25 send: v8
2020/11/02 23:19:25 send: v9
2020/11/02 23:19:25 send: v10
2020/11/02 23:19:26 send: v11
2020/11/02 23:19:26 send: v12
2020/11/02 23:19:26 send: v13
2020/11/02 23:19:26 send: v14
2020/11/02 23:19:26 send: v15
2020/11/02 23:19:27 send: v16
2020/11/02 23:19:27 send: v17
2020/11/02 23:19:27 send: v18
2020/11/02 23:19:27 send: v19
2020/11/02 23:19:28 send: v20
2020/11/02 23:19:29 send: v21
2020/11/02 23:19:29 send: v22
2020/11/02 23:19:29 send: v23
2020/11/02 23:19:29 send: v24
2020/11/02 23:19:29 send: v25
2020/11/02 23:19:30 send: v26
2020/11/02 23:19:30 send: v27
2020/11/02 23:19:30 send: v28
2020/11/02 23:19:30 send: v29
2020/11/02 23:19:31 send: v30
2020/11/02 23:19:31 send: v31
2020/11/02 23:19:31 send: v32
2020/11/02 23:19:32 send: v33
2020/11/02 23:19:32 send: v34
2020/11/02 23:19:32 rec3: v8
2020/11/02 23:19:32 rec3: v14
2020/11/02 23:19:32 rec3: v15
2020/11/02 23:19:32 rec3: v16
2020/11/02 23:19:32 rec3: v17
2020/11/02 23:19:32 rec3: v20
2020/11/02 23:19:32 rec3: v21
2020/11/02 23:19:32 rec3: v23
2020/11/02 23:19:32 rec3: v29
2020/11/02 23:19:32 rec1: v0
2020/11/02 23:19:32 rec1: v9
2020/11/02 23:19:32 rec1: v22
2020/11/02 23:19:32 rec1: v28
2020/11/02 23:19:32 rec4: v4
2020/11/02 23:19:32 rec4: v5
2020/11/02 23:19:32 rec4: v7
2020/11/02 23:19:32 rec4: v10
2020/11/02 23:19:32 rec4: v11
2020/11/02 23:19:32 rec4: v12
2020/11/02 23:19:32 rec4: v18
2020/11/02 23:19:32 rec4: v24
2020/11/02 23:19:32 rec4: v25
2020/11/02 23:19:32 rec4: v30
2020/11/02 23:19:32 rec4: v31
2020/11/02 23:19:32 send: v35
2020/11/02 23:19:32 rec5: v1
2020/11/02 23:19:32 rec5: v2
2020/11/02 23:19:32 rec5: v3
2020/11/02 23:19:32 rec5: v34
2020/11/02 23:19:32 rec2: v6
2020/11/02 23:19:32 rec2: v13
2020/11/02 23:19:32 rec2: v26
2020/11/02 23:19:32 rec2: v33
2020/11/02 23:19:32 send: v36
2020/11/02 23:19:33 send: v37
2020/11/02 23:19:33 send: v38
2020/11/02 23:19:33 send: v39
2020/11/02 23:19:33 send: v40
2020/11/02 23:19:33 send: v41
2020/11/02 23:19:33 rec0: v19
2020/11/02 23:19:33 rec0: v27
2020/11/02 23:19:33 rec0: v32
2020/11/02 23:19:34 send: v42
2020/11/02 23:19:34 send: v43
2020/11/02 23:19:34 send: v44
2020/11/02 23:19:34 send: v45
2020/11/02 23:19:34 send: v46
2020/11/02 23:19:35 send: v47
2020/11/02 23:19:35 send: v48
2020/11/02 23:19:35 send: v49
2020/11/02 23:19:35 send: v50
2020/11/02 23:19:35 send: v51
2020/11/02 23:19:35 send: v52
2020/11/02 23:19:36 send: v53
2020/11/02 23:19:36 send: v54
2020/11/02 23:19:36 send: v55
2020/11/02 23:19:36 send: v56
2020/11/02 23:19:36 send: v57
2020/11/02 23:19:37 send: v58
2020/11/02 23:19:37 send: v59
2020/11/02 23:19:37 send: v60
2020/11/02 23:19:38 send: v61
2020/11/02 23:19:38 send: v62
2020/11/02 23:19:38 send: v63
2020/11/02 23:19:38 send: v64
2020/11/02 23:19:38 send: v65
2020/11/02 23:19:39 send: v66
2020/11/02 23:19:39 send: v67
2020/11/02 23:19:39 send: v68
2020/11/02 23:19:40 send: v69
2020/11/02 23:19:40 send: v70
2020/11/02 23:19:40 send: v71
2020/11/02 23:19:40 send: v72
2020/11/02 23:19:40 send: v73
2020/11/02 23:19:40 send: v74
2020/11/02 23:19:41 send: v75
2020/11/02 23:19:41 send: v76
2020/11/02 23:19:41 rec1: v41
2020/11/02 23:19:41 rec1: v56
2020/11/02 23:19:41 rec1: v68
2020/11/02 23:19:41 rec1: v74
2020/11/02 23:19:41 rec1: v75
2020/11/02 23:19:41 rec1: v76
2020/11/02 23:19:41 rec3: v37
2020/11/02 23:19:41 rec3: v40
2020/11/02 23:19:41 rec3: v42
2020/11/02 23:19:41 rec3: v48
2020/11/02 23:19:41 rec3: v55
2020/11/02 23:19:41 rec3: v57
2020/11/02 23:19:41 rec3: v60
2020/11/02 23:19:41 rec3: v61
2020/11/02 23:19:41 rec3: v62
2020/11/02 23:19:41 send: v77
2020/11/02 23:19:41 rec4: v38
2020/11/02 23:19:41 rec4: v39
2020/11/02 23:19:41 rec4: v45
2020/11/02 23:19:41 rec4: v46
2020/11/02 23:19:41 rec4: v47
2020/11/02 23:19:41 rec4: v53
2020/11/02 23:19:41 rec4: v59
2020/11/02 23:19:41 rec4: v70
2020/11/02 23:19:41 rec4: v71
2020/11/02 23:19:41 rec4: v73
2020/11/02 23:19:41 rec5: v35
2020/11/02 23:19:41 rec5: v36
2020/11/02 23:19:41 rec5: v43
2020/11/02 23:19:41 rec5: v49
2020/11/02 23:19:41 rec5: v54
2020/11/02 23:19:41 rec5: v63
2020/11/02 23:19:41 rec5: v69
2020/11/02 23:19:41 rec5: v77
2020/11/02 23:19:41 send: v78
2020/11/02 23:19:41 rec2: v44
2020/11/02 23:19:41 rec2: v50
2020/11/02 23:19:41 rec2: v51
2020/11/02 23:19:41 rec2: v64
2020/11/02 23:19:41 rec2: v65
2020/11/02 23:19:41 rec2: v66
2020/11/02 23:19:41 rec2: v72
2020/11/02 23:19:41 send: v79
2020/11/02 23:19:42 send: v80
2020/11/02 23:19:42 send: v81
2020/11/02 23:19:42 send: v82
2020/11/02 23:19:42 send: v83
2020/11/02 23:19:42 send: v84
2020/11/02 23:19:43 send: v85
2020/11/02 23:19:43 rec0: v52
2020/11/02 23:19:43 rec0: v58
2020/11/02 23:19:43 rec0: v67
2020/11/02 23:19:43 send: v86
```
Any suggestions would be much appreciated. Thanks!
Edit:
The buffering is definitely happening in kafka-go. Sarama does not exhibit the same behavior:
```go
package main

import (
	"context"
	"crypto/tls"
	"crypto/x509"
	"fmt"
	"log"
	"strings"
	"time"

	"github.com/Shopify/sarama"
)

var (
	broker   = "___-_____.us-east1.gcp.confluent.cloud:9092"
	brokers  = []string{broker}
	clientID = "___________"
	username = "___________"
	password = "___________"
	topic    = "sarama"
)

func main() {
	log.Printf("Kafka brokers: %s", strings.Join(brokers, ", "))
	ctx := context.Background()

	sync := newSyncProducer()
	// accessLog := newAsyncProducer()

	createTopic(topic)

	// Produce an incrementing message every 100ms.
	go func() {
		for i := 0; ; i++ {
			v := sarama.StringEncoder(fmt.Sprintf("v%d", i))
			p, o, err := sync.SendMessage(&sarama.ProducerMessage{
				Topic: topic,
				Value: v,
			})
			if err != nil {
				panic(err)
			}
			fmt.Printf("sent\t\t%v\tp: %d\toffset: %d\t\n", v, p, o)
			time.Sleep(100 * time.Millisecond)
		}
	}()

	// One partition consumer per partition, each drained in its own goroutine.
	ps := []sarama.PartitionConsumer{}
	offset := int64(0)
	loop := func(msgs <-chan *sarama.ConsumerMessage) {
		for msg := range msgs {
			fmt.Printf("recv:\t\t%s\tp: %d\toffset: %d\n", msg.Value, msg.Partition, msg.Offset)
		}
	}
	for i := 0; i < 6; i++ {
		ps = append(ps, createPartitionConsumer(topic, int32(i), offset))
	}
	for _, p := range ps {
		go loop(p.Messages())
	}

	<-ctx.Done()
}

func createPartitionConsumer(topic string, partition int32, offset int64) sarama.PartitionConsumer {
	config := baseConfig()
	c, err := sarama.NewConsumer(brokers, config)
	if err != nil {
		panic(err)
	}
	p, err := c.ConsumePartition(topic, partition, offset)
	if err != nil {
		panic(err)
	}
	return p
}

func createTopic(topic string) {
	config := baseConfig()
	admin, err := sarama.NewClusterAdmin(brokers, config)
	if err != nil {
		log.Fatal("Error while creating cluster admin: ", err.Error())
	}
	defer func() { _ = admin.Close() }()
	err = admin.CreateTopic(topic, &sarama.TopicDetail{
		NumPartitions:     6,
		ReplicationFactor: 3,
	}, false)
	if err != nil {
		log.Println("Error while creating topic: ", err.Error())
	}
}

func baseConfig() *sarama.Config {
	rootCAs, _ := x509.SystemCertPool()
	if rootCAs == nil {
		rootCAs = x509.NewCertPool()
	}
	config := sarama.NewConfig()
	config.Version = sarama.MaxVersion
	config.Net.TLS.Enable = true
	config.Net.TLS.Config = &tls.Config{
		RootCAs:            rootCAs,
		InsecureSkipVerify: false,
	}
	config.ClientID = clientID
	config.Net.SASL.Enable = true
	config.Net.SASL.Password = password
	config.Net.SASL.User = username
	return config
}

func newSyncProducer() sarama.SyncProducer {
	config := baseConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll
	config.Producer.Retry.Max = 10
	config.Producer.Return.Successes = true
	producer, err := sarama.NewSyncProducer(brokers, config)
	if err != nil {
		log.Fatalln("Failed to start Sarama producer:", err)
	}
	return producer
}
```
In fact, in some cases a message is received before its send is even acknowledged, which makes me wonder whether some internal message passing is going on, and if so, whether I should care...
```
recv: V1176 p: 1 offset: 355
recv: V1177 p: 2 offset: 363
send: V1177 p: 2 offset: 363
send: V1178 p: 5 offset: 377
recv: V1178 p: 5 offset: 377
recv: V1179 p: 1 offset: 356
send: V1179 p: 1 offset: 356
send: V1180 p: 1 offset: 357
recv: V1180 p: 1 offset: 357
recv: V1181 p: 1 offset: 358
send: V1181 p: 1 offset: 358
send: V1182 p: 4 offset: 393
recv: V1182 p: 4 offset: 393
send: V1183 p: 4 offset: 394
recv: V1183 p: 4 offset: 394
send: V1184 p: 3 offset: 358
recv: V1184 p: 3 offset: 358
send: V1185 p: 2 offset: 364
recv: V1185 p: 2 offset: 364
send: V1186 p: 3 offset: 359
recv: V1186 p: 3 offset: 359
recv: V1187 p: 3 offset: 360
send: V1187 p: 3 offset: 360
send: V1188 p: 5 offset: 378
recv: V1188 p: 5 offset: 378
send: V1189 p: 2 offset: 365
recv: V1189 p: 2 offset: 365
recv: V1190 p: 4 offset: 395
send: V1190 p: 4 offset: 395
send: V1191 p: 1 offset: 359
recv: V1191 p: 1 offset: 359
send: V1192 p: 4 offset: 396
recv: V1192 p: 4 offset: 396
send: V1193 p: 0 offset: 431
recv: V1193 p: 0 offset: 431
send: V1194 p: 4 offset: 397
recv: V1194 p: 4 offset: 397
recv: V1195 p: 2 offset: 366
send: V1195 p: 2 offset: 366
send: V1196 p: 3 offset: 361
recv: V1196 p: 3 offset: 361
```
Solution
You need to change ReaderConfig.MinBytes; otherwise segmentio/kafka-go sets it to 1e6 = 1 MB, in which case Kafka waits for that much data to accumulate before answering the fetch request:
```go
func newReader(url string, topic string, partition int, dialer *kafka.Dialer) *kafka.Reader {
	return kafka.NewReader(kafka.ReaderConfig{
		Brokers:   []string{url},
		Topic:     topic,
		Dialer:    dialer,
		Partition: partition,
		MinBytes:  1, // same value as Shopify/sarama
		MaxBytes:  57671680,
	})
}
```
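Note that ReaderConfig also exposes a MaxWait field, which caps how long the broker will hold a fetch open when MinBytes has not yet been reached; kafka-go defaults it to 10 seconds, which is consistent with the multi-second bursts in the log above. Below is a minimal sketch of a lower-latency reader, assuming the same imports and dialer as the program above; the function name and the particular values are illustrative, not recommendations:

```go
// Sketch only: MinBytes, MaxBytes, and MaxWait are real kafka-go
// ReaderConfig fields, but these specific values are illustrative.
func newLowLatencyReader(url string, topic string, partition int, dialer *kafka.Dialer) *kafka.Reader {
	return kafka.NewReader(kafka.ReaderConfig{
		Brokers:   []string{url},
		Topic:     topic,
		Dialer:    dialer,
		Partition: partition,
		MinBytes:  1,                      // don't make the broker wait for data to accumulate
		MaxBytes:  10e6,                   // 10 MB cap on a single fetch response
		MaxWait:   250 * time.Millisecond, // upper bound on broker-side blocking per fetch
	})
}
```

Raising MinBytes back up trades latency for throughput, which is the trade-off the library's defaults are tuned for.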
Shopify/sarama, on the other hand, defaults it to 1 byte.
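If you ever need to tune the same trade-off on the sarama side, the equivalent knobs are Consumer.Fetch.Min (fetch.min.bytes) and Consumer.MaxWaitTime (fetch.max.wait.ms). A minimal sketch building on the baseConfig helper above; the values shown are in fact sarama's own defaults, which is why the sarama program streams messages immediately:

```go
// Sketch: sarama's consumer fetch knobs, analogous to kafka-go's
// MinBytes/MaxWait. The values shown here are sarama's defaults.
config := baseConfig()
config.Consumer.Fetch.Min = 1                         // fetch.min.bytes: reply as soon as any data exists
config.Consumer.MaxWaitTime = 250 * time.Millisecond  // fetch.max.wait.ms: cap broker-side blocking
```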