/** * 开启定时清理任务 */ // Start and schedule the the purge task DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config .getDataDir(), config.getDataLogDir(), config .getSnapRetainCount(), config.getPurgeInterval()); purgeMgr.start();
/** * 有配置 且是分布式 */ if (args.length == 1 && config.isDistributed()) { runFromConfig(config); } else { LOG.warn("Either no config or no quorum defined in config, running " + " in standalone mode"); // there is only server in the quorum -- run as standalone /** * 如果是没有参数,走单机模式 */ ZooKeeperServerMain.main(args); } }
quorumPeer.start(); quorumPeer.join(); } catch (InterruptedException e) { // warn, but generally this is ok LOG.warn("Quorum Peer interrupted", e); } }
@Override public void run() { // 重试次数 int numRetries = 0; InetSocketAddress addr; Socket client = null; Exception exitException = null; // portBindMaxRetry 默认值 3 while ((!shutdown) && (portBindMaxRetry == 0 || numRetries < portBindMaxRetry)) { LOG.debug("Listener thread started, myId: {}", self.getId()); try { if (self.shouldUsePortUnification()) { LOG.info("Creating TLS-enabled quorum server socket"); ss = new UnifiedServerSocket(self.getX509Util(), true); } else if (self.isSslQuorum()) { LOG.info("Creating TLS-only quorum server socket"); ss = new UnifiedServerSocket(self.getX509Util(), false); } else { /** * 创建Socket */ ss = new ServerSocket(); }
ss.setReuseAddress(true);
if (self.getQuorumListenOnAllIPs()) { int port = self.getElectionAddress().getPort(); addr = new InetSocketAddress(port); } else { // Resolve hostname for this server in case the // underlying ip address has changed. self.recreateSocketAddresses(self.getId()); addr = self.getElectionAddress(); } LOG.info("{} is accepting connections now, my election bind port: {}", QuorumCnxManager.this.mySid, addr.toString()); setName(addr.toString()); ss.bind(addr); /** * 循环接收 其他zk发来的消息 */ while (!shutdown) { try { client = ss.accept(); setSockOpts(client); LOG.info("Received connection request from {}", client.getRemoteSocketAddress()); /** * 接收和处理消息 */ if (quorumSaslAuthEnabled) { receiveConnectionAsync(client); } else { receiveConnection(client); } numRetries = 0; } catch (SocketTimeoutException e) { LOG.warn("The socket is listening for the election accepted " + "and it timed out unexpectedly, but will retry." + "see ZOOKEEPER-2836"); } } } catch (IOException e) { if (shutdown) { break; } LOG.error("Exception while listening", e); exitException = e; numRetries++; try { ss.close(); Thread.sleep(1000); } catch (IOException ie) { LOG.error("Error closing server socket", ie); } catch (InterruptedException ie) { LOG.error("Interrupted while sleeping. " + "Ignoring exception", ie); } closeSocket(client); } } LOG.info("Leaving listener"); ... }
public void receiveConnection(final Socket sock) { DataInputStream din = null; try { din = new DataInputStream( new BufferedInputStream(sock.getInputStream()));
try { protocolVersion = din.readLong(); /** * server id 而不是协议id * 而是 myid */ if (protocolVersion >= 0) { // this is a server id and not a protocol version sid = protocolVersion; } else { try { InitialMessage init = InitialMessage.parse(protocolVersion, din); sid = init.sid; electionAddr = init.electionAddr; } catch (InitialMessage.InitialMessageException ex) { LOG.error("Initial message parsing error!", ex); closeSocket(sock); return; } }
if (sid == QuorumPeer.OBSERVER_ID) { /* * Choose identifier at random. We need a value to identify * the connection. */ sid = observerCounter.getAndDecrement(); LOG.info("Setting arbitrary identifier to observer: " + sid); } } catch (IOException e) { LOG.warn("Exception reading or writing challenge: {}", e); closeSocket(sock); return; }
// do authenticating learner authServer.authenticate(sock, din); //If wins the challenge, then close the new connection. /** * 比较 myid, 只保留 大 --指向--> 小 的连接 */ if (sid < self.getId()) { /* * This replica might still believe that the connection to sid is * up, so we have to shut down the workers before trying to open a * new connection. */ SendWorker sw = senderWorkerMap.get(sid); if (sw != null) { sw.finish(); }
/* * Now we start a new connection */ LOG.debug("Create new connection to server: {}", sid); // 关闭当前竞争失败的其他zk服务 closeSocket(sock);
/** * connectOne 连接 来竞争的 sid electionAddr 对应的zk服务 通知其竞争失败,下次竞争用胜利者的myid */ if (electionAddr != null) { connectOne(sid, electionAddr); } else { connectOne(sid); } } else if (sid == self.getId()) { // we saw this case in ZOOKEEPER-2164 LOG.warn("We got a connection request from a server with our own ID. " + "This should be either a configuration error, or a bug."); } else { // Otherwise start worker threads to receive data. /** * 如果对方 sid 大于 自己的sid 则创建两个线程 监听 * * SendWorker 和 RecvWorker 关联 * RecvWorker 内部持有 SendWorker */ SendWorker sw = new SendWorker(sock, sid); RecvWorker rw = new RecvWorker(sock, din, sid, sw); sw.setRecv(rw);
public void toSend(Long sid, ByteBuffer b) { /* * If sending message to myself, then simply enqueue it (loopback). */ if (this.mySid == sid) { b.position(0); addToRecvQueue(new Message(b.duplicate(), sid)); /* * Otherwise send to the corresponding thread to send. */ } else { /* * Start a new connection if doesn't have one already. */ ArrayBlockingQueue<ByteBuffer> bq = new ArrayBlockingQueue<ByteBuffer>( SEND_CAPACITY); ArrayBlockingQueue<ByteBuffer> oldq = queueSendMap.putIfAbsent(sid, bq); if (oldq != null) { addToSendQueue(oldq, b); } else { addToSendQueue(bq, b); } //连接 发送 sid 和 addressip connectOne(sid);