Redis-实践篇-Redis实现分布式锁
Redis-实践篇-Redis实现分布式锁
学习核心
- Redis 实现分布式锁实践
学习资料
实践版本说明
- Redis 5.0.14.1
- Springboot 2.7.6
基于Redis实现分布式锁(JAVA版本)
Redis 实现分布式锁核心思路:通过set指令配置相关参数构建分布式锁的特性:SET key value [EX seconds] [PX milliseconds] [NX|XX]
将字符串值 value(确保唯一) 关联到 key ,如果 key 已经持有其他值, SET 就覆写旧值,无视类型。
TTL 设置key的过期时间
- EX second :设置键的过期时间为 second 秒。 SET key value EX second 效果等同于 SETEX key second value
- PX millisecond :设置键的过期时间为 millisecond 毫秒。 SET key value PX millisecond 效果等同于 PSETEX key millisecond value
- NX :只在键不存在时,才对键进行设置操作。 SET key value NX 效果等同于 SETNX key value
- XX :只在键已经存在时,才对键进行设置操作
加锁:**SET key value [PX milliseconds] [NX]**命令
- 如果key不存在,设置value,并设置过期时间(加锁成功)
- 如果已经存在lock(也就是有客户端持有锁了),则设置失败(加锁失败)
解锁:del 命令,通过删除键释放锁
- 释放锁之后,其他客户端可以通过set命令进行加锁
1.编程式分布式锁
加解锁代码示例
public void doSomething() {
try {
// 上锁
redisLock.lock();
// 处理业务
...
} catch (Exception e) {
e.printStackTrace();
} finally {
// 释放锁
redisLock.unlock();
}
}
构建参考
构建springboot项目,引入redis相关依赖
<!--引入Jedis依赖-->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<!-- <version>4.2.0</version> -->
<version>2.9.3</version>
</dependency>
加锁设置的参数(锁的属性配置)
/**
* 加锁设置的参数
*/
@Data
public class LockParam {
// 锁的key
private String lockKey;
// 尝试获得锁的时间(单位:毫秒),默认值:3000毫秒
private Long tryLockTime;
// 尝试获得锁后,持有锁的时间(单位:毫秒),默认值:5000毫秒
private Long holdLockTime;
// 构造函数:设定锁键值
public LockParam(String lockKey){
this(lockKey,1000*3L,1000*5L);
};
public LockParam(String lockKey,Long tryLockTime){
this(lockKey,tryLockTime,1000*5L);
};
public LockParam(String lockKey,Long tryLockTime,Long holdLockTime){
this.lockKey = lockKey;
this.tryLockTime = tryLockTime;
this.holdLockTime = holdLockTime;
};
}
分布式锁操作类
/**
* redis分布式锁
*/
@Slf4j
public class RedisLock {
// 锁key的前缀
private final static String prefix_key = "redisLock:";
// 释放锁的lua脚本
private final static String unLockScript = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
// 执行unLockScript脚本,释放锁成功值
private final static Long unLockSuccess = 1L;
// 加锁设置的参数(key值、超时时间、持有锁的时间)
private LockParam lockParam;
// 尝试获得锁的截止时间【lockParam.getTryLockTime()+System.currentTimeMillis()】
private Long tryLockEndTime;
// redis加锁的key
private String redisLockKey;
// redis加锁的value
private String redisLockValue;
// redis加锁的成功标示
private Boolean holdLockSuccess = Boolean.FALSE;
// jedis实例
private Jedis jedis;
// 获取jedis实例
private Jedis getJedis() {
return this.jedis;
}
// 关闭jedis
private void closeJedis(Jedis jedis) {
jedis.close();
jedis = null;
}
public RedisLock(LockParam lockParam) {
if (lockParam == null) {
throw new RuntimeException("lockParam is null");
}
if (lockParam.getLockKey() == null || lockParam.getLockKey().trim().isEmpty()) {
throw new RuntimeException("lockParam lockKey is error");
}
this.lockParam = lockParam;
this.tryLockEndTime = lockParam.getTryLockTime() + System.currentTimeMillis();
this.redisLockKey = prefix_key.concat(lockParam.getLockKey());
this.redisLockValue = UUID.randomUUID().toString().replaceAll("-", "");
// todo 自定义获取Jedis实例的实现
jedis = new Jedis("127.0.0.1", 6379);
}
/**
* 加锁
*
* @return 成功返回true,失败返回false
*/
public boolean lock() {
while (true) {
// 判断是否超过了,尝试获取锁的时间
if (System.currentTimeMillis() > tryLockEndTime) {
return false;
}
// 尝试获取锁
holdLockSuccess = tryLock();
if (Boolean.TRUE.equals(holdLockSuccess)) {
return true;//获取锁成功
}
try {
// 获得锁失败,休眠50毫秒再去尝试获得锁,避免一直请求redis,导致redis cpu飙升
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/**
* 执行一次加锁操作:成功返回true 失败返回false
*
* @return 成功返回true,失败返回false
*/
private boolean tryLock() {
try {
// todo 指令版本确认
String result = getJedis().set(redisLockKey, redisLockValue, "NX", "PX", lockParam.getHoldLockTime()); // jedis 2.9.3 版本
if ("OK".equals(result)) {
return true;
}
} catch (Exception e) {
log.warn("tryLock failure redisLockKey:{} redisLockValue:{} lockParam:{}", redisLockKey, redisLockValue, lockParam, e);
}
return false;
}
/**
* 解锁
*
* @return 成功返回true,失败返回false
*/
public Boolean unlock() {
Object result = null;
try {
// 获得锁成功,才执行lua脚本
if (Boolean.TRUE.equals(holdLockSuccess)) {
// 执行Lua脚本
result = getJedis().eval(unLockScript, Collections.singletonList(redisLockKey), Collections.singletonList(redisLockValue));
if (unLockSuccess.equals(result)) {// 释放成功
return true;
}
}
} catch (Exception e) {
log.warn("unlock failure redisLockKey:{} redisLockValue:{} lockParam:{} result:{}", redisLockKey, redisLockValue, lockParam, result, e);
} finally {
this.closeJedis(jedis);
}
return false;
}
}
分布式锁测试demo
/**
* Redis 实现分布式锁 使用案例
*/
@Slf4j
public class RedisLockDemo {
// 设置锁键 lockKey
static String lockKey = "666";
public static void main(String[] args) throws InterruptedException {
// 测试1:验证分布式锁的互斥性、安全性
log.info("测试:测试两个线程同时抢占锁的结果");
Thread thread1 = new Thread(()->{
// 尝试获得锁时间2秒,获得锁成功后持有锁时间10秒,模拟业务代码执行时间5秒
testRedisLock(2000L,1000*10L,1000*5L);
});
thread1.setName("线程1");
Thread thread2 = new Thread(()->{
// 尝试获得锁时间2秒,获得锁成功后持有锁时间10秒,模拟业务代码执行时间5秒
testRedisLock(2000L,1000*10L,1000*5L);
});
thread2.setName("线程2");
// 同时启动线程
thread1.start();
thread2.start();
// 沉睡一段时间,确保上述所有业务执行完成
Thread.sleep(1000*20);
log.info("");
log.info("---------------------------------分割线--------------------------------");
log.info("");
// 测试2:验证分布式锁的对称性(谁申请谁释放)
log.info("测试:验证一个线程获取锁成功后,由于业务执行时间超过了设置持有锁的时间,是否会把其他线程持有的锁给释放掉");
Thread thread3 = new Thread(()->{
// 业务执行时间超过锁的持有时间,业务执行过程中锁过期
testRedisLock(1000*2L,1000*2L,1000*10L);
});
thread3.setName("线程3");
thread3.start();
Thread.sleep(1000*1);// 多暂停一秒是为了让线程3获取到锁
Thread thread4 = new Thread(()->{
// 线程4
testRedisLock(2000L,1000*20L,1000*15L);
});
thread4.setName("线程4");
thread4.start(); // 线程4启动,此时线程3持有的锁被释放,线程4可以正常获取锁
}
/**
* 测试获取Redis分布式锁
*/
public static void testRedisLock(Long tryLockTime,Long holdLockTime,Long businessTime){
LockParam lockParam = new LockParam(lockKey);
lockParam.setTryLockTime(tryLockTime);// 尝试获得锁时间设定
lockParam.setHoldLockTime(holdLockTime);// 获得锁成功后持有锁时间设定
RedisLock redisLock = new RedisLock(lockParam);
try {
Boolean lockFlag = redisLock.lock();
log.info("加锁结果:{}",lockFlag);
if(lockFlag){
try {
// 模拟处理业务代码时间
Thread.sleep(businessTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}catch (Exception e) {
log.info("testRedisLock e---->",e);
}finally {
boolean unlockResp = redisLock.unlock();
log.info("释放锁结果:{}",unlockResp);
}
}
}
测试结果
# output
15:45:02.043 [main] INFO com.noob.redis.lock.RedisLockDemo - 测试:测试两个线程同时抢占锁的结果
15:45:02.167 [线程1] INFO com.noob.redis.lock.RedisLockDemo - 加锁结果:true
15:45:04.116 [线程2] INFO com.noob.redis.lock.RedisLockDemo - 加锁结果:false
15:45:04.116 [线程2] INFO com.noob.redis.lock.RedisLockDemo - 释放锁结果:false
15:45:07.178 [线程1] INFO com.noob.redis.lock.RedisLockDemo - 释放锁结果:true
15:45:22.079 [main] INFO com.noob.redis.lock.RedisLockDemo -
15:45:22.079 [main] INFO com.noob.redis.lock.RedisLockDemo - ---------------------------------分割线--------------------------------
15:45:22.079 [main] INFO com.noob.redis.lock.RedisLockDemo -
15:45:22.079 [main] INFO com.noob.redis.lock.RedisLockDemo - 测试:验证一个线程获取锁成功后,由于业务执行时间超过了设置持有锁的时间,是否会把其他线程持有的锁给释放掉
15:45:22.083 [线程3] INFO com.noob.redis.lock.RedisLockDemo - 加锁结果:true
15:45:24.129 [线程4] INFO com.noob.redis.lock.RedisLockDemo - 加锁结果:true
15:45:32.084 [线程3] INFO com.noob.redis.lock.RedisLockDemo - 释放锁结果:false
15:45:39.142 [线程4] INFO com.noob.redis.lock.RedisLockDemo - 释放锁结果:true
结合结果分析,测试1:两个线程同时抢占锁,其抢占的结果并不是局限于哪个线程(可能是线程1也可能是线程2),但必定是遵循互斥性和安全性的原则,同一时刻不会出现多个线程都占用同一把锁。
测试2:线程3获取到锁,但是由于其业务执行时间超过了设置的持有锁的时间,就会导致在线程3执行业务的过程中,锁自动过期失效了。此时线程4启动刚好可以获取到锁,然后执行业务逻辑,但是线程4执行业务逻辑时间较长,在这个时候线程3执行完成需要释放锁(释放失败,因为校验该锁并不归属自身),线程4业务逻辑执行完成,此时锁还没过期,因此可以成功释放锁
实际使用注意问题
假设有这样一个场景: 有一个修改订单状态的接口,订单状态修改为失败,就不允许修改为其他状态了;在单台机器上,在代码方法上加了synchronized来做并发控制,由于代码逻辑比较复杂,假设目前它的TPS是1,一秒就只能处理一个订单。后面对这个系统做集群,部署了一百台,那么这个接口性能就提升了100倍了。但是synchronized是进程级别的锁,在集群环境下synchronized没办法控制其他服务器下线程并发访问临界代码,因此采用了分布式锁来做并发控制
基于上述构建的分布式锁,需要注意其在实际场景中的应用问题。可以结合下述几个方面进行分析:
锁设置:锁粒度控制(lockKey设置)
如果分布式锁的key 设置的是 redisLock:updateOrderStatus 相当于集群下对这个接口加了相同的一把大锁,按照上面那个场景TPS就变成1了,集群部署就浪费了。而实际上锁针对的不是整个接口的控制,而是针对修改订单这一操作,因此可以将锁粒度进行细化,让其对某个订单进行加锁(可以理解为将数据库表锁转化为行锁),让接口支持更高的并发场景。
例如可以将分布式的key设置为:redisLock:updateOrderStatus:{orderCode} ,{orderCode}执行的时候动态的替换成订单编号,则锁粒度就控制到这条订单
锁设置:获取锁的时间(tryLockTime)
此处tryLockTime的设置可以理解为一种重试机制的设计,在设定的tryLockTime范围内,线程会一直不断地尝试获取锁(尝试加锁)直至成功。如果这段时间没有获取成功,则最终认为加锁失败。
因此针对tryLockTime的设置要结合不同的业务,衡量大概需要设置多长的时间。
- 如果设置太短:可能用户多次操作,频繁获取锁失败,告知业务执行失败,用户体验不好
- 如果设置太长:用户等待太久才能得到响应结果
锁设置:持有锁的时间(holdLockTime)
holdLockTime 即为指令中锁的过期时间设定
如果锁粒度比较小,时间可以设置长一点,就算 业务代码较复杂 执行比较耗时,对客户的影响也较小 比较容易可以接受
分布式锁的使用
结合上述案例分析,可以看到每次使用一个分布式锁的流程都是清一色的固有流程:
- 【1】创建分布式锁对象:RedisLock redisLock = new RedisLock(lockParam);
- 【2】加锁:Boolean lockFlag = redisLock.lock();
- 【3】执行业务:if(lockFlag)
- 【4】finally 释放锁:redisLock.unlock();
这个流程中除了步骤【3】其他的步骤都是固化的,它的流程很像Spring的编程式事务,因此可以联想到Spring的声明式事务,思考是否可以通过Spring AOP + 自定义注解的方式来简化分布式锁的使用。
- 编程式分布式锁:每次实现都要额外编程,与业务代码耦合,存在一定的工作量
- 声明式分布式锁:无侵入式的横向切入,不会影响业务代码逻辑(但需注意AOP失效的场景)
2.声明式分布式锁
构建参考
基于上述编程式分布式锁实现,进一步优化其使用,构建声明式分布式锁
构建springboot 项目 引入aop依赖
<!-- SpringBoot AOP start -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-aop</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjweaver</artifactId>
</dependency>
<!-- SpringBoot AOP end -->
启动类开启AOP配置
@EnableAspectJAutoProxy
@SpringBootApplication
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
}
自定义注解:DistributionLock、DistributionLockParam
/**
* 通常和DistributionLockParam 注解配合使用,降低锁粒度
*/
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
public @interface DistributionLock {
/**
* 分布式锁key
*/
String key();
/**
* 尝试获得锁的时间(单位:毫秒),默认值:5000毫秒
*
* @return 锁key过期时间
*/
long tryLockTime() default 5000;
/**
* 尝试获得锁后,持有锁的时间(单位:毫秒),默认值:60000毫秒
*
* @return
*/
long holdLockTime() default 60000;
/**
* 分布式锁key 的分隔符(默认 :)
*/
String delimiter() default ":";
}
/**
* 分布式锁参数:用于给DistributionLock用来控制锁粒度
*/
@Target({ElementType.PARAMETER, ElementType.METHOD, ElementType.FIELD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
public @interface DistributionLockParam {
}
AOP实现
/**
* 切面类 对springboot中aop切面编程
*/
@Aspect
@Component
@Slf4j
@Order(Ordered.HIGHEST_PRECEDENCE)
public class RedisAopAspect {
public RedisAopAspect(){
log.info("分布锁 aop init");
}
/***
* 定义切入点
*/
@Pointcut("execution(@com.noob.redis.aopLock.annotation.DistributionLock * *(..))")
public void pointCut(){
}
@Around(value = "pointCut()")
public Object aroundMethod(ProceedingJoinPoint pjp) {
MethodSignature signature = (MethodSignature) pjp.getSignature();
Method method = signature.getMethod();
/////////////////////AOP 能取得的信息 start////////////////////////
// log.info("目标方法名为:{}",pjp.getSignature().getName());
// log.info("目标方法所属类的简单类名:{}" , pjp.getSignature().getDeclaringType().getSimpleName());
// log.info("目标方法所属类的类名:{}", pjp.getSignature().getDeclaringTypeName());
// log.info("目标方法声明类型:{}" , Modifier.toString(pjp.getSignature().getModifiers()));
// log.info("目标方法返回值类型:{}" , method.getReturnType());
// //获取传入目标方法的参数
// Object[] args = pjp.getArgs();
// for (int i = 0; i < args.length; i++) {
// log.info("第{}个参数为:{}" ,(i + 1) , args[i]);
// }
// log.info("被代理的对象:{}" , pjp.getTarget());
// log.info("代理对象自己:{}" , pjp.getThis());
/////////////////////AOP 能取得的信息 end////////////////////////
//取得注解对象数据
DistributionLock lock = method.getAnnotation(DistributionLock.class);
//分布式锁实际的key
String lockKey = getRealDistributionLockKey(pjp,lock);
//创建分布式锁对象 start
LockParam lockParam = new LockParam(lockKey,lock.tryLockTime(),lock.holdLockTime());
RedisLock redisLock = new RedisLock(lockParam);
//创建分布式锁对象 end
//获取锁
Boolean holdLock = redisLock.lock();
log.info("lockKey:{} holdLock:{} ",lockKey,holdLock);
if(Boolean.FALSE.equals(holdLock)){
//获取锁失败后,处理返回结果
return handleAcquireLockFailReturn(pjp);
}
try {
return pjp.proceed();
} catch (Throwable e) {
e.printStackTrace();
throw new RuntimeException(e.getMessage());
}finally {
if(redisLock!=null){
Boolean unlock = redisLock.unlock();
log.info("释放锁:unlock {}",unlock);
}
}
}
/**
* 分布式锁获取失败,处理方法
* @param pjp
* @return
*/
public Object handleAcquireLockFailReturn(ProceedingJoinPoint pjp){
MethodSignature signature = (MethodSignature) pjp.getSignature();
Method method = signature.getMethod();
Class returnType = method.getReturnType();
// 开发规范:通常项目都有自己的统一的返回对象,Resp.class可以根据自己现有的进行构建
if(returnType.equals(Resp.class) ){
log.info("返回值类型 Resp");
return Resp.buildFail("业务处理繁忙,请稍后重试");
}
throw new RuntimeException("获取锁失败");
}
/**
* 加了DistributionLockParam注解参数值,按照顺序返回list
* @param pjp
* @return
*/
public List<Object> getDistributionLockParamList(ProceedingJoinPoint pjp){
ArrayList<Object> distributionLockParamList = null;
MethodSignature signature = ((MethodSignature) pjp.getSignature());
//得到拦截的方法
Method method = signature.getMethod();
//获取方法参数注解,返回二维数组是因为某些参数可能存在多个注解
Annotation[][] parameterAnnotations = method.getParameterAnnotations();
// log.info("parameterAnnotations:{}",parameterAnnotations);
//获取全部参数
Object[] objects = pjp.getArgs();
for(int i = 0; i < parameterAnnotations.length; i++){
for(Annotation a: parameterAnnotations[i]){
if(a.annotationType() == DistributionLockParam.class){
//初始化distributionLockParamList
if(distributionLockParamList==null){
distributionLockParamList = new ArrayList();
}
//获得参数值
Object o = objects[i];
distributionLockParamList.add(o);
}
}
}
return distributionLockParamList;
}
/**
* 加了DistributionLockParam注解参数值,拼接成字符串
* @param pjp
* @param lock
* @return
*/
public String getDistributionLockParamStr(ProceedingJoinPoint pjp,DistributionLock lock){
List<Object> distributionLockParamList = getDistributionLockParamList(pjp);
if(distributionLockParamList!=null && distributionLockParamList.size()>0){
StringBuffer sb = new StringBuffer();
for (int i = 0; i < distributionLockParamList.size(); i++) {
Object param = distributionLockParamList.get(i);
sb.append(lock.delimiter());
sb.append(param);
}
return sb.toString();
}
return "";
}
/**
* 返回分布式锁key完整的key
* @param pjp
* @param lock
* @return
*/
public String getRealDistributionLockKey(ProceedingJoinPoint pjp,DistributionLock lock){
String distributionLockParamStr = getDistributionLockParamStr(pjp,lock);
return lock.key().concat(distributionLockParamStr);
}
}
Service 层 构建(使用自定义注解引入分布式锁)
public interface IOrderService {
Resp updateOrder(String orderCode, Integer userId, Integer status);
}
@Slf4j
@Service
public class OrderServiceImpl implements IOrderService {
@DistributionLock(key = "updateOrderStatus",tryLockTime = 1000)
@Override
public Resp updateOrder(@DistributionLockParam String orderCode, Integer userId, Integer status){
try {
log.info("updateOrder 处理业务 start");
// 模拟处理业务3s
Thread.sleep(1000*3);
log.info("updateOrder 处理业务 end");
} catch (InterruptedException e) {
e.printStackTrace();
}
return Resp.buildSuccess("修改订单状态成功");
}
}
controller 层构建
@Slf4j
@RestController
@RequestMapping(value = "/v1/test")
public class OrderController {
@Autowired
IOrderService orderService;
/*
// Swagger 配置
@ApiOperation(value = "修改订单状态")
@ApiImplicitParams({
@ApiImplicitParam(name = "orderCode", value = "订单编号", paramType = "query"),
@ApiImplicitParam(name = "userId", value = "用户ID", paramType = "query"),
@ApiImplicitParam(name = "status", value = "订单状态 1:未发货 2:已发货 3:完成", paramType = "query"),
})
*/
@RequestMapping(value = "/updateOrderStatus", method = RequestMethod.PUT)
public Resp updateOrderStatus(@RequestParam(value = "orderCode")String orderCode,
@RequestParam(value = "userId")Integer userId,
@RequestParam(value = "status")Integer status){
log.info("updateOrderStatus reqParam:orderCode:{},userId:{},status:{}",orderCode,userId,status);
return orderService.updateOrder(orderCode,userId,status);
}
}
测试
启动DemoApplication,访问接口:http://127.0.0.1:8080/api/v1/test/updateOrderStatus
3.可重入锁(基于Redisson构建的分布式锁)
基于上述构建的分布式锁存在一个问题,即不可重入。因此针对redis的重入锁业界还是有很多解决方案的,当前比较流行的就是采用Redisson。
Redisson是Redis官方推荐的Java版的Redis客户端。 它基于Java实用工具包中常用接口,为使用者提供了一系列具有分布式特性的常用工具类。 它在网络通信上是基于NIO的Netty框架,保证网络通信的高性能。 在分布式锁的功能上,它提供了一系列的分布式锁;如:可重入锁(Reentrant Lock)、公平锁(Fair Lock、联锁(MultiLock)、 红锁(RedLock)、 读写锁(ReadWriteLock)等等。
构建参考
构建Springboot 项目,引入redisson相关依赖
<!-- redisson start -->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.15.5</version>
</dependency>
<!-- redisson start -->
构建测试Demo
@Slf4j
public class ReentrantLockDemo {
// 锁
public static RLock lock;
static {
// Redisson需要的配置
Config config = new Config();
String node = "127.0.0.1:6379";//redis地址
node = node.startsWith("redis://") ? node : "redis://" + node;
SingleServerConfig serverConfig = config.useSingleServer()
.setAddress(node)
.setTimeout(3000)//超时时间
.setConnectionPoolSize(10)
.setConnectionMinimumIdleSize(10);
// serverConfig.setPassword("123456");//设置redis密码
// 创建RedissonClient客户端实例
RedissonClient redissonClient = Redisson.create(config);
// 创建redisson的分布式锁
RLock rLock = redissonClient.getLock("666");
lock = rLock;
}
/**
* 模拟业务操作
* @param n
*/
public void doSomething(int n) {
try {
// 进入递归第一件事:加锁
lock.lock();
log.info("--------lock()执行后,getState()的值:{} lock.isLocked():{}", lock.getHoldCount(), lock.isLocked());
log.info("--------递归{}次--------", n);
if (n <= 2) {
this.doSomething(++n);
} else {
return;
}
} finally {
lock.unlock();
log.info("--------unlock()执行后,getState()的值:{} lock.isLocked():{}", lock.getHoldCount(), lock.isLocked());
}
}
public static void testRedissonLock() {
log.info("--------------start---------------");
ReentrantLockDemo reentrantLockDemo = new ReentrantLockDemo();
reentrantLockDemo.doSomething(1);
log.info("执行完doSomething方法 是否还持有锁:{}", ReentrantLockDemo.lock.isLocked());
log.info("--------------end---------------");
}
public static void main(String[] args) {
testRedissonLock();
}
}
测试
# output
17:12:58.876 [main] INFO org.redisson.Version - Redisson 3.15.5
17:12:59.318 [redisson-netty-2-13] INFO org.redisson.connection.pool.MasterPubSubConnectionPool - 1 connections initialized for /127.0.0.1:6379
17:12:59.318 [redisson-netty-2-16] INFO org.redisson.connection.pool.MasterConnectionPool - 10 connections initialized for /127.0.0.1:6379
17:12:59.343 [main] INFO com.noob.redis.redisson.ReentrantLockDemo - --------------start---------------
17:12:59.363 [main] INFO com.noob.redis.redisson.ReentrantLockDemo - --------lock()执行后,getState()的值:1 lock.isLocked():true
17:12:59.363 [main] INFO com.noob.redis.redisson.ReentrantLockDemo - --------递归1次--------
17:12:59.365 [main] INFO com.noob.redis.redisson.ReentrantLockDemo - --------lock()执行后,getState()的值:2 lock.isLocked():true
17:12:59.366 [main] INFO com.noob.redis.redisson.ReentrantLockDemo - --------递归2次--------
17:12:59.367 [main] INFO com.noob.redis.redisson.ReentrantLockDemo - --------lock()执行后,getState()的值:3 lock.isLocked():true
17:12:59.367 [main] INFO com.noob.redis.redisson.ReentrantLockDemo - --------递归3次--------
17:12:59.369 [main] INFO com.noob.redis.redisson.ReentrantLockDemo - --------unlock()执行后,getState()的值:2 lock.isLocked():true
17:12:59.371 [main] INFO com.noob.redis.redisson.ReentrantLockDemo - --------unlock()执行后,getState()的值:1 lock.isLocked():true
17:12:59.372 [main] INFO com.noob.redis.redisson.ReentrantLockDemo - --------unlock()执行后,getState()的值:0 lock.isLocked():false
17:12:59.373 [main] INFO com.noob.redis.redisson.ReentrantLockDemo - 执行完doSomething方法 是否还持有锁:false
17:12:59.373 [main] INFO com.noob.redis.redisson.ReentrantLockDemo - --------------end---------------
结合测试结果分析,Redisson支持可重入锁
Redisson 如何实现可重入锁?
可以跟踪lock.lock()代码实现进行分析,发现它最终调用的是org.redisson.RedissonLock#tryLockInnerAsync的方法
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}
// lua脚本
if (redis.call('exists', KEYS[1]) == 0) then
redis.call('hincrby', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return 1;
end ;
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
redis.call('hincrby', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return 1;
end ;
return ;
上述Redis命令梳理如下:
exists 查询一个key是否存在
- EXISTS key [key ...]:返回值(0 如果key不存在;1 如果key存在)
hincrby :将hash中指定域的值增加给定的数字
pexpire:设置key的有效时间以毫秒为单位
hexists:判断field是否存在于hash中
pttl:获取key的有效毫秒数
lua脚本传入的参数:
KEYS[1] = key的值
ARGV[1]) = 持有锁的时间
ARGV[2] = getLockName(threadId) ,系统在启动的时候会全局生成的uuid 来作为当前进程的id,加上线程id就是getLockName(threadId),即进程ID+系统ID = ARGV[2]
protected String getLockName(long threadId) { return id + ":" + threadId; }
基于上述代码分析,其使用lua脚本保证多个命令执行的原子性,使用了hash来实现了分布式锁
(1)lua 脚本的加锁流程
第一个if判断
- 先判断了当前key是否存在(返回值是0说明key不存在,说明没有加锁)
- hincrby命令是对 ARGV[2] = 进程ID+系统ID 进行原子自增加1
- 是对整个hash设置过期期间
看第二个if判断
- 判断field是否存在于hash中,如果存在返回1,返回1说明是当前进程+当前线程ID 之前已经获得到锁了
- hincrby命令是对 ARGV[2] = 进程ID+系统ID 进行原子自增加1,说明重入次数加1了
- 再对整个hash设置过期期间
通过断点查看客户端中对应键值的存储情况
(2)解锁代码
解锁代码位于 org.redisson.RedissonLock#unlockInnerAsync
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end; " +
"return nil;",
Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}
// lua 脚本
if (redis.call('hexists', KEYS[1], ARGV[1]) == ) then
return nil;
end ;
-- 计算当前可重入次数
local counter = redis.call('hincrby', KEYS[1], ARGV[1], -1);
-- 小于等于 代表可以解锁
if (counter > ) then
return ;
else
redis.call('del', KEYS[1]);
return 1;
end ;
return nil;
- if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) 判断锁是否存在
- redis.call('hincrby', KEYS[1], ARGV[3], -1) 加锁次数原子自减
- if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]); 自减后当前线程还持有锁(counter > 0),更新下锁的过期时间
- counter > 0 == false 走else逻辑(当前持有锁的线程 都解锁完成,删除该锁 )
- redis.call('publish', KEYS[2], ARGV[1]); 发一个通知队列,让等待锁的线程 可以去获取锁
解锁的Lua脚本,流程跟Reentrantlock的解锁流程 也是差不多的
如果要采用声明式的可重入分布式锁,相应的在AOP的实现中调整加解锁为Redisson相关操作即可