Using kafka-go, why am I seeing what appears to be batched reads/writes? Is there a config I'm missing?
Source: Stack Overflow
I'm switching over from RabbitMQ to Kafka. This is just a simple spike to see how Kafka behaves. I'm not sure whether I'm missing a setting, whether it's my code, whether it's kafka-go, or whether this is expected Kafka behavior.
I've tried adjusting BatchSize and BatchTimeout, but neither made any difference.
The code below creates a topic with 6 partitions and a replication factor of 3. It then produces an incrementing message every 100ms, and starts 6 consumers, one per partition. Both the reads and the writes run in goroutines.
In the log below, nothing is received for 7 seconds, and then a burst of messages arrives. I'm using Confluent's platform, so I accept there will be some network latency, but nothing on the order of what I'm seeing.
package main

import (
	"context"
	"crypto/tls"
	"crypto/x509"
	"fmt"
	"log"
	"net"
	"strconv"
	"time"

	kafka "github.com/segmentio/kafka-go"
	"github.com/segmentio/kafka-go/sasl/plain"
)

func newDialer(clientID, username, password string) *kafka.Dialer {
	mechanism := plain.Mechanism{
		Username: username,
		Password: password,
	}
	rootCAs, _ := x509.SystemCertPool()
	if rootCAs == nil {
		rootCAs = x509.NewCertPool()
	}
	return &kafka.Dialer{
		Timeout:       10 * time.Second,
		DualStack:     true,
		ClientID:      clientID,
		SASLMechanism: mechanism,
		TLS: &tls.Config{
			InsecureSkipVerify: false,
			RootCAs:            rootCAs,
		},
	}
}

func createTopic(url string, topic string, dialer *kafka.Dialer) {
	conn, err := dialer.Dial("tcp", url)
	if err != nil {
		panic(err.Error())
	}
	defer conn.Close()

	controller, err := conn.Controller()
	if err != nil {
		panic(err.Error())
	}
	var controllerConn *kafka.Conn
	controllerConn, err = dialer.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
	if err != nil {
		panic(err.Error())
	}
	defer controllerConn.Close()

	topicConfigs := []kafka.TopicConfig{
		{
			Topic:             topic,
			NumPartitions:     6,
			ReplicationFactor: 3,
		},
	}
	err = controllerConn.CreateTopics(topicConfigs...)
	if err != nil {
		panic(err.Error())
	}
}

func newWriter(url string, topic string, dialer *kafka.Dialer) *kafka.Writer {
	return kafka.NewWriter(kafka.WriterConfig{
		Brokers:      []string{url},
		Topic:        topic,
		Balancer:     &kafka.CRC32Balancer{},
		Dialer:       dialer,
		BatchSize:    10,
		BatchTimeout: 1 * time.Millisecond,
	})
}

func newReader(url string, topic string, partition int, dialer *kafka.Dialer) *kafka.Reader {
	return kafka.NewReader(kafka.ReaderConfig{
		Brokers:   []string{url},
		Topic:     topic,
		Dialer:    dialer,
		Partition: partition,
	})
}

func read(url string, topic string, dialer *kafka.Dialer, partition int) {
	reader := newReader(url, topic, partition, dialer)
	defer reader.Close()
	for {
		msg, err := reader.ReadMessage(context.Background())
		if err != nil {
			panic(err)
		}
		log.Printf("rec%d:\t%s\n", partition, msg.Value)
	}
}

func write(url string, topic string, dialer *kafka.Dialer) {
	writer := newWriter(url, topic, dialer)
	defer writer.Close()
	for i := 0; ; i++ {
		v := []byte("v" + strconv.Itoa(i))
		log.Printf("send:\t%s\n", v)
		msg := kafka.Message{Key: v, Value: v}
		err := writer.WriteMessages(context.Background(), msg)
		if err != nil {
			fmt.Println(err)
		}
		time.Sleep(100 * time.Millisecond)
	}
}

func main() {
	url := "_______.______.___.confluent.cloud:9092"
	topic := "test"
	username := "________________"
	password := "________________"
	clientID := "________________"
	dialer := newDialer(clientID, username, password)
	ctx := context.Background()

	createTopic(url, topic, dialer)
	for i := 0; i < 6; i++ {
		go read(url, topic, dialer, i)
	}
	go write(url, topic, dialer)
	<-ctx.Done()
}
The following is logged:
2020/11/02 23:19:22 send: v0
2020/11/02 23:19:23 send: v1
2020/11/02 23:19:23 send: v2
2020/11/02 23:19:23 send: v3
2020/11/02 23:19:24 send: v4
2020/11/02 23:19:24 send: v5
2020/11/02 23:19:24 send: v6
2020/11/02 23:19:25 send: v7
2020/11/02 23:19:25 send: v8
2020/11/02 23:19:25 send: v9
2020/11/02 23:19:25 send: v10
2020/11/02 23:19:26 send: v11
2020/11/02 23:19:26 send: v12
2020/11/02 23:19:26 send: v13
2020/11/02 23:19:26 send: v14
2020/11/02 23:19:26 send: v15
2020/11/02 23:19:27 send: v16
2020/11/02 23:19:27 send: v17
2020/11/02 23:19:27 send: v18
2020/11/02 23:19:27 send: v19
2020/11/02 23:19:28 send: v20
2020/11/02 23:19:29 send: v21
2020/11/02 23:19:29 send: v22
2020/11/02 23:19:29 send: v23
2020/11/02 23:19:29 send: v24
2020/11/02 23:19:29 send: v25
2020/11/02 23:19:30 send: v26
2020/11/02 23:19:30 send: v27
2020/11/02 23:19:30 send: v28
2020/11/02 23:19:30 send: v29
2020/11/02 23:19:31 send: v30
2020/11/02 23:19:31 send: v31
2020/11/02 23:19:31 send: v32
2020/11/02 23:19:32 send: v33
2020/11/02 23:19:32 send: v34
2020/11/02 23:19:32 rec3: v8
2020/11/02 23:19:32 rec3: v14
2020/11/02 23:19:32 rec3: v15
2020/11/02 23:19:32 rec3: v16
2020/11/02 23:19:32 rec3: v17
2020/11/02 23:19:32 rec3: v20
2020/11/02 23:19:32 rec3: v21
2020/11/02 23:19:32 rec3: v23
2020/11/02 23:19:32 rec3: v29
2020/11/02 23:19:32 rec1: v0
2020/11/02 23:19:32 rec1: v9
2020/11/02 23:19:32 rec1: v22
2020/11/02 23:19:32 rec1: v28
2020/11/02 23:19:32 rec4: v4
2020/11/02 23:19:32 rec4: v5
2020/11/02 23:19:32 rec4: v7
2020/11/02 23:19:32 rec4: v10
2020/11/02 23:19:32 rec4: v11
2020/11/02 23:19:32 rec4: v12
2020/11/02 23:19:32 rec4: v18
2020/11/02 23:19:32 rec4: v24
2020/11/02 23:19:32 rec4: v25
2020/11/02 23:19:32 rec4: v30
2020/11/02 23:19:32 rec4: v31
2020/11/02 23:19:32 send: v35
2020/11/02 23:19:32 rec5: v1
2020/11/02 23:19:32 rec5: v2
2020/11/02 23:19:32 rec5: v3
2020/11/02 23:19:32 rec5: v34
2020/11/02 23:19:32 rec2: v6
2020/11/02 23:19:32 rec2: v13
2020/11/02 23:19:32 rec2: v26
2020/11/02 23:19:32 rec2: v33
2020/11/02 23:19:32 send: v36
2020/11/02 23:19:33 send: v37
2020/11/02 23:19:33 send: v38
2020/11/02 23:19:33 send: v39
2020/11/02 23:19:33 send: v40
2020/11/02 23:19:33 send: v41
2020/11/02 23:19:33 rec0: v19
2020/11/02 23:19:33 rec0: v27
2020/11/02 23:19:33 rec0: v32
2020/11/02 23:19:34 send: v42
2020/11/02 23:19:34 send: v43
2020/11/02 23:19:34 send: v44
2020/11/02 23:19:34 send: v45
2020/11/02 23:19:34 send: v46
2020/11/02 23:19:35 send: v47
2020/11/02 23:19:35 send: v48
2020/11/02 23:19:35 send: v49
2020/11/02 23:19:35 send: v50
2020/11/02 23:19:35 send: v51
2020/11/02 23:19:35 send: v52
2020/11/02 23:19:36 send: v53
2020/11/02 23:19:36 send: v54
2020/11/02 23:19:36 send: v55
2020/11/02 23:19:36 send: v56
2020/11/02 23:19:36 send: v57
2020/11/02 23:19:37 send: v58
2020/11/02 23:19:37 send: v59
2020/11/02 23:19:37 send: v60
2020/11/02 23:19:38 send: v61
2020/11/02 23:19:38 send: v62
2020/11/02 23:19:38 send: v63
2020/11/02 23:19:38 send: v64
2020/11/02 23:19:38 send: v65
2020/11/02 23:19:39 send: v66
2020/11/02 23:19:39 send: v67
2020/11/02 23:19:39 send: v68
2020/11/02 23:19:40 send: v69
2020/11/02 23:19:40 send: v70
2020/11/02 23:19:40 send: v71
2020/11/02 23:19:40 send: v72
2020/11/02 23:19:40 send: v73
2020/11/02 23:19:40 send: v74
2020/11/02 23:19:41 send: v75
2020/11/02 23:19:41 send: v76
2020/11/02 23:19:41 rec1: v41
2020/11/02 23:19:41 rec1: v56
2020/11/02 23:19:41 rec1: v68
2020/11/02 23:19:41 rec1: v74
2020/11/02 23:19:41 rec1: v75
2020/11/02 23:19:41 rec1: v76
2020/11/02 23:19:41 rec3: v37
2020/11/02 23:19:41 rec3: v40
2020/11/02 23:19:41 rec3: v42
2020/11/02 23:19:41 rec3: v48
2020/11/02 23:19:41 rec3: v55
2020/11/02 23:19:41 rec3: v57
2020/11/02 23:19:41 rec3: v60
2020/11/02 23:19:41 rec3: v61
2020/11/02 23:19:41 rec3: v62
2020/11/02 23:19:41 send: v77
2020/11/02 23:19:41 rec4: v38
2020/11/02 23:19:41 rec4: v39
2020/11/02 23:19:41 rec4: v45
2020/11/02 23:19:41 rec4: v46
2020/11/02 23:19:41 rec4: v47
2020/11/02 23:19:41 rec4: v53
2020/11/02 23:19:41 rec4: v59
2020/11/02 23:19:41 rec4: v70
2020/11/02 23:19:41 rec4: v71
2020/11/02 23:19:41 rec4: v73
2020/11/02 23:19:41 rec5: v35
2020/11/02 23:19:41 rec5: v36
2020/11/02 23:19:41 rec5: v43
2020/11/02 23:19:41 rec5: v49
2020/11/02 23:19:41 rec5: v54
2020/11/02 23:19:41 rec5: v63
2020/11/02 23:19:41 rec5: v69
2020/11/02 23:19:41 rec5: v77
2020/11/02 23:19:41 send: v78
2020/11/02 23:19:41 rec2: v44
2020/11/02 23:19:41 rec2: v50
2020/11/02 23:19:41 rec2: v51
2020/11/02 23:19:41 rec2: v64
2020/11/02 23:19:41 rec2: v65
2020/11/02 23:19:41 rec2: v66
2020/11/02 23:19:41 rec2: v72
2020/11/02 23:19:41 send: v79
2020/11/02 23:19:42 send: v80
2020/11/02 23:19:42 send: v81
2020/11/02 23:19:42 send: v82
2020/11/02 23:19:42 send: v83
2020/11/02 23:19:42 send: v84
2020/11/02 23:19:43 send: v85
2020/11/02 23:19:43 rec0: v52
2020/11/02 23:19:43 rec0: v58
2020/11/02 23:19:43 rec0: v67
2020/11/02 23:19:43 send: v86
Any suggestions would be greatly appreciated. Thanks!
EDIT:
The buffering is definitely happening inside kafka-go. Sarama does not exhibit the same behavior:
package main

import (
	"context"
	"crypto/tls"
	"crypto/x509"
	"fmt"
	"log"
	"strings"
	"time"

	"github.com/Shopify/sarama"
)

var (
	broker   = "___-_____.us-east1.gcp.confluent.cloud:9092"
	brokers  = []string{broker}
	clientID = "___________"
	username = "___________"
	password = "___________"
	topic    = "sarama"
)

func main() {
	log.Printf("Kafka brokers: %s", strings.Join(brokers, ", "))
	ctx := context.Background()
	sync := newSyncProducer()
	// accessLog := newAsyncProducer()
	createTopic(topic)

	go func() {
		for i := 0; ; i++ {
			v := sarama.StringEncoder(fmt.Sprintf("v%d", i))
			p, o, err := sync.SendMessage(&sarama.ProducerMessage{
				Topic: topic,
				Value: v,
			})
			if err != nil {
				panic(err)
			}
			fmt.Printf("sent\t\t%v\tp: %d\toffset: %d\t\n", v, p, o)
			time.Sleep(100 * time.Millisecond)
		}
	}()

	ps := []sarama.PartitionConsumer{}
	offset := int64(0)
	loop := func(msgs <-chan *sarama.ConsumerMessage) {
		for msg := range msgs {
			fmt.Printf("recv:\t\t%s\tp: %d\toffset: %d\n", msg.Value, msg.Partition, msg.Offset)
		}
	}
	for i := 0; i < 6; i++ {
		ps = append(ps, createPartitionConsumer(topic, int32(i), offset))
	}
	for _, p := range ps {
		go loop(p.Messages())
	}
	<-ctx.Done()
}

func createPartitionConsumer(topic string, partition int32, offset int64) sarama.PartitionConsumer {
	config := baseConfig()
	c, err := sarama.NewConsumer(brokers, config)
	if err != nil {
		panic(err)
	}
	p, err := c.ConsumePartition(topic, partition, offset)
	if err != nil {
		panic(err)
	}
	return p
}

func createTopic(topic string) {
	config := baseConfig()
	admin, err := sarama.NewClusterAdmin(brokers, config)
	if err != nil {
		log.Fatal("error while creating cluster admin: ", err.Error())
	}
	defer func() { _ = admin.Close() }()
	err = admin.CreateTopic(topic, &sarama.TopicDetail{
		NumPartitions:     6,
		ReplicationFactor: 3,
	}, false)
	if err != nil {
		log.Println("error while creating topic: ", err.Error())
	}
}

func baseConfig() *sarama.Config {
	rootCAs, _ := x509.SystemCertPool()
	if rootCAs == nil {
		rootCAs = x509.NewCertPool()
	}
	config := sarama.NewConfig()
	config.Version = sarama.MaxVersion
	config.Net.TLS.Enable = true
	config.Net.TLS.Config = &tls.Config{
		RootCAs:            rootCAs,
		InsecureSkipVerify: false,
	}
	config.ClientID = clientID
	config.Net.SASL.Enable = true
	config.Net.SASL.Password = password
	config.Net.SASL.User = username
	return config
}

func newSyncProducer() sarama.SyncProducer {
	config := baseConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll
	config.Producer.Retry.Max = 10
	config.Producer.Return.Successes = true
	producer, err := sarama.NewSyncProducer(brokers, config)
	if err != nil {
		log.Fatalln("failed to start sarama producer:", err)
	}
	return producer
}
In fact, in some cases a message is actually received before its send is acknowledged, which makes me wonder whether some internal messaging is going on, and if so, whether I should care...
recv: V1176 p: 1 offset: 355
recv: V1177 p: 2 offset: 363
send: V1177 p: 2 offset: 363
send: V1178 p: 5 offset: 377
recv: V1178 p: 5 offset: 377
recv: V1179 p: 1 offset: 356
send: V1179 p: 1 offset: 356
send: V1180 p: 1 offset: 357
recv: V1180 p: 1 offset: 357
recv: V1181 p: 1 offset: 358
send: V1181 p: 1 offset: 358
send: V1182 p: 4 offset: 393
recv: V1182 p: 4 offset: 393
send: V1183 p: 4 offset: 394
recv: V1183 p: 4 offset: 394
send: V1184 p: 3 offset: 358
recv: V1184 p: 3 offset: 358
send: V1185 p: 2 offset: 364
recv: V1185 p: 2 offset: 364
send: V1186 p: 3 offset: 359
recv: V1186 p: 3 offset: 359
recv: V1187 p: 3 offset: 360
send: V1187 p: 3 offset: 360
send: V1188 p: 5 offset: 378
recv: V1188 p: 5 offset: 378
send: V1189 p: 2 offset: 365
recv: V1189 p: 2 offset: 365
recv: V1190 p: 4 offset: 395
send: V1190 p: 4 offset: 395
send: V1191 p: 1 offset: 359
recv: V1191 p: 1 offset: 359
send: V1192 p: 4 offset: 396
recv: V1192 p: 4 offset: 396
send: V1193 p: 0 offset: 431
recv: V1193 p: 0 offset: 431
send: V1194 p: 4 offset: 397
recv: V1194 p: 4 offset: 397
recv: V1195 p: 2 offset: 366
send: V1195 p: 2 offset: 366
send: V1196 p: 3 offset: 361
recv: V1196 p: 3 offset: 361
Solution
You need to change ReaderConfig.MinBytes; otherwise segmentio/kafka-go sets it to 1e6 = 1 MB, and in that case Kafka waits for that much data to accumulate before answering the fetch request.
func newReader(url string, topic string, partition int, dialer *kafka.Dialer) *kafka.Reader {
	return kafka.NewReader(kafka.ReaderConfig{
		Brokers:   []string{url},
		Topic:     topic,
		Dialer:    dialer,
		Partition: partition,
		MinBytes:  1, // same value as Shopify/sarama's default
		MaxBytes:  57671680,
	})
}
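A related knob, not mentioned in the original answer but plausibly relevant to the roughly 9-second gaps between receive bursts in the log above: ReaderConfig also exposes a MaxWait duration, which bounds how long the broker holds a fetch request open while waiting for MinBytes of data to accumulate (kafka-go documents its default as 10s). Below is a minimal sketch reusing the url/topic/partition/dialer values from the listings above; the helper name newReaderLowLatency is made up for this sketch, and 500ms is an arbitrary example value, not something from the original post.

func newReaderLowLatency(url string, topic string, partition int, dialer *kafka.Dialer) *kafka.Reader {
	return kafka.NewReader(kafka.ReaderConfig{
		Brokers:   []string{url},
		Topic:     topic,
		Dialer:    dialer,
		Partition: partition,
		MinBytes:  1,                      // respond as soon as any data is available
		MaxBytes:  10e6,                   // 10 MB cap per fetch
		MaxWait:   500 * time.Millisecond, // don't let the broker hold the fetch longer than this
	})
}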
Shopify/sarama's default for the equivalent fetch setting, on the other hand, is 1 byte.
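For reference, the corresponding knobs on the sarama side live on its Config struct. A hedged sketch with the defaults spelled out explicitly (values taken from sarama's documentation, not from the original post):

config := sarama.NewConfig()
config.Consumer.Fetch.Min = 1                        // fetch.min.bytes: sarama already defaults to 1 byte
config.Consumer.MaxWaitTime = 250 * time.Millisecond // broker-side wait cap: sarama's documented default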