前期准备:
- 下载 protobuf 插件:
https://github.com/protocolbuffers/protobuf/releases
- 安装 protoc-gen-go 插件:
go get github.com/golang/protobuf/protoc-gen-go@v1.1.0 go install github.com/golang/protobuf/protoc-gen-go@v1.1.0
- 目录结构:
|—— grpc/ // 根目录 ****|—— hello/ ********|—— client/ ************|—— main.go // 客户端 ********|—— server/ ************|—— main.go // 服务端 ********|—— keys/ // 证书目录 ************|—— server.key ************|—— server.crt ****|—— proto/ ********|—— hello/ ************|—— hello.proto // proto 描述文件 ************|—— hello.pb.go // proto 编译后文件
- 初始化 go module:
cd grpc go mod init grpcTest
- 编写 proto/hello/hello.proto 文件:
syntax = "proto3"; package hello; option go_package = ".;hello"; service Hello { rpc SayHello(HelloRequest) returns (HelloResponse) {} } message HelloRequest { string name = 1; } message HelloResponse { string message = 1; }
- 编译生成 .pb.go 文件:
cd proto/hello protoc -I . --go_out=plugins=grpc:. ./hello.proto
- 下载 protobuf 插件:
简单用法:
编写 hello/server/main.go:
package main import ( "fmt" "golang.org/x/net/context" "google.golang.org/grpc" "google.golang.org/grpc/grpclog" "google.golang.org/grpc/metadata" "google.golang.org/grpc/reflection" "grpcTest/proto/hello" "net" ) const ( Address = "127.0.0.1:50052" ) type helloService struct{} var HelloService = helloService{} func (h helloService) SayHello(ctx context.Context, in *hello.HelloRequest) (*hello.HelloResponse, error) { md, ok := metadata.FromIncomingContext(ctx) if !ok { fmt.Println("get metadata error") } if tokens := md.Get("token"); len(tokens) > 0 { fmt.Printf("token: %s\n", tokens[0]) } if names := md.Get("name"); len(names) > 0 { fmt.Printf("name: %s\n", names[0]) } resp := new(hello.HelloResponse) resp.Message = fmt.Sprintf("Hello %s.", in.Name) return resp, nil } func main() { listen, err := net.Listen("tcp", Address) if err != nil { grpclog.Fatalf("Failed to listen: %v", err) } s := grpc.NewServer() hello.RegisterHelloServer(s, HelloService) reflection.Register(s) // 注册到grpcurl fmt.Println("Listen on " + Address) s.Serve(listen) }
启动 hello/server/main.go:
cd hello/server go run main.go
编写 hello/client/main.go:
package main import ( "fmt" "golang.org/x/net/context" "google.golang.org/grpc" "google.golang.org/grpc/grpclog" "google.golang.org/grpc/metadata" "grpcTest/proto/hello" ) const ( Address = "127.0.0.1:50052" ) func main() { conn, err := grpc.Dial(Address, grpc.WithInsecure()) if err != nil { grpclog.Fatalln(err) } defer conn.Close() md := metadata.Pairs( "token", "xxx", "name", "lee", ) ctx := metadata.NewOutgoingContext(context.Background(), md) c := hello.NewHelloClient(conn) req := &hello.HelloRequest{Name: "gRPC"} res, err := c.SayHello(ctx, req) if err != nil { grpclog.Fatalln(err) } fmt.Println(res.Message) }
启动 hello/client/main.go:
cd hello/client go run main.go
ssl 认证:
- 生成证书:
参见:openssl 用法–1.生成证书// hello/keys/server.key // hello/keys/server.crt
- 编写 hello/server/main.go:
package main import ( "fmt" "golang.org/x/net/context" "google.golang.org/grpc" "google.golang.org/grpc/credentials" "google.golang.org/grpc/grpclog" "grpcTest/proto/hello" "net" ) const ( Address = "127.0.0.1:50052" ) type helloService struct{} var HelloService = helloService{} func (h helloService) SayHello(ctx context.Context, in *hello.HelloRequest) (*hello.HelloResponse, error) { resp := new(hello.HelloResponse) resp.Message = fmt.Sprintf("Hello %s.", in.Name) return resp, nil } func main() { listen, err := net.Listen("tcp", Address) creds, err := credentials.NewServerTLSFromFile("../keys/server.crt", "../keys/server.key") if err != nil { grpclog.Fatalf("Failed to generate credentials %v", err) } s := grpc.NewServer(grpc.Creds(creds)) hello.RegisterHelloServer(s, HelloService) fmt.Println("Listen on " + Address) s.Serve(listen) }
- 启动 hello/server/main.go:
cd hello/server go run main.go
- 编写 hello/client/main.go:
package main import ( "fmt" "golang.org/x/net/context" "google.golang.org/grpc" "google.golang.org/grpc/credentials" "google.golang.org/grpc/grpclog" "grpcTest/proto/hello" ) const ( Address = "127.0.0.1:50052" ) func main() { creds, err := credentials.NewClientTLSFromFile("../keys/server.crt", "localhost") if err != nil { grpclog.Fatalf("Failed to create TLS credentials %v", err) } conn, err := grpc.Dial(Address, grpc.WithTransportCredentials(creds)) if err != nil { grpclog.Fatalln(err) } defer conn.Close() c := hello.NewHelloClient(conn) req := &hello.HelloRequest{Name: "gRPC"} res, err := c.SayHello(context.Background(), req) if err != nil { grpclog.Fatalln(err) } fmt.Println(res.Message) }
- 启动 hello/client/main.go:
export GODEBUG=x509ignoreCN=0 cd hello/client go run main.go
- 生成证书:
拦截器:
编写 hello/server/main.go:
package main import ( "fmt" "golang.org/x/net/context" "google.golang.org/grpc" "google.golang.org/grpc/grpclog" "google.golang.org/grpc/reflection" "grpcTest/proto/hello" "net" ) const ( Address = "127.0.0.1:50052" ) type helloService struct{} var HelloService = helloService{} func (h helloService) SayHello(ctx context.Context, in *hello.HelloRequest) (*hello.HelloResponse, error) { resp := new(hello.HelloResponse) resp.Message = fmt.Sprintf("Hello %s.", in.Name) return resp, nil } func main() { listen, err := net.Listen("tcp", Address) if err != nil { grpclog.Fatalf("Failed to listen: %v", err) } s := grpc.NewServer(grpc.UnaryInterceptor(interceptor)) hello.RegisterHelloServer(s, HelloService) reflection.Register(s) // 注册到grpcurl fmt.Println("Listen on " + Address) s.Serve(listen) } func interceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { fmt.Println("server interceptor is running!") fmt.Printf("method=%s req=%v\n", info.FullMethod, req) // 继续处理请求 return handler(ctx, req) }
启动 hello/server/main.go:
cd hello/server go run main.go
编写 hello/client/main.go:
package main import ( "fmt" "golang.org/x/net/context" "google.golang.org/grpc" "google.golang.org/grpc/grpclog" "grpcTest/proto/hello" "time" ) const ( Address = "127.0.0.1:50052" ) func main() { conn, err := grpc.Dial(Address, grpc.WithInsecure(), grpc.WithUnaryInterceptor(interceptor)) if err != nil { grpclog.Fatalln(err) } defer conn.Close() c := hello.NewHelloClient(conn) req := &hello.HelloRequest{Name: "gRPC"} res, err := c.SayHello(context.Background(), req) if err != nil { grpclog.Fatalln(err) } fmt.Println(res.Message) } func interceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { fmt.Println("client interceptor is running!") start := time.Now() err := invoker(ctx, method, req, reply, cc, opts...) fmt.Printf("method=%s req=%v resp=%v duration=%s error=%v\n", method, req, reply, time.Since(start), err) return err }
启动 hello/client/main.go:
export GODEBUG=x509ignoreCN=0 cd hello/client go run main.go
双向 grpc:
编写 proto/hello/hello.proto 文件:
syntax = "proto3"; package hello; option go_package = ".;hello"; service Hello { rpc SayHello(stream HelloRequest) returns (stream HelloResponse) {} } message HelloRequest { string name = 1; } message HelloResponse { string message = 1; }
编译生成 .pb.go 文件:
cd proto/hello protoc -I . --go_out=plugins=grpc:. ./hello.proto
编写 hello/server/main.go:
package main import ( "fmt" "google.golang.org/grpc" "google.golang.org/grpc/grpclog" "google.golang.org/grpc/metadata" "google.golang.org/grpc/reflection" "grpcTest/proto/hello" "io" "net" "os" "os/signal" "syscall" "time" ) const ( Address = "127.0.0.1:50052" ) type helloService struct { requests []*hello.HelloResponse } var HelloService = helloService{} func (h helloService) SayHello(conn hello.Hello_SayHelloServer) error { ctx := conn.Context() mdClient, ok := metadata.FromIncomingContext(ctx) if !ok { fmt.Println("get metadata error") } if tokens := mdClient.Get("token"); len(tokens) > 0 { fmt.Printf("metadata from client - token: %s\n", tokens[0]) } if names := mdClient.Get("name"); len(names) > 0 { fmt.Printf("metadata from client - name: %s\n", names[0]) } // 发送消息 go func() { for { mdServer := metadata.Pairs("token", "123") _ = conn.SendHeader(mdServer) request := &hello.HelloResponse{} request.Message = "hello from server!" if err := conn.Send(request); err != nil { fmt.Println("send err:", err) } time.Sleep(3 * time.Second) } }() // 接收消息 for { msg, err := conn.Recv() if err == io.EOF { fmt.Println("receive done!") return nil } if err != nil { fmt.Println("receive error:", err) return err } fmt.Println("msg from client:", msg.Name) } } func main() { quit := make(chan os.Signal, 1) // 退出信号 signal.Notify(quit, syscall.SIGKILL, syscall.SIGQUIT, syscall.SIGINT, syscall.SIGTERM) listen, err := net.Listen("tcp", Address) if err != nil { grpclog.Fatalf("Failed to listen: %v", err) } srv := grpc.NewServer(grpc.StreamInterceptor(interceptor)) go shutdown(quit, srv) hello.RegisterHelloServer(srv, HelloService) reflection.Register(srv) // 注册到grpcurl fmt.Println("Listen on " + Address) srv.Serve(listen) } func shutdown(quit chan os.Signal, srv *grpc.Server) { <-quit fmt.Println("server shutdown...") srv.Stop() os.Exit(0) } func interceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { fmt.Printf("[server before] info: %+v\n", info) sStream := &serverStream{ss} err := handler(srv, sStream) fmt.Printf("[server after] err: %v\n", err) return err } type serverStream struct { grpc.ServerStream } func (s *serverStream) RecvMsg(m interface{}) error { fmt.Printf("receive from client: (type=%T)\n", m.(*hello.HelloRequest)) return s.ServerStream.RecvMsg(m) }
启动 hello/server/main.go:
cd hello/server go run main.go
编写 hello/client/main.go:
package main import ( "fmt" "golang.org/x/net/context" "google.golang.org/grpc" "google.golang.org/grpc/grpclog" "google.golang.org/grpc/metadata" "grpcTest/proto/hello" "io" "os" "os/signal" "syscall" "time" ) const ( Address = "127.0.0.1:50052" ) var msgChan = make(chan string) func rpcClient() { quit := make(chan os.Signal, 1) signal.Notify(quit, syscall.SIGKILL, syscall.SIGQUIT, syscall.SIGINT, syscall.SIGTERM) srv, err := grpc.Dial(Address, grpc.WithInsecure(), grpc.WithStreamInterceptor(interceptor)) if err != nil { grpclog.Fatalln(err) } defer srv.Close() mdClient := metadata.Pairs( "token", "xxx", "name", "lee", ) ctx := metadata.NewOutgoingContext(context.Background(), mdClient) c := hello.NewHelloClient(srv) conn, err := c.SayHello(ctx) if err != nil { fmt.Println("create stream error:", err) } // 发送消息 go func() { for { select { case msg := <-msgChan: if err := conn.Send(&hello.HelloRequest{ Name: msg, }); err != nil { fmt.Println("send error:", err) } case <-quit: fmt.Println("control + c pressed!") err := conn.CloseSend() if err != nil { fmt.Println("close error:", err) os.Exit(0) } os.Exit(0) } } }() // 接收消息 for { mdServer, err := conn.Header() if tokens := mdServer.Get("token"); len(tokens) > 0 { fmt.Printf("metadata from server - token: %s\n", tokens[0]) } msg, err := conn.Recv() if err == io.EOF { fmt.Println("receive done!") break } if err != nil { fmt.Println("receive error:", err) } fmt.Println("msg from server:", msg.Message) } } func main() { go rpcClient() // 将要发送到服务端的消息传递到消息管道 for i := 0; i < 3; i++ { msgChan <- "hello from client!" time.Sleep(3 * time.Second) } select {} } func interceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { fmt.Printf("[client before] method: %+v, StreamDesc: %+v\n", method, desc) cs, err := streamer(ctx, desc, cc, method, opts...) fmt.Printf("[client after] method: %+v\n", method) cStream := &clientStream{cs} return cStream, err } type clientStream struct { grpc.ClientStream } func (s *clientStream) RecvMsg(m interface{}) error { fmt.Printf("receive from server: (type=%T)\n", m) return s.ClientStream.RecvMsg(m) }
启动 hello/client/main.go:
cd hello/client go run main.go
使用多个拦截器(原生不支持,需下载包
go-grpc-middleware
):安装包:
go get github.com/grpc-ecosystem/go-grpc-middleware
编写 hello/server/main.go:
package main import ( "fmt" grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" "golang.org/x/net/context" "google.golang.org/grpc" "google.golang.org/grpc/grpclog" "google.golang.org/grpc/reflection" "grpcTest/proto/hello" "net" ) const ( Address = "127.0.0.1:50052" ) type helloService struct{} var HelloService = helloService{} func (h helloService) SayHello(ctx context.Context, in *hello.HelloRequest) (*hello.HelloResponse, error) { resp := new(hello.HelloResponse) resp.Message = fmt.Sprintf("Hello %s.", in.Name) return resp, nil } func main() { listen, err := net.Listen("tcp", Address) if err != nil { grpclog.Fatalf("Failed to listen: %v", err) } s := grpc.NewServer(grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer( interceptor1, interceptor2, ))) hello.RegisterHelloServer(s, HelloService) reflection.Register(s) // 注册到grpcurl fmt.Println("Listen on " + Address) s.Serve(listen) } func interceptor1(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { fmt.Println("server interceptor1 is running!") // 继续处理请求 return handler(ctx, req) } func interceptor2(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { fmt.Println("server interceptor2 is running!") // 继续处理请求 return handler(ctx, req) }
启动 hello/server/main.go:
cd hello/server go run main.go
编写 hello/client/main.go:
package main import ( "fmt" grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" "golang.org/x/net/context" "google.golang.org/grpc" "google.golang.org/grpc/grpclog" "grpcTest/proto/hello" ) const ( Address = "127.0.0.1:50052" ) func main() { conn, err := grpc.Dial(Address, grpc.WithInsecure(), grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient( interceptor1, interceptor2, ))) if err != nil { grpclog.Fatalln(err) } defer conn.Close() c := hello.NewHelloClient(conn) req := &hello.HelloRequest{Name: "gRPC"} res, err := c.SayHello(context.Background(), req) if err != nil { grpclog.Fatalln(err) } fmt.Println(res.Message) } func interceptor1(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { fmt.Println("client interceptor1 is running!") err := invoker(ctx, method, req, reply, cc, opts...) return err } func interceptor2(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { fmt.Println("client interceptor2 is running!") err := invoker(ctx, method, req, reply, cc, opts...) return err }
启动 hello/client/main.go:
cd hello/client go run main.go
grpcurl
用法:- 安装:
go install github.com/fullstorydev/grpcurl/cmd/grpcurl@latest
- 查看grpc服务列表:
grpcurl -plaintext localhost:50052 list
- 查看
service
中有哪些方法:grpcurl -plaintext localhost:50052 list hello.Hello
- 查看
service
中的方法实现细节:grpcurl -plaintext localhost:50052 describe hello.Hello
- 查看grpc参数详情:
grpcurl -plaintext localhost:50052 describe hello.HelloRequest
- 调用grpc方法:
grpcurl -plaintext -d '{"name":"lee"}' localhost:50052 hello.Hello/SayHello
- 安装:
文档更新时间: 2024-04-20 10:57 作者:lee