我们已经知道怎么通过终端的Client命令连接到ZooKeeper服务了,在代码中我们又该怎么操作呢,还有ZooKeeper集群是怎么搭建的呢?
我们可以通过ZooKeeper 提供的基础API访问,也可以通过Curator 开源框架来调用.
ZooKeeper 基础API 连接 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 public class ZookeeperTest { private static final String CONNECT_URL = "172.30.60.2:2181"; private static final int TIME_OUT = 5 * 1000; private static final CountDownLatch countDownLatch = new CountDownLatch(1); ZooKeeper zooKeeper; @Before public void before() throws IOException, InterruptedException { zooKeeper = new ZooKeeper(CONNECT_URL, TIME_OUT, new Watcher() { public void process(WatchedEvent watchedEvent) { Event.EventType type = watchedEvent.getType(); Event.KeeperState state = watchedEvent.getState(); if (state == Event.KeeperState.SyncConnected) { countDownLatch.countDown(); System.out.println("连接上了 zookeeper"); } if (state == Event.KeeperState.Disconnected) { System.out.println("断开了连接"); } } }); System.out.println("连接中"); countDownLatch.await(); } @After public void after() throws InterruptedException { zooKeeper.close(); }
CURD 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 @Test public void zkCreate() throws UnsupportedEncodingException, KeeperException, InterruptedException { byte[] data = "c_node_data".getBytes(StandardCharsets.UTF_8); String nodePath = "/create_node4"; String s = zooKeeper.create(nodePath, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); System.out.println(s); } @Test public void zkGetData() throws UnsupportedEncodingException, KeeperException, InterruptedException { Stat stat = new Stat(); String nodePath = "/create_node2"; byte[] data = zooKeeper.getData(nodePath, watchedEvent -> { System.out.printf(" 监听"); }, stat); String s = new String(data, StandardCharsets.UTF_8); System.out.println(s); System.out.println(stat.toString()); } @Test public void zkSetData() throws UnsupportedEncodingException, KeeperException, InterruptedException { Stat stat = new Stat(); String nodePath = "/create_node4"; byte[] data = zooKeeper.getData(nodePath, watchedEvent -> { System.out.println(" 数据被修改了"); }, stat); String s = new String(data, StandardCharsets.UTF_8); System.out.println(s); System.out.println(stat.toString()); byte[] changeData = "c_node_data1111".getBytes(StandardCharsets.UTF_8); Stat stat1 = zooKeeper.setData(nodePath, changeData, stat.getVersion()); System.out.println(stat1.toString()); } @Test public void zkDeleteData() throws UnsupportedEncodingException, KeeperException, InterruptedException { Stat stat = new Stat(); String nodePath = "/create_node3"; byte[] data = zooKeeper.getData(nodePath, watchedEvent -> { System.out.println(" 数据被修改了"); }, stat); String s = new String(data, StandardCharsets.UTF_8); System.out.println(s); System.out.println(stat.toString()); zooKeeper.delete(nodePath, stat.getVersion()); }
Wacth 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 @Test public void zkWatchData() throws UnsupportedEncodingException, KeeperException, InterruptedException { Stat stat = new Stat(); String nodePath = "/create_node2"; Watcher watcher = new Watcher() { @Override public void process(WatchedEvent watchedEvent) { Stat newStat = new Stat(); try { byte[] data = zooKeeper.getData(nodePath, this, newStat); String s = new String(data, StandardCharsets.UTF_8); System.out.println("修改之后的结果是: " + s); } catch (Exception e) { e.printStackTrace(); } } }; byte[] data = zooKeeper.getData(nodePath, watcher, stat); String s = new String(data, StandardCharsets.UTF_8); System.out.println(s); System.out.println(stat.toString()); byte[] changeData = "c_node_data2".getBytes(StandardCharsets.UTF_8); zooKeeper.setData(nodePath, changeData, stat.getVersion()); }
Curator API 使用 连接 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 private static final String CONNECT_URL = "172.30.60.2:2181"; private static final int TIME_OUT = 5 * 1000; private static final CountDownLatch countDownLatch = new CountDownLatch(1); private CuratorFramework curatorFramework; @Test public void testCuratorFactoryCreateInstance() throws Exception { RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5); CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(CONNECT_URL, retryPolicy); curatorFramework.start(); byte[] bytes = "curator_node1_data".getBytes(StandardCharsets.UTF_8); String s = curatorFramework.create().forPath("/curator_node1", bytes); System.out.println(s); curatorFramework.close(); } @Test public void testCuratorBuilderCreateInstance() throws Exception { RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5); CuratorFramework curatorFramework = CuratorFrameworkFactory.builder() .connectString(CONNECT_URL) .sessionTimeoutMs(TIME_OUT) .connectionTimeoutMs(TIME_OUT) .retryPolicy(retryPolicy) .namespace("namespace-zwh") .build(); curatorFramework.start(); byte[] bytes = "curator_node1_data".getBytes(StandardCharsets.UTF_8); String s = curatorFramework.create().forPath("/curator_node2", bytes); System.out.println(s); curatorFramework.close(); } // ===================== 连接测试end @Before public void before() { RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5); curatorFramework = CuratorFrameworkFactory.builder() .connectString(CONNECT_URL) .sessionTimeoutMs(TIME_OUT) .connectionTimeoutMs(TIME_OUT) .retryPolicy(retryPolicy) .namespace("namespace-zwh") .build(); curatorFramework.start(); } @After public void after() { curatorFramework.close(); }
CURD 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 @Test public void testCreateData() throws Exception { byte[] bytes = "curator_node1_data".getBytes(StandardCharsets.UTF_8); String s = curatorFramework.create().forPath("/curator_node3", bytes); System.out.println(s); } @Test public void testSetData() throws Exception { byte[] bytes = "curator_node1_data-----".getBytes(StandardCharsets.UTF_8); Stat stat = curatorFramework.setData().forPath("/curator_node3", bytes); System.out.println(stat.toString()); } @Test public void testGetData() throws Exception { byte[] bytes = curatorFramework.getData().forPath("/curator_node3"); System.out.println(new String(bytes, StandardCharsets.UTF_8)); } @Test public void testDeleteData() throws Exception { curatorFramework.delete().forPath("/curator_node3"); // byte[] bytes = curatorFramework.getData().forPath("/curator_node3"); // System.out.println(new String(bytes, StandardCharsets.UTF_8)); } @Test public void testCreatingParentsIfNeededData() throws Exception { String s = curatorFramework.create() .creatingParentsIfNeeded() .forPath("/sub0/sub1/sub2"); System.out.println(s); } @Test public void testDeletingChildrenIfNeededData() throws Exception { curatorFramework.delete() .deletingChildrenIfNeeded() .forPath("/sub0/sub1/sub2"); } @Test public void testInBackgroundData() throws Exception { curatorFramework.create() .inBackground((curatorFramework, curatorEvent) -> { System.out.println("curatorEvent = " + curatorEvent); System.out.println(Thread.currentThread().getName()); }) .forPath("/sub0/sub1/sub2"); TimeUnit.SECONDS.sleep(5); }
Wacth(Cache) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 @Test public void testAddListener() throws Exception { curatorFramework.getCuratorListenable().addListener((curatorFramework, curatorEvent) -> { System.out.println("监听到了变化"); }); TimeUnit.SECONDS.sleep(5); } // 下面是监听 @Test public void testNodeCache() throws Exception { //CuratorCache String path = "/sub0/sub1/sub2"; NodeCache nodeCache = new NodeCache(curatorFramework, path); nodeCache.getListenable().addListener(() -> { System.out.println("testNodeWatch"); byte[] bytes = curatorFramework.getData().forPath(path); String s = new String(bytes, StandardCharsets.UTF_8); System.out.println(s); }); nodeCache.start(); TimeUnit.SECONDS.sleep(5); nodeCache.close(); } @Test public void testPathChildrenCache() throws Exception { //CuratorCache String path = "/sub0/sub1"; PathChildrenCache pathChildrenCache = new PathChildrenCache(curatorFramework, path, true); pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() { @Override public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { byte[] bytes = client.getData().forPath("/sub0/sub1/sub2"); String s = new String(bytes, StandardCharsets.UTF_8); System.out.println(s); } }); pathChildrenCache.start(); TimeUnit.SECONDS.sleep(10); pathChildrenCache.close(); } @Test public void testTreeCache() throws Exception { //CuratorCache String path = "/sub0"; TreeCache treeCache = new TreeCache(curatorFramework, path); treeCache.start(); treeCache.getListenable().addListener(new TreeCacheListener() { @Override public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception { byte[] bytes = client.getData().forPath("/sub0/sub1/sub2"); String s = new String(bytes, StandardCharsets.UTF_8); System.out.println(s); } }); 
TimeUnit.SECONDS.sleep(10); treeCache.close(); }
集群搭建 我们将要搭建一个 1 Leader ,2 follower , 1 observer 的ZooKeeper Cluster.
4个 ZooKeeper,和Redis搭建集群类似, 一个服务对应着一个 cfg 文件,
这里我们将 zoo-1.cfg,zoo-2.cfg,zoo-3.cfg 作为参与选举的服务, zoo-4.cfg 作为observer.
配置文件 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 # The number of milliseconds of each tick tickTime=2000 # The number of ticks that the initial # synchronization phase can take initLimit=10 # The number of ticks that can pass between # sending a request and getting an acknowledgement syncLimit=5 # the directory where the snapshot is stored. # do not use /tmp for storage, /tmp here is just # example sakes. dataDir=/usr/local/data/zookeeper-1 # the port at which the clients will connect clientPort=2181 # the maximum number of client connections. # increase this if you need to handle more clients #maxClientCnxns=60 # # Be sure to read the maintenance section of the # administrator guide before turning on autopurge. # # http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance # # The number of snapshots to retain in dataDir #autopurge.snapRetainCount=3 # Purge task interval in hours # Set to "0" to disable auto purge feature #autopurge.purgeInterval=1 # Cluster 集群配置 server.1=172.30.60.2:2001:3001:participant server.2=172.30.60.2:2002:3002:participant server.3=172.30.60.2:2003:3003:participant server.4=172.30.60.2:2004:3004:observer
有必要解释下这个配置
1 server.1=172.30.60.2:2001:3001:participant
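格式: server.Num=IP:Port-1:Port-2:Role[;host:hostPort] 。其中 Num 是服务器编号(需要与该服务 myid 文件中的值一致),IP 是服务器地址,Port-1 是 Follower 与 Leader 之间通信(数据同步)使用的端口,Port-2 是 Leader 选举使用的端口,Role 是角色(participant 或 observer),最后可选的 host:hostPort 用于指定客户端连接的地址和端口.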
每个cfg配置文件都需要关注
1 2 dataDir=/usr/local/data/zookeeper-1 clientPort=2181
创建server id 1 server.1=172.30.60.2:2001:3001:participant
server.1 服务器编号,需要结合dataDir ,我们需要在对应的目录下创建一个myid 的文件,并把id记录在文件中.
这里以zoo-1.cfg 为例.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 drwxr-xr-x. 3 root root 63 11月 20 14:50 zookeeper-1 drwxr-xr-x. 3 root root 63 11月 20 14:50 zookeeper-2 drwxr-xr-x. 3 root root 63 11月 20 14:50 zookeeper-3 drwxr-xr-x. 3 root root 63 11月 20 14:51 zookeeper-4 [root@oldconan data]# cd zookeeper-1 [root@oldconan zookeeper-1]# ll 总用量 8 -rw-r--r--. 1 root root 2 11月 20 14:44 myid drwxr-xr-x. 2 root root 86 11月 20 14:53 version-2 -rw-r--r--. 1 root root 4 11月 20 14:50 zookeeper_server.pid [root@oldconan zookeeper-1]# vim myid [root@oldconan zookeeper-1]# cat myid 1 [root@oldconan zookeeper-1]# pwd /usr/local/data/zookeeper-1 [root@oldconan zookeeper-1]#
启动服务 启动所有的服务(zoo-1,zoo-2,zoo-3,zoo-4)
在启动之前,每个参与选举的服务是不知道自己会是什么角色的,只有启动之后才会确定角色.
检查服务角色信息 1 2 3 4 5 6 7 8 9 10 [root@oldconan apache-zookeeper-3.5.8-bin]# bin/zkServer.sh status clusterconfig/zoo-2.cfg ZooKeeper JMX enabled by default Using config: clusterconfig/zoo-2.cfg Client port found: 2182. Client address: localhost. Mode: leader [root@oldconan apache-zookeeper-3.5.8-bin]# bin/zkServer.sh status clusterconfig/zoo-1.cfg ZooKeeper JMX enabled by default Using config: clusterconfig/zoo-1.cfg Client port found: 2181. Client address: localhost. Mode: follower
连接服务验证 连接到集群
1 [root@oldconan apache-zookeeper-3.5.8-bin]# bin/zkCli.sh -server 172.30.60.2:2181,172.30.60.2:2182,172.30.60.2:2183,172.30.60.2:2184