文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

Zookeeper笔记

2023-09-08 08:31

关注

为什么要使用Zookeeper

dubbo需要一个注册中心,而Zookeeper是我们在使用Dubbo是官方推荐的注册中心
在这里插入图片描述
在这里插入图片描述

Zookeeper介绍

在这里插入图片描述
Zookeeper的集群机制
Zookeeper是为了其他分布式程序提供服务的,所以不能随便就挂了。Zookeeper的集群机制采取的是半数存活机制。也就是整个集群节点中有半数以上的节点存活,那么整个集群环境可用。这也是为什么说zk的集群最好是奇数个节点。

zk的作用

序号功能
1为别的分布式程序服务的
2本身就是一个分布式程序
3主从协调 服务器节点动态上下线 统一配置管理 分布式共享锁 统一名称服务
4管理(存储,读取)用户程序提交的数据 并为用户程序提供数据节点监听服务

Zookeeper 节点的三种角色

Leader
Leader是Zookeeper集群工作的核心,其主要工作是:

  1. 事务请求的唯一调度和处理者,保证集群事务处理的顺序性。
  2. 集群内部各服务器的调度者

Follower
Follower是zookeeper集群的跟随者,主要工作是:

  1. 处理客户端非事务性请求(读取数据),转发事务请求给Leader服务器
  2. 参与事务请求Proposal的投票
  3. 参与Leader选举投票

Observer
Observer充当观察者的角色,观察Zookeeper集群的最小状态变化并将这些状态同步过来,其对于非事务请求可以独立处理,对于事务请求,会转给Leader节点进行处理。Observer不会参与投票,包括事务请求Proposal的投票和Leader选举投票

###Zookeeper 节点的四种状态
在知道了 Zookeeper 中有三种角色后,我们可能会问: Zookeeper 是如何知道自己目前是什么角色的呢?

在 ZAB 协议中定义:通过自身的状态来区分自己的角色的,在运行期间各个进程可能出现以下四种状态之一:

在组成 ZAB 协议的所有进程启动的时候,初始化状态都是 LOOKING 状态,此时进程组中不存在 Leader,选举之后才有,在进行选举成功后,就进入消息广播模式,此时 Zookeeper 集群中的角色状态就不再是 LOOKING 状态。

集群环境准备

节点的映射关系

每个节点设置相应的ip和主机名的映射关系,方便集群环境的部署
修改hosts配置文件中的信息
在这里插入图片描述

配置免密登录
生成公钥和私钥

ssh-keygen

输入命令后根据提示,四次回车即可
在这里插入图片描述

发送公钥给需要免密登录的节点

ssh-copy-id zk01ssh-copy-id zk02ssh-copy-id zk03

在这里插入图片描述
节点和节点发送文件通过scp命令实现

scp -r b.txt bobo01:/root/

在这里插入图片描述
关闭防火墙
查看防火墙状态

firewall-cmd --state

停止防火墙

systemctl stop firewall.service

禁止开机启动

systemctl disable firewall.service

Zookeeper的选举机制

Leader主要作用是保证分布式数据一致性,即每个节点的存储的数据同步。

服务器初始化时Leader选举

Zookeeper由于自身的性质,一般建议选取奇数个节点进行搭建分布式服务器集群。以3个节点组成的服务器集群为例,说明服务器初始化时的选举过程。启动第一台安装Zookeeper的节点时,无法单独进行选举,启动第二台时,两节点之间进行通信,开始选举Leader。

  1. 每个Server投出一票。第一次他们都投给自己作为Leader,投票内容未(SID,ZXID)。
    SID为Server的id,即启动ZK时配置文件中的myid;
    ZXID为事务id,为节点的更新程序,ZXID越大,代表Server对ZK节点的操作越新。由于服务器初始化,
    每个Sever上的Znode为0,所以Server1投的票为(1,0),Server2为(2,0)。两Server将各自投票发给集群中其他机器。
  2. 每个Server接收来自其他Server的投票。集群中的每个Server先判断投票的有效性,如检查是不是本轮的投票,是不是来Looking状态的服务器投的票。
  3. 对投票结果进行处理。处理规则为:
    • 首先对比ZXID。ZXID大的服务器优先作为Leader
    • 若ZXID系统,如初始化时,每个Server的ZXID都是0
    • 就会比较sid即myid,myid大的选出来做Leader。
      首次选举对于Server而言,他接受到的投票为(2,0),因为自身的票为(1,0),所以此时它会选举Server2为Leader,
      将自己的更新为(2,0)。而Server2收到的投票为Server1的(1,0)由于比他自己小,
      Server2的投票不变。Server1和Server2再次将票投出,投出的票都为(2,0)
  4. 统计投票。每次投票后,服务器都会统计投票信息,如果判定某个Server有过半的票数,俺么该Server就是Leader。首次投票对于Server1和Server2而言,统计出已经有两台机器接收了(2,0)的投票信息,此时认为选出了Leader。
  5. 改变服务器的状态。当确定了Leader之后,每个Server更新自己的状态,
    Leader将状态更新为Leading,Follower将状态更新为Following。
    image.png
    在这里插入图片描述

服务器运行期间的Leader选举

ZK运行期间,如果有新的Server加入,或非Leader节点挂了,那么Leader会同步数据给新的Server或寻找其他备用Server替代宕机的Server。若Leader宕机,此时集群暂停对外服务,开始在内部选举新的Leader。假设当前集群中有Server1、Server2、Server3三台服务器,Server2为当前集群的Leader,由于意外情况,Server2宕机了,便开始进入选举状态。过程如下

1 变更状态。其他非Observer服务器将自己的状态改变成Looking,开始进入Leader选举。
2. 每个Server投出1张票(myid,ZXID),由于集群运行过,所以每个Server的ZXID可能不同。
假设Server1的ZXID为145,Server3的为122,第一轮投票中,Server1和Server3都投自己,
票分别为(1,145)、(3,122),将自己的票发送给集群中所有机器。
3. 每个Server接收接收来自其他Server的投票,接下来的步骤与初始化时相同。

恢复模式的两个原则

当集群正在启动过程中,或 Leader 与超过半数的主机断连后,集群就进入了恢复模式。对于要恢复的数据状态需要遵循两个原则。
已被处理过的消息不能丢失
被丢弃的消息(原Leader未提交的事务)不能再现

1.已被处理过的消息不能丢失(部分follower提交了)
当 Leader 收到超过半数 Follower 的 ACKs 后,就向各个 Follower 广播 COMMIT 消息, 批准各个 Server 执行该写操作事务。当各个 Server 在接收到 Leader 的 COMMIT 消息后就会在本地执行该写操作,然后会向客户端响应写操作成功。

但是如果在非全部 Follower 收到 COMMIT 消息之前 Leader 就挂了,这将导致一种后 果:部分 Server 已经执行了该事务,而部分 Server 尚未收到 COMMIT 消息,所以其并没有执行该事务。当新的 Leader 被选举出,集群经过恢复模式后需要保证所有 Server 上都执行 了那些已经被部分 Server 执行过的事务。

2.被 Leader 提出的但没有被提交的消息不能再现(无follower提交)
当新事务在 Leader 上已经通过,其已经将该事务更新到了本地,但所有 Follower 还都没有收到 COMMIT 之前,Leader 宕机了(比前面叙述的宕机更早),此时,所有 Follower 根本 就不知道该 Proposal 的存在。当新的 Leader 选举出来,整个集群进入正常服务状态后,之 前挂了的 Leader 主机重新启动并注册成为了 Follower。

若那个别人根本不知道的 Proposal 还保留在那个主机,那么其数据就会比其它主机多出了内容,导致整个系统状态的不一致。所以,该 Proposa 应该被丢弃。类似这样应该被丢弃的事务,是不能再次出现在集群中的, 应该被清除。

myid与zxid

myid : 每个 Zookeeper 服务器,都需要在数据文件夹下创建一个名为 myid 的文件,该文件包含整个 Zookeeper 集群唯一的 ID

例如,某个 Zookeeper 集群包含三台服务器,在配置文件中,server.后面的数据即为 myid
server.1=zoo1:2888:3888
server.2=zoo2:2888:3888
server.3=zoo3:2888:3888

zxid : 类似于 RDBMS 中的事务ID,用于标识一个 Proposal ID,为了保证顺序性,ZXID 必须单调递增.

ZAB协议

ZAB(Zookeeper Atomic Broadcast)协议是Apache Zookeeper分布式协调服务中使用的一种原子广播协议。Zookeeper是一个开源的、高性能的分布式协调服务,用于管理分布式系统中的配置信息、命名服务、分布式锁等。ZAB协议是Zookeeper的核心协议,通过确保Zookeeper集群中所有服务器之间的事务广播顺序,它实现了Zookeeper数据的一致性

Zab 协议核心
Zookeeper采用ZAB协议的核心就是只要一台服务器提交了Proposal(事务),就要确保所有服务器最终都能正确提交Proposal,这也是CAP/BASE最终一致性的体现。(选举的核心

从大体上看,Zab 协议的原理可细分为四个阶段:

  1. 选举(Leader Election)
  2. 发现(Discovery)
  3. 同步(Synchronization)
  4. 广播(Broadcast)
    但是在 ZooKeeper 的实现中,将发现和同步两部分内容合成数据恢复一部分,所以按实现划分可以分为三个阶段:(选举的核心
  5. Leader选举(Fast Leader Election)
  6. 数据恢复(Recovery Phase)
  7. 广播(Broadcast)

Zab 协议按照功能的不同,有两种模式:一种是消息广播模式,另一种是崩溃恢复模式。
image.png
消息广播模式用于处理客户端的请求,崩溃恢复模式用于在节点意外崩溃时能够快速恢复,继续对外提供服务,让集群达成高可用状态。

1) 消息广播模式

Zookeeper 中广播消息步骤:

崩溃恢复模式

一旦 Leader 服务器出现崩溃或者由于网络原因导致 Leader 服务器失去了与过半 Follower 的联系,那么就会进入崩溃恢复模式。

Zookeeper 集群中为保证任何进程能够顺序执行,只能是 Leader 服务器接收写请求,其他服务器接收到客户端的写请求,也会转发至 Leader 服务器进行处理。

Zab 协议崩溃恢复需满足以下2个请求:

也就是新选举出来的 Leader 不能包含未提交的 Proposal(事务),必须都是已经提交了的 Proposal 的 Follower 服务器节点,新选举出来的 Leader 节点中含有最高的 ZXID,所以,在 Leader 选举时,将 ZXID 作为每个 Follower 投票时的信息依据。这样做的好处是避免了 Leader 服务器检查 Proposal 的提交和丢弃工作。(具体看上面的恢复规则)

Zookeeper是不是强一致性

写操作时是强一致性,读操作则不是强一致性
强一致性

顺序一致性

Zookeeper官方定义:顺序一致性。(Sequential consistency)

在zk的官方文档中对此有解释,地址在:https://zookeeper.apache.org/doc/r3.1.2/zookeeperProgrammers.html

ZK保证了保持长链接的单个客户端,总是能得到自己保持链接的机器上的最新值,这个值不一定是Leader的最新值。如果该机器挂了,会连接到zxid不小于当前节点的机器,从而保证不会读到更旧的值。

Zookeeper顺序一致性怎么做到的?

Leader 发送 proposal 时,其实会为每个 Follower 创建一个队列,都往各自的队列中发送 proposal。
image.png

Zookeeper客户端使用

配置Zookeeper的环境变量
为了简化我们每次操作Zookeeper而不用进入到Zookeeper的安装目录,我们可以将Zookeeper的安装信息配置到系统的环境变量中

vim /etc/profile

添加的内容

export ZOOKEPPER_HOME=/opt/zookeeperexport PATH=$PATH:$ZOOKEEPER_HOME/bin

执行source命令

source /etc/profile

我们就可以在节点的任意位置操作Zookeeper了,通过scp命令将profile文件发送到其他几个节点上

scp /etc/profile zk02:/etc/

客户端连接
通过bin目录下的zkCli.sh 命令连接即可

zkCli.sh

在这里插入图片描述
zkCli.sh默认连接的是当前节点的Zookeeper节点,如果我们要连接其他节点执行如下命令即可

zkCli.sh -timeout 5000 -server zk02:2181

数据操作

Zookeeper的数据结构

  1. 层次化的目录结构,命名符合常规文件系统规范
  2. 每个节点在Zookeeper中叫做znode,并且有一个唯一的路径标识
  3. 节点znode可以包含数据和子节点(但是EPHEMERAL类型的节点不能有子节点)
  4. 客户端应用可以在节点上设置监听器
    在这里插入图片描述

节点类型

1).znode有两种类型:

短暂性(ephemeral)(断开连接自己删除)
持久性(persistent)(断开连接不删除)

2).znode有四种形式的目录节点(默认是persistent)如下

序号节点类型描述
1PERSISTENT持久节点
2PERSISTENT_SEQUENTIAL持久有序节点(顺序节点)
3EPHEMERAL短暂节点 (临时节点)
4EPHEMERAL_SEQUENTIAL短暂有序节点 (临时顺序节点)

创建znode时设置顺序标识,znode名称后会附加一个值,顺序号是一个单调递增的计数器,有父节点维护在分布式系统中,顺序号可以被用于为所有的事件进行全局排序,这样客户端可以通过顺序号推断事件的顺序

常用命令
Zookeeper作为Dubbo的注册中心用来保存我们各个服务的节点信息,显示Zookeeper是可以实现输出的存储操作的,我们来看下Zookeeper中存储操作的基本命令

ls
​ ls用来查看某个节点下的子节点信息
在这里插入图片描述
增强的命令,查看节点下的子节点及当前节点的属性信息 ls2或者 ls -s 命令
在这里插入图片描述
create
​ 创建节点信息
在这里插入图片描述
get

​ get命令用来查看节点的数据
在这里插入图片描述
如果要查看节点的属性信息那么我们可以通过get -s 来实现

delete
​ delete只能删除没有子节点的节点要删除非空节点可以通过 rmr 或者 deleteall 命令实现
在这里插入图片描述
set

​ set命令可以用来修改节点的内容。
在这里插入图片描述
事件监听
监听某个节点的数据内容变化,通过get命令 带 -w 参数即可,在3.4版本的Zookeeper中是通过 get path watch 来说实现监控的
在这里插入图片描述
然后我们在其他节点上修改app1节点的数据,会触监听事件

Zookeeper Java API使用

pom

<dependencies><dependency>        <groupId>org.apache.zookeepergroupId>        <artifactId>zookeeperartifactId>        <version>3.5.9version>    dependency>    <dependency>        <groupId>com.github.sgroschupfgroupId>        <artifactId>zkclientartifactId>        <version>0.1version>    dependency>    <dependency>        <groupId>junitgroupId>        <artifactId>junitartifactId>        <version>4.12version>    dependency>dependencies>

连接ZK服务,并监听节点变化

@Slf4jpublic class ConfigCenter {    private final static  String CONNECT_STR="192.168.40.243:2181";    private final static Integer  SESSION_TIMEOUT=30*1000;    private static ZooKeeper zooKeeper=null;    private static CountDownLatch countDownLatch=new CountDownLatch(1);    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {        zooKeeper=new ZooKeeper(CONNECT_STR, SESSION_TIMEOUT, new Watcher() {            @Override            public void process(WatchedEvent event) {                if (event.getType()== Event.EventType.None                        && event.getState() == Event.KeeperState.SyncConnected){                    log.info("连接已建立");                    countDownLatch.countDown();                }            }        });        countDownLatch.await();        MyConfig myConfig = new MyConfig();        myConfig.setKey("anykey");        myConfig.setName("anyName");        ObjectMapper objectMapper=new ObjectMapper();        byte[] bytes = objectMapper.writeValueAsBytes(myConfig);        String s = zooKeeper.create("/myconfig", bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);        Watcher watcher = new Watcher() {            @SneakyThrows            @Override            public void process(WatchedEvent event) {                if (event.getType()== Event.EventType.NodeDataChanged                        && event.getPath()!=null && event.getPath().equals("/myconfig")){                    log.info(" PATH:{}  发生了数据变化" ,event.getPath());                    //循环监听                    //监听结束后,重新设置                    byte[] data = zooKeeper.getData("/myconfig", this, null);                    MyConfig newConfig = objectMapper.readValue(new String(data), MyConfig.class);                    log.info("数据发生变化: {}",newConfig);                }            }        };        byte[] data = zooKeeper.getData("/myconfig", watcher, null);        MyConfig originalMyConfig = objectMapper.readValue(new String(data), MyConfig.class);        log.info("原始数据: {}", originalMyConfig);//        TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);    }

在这里插入图片描述
standalone版本

@Slf4jpublic abstract class  StandaloneBase {    private static final String CONNECT_STR="192.168.109.200:2181";    private static final int SESSION_TIMEOUT=30 * 1000;    private static ZooKeeper zooKeeper =null;    private static CountDownLatch countDownLatch = new CountDownLatch(1);    private Watcher watcher =new Watcher() {        @Override        public void process(WatchedEvent event) {            if (event.getState() == Event.KeeperState.SyncConnected                    && event.getType()== Event.EventType.None){                countDownLatch.countDown();                log.info("连接建立");            }        }    };    @Before    public void init(){        try {            log.info(" start to connect to zookeeper server: {}",getConnectStr());             zooKeeper=new ZooKeeper(getConnectStr(), getSessionTimeout(), watcher);            log.info(" 连接中...");            countDownLatch.await();        } catch (IOException e) {            e.printStackTrace();        } catch (InterruptedException e) {            e.printStackTrace();        }    }    public static ZooKeeper getZooKeeper() {        return zooKeeper;    }    @After    public void test(){        try {            TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);        } catch (InterruptedException e) {            e.printStackTrace();        }    }    protected String getConnectStr(){        return CONNECT_STR;    }    protected int getSessionTimeout() {        return SESSION_TIMEOUT;    }}
    private String first_node = "/first-node";    @Test    public void testCreate() throws KeeperException, InterruptedException {        ZooKeeper zooKeeper = getZooKeeper();        String s = zooKeeper.create(first_node, "first".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);        log.info("Create:{}",s);    }@Test    public void testGetData(){        Watcher watcher = new Watcher() {            @Override            public void process(WatchedEvent event) {                      if (event.getPath()!=null && event.getPath().equals(first_node)  && event.getType()== Event.EventType.NodeDataChanged){                          log.info(" PATH: {}  发现变化",first_node);                          try {  byte[] data = getZooKeeper().getData(first_node, this, null);  log.info(" data: {}",new String(data));                          } catch (KeeperException e) {  e.printStackTrace();                          } catch (InterruptedException e) {  e.printStackTrace();                          }                      }            }        };        try {            byte[] data = getZooKeeper().getData(first_node, watcher, null);  //            log.info(" data: {}",new String(data));        } catch (KeeperException e) {            e.printStackTrace();        } catch (InterruptedException e) {            e.printStackTrace();        }    }        @Test    public void  asyncTest(){          String userId="xxx";          getZooKeeper().getData("/test", false, (rc, path, ctx, data, stat) -> {              Thread thread = Thread.currentThread();              log.info(" Thread Name: {},   rc:{}, path:{}, ctx:{}, data:{}, stat:{}",thread.getName(),rc, path, ctx, data, stat);        },"test");        log.info(" over .");    }        @Test    public void exist() throws  Exception{        ZooKeeper zooKeeper = getZooKeeper();        // true表示的是使用Zookeeper中的watch        Stat stat = zooKeeper.exists(first_node, true);        if(stat != null){            System.out.println("节点存在"+ stat.getNumChildren());        }else{            System.out.println("节点不存在 ....");        }    }        @Test    public void getChildrens() throws Exception{    ZooKeeper zooKeeper = getZooKeeper();        List<String> childrens = zooKeeper.getChildren(first_node, true);        for (String children : childrens) {            // System.out.println(children);            // 获取子节点中的数据            byte[] data = zooKeeper.getData(first_node+"/" + children, false, null);            System.out.println(children+":" + new String(data));        }    }        @Test    public void setData() throws Exception{        // -1 不指定版本 自动维护        Stat stat = zooKeeper.setData(first_node+"/a1", "666666".getBytes(), -1);        System.out.println(stat);        //  指定版本 自动维护//ZooKeeper zooKeeper = getZooKeeper();        //Stat stat = new Stat();        //byte[] data = zooKeeper.getData(first_node, false, stat);        //int version = stat.getVersion();//版本号        //zooKeeper.setData(first_node, "third".getBytes(), version);    }        @Test    public void deleteNode() throws Exception{        zooKeeper.delete(first_node,-1);            }

事件监听处理

    @Test    public void nodeChildrenChange() throws Exception{        List<String> list = zooKeeper.getChildren("/app1", new Watcher() {                        @Override            public void process(WatchedEvent watchedEvent) {                System.out.println("--->"+ watchedEvent.getType());            }        });        for (String s : list) {            System.out.println(s);        }        Thread.sleep(Integer.MAX_VALUE);    }        @Test    public void nodeDataChanged() throws Exception{        byte[] data = zooKeeper.getData("/app1/a1", new Watcher() {            @Override            public void process(WatchedEvent watchedEvent) {                System.out.println("--->" + watchedEvent.getType());            }        }, null);        System.out.println("--->" + new String(data));        Thread.sleep(Integer.MAX_VALUE);    }

Curator

Curator 是一套由netflix 公司开源的,Java 语言编程的 ZooKeeper 客户端框架,Curator项目
是现在ZooKeeper 客户端中使用最多,对ZooKeeper 版本支持最好的第三方客户端,并推荐使
用,Curator 把我们平时常用的很多 ZooKeeper 服务开发功能做了封装,例如 Leader 选举、
分布式计数器、分布式锁。这就减少了技术人员在使用 ZooKeeper 时的大部分底层细节开发工
作。在会话重新连接、Watch 反复注册、多种异常处理等使用场景中,用原生的 ZooKeeper
处理比较复杂。而在使用 Curator 时,由于其对这些功能都做了高度的封装,使用起来更加简
单,不但减少了开发时间,而且增强了程序的可靠性。

yaml

<dependency>    <groupId>org.apache.curatorgroupId>    <artifactId>curator-recipesartifactId>    <version>5.0.0version>    <exclusions>        <exclusion>            <groupId>org.apache.zookeepergroupId>            <artifactId>zookeeperartifactId>        exclusion>    exclusions>dependency><dependency>    <groupId>org.apache.curatorgroupId>    <artifactId>curator-x-discoveryartifactId>    <version>5.0.0version>    <exclusions>        <exclusion>            <groupId>org.apache.zookeepergroupId>            <artifactId>zookeeperartifactId>        exclusion>    exclusions>dependency><dependency>    <groupId>org.apache.zookeepergroupId>    <artifactId>zookeeperartifactId>    <version>3.5.8version>dependency>

父类

@Slf4jpublic abstract  class CuratorStandaloneBase {    private static final String CONNECT_STR = "192.168.109.200:2181";    private static final int sessionTimeoutMs = 60*1000;    private static final int connectionTimeoutMs = 5000;    private static CuratorFramework curatorFramework;    @Before    public void init() {        RetryPolicy retryPolicy = new ExponentialBackoffRetry(5000, 30);        curatorFramework = CuratorFrameworkFactory.builder().connectString(getConnectStr())                .retryPolicy(retryPolicy)                .sessionTimeoutMs(sessionTimeoutMs)                .connectionTimeoutMs(connectionTimeoutMs)                .canBeReadOnly(true)                .build();        curatorFramework.getConnectionStateListenable().addListener((client, newState) -> {            if (newState == ConnectionState.CONNECTED) {                log.info("连接成功!");            }        });        log.info("连接中......");        curatorFramework.start();    }    public void createIfNeed(String path) throws Exception {        Stat stat = curatorFramework.checkExists().forPath(path);        if (stat==null){            String s = curatorFramework.create().forPath(path);            log.info("path {} created! ",s);        }    }    public static CuratorFramework getCuratorFramework() {        return curatorFramework;    }       protected   String getConnectStr(){        return CONNECT_STR;    }}
// 递归创建子节点    @Test    public void testCreateWithParent() throws Exception {        CuratorFramework curatorFramework = getCuratorFramework();        String pathWithParent = "/node-parent/sub-node-1";        String path = curatorFramework.create().creatingParentsIfNeeded().forPath(pathWithParent);        log.info("curator create node :{}  successfully.", path);    }    // protection 模式,防止由于异常原因,导致僵尸节点    @Test    public void testCreate() throws Exception {        CuratorFramework curatorFramework = getCuratorFramework();        String forPath = curatorFramework                .create()                .withProtection()                .withMode(CreateMode.EPHEMERAL_SEQUENTIAL).                        forPath("/curator-node", "some-data".getBytes());        log.info("curator create node :{}  successfully.", forPath);    }    @Test    public void testGetData() throws Exception {        CuratorFramework curatorFramework = getCuratorFramework();        byte[] bytes = curatorFramework.getData().forPath("/curator-node");        log.info("get data from  node :{}  successfully.", new String(bytes));    }    @Test    public void testSetData() throws Exception {        CuratorFramework curatorFramework = getCuratorFramework();        curatorFramework.setData().forPath("/curator-node", "changed!".getBytes());        byte[] bytes = curatorFramework.getData().forPath("/curator-node");        log.info("get data from  node /curator-node :{}  successfully.", new String(bytes));    }    @Test    public void testDelete() throws Exception {        CuratorFramework curatorFramework = getCuratorFramework();        String pathWithParent = "/node-parent";        curatorFramework.delete().guaranteed().deletingChildrenIfNeeded().forPath(pathWithParent);    }    @Test    public void testListChildren() throws Exception {        CuratorFramework curatorFramework = getCuratorFramework();        String pathWithParent = "/discovery/example";        List<String> strings = curatorFramework.getChildren().forPath(pathWithParent);        strings.forEach(System.out::println);    }//线程池方式    @Test    public void testThreadPool() throws Exception {        CuratorFramework curatorFramework = getCuratorFramework();        ExecutorService executorService = Executors.newSingleThreadExecutor();        String ZK_NODE="/zk-node";        curatorFramework.getData().inBackground((client, event) -> {            log.info(" background: {}", event);        },executorService).forPath(ZK_NODE);     }

Watch机制

ZooKeeper的Watch机制是它的一个重要特性,它允许客户端在ZooKeeper节点发生变化时得到通知。通过Watch机制,客户端可以设置对指定节点的监视,并在节点发生变化(数据更新、节点删除、子节点变化等)时,ZooKeeper会通知相关的客户端。(相当与redis分布式锁中的watchDog机制,是分布式锁中十分重要的机制,用于监听比自己小的节点

Watch机制的主要特点包括:

一次性触发:一旦Watch被设置在一个节点上,并且该节点发生了监视的事件(例如数据更新),那么Watch就会触发通知。一次触发后,Watch就会失效,客户端需要重新设置Watch以继续监视节点。

轻量级通知:ZooKeeper的Watch机制是轻量级的,因为Watch只是一个通知,不包含实际数据。客户端收到Watch通知后,可以根据需要再次向ZooKeeper请求节点的最新数据。

顺序性:Watch通知是有序的。也就是说,如果多个Watch被设置在一个节点上,当该节点发生变化时,通知的顺序是确定的,保证了客户端可以按照特定的顺序处理通知。

一次性回调:客户端收到Watch通知后,需要进行一次性回调来处理事件。在处理通知的过程中,如果发生了新的变化,之前的Watch不会再次触发。

需要注意的是,Watch机制并不保证强一致性。由于网络延迟或其他因素,Watch通知可能有一定的延迟,客户端可能会收到旧的数据变更通知。因此,客户端在处理Watch通知时,需要谨慎处理,并考虑可能出现的数据不一致情况。

Watch机制是ZooKeeper实现分布式协作的关键机制之一。通过Watch,客户端可以实时获取ZooKeeper节点的变化,从而在分布式系统中做出相应的处理,实现高效的协作和协调。

ZK分布式锁

创建节点,判断顺序号是否是最小的
在这里插入图片描述
在这里插入图片描述

创建永久节点,在普通节点下创建临时顺序节点,节点之间按顺序依次监听(通过watch机制),当拿到锁的节点处理完事务后,释放锁,后一个节点监听到前一个节点释放锁后,立刻申请获得锁,以此类推
过程解析

public class DistributedLock {    private String connectString = "192.168.58.100:2181";    private ZooKeeper client;    private CountDownLatch countDownLatch = new CountDownLatch(1);    private CountDownLatch waitLatch = new CountDownLatch(1);    //当前节点    private String currentNode;    //要等待的节点    private String waitPath;        //1.连接Zookeeper    public DistributedLock() throws Exception {        client = new ZooKeeper(connectString, 300000, new Watcher() {            @Override            public void process(WatchedEvent watchedEvent) {                //连接上zk  释放                if(watchedEvent.getState() == Event.KeeperState.SyncConnected){                    countDownLatch.countDown();                }                // waitLatch 需要释放,节点被删除,并且是前一个节点                if(watchedEvent.getType() == Event.EventType.NodeDeleted &&                watchedEvent.getPath().equals(waitPath)){                    waitLatch.countDown();                }            }        });        //zk 连接成功,再往下走        countDownLatch.await();        //2.判断节点是否存在        Stat stat = client.exists("/locks", false);        if(stat == null){            //创建根节点            client.create("/locks","locks".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);        }    }    //3.加锁    public void zkLock() throws KeeperException, InterruptedException {        //创建临时顺序节点        currentNode = client.create("/locks/" + "seq-" ,null,ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);        //判断创建的节点是否是最小序号节点,如果是 就获取锁,不是监听前一个节点        List<String> children = client.getChildren("/locks", false);        //如果集合中只有一个元素,可以直接获取锁        if(children.size() == 1){            return;        }else{            //先排序            Collections.sort(children);            //获取节点名称            String nodeName = currentNode.substring("/locks/".length());            //获取节点名称 在集合的位置            int index = children.indexOf(nodeName);            if(index == -1){                System.out.println("数据异常");            }else if(index == 0){                return;            }else{                //需要监听前一个节点的变化                waitPath = "/locks/" + children.get(index - 1);                client.getData(waitPath,true,null);                //等待监听执行                waitLatch.await();                return;            }        }    }    //解锁    public void unZkLock() throws KeeperException, InterruptedException {        //删除节点        client.delete(currentNode,-1);    }}

Nginx代理转发
在这里插入图片描述

Curator框架实现分布式锁

实现思路
image.png

InterProcessMutex介绍

Apache Curator 内置了分布式锁的实现: InterProcessMutex

public InterProcessMutex(CuratorFramework client, String path){    this(client, path, new StandardLockInternalsDriver());}public InterProcessMutex(CuratorFramework client, String path, LockInternalsDriver driver){    this(client, path, LOCK_NAME, 1, driver);}
参数说明
clientcurator中zk客户端对象
path抢锁路径,同一个锁path需一致
driver可自定义lock驱动实现分布式锁
//获取锁,若失败则阻塞等待直到成功,支持重入public void acquire() throws Exception  //超时获取锁,超时失败public boolean acquire(long time, TimeUnit unit) throws Exception  //释放锁public void release() throws Exception

代码实现
配置类

@Configurationpublic class CuratorCfg {    //Curator初始化    @Bean(initMethod = "start")    public CuratorFramework curatorFramework(){        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);        CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.40.240:2181", retryPolicy);//CuratorFramework build = CuratorFrameworkFactory.builder()//                .connectString("192.168.40.240:2181")//                .sessionTimeoutMs(60*1000)//                .connectionTimeoutMs(60*1000)//                .retryPolicy(new ExponentialBackoffRetry(1000,3))//                .build();        return client;    }}

业务代码

@RestControllerpublic class TestController {    @Autowired    private OrderService orderService;    @Value("${server.port}")    private String port;        public static final String product = "/product_";    @Autowired    CuratorFramework curatorFramework;    @PostMapping("/stock/deduct")    public Object reduceStock(Integer id) throws Exception {        InterProcessMutex interProcessMutex = new InterProcessMutex(curatorFramework, product + id);//互斥锁        try {            // ...            interProcessMutex.acquire();//加锁            orderService.reduceStock(id);        } catch (Exception e) {            if (e instanceof RuntimeException) {                throw e;            }        }finally {            interProcessMutex.release();//解锁        }        return "ok:" + port;    }}

全局异常处理类

@ControllerAdvicepublic class ExceptionHandlerController {    @ExceptionHandler    @ResponseStatus(value = HttpStatus.BAD_REQUEST)    @ResponseBody    public Object exceptionHandler(RuntimeException e){        Map<String,Object> result=new HashMap<>(  );        result.put( "status","error" );        result.put( "message",e.getMessage() );        return result;    }}

ZK注册中心

项目较小的话会考虑使用ZK做注册中心,原理:使用临时节点

spring.application.name=product-center1#zookeeper 连接地址spring.cloud.zookeeper.connect-string=192.168.40.240:2181#将本服务注册到zookeeper,如果不希望直接被发现可以配置为false,默认为truespring.cloud.zookeeper.discovery.register=truespring.cloud.zookeeper.session-timeout=30000

来源地址:https://blog.csdn.net/Forbidden_City/article/details/132010388

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯