开务分布式数据库 Tracing(二)—— 源码解析
来源:SegmentFault
时间:2023-02-24 16:08:00 232浏览 收藏
本篇文章主要是结合我之前面试的各种经历和实战开发中遇到的问题解决经验整理的,希望这篇《开务分布式数据库 Tracing(二)—— 源码解析》对你有很大帮助!欢迎收藏,分享给更多的需要的朋友学习~
按照【开务数据库 Tracing(一)】介绍的使用 opentracing 要求,本文着重介绍开务数据库(原:云溪数据库) Tracing 模块中是如何实现 Span,SpanContexts 和 Tracer 的。
Part 1 - Tracing 模块调用关系
1.1 Traincg 模块包含的文件列表
Tracer.go :定义了opentracing 中的trace相关接口的实现。
Tracer_span.go :定义了opentracing中的span 相关操作的实现。
Tags.go :定义了 opentracing中关于tags的相关接口。
Shadow.go :不是opentracing中的概念,这里主要实现与zipkin的通信,用于tracing 信息推送到外部的zipkin中。
1.2 各个文件之间的调用关系
在 cluster_settings.go 中会创建 tracer,供全局使用,其他模块中使用这个 Tracer 实现 span 的创建和其他操作,例如设定 span 名称、设定 tag 、增加 log 等操作。
Part 2 - Opentracing
在开务数据库中的实现
以下是只是列出了部分接口实现,并非全部。
2.1 Span 接口实现:
GetContext 实现:API 用于获取 Span 中的 SpanContext,主要功能是先创建一个 map[string]string 类型的 baggageCopy, 将 span 中的 mu.Baggage 读出写入 baggageCopy,创建新的 spanContext,并且返回。
func (s *span) Context() opentracing.SpanContext { s.mu.Lock() defer s.mu.Unlock() baggageCopy := make(map[string]string, len(s.mu.Baggage)) for k, v := range s.mu.Baggage { baggageCopy[k] = v } sc := &spanContext{ spanMeta: s.spanMeta, Baggage: baggageCopy, } if s.shadowTr != nil { sc.shadowTr = s.shadowTr sc.shadowCtx = s.shadowSpan.Context() } if s.isRecording() { sc.recordingGroup = s.mu.recordingGroup sc.recordingType = s.mu.recordingType } return sc }
Finished 实现:API 用于结束一个 Span 的记录和追踪。
func (s *span) Finish() { s.FinishWithOptions(opentracing.FinishOptions{}) }
SetTag 实现:用于向指定的 Span 添加 Tag 信息。
func (s *span) SetTag(key string, value interface{}) opentracing.Span { return s.setTagInner(key, value, false /* locked */) }
Log 实现:用于向指定的 Span 添加 Log 信息。
func (s *span) LogKV(alternatingKeyValues ...interface{}) { fields, err := otlog.InterleavedKVToFields(alternatingKeyValues...) if err != nil { s.LogFields(otlog.Error(err), otlog.String("function", "LogKV")) return } s.LogFields(fields...) }
SetBaggageItem 实现:用于向指定的 Span 增加 Baggage 信息,主要是用于跨进程追踪使用。
func (s *span) SetBaggageItem(restrictedKey, value string) opentracing.Span { s.mu.Lock() defer s.mu.Unlock() return s.setBaggageItemLocked(restrictedKey, value) }
BaggageItem 实现:用于获取指定的 Baggage 信息。
func (s *span) BaggageItem(restrictedKey string) string { s.mu.Lock() defer s.mu.Unlock() return s.mu.Baggage[restrictedKey] }
SetOperationName 实现:用于设定 Span 的名称。
func (s *span) SetOperationName(operationName string) opentracing.Span { if s.shadowTr != nil { s.shadowSpan.SetOperationName(operationName) } s.operation = operationName return s }
Tracer 实现:用于获取 Span 属于哪个 Tracer。
// Tracer is part of the opentracing.Span interface. func (s *span) Tracer() opentracing.Tracer { return s.tracer }
2.2 SpanContext 接口实现:
ForeachBaggageItem 实现:用于遍历 spanContext 中的 baggage 信息。
func (sc *spanContext) ForeachBaggageItem(handler func(k, v string) bool) { for k, v := range sc.Baggage { if !handler(k, v) { break } } }
2.3 Tracer 接口实现:
Inject 实现:用于向 carrier 中注入 SpanContext 信息
// Inject is part of the opentracing.Tracer interface. func (t *Tracer) Inject( osc opentracing.SpanContext, format interface{}, carrier interface{}, ) error { …… // We only support the HTTPHeaders/TextMap format. if format != opentracing.HTTPHeaders && format != opentracing.TextMap { return opentracing.ErrUnsupportedFormat } mapWriter, ok := carrier.(opentracing.TextMapWriter) if !ok { return opentracing.ErrInvalidCarrier } sc, ok := osc.(*spanContext) if !ok { return opentracing.ErrInvalidSpanContext } mapWriter.Set(fieldNameTraceID, strconv.FormatUint(sc.TraceID, 16)) mapWriter.Set(fieldNameSpanID, strconv.FormatUint(sc.SpanID, 16)) for k, v := range sc.Baggage { mapWriter.Set(prefixBaggage+k, v) } …… return nil }
Extract 实现:用于从 carrier 中抽取出 SpanContext 信息。
func (t *Tracer) Extract(format interface{}, carrier interface{}) (opentracing.SpanContext, error) { // We only support the HTTPHeaders/TextMap format. if format != opentracing.HTTPHeaders && format != opentracing.TextMap { return noopSpanContext{}, opentracing.ErrUnsupportedFormat } mapReader, ok := carrier.(opentracing.TextMapReader) if !ok { return noopSpanContext{}, opentracing.ErrInvalidCarrier } var sc spanContext …… err := mapReader.ForeachKey(func(k, v string) error { switch k = strings.ToLower(k); k { case fieldNameTraceID: var err error sc.TraceID, err = strconv.ParseUint(v, 16, 64) if err != nil { return opentracing.ErrSpanContextCorrupted } case fieldNameSpanID: var err error sc.SpanID, err = strconv.ParseUint(v, 16, 64) if err != nil { return opentracing.ErrSpanContextCorrupted } case fieldNameShadowType: shadowType = v default: if strings.HasPrefix(k, prefixBaggage) { if sc.Baggage == nil { sc.Baggage = make(map[string]string) } sc.Baggage[strings.TrimPrefix(k, prefixBaggage)] = v } else if strings.HasPrefix(k, prefixShadow) { if shadowCarrier == nil { shadowCarrier = make(opentracing.TextMapCarrier) } // We build a shadow textmap with the original shadow keys. shadowCarrier.Set(strings.TrimPrefix(k, prefixShadow), v) } } return nil }) if err != nil { return noopSpanContext{}, err } if sc.TraceID == 0 && sc.SpanID == 0 { return noopSpanContext{}, nil } …… return &sc, nil }
StartSpan 接口实现:用于创建一个新的 Span,可根据传入不同 opts 来实现不同 Span 的初始化。
func (t *Tracer) StartSpan( operationName string, opts ...opentracing.StartSpanOption, ) opentracing.Span { // Fast paths to avoid the allocation of StartSpanOptions below when tracing // is disabled: if we have no options or a single SpanReference (the common // case) with a noop context, return a noop span now. if len(opts) == 1 { if o, ok := opts[0].(opentracing.SpanReference); ok { if IsNoopContext(o.ReferencedContext) { return &t.noopSpan } } } shadowTr := t.getShadowTracer() …… return s }
2.4 noop span 实现:
noop span 实现:使监控代码不依赖 Tracer 和 Span 的返回值,防止程序异常退出。
type noopSpan struct { tracer *Tracer } var _ opentracing.Span = &noopSpan{} func (n *noopSpan) Context() opentracing.SpanContext { return noopSpanContext{} } func (n *noopSpan) BaggageItem(key string) string { return "" } func (n *noopSpan) SetTag(key string, value interface{}) opentracing.Span { return n } func (n *noopSpan) Finish() {} func (n *noopSpan) FinishWithOptions(opts opentracing.FinishOptions) {} func (n *noopSpan) SetOperationName(operationName string) opentracing.Span { return n } func (n *noopSpan) Tracer() opentracing.Tracer { return n.tracer } func (n *noopSpan) LogFields(fields ...otlog.Field) {} func (n *noopSpan) LogKV(keyVals ...interface{}) {} func (n *noopSpan) LogEvent(event string) {} func (n *noopSpan) LogEventWithPayload(event string, payload interface{}) {} func (n *noopSpan) Log(data opentracing.LogData) {} func (n *noopSpan) SetBaggageItem(key, val string) opentracing.Span { if key == Snowball { panic("attempting to set Snowball on a noop span; use the Recordable option to StartSpan") } return n }
Part3 - 开务数据库中
Opentracing 简单使用示例
3.1 开启 Tracer Recording 测试
开务数据库中 开始创建的 span 均是 no operator span, 需要手动调用 StartRecording, 将 span 转换为可 record 状态,才能正常对 span 进行操作。
func TestTracerRecording(t *testing.T) { tr := NewTracer() noop1 := tr.StartSpan("noop") if _, noop := noop1.(*noopSpan); !noop { t.Error("expected noop span") } noop1.LogKV("hello", "void") noop2 := tr.StartSpan("noop2", opentracing.ChildOf(noop1.Context())) if _, noop := noop2.(*noopSpan); !noop { t.Error("expected noop child span") } noop2.Finish() noop1.Finish() s1 := tr.StartSpan("a", Recordable) if _, noop := s1.(*noopSpan); noop { t.Error("Recordable (but not recording) span should not be noop") } if !IsBlackHoleSpan(s1) { t.Error("Recordable span should be black hole") } // Unless recording is actually started, child spans are still noop. noop3 := tr.StartSpan("noop3", opentracing.ChildOf(s1.Context())) if _, noop := noop3.(*noopSpan); !noop { t.Error("expected noop child span") } noop3.Finish() s1.LogKV("x", 1) StartRecording(s1, SingleNodeRecording) s1.LogKV("x", 2) s2 := tr.StartSpan("b", opentracing.ChildOf(s1.Context())) if IsBlackHoleSpan(s2) { t.Error("recording span should not be black hole") } s2.LogKV("x", 3) if err := TestingCheckRecordedSpans(GetRecording(s1), ` span a: tags: unfinished= x: 2 span b: tags: unfinished= x: 3 `); err != nil { t.Fatal(err) } if err := TestingCheckRecordedSpans(GetRecording(s2), ` span b: tags: unfinished= x: 3 `); err != nil { t.Fatal(err) } s3 := tr.StartSpan("c", opentracing.FollowsFrom(s2.Context())) s3.LogKV("x", 4) s3.SetTag("tag", "val") s2.Finish() if err := TestingCheckRecordedSpans(GetRecording(s1), ` span a: tags: unfinished= x: 2 span b: x: 3 span c: tags: tag=val unfinished= x: 4 `); err != nil { t.Fatal(err) } s3.Finish() if err := TestingCheckRecordedSpans(GetRecording(s1), ` span a: tags: unfinished= x: 2 span b: x: 3 span c: tags: tag=val x: 4 `); err != nil { t.Fatal(err) } StopRecording(s1) s1.LogKV("x", 100) if err := TestingCheckRecordedSpans(GetRecording(s1), ``); err != nil { t.Fatal(err) } // The child span is still recording. s3.LogKV("x", 5) if err := TestingCheckRecordedSpans(GetRecording(s3), ` span c: tags: tag=val x: 4 x: 5 `); err != nil { t.Fatal(err) } s1.Finish() }
3.2 创建 childSpan 测试
测试 StartChildSpan,根据已有 span 创建出一个新的 span,为已有 span 的子 span。
func TestStartChildSpan(t *testing.T) { tr := NewTracer() sp1 := tr.StartSpan("parent", Recordable) StartRecording(sp1, SingleNodeRecording) sp2 := StartChildSpan("child", sp1, nil /* logTags */, false /*separateRecording*/) sp2.Finish() sp1.Finish() if err := TestingCheckRecordedSpans(GetRecording(sp1), ` span parent: span child: `); err != nil { t.Fatal(err) } sp1 = tr.StartSpan("parent", Recordable) StartRecording(sp1, SingleNodeRecording) sp2 = StartChildSpan("child", sp1, nil /* logTags */, true /*separateRecording*/) sp2.Finish() sp1.Finish() if err := TestingCheckRecordedSpans(GetRecording(sp1), ` span parent: `); err != nil { t.Fatal(err) } if err := TestingCheckRecordedSpans(GetRecording(sp2), ` span child: `); err != nil { t.Fatal(err) } sp1 = tr.StartSpan("parent", Recordable) StartRecording(sp1, SingleNodeRecording) sp2 = StartChildSpan( "child", sp1, logtags.SingleTagBuffer("key", "val"), false, /*separateRecording*/ ) sp2.Finish() sp1.Finish() if err := TestingCheckRecordedSpans(GetRecording(sp1), ` span parent: span child: tags: key=val `); err != nil { t.Fatal(err) } }
3.3 跨进程追踪测试
测试跨进程追踪功能,主要是测试 inject 接口和 extract 接口,Inject 用于向 carrier 中注入 SpanContext 信息,Extract 用于从 carrier 中抽取出 SpanContext 信息。
func TestTracerInjectExtract(t *testing.T) { tr := NewTracer() tr2 := NewTracer() // Verify that noop spans become noop spans on the remote side. noop1 := tr.StartSpan("noop") if _, noop := noop1.(*noopSpan); !noop { t.Fatalf("expected noop span: %+v", noop1) } carrier := make(opentracing.HTTPHeadersCarrier) if err := tr.Inject(noop1.Context(), opentracing.HTTPHeaders, carrier); err != nil { t.Fatal(err) } if len(carrier) != 0 { t.Errorf("noop span has carrier: %+v", carrier) } wireContext, err := tr2.Extract(opentracing.HTTPHeaders, carrier) if err != nil { t.Fatal(err) } if _, noopCtx := wireContext.(noopSpanContext); !noopCtx { t.Errorf("expected noop context: %v", wireContext) } noop2 := tr2.StartSpan("remote op", opentracing.FollowsFrom(wireContext)) if _, noop := noop2.(*noopSpan); !noop { t.Fatalf("expected noop span: %+v", noop2) } noop1.Finish() noop2.Finish() // Verify that snowball tracing is propagated and triggers recording on the // remote side. s1 := tr.StartSpan("a", Recordable) StartRecording(s1, SnowballRecording) carrier = make(opentracing.HTTPHeadersCarrier) if err := tr.Inject(s1.Context(), opentracing.HTTPHeaders, carrier); err != nil { t.Fatal(err) } wireContext, err = tr2.Extract(opentracing.HTTPHeaders, carrier) if err != nil { t.Fatal(err) } s2 := tr2.StartSpan("remote op", opentracing.FollowsFrom(wireContext)) // Compare TraceIDs trace1 := s1.Context().(*spanContext).TraceID trace2 := s2.Context().(*spanContext).TraceID if trace1 != trace2 { t.Errorf("TraceID doesn't match: parent %d child %d", trace1, trace2) } s2.LogKV("x", 1) s2.Finish() // Verify that recording was started automatically. rec := GetRecording(s2) if err := TestingCheckRecordedSpans(rec, ` span remote op: tags: sb=1 x: 1 `); err != nil { t.Fatal(err) } if err := TestingCheckRecordedSpans(GetRecording(s1), ` span a: tags: sb=1 unfinished= `); err != nil { t.Fatal(err) } if err := ImportRemoteSpans(s1, rec); err != nil { t.Fatal(err) } s1.Finish() if err := TestingCheckRecordedSpans(GetRecording(s1), ` span a: tags: sb=1 span remote op: tags: sb=1 x: 1 `); err != nil { t.Fatal(err) } }
以上就是《开务分布式数据库 Tracing(二)—— 源码解析》的详细内容,更多关于mysql的资料请关注golang学习网公众号!
-
499 收藏
-
220 收藏
-
244 收藏
-
235 收藏
-
157 收藏
-
335 收藏
-
467 收藏
-
303 收藏
-
176 收藏
-
368 收藏
-
475 收藏
-
- 前端进阶之JavaScript设计模式
- 设计模式是开发人员在软件开发过程中面临一般问题时的解决方案,代表了最佳的实践。本课程的主打内容包括JS常见设计模式以及具体应用场景,打造一站式知识长龙服务,适合有JS基础的同学学习。
- 立即学习 542次学习
-
- GO语言核心编程课程
- 本课程采用真实案例,全面具体可落地,从理论到实践,一步一步将GO核心编程技术、编程思想、底层实现融会贯通,使学习者贴近时代脉搏,做IT互联网时代的弄潮儿。
- 立即学习 507次学习
-
- 简单聊聊mysql8与网络通信
- 如有问题加微信:Le-studyg;在课程中,我们将首先介绍MySQL8的新特性,包括性能优化、安全增强、新数据类型等,帮助学生快速熟悉MySQL8的最新功能。接着,我们将深入解析MySQL的网络通信机制,包括协议、连接管理、数据传输等,让
- 立即学习 497次学习
-
- JavaScript正则表达式基础与实战
- 在任何一门编程语言中,正则表达式,都是一项重要的知识,它提供了高效的字符串匹配与捕获机制,可以极大的简化程序设计。
- 立即学习 487次学习
-
- 从零制作响应式网站—Grid布局
- 本系列教程将展示从零制作一个假想的网络科技公司官网,分为导航,轮播,关于我们,成功案例,服务流程,团队介绍,数据部分,公司动态,底部信息等内容区块。网站整体采用CSSGrid布局,支持响应式,有流畅过渡和展现动画。
- 立即学习 484次学习