文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

java连接zookeeper实现zookeeper教程

2024-04-02 19:55

关注

java连接zookeeper实现zookeeper

Java服务端连接Zookeeper,进行节点信息的获取,管理…整理成一个基本工具

添加依赖:


<dependency>
   <groupId>org.apache.zookeeper</groupId>
   <artifactId>zookeeper</artifactId>
   <version>3.3.6</version>
</dependency>

具体代码如下:


package com; 
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;  
 
public class BaseZookeeper implements Watcher{ 
   private ZooKeeper zookeeper;
    
   private static final int SESSION_TIME_OUT = 2000;
   private CountDownLatch countDownLatch = new CountDownLatch(1);
   @Override
   public void process(WatchedEvent event) {
      if (event.getState() == KeeperState.SyncConnected) {
         System.out.println("Watch received event");
         countDownLatch.countDown();
      }
   }   
  
   
   public void connectZookeeper(String host) throws Exception{
      zookeeper = new ZooKeeper(host, SESSION_TIME_OUT, this);
      countDownLatch.await();
      System.out.println("zookeeper connection success");
   }
  
   
   public String createNode(String path,String data) throws Exception{
      return this.zookeeper.create(path, data.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
   }
  
   
   public List<String> getChildren(String path) throws KeeperException, InterruptedException{
      List<String> children = zookeeper.getChildren(path, false);
      return children;
   }
  
   
   public String getData(String path) throws KeeperException, InterruptedException{
      byte[] data = zookeeper.getData(path, false, null);
      if (data == null) {
         return "";
      }
      return new String(data);
   }
  
   
   public Stat setData(String path,String data) throws KeeperException, InterruptedException{
      Stat stat = zookeeper.setData(path, data.getBytes(), -1);
      return stat;
   }
  
   
   public void deleteNode(String path) throws InterruptedException, KeeperException{
      zookeeper.delete(path, -1);
   }
  
   
   public String getCTime(String path) throws KeeperException, InterruptedException{
      Stat stat = zookeeper.exists(path, false);
      return String.valueOf(stat.getCtime());
   }
  
   
   public Integer getChildrenNum(String path) throws KeeperException, InterruptedException{
      int childenNum = zookeeper.getChildren(path, false).size();
      return childenNum;
   }
   
   public void closeConnection() throws InterruptedException{
      if (zookeeper != null) {
         zookeeper.close();
      }
   }  
}  
 

测试:


public class Demo { 
    public static void main(String[] args) throws Exception {
        BaseZookeeper zookeeper = new BaseZookeeper();
        zookeeper.connectZookeeper("192.168.0.1:2181"); 
        List<String> children = zookeeper.getChildren("/");
        System.out.println(children);
    } 
}

ZookeeperJavaAPI基本操作

Zookeeper官方提供了两种语言的API,Java和C,在这里只演示JavaAPI

操作API的类中的变量,一下方法都会使用到


static Logger logg = LoggerFactory.getLogger(ZKApi.class);
private static final String zkServerPath = "10.33.57.28:2181";
private static final String zkServerPath = "127.0.0.1:2181";
private static final Integer timeOut = 5000;
private static Stat stat = new Stat(); 

以及实现接口Watcher的实现方法process


public void process(WatchedEvent event) {
    try {
        if (event.getType() == Event.EventType.NodeDataChanged) {
            ZooKeeper zk = null;
            zk = ZKApi.getZkConnect();
            byte[] resByt = new byte[0];
            resByt = zk.getData("/test1", false, stat);
            String resStr = new String(resByt);
            System.out.println("更改后的值:" + resStr);
            System.out.println("版本号的变化:" + stat.getVersion());
            System.out.println("-------");
            countDown.countDown();
        }else if(event.getType() == Event.EventType.NodeChildrenChanged){
            System.out.println("NodeChildrenChanged");
            ZooKeeper zk = null;
            zk = ZKApi.getZkConnect();
            List<String> srcChildList = zk.getChildren(event.getPath(), false);
            for (String child:srcChildList){
                System.out.println(child);
            }
            countDown.countDown();
        }else if(event.getType() == Event.EventType.NodeCreated){
            countDown.countDown();
        }else if (event.getType() == Event.EventType.NodeCreated){
            countDown.countDown();
        }
    }  catch (KeeperException e) {
        e.printStackTrace();
    }  catch (InterruptedException e) {
        e.printStackTrace();
    } catch (IOException e) {
        e.printStackTrace();
    }
}

1.连接客户端

创建客户端连接使用Zookeeper类的构造函数

Zookeeper构造函数总共四个如下:



public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly)
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
	long sessionId, byte[] sessionPasswd)
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
        long sessionId, byte[] sessionPasswd, boolean canBeReadOnly)

连接客户端代码


public static ZooKeeper getZkConnect() throws IOException {
    ZooKeeper zk = new ZooKeeper(zkServerPath, timeOut, new ZKApi());
    logg.debug("连接状态:{}", zk.getState());
    return zk;
}

DEBUG [main] - zookeeper.disableAutoWatchReset is false
DEBUG [main] - 连接状态:CONNECTING

2.恢复回话


public static void recoveryConnect() throws IOException, InterruptedException {
    ZooKeeper zooKeeper = new ZooKeeper(zkServerPath, timeOut, new ZKApi());
    long sessionId = zooKeeper.getSessionId();
    byte[] sessionPasswd = zooKeeper.getSessionPasswd();
    logg.debug("开始连接服务器 . . .");
    logg.debug("连接状态:{}",zooKeeper.getState());
    new Thread().sleep(1000 );
    logg.debug("开始重连 . . . ");
    ZooKeeper zooSession = new ZooKeeper(zkServerPath, timeOut, new ZKApi(), sessionId, sessionPasswd);
    logg.debug("重连状态:{}",zooSession.getState());
    new Thread().sleep(200);
    logg.debug("重连状态:{}",zooSession.getState());
}

DEBUG [main] - 开始连接服务器 . . .
DEBUG [main] - 连接状态:CONNECTING
DEBUG [main-SendThread(hdfa67:2181)] - Canonicalized address to hdfa67
 INFO [main-SendThread(hdfa67:2181)] - Opening socket connection to server hdfa67/10.33.57.67:2181. Will not attempt to authenticate using SASL (unknown error)
 INFO [main-SendThread(hdfa67:2181)] - Socket connection established to hdfa67/10.33.57.67:2181, initiating session
DEBUG [main-SendThread(hdfa67:2181)] - Session establishment request sent on hdfa67/10.33.57.67:2181
 INFO [main-SendThread(hdfa67:2181)] - Session establishment complete on server hdfa67/10.33.57.67:2181, sessionid = 0x10000ea59aa0011, negotiated timeout = 5000
DEBUG [main] - 开始重连 . . . 
 INFO [main] - Initiating client connection, connectString=10.33.57.67:2181 sessionTimeout=5000 watcher=ZKApi@73a28541 sessionId=0 sessionPasswd=<hidden>
DEBUG [main] - 重连状态:CONNECTING
DEBUG [main-SendThread(hdfa67:2181)] - Canonicalized address to hdfa67
 INFO [main-SendThread(hdfa67:2181)] - Opening socket connection to server hdfa67/10.33.57.67:2181. Will not attempt to authenticate using SASL (unknown error)
 INFO [main-SendThread(hdfa67:2181)] - Socket connection established to hdfa67/10.33.57.67:2181, initiating session
DEBUG [main-SendThread(hdfa67:2181)] - Session establishment request sent on hdfa67/10.33.57.67:2181
 INFO [main-SendThread(hdfa67:2181)] - Session establishment complete on server hdfa67/10.33.57.67:2181, sessionid = 0x10000ea59aa0012, negotiated timeout = 5000
DEBUG [main] - 重连状态:CONNECTED

3.创建节点

创建节点通过zk客户端对象的create方法进行创建,主要有两个方法:一种是同步,一种是异步,接下来的修改等方法同样如此,就不多加解释了



public String create(final String path, byte data[], List<ACL> acl,
        CreateMode createMode)
public void create(final String path, byte data[], List<ACL> acl,
        CreateMode createMode,  StringCallback cb, Object ctx)

public static void createZkNode1() throws IOException, KeeperException, InterruptedException {
    ZooKeeper zk = getZkConnect();
    String result = zk.create("/test1", "test-data".getBytes(), 
    	ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);//创建一个/test的持续节点
    System.out.println(result);
//输出/test1
public static void createZkNode2() throws IOException, KeeperException, InterruptedException {
    ZooKeeper zk = getZkConnect();
    
    String ctx = "{'create': 'success'}";
    zk.create("/test2", "test-data".getBytes(), 
    	ZooDefs.Ids.OPEN_ACL_UNSAFE,
    	CreateMode.PERSISTENT,new CreateCallBack() ,ctx);
    new Thread().sleep(2000);//需要暂停一会,否则创建失败
}

4.修改节点


public Stat setData(final String path, byte data[], int version)
public void setData(final String path, byte data[], int version,
        StatCallback cb, Object ctx)

public static void setZkNode1() throws IOException, KeeperException, InterruptedException{
    ZooKeeper zk = getZkConnect();
    Stat stat = zk.setData("/test1", "modifyed-data".getBytes(), 0);
    System.out.println(stat.getVersion());
}
public static void setZkNode2() throws IOException, KeeperException, InterruptedException{
    ZooKeeper zk = getZkConnect();
    String ctx = "{'modify': 'success'}";
    zk.setData("/test1", "modifyed-data".getBytes(),0,new ModifyCalback(),ctx);
    new Thread().sleep(1000);//必须加上,否则回掉不成功
}

5.删除节点


public void delete(final String path, int version)
public void delete(final String path, int version, VoidCallback cb,
        Object ctx)

public static void deleteZkNode1() throws IOException, KeeperException, InterruptedException {
    ZooKeeper zk = getZkConnect();
    zk.delete("/test1",1);//不能够删除子节点
}
public static void deleteZkNode2() throws IOException, InterruptedException {
    ZooKeeper zk = getZkConnect();
    String ctx = "{'delete': 'success'}";
    zk.delete("/test2",0,new DeleteCallBack(),ctx);//不能够删除子节点
    new Thread().sleep(1000);//必须加上,否则回掉不成功
}

6.查询节点


public byte[] getData(String path, boolean watch, Stat stat)
public byte[] getData(final String path, Watcher watcher, Stat stat)
public void getData(final String path, Watcher watcher,DataCallback cb, Object ctx)             
public void getData(String path, boolean watch, DataCallback cb, Object ctx)        

public static CountDownLatch countDown = new CountDownLatch(1);
public static void selectData1() throws IOException, KeeperException, InterruptedException {
    ZooKeeper zk = getZkConnect();
    byte[] data = zk.getData("/test1", true, stat);
    String s = new String(data);
    System.out.println("value: "+s);
    countDown.await();
}

if (event.getType() == Event.EventType.NodeDataChanged) {
            ZooKeeper zk = null;
            zk = ZKApi.getZkConnect();
            byte[] resByt = new byte[0];
            resByt = zk.getData("/test1", false, stat);
            String resStr = new String(resByt);
            System.out.println("更改后的值:" + resStr);
            System.out.println("版本号的变化:" + stat.getVersion());
            System.out.println("-------");
            countDown.countDown();
        

由于更改之后,触发了监听器,再次在命令行中进行更改,出现了一下结果。

在这里插入图片描述

7.查询子节点

查询子节点的方法


public List<String> getChildren(final String path, Watcher watcher)
public List<String> getChildren(String path, boolean watch)
public void getChildren(final String path, Watcher watcher, ChildrenCallback cb, Object ctx)
public void getChildren(String path, boolean watch, Children2Callback cb, Object ctx)   
public List<String> getChildren(final String path, Watcher watcher, Stat stat)
public List<String> getChildren(String path, boolean watch, Stat stat)
public void getChildren(final String path, Watcher watcher, Children2Callback cb, Object ctx)
public void getChildren(String path, boolean watch, Children2Callback cb, Object ctx)
            

代码实现


public static CountDownLatch countDown = new CountDownLatch(1);
public static void selectchildData1() throws IOException, KeeperException, InterruptedException {
    ZooKeeper zk = getZkConnect();
    List<String> srcChildList = zk.getChildren("/test", true, stat);
    for (String child:srcChildList){
        System.out.println(child);
    }
    countDown.await();
}

if(event.getType() == Event.EventType.NodeChildrenChanged){
    System.out.println("NodeChildrenChanged");
    ZooKeeper zk = null;
    zk = ZKApi.getZkConnect();
    List<String> srcChildList = zk.getChildren(event.getPath(), false);
    for (String child:srcChildList){
        System.out.println(child);
    }

运行结果完成后,触监听器,再次删除test1

在这里插入图片描述

第二种异步方式实现


public static void selectchildData2() throws IOException, KeeperException, InterruptedException{
    ZooKeeper zk = getZkConnect();
    String ctx = "{'selectChild': 'success'}";
    zk.getChildren("/test",false,new ChildrenCallback(),ctx);
    new Thread().sleep(1000);
}

8.使用递归得到所有的节点


public static void selectchildData3() throws IOException, KeeperException, InterruptedException{
   getChild("/");
}
public static void getChild(String path) throws IOException, KeeperException, InterruptedException {
    System.out.println(path);
    ZooKeeper zk = getZkConnect();
    List<String> childrenList = zk.getChildren(path, false, stat);
    if(childrenList.isEmpty() || childrenList ==null)
        return;
    for(String s:childrenList){
        if(path.equals("/"))
           getChild(path+s);
        else {
            getChild(path+"/"+s);
        }
    }
}

运行结果:

/zookeeper
/zookeeper/config
/zookeeper/quota
/ldd
/ldd/l
/loo
/t1
/test1
/seq
/seq/seq30000000002
/seq/seq20000000001
/seq/se0000000003
/seq/seq10000000000

9.判断节点是否存在


public static void existNode() throws IOException, KeeperException, InterruptedException {
    ZooKeeper zk = getZkConnect();
    Stat stat = zk.exists("/ff", true);
    System.out.println(stat);
}
//输出null则不存在

10.自定义权限


public static void oneSelfACL() throws Exception {
    ZooKeeper zk = getZkConnect();
    ArrayList<ACL> acls = new ArrayList<ACL>();
  //  zk.create("/test1","test-data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT); //所有人均可访问
    Id id1 = new Id("digest", ACLUtils.getDigestUserPassword("id1:123456"));
    Id id2 = new Id("digest", ACLUtils.getDigestUserPassword("id2:123456"));
   // Id ipId = new Id("ip","127.0.0.1");ip设置
    // acls.add(new ACL(ZooDefs.Perms.ALL,id1));
    acls.add(new ACL(ZooDefs.Perms.ALL,id1));
    acls.add(new ACL(ZooDefs.Perms.DELETE,id2));
    //注册过的用户必须通过addAuthInfo才可以操作节点
    zk.addAuthInfo("digest","id1:123456".getBytes());
    zk.create("/test2","test2-data".getBytes(), acls,CreateMode.PERSISTENT);
}

结果如下:

在这里插入图片描述

直接登录id1由于在程序已经注册完成

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

以上为个人经验,希望能给大家一个参考,也希望大家多多支持编程网。

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯