安装(集成 rabbitmq、mysql)
docker run --name zipkin223 \
-e STORAGE_TYPE=mysql \
-e MYSQL_DB=test \
-e MYSQL_USER=root \
-e MYSQL_PASS=123456 \
-e MYSQL_HOST=127.0.0.1 \
-e MYSQL_TCP_PORT=3306 \
-e RABBIT_ADDRESSES=127.0.0.1:5672 \
-e RABBIT_USER=admin \
-e RABBIT_PASSWORD=123456 \
-p 9411:9411 \
-d \
openzipkin/zipkin:2.23
mysql 创建数据表
CREATE TABLE IF NOT EXISTS zipkin_spans (
`trace_id_high` BIGINT NOT NULL DEFAULT 0 COMMENT 'If non zero, this means the trace uses 128 bit traceIds instead of 64 bit',
`trace_id` BIGINT NOT NULL,
`id` BIGINT NOT NULL,
`name` VARCHAR(255) NOT NULL,
`remote_service_name` VARCHAR(255),
`parent_id` BIGINT,
`debug` BIT(1),
`start_ts` BIGINT COMMENT 'Span.timestamp(): epoch micros used for endTs query and to implement TTL',
`duration` BIGINT COMMENT 'Span.duration(): micros used for minDuration and maxDuration query',
PRIMARY KEY (`trace_id_high`, `trace_id`, `id`)
) ENGINE=InnoDB ROW_FORMAT=COMPRESSED CHARACTER SET=utf8 COLLATE utf8_general_ci;
ALTER TABLE zipkin_spans ADD INDEX(`trace_id_high`, `trace_id`) COMMENT 'for getTracesByIds';
ALTER TABLE zipkin_spans ADD INDEX(`name`) COMMENT 'for getTraces and getSpanNames';
ALTER TABLE zipkin_spans ADD INDEX(`remote_service_name`) COMMENT 'for getTraces and getRemoteServiceNames';
ALTER TABLE zipkin_spans ADD INDEX(`start_ts`) COMMENT 'for getTraces ordering and range';
CREATE TABLE IF NOT EXISTS zipkin_annotations (
`trace_id_high` BIGINT NOT NULL DEFAULT 0 COMMENT 'If non zero, this means the trace uses 128 bit traceIds instead of 64 bit',
`trace_id` BIGINT NOT NULL COMMENT 'coincides with zipkin_spans.trace_id',
`span_id` BIGINT NOT NULL COMMENT 'coincides with zipkin_spans.id',
`a_key` VARCHAR(255) NOT NULL COMMENT 'BinaryAnnotation.key or Annotation.value if type == -1',
`a_value` BLOB COMMENT 'BinaryAnnotation.value(), which must be smaller than 64KB',
`a_type` INT NOT NULL COMMENT 'BinaryAnnotation.type() or -1 if Annotation',
`a_timestamp` BIGINT COMMENT 'Used to implement TTL; Annotation.timestamp or zipkin_spans.timestamp',
`endpoint_ipv4` INT COMMENT 'Null when Binary/Annotation.endpoint is null',
`endpoint_ipv6` BINARY(16) COMMENT 'Null when Binary/Annotation.endpoint is null, or no IPv6 address',
`endpoint_port` SMALLINT COMMENT 'Null when Binary/Annotation.endpoint is null',
`endpoint_service_name` VARCHAR(255) COMMENT 'Null when Binary/Annotation.endpoint is null'
) ENGINE=InnoDB ROW_FORMAT=COMPRESSED CHARACTER SET=utf8 COLLATE utf8_general_ci;
ALTER TABLE zipkin_annotations ADD UNIQUE KEY(`trace_id_high`, `trace_id`, `span_id`, `a_key`, `a_timestamp`) COMMENT 'Ignore insert on duplicate';
ALTER TABLE zipkin_annotations ADD INDEX(`trace_id_high`, `trace_id`, `span_id`) COMMENT 'for joining with zipkin_spans';
ALTER TABLE zipkin_annotations ADD INDEX(`trace_id_high`, `trace_id`) COMMENT 'for getTraces/ByIds';
ALTER TABLE zipkin_annotations ADD INDEX(`endpoint_service_name`) COMMENT 'for getTraces and getServiceNames';
ALTER TABLE zipkin_annotations ADD INDEX(`a_type`) COMMENT 'for getTraces and autocomplete values';
ALTER TABLE zipkin_annotations ADD INDEX(`a_key`) COMMENT 'for getTraces and autocomplete values';
ALTER TABLE zipkin_annotations ADD INDEX(`trace_id`, `span_id`, `a_key`) COMMENT 'for dependencies job';
CREATE TABLE IF NOT EXISTS zipkin_dependencies (
`day` DATE NOT NULL,
`parent` VARCHAR(255) NOT NULL,
`child` VARCHAR(255) NOT NULL,
`call_count` BIGINT,
`error_count` BIGINT,
PRIMARY KEY (`day`, `parent`, `child`)
) ENGINE=InnoDB ROW_FORMAT=COMPRESSED CHARACTER SET=utf8 COLLATE utf8_general_ci;
访问 web 界面
http://127.0.0.1:9411
使用
目录结构:
|—— zipkin/ // 根目录 ****|—— grpc/ // grpc 项目 ********|—— hello/ ************|—— client/ ****************|—— main.go // 客户端 ************|—— server/ ****************|—— main.go // 服务端 ********|—— proto/ ************|—— hello/ ****************|—— hello.proto // proto 描述文件 ****************|—— hello.pb.go // proto 编译后文件 ****|—— http/ // gin 项目 ********|—— main.go ****|—— go.mod // go module 文件 ****|—— main.go // opentracing + zipkin 原生用法
初始化 go module:
cd zipkin go mod init zipkinTest
原生用法:
编写 zipkin/main.go 文件:
package main import ( "context" "fmt" "github.com/opentracing/opentracing-go" "github.com/opentracing/opentracing-go/log" zipkinOpen "github.com/openzipkin-contrib/zipkin-go-opentracing" "github.com/openzipkin/zipkin-go" httpReport "github.com/openzipkin/zipkin-go/reporter/http" ) const ( Address = "127.0.0.1:8888" SERVICE_NAME = "test_server" // 追踪的服务名 ZIPKIN_URL = "http://127.0.0.1:9411/api/v2/spans" ) // 自定义 carrier type MyTextMapCarrier struct { Data map[string]string } func (t MyTextMapCarrier) Set(key, val string) { t.Data[key] = val } func (t MyTextMapCarrier) ForeachKey(handler func(key, val string) error) error { for key, val := range t.Data { if err := handler(key, val); err != nil { return err } } return nil } func main() { // 第一步:初始化 tracer reporter := httpReport.NewReporter(ZIPKIN_URL) defer reporter.Close() endpoint, err := zipkin.NewEndpoint(SERVICE_NAME, Address) if err != nil { fmt.Printf("unable to create local endpoint: %+v\n", err) } nativeTracer, err := zipkin.NewTracer(reporter, zipkin.WithLocalEndpoint(endpoint)) if err != nil { fmt.Printf("unable to create tracer: %+v\n", err) } zkTracer := zipkinOpen.Wrap(nativeTracer) opentracing.SetGlobalTracer(zkTracer) // 第二步:注入-inject(客户端) carrier := MyTextMapCarrier{ Data: make(map[string]string), } spanClient := zkTracer.StartSpan("client-span") defer spanClient.Finish() err = zkTracer.Inject(spanClient.Context(), opentracing.TextMap, carrier) if err != nil { fmt.Printf("inject error:%v\n", err) } // 第三步:提取-extract(服务端) spanCtx, err := zkTracer.Extract(opentracing.TextMap, carrier) if err != nil { fmt.Printf("extract error:%v\n", err) } spanServer := opentracing.StartSpan("server-span", opentracing.ChildOf(spanCtx)) defer spanServer.Finish() // 第四步:生成 context ctx := opentracing.ContextWithSpan(context.Background(), spanServer) // 第五步:根据 context 继续追踪 parentSpan, parentCtx := opentracing.StartSpanFromContext(ctx, "foo") traceWithSpan(parentSpan, []map[string]interface{}{ {"foo-tag": "foo-tag-value"}, }, []log.Field{ log.String("foo-log", "foo-log-value"), }) span, _ := opentracing.StartSpanFromContext(parentCtx, "bar") // 第六步:获取 trace_id traceID := span.Context().(zipkinOpen.SpanContext).TraceID.String() traceWithSpan(span, []map[string]interface{}{ { "bar-tag": "bar-tag-value", "trace_id": traceID, }, }, nil) span.Finish() parentSpan.Finish() } func traceWithSpan(span opentracing.Span, tags []map[string]interface{}, logs []log.Field) { for _, tag := range tags { for k, v := range tag { span.SetTag(k, v) } } if logs != nil { span.LogFields(logs...) } }
启动 zipkin/main.go:
cd zipkin go run main.go
grpc 项目:
安装 grpc 开发环境:
参见:【rpc 框架 – grpc】- 1.前期准备编写 zipkin/grpc/proto/hello/hello.proto 文件:
syntax = "proto3"; package hello; option go_package = ".;hello"; service Hello { rpc SayHello(HelloRequest) returns (HelloResponse) {} } message HelloRequest { string name = 1; } message HelloResponse { string message = 1; string trace_id = 2; }
编译生成 .pb.go 文件:
cd zipkin/grpc/proto/hello protoc -I . --go_out=plugins=grpc:. ./hello.proto
编写 zipkin/grpc/hello/server/main.go:
package main import ( "fmt" "github.com/opentracing/opentracing-go" "github.com/opentracing/opentracing-go/log" zipkinOpen "github.com/openzipkin-contrib/zipkin-go-opentracing" "github.com/openzipkin/zipkin-go" httpReport "github.com/openzipkin/zipkin-go/reporter/http" "golang.org/x/net/context" "google.golang.org/grpc" "google.golang.org/grpc/grpclog" "google.golang.org/grpc/metadata" "google.golang.org/grpc/reflection" "net" "zipkinTest/grpc/proto/hello" ) const ( Address = "127.0.0.1:50052" SERVICE_NAME = "grpc_server" // 追踪的服务名 ZIPKIN_URL = "http://127.0.0.1:9411/api/v2/spans" ) type MyTextMapCarrier struct { metadata.MD } func (t MyTextMapCarrier) Set(key, val string) { t.MD[key] = append(t.MD[key], val) } func (t MyTextMapCarrier) ForeachKey(handler func(key, val string) error) error { for key, val := range t.MD { for _, v := range val { if err := handler(key, v); err != nil { return err } } } return nil } type helloService struct{} var HelloService = helloService{} func (h helloService) SayHello(ctx context.Context, in *hello.HelloRequest) (*hello.HelloResponse, error) { resp := new(hello.HelloResponse) // 第五步:根据 context 继续追踪 span, ctx := opentracing.StartSpanFromContext(ctx, "response") // 第六步:获取 trace_id traceID := span.Context().(zipkinOpen.SpanContext).TraceID.String() resp.Message = fmt.Sprintf("Hello %s.", in.Name) resp.TraceId = traceID traceWithSpan(span, []map[string]interface{}{ {"resp": resp}, }, nil) span.Finish() return resp, nil } func main() { // 第一步:初始化 tracer reporter := httpReport.NewReporter(ZIPKIN_URL) defer reporter.Close() endpoint, err := zipkin.NewEndpoint(SERVICE_NAME, Address) if err != nil { fmt.Printf("unable to create local endpoint: %+v\n", err) } nativeTracer, err := zipkin.NewTracer(reporter, zipkin.WithLocalEndpoint(endpoint)) if err != nil { fmt.Printf("unable to create tracer: %+v\n", err) } zkTracer := zipkinOpen.Wrap(nativeTracer) opentracing.SetGlobalTracer(zkTracer) listen, err := net.Listen("tcp", Address) if err != nil { grpclog.Fatalf("Failed to listen: %v", err) } // 第二步:添加一个拦截器(interceptor), 为每一个请求添加 span srv := grpc.NewServer(grpc.UnaryInterceptor(interceptorTracer)) hello.RegisterHelloServer(srv, HelloService) reflection.Register(srv) // 注册到grpcurl fmt.Println("Listen on " + Address) srv.Serve(listen) } func interceptorTracer(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { md, ok := metadata.FromIncomingContext(ctx) if !ok { md = metadata.New(nil) } else { md = md.Copy() } carrier := MyTextMapCarrier{md} // 第三步:提取-extract(服务端) spanCtx, err := opentracing.GlobalTracer().Extract(opentracing.TextMap, carrier) if err != nil { fmt.Printf("extract error:%v\n", err) } // 第四步:生成 span 和 context span := opentracing.StartSpan(info.FullMethod, opentracing.ChildOf(spanCtx)) defer span.Finish() ctx = opentracing.ContextWithSpan(ctx, span) span.SetTag("req", req) // 继续处理请求 return handler(ctx, req) } func traceWithSpan(span opentracing.Span, tags []map[string]interface{}, logs []log.Field) { for _, tag := range tags { for k, v := range tag { span.SetTag(k, v) } } if logs != nil { span.LogFields(logs...) } }
启动 zipkin/grpc/hello/server/main.go:
cd zipkin/grpc/hello/server go run main.go
编写 zipkin/grpc/hello/client/main.go:
package main import ( "fmt" "github.com/opentracing/opentracing-go" zipkinOpen "github.com/openzipkin-contrib/zipkin-go-opentracing" "github.com/openzipkin/zipkin-go" httpReport "github.com/openzipkin/zipkin-go/reporter/http" "golang.org/x/net/context" "google.golang.org/grpc" "google.golang.org/grpc/grpclog" "google.golang.org/grpc/metadata" "time" "zipkinTest/grpc/proto/hello" ) const ( Address = "127.0.0.1:50052" SERVICE_NAME = "grpc_client" // 追踪的服务名 ZIPKIN_URL = "http://127.0.0.1:9411/api/v2/spans" ) type MyTextMapCarrier struct { metadata.MD } func (t MyTextMapCarrier) Set(key, val string) { t.MD[key] = append(t.MD[key], val) } func (t MyTextMapCarrier) ForeachKey(handler func(key, val string) error) error { for key, val := range t.MD { for _, v := range val { if err := handler(key, v); err != nil { return err } } } return nil } func main() { // 第一步:初始化 tracer reporter := httpReport.NewReporter(ZIPKIN_URL) defer reporter.Close() endpoint, err := zipkin.NewEndpoint(SERVICE_NAME, Address) if err != nil { fmt.Printf("unable to create local endpoint: %+v\n", err) } nativeTracer, err := zipkin.NewTracer(reporter, zipkin.WithLocalEndpoint(endpoint)) if err != nil { fmt.Printf("unable to create tracer: %+v\n", err) } zkTracer := zipkinOpen.Wrap(nativeTracer) opentracing.SetGlobalTracer(zkTracer) // 第二步:添加一个拦截器(interceptor), 为每一个请求添加 span conn, err := grpc.Dial( Address, grpc.WithInsecure(), grpc.WithUnaryInterceptor(interceptorTracer), ) if err != nil { grpclog.Fatalln(err) } defer conn.Close() c := hello.NewHelloClient(conn) req := &hello.HelloRequest{Name: "gRPC"} resp, err := c.SayHello(context.Background(), req) if err != nil { grpclog.Fatalln(err) } fmt.Printf("message:%s,\ntrace_id:%s\n", resp.Message, resp.TraceId) } func interceptorTracer(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { span := opentracing.GlobalTracer().StartSpan(method) defer span.Finish() md, ok := metadata.FromIncomingContext(ctx) if !ok { md = metadata.New(nil) } else { md = md.Copy() } carrier := MyTextMapCarrier{md} // 第三步:注入-inject(客户端) err := opentracing.GlobalTracer().Inject(span.Context(), opentracing.TextMap, carrier) if err != nil { fmt.Printf("inject error:%v\n", err) } // 设置 tag ctx = metadata.NewOutgoingContext(ctx, md) start := time.Now() err = invoker(ctx, method, req, reply, cc, opts...) span.SetTag("method", method) span.SetTag("req", req) span.SetTag("resp", reply) span.SetTag("duration", time.Since(start)) span.SetTag("error", err) return err }
启动 zipkin/grpc/hello/client/main.go:
cd zipkin/grpc/hello/client go run main.go
gin 项目:
编写 zipkin/http/main.go:
package main import ( "bytes" "fmt" "github.com/gin-gonic/gin" "github.com/opentracing/opentracing-go" "github.com/opentracing/opentracing-go/log" zipkinOpen "github.com/openzipkin-contrib/zipkin-go-opentracing" "github.com/openzipkin/zipkin-go" httpReport "github.com/openzipkin/zipkin-go/reporter/http" "io" ) const ( Address = "127.0.0.1:8080" SERVICE_NAME = "http_server" // 追踪的服务名 ZIPKIN_URL = "http://127.0.0.1:9411/api/v2/spans" ) func main() { // 第一步:初始化 tracer reporter := httpReport.NewReporter(ZIPKIN_URL) defer reporter.Close() endpoint, err := zipkin.NewEndpoint(SERVICE_NAME, Address) if err != nil { fmt.Printf("unable to create local endpoint: %+v\n", err) } nativeTracer, err := zipkin.NewTracer(reporter, zipkin.WithLocalEndpoint(endpoint)) if err != nil { fmt.Printf("unable to create tracer: %+v\n", err) } zkTracer := zipkinOpen.Wrap(nativeTracer) opentracing.SetGlobalTracer(zkTracer) router := gin.Default() // 第二步:添加一个 middleware, 为每一个请求添加 span router.Use(middlewareTracer) router.GET("/ping", func(c *gin.Context) { resp := map[string]interface{}{ "message": "pong", } c.JSON(200, resp) // 第六步:根据 context 继续追踪 span, _ := opentracing.StartSpanFromContext(c.Request.Context(), "ping") traceWithSpan(span, []map[string]interface{}{ {"req": c.Request.URL.Query()}, {"resp": resp}, }, nil) span.Finish() }) router.Run(Address) } func middlewareTracer(c *gin.Context) { params, _ := c.GetRawData() if len(params) > 0 { c.Request.Body = io.NopCloser(bytes.NewBuffer(params)) } // 第三步:提取-extract(服务端) spanCtx, err := opentracing.GlobalTracer().Extract(opentracing.HTTPHeaders, opentracing.HTTPHeadersCarrier(c.Request.Header)) if err != nil { fmt.Printf("extract error:%v\n", err) } // 第四步:生成 span 和 context span := opentracing.StartSpan(fmt.Sprintf("%s %s", c.Request.Method, c.FullPath()), opentracing.ChildOf(spanCtx)) span.SetTag("host", c.Request.Host) span.SetTag("url", c.Request.URL) span.SetTag("header", c.Request.Header) span.SetTag("method", c.Request.Method) span.SetTag("remoteAddr", c.Request.RemoteAddr) span.SetTag("proto", c.Request.Proto) span.SetTag("params", string(params)) defer span.Finish() ctx := opentracing.ContextWithSpan(c.Request.Context(), span) c.Request = c.Request.WithContext(ctx) // 第五步:获取 trace_id traceID := span.Context().(zipkinOpen.SpanContext).TraceID.String() c.Writer.Header().Set("trace-id", traceID) c.Next() } func traceWithSpan(span opentracing.Span, tags []map[string]interface{}, logs []log.Field) { for _, tag := range tags { for k, v := range tag { span.SetTag(k, v) } } if logs != nil { span.LogFields(logs...) } }
启动 zipkin/http/main.go:
cd zipkin/http go run main.go
请求 web 界面:
curl 127.0.0.1:8080/ping
访问 web 界面进行追踪验证:
http://127.0.0.1:9411
文档更新时间: 2024-04-20 10:57 作者:lee