前言
我们的管控项目是按地域部署,为了保证服务高可用,每个管控组件都部署2份,但是同时只有1个(Leader
)对外提供服务,当Leader
服务挂掉时,我们需要从Follower
服务中重新选举一个服务来当Leader
,复杂的方式是通过Raft
协议去协商,简单点,可以通过分布式锁的思路来做。
而我们的项目就是通过etcd
的分布式锁来实现的。在服务启动时,会尝试去进行leader
竞争,成为leader
的才能对外提供服务。
我在本地实现了个最基础版的leader
选举代码
核心思想
- 所有的
Follower
服务去竞争同一把锁,并给这个锁设置一个过期时间
- 只会有一个
Follower
服务取到锁,这把锁的值就为它的标识,他就变成了Leader
服务
- 其他
Follower
服务竞争失败后,去获取锁得到的当前的Leader
服务标识,与之通信
Leader
服务需要在锁过期之前不断的续期,证明自己是健康的
- 所有
Follower
服务监控这把锁是否还被Leader
服务持有,如果没有,就跳到了第1步
代码演示
该段代码实现了基于etcd
的分布式leader
选举算法,可以协调多个客户端对leader
的竞争,并在leader
节点上周期性地更新key-value
以维持其leader
地位。
当然etcd
提供的有选举SDK
,我自己简单实现了下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69
| package main
import ( "context" "fmt" "github.com/coreos/etcd/clientv3" "go.etcd.io/etcd/clientv3/concurrency" "log" "time" )
const ( leaderKey = "/leader" )
func main() { etcdClient, err := clientv3.New(clientv3.Config{ Endpoints: []string{"http://localhost:12379"}, DialTimeout: time.Second * 5, }) if err != nil { log.Fatal(err) } defer etcdClient.Close()
session, err := concurrency.NewSession(etcdClient) if err != nil { log.Fatal(err) } defer session.Close()
mutex := concurrency.NewMutex(session, leaderKey)
for { if err := mutex.Lock(context.Background()); err != nil { fmt.Println("failed to acquire lock:", err) time.Sleep(time.Second) continue }
fmt.Println("success to acquire lock, I'm the leader, my IP address is xxx.xxx.xxx.xxx")
ticker := time.NewTicker(time.Second * 3) for { select { case <-session.Done(): mutex.Unlock(context.Background()) return case <-ticker.C: if _, err := etcdClient.Put(context.Background(), leaderKey, "xxx.xxx.xxx.xxx", clientv3.WithLease(session.Lease())); err != nil { fmt.Println("failed to update leader key:", err) mutex.Unlock(context.Background()) return } fmt.Println("续约成功...") } } } }
|