这篇文章主要讲解了“Redis的分布式锁应该怎么打开”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“Redis的分布式锁应该怎么打开”吧!
要求
基于Redis实现分布式锁需要满足如下几点要求:
在分布式集群中,被分布式锁控制的方法或代码段同一时刻只能被一个客户端上面的一个线程执行,也就是互斥
锁信息需要设置过期时间,避免一个线程长期占有(比如在做解锁操作前异常退出)而导致死锁
加锁与解锁必须一致,谁加的锁,就由谁来解(或过期超时),一个客户端不能解开另一个客户端加的锁
加锁与解锁的过程必须保证原子性
实现
1. 加锁实现
基于Redis的分布式锁加锁操作一般使用 SETNX 命令,其含义是“将 key 的值设为 value ,当且仅当 key 不存在。若给定的 key 已经存在,则 SETNX 不做任何动作”。
在 Spring Boot 中,可以使用 StringRedisTemplate 来实现,如下,一行代码即可实现加锁过程。(下列代码给出两种调用形式——立即返回加锁结果与给定超时时间获取加锁结果)
public boolean lock(String key, String value, long expire) { return stringRedisTemplate.opsForValue().setIfAbsent(key, value, expire, TimeUnit.SECONDS);
}public boolean lock(String key, String value, long expire, long timeout, TimeUnit unit) { long waitMillis = unit.toMillis(timeout); long waitAlready = 0; while (!stringRedisTemplate.opsForValue().setIfAbsent(key, value, expire, TimeUnit.SECONDS) && waitAlready < waitMillis) { try {
Thread.sleep(waitMillisPer);
} catch (InterruptedException e) {
log.error("Interrupted when trying to get a lock. key: {}", key, e);
}
waitAlready += waitMillisPer;
} if (waitAlready < waitMillis) { return true;
}
log.warn("<====== lock {} failed after waiting for {} ms", key, waitAlready); return false;
}
上述实现如何满足前面提到的几点要求:
客户端互斥: 可以将expire过期时间设置为大于同步代码的执行时间,比如同步代码块执行时间为1s,则可将expire设置为3s或5s。避免同步代码执行过程中expire时间到,其它客户端又可以获取锁执行同步代码块。
通过设置过期时间expire来避免某个客户端长期占有锁。
通过value来控制谁加的锁,由谁解的逻辑,比如可以使用requestId作为value,requestId唯一标记一次请求。
setIfAbsent方法 底层通过调用 Redis 的 SETNX 命令,操作具备原子性。
错误示例:
网上有如下实现,
public boolean lock(String key, String value, long expire) { boolean result = stringRedisTemplate.opsForValue().setIfAbsent(key, value); if(result) {
stringRedisTemplate.expire(key, expire, TimeUnit.SECONDS);
} return result;
}
该实现的问题是如果在result为true,但还没成功设置expire时,程序异常退出了,将导致该锁一直被占用而导致死锁,不满足第二点要求。
2. 解锁实现
解锁也需要满足前面所述的四个要求,实现代码如下:
private static final String RELEASE_LOCK_LUA_SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";private static final Long RELEASE_LOCK_SUCCESS_RESULT = 1L;public boolean unLock(String key, String value) {
DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>(RELEASE_LOCK_LUA_SCRIPT, Long.class); long result = stringRedisTemplate.execute(redisScript, Collections.singletonList(key), value); return Objects.equals(result, RELEASE_LOCK_SUCCESS_RESULT);
}
这段实现使用一个Lua脚本来实现解锁操作,保证操作的原子性。传入的value值需与该线程加锁时的value一致,可以使用requestId(具体实现下面给出)。
错误示例:
public boolean unLock(String key, String value) {
String oldValue = stringRedisTemplate.opsForValue().get(key); if(value.equals(oldValue)) {
stringRedisTemplate.delete(key);
}
}
该实现先获取锁的当前值,判断两值相等则删除。考虑一种极端情况,如果在判断为true时,刚好该锁过期时间到,另一个客户端加锁成功,则接下来的delete将不管三七二十一将别人加的锁直接删掉了,不满足第三点要求。该示例主要是因为没有保证解锁操作的原子性导致。
3. 注解支持
为了方便使用,添加一个注解,可以放于方法上控制方法在分布式环境中的同步执行。
@Retention(RetentionPolicy.RUNTIME)@Target(ElementType.METHOD)public @interface DistributedLockable { String key(); String prefix() default "disLock:"; long expire() default 10L; // 默认10s过期}
添加一个切面来解析注解的处理,
@Aspect@Slf4jpublic class DistributedLockAspect { private DistributedLock lock; public DistributedLockAspect(DistributedLock lock) { this.lock = lock;
}
@Around(value = "@annotation(lockable)") public Object distLock(ProceedingJoinPoint point, DistributedLockable lockable) throws Throwable { boolean locked = false;
String key = lockable.prefix() + lockable.key(); try {
locked = lock.lock(key, WebUtil.getRequestId(), lockable.expire()); if(locked) { return point.proceed();
} else {
log.info("Did not get a lock for key {}", key); return null;
}
} catch (Exception e) { throw e;
} finally { if(locked) { if(!lock.unLock(key, WebUtil.getRequestId())){
log.warn("Unlock {} failed, maybe locked by another client already. ", lockable.key());
}
}
}
}
}
RequestId 的实现如下,通过注册一个Filter,在请求开始时生成一个uuid存于ThreadLocal中,在请求返回时清除。
public class WebUtil { public static final String REQ_ID_HEADER = "Req-Id"; private static final ThreadLocal<String> reqIdThreadLocal = new ThreadLocal<>(); public static void setRequestId(String requestId) {
reqIdThreadLocal.set(requestId);
} public static String getRequestId(){
String requestId = reqIdThreadLocal.get(); if(requestId == null) {
requestId = ObjectId.next();
reqIdThreadLocal.set(requestId);
} return requestId;
} public static void removeRequestId() {
reqIdThreadLocal.remove();
}
}public class RequestIdFilter implements Filter { @Override
public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException {
HttpServletRequest httpServletRequest = (HttpServletRequest) servletRequest;
String reqId = httpServletRequest.getHeader(WebUtil.REQ_ID_HEADER); //没有则生成一个
if (StringUtils.isEmpty(reqId)) {
reqId = ObjectId.next();
}
WebUtil.setRequestId(reqId); try {
filterChain.doFilter(servletRequest, servletResponse);
} finally {
WebUtil.removeRequestId();
}
}
}//在配置类中注册Filter@Beanpublic FilterRegistrationBean requestIdFilter() {
RequestIdFilter reqestIdFilter = new RequestIdFilter();
FilterRegistrationBean registrationBean = new FilterRegistrationBean();
registrationBean.setFilter(reqestIdFilter);
List<String> urlPatterns = Collections.singletonList("/*");
registrationBean.setUrlPatterns(urlPatterns);
registrationBean.setOrder(Ordered.HIGHEST_PRECEDENCE + 1); return registrationBean;
}
4. 使用注解
@DistributedLockable(key = "test", expire = 10)public void test(){
System.out.println("线程-"+Thread.currentThread().getName()+"开始执行..." + LocalDateTime.now()); try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("线程-"+Thread.currentThread().getName()+"结束执行..." + LocalDateTime.now());
}
总结
本文给出了基于Redis的分布式锁的实现方案与常见的错误示例。要保障分布式锁的正确运行,需满足本文所提的四个要求,尤其注意保证加锁解锁操作的原子性,设置过期时间,及对同一个锁的加锁解锁线程一致。
感谢各位的阅读,以上就是“Redis的分布式锁应该怎么打开”的内容了,经过本文的学习后,相信大家对Redis的分布式锁应该怎么打开这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是亿速云,小编将为大家推送更多相关知识点的文章,欢迎关注!