登录
首页 >  Golang >  Go问答

使用开源遥测实现异步 go 例程的跟踪

来源:stackoverflow

时间:2024-03-11 15:00:27 405浏览 收藏

珍惜时间,勤奋学习!今天给大家带来《使用开源遥测实现异步 go 例程的跟踪》,正文内容主要涉及到等等,如果你正在学习Golang,或者是对Golang有疑问,欢迎大家关注我!后面我会持续更新相关内容的,希望都能帮到正在学习的大家!

问题内容

我正在尝试使用开放遥测跟踪具有两个 go 例程的方法。第一个 go 例程从 kafka 读取并创建一个持久的作业(可能需要 1 秒到 1 分钟)。然后,第二个 go 例程监听已完成的作业。

进行跟踪的正确方法是什么,以便我们知道哪个作业结果(在第二个例程中)对应于哪个 kafka 消息(来自第一个例程)?

我的猜测是,在 go 例程中创建的两个 span 必须通过相同的 traceid 链接。

func startListening(ctx context.Context) {
  // initialise kafka client

  go kafkaConsumeMessages(ctx)
  go waitForJob(ctx)
}

func kafkaConsumeMessages(ctx) {
  for message := range kafkaEvents {
    // process message, create long job
    // create span here with traceID?
  }  

func waitForJobs(ctx) {
  for results := range finishedJobs
    // process result
    // create span here with traceID?
  }
}

任何建议都将受到高度赞赏!


正确答案


答案实际上比我想象的要容易。您需要进一步传递附加到该长作业的跟踪信息,然后在处理完成的作业时对其进行解码。

就我而言,因为我使用的是文本 traceparent 标头,因此使用 propagation.tracecontext{} 实现 propagation.textmappropagator,所以我决定发送整个 traceparent 标头(尽管我可能需要对 tracestate 执行相同的操作)弯弯弯弯),然后在处理完成的作业时使用 extract 方法对标头进行解码。但为了使用extract方法,您需要实现propagation.textmapcarrier接口。

func startlistening(ctx context.context) {
  // initialise kafka client

  go kafkaconsumemessages(ctx)
  go waitforjob(ctx)
}

func kafkaconsumemessages(ctx) {
  for msg := range kafkaevents {
    // extract incoming tracing info from traceparent header. example at https://github.com/open-telemetry/opentelemetry-go-contrib/blob/main/instrumentation/github.com/shopify/sarama/otelsarama/example/consumer/consumer.go#l84
    ctx := otel.gettextmappropagator().extract(context.background(), otelsarama.newconsumermessagecarrier(msg))

    // create span 
    tr := otel.tracer("consumer")
    _, span := tr.start(ctx, "consume message", trace.withattributes(
        semconv.messagingoperationprocess,
    ))
    defer span.end()
   
    // get just the traceparent header
    carrier := otelsarama.newconsumermessagecarrier(&msg)
    traceparentheader := carrier.get("traceparent")

    // process message, create long job and attach the header
    jobs.enqueue{traceparentheader: traceparentheader}
  }  

func waitforjobs(ctx) {
  for result := range finishedjobs {
    ctx = otel.gettextmappropagator().extract(ctx, models.pseudocarrier{s: result.traceparentheader})
    ctx, span := tr.start(ctx, "process result", trace.withattributes(
        attribute.string("jobname", result.jobname),
    ))
    defer span.end()
 
    // do more work 
  }
}
// PseudoCarrier implements the propagation.TextMapCarrier interface so we can use the propagation.Extract method when parsing the traceparent header
type PseudoCarrier struct {
    S string
}

func (c PseudoCarrier) Get(_ string) string {
    return c.S
}

func (c PseudoCarrier) Set(string, string) {}

func (c PseudoCarrier) Keys() []string {
    return []string{"traceparent"}
}

今天关于《使用开源遥测实现异步 go 例程的跟踪》的内容就介绍到这里了,是不是学起来一目了然!想要了解更多关于的内容请关注golang学习网公众号!

声明:本文转载于:stackoverflow 如有侵犯,请联系study_golang@163.com删除
相关阅读
更多>
最新阅读
更多>
课程推荐
更多>