选举原理

通过阅读etcd提供的Campaign方法,发现选举过程基于了Etcd的几个特性,就可以完成自动选主:

  • MVCC:key存在版本属性,没被创建时版本号为0
  • CAS操作:结合MVCC,可以实现竞选逻辑,if(version == 0) set(key,value),通过原子操作,确保只有一台机器能set成功;
  • Lease租约:可以对key绑定一个租约,租约到期时没预约,这个key就会被回收;
  • Watch监听:监听key的变化事件,如果key被删除,则重新发起竞选。

流程梳理

在Sentinel选主时,核心代码逻辑如下:

首先定义一个了选举接口,可以针对不同的场景实现(在这里只介绍使用etcd方式进行leadership

interface

  • RunForEletion方法:进行Leader选举

  • Leader方法:返回Leader节点value值

  • Stop方法:停止处理

Sentinel结构体定义如下:

sentinel

  • election:选举接口
  • leader:bool类型,标识当前sentinel节点是否为leader
  • leadershipCount:leader任期,每次竞选成功后会+1

当Sentinel启动时,首先会启动一个goroutine进行Leader选举

leadership

进入electionLoop方法查看具体选举过程

hexin

可以看到,调用RunForElection方法去竞争Leader,成为Leader的Sentinel将leader和leadershipCount字段更新

进入RunForElection方法,看一下具体实现:

etcds

继续看campaign方法:

campaign

加上注释后的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// 使用 etcdv3 实现的分布式选举功能
func (e *etcdv3Election) campaign() {
// 在函数退出时关闭已声明的通道 e.electedCh 和 e.errCh
defer close(e.electedCh)
defer close(e.errCh)

for {
// 向通道 e.electedCh 发送 false,表示当前节点未当选
e.electedCh <- false

// 创建一个新的会话,会话的 TTL 设置为 e.ttl 所表示的秒数,同时使用 e.ctx 作为上下文
s, err := concurrency.NewSession(e.c, concurrency.WithTTL(int(e.ttl.Seconds())), concurrency.WithContext(e.ctx))
if err != nil {
// 如果发生错误,则向通道 e.errCh 发送错误,并从该方法返回
e.errCh <- err
return
}

// 创建一个新的选举对象 etcdElection,该对象使用已创建的会话 s,并在 etcd 中的指定路径 e.path 下执行选举
etcdElection := concurrency.NewElection(s, e.path)

// 选举该节点作为候选人,如果出现错误,则向通道 e.errCh 发送错误,并从该方法返回
if err = etcdElection.Campaign(e.ctx, e.candidateUID); err != nil {
e.errCh <- err
return
}

// 向通道 e.electedCh 发送 true,表示该节点已被选中
e.electedCh <- true

// 使用 select 语句等待上下文 e.ctx 完成或者会话 s 结束,如果会话 s 结束,则向通道 e.electedCh 发送 false,表示该节点未被选中
select {
case <-e.ctx.Done():
return
case <-s.Done():
e.electedCh <- false
}
}
}

Campaign方法是etcd提供的leadership核心方法,进去看下具体是怎么实现的(代码太多了就不贴图片了….)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
// Campaign puts a value as eligible for the election on the prefix
// key.
// Multiple sessions can participate in the election for the
// same prefix, but only one can be the leader at a time.
//
// If the context is 'context.TODO()/context.Background()', the Campaign
// will continue to be blocked for other keys to be deleted, unless server
// returns a non-recoverable error (e.g. ErrCompacted).
// Otherwise, until the context is not cancelled or timed-out, Campaign will
// continue to be blocked until it becomes the leader.
func (e *Election) Campaign(ctx context.Context, val string) error {
s := e.session
client := e.session.Client()

k := fmt.Sprintf("%s%x", e.keyPrefix, s.Lease())
txn := client.Txn(ctx).If(v3.Compare(v3.CreateRevision(k), "=", 0))
txn = txn.Then(v3.OpPut(k, val, v3.WithLease(s.Lease())))
txn = txn.Else(v3.OpGet(k))
resp, err := txn.Commit()
if err != nil {
return err
}
e.leaderKey, e.leaderRev, e.leaderSession = k, resp.Header.Revision, s
if !resp.Succeeded {
kv := resp.Responses[0].GetResponseRange().Kvs[0]
e.leaderRev = kv.CreateRevision
if string(kv.Value) != val {
if err = e.Proclaim(ctx, val); err != nil {
e.Resign(ctx)
return err
}
}
}

_, err = waitDeletes(ctx, client, e.keyPrefix, e.leaderRev-1)
if err != nil {
// clean up in case of context cancel
select {
case <-ctx.Done():
e.Resign(client.Ctx())
default:
e.leaderSession = nil
}
return err
}
e.hdr = resp.Header

return nil
}

该方法首先创建一个 key,然后用事务来进行竞选操作。如果当前 key 的 revision 是 0,表示该 key 没有被其他会话占用,那么就将当前 session 设置为 leader,并将 val 写入该 key 的 value。如果该 key 的 revision 不是 0,那么就从 etcd 中获取该 key 的 value 和 revision,判断它是否是当前 session 的 leader。如果是,则继续保持当前 session 为 leader;否则,使用 Proclaim 函数尝试成为 leader,如果失败则使用 Resign 函数放弃竞选。

在函数执行完成后,如果当前 session 成为了 leader,它将会监视比当前 key revision 小的 key,等待这些 key 被删除。如果有一个或多个 key 被删除,那么这些 key 的删除操作将会返回。如果在等待期间 context 被取消,那么当前 session 将会放弃 leadership。

总之,该方法的作用是让多个会话竞选成为 leader,从而实现分布式系统中的 leader 选举