前言
在单体应用中,如果我们对共享数据不进行加锁操作,会出现数据一致性问题,我们的解决办法通常是加锁。在分布式架构中,我们同样会遇到数据共享操作问题,此时,我们就需要分布式锁来解决问题,下面我们一起聊聊使用redis来实现分布式锁。
使用场景
- 库存超卖 比如 5个笔记本 A 看 准备买3个 B 买2个 C 4个 一下单 3+2+4 =9
- 防止用户重复下单
- MQ消息去重
- 订单操作变更
为什么要使用分布式锁
从业务场景来分析,有一个共性,共享资源的竞争,比如库存商品,用户,消息,订单等,这些资源在同一时间点只能有一个线程去操作,并且在操作期间,禁止其他线程操作。要达到这个效果,就要实现共享资源互斥,共享资源串行化。其实,就是对共享资源加锁的问题。在单应用(单进程多线程)中使用锁,我们可以使用synchronize、ReentrantLock等关键字,对共享资源进行加锁。在分布式应用(多进程多线程)中,分布式锁是控制分布式系统之间同步访问共享资源的一种方式。
如何使用分布式锁
流程图
分布式锁的状态
- 客户端通过竞争获取锁才能对共享资源进行操作
- 当持有锁的客户端对共享资源进行操作时
- 其他客户端都不可以对这个资源进行操作
- 直到持有锁的客户端完成操作
分布式锁的特点
互斥性
在任意时刻,只有一个客户端可以持有锁(排他性)
高可用,具有容错性
只要锁服务集群中的大部分节点正常运行,客户端就可以进行加锁解锁操作
避免死锁
具备锁失效机制,锁在一段时间之后一定会释放。(正常释放或超时释放)
加锁和解锁为同一个客户端
一个客户端不能释放其他客户端加的锁了
分布式锁的实现方式(以redis分布式锁实现为例)
简单版本
public class SimplyRedisLock {
// Redis分布式锁的key
public static final String REDIS_LOCK = "redis_lock";
@Autowired
StringRedisTemplate template;
public String index(){
// 每个人进来先要进行加锁,key值为"redis_lock",value随机生成
String value = UUID.randomUUID().toString().replace("-","");
try{
// 加锁
Boolean flag = template.opsForValue().setIfAbsent(REDIS_LOCK, value);
// 加锁失败
if(!flag){
return "抢锁失败!";
}
System.out.println( value+ " 抢锁成功");
// 业务逻辑
String result = template.opsForValue().get("001");
int total = result == null ? 0 : Integer.parseInt(result);
if (total > 0) {
int realTotal = total - 1;
template.opsForValue().set("001", String.valueOf(realTotal));
// 如果在抢到所之后,删除锁之前,发生了异常,锁就无法被释放,
// 释放锁操作不能在此操作,要在finally处理
// template.delete(REDIS_LOCK);
System.out.println("购买商品成功,库存还剩:" + realTotal + "件");
return "购买商品成功,库存还剩:" + realTotal + "件";
} else {
System.out.println("购买商品失败");
}
return "购买商品失败";
}finally {
// 释放锁
template.delete(REDIS_LOCK);
}
}
}
该种实现方案比较简单,但是有一些问题。假如服务运行期间挂掉了,代码完成了加锁的处理,但是没用走的finally部分,即锁没有释放,这样的情况下,锁是永远没法释放的。于是就有了改进版本。
进阶版本
public class SimplyRedisLock2 {
// Redis分布式锁的key
public static final String REDIS_LOCK = "redis_lock";
@Autowired
StringRedisTemplate template;
public String index(){
// 每个人进来先要进行加锁,key值为"redis_lock",value随机生成
String value = UUID.randomUUID().toString().replace("-","");
try{
// 加锁
Boolean flag = template.opsForValue().setIfAbsent(REDIS_LOCK, value,10L, TimeUnit.SECONDS);
// 加锁失败
if(!flag){
return "抢锁失败!";
}
System.out.println( value+ " 抢锁成功");
// 业务逻辑
String result = template.opsForValue().get("001");
int total = result == null ? 0 : Integer.parseInt(result);
if (total > 0) {
int realTotal = total - 1;
template.opsForValue().set("001", String.valueOf(realTotal));
// 如果在抢到所之后,删除锁之前,发生了异常,锁就无法被释放,
// 释放锁操作不能在此操作,要在finally处理
// template.delete(REDIS_LOCK);
System.out.println("购买商品成功,库存还剩:" + realTotal + "件");
return "购买商品成功,库存还剩:" + realTotal + "件";
} else {
System.out.println("购买商品失败");
}
return "购买商品失败";
}finally {
// 释放锁
template.delete(REDIS_LOCK);
}
}
}
这种实现方案,对key增加了一个过期时间,这样即使服务挂掉,到了过期时间之后,锁会自动释放。但是仔细想想,还是有问题。比如key值的过期时间为10s,但是业务处理逻辑需要15s的时间,这样就会导致某一个线程处理完业务逻辑之后,在释放锁,即删除key的时候,删除的key不是自己set的,而是其他线程设置的,这样就会造成数据的不一致性,引起数据的错误,从而影响业务。还需要改进。
进阶版本2-谁设置的锁,谁释放
public class SimplyRedisLock3 {
// Redis分布式锁的key
public static final String REDIS_LOCK = "redis_lock";
@Autowired
StringRedisTemplate template;
public String index(){
// 每个人进来先要进行加锁,key值为"redis_lock",value随机生成
String value = UUID.randomUUID().toString().replace("-","");
try{
// 加锁
Boolean flag = template.opsForValue().setIfAbsent(REDIS_LOCK, value,10L, TimeUnit.SECONDS);
// 加锁失败
if(!flag){
return "抢锁失败!";
}
System.out.println( value+ " 抢锁成功");
// 业务逻辑
String result = template.opsForValue().get("001");
int total = result == null ? 0 : Integer.parseInt(result);
if (total > 0) {
int realTotal = total - 1;
template.opsForValue().set("001", String.valueOf(realTotal));
// 如果在抢到所之后,删除锁之前,发生了异常,锁就无法被释放,
// 释放锁操作不能在此操作,要在finally处理
// template.delete(REDIS_LOCK);
System.out.println("购买商品成功,库存还剩:" + realTotal + "件");
return "购买商品成功,库存还剩:" + realTotal + "件";
} else {
System.out.println("购买商品失败");
}
return "购买商品失败";
}finally {
// 谁加的锁,谁才能删除!!!!
if(template.opsForValue().get(REDIS_LOCK).equals(value)){
template.delete(REDIS_LOCK);
}
}
}
}
这种方式解决了因业务复杂,处理时间太长,超过了过期时间,而释放了别人锁的问题。还会有其他问题吗?其实还是有的,finally块的判断和del删除操作不是原子操作,并发的时候也会出问题,并发就是要保证数据的一致性,保证数据的一致性,最好要保证对数据的操作具有原子性。于是还是要改进。
进阶版本3-Lua版本
public class SimplyRedisLock3 {
// Redis分布式锁的key
public static final String REDIS_LOCK = "redis_lock";
@Autowired
StringRedisTemplate template;
public String index(){
// 每个人进来先要进行加锁,key值为"redis_lock",value随机生成
String value = UUID.randomUUID().toString().replace("-","");
try{
// 加锁
Boolean flag = template.opsForValue().setIfAbsent(REDIS_LOCK, value,10L, TimeUnit.SECONDS);
// 加锁失败
if(!flag){
return "抢锁失败!";
}
System.out.println( value+ " 抢锁成功");
// 业务逻辑
String result = template.opsForValue().get("001");
int total = result == null ? 0 : Integer.parseInt(result);
if (total > 0) {
int realTotal = total - 1;
template.opsForValue().set("001", String.valueOf(realTotal));
// 如果在抢到所之后,删除锁之前,发生了异常,锁就无法被释放,
// 释放锁操作不能在此操作,要在finally处理
// template.delete(REDIS_LOCK);
System.out.println("购买商品成功,库存还剩:" + realTotal + "件");
return "购买商品成功,库存还剩:" + realTotal + "件";
} else {
System.out.println("购买商品失败");
}
return "购买商品失败";
}finally {
// 谁加的锁,谁才能删除,使用Lua脚本,进行锁的删除
Jedis jedis = null;
try{
jedis = RedisUtils.getJedis();
String script = "if redis.call('get',KEYS[1]) == ARGV[1] " +
"then " +
"return redis.call('del',KEYS[1]) " +
"else " +
" return 0 " +
"end";
Object eval = jedis.eval(script, Collections.singletonList(REDIS_LOCK), Collections.singletonList(value));
if("1".equals(eval.toString())){
System.out.println("-----del redis lock ok....");
}else{
System.out.println("-----del redis lock error ....");
}
}catch (Exception e){
}finally {
if(null != jedis){
jedis.close();
}
}
}
}
}
这种方式,规定了谁上的锁,谁才能删除,并且解决了删除操作没有原子性问题。但还没有考虑缓存,以及Redis集群部署下,异步复制造成的锁丢失:主节点没来得及把刚刚set进来这条数据给从节点,就挂了。所以还得改进。
终极进化版
public class SimplyRedisLock5 {
// Redis分布式锁的key
public static final String REDIS_LOCK = "redis_lock";
@Autowired
StringRedisTemplate template;
@Autowired
Redisson redisson;
public String index(){
RLock lock = redisson.getLock(REDIS_LOCK);
lock.lock();
// 每个人进来先要进行加锁,key值为"redis_lock"
String value = UUID.randomUUID().toString().replace("-","");
try {
String result = template.opsForValue().get("001");
int total = result == null ? 0 : Integer.parseInt(result);
if (total > 0) {
// 如果在此处需要调用其他微服务,处理时间较长。。。
int realTotal = total - 1;
template.opsForValue().set("001", String.valueOf(realTotal));
System.out.println("购买商品成功,库存还剩:" + realTotal + "件");
return "购买商品成功,库存还剩:" + realTotal + "件";
} else {
System.out.println("购买商品失败");
}
return "购买商品失败";
}finally {
if(lock.isLocked() && lock.isHeldByCurrentThread()){
lock.unlock();
}
}
}
}
这种实现方案,底层封装了多节点redis实现的分布式锁算法,有效防止单点故障,感兴趣的可以去研究一下。
总结
分析问题的过程,也是解决问题的过程,也能锻炼自己编写代码时思考问题的方式和角度。
到此这篇关于详解Redis分布式锁的原理与实现的文章就介绍到这了,更多相关Redis分布式锁内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!