此时JVM中的synchronized和lock锁,将只能对自己所在服务的JVM加锁,而跨机器,跨JMV的场景,仍然需要锁的场景就需要使用到分布式锁了。
2、为什么要使用Redis实现分布式锁?
因为Redis的性能很好,并且Redis是单线程的,天生线程安全。
并且Redis的key过期效果与Zookeeper的临时节点的效果相似,都能实现锁超时自动释放的功能。
而且Redis还可以使用lua脚本来保证redis多条命令实现整体的原子性,Redisson就是使用lua脚本的原子性来实现分布式锁的。
3、我们如何基于Redisson封装分布式锁?
1)、基于RedissonClient实现手动加锁
2)、基于AOP+Redisson封装注解版的分布式锁
3)、将分布式锁功能封装成一个starter, 引入jar包即可实现分布式锁
4、代码实现
4.1、整合封装Redisson
我们前面封装了基于Redis扩展了SpringCache,封装了
redis-cache-spring-boot-starter。
我们的分布式锁基于这个模块实现,下面引入依赖。
引入依赖
itdl-parent
com.itdl
1.0
4.0.0
redis-lock-spring-boot-starter
Redis实现分布式锁的自定义starter封装模块
${java.version}
${java.version}
com.itdl
redis-cache-spring-boot-starter
org.redisson
redisson
org.springframework.boot
spring-boot-starter-aop
编写RedisLockConfig配置RedissonClient
@Configuration // 标识为一个配置项,注入Spring容器
@AutoConfigureBefore({CustomRedisConfig.class, CacheNullValuesHandle.class})
@ConditionalOnProperty(value = "redis.enable", havingValue = "true") // 开启redis.enable=true时生效
@Slf4j
public class RedisLockConfig {
private volatile boolean isCluster = false;
private volatile String redisHostsStr = "";
@Bean
@ConditionalOnMissingBean
public RedissonClient redissonClient(CustomRedisProperties redisProperties){
// 构建配置
Config config = buildConfig(redisProperties);
RedissonClient redissonClient = Redisson.create(config);
log.info("==============创建redisClient{}版成功:{}==================", isCluster ? "集群": "单机", redisHostsStr);
return redissonClient;
}
private Config buildConfig(CustomRedisProperties redisProperties) {
final Config config = new Config();
// 根据逗号切割host列表
Set hosts = org.springframework.util.StringUtils.commaDelimitedListToSet(redisProperties.getHost());
if (CollectionUtils.isEmpty(hosts)){
throw new RuntimeException("redis host address cannot be empty");
}
// 只有一个host, 表示是单机host
if (hosts.size() == 1){
String hostPort = hosts.stream().findFirst().get();
redisHostsStr = "redis://" + hostPort.trim();
config.useSingleServer()
.setAddress(redisHostsStr)
.setDatabase(redisProperties.getDatabase())
.setPassword(StringUtils.isBlank(redisProperties.getPassword()) ? null : redisProperties.getPassword())
;
isCluster = false;
}else {
// 集群处理
String[] redisHosts = new String[hosts.size()];
int i = 0;
for (String host : hosts) {
String[] split = host.split(":");
if (split.length != 2){
throw new RuntimeException("host or port err");
}
redisHosts[i] = "redis://" + host.trim();
i++;
}
redisHostsStr = String.join(",", redisHosts);
// 配置集群
config.useClusterServers()
.addNodeAddress(redisHosts)
.setPassword(StringUtils.isBlank(redisProperties.getPassword()) ? null : redisProperties.getPassword())
// 解决Not all slots covered! Only 10922 slots are available
.setCheckSlotsCoverage(false);
isCluster = true;
}
return config;
}
}
我们配置时需要优先配置好redis-cache-spring-boot-starter,使用@AutoConfigureBefore({CustomRedisConfig.class, CacheNullValuesHandle.class})
直接使用,不再重复造轮子,然后我们根据自定义属性配置文件CustomRedisProperties来创建RedissonClient的Bean。
编写META-INF/spring.factories进行自动配置
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.itdl.lock.config.RedisLockConfig
在测试模块缓存service添加分布式锁
@Cacheable(cacheNames = "demo2#3", key = "#id")
public TestEntity getById2(Long id){
// 创建分布式锁
RLock lock = redissonClient.getLock("demo2_lock");
// 加锁
lock.lock(10, TimeUnit.SECONDS);
if (id > 1000){
log.info("id={}没有查询到数据,返回空值", id);
return null;
}
TestEntity testEntity = new TestEntity(new Random().nextLong(), UUID.randomUUID().toString(), new Random().nextInt(20) + 10);
log.info("模拟查询数据库:{}", testEntity);
// 释放锁
lock.unlock();
return testEntity;
}
我们这里的@Cacheable没有加sync=true, 此时并发请求会存在线程安全问题,但是我们在方法体局部添加了分布式锁,因此我们的程序会按照顺序执行。
如果我们的参数被定死了,最终请求会被先存储到缓存,所以后续的查询就会走缓存,这能很好的测试分布式锁的效果。
编写测试程序
@SpringBootTest
public class TestRedisLockRunner6 {
@Autowired
private MyTestService myTestService;
// 创建一个固定线程池
private ExecutorService executorService = Executors.newFixedThreadPool(16);
@Test
public void testMultiMyTestService() throws InterruptedException {
for (int i = 0; i < 100; i++) {
executorService.submit(() -> {
// 每次查询同一个参数
TestEntity t1 = myTestService.getById2(1L);
});
}
// 主线程休息10秒种
Thread.sleep(10000);
}
}
我们可以看到,结果并没有符合我们预期,但是又部分符合我们预期,为什么呢?
因为我们的@Cacheable是存在线程安全问题的,因为它先查询缓存这个操作存在并发问题,查询时就同时有N个请求进入@Cacheable, 并且都查询没有缓存。
然后同时执行方法体,但方法体加了分布式锁,所以排队进行处理,因此序号有序。
但打印数量不足总数,是因为这一批次没有全部到达@Cacheable,而是执行完毕之后才将缓存回填,所以后续的请求就是走缓存了。
解决方案:我们加上sync=true之后就能实现,只查询一次数据库,就可以回填缓存了。如果我们去掉@Cacheable注解,则会每一次都查询数据库,但是时按照顺序执行的。
加上sync=true测试
效果达到了我们的预期,继续看一下去掉@Cacheable注解的情况。
去掉@Cacheable注解测试
我们的分布式锁功能是没有问题的,但是每次我们都需要执行getLock(), lock.lock(), lock.unlock(),是不是很麻烦,能不能一个注解搞定?
当然是可以的。
4.2、封装注解版分布式锁
编写@RedisLock注解
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RedisLock {
String lockName() default "";
LockType lockType() default REENTRANT_LOCK;
long waitTime() default 30000L;
long leaseTime() default 60000L;
boolean immediatelyUnLock() default true;
TimeUnit timeUnit() default TimeUnit.MILLISECONDS;
}
编写分布式锁切面RedisLockAop
@ConditionalOnProperty(value = "redis.enable", havingValue = "true") // 开启redis.enable=true时生效
@AutoConfigureBefore(RedisLockConfig.class)
@Aspect
@Configuration
@Slf4j
public class RedisLockAop {
@Resource
private RedissonClient redissonClient;
@Pointcut("@annotation(com.itdl.lock.anno.RedisLock)")
public void pointcut(){
}
@Around("pointcut()")
public Object around(ProceedingJoinPoint joinPoint) throws Throwable {
// 获取方法
Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();
// 获取方法上的分布式锁注解
RedisLock redisLock = method.getDeclaredAnnotation(RedisLock.class);
// 获取注解的参数
// 锁名称
String lockName = redisLock.lockName();
// 锁类型
LockType lockType = redisLock.lockType();
// 获取RedissonClient的Lock
RLock lock = getRLock(lockName, lockType, redisLock);
//获取到锁后, 开始执行方法,执行完毕后释放锁
try {
log.debug("=========>>>获取锁成功, 即将执行业务逻辑:{}", lockName);
Object proceed = joinPoint.proceed();
// 释放锁
if (redisLock.immediatelyUnLock()) {
//是否立即释放锁
lock.unlock();
}
log.debug("=========>>>获取锁成功且执行业务逻辑成功:{}", lockName);
return proceed;
} catch (Exception e) {
log.error("=========>>>获取锁成功但执行业务逻辑失败:{}", lockName);
e.printStackTrace();
throw new RedisLockException(LockErrCode.EXEC_BUSINESS_ERR);
}finally {
// 查询当前线程是否保持此锁定 被锁定则解锁
lock.unlock();
log.debug("=========>>>释放锁成功:{}", lockName);
}
}
private RLock getRLock(String lockName, LockType lockType, RedisLock redisLock) throws InterruptedException {
RLock lock;
switch (lockType){
case FAIR_LOCK:
lock = redissonClient.getFairLock(lockName);
break;
case READ_LOCK:
lock = redissonClient.getReadWriteLock(lockName).readLock();
break;
case WRITE_LOCK:
lock = redissonClient.getReadWriteLock(lockName).writeLock();
break;
default:
// 默认加可重入锁,也就是普通的分布式锁
lock = redissonClient.getLock(lockName);
break;
}
// 首先尝试获取锁,如果在规定时间内没有获取到锁,则调用lock等待锁,直到获取锁为止
if (lock.tryLock()) {
lock.tryLock(redisLock.waitTime(), redisLock.leaseTime(), redisLock.timeUnit());
}else {
// 如果leaseTime>0,规定时间内获取锁,超时则自动释放锁
long leaseTime = redisLock.leaseTime();
if (leaseTime > 0) {
lock.lock(redisLock.leaseTime(), redisLock.timeUnit());
} else {
// 自动释放锁时间设置为0或者负数,则加锁不设置超时时间
lock.lock();
}
}
return lock;
}
}
话不多说,封装的逻辑已经在注释中写的很清晰了。
将切面也放入自动配置spring.factories中
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.itdl.lock.config.RedisLockConfig,\
com.itdl.lock.anno.RedisLockAop
测试注解版分布式锁
@RedisLock(lockName = "demo4_lock")
public TestEntity getById4(Long id) throws InterruptedException {
index++;
log.info("current index is : {}", index);
Thread.sleep(new Random().nextInt(10) * 100);
TestEntity testEntity = new TestEntity(new Random().nextLong(), UUID.randomUUID().toString(), new Random().nextInt(20) + 10);
log.info("模拟查询数据库:{}", testEntity);
return testEntity;
}
可以看到,我们就是一个注解分布式锁的效果,而分布式锁与缓存注解通常不会一起使用,因为一般会在存在事务问题的地方我们会使用锁,在多个JMV操作同一条数据做写操作时需要加分布式锁。
编写测试程序
@SpringBootTest
public class TestRedisLockRunner6 {
@Autowired
private MyTestService myTestService;
// 创建一个固定线程池
private ExecutorService executorService = Executors.newFixedThreadPool(16);
@Test
public void testMultiMyTestService() throws InterruptedException {
for (int i = 0; i < 100; i++) {
executorService.submit(() -> {
try {
TestEntity t1 = myTestService.getById4(1L);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
// 主线程休息10秒种
Thread.sleep(60000);
}
}
测试结果
5、小结
我们将分布式锁基于缓存扩展了一版,也就是说本starter即有分布式缓存功能,又有分布式锁功能。
而注解版的分布式锁能够解决大多数场景的并核问题,小粒度的Lock锁方式补全其他场景。
将两者封装成为一个starter,我们就可以很方便的使用分布式锁功能,引入相关包即可,开箱即用。