接口幂等性问题处理(分布式锁)
接口幂等性问题处理
- 理论
- 幂等性概念
- 幂等性场景
- 实践
- 1、redis加锁代码实现
- 2、Redisson高性能分布式锁代码实现(AOP实现)
- 添加maven依赖
- RedissionConfig配置类:
- 定义注解类
- 注解业务实现类:
- controller层使用注解
- 测试日志打印如下
- Redisson源码分析
理论
幂等性概念
一个幂等操作的特点是任意执行多少次与执行一次产生的影响都是一样的。
幂等函数
或幂等方法是指可以使用相同参数重复执行,并能获得相同结果的函数。这些函数不会影响系统状态,也不用担心重复执行会对系统造成改变。例如,“getUsername()和setTrue()”函数就是一个幂等函数. 更复杂的操作幂等保证是利用唯一交易号(流水号)实现。
幂等性场景
-
1、查询操作:查询一次和查询多次,在数据不变的情况下,查询结果是一样的。select是天然的幂等操作;
-
2、删除操作:删除操作也是幂等的,删除一次和多次删除都是把数据删除。(注意可能返回结果不一样,删除的数据不存在,返回0,删除的数据多条,返回结果多个) ;
-
3、唯一索引:防止新增脏数据。比如:支付宝的资金账户,支付宝也有用户账户,每个用户只能有一个资金账户,怎么防止给用户创建资金账户多个,那么给资金账户表中的用户ID加唯一索引,所以一个用户新增成功一个资金账户记录。要点:唯一索引或唯一组合索引来防止新增数据存在脏数据(当表存在唯一索引,并发时新增报错时,再查询一次就可以了,数据应该已经存在了,返回结果即可);
-
4、token机制:防止页面重复提交。
原理上通过session token来实现的(也可以通过redis来实现)。当客户端请求页面时,服务器会生成一个随机数Token,并且将Token放置到session当中,然后将Token发给客户端(一般通过构造hidden表单)。 下次客户端提交请求时,Token会随着表单一起提交到服务器端。
服务器端第一次验证相同过后,会将session中的Token值更新下,若用户重复提交,第二次的验证判断将失败,因为用户提交的表单中的Token没变,但服务器端session中Token已经改变了。 -
5、悲观锁 获取数据的时候加锁获取。select * from table_xxx where id=‘xxx’ for update; 注意:id字段一定是主键或者唯一索引,不然是锁表,会死人的;悲观锁使用时一般伴随事务一起使用,数据锁定时间可能会很长,根据实际情况选用;
-
6、乐观锁——乐观锁只是在更新数据那一刻锁表,其他时间不锁表,所以相对于悲观锁,效率更高。乐观锁的实现方式多种多样可以通过version或者其他状态条件:
a、通过版本号实现update tablexxx set name=#name#,version=version+1 where version=#version#如下图(来自网上);
b、通过条件限制 update tablexxx set avaiamount=avaiamount-#subAmount# where avai_amount-#subAmount# >= 0要求:quality-#subQuality# >= ,这个情景适合不用版本号,只更新是做数据安全校验,适合库存模型,扣份额和回滚份额,性能更高; -
7、分布式锁
如果是分布是系统,构建全局唯一索引比较困难,例如唯一性的字段没法确定,这时候可以引入分布式锁,通过第三方的系统(redis或zookeeper),在业务系统插入数据或者更新数据,获取分布式锁,然后做操作,之后释放锁,这样其实是把多线程并发的锁的思路,引入多多个系统,也就是分布式系统中得解决思路。要点:某个长流程处理过程要求不能并发执行,可以在流程执行之前根据某个标志(用户ID+后缀等)获取分布式锁,其他流程执行时获取锁就会失败,也就是同一时间该流程只能有一个能执行成功,执行完成后,释放分布式锁(分布式锁要第三方系统提供); -
8、select + insert 并发不高的后台系统,或者一些任务JOB,为了支持幂等,支持重复执行,简单的处理方法是,先查询下一些关键数据,判断是否已经执行过,在进行业务处理,就可以了。注意:核心高并发流程不要用这种方法;
-
9、状态机幂等 在设计单据相关的业务,或者是任务相关的业务,肯定会涉及到状态机(状态变更图),就是业务单据上面有个状态,状态在不同的情况下会发生变更,一般情况下存在有限状态机,这时候,如果状态机已经处于下一个状态,这时候来了一个上一个状态的变更,理论上是不能够变更的,这样的话,保证了有限状态机的幂等。注意:订单等单据类业务,存在很长的状态流转,一定要深刻理解状态机,对业务系统设计能力提高有很大帮助
-
10、对外提供接口的api如何保证幂等
如银联提供的付款接口:需要接入商户提交付款请求时附带:source来源,seq序列号;source+seq在数据库里面做唯一索引,防止多次付款(并发时,只能处理一个请求) 。 重点:对外提供接口为了支持幂等调用,接口有两个字段必须传,一个是来源source,一个是来源方序列号seq,这个两个字段在提供方系统里面做联合唯一索引,这样当第三方调用时,先在本方系统里面查询一下,是否已经处理过,返回相应处理结果;没有处理过,进行相应处理,返回结果。注意,为了幂等友好,一定要先查询一下,是否处理过该笔业务,不查询直接插入业务系统,会报错,但实际已经处理了
实践
talk is cheap, show you the code
1、redis加锁代码实现
// Copyright 2016-2101 Pica.
package com.pica.cloud.commercialization.crrs.util;import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;import java.util.UUID;
import java.util.concurrent.TimeUnit;/*** @ClassName RedisLock* @Description* @Author Chongwen.jiang* @Date 2019/9/10 9:44* @ModifyDate 2019/9/10 9:44* @Version 1.0*/
@Slf4j
@Component
public class RedisLock {@Autowiredprivate RedisTemplate redisTemplate;/*** @Description 添加元素* @Author Chongwen.jiang* @Date 2019/9/10 9:47* @ModifyDate 2019/9/10 9:47* @Params [key, value]* @Return void*/public void set(Object key, Object value) {if (key == null || value == null) {return;}redisTemplate.opsForValue().set(key, value.toString());}/*** @Description 获取数据* @Author Chongwen.jiang* @Date 2019/9/10 9:49* @ModifyDate 2019/9/10 9:49* @Params [key]* @Return java.lang.Object*/public Object get(Object key) {if (key == null) {return null;}return redisTemplate.opsForValue().get(key);}/*** @Description 删除* @Author Chongwen.jiang* @Date 2019/9/10 9:49* @ModifyDate 2019/9/10 9:49* @Params [key]* @Return java.lang.Boolean*/private Boolean remove(Object key) {if (key == null) {return false;}return redisTemplate.delete(key);}/*** @Description 如果已经存在返回false,否则返回true* @Author Chongwen.jiang* @Date 2019/9/10 9:48* @ModifyDate 2019/9/10 9:48* @Params [key, value, expireTime, mimeUnit]* @Return java.lang.Boolean*/private Boolean setNx(Object key, Object value, Long expireTime, TimeUnit mimeUnit) {if (key == null || value == null) {return false;}return redisTemplate.opsForValue().setIfAbsent(key, value, expireTime, mimeUnit);}/*** @Description 加锁* @Author Chongwen.jiang* @Date 2019/9/10 9:50* @ModifyDate 2019/9/10 9:50* @Params [key(缓存key), waitTime(等待时间,毫秒), expireTime(key过期时间,毫秒)]* @Return java.lang.Boolean*/public Boolean lock(String key, Long waitTime, Long expireTime) {String value = UUID.randomUUID().toString().replaceAll("-", "").toLowerCase();Boolean flag = setNx(key, value, expireTime, TimeUnit.MILLISECONDS);// 尝试获取锁 成功返回if (flag) {return flag;} else {// 获取失败// 现在时间long newTime = System.currentTimeMillis();// 等待过期时间long loseTime = newTime + waitTime;// 不断尝试获取锁成功返回while (System.currentTimeMillis() < loseTime) {Boolean testFlag = setNx(key, value, expireTime, TimeUnit.MILLISECONDS);if (testFlag) {return testFlag;}try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}}return false;}/*** @Description 释放锁* @Author Chongwen.jiang* @Date 2019/9/10 9:54* @ModifyDate 2019/9/10 9:54* @Params [key]* @Return java.lang.Boolean*/public Boolean unLock(Object key) {return remove(key);}}
测试一下:
@Autowiredprivate RedisLock redisLock;@Testpublic void testRedisLock(){String key = "aaaa";try {Boolean locked = redisLock.lock(key, 1000L, 1000*60L);if(locked) {System.out.println("获取锁成功,开始执行业务代码...");Thread.sleep(1000*10);} else {System.out.println("获取锁失败...");}} catch (Exception e) {log.error(e.getMessage());e.printStackTrace();} finally {log.info("释放锁:{}", redisLock.unLock(key));}}
2、Redisson高性能分布式锁代码实现(AOP实现)
在一些高并发的场景中,比如秒杀,抢票,抢购这些场景,都存在对核心资源,商品库存的争夺,控制不好,库存数量可能被减少到负数,出现超卖的情况,或者 产生唯一的一个递增ID,由于web应用部署在多个机器上,简单的同步加锁是无法实现的,给数据库加锁的话,对于高并发,1000/s的并发,数据库可能由行锁变成表锁,性能下降会厉害。那相对而言,redis的分布式锁,相对而言,是个很好的选择,redis官方推荐使用的Redisson就提供了分布式锁和相关服务。
添加maven依赖
<dependency><groupId>org.projectlombokgroupId><artifactId>lombokartifactId><version>1.18.2version>
dependency>
<dependency><groupId>org.springframework.bootgroupId><artifactId>spring-boot-starter-data-redisartifactId>
dependency>
<dependency><groupId>org.redissongroupId><artifactId>redissonartifactId><version>3.5.5version>
dependency>
RedissionConfig配置类:
这里使用的是单机连接模式,redisson提供了单机、主从、哨兵、集群等多种连接方式,感兴趣的可以自行从官网了解学习
// Copyright 2016-2101 Pica.
package com.pica.cloud.commercialization.crrs.config;import io.netty.channel.nio.NioEventLoopGroup;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.client.codec.Codec;
import org.redisson.config.Config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.ClassUtils;/*** @ClassName RedissionConfig* @Description* @Author Chongwen.jiang* @Date 2019/9/6 10:40* @ModifyDate 2019/9/6 10:40* @Version 1.0*/
@Slf4j
@Data
@Configuration
public class RedissonConfig {private String address = "redis://192.168.110.241:6380";private int connectionMinimumIdleSize = 10;private int idleConnectionTimeout = 10000;private int pingTimeout = 1000;private int connectTimeout = 10000;private int timeout = 3000;private int retryAttempts = 3;private int retryInterval = 1500;private int reconnectionTimeout = 3000;private int failedAttempts = 3;private String password = null;private int subscriptionsPerConnection = 5;private String clientName = null;private int subscriptionConnectionMinimumIdleSize = 1;private int subscriptionConnectionPoolSize = 50;private int connectionPoolSize = 64;private int database = 0;private boolean dnsMonitoring = false;private int dnsMonitoringInterval = 5000;/*** 当前处理核数量 * 2*/private int thread = 2;/** redisson默认使用的序列化编码类 *///private String codec = "org.redisson.codec.JsonJacksonCodec";private String codec = "org.redisson.codec.FstCodec";/*** @Description redisson单机连接模式* @Author Chongwen.jiang* @Date 2019/9/10 12:53* @ModifyDate 2019/9/10 12:53* @Params []* @Return org.redisson.api.RedissonClient*/@Bean(destroyMethod = "shutdown")RedissonClient redisson() throws Exception {log.info("redission init......");Config config = new Config();config.useSingleServer().setAddress(address).setConnectionMinimumIdleSize(connectionMinimumIdleSize).setConnectionPoolSize(connectionPoolSize).setDatabase(database).setDnsMonitoring(dnsMonitoring).setDnsMonitoringInterval(dnsMonitoringInterval).setSubscriptionConnectionMinimumIdleSize(subscriptionConnectionMinimumIdleSize).setSubscriptionConnectionPoolSize(subscriptionConnectionPoolSize).setSubscriptionsPerConnection(subscriptionsPerConnection).setClientName(clientName).setFailedAttempts(failedAttempts).setRetryAttempts(retryAttempts).setRetryInterval(retryInterval).setReconnectionTimeout(reconnectionTimeout).setTimeout(timeout).setConnectTimeout(connectTimeout).setIdleConnectionTimeout(idleConnectionTimeout).setPingTimeout(pingTimeout).setPassword(password);Codec codec = (Codec) ClassUtils.forName(getCodec(),ClassUtils.getDefaultClassLoader()).newInstance();config.setCodec(codec);config.setThreads(thread);config.setEventLoopGroup(new NioEventLoopGroup());config.setUseLinuxNativeEpoll(false);return Redisson.create(config);}}
这里值得注意的是redisson使用的序列化编码类是org.redisson.codec.JsonJacksonCodec,JsonJackson在序列化有双向引用的对象时,会出现无限循环异常。而fastjson在检查出双向引用后会自动用引用符$ref替换,终止循环。
如果序列化了一个service,这个service已被spring托管,而且和另一个service之间也相互注入了,用fastjson能 正常序列化到redis,而JsonJackson则抛出无限循环异常。
为了序列化后的内容可见,所以使用FstCodec(org.redisson.codec.FstCodec)

定义注解类
package com.pica.cloud.commercialization.crrs.interceptor;import java.lang.annotation.*;
import java.util.concurrent.TimeUnit;/*** @ClassName RLock* @Description 分布式锁* @Author Chongwen.jiang* @Date 2019/9/6 10:47* @ModifyDate 2019/9/6 10:47* @Version 1.0*/
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
public @interface Rlock {/** 分布式锁的key */String localKey();/** 锁释放时间 默认5秒 */long leaseTime() default 5 * 1000;/** 时间格式 默认:毫秒 */TimeUnit timeUtil() default TimeUnit.MILLISECONDS;}
注解业务实现类:
// Copyright 2016-2101 Pica.
package com.pica.cloud.commercialization.crrs.interceptor;import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.codec.binary.Hex;
import org.apache.commons.lang.StringUtils;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import org.springframework.web.context.request.RequestAttributes;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;import javax.crypto.Cipher;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import javax.crypto.spec.SecretKeySpec;
import javax.servlet.http.HttpServletRequest;
import java.security.Key;
import java.util.Enumeration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;/*** @ClassName RlockAspect* @Description 分布式锁aop配置* @Author Chongwen.jiang* @Date 2019/9/6 10:52* @ModifyDate 2019/9/6 10:52* @Version 1.0*/
@Slf4j
@Aspect
@Component
public class RlockAspect {@Autowiredprivate RedissonClient redissonClient;ThreadLocal<HttpServletRequest> request = new ThreadLocal();@Pointcut("@annotation(com.pica.cloud.commercialization.crrs.interceptor.Rlock)")public void RlockAspect() {log.info("RlockAspect constructor...");}@Around("RlockAspect()")public Object around(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {String params = getAllRequestParam(proceedingJoinPoint);Object object = null;RLock lock = null;try {Rlock rlockInfo = getRlockInfo(proceedingJoinPoint);lock = redissonClient.getLock(getLocalKey(proceedingJoinPoint, rlockInfo, params));if (null != lock) {final boolean status = lock.tryLock(rlockInfo.leaseTime(), rlockInfo.timeUtil());if (status) {log.info("获取到锁.....");object = proceedingJoinPoint.proceed();}}} finally {if (lock != null) {log.info("释放锁.....");lock.unlock();}}return object;}public Rlock getRlockInfo(ProceedingJoinPoint proceedingJoinPoint) {MethodSignature methodSignature = (MethodSignature) proceedingJoinPoint.getSignature();return methodSignature.getMethod().getAnnotation(Rlock.class);}/*** @Description 生成redis lock key* @Author Chongwen.jiang* @Date 2019/9/6 11:05* @ModifyDate 2019/9/6 11:05* @Params [proceedingJoinPoint, rlockInfo]* @Return java.lang.String*/public String getLocalKey(ProceedingJoinPoint proceedingJoinPoint, Rlock rlockInfo, String params) {StringBuffer localKey = new StringBuffer("Rlock");//body中的参数final Object[] args = proceedingJoinPoint.getArgs();MethodSignature methodSignature = (MethodSignature) proceedingJoinPoint.getSignature();//方法名称String methodName = methodSignature.getMethod().getName();//参数加密//params = jdkAES(params);localKey.append(rlockInfo.localKey()).append("-").append(methodName).append("-").append(params);log.info("getLocalKey: {}", localKey.toString());return localKey.toString();}/*** @Description 获取query、body和header中的所有参数* @Author Chongwen.jiang* @Date 2019/9/6 13:11* @ModifyDate 2019/9/6 13:11* @Params [request]* @Return java.util.Map*/ private String getAllRequestParam(ProceedingJoinPoint proceedingJoinPoint) {RequestAttributes ra = RequestContextHolder.getRequestAttributes();ServletRequestAttributes sra = (ServletRequestAttributes) ra;HttpServletRequest request = sra.getRequest();this.request.set(request);//参数获取Map<String, Object> params = new ConcurrentHashMap<>();params.put("url", request.getRequestURL());params.put("method", request.getMethod());Object[] bodyParams = proceedingJoinPoint.getArgs();if (null != bodyParams && bodyParams.length > 0) {params.put("bodyParams", bodyParams);}String queryParams = request.getQueryString();if (StringUtils.isNotEmpty(queryParams)) {params.put("queryParams", queryParams);}Map<String, String> headerParams = new ConcurrentHashMap<>();Enumeration<?> headers = request.getHeaderNames();if (null != headers) {while (headers.hasMoreElements()) {String element = (String) headers.nextElement();String value = request.getHeader(element);if ("token".equals(element)) {headerParams.put(element, value);}if (StringUtils.isEmpty(element)) {headerParams.remove(element);}}}if (!CollectionUtils.isEmpty(headerParams)) {params.put("headerParams", headerParams);}String json = JSON.toJSONString(params);log.info("getAllRequestParam: {}", json);return json;}/*** @Description 使用jdk实现AES加密* @Author Chongwen.jiang* @Date 2019/9/6 13:48* @ModifyDate 2019/9/6 13:48* @Params [source]* @Return void*/private String jdkAES(String source) {try {// 生成KEYKeyGenerator keyGenerator = KeyGenerator.getInstance("AES");keyGenerator.init(128);// 产生密钥SecretKey secretKey = keyGenerator.generateKey();// 获取密钥byte[] keyBytes = secretKey.getEncoded();// KEY转换Key key = new SecretKeySpec(keyBytes, "AES");// 加密Cipher cipher = Cipher.getInstance("AES/ECB/PKCS5Padding");cipher.init(Cipher.ENCRYPT_MODE, key);byte[] result = cipher.doFinal(source.getBytes());return Hex.encodeHexString(result);/*System.out.println("jdk aes encrypt:" + Hex.encodeHexString(result));// 解密cipher.init(Cipher.DECRYPT_MODE, key);result = cipher.doFinal(result);System.out.println("jdk aes decrypt:" + new String(result));*/} catch (Exception e) {e.printStackTrace();}return null;}}
controller层使用注解
@PostMapping("/{id}")
@Rlock(localKey = "redisLockAnnotion", leaseTime = 10, timeUtil = TimeUnit.SECONDS)
public ApiResponse<?> order(@RequestParam(required = false) String a, @PathVariable String id) {Map<String, Object> resultMap = dictService.doModify();return new ApiResponse<>(resultMap);
}
这里我设置的锁释放时间是10秒,所以就需要程序员自己估计业务代码执行时间一定不能超过该时间,否则在释放锁的时候,redisson会抛出异常

测试日志打印如下

简洁实用的Redis分布式锁用法
Redisson源码分析
/*** Copyright (c) 2013-2019 Nikita Koksharov** Licensed under the Apache License, Version 2.0 (the "License");* you may not use this file except in compliance with the License.* You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/
package org.redisson;import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;import org.redisson.api.RFuture;
import org.redisson.api.RLock;
import org.redisson.client.RedisException;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommand.ValueType;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.RedisStrictCommand;
import org.redisson.client.protocol.convertor.IntegerReplayConvertor;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.redisson.pubsub.LockPubSub;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import io.netty.util.Timeout;
import io.netty.util.TimerTask;/*** 1、通过lua脚本保证了设置key和key的过期时间操作是原子操作* 2、每个加锁的key都会带上线程信息,保证不会出现a线程的锁被b线程释放问题* 3、通过看门狗(watchdog)实现key释放时间无法准确预估的问题,在主线成之外另外开一个工作线程,每隔releaseTime/3毫秒会去刷新key的失效时间* 4、当然这种分布式锁在超高并发的情况下仍然可能存在问题(比如主从模式、哨兵模式下master挂了,key还没有同步到从节点,那么其他线程再来加锁仍会成功),* 解决方式可以使用zk、或者RedissonRedLock-->zk是CP的,强一致性的话性能会降低,具体看业务情况* Distributed implementation of {@link java.util.concurrent.locks.Lock}* Implements reentrant lock.
* Lock will be removed automatically if client disconnects.* * Implements a non-fair locking so doesn't guarantees an acquire order.** @author Nikita Koksharov**/
public class RedissonLock extends RedissonExpirable implements RLock {public static class ExpirationEntry {private final Map<Long, Integer> threadIds = new LinkedHashMap<>(); // 记录线程次数private volatile Timeout timeout; // volatile保证可见性、有序性public ExpirationEntry() {super();}// 增加线程id次数public void addThreadId(long threadId) {Integer counter = threadIds.get(threadId);if (counter == null) {counter = 1;} else {counter++;}threadIds.put(threadId, counter);}// 判断是否有线程,一个没有的话则返回true,否则返回falsepublic boolean hasNoThreads() {return threadIds.isEmpty();}// 返回第一个线程idpublic Long getFirstThreadId() {if (threadIds.isEmpty()) {return null;}return threadIds.keySet().iterator().next();}// 指定线程id的计数减一,若线程计数为0时,将线程id从map集合中移除public void removeThreadId(long threadId) {Integer counter = threadIds.get(threadId);if (counter == null) {return;}counter--;if (counter == 0) {threadIds.remove(threadId);} else {threadIds.put(threadId, counter);}}// 设置过期时间public void setTimeout(Timeout timeout) {this.timeout = timeout;}// 获取过期时间public Timeout getTimeout() {return timeout;}}private static final Logger log = LoggerFactory.getLogger(RedissonLock.class);// 续期集合private static final ConcurrentMap<String, ExpirationEntry> EXPIRATION_RENEWAL_MAP = new ConcurrentHashMap<>();protected long internalLockLeaseTime;final String id;final String entryName;protected final LockPubSub pubSub;final CommandAsyncExecutor commandExecutor;public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {super(commandExecutor, name);this.commandExecutor = commandExecutor;this.id = commandExecutor.getConnectionManager().getId();this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();this.entryName = id + ":" + name;this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();}protected String getEntryName() {return entryName;}String getChannelName() {return prefixName("redisson_lock__channel", getName());}protected String getLockName(long threadId) {return id + ":" + threadId;}@Overridepublic void lock() {try {lock(-1, null, false);} catch (InterruptedException e) {throw new IllegalStateException();}}@Overridepublic void lock(long leaseTime, TimeUnit unit) {try {lock(leaseTime, unit, false);} catch (InterruptedException e) {throw new IllegalStateException();}}@Overridepublic void lockInterruptibly() throws InterruptedException {lock(-1, null, true);}@Overridepublic void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {lock(leaseTime, unit, true);}private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {long threadId = Thread.currentThread().getId();// 通过lua脚本尝试获取锁,并返回该锁的剩余过期时间(毫秒)Long ttl = tryAcquire(leaseTime, unit, threadId);// lock acquired// 获取到了锁或者redis中的key就是该线程的锁(锁对于同一线程可重入),直接返回if (ttl == null) {return;}// ----------------------如果没有获取到锁----------------------------------------------------// 异步订阅redis channelRFuture<RedissonLockEntry> future = subscribe(threadId);// 阻塞获取订阅结果commandExecutor.syncSubscription(future);try {// 自旋等待获取锁while (true) {// 通过lua脚本尝试获取锁,并返回该锁的剩余过期时间(毫秒)ttl = tryAcquire(leaseTime, unit, threadId);// lock acquiredif (ttl == null) {break;}// waiting for messageif (ttl >= 0) {try {// 使用Semaphore信号量,等待一段时间,然后重新尝试获取锁(并不是一直不停的去尝试获取锁)getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);} catch (InterruptedException e) {if (interruptibly) {throw e;}getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);}} else {if (interruptibly) {getEntry(threadId).getLatch().acquire();} else {getEntry(threadId).getLatch().acquireUninterruptibly();}}}} finally {// 取消订阅redis channelunsubscribe(future, threadId);}
// get(lockAsync(leaseTime, unit));}/*** 通过lua脚本尝试获取锁,并返回该锁的剩余过期时间(毫秒)* @param leaseTime* @param unit* @param threadId* @return*/private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {return get(tryAcquireAsync(leaseTime, unit, threadId)); // 通过异步获取锁,但get(future)实现同步}private RFuture<Boolean> tryAcquireOnceAsync(long leaseTime, TimeUnit unit, long threadId) {if (leaseTime != -1) { // 1、如果设置了超时时间,直接调用 tryLockInnerAsyncreturn tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);}// 2、如果leaseTime==-1,则默认超时时间为30sRFuture<Boolean> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);// 3、监听Future,获取Future返回值ttlRemaining(剩余超时时间),获取锁成功,但是ttlRemaining,则刷新过期时间// 此处用到了Netty的Future-listen模型ttlRemainingFuture.onComplete((ttlRemaining, e) -> {if (e != null) {return;}// lock acquiredif (ttlRemaining) {// 定期刷新该线程对应的key的过期时间(指线程获取锁后需要不断刷新失效时间,避免未执行完锁就失效)scheduleExpirationRenewal(threadId);}});return ttlRemainingFuture;}private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {if (leaseTime != -1) {return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);}RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);ttlRemainingFuture.onComplete((ttlRemaining, e) -> {if (e != null) {return;}// lock acquiredif (ttlRemaining == null) {scheduleExpirationRenewal(threadId);}});return ttlRemainingFuture;}@Overridepublic boolean tryLock() {return get(tryLockAsync());}/*** 使用netty中的TimerTask,每到过期时间的1/3就去刷新过期时间,如果key不存在则停止刷新(返回)*/private void renewExpiration() {ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());// 如果key不存在则返回if (ee == null) {return;}// 如果key存在则每隔过期时间的1/3刷新过期时间,递归调用自己,直到EXPIRATION_RENEWAL_MAP集合中没有这个key为止Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {@Overridepublic void run(Timeout timeout) throws Exception {ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());if (ent == null) {return;}Long threadId = ent.getFirstThreadId();if (threadId == null) {return;}RFuture<Boolean> future = renewExpirationAsync(threadId);future.onComplete((res, e) -> {if (e != null) {log.error("Can't update lock " + getName() + " expiration", e);return;}if (res) {// reschedule itselfrenewExpiration();}});}}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);ee.setTimeout(task);}/*** 定期刷新线程id对应的key的过期时间* @param threadId*/private void scheduleExpirationRenewal(long threadId) {ExpirationEntry entry = new ExpirationEntry();ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);if (oldEntry != null) {oldEntry.addThreadId(threadId);} else {entry.addThreadId(threadId);// 刷新过期时间renewExpiration();}}/*** 判断哈希表中的字段是否存在,若存在则重新设置过期时间* if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then* redis.call('pexpire', KEYS[1], ARGV[1]);* return 1;* end;* return 0;* @param threadId* @return*/protected RFuture<Boolean> renewExpirationAsync(long threadId) {return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return 1; " +"end; " +"return 0;",Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));}void cancelExpirationRenewal(Long threadId) {ExpirationEntry task = EXPIRATION_RENEWAL_MAP.get(getEntryName());if (task == null) {return;}if (threadId != null) {task.removeThreadId(threadId);}if (threadId == null || task.hasNoThreads()) {Timeout timeout = task.getTimeout();if (timeout != null) {timeout.cancel();}EXPIRATION_RENEWAL_MAP.remove(getEntryName());}}/*** if (redis.call('exists', KEYS[1]) == 0) then // 如果key不存在,设置key,并返回空* redis.call('hset', KEYS[1], ARGV[2], 1); // hset name id+":"+threadId 1* redis.call('pexpire', KEYS[1], ARGV[1]); // hset name expireTime ---> 设置key过期时间{防止获取锁后线程挂掉导致死锁}* return nil;* end;* if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then // 查看哈希表的指定字段是否存在,若存在,则给哈希表的指定字段加一,并设置过期时间(锁的可重入性),并返回空* redis.call('hincrby', KEYS[1], ARGV[2], 1);* redis.call('pexpire', KEYS[1], ARGV[1]);* return nil;* end;* return redis.call('pttl', KEYS[1]); // 返回key的剩余过期时间(毫秒)*/<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {internalLockLeaseTime = unit.toMillis(leaseTime);return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,"if (redis.call('exists', KEYS[1]) == 0) then " +"redis.call('hset', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"return redis.call('pttl', KEYS[1]);",Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));}private void acquireFailed(long threadId) {get(acquireFailedAsync(threadId));}protected RFuture<Void> acquireFailedAsync(long threadId) {return RedissonPromise.newSucceededFuture(null);}@Overridepublic boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {long time = unit.toMillis(waitTime);long current = System.currentTimeMillis();long threadId = Thread.currentThread().getId();// 通过lua脚本尝试获取锁,并返回该锁的剩余过期时间(毫秒)Long ttl = tryAcquire(leaseTime, unit, threadId);// lock acquired// 获取到了锁或者redis中的key就是该线程的锁(锁对于同一线程可重入),直接返回if (ttl == null) {return true;}// 重新计算key的失效时间 = 原本失效时间 - 尝试获取锁消耗的时间time -= System.currentTimeMillis() - current;// 如果在规定的等待时间内尝试获取锁失败,则返回falseif (time <= 0) {acquireFailed(threadId);return false;}current = System.currentTimeMillis();// 如果尝试获取锁成功了,则订阅消息 redis channelRFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);// 获取订阅结果,如果订阅失败了则返回falseif (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {if (!subscribeFuture.cancel(false)) {subscribeFuture.onComplete((res, e) -> {if (e == null) {unsubscribe(subscribeFuture, threadId);}});}acquireFailed(threadId);return false;}try {// 重新计算key的失效时间 = 原本失效时间 - 尝试获取锁消耗的时间time -= System.currentTimeMillis() - current;// 如果在规定的等待时间内尝试获取锁失败,则返回falseif (time <= 0) {acquireFailed(threadId);return false;}// 自旋等待获取锁while (true) {long currentTime = System.currentTimeMillis();// 通过lua脚本尝试获取锁,并返回该锁的剩余过期时间(毫秒)ttl = tryAcquire(leaseTime, unit, threadId);// lock acquiredif (ttl == null) {return true;}// 重新计算key的失效时间 = 原本失效时间 - 尝试获取锁消耗的时间time -= System.currentTimeMillis() - currentTime;// 如果在规定的等待时间内尝试获取锁失败,则返回falseif (time <= 0) {acquireFailed(threadId);return false;}// waiting for messagecurrentTime = System.currentTimeMillis();if (ttl >= 0 && ttl < time) {// 使用Semaphore信号量,等待一段时间,然后重新尝试获取锁(并不是一直不停的去尝试获取锁)getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);} else {getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);}time -= System.currentTimeMillis() - currentTime;if (time <= 0) {acquireFailed(threadId);return false;}}} finally {// 取消订阅消息 redis channelunsubscribe(subscribeFuture, threadId);}
// return get(tryLockAsync(waitTime, leaseTime, unit));}protected RedissonLockEntry getEntry(long threadId) {return pubSub.getEntry(getEntryName());}protected RFuture<RedissonLockEntry> subscribe(long threadId) {return pubSub.subscribe(getEntryName(), getChannelName());}protected void unsubscribe(RFuture<RedissonLockEntry> future, long threadId) {pubSub.unsubscribe(future.getNow(), getEntryName(), getChannelName());}@Overridepublic boolean tryLock(long waitTime, TimeUnit unit) throws InterruptedException {return tryLock(waitTime, -1, unit);}@Overridepublic void unlock() {try {get(unlockAsync(Thread.currentThread().getId()));} catch (RedisException e) {if (e.getCause() instanceof IllegalMonitorStateException) {throw (IllegalMonitorStateException) e.getCause();} else {throw e;}}// Future future = unlockAsync();
// future.awaitUninterruptibly();
// if (future.isSuccess()) {
// return;
// }
// if (future.cause() instanceof IllegalMonitorStateException) {
// throw (IllegalMonitorStateException)future.cause();
// }
// throw commandExecutor.convertException(future);}@Overridepublic Condition newCondition() {// TODO implementthrow new UnsupportedOperationException();}@Overridepublic boolean forceUnlock() {return get(forceUnlockAsync());}/*** 直接删除哈希表,然后发布消息订阅,返回1* 否则返回0* if (redis.call('del', KEYS[1]) == 1) then* redis.call('publish', KEYS[2], ARGV[1]);* return 1* else* return 0* end;* @return*/@Overridepublic RFuture<Boolean> forceUnlockAsync() {cancelExpirationRenewal(null);return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,"if (redis.call('del', KEYS[1]) == 1) then "+ "redis.call('publish', KEYS[2], ARGV[1]); "+ "return 1 "+ "else "+ "return 0 "+ "end",Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE);}@Overridepublic boolean isLocked() {return isExists();}@Overridepublic RFuture<Boolean> isLockedAsync() {return isExistsAsync();}@Overridepublic RFuture<Boolean> isExistsAsync() {return commandExecutor.writeAsync(getName(), codec, RedisCommands.EXISTS, getName());}@Overridepublic boolean isHeldByCurrentThread() {return isHeldByThread(Thread.currentThread().getId());}@Overridepublic boolean isHeldByThread(long threadId) {RFuture<Boolean> future = commandExecutor.writeAsync(getName(), LongCodec.INSTANCE, RedisCommands.HEXISTS, getName(), getLockName(threadId));return get(future);}private static final RedisCommand<Integer> HGET = new RedisCommand<Integer>("HGET", ValueType.MAP_VALUE, new IntegerReplayConvertor(0));public RFuture<Integer> getHoldCountAsync() {return commandExecutor.writeAsync(getName(), LongCodec.INSTANCE, HGET, getName(), getLockName(Thread.currentThread().getId()));}@Overridepublic int getHoldCount() {return get(getHoldCountAsync());}@Overridepublic RFuture<Boolean> deleteAsync() {return forceUnlockAsync();}@Overridepublic RFuture<Void> unlockAsync() {long threadId = Thread.currentThread().getId();return unlockAsync(threadId);}/*** if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then // key不存在则返回空* return nil;* end;* local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); // 哈希表中的字段计数减一* if (counter > 0) then // 如果计数器大于0,则重新设置过期时间,返回0* redis.call('pexpire', KEYS[1], ARGV[2]);* return 0;* else // 如果计数器小于等于0,则删除哈希表并且发送消息订阅,返回1* redis.call('del', KEYS[1]);* redis.call('publish', KEYS[2], ARGV[1]);* return 1;* end;* return nil; // 否则返回1* @param threadId* @return*/protected RFuture<Boolean> unlockInnerAsync(long threadId) {return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +"return nil;" +"end; " +"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +"if (counter > 0) then " +"redis.call('pexpire', KEYS[1], ARGV[2]); " +"return 0; " +"else " +"redis.call('del', KEYS[1]); " +"redis.call('publish', KEYS[2], ARGV[1]); " +"return 1; "+"end; " +"return nil;",Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));}@Overridepublic RFuture<Void> unlockAsync(long threadId) {RPromise<Void> result = new RedissonPromise<Void>();RFuture<Boolean> future = unlockInnerAsync(threadId);future.onComplete((opStatus, e) -> {if (e != null) {cancelExpirationRenewal(threadId);result.tryFailure(e);return;}if (opStatus == null) {IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "+ id + " thread-id: " + threadId);result.tryFailure(cause);return;}cancelExpirationRenewal(threadId);result.trySuccess(null);});return result;}@Overridepublic RFuture<Void> lockAsync() {return lockAsync(-1, null);}@Overridepublic RFuture<Void> lockAsync(long leaseTime, TimeUnit unit) {long currentThreadId = Thread.currentThread().getId();return lockAsync(leaseTime, unit, currentThreadId);}@Overridepublic RFuture<Void> lockAsync(long currentThreadId) {return lockAsync(-1, null, currentThreadId);}@Overridepublic RFuture<Void> lockAsync(long leaseTime, TimeUnit unit, long currentThreadId) {RPromise<Void> result = new RedissonPromise<Void>();RFuture<Long> ttlFuture = tryAcquireAsync(leaseTime, unit, currentThreadId);ttlFuture.onComplete((ttl, e) -> {if (e != null) {result.tryFailure(e);return;}// lock acquiredif (ttl == null) {if (!result.trySuccess(null)) {unlockAsync(currentThreadId);}return;}RFuture<RedissonLockEntry> subscribeFuture = subscribe(currentThreadId);subscribeFuture.onComplete((res, ex) -> {if (ex != null) {result.tryFailure(ex);return;}lockAsync(leaseTime, unit, subscribeFuture, result, currentThreadId);});});return result;}private void lockAsync(long leaseTime, TimeUnit unit,RFuture<RedissonLockEntry> subscribeFuture, RPromise<Void> result, long currentThreadId) {RFuture<Long> ttlFuture = tryAcquireAsync(leaseTime, unit, currentThreadId);ttlFuture.onComplete((ttl, e) -> {if (e != null) {unsubscribe(subscribeFuture, currentThreadId);result.tryFailure(e);return;}// lock acquiredif (ttl == null) {unsubscribe(subscribeFuture, currentThreadId);if (!result.trySuccess(null)) {unlockAsync(currentThreadId);}return;}RedissonLockEntry entry = getEntry(currentThreadId);if (entry.getLatch().tryAcquire()) {lockAsync(leaseTime, unit, subscribeFuture, result, currentThreadId);} else {// waiting for messageAtomicReference<Timeout> futureRef = new AtomicReference<Timeout>();Runnable listener = () -> {if (futureRef.get() != null) {futureRef.get().cancel();}lockAsync(leaseTime, unit, subscribeFuture, result, currentThreadId);};entry.addListener(listener);if (ttl >= 0) {Timeout scheduledFuture = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {@Overridepublic void run(Timeout timeout) throws Exception {if (entry.removeListener(listener)) {lockAsync(leaseTime, unit, subscribeFuture, result, currentThreadId);}}}, ttl, TimeUnit.MILLISECONDS);futureRef.set(scheduledFuture);}}});}@Overridepublic RFuture<Boolean> tryLockAsync() {return tryLockAsync(Thread.currentThread().getId());}@Overridepublic RFuture<Boolean> tryLockAsync(long threadId) {return tryAcquireOnceAsync(-1, null, threadId);}@Overridepublic RFuture<Boolean> tryLockAsync(long waitTime, TimeUnit unit) {return tryLockAsync(waitTime, -1, unit);}@Overridepublic RFuture<Boolean> tryLockAsync(long waitTime, long leaseTime, TimeUnit unit) {long currentThreadId = Thread.currentThread().getId();return tryLockAsync(waitTime, leaseTime, unit, currentThreadId);}@Overridepublic RFuture<Boolean> tryLockAsync(long waitTime, long leaseTime, TimeUnit unit,long currentThreadId) {RPromise<Boolean> result = new RedissonPromise<Boolean>();AtomicLong time = new AtomicLong(unit.toMillis(waitTime));long currentTime = System.currentTimeMillis();RFuture<Long> ttlFuture = tryAcquireAsync(leaseTime, unit, currentThreadId);ttlFuture.onComplete((ttl, e) -> {if (e != null) {result.tryFailure(e);return;}// lock acquiredif (ttl == null) {if (!result.trySuccess(true)) {unlockAsync(currentThreadId);}return;}long el = System.currentTimeMillis() - currentTime;time.addAndGet(-el);if (time.get() <= 0) {trySuccessFalse(currentThreadId, result);return;}long current = System.currentTimeMillis();AtomicReference<Timeout> futureRef = new AtomicReference<Timeout>();RFuture<RedissonLockEntry> subscribeFuture = subscribe(currentThreadId);subscribeFuture.onComplete((r, ex) -> {if (ex != null) {result.tryFailure(ex);return;}if (futureRef.get() != null) {futureRef.get().cancel();}long elapsed = System.currentTimeMillis() - current;time.addAndGet(-elapsed);tryLockAsync(time, leaseTime, unit, subscribeFuture, result, currentThreadId);});if (!subscribeFuture.isDone()) {Timeout scheduledFuture = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {@Overridepublic void run(Timeout timeout) throws Exception {if (!subscribeFuture.isDone()) {subscribeFuture.cancel(false);trySuccessFalse(currentThreadId, result);}}}, time.get(), TimeUnit.MILLISECONDS);futureRef.set(scheduledFuture);}});return result;}private void trySuccessFalse(long currentThreadId, RPromise<Boolean> result) {acquireFailedAsync(currentThreadId).onComplete((res, e) -> {if (e == null) {result.trySuccess(false);} else {result.tryFailure(e);}});}private void tryLockAsync(AtomicLong time, long leaseTime, TimeUnit unit,RFuture<RedissonLockEntry> subscribeFuture, RPromise<Boolean> result, long currentThreadId) {if (result.isDone()) {unsubscribe(subscribeFuture, currentThreadId);return;}if (time.get() <= 0) {unsubscribe(subscribeFuture, currentThreadId);trySuccessFalse(currentThreadId, result);return;}long curr = System.currentTimeMillis();RFuture<Long> ttlFuture = tryAcquireAsync(leaseTime, unit, currentThreadId);ttlFuture.onComplete((ttl, e) -> {if (e != null) {unsubscribe(subscribeFuture, currentThreadId);result.tryFailure(e);return;}// lock acquiredif (ttl == null) {unsubscribe(subscribeFuture, currentThreadId);if (!result.trySuccess(true)) {unlockAsync(currentThreadId);}return;}long el = System.currentTimeMillis() - curr;time.addAndGet(-el);if (time.get() <= 0) {unsubscribe(subscribeFuture, currentThreadId);trySuccessFalse(currentThreadId, result);return;}// waiting for messagelong current = System.currentTimeMillis();RedissonLockEntry entry = getEntry(currentThreadId);if (entry.getLatch().tryAcquire()) {tryLockAsync(time, leaseTime, unit, subscribeFuture, result, currentThreadId);} else {AtomicBoolean executed = new AtomicBoolean();AtomicReference<Timeout> futureRef = new AtomicReference<Timeout>();Runnable listener = () -> {executed.set(true);if (futureRef.get() != null) {futureRef.get().cancel();}long elapsed = System.currentTimeMillis() - current;time.addAndGet(-elapsed);tryLockAsync(time, leaseTime, unit, subscribeFuture, result, currentThreadId);};entry.addListener(listener);long t = time.get();if (ttl >= 0 && ttl < time.get()) {t = ttl;}if (!executed.get()) {Timeout scheduledFuture = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {@Overridepublic void run(Timeout timeout) throws Exception {if (entry.removeListener(listener)) {long elapsed = System.currentTimeMillis() - current;time.addAndGet(-elapsed);tryLockAsync(time, leaseTime, unit, subscribeFuture, result, currentThreadId);}}}, t, TimeUnit.MILLISECONDS);futureRef.set(scheduledFuture);}}});}}
本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!
