0%

ZooKeeper客户端使用与集群搭建

我们已经知道怎么通过终端的Client命令连接到ZooKeeper服务了,在代码中我们又改怎么操作呢,还有ZooKeeper集群是怎么搭建的呢?

我们可以通过ZooKeeper提供的基础API访问,也可以通过Curator开源框架来调用.

ZooKeeper 基础API

连接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class ZookeeperTest {

private static final String CONNECT_URL = "172.30.60.2:2181";
private static final int TIME_OUT = 5 * 1000;
private static final CountDownLatch countDownLatch = new CountDownLatch(1);
ZooKeeper zooKeeper;

@Before
public void before() throws IOException, InterruptedException {
zooKeeper = new ZooKeeper(CONNECT_URL, TIME_OUT, new Watcher() {
public void process(WatchedEvent watchedEvent) {
Event.EventType type = watchedEvent.getType();
Event.KeeperState state = watchedEvent.getState();
if (state == Event.KeeperState.SyncConnected) {
countDownLatch.countDown();
System.out.println("连接上了 zookeeper");
}
if (state == Event.KeeperState.Disconnected) {
System.out.println("断开了连接");
}

}
});

System.out.println("连接中");
countDownLatch.await();
}


@After
public void after() throws InterruptedException {
zooKeeper.close();
}

CURD

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
@Test
public void zkCreate() throws UnsupportedEncodingException, KeeperException, InterruptedException {
byte[] data = "c_node_data".getBytes(StandardCharsets.UTF_8);
String nodePath = "/create_node4";
String s = zooKeeper.create(nodePath, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println(s);
}


@Test
public void zkGetData() throws UnsupportedEncodingException, KeeperException, InterruptedException {
Stat stat = new Stat();
String nodePath = "/create_node2";
byte[] data = zooKeeper.getData(nodePath, watchedEvent -> {
System.out.printf(" 监听");
}, stat);
String s = new String(data, StandardCharsets.UTF_8);
System.out.println(s);
System.out.println(stat.toString());
}

@Test
public void zkSetData() throws UnsupportedEncodingException, KeeperException, InterruptedException {
Stat stat = new Stat();
String nodePath = "/create_node4";
byte[] data = zooKeeper.getData(nodePath, watchedEvent -> {
System.out.println(" 数据被修改了");
}, stat);
String s = new String(data, StandardCharsets.UTF_8);
System.out.println(s);
System.out.println(stat.toString());

byte[] changeData = "c_node_data1111".getBytes(StandardCharsets.UTF_8);
Stat stat1 = zooKeeper.setData(nodePath, changeData, stat.getVersion());
System.out.println(stat1.toString());
}


@Test
public void zkDeleteData() throws UnsupportedEncodingException, KeeperException, InterruptedException {
Stat stat = new Stat();
String nodePath = "/create_node3";
byte[] data = zooKeeper.getData(nodePath, watchedEvent -> {
System.out.println(" 数据被修改了");
}, stat);
String s = new String(data, StandardCharsets.UTF_8);
System.out.println(s);
System.out.println(stat.toString());
zooKeeper.delete(nodePath, stat.getVersion());
}

Wacth

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@Test
public void zkWatchData() throws UnsupportedEncodingException, KeeperException, InterruptedException {
Stat stat = new Stat();
String nodePath = "/create_node2";

Watcher watcher = new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
Stat newStat = new Stat();
try {
byte[] data = zooKeeper.getData(nodePath, this, newStat);
String s = new String(data, StandardCharsets.UTF_8);
System.out.println("修改之后的结果是: " + s);
} catch (Exception e) {
e.printStackTrace();
}
}
};
byte[] data = zooKeeper.getData(nodePath, watcher, stat);
String s = new String(data, StandardCharsets.UTF_8);
System.out.println(s);
System.out.println(stat.toString());

byte[] changeData = "c_node_data2".getBytes(StandardCharsets.UTF_8);
zooKeeper.setData(nodePath, changeData, stat.getVersion());
}

Curator API 使用

连接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
 private static final String CONNECT_URL = "172.30.60.2:2181";
private static final int TIME_OUT = 5 * 1000;
private static final CountDownLatch countDownLatch = new CountDownLatch(1);
private CuratorFramework curatorFramework;

@Test
public void testCuratorFactoryCreateInstance() throws Exception {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5);
CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(CONNECT_URL, retryPolicy);
curatorFramework.start();

byte[] bytes = "curator_node1_data".getBytes(StandardCharsets.UTF_8);
String s = curatorFramework.create().forPath("/curator_node1", bytes);

System.out.println(s);
curatorFramework.close();
}

@Test
public void testCuratorBuilderCreateInstance() throws Exception {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5);
CuratorFramework curatorFramework = CuratorFrameworkFactory.builder()
.connectString(CONNECT_URL)
.sessionTimeoutMs(TIME_OUT)
.connectionTimeoutMs(TIME_OUT)
.retryPolicy(retryPolicy)
.namespace("namespace-zwh")
.build();
curatorFramework.start();

byte[] bytes = "curator_node1_data".getBytes(StandardCharsets.UTF_8);
String s = curatorFramework.create().forPath("/curator_node2", bytes);

System.out.println(s);
curatorFramework.close();
}


// ===================== 连接测试end


@Before
public void before() {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5);
curatorFramework = CuratorFrameworkFactory.builder()
.connectString(CONNECT_URL)
.sessionTimeoutMs(TIME_OUT)
.connectionTimeoutMs(TIME_OUT)
.retryPolicy(retryPolicy)
.namespace("namespace-zwh")
.build();
curatorFramework.start();
}

@After
public void after() {
curatorFramework.close();
}

CURD

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57

@Test
public void testCreateData() throws Exception {
byte[] bytes = "curator_node1_data".getBytes(StandardCharsets.UTF_8);
String s = curatorFramework.create().forPath("/curator_node3", bytes);
System.out.println(s);
}

@Test
public void testSetData() throws Exception {
byte[] bytes = "curator_node1_data-----".getBytes(StandardCharsets.UTF_8);
Stat stat = curatorFramework.setData().forPath("/curator_node3", bytes);
System.out.println(stat.toString());
}

@Test
public void testGetData() throws Exception {
byte[] bytes = curatorFramework.getData().forPath("/curator_node3");
System.out.println(new String(bytes, StandardCharsets.UTF_8));
}

@Test
public void testDeleteData() throws Exception {
curatorFramework.delete().forPath("/curator_node3");
// byte[] bytes = curatorFramework.getData().forPath("/curator_node3");
// System.out.println(new String(bytes, StandardCharsets.UTF_8));
}


@Test
public void testCreatingParentsIfNeededData() throws Exception {
String s = curatorFramework.create()
.creatingParentsIfNeeded()
.forPath("/sub0/sub1/sub2");
System.out.println(s);
}

@Test
public void testDeletingChildrenIfNeededData() throws Exception {
curatorFramework.delete()
.deletingChildrenIfNeeded()
.forPath("/sub0/sub1/sub2");
}


@Test
public void testInBackgroundData() throws Exception {
curatorFramework.create()
.inBackground((curatorFramework, curatorEvent) -> {
System.out.println("curatorEvent = " + curatorEvent);

System.out.println(Thread.currentThread().getName());
})
.forPath("/sub0/sub1/sub2");

TimeUnit.SECONDS.sleep(5);
}

Wacth(Cache)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
@Test
public void testAddListener() throws Exception {
curatorFramework.getCuratorListenable().addListener((curatorFramework, curatorEvent) -> {
System.out.println("监听到了变化");
});
TimeUnit.SECONDS.sleep(5);
}


// 下面是监听

@Test
public void testNodeCache() throws Exception {
//CuratorCache
String path = "/sub0/sub1/sub2";
NodeCache nodeCache = new NodeCache(curatorFramework, path);
nodeCache.getListenable().addListener(() -> {

System.out.println("testNodeWatch");
byte[] bytes = curatorFramework.getData().forPath(path);

String s = new String(bytes, StandardCharsets.UTF_8);
System.out.println(s);

});
nodeCache.start();
TimeUnit.SECONDS.sleep(5);

nodeCache.close();
}


@Test
public void testPathChildrenCache() throws Exception {
//CuratorCache
String path = "/sub0/sub1";
PathChildrenCache pathChildrenCache = new PathChildrenCache(curatorFramework, path, true);
pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
byte[] bytes = client.getData().forPath("/sub0/sub1/sub2");

String s = new String(bytes, StandardCharsets.UTF_8);
System.out.println(s);
}
});
pathChildrenCache.start();
TimeUnit.SECONDS.sleep(10);
pathChildrenCache.close();

}


@Test
public void testTreeCache() throws Exception {
//CuratorCache
String path = "/sub0";
TreeCache treeCache = new TreeCache(curatorFramework, path);
treeCache.start();
treeCache.getListenable().addListener(new TreeCacheListener() {
@Override
public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
byte[] bytes = client.getData().forPath("/sub0/sub1/sub2");

String s = new String(bytes, StandardCharsets.UTF_8);
System.out.println(s);
}
});

TimeUnit.SECONDS.sleep(10);
treeCache.close();
}

集群搭建

我们将要搭建一个 1 Leader ,2 follower , 1 observer 的ZooKeeper Cluster.

4个 ZooKeeper,和Redis搭建集群类似, 一个服务对应着一个 cfg 文件,

这里我们将 zoo-1.cfg,zoo-2.cfg,zoo-3.cfg 作为参与选举的服务, zoo-4.cfg 作为observer.

配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/usr/local/data/zookeeper-1
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1

# Cluster 集群配置
server.1=172.30.60.2:2001:3001:participant
server.2=172.30.60.2:2002:3002:participant
server.3=172.30.60.2:2003:3003:participant
server.4=172.30.60.2:2004:3004:observer

有必要解释下这个配置

1
server.1=172.30.60.2:2001:3001:participant

server.Num:IP:Port-1:Port-2:Role:host:hostPort

  • Num : 表示服务器编号

  • IP: 服务器ip

  • Port-1: ZooKeeper 服务与Leader服务器 交换信息的通信端口

  • Port-2: ZooKeeper 选举通信端口

  • Role: 角色 比如(participant,observer)

  • host:hostPort 宿主机ip:端口

每个cfg配置文件都需要关注

1
2
dataDir=/usr/local/data/zookeeper-1
clientPort=2181

创建server id

1
server.1=172.30.60.2:2001:3001:participant

server.1 服务器编号,需要结合dataDir,我们需要在对应的目录下创建一个myid的文件,并把id记录在文件中.

这里以zoo-1.cfg为例.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
drwxr-xr-x. 3 root root 63 11月 20 14:50 zookeeper-1
drwxr-xr-x. 3 root root 63 11月 20 14:50 zookeeper-2
drwxr-xr-x. 3 root root 63 11月 20 14:50 zookeeper-3
drwxr-xr-x. 3 root root 63 11月 20 14:51 zookeeper-4
[root@oldconan data]# cd zookeeper-1
[root@oldconan zookeeper-1]# ll
总用量 8
-rw-r--r--. 1 root root 2 11月 20 14:44 myid
drwxr-xr-x. 2 root root 86 11月 20 14:53 version-2
-rw-r--r--. 1 root root 4 11月 20 14:50 zookeeper_server.pid
[root@oldconan zookeeper-1]# vim myid
[root@oldconan zookeeper-1]# cat myid
1
[root@oldconan zookeeper-1]# pwd
/usr/local/data/zookeeper-1
[root@oldconan zookeeper-1]#

启动服务

启动所有的服务(zoo-1,zoo-2,zoo-3,zoo-4)

在启动之前,每个参与选举的服务是不知道自己会是什么角色的,只有启动之后才会确定角色.

检查服务角色信息

1
2
3
4
5
6
7
8
9
10
[root@oldconan apache-zookeeper-3.5.8-bin]# bin/zkServer.sh status  clusterconfig/zoo-2.cfg 
ZooKeeper JMX enabled by default
Using config: clusterconfig/zoo-2.cfg
Client port found: 2182. Client address: localhost.
Mode: leader
[root@oldconan apache-zookeeper-3.5.8-bin]# bin/zkServer.sh status clusterconfig/zoo-1.cfg
ZooKeeper JMX enabled by default
Using config: clusterconfig/zoo-1.cfg
Client port found: 2181. Client address: localhost.
Mode: follower

3

连接服务验证

连接到集群

1
[root@oldconan apache-zookeeper-3.5.8-bin]# bin/zkCli.sh  -server 172.30.60.2:2181,172.30.60.2:2182,172.30.60.2:2183,172.30.60.2:2184

1