借助Zookeeper实现排他锁

        排他锁:又称写锁或者独占锁,是一种基本的锁类型。如果事务T1对数据O加上排他锁之后,那么在整个加锁期间,只允许事务T1对数据O进行读写和更新操作,其他任何事务都不能再对这个数据进行任何的类型的操作,直到T1释放了排他锁。

        获取锁原理:在Zookeeper中,多个客户端通过create() 接口创建同一个节点,zookeeper会保证最终只有一个客户端能够创建成功;那么可以认为创建节点成功的客户端获取到了锁,没有创建节点成功的客户端没有获取到锁,没有获取到锁的客户端然后注册一个节点变更的Watcher监听,当这个节点删除后再去创建,直到创建成功(获取到锁)。

        释放锁原理:1、因为客户端创建的节点是一个临时节点,所以当获取到锁的客户端发生了岩机,zookeeper上的这个临时节点会被移除;2、获取到锁的客户端,正常执行完业务逻辑后,客户端会主动将自己创建的临时节点删除。

        简单的代码实现:代码中只实现了正常的获取锁和释放锁的逻辑。

// 定义一个锁接口
public interface Lock {boolean lock();void unLock();
}/*** 分布式排他锁* 		支持重入**/
public class MutexLock implements Lock {private static final String MUTEX_LOCK_ROOT = "/mutex-lock-root";private final String lockName;private final ConcurrentMap threadData = Maps.newConcurrentMap();@SuppressWarnings("unused")private static class LockData {final Thread owningThread;final AtomicInteger lockCount = new AtomicInteger(1);private LockData(Thread owningThread){this.owningThread = owningThread;}}public MutexLock(String lockName) {this.lockName = PathUtils.validatePath(MUTEX_LOCK_ROOT + "-" + lockName);}@Overridepublic boolean lock() {boolean result = true;Thread currentThread = Thread.currentThread();LockData lockData = threadData.get(currentThread);if (null != lockData) {lockData.lockCount.incrementAndGet();return result;}try {String lockPath = innerLock();if (StringUtils.isNotEmpty(lockPath)) {lockData = new LockData(currentThread);threadData.put(currentThread, lockData);}} catch (Exception e) {e.printStackTrace();result = false;}return result;}@Overridepublic void unLock() {Thread currentThread = Thread.currentThread();LockData lockData = threadData.get(currentThread);if (null == lockData) {return;}int count = lockData.lockCount.decrementAndGet();if (0 < count) {return;}threadData.remove(currentThread);// 允许重试5次ZooKeeper zooKeeper = LockUtils.newZookeeper();int retry = 5;while (retry-- > 0) {try {zooKeeper.delete(lockName, -1);retry = 0;} catch (InterruptedException | KeeperException e) {e.printStackTrace();}}}private String innerLock() throws Exception {final ZooKeeper zooKeeper = LockUtils.newZookeeper();CountDownLatch countDownLatch = new CountDownLatch(1);String result = createPath(zooKeeper, countDownLatch);countDownLatch.await();while (StringUtils.isEmpty(result)) {result = createPath(zooKeeper, countDownLatch);}return result;}private String createPath(ZooKeeper zooKeeper, CountDownLatch countDownLatch) {String resultPath = null;try {// 创建临时节点resultPath = zooKeeper.create(lockName, "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);// 创建成功countDownLatch.countDown();} catch (KeeperException.NodeExistsException e) {// 节点存在watchPath(zooKeeper, countDownLatch);} catch (Exception e) {e.printStackTrace();}return resultPath;}private void watchPath(ZooKeeper zooKeeper, CountDownLatch countDownLatch) {try {Stat stat = zooKeeper.exists(lockName, new Watcher() {@Overridepublic void process(WatchedEvent event) {if (EventType.NodeDeleted == event.getType()) {countDownLatch.countDown();}}	});if (null == stat) {//节点不存在countDownLatch.countDown();}} catch (KeeperException | InterruptedException e) {e.printStackTrace();}}}public class LockUtils {private static ZooKeeper zooKeeper;public static ZooKeeper newZookeeper() {if (null == zooKeeper) {init();}return zooKeeper;}private static void init() {if (null == zooKeeper) {synchronized (LockUtils.class) {if (null == zooKeeper) {try {CountDownLatch countDownLatch = new CountDownLatch(1);zooKeeper = new ZooKeeper("127.0.0.1:2181", 5000, new Watcher() {@Overridepublic void process(WatchedEvent event) {if (KeeperState.SyncConnected == event.getState()) {countDownLatch.countDown();}}});countDownLatch.await();System.out.println("=======init success========");} catch (Exception e) {e.printStackTrace();}}}}}
}

        测试结果:

public class TestLock {public static void main(String[] args) {LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory();loggerContext.getLogger("org.apache.zookeeper").setLevel(Level.valueOf("info"));new TestLock().test();}private void test() {final Lock lock = new MutexLock("test");CountDownLatch countDownLatch = new CountDownLatch(1);for (int i=0; i<10; i++) {new Thread(new Runnable() {@Overridepublic void run() {try {countDownLatch.await();} catch (Exception e) {e.printStackTrace();}if (lock.lock()) {System.out.println("one  " + Thread.currentThread().getName() + "----- 当前时间 : " + System.currentTimeMillis());if (lock.lock()) {System.out.println("two  " + Thread.currentThread().getName() + "----- 当前时间 : " + System.currentTimeMillis());lock.unLock();}lock.unLock();}}}).start();}countDownLatch.countDown();}
}测试结果:
one  Thread-0----- 当前时间 : 1561083459488
two  Thread-0----- 当前时间 : 1561083459488
one  Thread-9----- 当前时间 : 1561083459821
two  Thread-9----- 当前时间 : 1561083459823
one  Thread-4----- 当前时间 : 1561083459880
two  Thread-4----- 当前时间 : 1561083459880
one  Thread-7----- 当前时间 : 1561083459925
two  Thread-7----- 当前时间 : 1561083459925
one  Thread-5----- 当前时间 : 1561083459964
two  Thread-5----- 当前时间 : 1561083459964
one  Thread-6----- 当前时间 : 1561083459996
two  Thread-6----- 当前时间 : 1561083459996
one  Thread-3----- 当前时间 : 1561083460079
two  Thread-3----- 当前时间 : 1561083460079
one  Thread-2----- 当前时间 : 1561083460409
two  Thread-2----- 当前时间 : 1561083460409
one  Thread-1----- 当前时间 : 1561083460433
two  Thread-1----- 当前时间 : 1561083460433
one  Thread-8----- 当前时间 : 1561083460447
two  Thread-8----- 当前时间 : 1561083460447

 


本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!

相关文章

立即
投稿

微信公众账号

微信扫一扫加关注

返回
顶部