文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

redisson实现分布式锁的源码解析

2024-04-02 19:55

关注

redisson

redisson 实现分布式锁的机制如下:

依赖版本

implementation 'org.redisson:redisson-spring-boot-starter:3.17.0'

测试代码

下面是模拟一个商品秒杀的场景,示例代码如下:

public class RedissonTest {
    public static void main(String[] args) {
        //1. 配置部分
        Config config = new Config();
        String address = "redis://127.0.0.1:6379";
        SingleServerConfig serverConfig = config.useSingleServer();
        serverConfig.setAddress(address);
        serverConfig.setDatabase(0);
        config.setLockWatchdogTimeout(5000);
        Redisson redisson = (Redisson) Redisson.create(config);
        RLock rLock = redisson.getLock("goods:1000:1");
        //2. 加锁
        rLock.lock();
        try {
            System.out.println("todo 逻辑处理 1000000.");
        } finally {
            if (rLock.isLocked() && rLock.isHeldByCurrentThread()) {
                //3. 解锁
                rLock.unlock();
            }
        }
    }
}

加锁设计

rLock.lock();是加锁的核心代码,我们一起来看看调用栈

加锁的核心方法是:org.redisson.RedissonLock#tryLockInnerAsync

<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
                "if (redis.call('exists', KEYS[1]) == 0) then " +
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return nil; " +
                        "end; " +
                        "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return nil; " +
                        "end; " +
                        "return redis.call('pttl', KEYS[1]);",
                Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
    }

其实它的本质是调用一段 LUA 脚本进行加锁。

锁续期设计

锁的续期是在 org.redisson.RedissonLock#tryAcquireAsync方法中调用 scheduleExpirationRenewal实现的。

续期需要注意的是,看门狗是设置在主线程的延迟队列的线程中。

tryAcquireAsync 代码如下:

private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    RFuture<Long> ttlRemainingFuture;
    if (leaseTime != -1) {
        ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    } else {
        ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
                TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    }
    
    CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {
        // lock acquired
        if (ttlRemaining == null) {
            if (leaseTime != -1) {
                internalLockLeaseTime = unit.toMillis(leaseTime);
            } else {
                // 锁过期时间续期
                scheduleExpirationRenewal(threadId);
            }
        }
        return ttlRemaining;
    });
    return new CompletableFutureWrapper<>(f);
}

锁续期 scheduleExpirationRenewal代码如下:

protected void scheduleExpirationRenewal(long threadId) {
    ExpirationEntry entry = new ExpirationEntry();
    ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
    if (oldEntry != null) {
        oldEntry.addThreadId(threadId);
    } else {
        entry.addThreadId(threadId);
        try {
            renewExpiration();
        } finally {
            if (Thread.currentThread().isInterrupted()) {
                cancelExpirationRenewal(threadId);
            }
        }
    }
}

然后在调用 renewExpiration();执行续期逻辑

private void renewExpiration() {
    ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
    if (ee == null) {
        return;
    }
    // 创建延迟任务
    Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {
            ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
            if (ent == null) {
                return;
            }
            Long threadId = ent.getFirstThreadId();
            if (threadId == null) {
                return;
            }
            // 真正的续期,调用 LUA 脚本续期
            RFuture<Boolean> future = renewExpirationAsync(threadId);
            future.whenComplete((res, e) -> {
                if (e != null) {
                    log.error("Can't update lock " + getRawName() + " expiration", e);
                    EXPIRATION_RENEWAL_MAP.remove(getEntryName());
                    return;
                }

                // 如果续期成功
                if (res) {
                    // reschedule itself
                    renewExpiration();
                } else {
                    cancelExpirationRenewal(null);
                }
            });
        }
    }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
    ee.setTimeout(task);
}

renewExpirationAsync方法, 里面还是一段 LUA 脚本,进行重新设置锁的过期时间。

    protected RFuture<Boolean> renewExpirationAsync(long threadId) {
        return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return 1; " +
                        "end; " +
                        "return 0;",
                Collections.singletonList(getRawName()),
                internalLockLeaseTime, getLockName(threadId));
    }

锁的自旋重试

org.redisson.RedissonLock#lock(long, java.util.concurrent.TimeUnit, boolean)在执行获取锁失败的时候,会进入重试。其实这里就会执行 18 行以后的 while (true) 逻辑

private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
    long threadId = Thread.currentThread().getId();
    Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
    // lock acquired
    if (ttl == null) {
        return;
    }

    CompletableFuture<RedissonLockEntry> future = subscribe(threadId);
    RedissonLockEntry entry;
    if (interruptibly) {
        entry = commandExecutor.getInterrupted(future);
    } else {
        entry = commandExecutor.get(future);
    }

    try {
        while (true) {
            ttl = tryAcquire(-1, leaseTime, unit, threadId);
            // lock acquired
            if (ttl == null) {
                break;
            }

            // waiting for message
            if (ttl >= 0) {
                try {
                    // 阻塞锁的超时时间,等锁过期后再尝试加锁
                    entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    if (interruptibly) {
                        throw e;
                    }
                    entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                }
            } else {
                if (interruptibly) {
                    entry.getLatch().acquire();
                } else {
                    entry.getLatch().acquireUninterruptibly();
                }
            }
        }
    } finally {
        unsubscribe(entry, threadId);
    }
//        get(lockAsync(leaseTime, unit));
}

entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);其实这里就是一个间歇性自旋。 等到上次锁过期的时间,在唤醒进行抢锁 entry.getLatch().acquire();

还有一个逻辑就是

CompletableFuture future = subscribe(threadId);

这里其实是会订阅一个消息,如果解锁过后,会发布解锁的消息。

解锁设计

rLock.unlock(); 的核心就是释放锁,撤销续期和唤醒在等待加锁的线程(发布解锁成功消息)。

核心方法(解锁): org.redisson.RedissonLock#unlockInnerAsync

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
        return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                        "return nil;" +
                        "end; " +
                        "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                        "if (counter > 0) then " +
                        "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                        "return 0; " +
                        "else " +
                        "redis.call('del', KEYS[1]); " +
                        // 发布解锁成功消息
                        "redis.call('publish', KEYS[2], ARGV[1]); " +
                        "return 1; " +
                        "end; " +
                        "return nil;",
                Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
    }

还是 LUA 的执行方式。

撤销锁续期

核心方法 org.redisson.RedissonBaseLock#unlockAsync(long)

@Override
public RFuture<Void> unlockAsync(long threadId) {
    // 解锁
    RFuture<Boolean> future = unlockInnerAsync(threadId);
    // 撤销续期
    CompletionStage<Void> f = future.handle((opStatus, e) -> {
        cancelExpirationRenewal(threadId);
        if (e != null) {
            throw new CompletionException(e);
        }
        if (opStatus == null) {
            IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                    + id + " thread-id: " + threadId);
            throw new CompletionException(cause);
        }
        return null;
    });
    return new CompletableFutureWrapper<>(f);
}

解锁成功唤排队线程

org.redisson.pubsub.LockPubSub#onMessage中回去唤醒阻塞的线程,让执行前面的锁自旋逻辑,具体代码如下:

@Override
protected void onMessage(RedissonLockEntry value, Long message) {
    if (message.equals(UNLOCK_MESSAGE)) {
        Runnable runnableToExecute = value.getListeners().poll();
        if (runnableToExecute != null) {
            runnableToExecute.run();
        }
        value.getLatch().release();
    } else if (message.equals(READ_UNLOCK_MESSAGE)) {
        while (true) {
            Runnable runnableToExecute = value.getListeners().poll();
            if (runnableToExecute == null) {
                break;
            }
            runnableToExecute.run();
        }
        value.getLatch().release(value.getLatch().getQueueLength());
    }
}

到此这篇关于redisson 实现分布式锁的文章就介绍到这了,更多相关redisson 分布式锁内容请搜索编程网以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程网!

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     801人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     348人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     311人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     432人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     220人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯