RedisLockRegistry分布式锁实现原理

目录

一、使用

1、导入依赖

2、配置文件

3、具体使用

二、原理

1、obtain方法

2、tryLock方法

主要过程

拿redis锁的过程

为什么要用本地锁

为什么要用lua脚本

3、unlock方法

三、总结


一、使用

1、导入依赖

pom.xml

org.springframework.bootspring-boot-starter-integration

org.springframework.integrationspring-integration-redis

org.springframework.bootspring-boot-starter-data-redis

2、配置文件

注册RedisLockRegistry,设置过期时间5s

@Configuration
public class RedisLockRegistryConfiguration {// 用户操作锁keypublic static final String CACHE_USER_LOCK_KEY = "forlan:user:lock";@Bean(name = "RedisLockRegistryExpireFiveSecond")public RedisLockRegistry redisLockRegistry(RedisConnectionFactory connectionFactory) {return new RedisLockRegistry(connectionFactory, CACHE_USER_LOCK_KEY, 5000L);}}

不设置的话,默认是60s,可以看RedisLockRegistry的构造方法

public RedisLockRegistry(RedisConnectionFactory connectionFactory, String registryKey) {this(connectionFactory, registryKey, 60000L);
}

3、具体使用

public class ForlanTest {private static final Logger logger = LoggerFactory.getLogger(ForlanTest.class);@Autowiredprivate RedisLockRegistry redisLockRegistry;public void test() {String lockKey = "key_id";Lock loginLock = redisLockRegistry.obtain(lockKey.intern());boolean getLock = false;try {// tryLock()底层调用this.tryLock(0L, TimeUnit.MILLISECONDS)getLock = loginLock.tryLock(5, TimeUnit.SECONDS);if (getLock) {//获得锁执行业务}} catch (Exception e) {logger.error("异常信息...", e);} finally {if (getLock) {//释放锁loginLock.unlock();}}}
}

二、原理

1、obtain方法

private final Map locks;private final class RedisLock implements Lock {private final String lockKey;private final ReentrantLock localLock;private volatile long lockedAt;private RedisLock(String path) {this.localLock = new ReentrantLock();this.lockKey = this.constructLockKey(path);}
}public Lock obtain(Object lockKey) {Assert.isInstanceOf(String.class, lockKey);String path = (String)lockKey;return (Lock)this.locks.computeIfAbsent(path, (x$0) -> {return new RedisLockRegistry.RedisLock(x$0);});
}

主要是根据lockKey去查locks这个map中是否已经存在这个key
如果存在就返回内部类RedisLock
如果不存在就创建一个RedisLock,以lockKey为key,RedisLock为value放入map中 
注意:每个分布式应用自己都会创建一个RedisLockRegistry实例,同一个应用的多个线程共享RedisLock类

2、tryLock方法

public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {long now = System.currentTimeMillis();// 尝试拿取本地锁,拿不到直接返回失败falseif (!this.localLock.tryLock(time, unit)) {return false;} else {try {long expire = now + TimeUnit.MILLISECONDS.convert(time, unit);boolean acquired;// 当前时间还没过期并且还未获得redis锁,睡眠100ms继续重试while(!(acquired = this.obtainLock()) && System.currentTimeMillis() < expire) {Thread.sleep(100L);}if (!acquired) {this.localLock.unlock();}return acquired;} catch (Exception var9) {this.localLock.unlock();this.rethrowAsLockException(var9);return false;}}
}

主要过程

先获得本地锁,拿不到直接返回失败
当前时间还没过期并且还没拿到redis锁,睡眠100ms继续重试
如果拿到redis锁,结束循环,返回成功
如果超时了还没拿到,释放锁,返回失败

拿redis锁的过程

private boolean obtainLock() {boolean success = (Boolean)RedisLockRegistry.this.redisTemplate.execute(RedisLockRegistry.this.obtainLockScript, Collections.singletonList(this.lockKey), new Object[]{RedisLockRegistry.this.clientId, String.valueOf(RedisLockRegistry.this.expireAfter)});if (success) {this.lockedAt = System.currentTimeMillis();}return success;
}

通过obtainLock方法,执行lua脚本来获取
redisTemplate.execute()参数说明:

  • 第一个参数(RedisLockRegistry.this.obtainLockScript)表示要执行的lua脚本,脚本内容如下:
local lockClientId = redis.call('GET', KEYS[1])
if lockClientId == ARGV[1] thenredis.call('PEXPIRE', KEYS[1], ARGV[2])return true
elseif not lockClientId thenredis.call('SET', KEYS[1], ARGV[1], 'PX', ARGV[2])return true
end
return false
  • 第二个参数(Collections.singletonList(this.lockKey)),表示在脚本中所用到的redis 键(key);
    说明:这些键名参数可以在 Lua 中通过全局变量 KEYS 数组,用1为基址的形式访问( KEYS[1] , KEYS[2] ,以此类推);
  • 第三个参(new Object[]{RedisLockRegistry.this.clientId, String.valueOf(RedisLockRegistry.this.expireAfter)}),表示客户端Id和过期时间;
    说明:附加参数 arg [arg …] ,可以在 Lua 中通过全局变量 ARGV 数组访问,访问的形式和 KEYS 变量类似( ARGV[1] 、 ARGV[2] ,诸如此类)

脚本大概的意思:查出分布式锁key对应的value(lockClientId),根据lockClientId的情况进行处理,主要有3种情况:

  • 查出的lockClientId和我们传入的客户端Id一样,表示锁重入,重新设置过期时间为expireAfter(默认60s),返回成功;
  • 分布式锁key不存在,查不到lockClientId,上锁,返回成功;
  • 其它情况,说明锁被占用了,上锁失败,返回失败;

为什么要用本地锁

  • 为了可重入
  • 为了减轻redis服务器的压力

为什么要用lua脚本

  • 保证原子性
  • 减少网络开销
  • 替代redis的事务功能

如果业务执行太久,超过过期时间,会有什么问题?

分情况说的,单实例和多实例;

  • 如果是单个实例情况下,就算key过期了,其它任务也拿不到锁,因为有本地锁ReentrantLock的约束;
  • 如果是多实例的情况下,就会有问题;

3、unlock方法

public void unlock() {if (!this.localLock.isHeldByCurrentThread()) {throw new IllegalStateException("You do not own lock at " + this.lockKey);} else if (this.localLock.getHoldCount() > 1) {this.localLock.unlock();} else {try {if (Thread.currentThread().isInterrupted()) {RedisLockRegistry.this.executor.execute(this::removeLockKey);} else {this.removeLockKey();}if (RedisLockRegistry.logger.isDebugEnabled()) {RedisLockRegistry.logger.debug("Released lock; " + this);}} catch (Exception var5) {ReflectionUtils.rethrowRuntimeException(var5);} finally {this.localLock.unlock();}}
}

释放锁的过程
1、判断是否是当前线程持有锁,如果不是,抛异常(本地锁)
2、判断当前线程持有锁的计数,如果当前线程持有锁的计数 > 1,说明本地锁被当前线程多次获取,这时只会释放本地锁,释放之后当前线程持有锁的计数-1;否则,释放本地锁和redis锁

三、总结

  • RedisLockRegistry通过本地锁(ReentrantLock)和Redis锁,双重锁实现;
  • 使用ReentrantLock可重入锁;
  • RedisLockRegistry对锁无续期操作;
  • 只适用于单实例的情况下,key过期,还能通过本地锁保证;多实例下无法通过本地锁保证,会有问题;


本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!

相关文章

立即
投稿

微信公众账号

微信扫一扫加关注

返回
顶部