安装(集成 rabbitmq、mysql)

    # Start the openzipkin/zipkin:2.23 image detached as container "zipkin223",
    # with STORAGE_TYPE=mysql plus MySQL and RabbitMQ connection settings passed
    # as environment variables, and container port 9411 published on host port 9411.
    # NOTE(review): inside the container 127.0.0.1 is the container's own loopback,
    # so MYSQL_HOST / RABBIT_ADDRESSES presumably need an address reachable from the
    # container (host IP, a container name on a shared network, or --network host) — confirm.
    docker run --name zipkin223 \
            -e STORAGE_TYPE=mysql \
            -e MYSQL_DB=test \
            -e MYSQL_USER=root \
            -e MYSQL_PASS=123456 \
            -e MYSQL_HOST=127.0.0.1 \
            -e MYSQL_TCP_PORT=3306 \
            -e RABBIT_ADDRESSES=127.0.0.1:5672 \
            -e RABBIT_USER=admin \
            -e RABBIT_PASSWORD=123456 \
            -p 9411:9411 \
            -d \
            openzipkin/zipkin:2.23

mysql 创建数据表

    -- Span table: one row per (trace_id_high, trace_id, id), which is the primary key.
    -- start_ts and duration are stored in microseconds (see the column comments).
    CREATE TABLE IF NOT EXISTS zipkin_spans (
      `trace_id_high` BIGINT NOT NULL DEFAULT 0 COMMENT 'If non zero, this means the trace uses 128 bit traceIds instead of 64 bit',
      `trace_id` BIGINT NOT NULL,
      `id` BIGINT NOT NULL,
      `name` VARCHAR(255) NOT NULL,
      `remote_service_name` VARCHAR(255),
      `parent_id` BIGINT,
      `debug` BIT(1),
      `start_ts` BIGINT COMMENT 'Span.timestamp(): epoch micros used for endTs query and to implement TTL',
      `duration` BIGINT COMMENT 'Span.duration(): micros used for minDuration and maxDuration query',
      PRIMARY KEY (`trace_id_high`, `trace_id`, `id`)
    ) ENGINE=InnoDB ROW_FORMAT=COMPRESSED CHARACTER SET=utf8 COLLATE utf8_general_ci;

    -- Secondary indexes; each COMMENT names the query the index is meant to serve.
    ALTER TABLE zipkin_spans ADD INDEX(`trace_id_high`, `trace_id`) COMMENT 'for getTracesByIds';
    ALTER TABLE zipkin_spans ADD INDEX(`name`) COMMENT 'for getTraces and getSpanNames';
    ALTER TABLE zipkin_spans ADD INDEX(`remote_service_name`) COMMENT 'for getTraces and getRemoteServiceNames';
    ALTER TABLE zipkin_spans ADD INDEX(`start_ts`) COMMENT 'for getTraces ordering and range';

    -- Annotation table: rows reference a span through (trace_id_high, trace_id, span_id).
    -- The table has no primary key; duplicates are prevented by the unique key added below.
    CREATE TABLE IF NOT EXISTS zipkin_annotations (
      `trace_id_high` BIGINT NOT NULL DEFAULT 0 COMMENT 'If non zero, this means the trace uses 128 bit traceIds instead of 64 bit',
      `trace_id` BIGINT NOT NULL COMMENT 'coincides with zipkin_spans.trace_id',
      `span_id` BIGINT NOT NULL COMMENT 'coincides with zipkin_spans.id',
      `a_key` VARCHAR(255) NOT NULL COMMENT 'BinaryAnnotation.key or Annotation.value if type == -1',
      `a_value` BLOB COMMENT 'BinaryAnnotation.value(), which must be smaller than 64KB',
      `a_type` INT NOT NULL COMMENT 'BinaryAnnotation.type() or -1 if Annotation',
      `a_timestamp` BIGINT COMMENT 'Used to implement TTL; Annotation.timestamp or zipkin_spans.timestamp',
      `endpoint_ipv4` INT COMMENT 'Null when Binary/Annotation.endpoint is null',
      `endpoint_ipv6` BINARY(16) COMMENT 'Null when Binary/Annotation.endpoint is null, or no IPv6 address',
      `endpoint_port` SMALLINT COMMENT 'Null when Binary/Annotation.endpoint is null',
      `endpoint_service_name` VARCHAR(255) COMMENT 'Null when Binary/Annotation.endpoint is null'
    ) ENGINE=InnoDB ROW_FORMAT=COMPRESSED CHARACTER SET=utf8 COLLATE utf8_general_ci;

    -- Unique key plus secondary indexes; each COMMENT names the purpose of the index.
    ALTER TABLE zipkin_annotations ADD UNIQUE KEY(`trace_id_high`, `trace_id`, `span_id`, `a_key`, `a_timestamp`) COMMENT 'Ignore insert on duplicate';
    ALTER TABLE zipkin_annotations ADD INDEX(`trace_id_high`, `trace_id`, `span_id`) COMMENT 'for joining with zipkin_spans';
    ALTER TABLE zipkin_annotations ADD INDEX(`trace_id_high`, `trace_id`) COMMENT 'for getTraces/ByIds';
    ALTER TABLE zipkin_annotations ADD INDEX(`endpoint_service_name`) COMMENT 'for getTraces and getServiceNames';
    ALTER TABLE zipkin_annotations ADD INDEX(`a_type`) COMMENT 'for getTraces and autocomplete values';
    ALTER TABLE zipkin_annotations ADD INDEX(`a_key`) COMMENT 'for getTraces and autocomplete values';
    ALTER TABLE zipkin_annotations ADD INDEX(`trace_id`, `span_id`, `a_key`) COMMENT 'for dependencies job';

    -- Dependency table: one row per (day, parent, child) — the primary key —
    -- holding the call_count and error_count columns for that combination.
    CREATE TABLE IF NOT EXISTS zipkin_dependencies (
      `day` DATE NOT NULL,
      `parent` VARCHAR(255) NOT NULL,
      `child` VARCHAR(255) NOT NULL,
      `call_count` BIGINT,
      `error_count` BIGINT,
      PRIMARY KEY (`day`, `parent`, `child`)
    ) ENGINE=InnoDB ROW_FORMAT=COMPRESSED CHARACTER SET=utf8 COLLATE utf8_general_ci;

访问 web 界面

http://127.0.0.1:9411

使用

  1. 目录结构:

    zipkin/                         // 根目录
    ├── grpc/                       // grpc 项目
    │   ├── hello/
    │   │   ├── client/
    │   │   │   └── main.go         // 客户端
    │   │   └── server/
    │   │       └── main.go         // 服务端
    │   └── proto/
    │       └── hello/
    │           ├── hello.proto     // proto 描述文件
    │           └── hello.pb.go     // proto 编译后文件
    ├── http/                       // gin 项目
    │   └── main.go
    ├── go.mod                      // go module 文件
    └── main.go                     // opentracing + zipkin 原生用法
  2. 初始化 go module:

    cd zipkin
    go mod init zipkinTest
  3. 原生用法:

    1. 编写 zipkin/main.go 文件:

       package main
      
       import (
           "context"
           "fmt"
           "github.com/opentracing/opentracing-go"
           "github.com/opentracing/opentracing-go/log"
           zipkinOpen "github.com/openzipkin-contrib/zipkin-go-opentracing"
           "github.com/openzipkin/zipkin-go"
           httpReport "github.com/openzipkin/zipkin-go/reporter/http"
       )
      
       // Settings for the native example.
       const (
           // Address is the host:port passed to zipkin.NewEndpoint for the local endpoint.
           Address      = "127.0.0.1:8888"
           SERVICE_NAME = "test_server" // name of the traced service
           // ZIPKIN_URL is the URL handed to the HTTP reporter.
           ZIPKIN_URL   = "http://127.0.0.1:9411/api/v2/spans"
       )
      
       // MyTextMapCarrier is a custom propagation carrier that keeps every
       // key/value pair written to it in the Data map.
       type MyTextMapCarrier struct {
           Data map[string]string // propagated entries, keyed by name
       }

       // Set stores val under key, replacing any value already held for that key.
       // Data must be a non-nil map.
       func (t MyTextMapCarrier) Set(key, val string) {
           entries := t.Data
           entries[key] = val
       }

       // ForeachKey invokes handler once per stored pair. It stops at, and returns,
       // the first non-nil error; it returns nil when every call succeeds.
       func (t MyTextMapCarrier) ForeachKey(handler func(key, val string) error) error {
           for name := range t.Data {
               err := handler(name, t.Data[name])
               if err == nil {
                   continue
               }
               return err
           }
           return nil
       }
      
       // main walks through the raw opentracing + zipkin flow inside one process:
       // build the tracer, inject a client span into a carrier, extract it again
       // as the parent of a server span, then create nested spans from a context.
       func main() {
           // Step 1: initialise the tracer. Stop if the endpoint or tracer cannot be
           // created, rather than continuing with a tracer that failed to initialise.
           reporter := httpReport.NewReporter(ZIPKIN_URL)
           defer reporter.Close()
           endpoint, err := zipkin.NewEndpoint(SERVICE_NAME, Address)
           if err != nil {
               fmt.Printf("unable to create local endpoint: %+v\n", err)
               return
           }
           nativeTracer, err := zipkin.NewTracer(reporter, zipkin.WithLocalEndpoint(endpoint))
           if err != nil {
               fmt.Printf("unable to create tracer: %+v\n", err)
               return
           }
           zkTracer := zipkinOpen.Wrap(nativeTracer)
           opentracing.SetGlobalTracer(zkTracer)

           // Step 2: inject (client side) — write the client span context into the carrier.
           carrier := MyTextMapCarrier{
               Data: make(map[string]string),
           }
           spanClient := zkTracer.StartSpan("client-span")
           defer spanClient.Finish()
           err = zkTracer.Inject(spanClient.Context(), opentracing.TextMap, carrier)
           if err != nil {
               // Best effort: the demo keeps going so the remaining steps still run.
               fmt.Printf("inject error:%v\n", err)
           }

           // Step 3: extract (server side) — read the span context back from the carrier.
           spanCtx, err := zkTracer.Extract(opentracing.TextMap, carrier)
           if err != nil {
               // Best effort: the error is reported and the demo continues.
               fmt.Printf("extract error:%v\n", err)
           }
           spanServer := opentracing.StartSpan("server-span", opentracing.ChildOf(spanCtx))
           defer spanServer.Finish()

           // Step 4: build a context that carries the server span.
           ctx := opentracing.ContextWithSpan(context.Background(), spanServer)

           // Step 5: keep tracing from the context.
           parentSpan, parentCtx := opentracing.StartSpanFromContext(ctx, "foo")
           traceWithSpan(parentSpan, []map[string]interface{}{
               {"foo-tag": "foo-tag-value"},
           }, []log.Field{
               log.String("foo-log", "foo-log-value"),
           })
           span, _ := opentracing.StartSpanFromContext(parentCtx, "bar")
           // Step 6: read the trace ID from the zipkin span context.
           traceID := span.Context().(zipkinOpen.SpanContext).TraceID.String()
           traceWithSpan(span, []map[string]interface{}{
               {
                   "bar-tag":  "bar-tag-value",
                   "trace_id": traceID,
               },
           }, nil)
           // Finish the child before its parent.
           span.Finish()
           parentSpan.Finish()
       }
      
       // traceWithSpan copies every key/value pair of each map in tags onto span
       // as a tag and, when logs is non-nil, records logs on span in a single call.
       func traceWithSpan(span opentracing.Span, tags []map[string]interface{}, logs []log.Field) {
           for i := range tags {
               for name, value := range tags[i] {
                   span.SetTag(name, value)
               }
           }
           if logs == nil {
               return
           }
           span.LogFields(logs...)
       }
    2. 启动 zipkin/main.go:

      cd zipkin
      go run main.go
  4. grpc 项目:

    1. 安装 grpc 开发环境:
      参见:【rpc 框架 – grpc】- 1.前期准备

    2. 编写 zipkin/grpc/proto/hello/hello.proto 文件:

       syntax = "proto3";
       package hello;
       // Generated Go code is emitted into the current directory as package "hello".
       option go_package = ".;hello";
       // Hello exposes a single unary greeting RPC.
       service Hello {
           rpc SayHello(HelloRequest) returns (HelloResponse) {}
       }
       // HelloRequest carries the name to greet.
       message HelloRequest {
           string name = 1;
       }
       // HelloResponse carries the greeting text and a trace ID string.
       message HelloResponse {
           string message = 1;
           string trace_id = 2;
       }
    3. 编译生成 .pb.go 文件:

      cd zipkin/grpc/proto/hello
      protoc -I . --go_out=plugins=grpc:. ./hello.proto
    4. 编写 zipkin/grpc/hello/server/main.go:

       package main
      
       import (
           "fmt"
           "github.com/opentracing/opentracing-go"
           "github.com/opentracing/opentracing-go/log"
           zipkinOpen "github.com/openzipkin-contrib/zipkin-go-opentracing"
           "github.com/openzipkin/zipkin-go"
           httpReport "github.com/openzipkin/zipkin-go/reporter/http"
           "golang.org/x/net/context"
           "google.golang.org/grpc"
           "google.golang.org/grpc/grpclog"
           "google.golang.org/grpc/metadata"
           "google.golang.org/grpc/reflection"
           "net"
           "zipkinTest/grpc/proto/hello"
       )
      
       // Settings for the gRPC server example.
       const (
           // Address is the TCP listen address; it is also passed to zipkin.NewEndpoint.
           Address      = "127.0.0.1:50052"
           SERVICE_NAME = "grpc_server" // name of the traced service
           // ZIPKIN_URL is the URL handed to the HTTP reporter.
           ZIPKIN_URL   = "http://127.0.0.1:9411/api/v2/spans"
       )
      
       // MyTextMapCarrier wraps gRPC metadata so it can be used as a text-map
       // carrier for span-context injection and extraction.
       type MyTextMapCarrier struct {
           metadata.MD
       }

       // Set appends val to the values already stored under key.
       func (t MyTextMapCarrier) Set(key, val string) {
           values := append(t.MD[key], val)
           t.MD[key] = values
       }

       // ForeachKey calls handler once for every value of every key. It stops at,
       // and returns, the first non-nil error; otherwise it returns nil.
       func (t MyTextMapCarrier) ForeachKey(handler func(key, val string) error) error {
           for name, values := range t.MD {
               for i := range values {
                   err := handler(name, values[i])
                   if err != nil {
                       return err
                   }
               }
           }
           return nil
       }
      
       // helloService implements the Hello gRPC service.
       type helloService struct{}

       // HelloService is the instance registered with the gRPC server in main.
       var HelloService = helloService{}

       // SayHello answers with "Hello <name>." and, when the active span carries a
       // zipkin span context, the trace ID of that span. The response is also
       // attached to a child span named "response".
       func (h helloService) SayHello(ctx context.Context, in *hello.HelloRequest) (*hello.HelloResponse, error) {
           resp := new(hello.HelloResponse)
           // Step 5: continue the trace from the request context.
           span, _ := opentracing.StartSpanFromContext(ctx, "response")
           // Deferred so the span is finished on every exit path.
           defer span.Finish()

           resp.Message = fmt.Sprintf("Hello %s.", in.Name)
           // Step 6: read the trace ID. The assertion is checked so that a span
           // context of any other type leaves TraceId empty instead of panicking
           // inside the handler.
           if zkCtx, ok := span.Context().(zipkinOpen.SpanContext); ok {
               resp.TraceId = zkCtx.TraceID.String()
           }
           traceWithSpan(span, []map[string]interface{}{
               {"resp": resp},
           }, nil)
           return resp, nil
       }
       // main initialises the global tracer, then serves the Hello gRPC service on
       // Address with interceptorTracer installed as the unary interceptor.
       func main() {
           // Step 1: initialise the tracer. Stop if the endpoint or tracer cannot be
           // created, rather than serving with a tracer that failed to initialise.
           reporter := httpReport.NewReporter(ZIPKIN_URL)
           defer reporter.Close()
           endpoint, err := zipkin.NewEndpoint(SERVICE_NAME, Address)
           if err != nil {
               fmt.Printf("unable to create local endpoint: %+v\n", err)
               return
           }
           nativeTracer, err := zipkin.NewTracer(reporter, zipkin.WithLocalEndpoint(endpoint))
           if err != nil {
               fmt.Printf("unable to create tracer: %+v\n", err)
               return
           }
           zkTracer := zipkinOpen.Wrap(nativeTracer)
           opentracing.SetGlobalTracer(zkTracer)

           listen, err := net.Listen("tcp", Address)
           if err != nil {
               grpclog.Fatalf("Failed to listen: %v", err)
           }
           // Step 2: install an interceptor that creates a span for every request.
           srv := grpc.NewServer(grpc.UnaryInterceptor(interceptorTracer))
           hello.RegisterHelloServer(srv, HelloService)
           reflection.Register(srv) // enable server reflection (used by grpcurl)
           fmt.Println("Listen on " + Address)
           // Serve blocks until the server stops; report why instead of dropping the error.
           if err := srv.Serve(listen); err != nil {
               fmt.Printf("grpc server stopped: %+v\n", err)
           }
       }
      
       // interceptorTracer is the unary server interceptor: it extracts a span
       // context from the incoming metadata, starts a span named after the called
       // method as its child, tags it with the request, and invokes handler with a
       // context that carries the span.
       func interceptorTracer(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
           // Use a copy of the incoming metadata, or an empty set when there is none.
           incoming, found := metadata.FromIncomingContext(ctx)
           if found {
               incoming = incoming.Copy()
           } else {
               incoming = metadata.New(nil)
           }

           // Step 3: extract (server side). A failure is only reported.
           tracer := opentracing.GlobalTracer()
           parent, err := tracer.Extract(opentracing.TextMap, MyTextMapCarrier{incoming})
           if err != nil {
               fmt.Printf("extract error:%v\n", err)
           }

           // Step 4: create the span and the context that carries it.
           span := opentracing.StartSpan(info.FullMethod, opentracing.ChildOf(parent))
           defer span.Finish()
           span.SetTag("req", req)

           // Hand the request on to the actual handler.
           return handler(opentracing.ContextWithSpan(ctx, span), req)
       }
      
       // traceWithSpan sets each entry of every map in tags as a tag on span, then
       // records logs on span in one call unless logs is nil.
       func traceWithSpan(span opentracing.Span, tags []map[string]interface{}, logs []log.Field) {
           for idx := range tags {
               for tagKey, tagValue := range tags[idx] {
                   span.SetTag(tagKey, tagValue)
               }
           }
           if logs == nil {
               return
           }
           span.LogFields(logs...)
       }
    5. 启动 zipkin/grpc/hello/server/main.go:

      cd zipkin/grpc/hello/server
      go run main.go
    6. 编写 zipkin/grpc/hello/client/main.go:

       package main
      
       import (
           "fmt"
           "github.com/opentracing/opentracing-go"
           zipkinOpen "github.com/openzipkin-contrib/zipkin-go-opentracing"
           "github.com/openzipkin/zipkin-go"
           httpReport "github.com/openzipkin/zipkin-go/reporter/http"
           "golang.org/x/net/context"
           "google.golang.org/grpc"
           "google.golang.org/grpc/grpclog"
           "google.golang.org/grpc/metadata"
           "time"
           "zipkinTest/grpc/proto/hello"
       )
      
       // Settings for the gRPC client example.
       const (
           // Address is the dial target; it is also passed to zipkin.NewEndpoint.
           Address      = "127.0.0.1:50052"
           SERVICE_NAME = "grpc_client" // name of the traced service
           // ZIPKIN_URL is the URL handed to the HTTP reporter.
           ZIPKIN_URL   = "http://127.0.0.1:9411/api/v2/spans"
       )
      
       // MyTextMapCarrier wraps gRPC metadata so it can be used as a text-map
       // carrier when injecting a span context into an outgoing call.
       type MyTextMapCarrier struct {
           metadata.MD
       }

       // Set appends val to the values already stored under key.
       func (t MyTextMapCarrier) Set(key, val string) {
           existing := t.MD[key]
           t.MD[key] = append(existing, val)
       }

       // ForeachKey calls handler once for every value of every key. It stops at,
       // and returns, the first non-nil error; otherwise it returns nil.
       func (t MyTextMapCarrier) ForeachKey(handler func(key, val string) error) error {
           for name, values := range t.MD {
               for idx := range values {
                   if err := handler(name, values[idx]); err != nil {
                       return err
                   }
               }
           }
           return nil
       }
      
       // main initialises the global tracer, dials the gRPC server at Address with
       // interceptorTracer installed, calls SayHello once and prints the reply.
       func main() {
           // Step 1: initialise the tracer. Stop if the endpoint or tracer cannot be
           // created, rather than calling out with a tracer that failed to initialise.
           reporter := httpReport.NewReporter(ZIPKIN_URL)
           defer reporter.Close()
           endpoint, err := zipkin.NewEndpoint(SERVICE_NAME, Address)
           if err != nil {
               fmt.Printf("unable to create local endpoint: %+v\n", err)
               return
           }
           nativeTracer, err := zipkin.NewTracer(reporter, zipkin.WithLocalEndpoint(endpoint))
           if err != nil {
               fmt.Printf("unable to create tracer: %+v\n", err)
               return
           }
           zkTracer := zipkinOpen.Wrap(nativeTracer)
           opentracing.SetGlobalTracer(zkTracer)

           // Step 2: install an interceptor that creates a span for every call.
           conn, err := grpc.Dial(
               Address,
               grpc.WithInsecure(),
               grpc.WithUnaryInterceptor(interceptorTracer),
           )
           if err != nil {
               grpclog.Fatalln(err)
           }
           defer conn.Close()
           c := hello.NewHelloClient(conn)
           req := &hello.HelloRequest{Name: "gRPC"}
           resp, err := c.SayHello(context.Background(), req)
           if err != nil {
               grpclog.Fatalln(err)
           }
           fmt.Printf("message:%s,\ntrace_id:%s\n", resp.Message, resp.TraceId)
       }
      
       // interceptorTracer is the unary client interceptor: it starts a span named
       // after the called method, injects its context into the outgoing metadata,
       // invokes the call and tags the span with the request, reply, elapsed time
       // and — only when the call failed — the error. It returns the call's error.
       func interceptorTracer(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
           span := opentracing.GlobalTracer().StartSpan(method)
           defer span.Finish()

           // On the client the metadata to send lives in the outgoing context, so
           // start from it to keep whatever the caller already attached.
           md, ok := metadata.FromOutgoingContext(ctx)
           if !ok {
               md = metadata.New(nil)
           } else {
               md = md.Copy()
           }
           carrier := MyTextMapCarrier{md}
           // Step 3: inject (client side). A failure is only reported.
           err := opentracing.GlobalTracer().Inject(span.Context(), opentracing.TextMap, carrier)
           if err != nil {
               fmt.Printf("inject error:%v\n", err)
           }
           ctx = metadata.NewOutgoingContext(ctx, md)
           start := time.Now()
           err = invoker(ctx, method, req, reply, cc, opts...)

           // Tag the span with the outcome of the call.
           span.SetTag("method", method)
           span.SetTag("req", req)
           span.SetTag("resp", reply)
           span.SetTag("duration", time.Since(start))
           // Only failed calls get an "error" tag, so successful spans are not
           // marked as errors.
           if err != nil {
               span.SetTag("error", err)
           }
           return err
       }
    7. 启动 zipkin/grpc/hello/client/main.go:

      cd zipkin/grpc/hello/client
      go run main.go
  5. gin 项目:

    1. 编写 zipkin/http/main.go:

       package main
      
       import (
           "bytes"
           "fmt"
           "github.com/gin-gonic/gin"
           "github.com/opentracing/opentracing-go"
           "github.com/opentracing/opentracing-go/log"
           zipkinOpen "github.com/openzipkin-contrib/zipkin-go-opentracing"
           "github.com/openzipkin/zipkin-go"
           httpReport "github.com/openzipkin/zipkin-go/reporter/http"
           "io"
       )
      
       // Settings for the gin HTTP example.
       const (
           // Address is the address passed to router.Run; it is also passed to zipkin.NewEndpoint.
           Address      = "127.0.0.1:8080"
           SERVICE_NAME = "http_server" // name of the traced service
           // ZIPKIN_URL is the URL handed to the HTTP reporter.
           ZIPKIN_URL   = "http://127.0.0.1:9411/api/v2/spans"
       )
      
       // main initialises the global tracer, then runs a gin server on Address with
       // middlewareTracer installed and a traced GET /ping route.
       func main() {
           // Step 1: initialise the tracer. Stop if the endpoint or tracer cannot be
           // created, rather than serving with a tracer that failed to initialise.
           reporter := httpReport.NewReporter(ZIPKIN_URL)
           defer reporter.Close()
           endpoint, err := zipkin.NewEndpoint(SERVICE_NAME, Address)
           if err != nil {
               fmt.Printf("unable to create local endpoint: %+v\n", err)
               return
           }
           nativeTracer, err := zipkin.NewTracer(reporter, zipkin.WithLocalEndpoint(endpoint))
           if err != nil {
               fmt.Printf("unable to create tracer: %+v\n", err)
               return
           }
           zkTracer := zipkinOpen.Wrap(nativeTracer)
           opentracing.SetGlobalTracer(zkTracer)

           router := gin.Default()
           // Step 2: install a middleware that creates a span for every request.
           router.Use(middlewareTracer)
           router.GET("/ping", func(c *gin.Context) {
               resp := map[string]interface{}{
                   "message": "pong",
               }
               c.JSON(200, resp)
               // Step 6: continue the trace from the request context.
               span, _ := opentracing.StartSpanFromContext(c.Request.Context(), "ping")
               traceWithSpan(span, []map[string]interface{}{
                   {"req": c.Request.URL.Query()},
                   {"resp": resp},
               }, nil)
               span.Finish()
           })
           // Run blocks until the server stops; report why instead of dropping the error.
           if err := router.Run(Address); err != nil {
               fmt.Printf("http server stopped: %+v\n", err)
           }
       }
      
       // middlewareTracer is the gin middleware that wraps each request in a span:
       // it extracts a span context from the request headers, starts a child span
       // named "<method> <route>", tags it with request details, stores it in the
       // request context, exposes the trace ID in the "trace-id" response header
       // and then runs the remaining handlers.
       func middlewareTracer(c *gin.Context) {
           // Read the raw body so it can be tagged; the read error is discarded.
           body, _ := c.GetRawData()
           if len(body) != 0 {
               // Put the consumed bytes back so later handlers can still read them.
               c.Request.Body = io.NopCloser(bytes.NewBuffer(body))
           }
           req := c.Request

           // Step 3: extract (server side). A failure is only reported.
           headers := opentracing.HTTPHeadersCarrier(req.Header)
           parent, err := opentracing.GlobalTracer().Extract(opentracing.HTTPHeaders, headers)
           if err != nil {
               fmt.Printf("extract error:%v\n", err)
           }

           // Step 4: create the span and the context that carries it.
           operation := fmt.Sprintf("%s %s", req.Method, c.FullPath())
           span := opentracing.StartSpan(operation, opentracing.ChildOf(parent))
           span.SetTag("host", req.Host)
           span.SetTag("url", req.URL)
           span.SetTag("header", req.Header)
           span.SetTag("method", req.Method)
           span.SetTag("remoteAddr", req.RemoteAddr)
           span.SetTag("proto", req.Proto)
           span.SetTag("params", string(body))
           defer span.Finish()
           c.Request = req.WithContext(opentracing.ContextWithSpan(req.Context(), span))

           // Step 5: expose the trace ID to the caller.
           zkCtx := span.Context().(zipkinOpen.SpanContext)
           c.Writer.Header().Set("trace-id", zkCtx.TraceID.String())

           c.Next()
       }
      
       // traceWithSpan applies every key/value pair found in the tags maps to span
       // as tags and, unless logs is nil, records logs on span in one call.
       func traceWithSpan(span opentracing.Span, tags []map[string]interface{}, logs []log.Field) {
           for pos := range tags {
               for label, content := range tags[pos] {
                   span.SetTag(label, content)
               }
           }
           if logs == nil {
               return
           }
           span.LogFields(logs...)
       }
    2. 启动 zipkin/http/main.go:

      cd zipkin/http
      go run main.go
    3. 请求接口:
      curl 127.0.0.1:8080/ping

  6. 访问 web 界面进行追踪验证:
    http://127.0.0.1:9411

文档更新时间: 2024-04-20 10:57   作者:lee