下载地址

go.etcd.io/etcd/client/v3

基本用法

  1. PUT/GET 操作

     package main
    
     import (
         "context"
         "fmt"
         "go.etcd.io/etcd/client/v3"
         "log"
         "time"
     )
    
     // main connects to an etcd cluster, writes one key and then reads back every
     // key that shares its prefix, printing each pair as "key=value".
     func main() {
         client, err := clientv3.New(clientv3.Config{
             Endpoints:   []string{"10.0.0.12:2379", "10.0.0.13:2379", "10.0.0.14:2379"},
             DialTimeout: 5 * time.Second,
         })
         if err != nil {
             log.Fatalln(err)
         }
         // Release the client's connections and background goroutines on exit.
         defer client.Close()

         // Bound both requests so an unreachable cluster cannot block forever.
         ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
         defer cancel()

         // PUT
         key := "foo"
         value := "bar"
         if _, err = client.Put(ctx, key, value); err != nil {
             log.Printf("etcd put error,%v\n", err)
             return
         }

         // GET
         // clientv3.WithPrefix() returns every key that starts with key; drop the
         // option to fetch only the exact key.
         getResponse, err := client.Get(ctx, key, clientv3.WithPrefix())
         if err != nil {
             log.Printf("etcd GET error,%v\n", err)
             return
         }
         for _, kv := range getResponse.Kvs {
             fmt.Printf("%s=%s\n", kv.Key, kv.Value)
         }
     }
  2. WATCH 监听操作

     package main
    
     import (
         "context"
         "fmt"
         "go.etcd.io/etcd/client/v3"
         "log"
         "time"
     )
    
     // main connects to etcd and watches the "foo" prefix in a background
     // goroutine, exiting once that watch has terminated.
     func main() {
         client, err := clientv3.New(clientv3.Config{
             Endpoints:   []string{"localhost:2379"},
             DialTimeout: 5 * time.Second,
         })
         if err != nil {
             log.Fatalln(err)
         }
         // Release the client's connections and background goroutines on exit.
         defer client.Close()

         key := "foo"

         // Watch for changes in the background. Waiting on done instead of an
         // empty select lets the process exit when the watcher returns rather
         // than staying parked forever.
         done := make(chan struct{})
         go func() {
             defer close(done)
             watcher(client, key)
         }()
         <-done
     }
     // watcher subscribes to every key that starts with key and prints each change
     // event. It returns when the watch channel is closed or when a response
     // reports an error.
     func watcher(client *clientv3.Client, key string) {
         // clientv3.WithPrefix() watches the whole prefix; drop the option to watch
         // only the exact key.
         watchChan := client.Watch(context.Background(), key, clientv3.WithPrefix())
         for watchResponse := range watchChan {
             // A response can carry a failure (for example a cancelled watch or a
             // compacted revision) instead of events, so check it before reading
             // Events.
             if err := watchResponse.Err(); err != nil {
                 log.Printf("etcd watch error,%v\n", err)
                 return
             }
             for _, event := range watchResponse.Events {
                 fmt.Printf("Type:%s,Key:%s,Value:%s\n", event.Type, event.Kv.Key, event.Kv.Value)
                 // Compare against the typed constants rather than the String()
                 // form so a typo is caught at compile time.
                 switch event.Type {
                 case clientv3.EventTypePut: // key created or updated
                     // todo something...
                 case clientv3.EventTypeDelete: // key deleted or its lease expired
                     // todo something...
                 }
             }
         }
     }
  3. 租约(有效期)

     package main
    
     import (
         "context"
         "go.etcd.io/etcd/client/v3"
         "log"
         "time"
     )
    
     // main writes a key that is attached to a 5-second lease, so etcd removes the
     // key automatically once the lease expires.
     func main() {
         client, err := clientv3.New(clientv3.Config{
             Endpoints:   []string{"localhost:2379"},
             DialTimeout: 5 * time.Second,
         })
         if err != nil {
             log.Fatalln(err)
         }
         // Release the client's connections and background goroutines on exit.
         defer client.Close()

         key := "foo"
         value := "bar"

         // Bound both requests so an unreachable server cannot block forever.
         ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
         defer cancel()

         // Request a lease with a TTL of 5 seconds.
         leaseGrant, err := client.Grant(ctx, 5)
         if err != nil {
             // Report this as a grant failure; the put has not been attempted yet.
             log.Printf("grant error %v", err)
             return
         }

         // PUT the key bound to the lease, so it lives as long as the lease does.
         if _, err = client.Put(ctx, key, value, clientv3.WithLease(leaseGrant.ID)); err != nil {
             log.Printf("put error %v", err)
             return
         }
     }
  4. KeepAlive 续租

     package main
    
     import (
         "context"
         "fmt"
         "go.etcd.io/etcd/client/v3"
         "time"
     )
    
     // main writes a key bound to a 5-second lease and then renews that lease
     // every 3 seconds, printing the TTL reported by each renewal. It stops at the
     // first renewal error.
     func main() {
         client, err := clientv3.New(clientv3.Config{
             Endpoints:   []string{"localhost:2379"},
             DialTimeout: 5 * time.Second,
         })
         if err != nil {
             panic(err)
         }
         // Release the client's connections and background goroutines on every
         // return path below.
         defer client.Close()

         key := "foo"
         value := "bar"
         ctx := context.Background()

         // Request a lease with a TTL of 5 seconds.
         leaseGrant, err := client.Grant(ctx, 5)
         if err != nil {
             fmt.Printf("grant error %v", err)
             return
         }

         // PUT the key bound to the lease, so it lives as long as the lease does.
         if _, err = client.Put(ctx, key, value, clientv3.WithLease(leaseGrant.ID)); err != nil {
             fmt.Printf("put error %v", err)
             return
         }

         for {
             // Renew the lease once; the 3-second pause below keeps the renewal
             // interval shorter than the 5-second TTL.
             keepaliveResponse, err := client.KeepAliveOnce(ctx, leaseGrant.ID)
             if err != nil {
                 fmt.Printf("KeepAlive error %v", err)
                 return
             }
             fmt.Println(keepaliveResponse.TTL)
             time.Sleep(3 * time.Second)
         }
     }
  5. 使用事务实现分布式锁:

     package main
    
     import (
         "context"
         "fmt"
         clientv3 "go.etcd.io/etcd/client/v3"
         "log"
         "sync"
         "time"
     )
    
     // doSomethingWithLock tries once to take a distributed lock stored under the
     // etcd key "lock" and prints whether it succeeded. The lock key is bound to a
     // lease that is kept alive while the function runs and revoked when it
     // returns, which releases the lock.
     //
     // wg.Done is called when the function returns. Failures are logged and end
     // only this attempt, so one failing worker does not terminate the others.
     func doSomethingWithLock(wg *sync.WaitGroup) {
         defer wg.Done()

         lockKey := "lock"
         client, err := clientv3.New(clientv3.Config{
             Endpoints:   []string{"localhost:2379"},
             DialTimeout: 5 * time.Second,
         })
         if err != nil {
             log.Printf("etcd connect error,%v\n", err)
             return
         }
         // Deferred first so it runs last, after the lease has been revoked.
         defer client.Close()

         // ctx scopes the keep-alive stream and the transaction to this call.
         ctx, cancel := context.WithCancel(context.Background())
         defer cancel()

         // Set up the lock: a 1-second lease that the lock key will be bound to.
         lease := clientv3.NewLease(client)
         leaseGrant, err := lease.Grant(ctx, 1)
         if err != nil {
             log.Printf("etcd grant error,%v\n", err)
             return
         }
         leaseID := leaseGrant.ID

         // Release the lock: revoking the lease removes the key bound to it. A
         // separate bounded context is used because ctx may already be cancelled.
         defer func() {
             revokeCtx, cancelRevoke := context.WithTimeout(context.Background(), 5*time.Second)
             defer cancelRevoke()
             if _, err := lease.Revoke(revokeCtx, leaseID); err != nil {
                 log.Printf("etcd revoke error,%v\n", err)
             }
         }()

         // Keep the lease alive for as long as this function runs.
         keepAlive, err := lease.KeepAlive(ctx, leaseID)
         if err != nil {
             log.Printf("etcd keepalive error,%v\n", err)
             return
         }
         // Drain the responses so the keep-alive channel never fills up; the
         // channel is closed once ctx is cancelled.
         go func() {
             for range keepAlive {
             }
         }()

         // Acquire the lock in one transaction:
         // if the key has never been created, then put it bound to the lease;
         // otherwise do nothing, which means another holder owns the lock.
         txResp, err := clientv3.NewKV(client).Txn(ctx).
             If(clientv3.Compare(clientv3.CreateRevision(lockKey), "=", 0)).
             Then(clientv3.OpPut(lockKey, "xxx", clientv3.WithLease(leaseID))).
             Commit()
         if err != nil {
             log.Printf("etcd txn error,%v\n", err)
             return
         }
         if !txResp.Succeeded {
             fmt.Println("锁被占用")
             return
         }

         // Run the business logic while holding the lock.
         fmt.Println("成功获得锁")
     }
    
     // main launches 1000 goroutines that each attempt to take the lock and waits
     // until every one of them has finished.
     func main() {
         const workers = 1000

         var wg sync.WaitGroup
         // Register all workers up front; each one calls wg.Done when it returns.
         wg.Add(workers)
         for n := 0; n < workers; n++ {
             go doSomethingWithLock(&wg)
         }
         wg.Wait()
     }
文档更新时间: 2024-04-20 10:57   作者:lee