
ZooKeeper Startup Election Source Code

Previously we saw how to set up a ZooKeeper cluster. Whenever a cluster is involved, it is worth understanding its election mechanism, and since ZooKeeper happens to be implemented in Java, let's walk through the source code to see how its election logic works.

A standalone instance has no election mechanism, so the source walkthrough below assumes two conditions:

  • 1 . ZooKeeper is running as a cluster.

  • 2 . The ServerCnxnFactory is NettyServerCnxnFactory, not NIOServerCnxnFactory.

Downloading the Source

ZK source code

If you need it, check out the feature/read_source_3.5.8 branch.

Starting Locally

When using ZooKeeper we have run the zkServer.sh command, so we can open the bin/zkServer.sh file; the ZK startup entry point is defined in this script.

ZOOMAIN="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=$JMXPORT -Dcom.sun.management.jmxremote.authenticate=$JMXAUTH -Dcom.sun.management.jmxremote.ssl=$JMXSSL -Dzookeeper.jmx.log4j.disable=$JMXLOG4J org.apache.zookeeper.server.quorum.QuorumPeerMain"

The QuorumPeerMain class is the entry point of the ZK server.

To start ZK you need not only the entry point but also a configuration file, such as cnf/zoo_sample.cfg.

Here I created a separate cnf/zoo1.cfg configuration file for my own needs.
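For reference, a minimal cluster configuration could look like the sketch below. The hostnames, ports, and paths are illustrative placeholders, not taken from the original setup; with at least two server.N entries present, config.isDistributed() is true and the cluster startup path is taken.

```properties
# cnf/zoo1.cfg -- minimal 3-node quorum config (values illustrative)
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/tmp/zookeeper/node1
clientPort=2181
# server.<myid>=<host>:<quorum-port>:<election-port>
server.1=127.0.0.1:2888:3888
server.2=127.0.0.1:2889:3889
server.3=127.0.0.1:2890:3890
```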

Configure and launch QuorumPeerMain

With this configuration in place, you can start the ZK server locally.

In the same way, we can configure the ZK client:

Configure and launch ZooKeeperMain

Reading the Source

Overall Flow

(figure: zk startup election flow)

(figure: zk startup election mechanism)

Verifying Against the Source

The ZK server entry point is QuorumPeerMain.main().

Parsing the Configuration

When the server starts, it reads the configuration. We won't explain each option here; if an option shows up in later code, come back here to look it up.

public static void main(String[] args) {
    QuorumPeerMain main = new QuorumPeerMain();
    try {
        main.initializeAndRun(args);
        ...
    }
}

protected void initializeAndRun(String[] args)
        throws ConfigException, IOException, AdminServerException {
    QuorumPeerConfig config = new QuorumPeerConfig();
    // An argument was passed in
    if (args.length == 1) {
        /**
         * Parse the cfg file into the config instance
         */
        config.parse(args[0]);
    }

    /**
     * Start the scheduled purge task
     */
    // Start and schedule the the purge task
    DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
            .getDataDir(), config.getDataLogDir(), config
            .getSnapRetainCount(), config.getPurgeInterval());
    purgeMgr.start();

    /**
     * A config was provided and it describes a cluster
     */
    if (args.length == 1 && config.isDistributed()) {
        runFromConfig(config);
    } else {
        LOG.warn("Either no config or no quorum defined in config, running "
                + " in standalone mode");
        // there is only server in the quorum -- run as standalone
        /**
         * With no quorum configured, fall back to standalone mode
         */
        ZooKeeperServerMain.main(args);
    }
}

Starting the Cluster Logic

runFromConfig(config) is executed only when ZK runs in cluster (distributed) mode.

public void runFromConfig(QuorumPeerConfig config)
        throws IOException, AdminServerException {

    /**
     * The real startup begins here
     */
    LOG.info("Starting quorum peer");
    try {
        ServerCnxnFactory cnxnFactory = null;
        ServerCnxnFactory secureCnxnFactory = null;

        /**
         * config.getClientPortAddress() is the ZK server address and port
         */
        if (config.getClientPortAddress() != null) {
            /**
             * cnxnFactory is chosen via zookeeper.serverCnxnFactory; the default
             * is NIO, while NettyServerCnxnFactory is the officially recommended one
             */
            cnxnFactory = ServerCnxnFactory.createFactory();
            /**
             * Configure cnxnFactory (here we use NettyServerCnxnFactory) and bind the port
             */
            cnxnFactory.configure(config.getClientPortAddress(),
                    config.getMaxClientCnxns(),
                    false);
        }

        if (config.getSecureClientPortAddress() != null) {
            /**
             * secureCnxnFactory is chosen the same way via zookeeper.serverCnxnFactory
             */
            secureCnxnFactory = ServerCnxnFactory.createFactory();
            // Configure secureCnxnFactory
            secureCnxnFactory.configure(config.getSecureClientPortAddress(),
                    config.getMaxClientCnxns(),
                    true);
        }

        quorumPeer = getQuorumPeer();
        // Snapshot and log files (created if absent)
        quorumPeer.setTxnFactory(new FileTxnSnapLog(
                config.getDataLogDir(),
                config.getDataDir()));
        ...
        //quorumPeer.setQuorumPeers(config.getAllMembers());
        /**
         * Set the election type; the default is 3
         */
        quorumPeer.setElectionType(config.getElectionAlg());
        ...
        /**
         * Set the quorum verifier, the final arbiter of a majority
         */
        quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);
        if (config.getLastSeenQuorumVerifier() != null) {
            quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(), false);
        }
        ...
        quorumPeer.initialize();

        quorumPeer.start();
        quorumPeer.join();
    } catch (InterruptedException e) {
        // warn, but generally this is ok
        LOG.warn("Quorum Peer interrupted", e);
    }
}

In a cluster environment, startup mainly creates a quorumPeer and calls its start() method.

quorumPeer.start() contains all the logic for starting ZK in cluster mode.

Cluster startup: quorumPeer.start()

public synchronized void start() {
    if (!getView().containsKey(myid)) {
        throw new RuntimeException("My id " + myid + " not in the peer list");
    }
    /**
     * Load the snapshot data; the latest record read from the snapshot
     * is kept in the QuorumPeer instance
     */
    loadDataBase();
    // TODO: 2020/12/1 the start in NIOServerCnxnFactory
    /**
     * Start the server; the default is NIOServerCnxnFactory, but
     * NettyServerCnxnFactory can be used instead
     *
     * Internally this calls the start methods of cnxnFactory and secureCnxnFactory
     */
    startServerCnxnFactory();
    try {
        // TODO: 2020/12/1 nothing special observed here so far
        /**
         * Initialized in the QuorumPeer constructor:
         * JettyAdminServer (when zookeeper.admin.enableServer = "true")
         * or DummyAdminServer
         */
        adminServer.start();
    } catch (AdminServerException e) {
        LOG.warn("Problem starting AdminServer", e);
        System.out.println(e);
    }
    /**
     * Start the leader election
     */
    startLeaderElection();

    /**
     * Start the thread, which executes the internal run() method
     */
    super.start();
}

quorumPeer.start() does five main things:

  • Load the snapshot

  • Start the previously configured ServerCnxnFactory

  • Start the adminServer (JettyAdminServer/DummyAdminServer); what this does is not entirely clear yet

  • Start the leader election

  • Monitor the election and wrap it up as soon as it concludes

Here we focus on the last two steps.

Creating the Election Strategy

Since quorumPeer.setElectionType(config.getElectionAlg()) defaults to 3, the logic taken here is:

/**
 * Create the QuorumCnxManager, which contains a Listener (a ServerSocket)
 */
QuorumCnxManager qcm = createCnxnManager();
QuorumCnxManager oldQcm = qcmRef.getAndSet(qcm);
if (oldQcm != null) {
    LOG.warn("Clobbering already-set QuorumCnxManager (restarting leader election?)");
    oldQcm.halt();
}
QuorumCnxManager.Listener listener = qcm.listener;
if (listener != null) {
    /**
     * A child thread executes the run() method of QuorumCnxManager.Listener
     */
    listener.start();
    /**
     * Fast leader election
     */
    FastLeaderElection fle = new FastLeaderElection(this, qcm);
    /**
     * FastLeaderElection is not itself a thread; its internal Messenger
     * maintains two threads (wsThread = WorkerSender, wrThread = WorkerReceiver)
     */
    fle.start();
    le = fle;
} else {
    LOG.error("Null listener when initializing cnx manager");
}
break;

As we can see, the election strategy used here is FastLeaderElection.

QuorumCnxManager.Listener

QuorumCnxManager.Listener internally maintains a ServerSocket dedicated to communication between ZK servers.

The QuorumCnxManager.Listener.run() method:

@Override
public void run() {
    // retry count
    int numRetries = 0;
    InetSocketAddress addr;
    Socket client = null;
    Exception exitException = null;
    // portBindMaxRetry defaults to 3
    while ((!shutdown) && (portBindMaxRetry == 0 || numRetries < portBindMaxRetry)) {
        LOG.debug("Listener thread started, myId: {}", self.getId());
        try {
            if (self.shouldUsePortUnification()) {
                LOG.info("Creating TLS-enabled quorum server socket");
                ss = new UnifiedServerSocket(self.getX509Util(), true);
            } else if (self.isSslQuorum()) {
                LOG.info("Creating TLS-only quorum server socket");
                ss = new UnifiedServerSocket(self.getX509Util(), false);
            } else {
                /**
                 * Create the ServerSocket
                 */
                ss = new ServerSocket();
            }

            ss.setReuseAddress(true);

            if (self.getQuorumListenOnAllIPs()) {
                int port = self.getElectionAddress().getPort();
                addr = new InetSocketAddress(port);
            } else {
                // Resolve hostname for this server in case the
                // underlying ip address has changed.
                self.recreateSocketAddresses(self.getId());
                addr = self.getElectionAddress();
            }
            LOG.info("{} is accepting connections now, my election bind port: {}", QuorumCnxManager.this.mySid, addr.toString());
            setName(addr.toString());
            ss.bind(addr);
            /**
             * Loop, accepting messages sent by the other ZK servers
             */
            while (!shutdown) {
                try {
                    client = ss.accept();
                    setSockOpts(client);
                    LOG.info("Received connection request from {}", client.getRemoteSocketAddress());

                    /**
                     * Receive and handle the message
                     */
                    if (quorumSaslAuthEnabled) {
                        receiveConnectionAsync(client);
                    } else {
                        receiveConnection(client);
                    }
                    numRetries = 0;
                } catch (SocketTimeoutException e) {
                    LOG.warn("The socket is listening for the election accepted "
                            + "and it timed out unexpectedly, but will retry."
                            + "see ZOOKEEPER-2836");
                }
            }
        } catch (IOException e) {
            if (shutdown) {
                break;
            }
            LOG.error("Exception while listening", e);
            exitException = e;
            numRetries++;
            try {
                ss.close();
                Thread.sleep(1000);
            } catch (IOException ie) {
                LOG.error("Error closing server socket", ie);
            } catch (InterruptedException ie) {
                LOG.error("Interrupted while sleeping. " +
                        "Ignoring exception", ie);
            }
            closeSocket(client);
        }
    }
    LOG.info("Leaving listener");
    ...
}



public void receiveConnection(final Socket sock) {
    DataInputStream din = null;
    try {
        din = new DataInputStream(
                new BufferedInputStream(sock.getInputStream()));

        LOG.debug("Sync handling of connection request received from: {}", sock.getRemoteSocketAddress());
        handleConnection(sock, din);
    } catch (IOException e) {
        LOG.error("Exception handling connection, addr: {}, closing server connection",
                sock.getRemoteSocketAddress());
        LOG.debug("Exception details: ", e);
        closeSocket(sock);
    }
}



private void handleConnection(Socket sock, DataInputStream din)
        throws IOException {
    Long sid = null, protocolVersion = null;
    InetSocketAddress electionAddr = null;

    try {
        protocolVersion = din.readLong();
        /**
         * A non-negative value is a server id (myid),
         * not a protocol version
         */
        if (protocolVersion >= 0) { // this is a server id and not a protocol version
            sid = protocolVersion;
        } else {
            try {
                InitialMessage init = InitialMessage.parse(protocolVersion, din);
                sid = init.sid;
                electionAddr = init.electionAddr;
            } catch (InitialMessage.InitialMessageException ex) {
                LOG.error("Initial message parsing error!", ex);
                closeSocket(sock);
                return;
            }
        }

        if (sid == QuorumPeer.OBSERVER_ID) {
            /*
             * Choose identifier at random. We need a value to identify
             * the connection.
             */
            sid = observerCounter.getAndDecrement();
            LOG.info("Setting arbitrary identifier to observer: " + sid);
        }
    } catch (IOException e) {
        LOG.warn("Exception reading or writing challenge: {}", e);
        closeSocket(sock);
        return;
    }

    // do authenticating learner
    authServer.authenticate(sock, din);
    //If wins the challenge, then close the new connection.
    /**
     * Compare myids; only connections from the larger id --to--> the smaller id are kept
     */
    if (sid < self.getId()) {
        /*
         * This replica might still believe that the connection to sid is
         * up, so we have to shut down the workers before trying to open a
         * new connection.
         */
        SendWorker sw = senderWorkerMap.get(sid);
        if (sw != null) {
            sw.finish();
        }

        /*
         * Now we start a new connection
         */
        LOG.debug("Create new connection to server: {}", sid);
        // Close the incoming connection that lost the challenge
        closeSocket(sock);

        /**
         * connectOne connects back to the ZK server behind sid/electionAddr,
         * so that the connection is re-established with the winner's myid
         */
        if (electionAddr != null) {
            connectOne(sid, electionAddr);
        } else {
            connectOne(sid);
        }
    } else if (sid == self.getId()) {
        // we saw this case in ZOOKEEPER-2164
        LOG.warn("We got a connection request from a server with our own ID. "
                + "This should be either a configuration error, or a bug.");
    } else { // Otherwise start worker threads to receive data.
        /**
         * If the remote sid is larger than our own, create two worker threads
         *
         * SendWorker and RecvWorker are linked:
         * RecvWorker holds a reference to its SendWorker
         */
        SendWorker sw = new SendWorker(sock, sid);
        RecvWorker rw = new RecvWorker(sock, din, sid, sw);
        sw.setRecv(rw);

        SendWorker vsw = senderWorkerMap.get(sid);

        if (vsw != null) {
            vsw.finish();
        }

        // Keep a reference to the SendWorker
        senderWorkerMap.put(sid, sw);
        // Create the BlockingQueue for this sid
        queueSendMap.putIfAbsent(sid,
                new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY));

        /**
         * Send votes:
         * in effect, send the data held in the BlockingQueue for this sid,
         * i.e. the (sid, BlockingQueue) entry in queueSendMap
         */
        sw.start();
        /**
         * Receive votes and store them in the recvQueue
         */
        rw.start();
    }
}

The methods above are long, so let's briefly sum up what QuorumCnxManager.Listener does.

Roughly speaking, QuorumCnxManager.Listener maintains a SendWorker/RecvWorker pair dedicated to communication between ZK servers: SendWorker sends messages, RecvWorker receives them.

When a ZK server receives a message from another ZK server:

  • If the remote sid (server id) is smaller than its own, it closes the socket the remote opened and actively establishes its own socket connection to that server. This is the same situation as the one below, only with the roles of the servers reversed.

  • If the remote sid is larger than its own, it creates a SendWorker and a RecvWorker dedicated to exchanging messages with that server.
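The connection-direction rule from the two bullets above can be sketched in isolation. This is a simplified illustration, not ZooKeeper's actual code; the class and method names are invented for the example:

```java
// Sketch of the rule the Listener applies: between any two peers, only the
// peer with the LARGER sid keeps an outgoing connection to the smaller one,
// so exactly one socket survives per server pair.
public class ConnectionRule {
    /** Decide what to do with an incoming connection from peer remoteSid. */
    static String onIncomingConnection(long mySid, long remoteSid) {
        if (remoteSid < mySid) {
            // Drop the incoming socket and dial back ourselves (large -> small).
            return "close-and-reconnect";
        } else if (remoteSid == mySid) {
            return "config-error"; // a peer should never connect to itself (ZOOKEEPER-2164)
        } else {
            // Remote has the larger sid: accept and spawn SendWorker/RecvWorker.
            return "accept";
        }
    }

    public static void main(String[] args) {
        System.out.println(onIncomingConnection(3, 1)); // close-and-reconnect
        System.out.println(onIncomingConnection(1, 3)); // accept
    }
}
```

Whichever side initiates first, both peers apply the same rule, so the pair always converges on a single large-to-small connection.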

SendWorker: Sending Messages

SendWorker is a subclass of Thread. It loops, taking messages from the BlockingQueue for its sid and sending any message it finds.


public void run() {
    threadCnt.incrementAndGet();
    try {

        ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
        if (bq == null || isSendQueueEmpty(bq)) {
            ByteBuffer b = lastMessageSent.get(sid);
            if (b != null) {
                LOG.debug("Attempting to send lastMessage to sid=" + sid);
                send(b);
            }
        }
    } catch (IOException e) {
        LOG.error("Failed to send last message. Shutting down thread.", e);
        this.finish();
    }
    LOG.debug("SendWorker thread started towards {}. myId: {}", sid, QuorumCnxManager.this.mySid);
    try {
        while (running && !shutdown && sock != null) {

            ByteBuffer b = null;
            try {
                ArrayBlockingQueue<ByteBuffer> bq = queueSendMap
                        .get(sid);
                if (bq != null) {
                    b = pollSendQueue(bq, 1000, TimeUnit.MILLISECONDS);
                } else {
                    LOG.error("No queue of incoming messages for " +
                            "server " + sid);
                    break;
                }

                if (b != null) {
                    lastMessageSent.put(sid, b);
                    send(b);
                }
            } catch (InterruptedException e) {
                LOG.warn("Interrupted while waiting for message on queue",
                        e);
            }
        }
    } catch (Exception e) {
        LOG.warn("Exception when using channel: for id " + sid
                + " my id = " + QuorumCnxManager.this.mySid
                + " error = " + e);
    }
    this.finish();
    LOG.warn("Send worker leaving thread " + " id " + sid + " my id = " + self.getId());
}
}
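The core of the loop above is "poll the per-sid queue with a timeout, send whatever arrives". The self-contained sketch below mimics that pattern with a plain String queue; the class and method names are invented for the example and sending is simulated by collecting into a list:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Simplified SendWorker-style drain loop: poll with a timeout, "send" each
// message. The real worker keeps looping until shutdown; this demo stops
// once the queue is empty so it can terminate.
public class QueueDrain {
    static List<String> drain(BlockingQueue<String> queue) {
        List<String> sent = new ArrayList<>();
        while (true) {
            String msg;
            try {
                // Mirrors pollSendQueue(bq, 1000, MILLISECONDS); null means
                // nothing to send this round.
                msg = queue.poll(10, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
            if (msg == null) break;   // queue drained; real code would keep waiting
            sent.add(msg);            // real code: lastMessageSent.put(sid, b); send(b)
        }
        return sent;
    }

    public static void main(String[] args) {
        BlockingQueue<String> q = new ArrayBlockingQueue<>(8);
        q.offer("vote-1");
        q.offer("vote-2");
        System.out.println(drain(q)); // [vote-1, vote-2]
    }
}
```

Note how the real worker also remembers lastMessageSent per sid, so a freshly reconnected peer can be resent the most recent vote even when its queue is empty.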

RecvWorker: Receiving Messages

RecvWorker is also a subclass of Thread. As long as the socket connection for its sid is not broken, it keeps looping, trying to read messages sent by that server from the DataInputStream. Each message read is stored in recvQueue.

public void run() {
    threadCnt.incrementAndGet();
    try {
        LOG.debug("RecvWorker thread towards {} started. myId: {}", sid, QuorumCnxManager.this.mySid);
        while (running && !shutdown && sock != null) {
            /**
             * Reads the first int to determine the length of the
             * message
             */
            int length = din.readInt();
            if (length <= 0 || length > PACKETMAXSIZE) {
                throw new IOException(
                        "Received packet with invalid packet: "
                                + length);
            }
            /**
             * Allocates a new ByteBuffer to receive the message
             */
            byte[] msgArray = new byte[length];
            din.readFully(msgArray, 0, length);
            ByteBuffer message = ByteBuffer.wrap(msgArray);
            addToRecvQueue(new Message(message.duplicate(), sid));
        }
    } catch (Exception e) {
        LOG.warn("Connection broken for id " + sid + ", my id = "
                + QuorumCnxManager.this.mySid + ", error = ", e);
    } finally {
        LOG.warn("Interrupting SendWorker thread from RecvWorker. sid: {}. myId: {}", sid, QuorumCnxManager.this.mySid);
        sw.finish();
        closeSocket(sock);
    }
}
}
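The wire framing RecvWorker expects is a 4-byte big-endian length prefix followed by that many payload bytes. The sketch below demonstrates the same parsing against an in-memory byte array instead of a socket; the class name and the maxSize parameter are invented for the example:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Sketch of RecvWorker's framing: read a length int, validate it, then block
// until the full payload has been read.
public class FrameRead {
    static byte[] readFrame(DataInputStream din, int maxSize) throws IOException {
        int length = din.readInt();                 // first int = payload length
        if (length <= 0 || length > maxSize) {      // mirrors the PACKETMAXSIZE check
            throw new IOException("Received packet with invalid length: " + length);
        }
        byte[] msg = new byte[length];
        din.readFully(msg, 0, length);              // waits for the whole payload
        return msg;
    }

    public static void main(String[] args) throws IOException {
        // Build a frame the way a peer would send it: length prefix + payload.
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream dos = new DataOutputStream(bos);
        byte[] payload = "vote".getBytes(StandardCharsets.UTF_8);
        dos.writeInt(payload.length);
        dos.write(payload);

        DataInputStream din = new DataInputStream(
                new ByteArrayInputStream(bos.toByteArray()));
        System.out.println(new String(readFrame(din, 1024), StandardCharsets.UTF_8)); // vote
    }
}
```

The length check matters: without it, a corrupt or malicious prefix could make the receiver allocate an enormous buffer.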

FastLeaderElection.start

FastLeaderElection internally maintains a WorkerSender and a WorkerReceiver, dedicated to sending and receiving this server's own election messages.

FastLeaderElection.WorkerSender

WorkerSender takes the messages to be sent out of sendqueue one by one and dispatches them.

If the target sid is our own, the message is meant for ourselves, so it is added directly to this server's recvQueue receive queue.

Otherwise the message is added to the send queue (a BlockingQueue) for that sid, and the SendWorker for that sid later delivers it to the corresponding ZK server.

public void run() {
    while (!stop) {
        try {
            ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
            if (m == null) continue;

            process(m);
        } catch (InterruptedException e) {
            break;
        }
    }
    LOG.info("WorkerSender is down");
}

void process(ToSend m) {
    ByteBuffer requestBuffer = buildMsg(m.state.ordinal(),
            m.leader,
            m.zxid,
            m.electionEpoch,
            m.peerEpoch,
            m.configData);

    manager.toSend(m.sid, requestBuffer);

}

public void toSend(Long sid, ByteBuffer b) {
    /*
     * If sending message to myself, then simply enqueue it (loopback).
     */
    if (this.mySid == sid) {
        b.position(0);
        addToRecvQueue(new Message(b.duplicate(), sid));
        /*
         * Otherwise send to the corresponding thread to send.
         */
    } else {
        /*
         * Start a new connection if doesn't have one already.
         */
        ArrayBlockingQueue<ByteBuffer> bq = new ArrayBlockingQueue<ByteBuffer>(
                SEND_CAPACITY);
        ArrayBlockingQueue<ByteBuffer> oldq = queueSendMap.putIfAbsent(sid, bq);
        if (oldq != null) {
            addToSendQueue(oldq, b);
        } else {
            addToSendQueue(bq, b);
        }
        // Connect to the ZK server behind this sid and its election address
        connectOne(sid);

    }
}
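The routing decision inside toSend() can be boiled down to: loopback votes go straight into our own receive queue, everything else goes into the per-sid send queue for its SendWorker. The sketch below is a simplified stand-in, not ZooKeeper's code; VoteRouter and its String messages are invented for the example:

```java
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// Sketch of WorkerSender's toSend() routing: messages addressed to our own
// sid loop back into recvQueue without touching the network; all others are
// queued for the SendWorker that owns the connection to that sid.
public class VoteRouter {
    final long mySid;
    final BlockingQueue<String> recvQueue = new LinkedBlockingQueue<>();
    final Map<Long, BlockingQueue<String>> queueSendMap = new ConcurrentHashMap<>();

    VoteRouter(long mySid) {
        this.mySid = mySid;
    }

    void toSend(long sid, String msg) {
        if (sid == mySid) {
            recvQueue.add(msg);          // loopback: no socket round-trip needed
        } else {
            queueSendMap
                    .computeIfAbsent(sid, s -> new ArrayBlockingQueue<>(8))
                    .offer(msg);         // drained later by the SendWorker for sid
        }
    }

    public static void main(String[] args) {
        VoteRouter router = new VoteRouter(1);
        router.toSend(1, "self-vote");   // stays local
        router.toSend(2, "peer-vote");   // queued for peer 2
        System.out.println(router.recvQueue.peek());          // self-vote
        System.out.println(router.queueSendMap.get(2L).peek()); // peer-vote
    }
}
```

The loopback branch is why a single-vote "election" on a one-node quorum still works: the server's own vote reaches its receive queue through the same path as any remote vote.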

FastLeaderElection.WorkerReceiver

Monitoring the election: super.start()
