
Uber-go/zap and kafka-go in a race condition

Source: stackoverflow



Question

I'm creating a custom logger that logs to stdout and stderr, with the added option of logging to Kafka (the code sample is at: https://github.com/roppa/kafka-go). We have multiple topics, so we need multiple loggers, but when we use more than one, something strange happens. When both kafka-go writers are set to async, I don't receive any consumer messages at all; when one is async and the other sync, we get something like this:

//consumer topica
{"level":"\u001b[34mINFO\u001b[0m","timestamp":"2020-12-09T15:31:04.023Z","msg":"topic-a log 1","uid":"abc123","ns":"test-service"}

{"level":"\u001b[34mINFO\u001b[0m","timestamp":"2020-12-09T15:31:05.078Z","msg":"topic-a log 2","uid":"abc123","ns":"test-service"}

{"level":"\u001b[34mINFO\u001b[0m","timestamp":"2020-12-09T15:31:06.085Z","msg":"topic-a log 3","uid":"abc123","ns":"test-service"}

//consumer topicb
2020-12-09T15:31:06.085Z    INFO    topic-a log 3   {"uid": "abc123", "ns": "test-service"}
2","uid":"abc123","ns":"test-service"}

Changing which writer is sync produces completely different effects. I'm still quite new to Go.

Here is main.go:

package main

import (
    "context"
    "kafka-log/logger"
)

func main() {
    loggerA := logger.Init("test-service", "localhost:9092", "topica", false, false)
    loggerB := logger.Init("test-service", "localhost:9092", "topicb", false, true)

    ctx := context.Background()
    ctx2 := context.WithValue(ctx, logger.UID, "abc123")

    loggerA.CInfo(ctx2, "topic-a log 1")
    loggerB.CInfo(ctx2, "topic-b log 1")

    loggerA.CInfo(ctx2, "topic-a log 2")
    loggerB.CInfo(ctx2, "topic-b log 2")

    loggerA.CInfo(ctx2, "topic-a log 3")
    loggerB.CInfo(ctx2, "topic-b log 3")
}

Here is logger/logger.go:

package logger

import (
    "context"
    "os"

    "github.com/segmentio/kafka-go"
    "go.uber.org/zap"
    "go.uber.org/zap/zapcore"
)

type (
    key string

    // Logger embeds zap.Logger and also contains the current system name (namespace, ns)
    Logger struct {
        *zap.Logger
        ns string
    }

    // KConfig is the config for creating a new Kafka logger. Takes a namespace,
    // broker (eg 'localhost:9092'), topic (eg 'topic-a')
    KConfig struct {
        Namespace string
        Broker    string
        Topic     string
        Async     bool
    }

    ProducerInterface interface {
        WriteMessages(ctx context.Context, msgs ...kafka.Message) error
    }

    // KafkaProducer contains a kafka.Writer and the Kafka topic
    KafkaProducer struct {
        producer ProducerInterface
        topic    string
    }
)

const (
    // UID - unique request identifier
    UID key = "request_id"
)

var customConfig = zapcore.EncoderConfig{
    TimeKey:        "timestamp",
    LevelKey:       "level",
    NameKey:        "logger",
    CallerKey:      "caller",
    FunctionKey:    zapcore.OmitKey,
    MessageKey:     "msg",
    StacktraceKey:  "stacktrace",
    LineEnding:     zapcore.DefaultLineEnding,
    EncodeLevel:    zapcore.CapitalColorLevelEncoder,
    EncodeTime:     zapcore.ISO8601TimeEncoder,
    EncodeDuration: zapcore.SecondsDurationEncoder,
}

// CInfo takes a context as its first parameter, extracts specific fields as well as the namespace, and calls zap's Info
func (l *Logger) CInfo(ctx context.Context, msg string, fields ...zap.Field) {
    l.Info(msg, consolidate(ctx, l.ns, fields...)...)
}

func consolidate(ctx context.Context, namespace string, fields ...zap.Field) []zap.Field {
    return append(append(ctxToZapFields(ctx), fields...), zap.String("ns", namespace))
}

// see advanced config example: https://github.com/uber-go/zap/blob/master/example_test.go#L105
var lowPriority = zap.LevelEnablerFunc(func(lvl zapcore.Level) bool {
    return lvl < zapcore.ErrorLevel && lvl > zapcore.DebugLevel
})
var debugPriority = zap.LevelEnablerFunc(func(lvl zapcore.Level) bool {
    return lvl < zapcore.ErrorLevel
})
var kafkaPriority = zap.LevelEnablerFunc(func(lvl zapcore.Level) bool {
    return lvl > zapcore.DebugLevel
})

// Init creates a new instance of a logger. namespace is the name of the module using the logger. broker and topic are Kafka specific;
// if either of these is not set, a default console logger is created.
func Init(namespace, broker, topic string, debug, async bool) *Logger {
    var kp *KafkaProducer
    if broker != "" && topic != "" {
        kp = NewKafkaProducer(&KConfig{
            Broker: broker,
            Topic:  topic,
            Async:  async,
        })
    }
    logger := getLogger(debug, kp)
    // logger.Info("initiated logger", zap.String("ns", namespace), zap.Bool("kafka", kp != nil), zap.Bool("debug", debug))
    return &Logger{logger, namespace}
}

func getLogger(debug bool, kp *KafkaProducer) *zap.Logger {
    // cores are logger interfaces
    var cores []zapcore.Core

    // optimise message for console output (human readable)
    consoleEncoder := zapcore.NewConsoleEncoder(customConfig)
    // Lock wraps a WriteSyncer in a mutex to make it safe for concurrent use.
    // see https://godoc.org/go.uber.org/zap/zapcore
    cores = append(cores,
        zapcore.NewCore(consoleEncoder, zapcore.Lock(os.Stdout), getPriority(debug)),
        zapcore.NewCore(consoleEncoder, zapcore.Lock(os.Stderr), zap.ErrorLevel),
    )

    if kp != nil {
        cores = append(cores, zapcore.NewCore(zapcore.NewJSONEncoder(customConfig), zapcore.Lock(zapcore.AddSync(kp)), kafkaPriority))
    }

    // join inputs, encoders, level-handling functions into cores, then "tee" together
    logger := zap.New(zapcore.NewTee(cores...))
    defer logger.Sync()
    return logger
}

func getPriority(debug bool) zap.LevelEnablerFunc {
    if debug {
        return debugPriority
    }
    return lowPriority
}

func ctxToZapFields(ctx context.Context) []zap.Field {
    reqID, _ := ctx.Value(UID).(string)
    return []zap.Field{
        zap.String("uid", reqID),
    }
}

// NewKafkaProducer instantiates a kafka.Writer, saves the topic, and returns a KafkaProducer
func NewKafkaProducer(c *KConfig) *KafkaProducer {
    return &KafkaProducer{
        producer: kafka.NewWriter(kafka.WriterConfig{
            Brokers:      []string{c.Broker},
            Topic:        c.Topic,
            Balancer:     &kafka.Hash{},
            Async:        c.Async,
            RequiredAcks: -1, // -1 = all
        }),
        topic: c.Topic,
    }
}

// Write takes a message as a byte slice, wraps it in a kafka.Message, and calls WriteMessages.
// This method (via zapcore.AddSync) is what lets KafkaProducer act as a zap WriteSyncer.
func (kp *KafkaProducer) Write(msg []byte) (int, error) {
    return len(msg), kp.producer.WriteMessages(context.Background(), kafka.Message{
        Key:   []byte(""),
        Value: msg,
    })
}

I'm using these for the consumers:

docker exec kafka kafka-console-consumer --bootstrap-server localhost:9092 --topic topica

docker exec kafka kafka-console-consumer --bootstrap-server localhost:9092 --topic topicb

Here is my Kafka docker-compose:

version: '3.8'

services:
  
  zookeeper:
    image: confluentinc/cp-zookeeper
    networks:
      - kafka-net
    container_name: zookeeper
    environment:
        ZOOKEEPER_CLIENT_PORT: 2181
    ports:
        - 2181:2181

  kafka:
    image: confluentinc/cp-kafka
    networks:
      - kafka-net
    container_name: kafka
    environment:
        KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
        KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
        ALLOW_PLAINTEXT_LISTENER: "yes"
        KAFKA_LISTENERS: INTERNAL://kafka:29092,EXTERNAL://localhost:9092
        KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:29092,EXTERNAL://localhost:9092
        KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
        KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
    ports:
        - 9092:9092
        - 29092:29092
    depends_on:
        - zookeeper
    restart: on-failure

networks:
  kafka-net:
    driver: bridge


Solution


I imagine your program is exiting before the async messages have had time to be sent (though if I'm reading your example correctly, I'm surprised "topic-a log 3" is the only log message that made it through). Unlike JavaScript, Go does not wait for all threads/goroutines to finish before exiting.
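To illustrate that exit behavior, here is a minimal sketch of my own (not from the question's code):

package main

import (
    "fmt"
    "time"
)

func main() {
    go func() {
        time.Sleep(100 * time.Millisecond) // stands in for an async kafka-go write still in flight
        fmt.Println("this may never print")
    }()
    // main returns here; the process exits immediately without waiting
    // for the goroutine, so its pending work is usually lost
}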

I'll also highlight the doc comment on kafka-go's Async config:

// Setting this flag to true causes the WriteMessages method to never block.
// It also means that errors are ignored since the caller will not receive
// the returned value. Use this only if you don't care about guarantees of
// whether the messages were written to kafka.

As for a solution: I think you can fix this by calling Close on the writer:

https://pkg.go.dev/github.com/segmentio/kafka-go#Writer.Close

You'll need to expose the underlying KafkaProducer.producer and call Close on it before exiting.

There may be a smarter way to structure the cleanup, but I couldn't find a simpler way to flush pending messages than just calling Close on the writer. A minimal sketch of one way to wire that up follows.
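For example (the kp field, the Close method, and the deferred call in main are my additions, assuming Init is changed to keep a reference to the KafkaProducer; they are not part of the original code):

// in logger/logger.go: keep the producer on the Logger so it can be closed later
type Logger struct {
    *zap.Logger
    ns string
    kp *KafkaProducer // nil for console-only loggers
}

// ... and have Init return &Logger{logger, namespace, kp} so the field is populated

// Close flushes pending Kafka messages by closing the underlying writer.
func (l *Logger) Close() error {
    if l.kp == nil {
        return nil // console-only logger, nothing to flush
    }
    if w, ok := l.kp.producer.(*kafka.Writer); ok {
        return w.Close() // blocks until buffered async messages are flushed
    }
    return nil
}

Then in main.go, defer the cleanup so it runs before the program exits:

loggerA := logger.Init("test-service", "localhost:9092", "topica", false, false)
defer loggerA.Close()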

