文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

Redis 分布式锁的 5个坑,真是又大又深

2024-12-24 17:53

关注

引言

最近项目上线的频率颇高,连着几天加班熬夜,身体有点吃不消精神也有些萎靡,无奈业务方催的紧,工期就在眼前只能硬着头皮上了。脑子浑浑噩噩的时候,写的就不能叫代码,可以直接叫做Bug。我就熬夜写了一个bug被骂惨了。

由于是做商城业务,要频繁的对商品库存进行扣减,应用是集群部署,为避免并发造成库存超买超卖等问题,采用 redis 分布式锁加以控制。本以为给扣库存的代码加上锁lock.tryLock就万事大吉了

  1.  
  2.    public String stockLock() { 
  3.         RLock lock = redissonClient.getLock("stockLock"); 
  4.         try { 
  5.              
  6.             if (lock.tryLock(10, TimeUnit.SECONDS)) { 
  7.                  
  8.                 Integer stock = Integer.valueOf(stringRedisTemplate.opsForValue().get("stockCount")); 
  9.                  
  10.                 if (stock > 0) { 
  11.                     stock = stock - 1; 
  12.                     stringRedisTemplate.opsForValue().set("stockCount", stock.toString()); 
  13.                     LOGGER.info("库存扣减成功,剩余库存数量:{}", stock); 
  14.                 } else { 
  15.                     LOGGER.info("库存不足~"); 
  16.                 } 
  17.             } else { 
  18.                 LOGGER.info("未获取到锁业务结束.."); 
  19.             } 
  20.         } catch (Exception e) { 
  21.             LOGGER.info("处理异常", e); 
  22.         } finally { 
  23.             lock.unlock(); 
  24.         } 
  25.         return "ok"
  26.   } 

结果业务代码执行完以后我忘了释放锁lock.unlock(),导致redis线程池被打满,redis服务大面积故障,造成库存数据扣减混乱,被领导一顿臭骂,这个月绩效~ 哎·~。

随着 使用redis 锁的时间越长,我发现 redis 锁的坑远比想象中要多。就算在面试题当中redis分布式锁的出镜率也比较高,比如:“用锁遇到过哪些问题?” ,“又是如何解决的?” 基本都是一套连招问出来的。

今天就分享一下我用redis 分布式锁的踩坑日记,以及一些解决方案,和大家一起共勉。

一、锁未被释放

这种情况是一种低级错误,就是我上边犯的错,由于当前线程 获取到redis 锁,处理完业务后未及时释放锁,导致其它线程会一直尝试获取锁阻塞,例如:用Jedis客户端会报如下的错误信息

  1. redis.clients.jedis.exceptions.JedisConnectionException: Could not get a resource from the pool 

redis线程池已经没有空闲线程来处理客户端命令。

解决的方法也很简单,只要我们细心一点,拿到锁的线程处理完业务及时释放锁,如果是重入锁未拿到锁后,线程可以释放当前连接并且sleep一段时间。

  1. public void lock() { 
  2.       while (true) { 
  3.           boolean flag = this.getLock(key); 
  4.           if (flag) { 
  5.                 TODO ......... 
  6.           } else { 
  7.                 // 释放当前redis连接 
  8.                 redis.close(); 
  9.                 // 休眠1000毫秒 
  10.                 sleep(1000); 
  11.           } 
  12.         } 
  13.     } 

二、B的锁被A给释放了

我们知道Redis实现锁的原理在于 SETNX命令。当 key不存在时将 key的值设为 value ,返回值为 1;若给定的 key已经存在,则 SETNX不做任何动作,返回值为 0 。

  1. SETNX key value 

我们来设想一下这个场景:A、B两个线程来尝试给key myLock加锁,A线程先拿到锁(假如锁3秒后过期),B线程就在等待尝试获取锁,到这一点毛病没有。

那如果此时业务逻辑比较耗时,执行时间已经超过redis锁过期时间,这时A线程的锁自动释放(删除key),B线程检测到myLock这个key不存在,执行 SETNX命令也拿到了锁。

但是,此时A线程执行完业务逻辑之后,还是会去释放锁(删除key),这就导致B线程的锁被A线程给释放了。

为避免上边的情况,一般我们在每个线程加锁时要带上自己独有的value值来标识,只释放指定value的key,否则就会出现释放锁混乱的场景。

三、数据库事务超时

emm~ 聊redis锁咋还扯到数据库事务上来了?别着急往下看,看下边这段代码:

  1. @Transaction 
  2.    public void lock() { 
  3.  
  4.         while (true) { 
  5.             boolean flag = this.getLock(key); 
  6.             if (flag) { 
  7.                 insert(); 
  8.             } 
  9.         } 
  10.     } 

给这个方法添加一个@Transaction注解开启事务,如代码中抛出异常进行回滚,要知道数据库事务可是有超时时间限制的,并不会无条件的一直等一个耗时的数据库操作。

比如:我们解析一个大文件,再将数据存入到数据库,如果执行时间太长,就会导致事务超时自动回滚。

一旦你的key长时间获取不到锁,获取锁等待的时间远超过数据库事务超时时间,程序就会报异常。

一般为解决这种问题,我们就需要将数据库事务改为手动提交、回滚事务。

  1. @Autowired 
  2.     DataSourceTransactionManager dataSourceTransactionManager; 
  3.  
  4.     @Transaction 
  5.     public void lock() { 
  6.         //手动开启事务 
  7.         TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition); 
  8.         try { 
  9.             while (true) { 
  10.                 boolean flag = this.getLock(key); 
  11.                 if (flag) { 
  12.                     insert(); 
  13.                     //手动提交事务 
  14.                     dataSourceTransactionManager.commit(transactionStatus); 
  15.                 } 
  16.             } 
  17.         } catch (Exception e) { 
  18.             //手动回滚事务 
  19.             dataSourceTransactionManager.rollback(transactionStatus); 
  20.         } 
  21.     } 

四、锁过期了,业务还没执行完

这种情况和我们上边提到的第二种比较类似,但解决思路上略有不同。

同样是redis分布式锁过期,而业务逻辑没执行完的场景,不过,这里换一种思路想问题,把redis锁的过期时间再弄长点不就解决了吗?

那还是有问题,我们可以在加锁的时候,手动调长redis锁的过期时间,可这个时间多长合适?业务逻辑的执行时间是不可控的,调的过长又会影响操作性能。

要是redis锁的过期时间能够自动续期就好了。

为了解决这个问题我们使用redis客户端redisson,redisson很好的解决了redis在分布式环境下的一些棘手问题,它的宗旨就是让使用者减少对Redis的关注,将更多精力用在处理业务逻辑上。

redisson对分布式锁做了很好封装,只需调用API即可。

  1. RLock lock = redissonClient.getLock("stockLock"); 

redisson在加锁成功后,会注册一个定时任务监听这个锁,每隔10秒就去查看这个锁,如果还持有锁,就对过期时间进行续期。默认过期时间30秒。这个机制也被叫做:“看门狗”,这名字。。。

举例子:假如加锁的时间是30秒,过10秒检查一次,一旦加锁的业务没有执行完,就会进行一次续期,把锁的过期时间再次重置成30秒。

通过分析下边redisson的源码实现可以发现,不管是加锁、解锁、续约都是客户端把一些复杂的业务逻辑,通过封装在Lua脚本中发送给redis,保证这段复杂业务逻辑执行的原子性。

 

  1. @Slf4j 
  2. @Service 
  3. public class RedisDistributionLockPlus { 
  4.  
  5.      
  6.     private static final long DEFAULT_LOCK_TIMEOUT = 30; 
  7.  
  8.     private static final long TIME_SECONDS_FIVE = 5 ; 
  9.  
  10.      
  11.     private Map lockContentMap = new ConcurrentHashMap<>(512); 
  12.  
  13.      
  14.     private static final Long EXEC_SUCCESS = 1L; 
  15.  
  16.      
  17.     private static final String LOCK_SCRIPT = "if redis.call('exists', KEYS[2]) == 1 then ARGV[2] = math.floor(redis.call('get', KEYS[2]) + 10) end " + 
  18.             "if redis.call('exists', KEYS[1]) == 0 then " + 
  19.                "local t = redis.call('set', KEYS[1], ARGV[1], 'EX', ARGV[2]) " + 
  20.                "for k, v in pairs(t) do " + 
  21.                  "if v == 'OK' then return tonumber(ARGV[2]) end " + 
  22.                "end " + 
  23.             "return 0 end"
  24.  
  25.      
  26.     private static final String UNLOCK_SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then " + 
  27.             "local ctime = tonumber(ARGV[2]) " + 
  28.             "local biz_timeout = tonumber(ARGV[3]) " + 
  29.             "if ctime > 0 then  " + 
  30.                "if redis.call('exists', KEYS[2]) == 1 then " + 
  31.                    "local avg_time = redis.call('get', KEYS[2]) " + 
  32.                    "avg_time = (tonumber(avg_time) * 8 + ctime * 2)/10 " + 
  33.                    "if avg_time >= biz_timeout - 5 then redis.call('set', KEYS[2], avg_time, 'EX', 24*60*60) " + 
  34.                    "else redis.call('del', KEYS[2]) end " + 
  35.                "elseif ctime > biz_timeout -5 then redis.call('set', KEYS[2], ARGV[2], 'EX', 24*60*60) end " + 
  36.             "end " + 
  37.             "return redis.call('del', KEYS[1]) " + 
  38.             "else return 0 end"
  39.      
  40.     private static final String RENEW_SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('expire', KEYS[1], ARGV[2]) else return 0 end"
  41.  
  42.  
  43.     private final StringRedisTemplate redisTemplate; 
  44.  
  45.     public RedisDistributionLockPlus(StringRedisTemplate redisTemplate) { 
  46.         this.redisTemplate = redisTemplate; 
  47.         ScheduleTask task = new ScheduleTask(this, lockContentMap); 
  48.         // 启动定时任务 
  49.         ScheduleExecutor.schedule(task, 1, 1, TimeUnit.SECONDS); 
  50.     } 
  51.  
  52.      
  53.     public boolean lock(String lockKey, String requestId, long expire) { 
  54.         log.info("开始执行加锁, lockKey ={}, requestId={}", lockKey, requestId); 
  55.         for (; ; ) { 
  56.             // 判断是否已经有线程持有锁,减少redis的压力 
  57.             LockContent lockContentOld = lockContentMap.get(lockKey); 
  58.             boolean unLocked = null == lockContentOld; 
  59.             // 如果没有被锁,就获取锁 
  60.             if (unLocked) { 
  61.                 long startTime = System.currentTimeMillis(); 
  62.                 // 计算超时时间 
  63.                 long bizExpire = expire == 0L ? DEFAULT_LOCK_TIMEOUT : expire; 
  64.                 String lockKeyRenew = lockKey + "_renew"
  65.  
  66.                 RedisScript script = RedisScript.of(LOCK_SCRIPT, Long.class); 
  67.                 List keys = new ArrayList<>(); 
  68.                 keys.add(lockKey); 
  69.                 keys.add(lockKeyRenew); 
  70.                 Long lockExpire = redisTemplate.execute(script, keys, requestId, Long.toString(bizExpire)); 
  71.                 if (null != lockExpire && lockExpire > 0) { 
  72.                     // 将锁放入map 
  73.                     LockContent lockContent = new LockContent(); 
  74.                     lockContent.setStartTime(startTime); 
  75.                     lockContent.setLockExpire(lockExpire); 
  76.                     lockContent.setExpireTime(startTime + lockExpire * 1000); 
  77.                     lockContent.setRequestId(requestId); 
  78.                     lockContent.setThread(Thread.currentThread()); 
  79.                     lockContent.setBizExpire(bizExpire); 
  80.                     lockContent.setLockCount(1); 
  81.                     lockContentMap.put(lockKey, lockContent); 
  82.                     log.info("加锁成功, lockKey ={}, requestId={}", lockKey, requestId); 
  83.                     return true
  84.                 } 
  85.             } 
  86.             // 重复获取锁,在线程池中由于线程复用,线程相等并不能确定是该线程的锁 
  87.             if (Thread.currentThread() == lockContentOld.getThread() 
  88.                       && requestId.equals(lockContentOld.getRequestId())){ 
  89.                 // 计数 +1 
  90.                 lockContentOld.setLockCount(lockContentOld.getLockCount()+1); 
  91.                 return true
  92.             } 
  93.  
  94.             // 如果被锁或获取锁失败,则等待100毫秒 
  95.             try { 
  96.                 TimeUnit.MILLISECONDS.sleep(100); 
  97.             } catch (InterruptedException e) { 
  98.                 // 这里用lombok 有问题 
  99.                 log.error("获取redis 锁失败, lockKey ={}, requestId={}", lockKey, requestId, e); 
  100.                 return false
  101.             } 
  102.         } 
  103.     } 
  104.  
  105.  
  106.      
  107.     public boolean unlock(String lockKey, String lockValue) { 
  108.         String lockKeyRenew = lockKey + "_renew"
  109.         LockContent lockContent = lockContentMap.get(lockKey); 
  110.  
  111.         long consumeTime; 
  112.         if (null == lockContent) { 
  113.             consumeTime = 0L; 
  114.         } else if (lockValue.equals(lockContent.getRequestId())) { 
  115.             int lockCount = lockContent.getLockCount(); 
  116.             // 每次释放锁, 计数 -1,减到0时删除redis上的key 
  117.             if (--lockCount > 0) { 
  118.                 lockContent.setLockCount(lockCount); 
  119.                 return false
  120.             } 
  121.             consumeTime = (System.currentTimeMillis() - lockContent.getStartTime()) / 1000; 
  122.         } else { 
  123.             log.info("释放锁失败,不是自己的锁。"); 
  124.             return false
  125.         } 
  126.  
  127.         // 删除已完成key,先删除本地缓存,减少redis压力, 分布式锁,只有一个,所以这里不加锁 
  128.         lockContentMap.remove(lockKey); 
  129.  
  130.         RedisScript script = RedisScript.of(UNLOCK_SCRIPT, Long.class); 
  131.         List keys = new ArrayList<>(); 
  132.         keys.add(lockKey); 
  133.         keys.add(lockKeyRenew); 
  134.  
  135.         Long result = redisTemplate.execute(script, keys, lockValue, Long.toString(consumeTime), 
  136.                 Long.toString(lockContent.getBizExpire())); 
  137.         return EXEC_SUCCESS.equals(result); 
  138.  
  139.     } 
  140.  
  141.      
  142.     public boolean renew(String lockKey, LockContent lockContent) { 
  143.  
  144.         // 检测执行业务线程的状态 
  145.         Thread.State state = lockContent.getThread().getState(); 
  146.         if (Thread.State.TERMINATED == state) { 
  147.             log.info("执行业务的线程已终止,不再续约 lockKey ={}, lockContent={}", lockKey, lockContent); 
  148.             return false
  149.         } 
  150.  
  151.         String requestId = lockContent.getRequestId(); 
  152.         long timeOut = (lockContent.getExpireTime() - lockContent.getStartTime()) / 1000; 
  153.  
  154.         RedisScript script = RedisScript.of(RENEW_SCRIPT, Long.class); 
  155.         List keys = new ArrayList<>(); 
  156.         keys.add(lockKey); 
  157.  
  158.         Long result = redisTemplate.execute(script, keys, requestId, Long.toString(timeOut)); 
  159.         log.info("续约结果,True成功,False失败 lockKey ={}, result={}", lockKey, EXEC_SUCCESS.equals(result)); 
  160.         return EXEC_SUCCESS.equals(result); 
  161.     } 
  162.  
  163.  
  164.     static class ScheduleExecutor { 
  165.  
  166.         public static void schedule(ScheduleTask task, long initialDelay, long period, TimeUnit unit) { 
  167.             long delay = unit.toMillis(initialDelay); 
  168.             long period_ = unit.toMillis(period); 
  169.             // 定时执行 
  170.             new Timer("Lock-Renew-Task").schedule(task, delay, period_); 
  171.         } 
  172.     } 
  173.  
  174.     static class ScheduleTask extends TimerTask { 
  175.  
  176.         private final RedisDistributionLockPlus redisDistributionLock; 
  177.         private final Map lockContentMap; 
  178.  
  179.         public ScheduleTask(RedisDistributionLockPlus redisDistributionLock, Map lockContentMap) { 
  180.             this.redisDistributionLock = redisDistributionLock; 
  181.             this.lockContentMap = lockContentMap; 
  182.         } 
  183.  
  184.         @Override 
  185.         public void run() { 
  186.             if (lockContentMap.isEmpty()) { 
  187.                 return
  188.             } 
  189.             Set> entries = lockContentMap.entrySet(); 
  190.             for (Map.Entry entry : entries) { 
  191.                 String lockKey = entry.getKey(); 
  192.                 LockContent lockContent = entry.getValue(); 
  193.                 long expireTime = lockContent.getExpireTime(); 
  194.                 // 减少线程池中任务数量 
  195.                 if ((expireTime - System.currentTimeMillis())/ 1000 < TIME_SECONDS_FIVE) { 
  196.                     //线程池异步续约 
  197.                     ThreadPool.submit(() -> { 
  198.                         boolean renew = redisDistributionLock.renew(lockKey, lockContent); 
  199.                         if (renew) { 
  200.                             long expireTimeNew = lockContent.getStartTime() + (expireTime - lockContent.getStartTime()) * 2 - TIME_SECONDS_FIVE * 1000; 
  201.                             lockContent.setExpireTime(expireTimeNew); 
  202.                         } else { 
  203.                             // 续约失败,说明已经执行完 OR redis 出现问题 
  204.                             lockContentMap.remove(lockKey); 
  205.                         } 
  206.                     }); 
  207.                 } 
  208.             } 
  209.         } 
  210.     } 

五、redis主从复制的坑

redis高可用最常见的方案就是主从复制(master-slave),这种模式也给redis分布式锁挖了一坑。

redis cluster集群环境下,假如现在A客户端想要加锁,它会根据路由规则选择一台master节点写入key mylock,在加锁成功后,master节点会把key异步复制给对应的slave节点。

如果此时redis master节点宕机,为保证集群可用性,会进行主备切换,slave变为了redis master。B客户端在新的master节点上加锁成功,而A客户端也以为自己还是成功加了锁的。

此时就会导致同一时间内多个客户端对一个分布式锁完成了加锁,导致各种脏数据的产生。

至于解决办法嘛,目前看还没有什么根治的方法,只能尽量保证机器的稳定性,减少发生此事件的概率。

总结

上面就是我在使用Redis 分布式锁时遇到的一些坑,有点小感慨,经常用一个方法填上这个坑,没多久就发现另一个坑又出来了,其实根本没有什么十全十美的解决方案,哪有什么银弹,只不过是在权衡利弊后,选一个在接受范围内的折中方案而已。

来源:程序员内点事内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯