前言

我们的管控项目是按地域部署,为了保证服务高可用,每个管控组件都部署2份,但是同时只有1个(Leader)对外提供服务,当Leader服务挂掉时,我们需要从Follower服务中重新选举一个服务来当Leader,复杂的方式是通过Raft协议去协商,简单点,可以通过分布式锁的思路来做。

而我们的项目就是通过etcd的分布式锁来实现的。在服务启动时,会尝试去进行leader竞争,成为leader的才能对外提供服务。

我在本地实现了个最基础版的leader选举代码

核心思想

  1. 所有的Follower服务去竞争同一把锁,并给这个锁设置一个过期时间
  2. 只会有一个Follower服务取到锁,这把锁的值就为它的标识,他就变成了Leader服务
  3. 其他Follower服务竞争失败后,去获取锁得到的当前的Leader服务标识,与之通信
  4. Leader服务需要在锁过期之前不断的续期,证明自己是健康的
  5. 所有Follower服务监控这把锁是否还被Leader服务持有,如果没有,就跳到了第1步

代码演示

该段代码实现了基于etcd的分布式leader选举算法,可以协调多个客户端对leader的竞争,并在leader节点上周期性地更新key-value以维持其leader地位。

当然etcd提供的有选举SDK,我自己简单实现了下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
package main

import (
"context"
"fmt"
"github.com/coreos/etcd/clientv3"
"go.etcd.io/etcd/clientv3/concurrency"
"log"
"time"
)

const (
leaderKey = "/leader"
)

func main() {
// 创建etcd客户端
etcdClient, err := clientv3.New(clientv3.Config{
// etcd集群的地址
Endpoints: []string{"http://localhost:12379"},
// 连接etcd的超时时间
DialTimeout: time.Second * 5,
})
if err != nil {
log.Fatal(err)
}
defer etcdClient.Close()

// 新建会话
session, err := concurrency.NewSession(etcdClient)
if err != nil {
log.Fatal(err)
}
defer session.Close()

mutex := concurrency.NewMutex(session, leaderKey)

// 竞争leader的key
for {
// 尝试获得锁
if err := mutex.Lock(context.Background()); err != nil {
fmt.Println("failed to acquire lock:", err)
time.Sleep(time.Second)
continue
}

// 成为leader,打印当前服务的IP地址
fmt.Println("success to acquire lock, I'm the leader, my IP address is xxx.xxx.xxx.xxx")

// 周期性地续约leader的key
ticker := time.NewTicker(time.Second * 3)
for {
select {
case <-session.Done():
// session已经过期,释放锁
mutex.Unlock(context.Background())
return
case <-ticker.C:
// 续约leader的key
if _, err := etcdClient.Put(context.Background(), leaderKey, "xxx.xxx.xxx.xxx", clientv3.WithLease(session.Lease())); err != nil {
fmt.Println("failed to update leader key:", err)
mutex.Unlock(context.Background())
return
}
fmt.Println("续约成功...")
}
}
}
}