文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

分布式锁实战-基于Etcd的实现很优雅

2024-12-01 12:32

关注

etcd 能被Kubernetes 如此青睐,是因为它一直在聆听社区的声音并快速改进,积极配合 Kubernetes 项目向前推进,解决社区反馈的痛点;发起 V2 到 V3 的重大版本更新,尤其是 19 年由 Google、Alibaba 等公司联合打造的 3.4 版本,满足了 Kubernetes 在超大型公司大规模使用中严苛的可用性、扩展性和性能等要求。

上图描述了 etcd 名字的由来,其寓意是为大规模分布式系统提供存储配置信息;华为 Kubernetes 专家杜军老师专门为 etcd 著书《云原生分布式存储基石-etcd》,称它基石足见何等重要,另一位大咖etcd 的作者李响老师曾讲:“etcd 就是用来存储云上最重要的数据的”;相信随着云原生架构的演进,etcd 会担任越来越多重要的角色;技术人员在云原生时代,除了拥抱 Kubernetes,也要拥抱 etcd。

2.etcd vs ZooKeeper

很长一段时间 ZooKeeper(后文简称 ZK) 被作为默认首选项,用于解决分布式系统的协同和元数据存储,但其太复杂、迭代慢、难维护、性能缺陷等问题逐渐成为槽点;而 etcd 吸取了 ZK 的教训,从设计和实现上具有后见之明,提供了更好的工程和运维体验,其主要改进在于如下几个方面:

据说在一个由 3 台 8 核节点组成的云服务器上, etcd v3 版本可以做到每秒数万次的写操作和数十万次的读操作(ZK:);后续会有其他篇章结合QA的测试结果来探讨etcd服务端的设计和性能情况。

3.etcd 特性介绍

为满足本篇目标所需,也考虑到对 etcd 熟悉的读者不多,这里着重介绍以下几个关键特性:

etcd 的分布式锁正是基于以上特性来实现的,简单来说是:

二、加解锁的流程描述

1.准备客户端和 Key

client-a 线程 1 的 key 为"/lock/lock1/uuid1",申请 lock1

client-a 线程 2 的 key 为"/lock/lock1/uuid2",申请 lock1

client-c 线程 1 的 key 为"/lock/lock2/uuid3",申请 lock2

client-b 线程 1 的 key 为"/lock/lock2/uuid4",申请 lock2

2.创建租约并保持续租

持锁状态:当业务未完成时,不能让租约到期,需定时续租;当业务完成时可主动解除租约,持锁 Key 会被删除;若客户端异常,租约到期后持锁 Key 也会被删除;等锁的客户端监听到持锁 Key 被删除后,可开始抢锁。

等锁状态:等锁超时会主动解除租约,或客户端异常时等锁 key 被删除,后边排队的就前进一步,尝试抢锁。

3.绑定租约写 key

4.获取竞争锁的 key-Value 列表

5.对所获取的 Key-Value 列表按 revision 从小到大排序

6.判断自己是不是第一个(revision 最小),若是,则成功获取锁

7.若不是,则监听自己的前一个 Key-Value 的删除事件

8.若是阻塞申请锁,则申请锁的操作可增加阻塞等待

9.若监听事件生效,则回到第 4 步重新进行判断,直到获取到锁

10解锁时,将第一个 Key-Value 的租约释放

三、etcd 分布式锁的能力

可能读者是单篇阅读,这里引入第一篇《分布式锁上-初探》中的一些内容,一个分布式锁应具备这样一些功能特点:

基于上边对 etcd 分布式锁的介绍,这里简单总结一下 etcd 的能力矩阵,ZK 的情况请看《分布式锁中-基于 Zookeeper 的实现》,redis锁的情况会在后续文章中补充

能力

ZK

etcd

Redis 原生

Redlock

互斥



安全

链接异常时,session 丢失自动释放锁

基于租约,租约过期后自动释放锁,不用像ZK那样释放链接



可用性

相对可用性还好



可重入

服务端非可重入,本地线程可重入

服务端非可重入,本地线程可重入需自研



加解锁速度

速度不算快

速度快,如GRPC 协议优势、服务端性能的优势



阻塞非阻塞

客户端两种能力都提供

jetcd-core 中,阻塞非阻塞由 Future#get 的超时控制能力支撑



公平非公平

公平锁

公平锁



可续租

天然支持,基于session

天然支持,基于Lease



四、jetcd 库实现分布式锁

etcd 针对 java 语言的客户端有官方的 jetcd-core 还有 IBM 的 etcd-java,下文使用 jetcd-core 做示例。

jetcd-core 中提供了高阶的 Lock API(无需使用者准备唯一 key、前缀查询 Key-Value 列表、排序判断自己是否 revision 最小的、监听前一个 Key-Value 的删除),使用者只需关注最原始的诉求:申请的锁是什么名称、用多久,申请不到就等多久;使用高阶 API 实现分布式锁的流程会比第 2 部分原生的流程要简单许多,流程如下:

方案 1-创建定时任务定时续租

方案 2-使用自动续约

1.pom 依赖

<dependency>
<groupId>io.etcdgroupId>
<artifactId>jetcd-coreartifactId>
<version>0.7.3version>
dependency>

2.Lease 相关的 API 介绍

public interface Lease extends CloseableClient {

//创建一个租约,过期时间是ttl,单位秒;没有请求超时控制
CompletableFuture<LeaseGrantResponse> grant(long ttl);

//创建一个租约,过期时间是ttl,单位秒;后两个参数是请求超时控制
CompletableFuture<LeaseGrantResponse> grant(long ttl, long timeout, TimeUnit unit);

//解约
CompletableFuture<LeaseRevokeResponse> revoke(long leaseId);

//主动续约1次
CompletableFuture<LeaseKeepAliveResponse> keepAliveOnce(long leaseId);

//查询租约信息
CompletableFuture<LeaseTimeToLiveResponse> timeToLive(long leaseId, LeaseOption leaseOption);

//自动续约
CloseableClient keepAlive(long leaseId, StreamObserver<LeaseKeepAliveResponse> observer);
}

3.lock 相关的 API 介绍

public interface Lock extends CloseableClient {

//绑定租约申请指定名称的锁,抢锁成功返回锁秘钥,
CompletableFuture<LockResponse> lock(ByteSequence name, long leaseId);

//持锁秘钥解锁
CompletableFuture<UnlockResponse> unlock(ByteSequence lockKey);
}

4.分布式锁示例

package com.rock.dlock;

import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.Client;
import io.etcd.jetcd.Lease;
import io.etcd.jetcd.Lock;
import io.etcd.jetcd.lease.LeaseKeepAliveResponse;
import io.etcd.jetcd.lock.LockResponse;
import io.grpc.stub.StreamObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;


public class DemoEtcdLock {
private final static Logger log = LoggerFactory.getLogger(DemoEtcdLock.class);
private Client client;
private Lock lockClient;
private Lease leaseClient;

private LockState lockState;

class LockState{
private String lockKey;
private String lockPath;
private String errorMsg;
private long leaseTTL;
private long leaseId;
private boolean lockSuccess;

public LockState(String lockKey, long leaseTTL) {
this.lockKey = lockKey;
this.leaseTTL = leaseTTL;
}

public String getLockKey() {
return lockKey;
}

public void setLockKey(String lockKey) {
this.lockKey = lockKey;
}

public String getLockPath() {
return lockPath;
}

public void setLockPath(String lockPath) {
this.lockPath = lockPath;
}

public String getErrorMsg() {
return errorMsg;
}

public void setErrorMsg(String errorMsg) {
this.errorMsg = errorMsg;
}

public long getLeaseId() {
return leaseId;
}

public void setLeaseId(long leaseId) {
this.leaseId = leaseId;
}

public boolean isLockSuccess() {
return lockSuccess;
}

public void setLockSuccess(boolean lockSuccess) {
this.lockSuccess = lockSuccess;
}

public long getLeaseTTL() {
return leaseTTL;
}

public void setLeaseTTL(long leaseTTL) {
this.leaseTTL = leaseTTL;
}
}


public DemoEtcdLock(Client client, String lockKey, Long leaseTTL, TimeUnit unit) {
this.client = client;
//1.准备客户端
this.lockClient = client.getLockClient();
this.leaseClient = client.getLeaseClient();
this.lockState = new LockState(lockKey,unit.toSeconds(leaseTTL));
}


public boolean lock() {
try {
//2.创建租约,并自动续约
createLease();

//3.执行加锁,并为锁对应的Key绑定租约
createLock();
}catch (InterruptedException | ExecutionException e) {
//todo:异常处理
}
return lockState.isLockSuccess();
}

public void unlock() {
try {
//正常释放锁
if (this.lockState.getLockPath() != null) {
lockClient.unlock(ByteSequence.from(lockState.getLockPath().getBytes())).get();
}
//如果是主动续约,则关闭续约的定时任务

//删除租约
if (lockState.getLeaseId() != 0L) {
leaseClient.revoke(lockState.getLeaseId());
}
} catch (InterruptedException | ExecutionException e) {
//todo:异常处理
}
log.info("线程:{} 释放锁", Thread.currentThread().getName());
}

// 创建一个租约
private void createLease() throws ExecutionException, InterruptedException {
log.debug("[etcd-lock]: start to createLease." + this.lockState.getLockKey() + Thread.currentThread().getName());
try {
long leaseId = leaseClient.grant(this.lockState.getLeaseTTL()).get().getID();
lockState.setLeaseId(leaseId);
//自动续约
StreamObserver<LeaseKeepAliveResponse> observer = new StreamObserver<LeaseKeepAliveResponse>() {
@Override
public void onNext(LeaseKeepAliveResponse value) {
log.trace("cluster node lease remaining ttl: {}, lease id: {}", value.getTTL(), value.getID());
}

@Override
public void onError(Throwable t) {
log.error("cluster node lease keep alive failed. exception info: {}", t);
}

@Override
public void onCompleted() {
log.trace("cluster node lease completed");
}
};
// 设置自动续约
leaseClient.keepAlive(leaseId, observer);
}catch (InterruptedException | ExecutionException e) {
log.error("[etcd-lock] Create lease failed:" + e);
lockState.setErrorMsg("Create lease failed:" + e);
throw e;
}
}

private void createLock() throws ExecutionException, InterruptedException {
String lockKey = this.lockState.getLockKey();
log.debug("[etcd-lock]: start to createLock." + lockKey + Thread.currentThread().getName());
try {
LockResponse lockResponse = lockClient.lock(ByteSequence.from(lockKey.getBytes()), lockState.getLeaseId()).get();
if (lockResponse != null) {
String lockPath = lockResponse.getKey().toString(StandardCharsets.UTF_8);
this.lockState.setLockPath(lockPath);
log.info("线程:{} 加锁成功,锁路径:{}", Thread.currentThread().getName(), lockPath);
this.lockState.setLockSuccess(true);
}
}
catch (InterruptedException | ExecutionException e) {
log.error("[etcd-lock] lock failed:" + e);
lockState.setErrorMsg("[etcd-lock] lock failed:" + e);
leaseClient.revoke(this.lockState.getLeaseId());
throw e;
}
}
}

5.测试锁

package com.rock.dlock;

import io.etcd.jetcd.Client;

import java.util.concurrent.TimeUnit;


public class TestEtcdLock {
public static void main(String[] args) {
Client client = Client.builder().endpoints("http://localhost:2379").build();

DemoEtcdLock demoEtcdLock1 = new DemoEtcdLock(client,"rock",30L, TimeUnit.SECONDS);
DemoEtcdLock demoEtcdLock2 = new DemoEtcdLock(client,"rock",30L, TimeUnit.SECONDS);

boolean lock1 = demoEtcdLock1.lock();
if(lock1) {
try {
System.out.printf("do something");
} finally {
demoEtcdLock1.unlock();
}
}
demoEtcdLock1.lock();//demoEtcdLock1 持锁未释放
demoEtcdLock2.lock();//demoEtcdLock2 客户端无可重入设计,这里将会阻塞等待demoEtcdLock1释放锁
}
}

五、总结

本篇从 etcd V3 版本的 Lease、Prefix 、Watch 等关键特性切入,介绍了如何基于这些特性来实现一个分布式锁,并基于jetcd-core库提供了一个分布式锁的示例,呈现了其关键API的用法;此示例尚未达到生产级可用,如异常、可重入、可重试、超时控制等功能都未补全,计划在下一篇介绍完redis之后,再介绍一个健壮的分布式锁客户端要如何抽象设计,如何适配 ZK 、Redis 、etcd 。

本文转载自微信公众号「架构染色」,可以通过以下二维码关注。转载本文请联系【架构染色】公众号作者。


来源:架构染色内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯