zookeeper实现分布式锁

一、分布式锁介绍

分布式锁主要用于在分布式环境中保护跨进程、跨主机、跨网络的共享资源实现互斥访问,以达到保证数据的一致性。

二、架构介绍

在介绍使用Zookeeper实现分布式锁之前,首先看当前的系统架构图

解释: 左边的整个区域表示一个Zookeeper集群,locker是Zookeeper的一个持久节点,node_1、node_2、node_3是locker这个持久节点下面的临时顺序节点。client_1、client_2、client_n表示多个客户端,Service表示需要互斥访问的共享资源。

三、分布式锁获取思路

1.获取分布式锁的总体思路

在获取分布式锁的时候在locker节点下创建临时顺序节点,释放锁的时候删除该临时节点。客户端调用createNode方法在locker下创建临时顺序节点,然后调用getChildren(“locker”)来获取locker下面的所有子节点,注意此时不用设置任何Watcher。客户端获取到所有的子节点path之后,如果发现自己在之前创建的子节点序号最小,那么就认为该客户端获取到了锁。如果发现自己创建的节点并非locker所有子节点中最小的,说明自己还没有获取到锁,此时客户端需要找到比自己小的那个节点,然后对其调用exist()方法,同时对其注册事件监听器。之后,让这个被关注的节点删除,则客户端的Watcher会收到相应通知,此时再次判断自己创建的节点是否是locker子节点中序号最小的,如皋是则获取到了锁,如果不是则重复以上步骤继续获取到比自己小的一个节点并注册监听。当前这个过程中还需要许多的逻辑判断。

2.获取分布式锁的核心算法流程

下面同个一个流程图来分析获取分布式锁的完整算法,如下:

解释:客户端A要获取分布式锁的时候首先到locker下创建一个临时顺序节点(node_n),然后立即获取locker下的所有(一级)子节点。

此时因为会有多个客户端同一时间争取锁,因此locker下的子节点数量就会大于1。对于顺序节点,特点是节点名称后面自动有一个数字编号,先创建的节点数字编号小于后创建的,因此可以将子节点按照节点名称后缀的数字顺序从小到大排序,这样排在第一位的就是最先创建的顺序节点,此时它就代表了最先争取到锁的客户端!此时判断最小的这个节点是否为客户端A之前创建出来的node_n,如果是则表示客户端A获取到了锁,如果不是则表示锁已经被其它客户端获取,因此客户端A要等待它释放锁,也就是等待获取到锁的那个客户端B把自己创建的那个节点删除。

此时就通过监听比node_n次小的那个顺序节点的删除事件来知道客户端B是否已经释放了锁,如果是,此时客户端A再次获取locker下的所有子节点,再次与自己创建的node_n节点对比,直到自己创建的node_n是locker的所有子节点中顺序号最小的,此时表示客户端A获取到了锁!

四、基于Zookeeper的分布式锁的代码实现

1.定义分布式锁接口

定义的分布式锁接口如下:

public interface DistributedLock {

   /**获取锁,如果没有得到就等待*/
   public void acquire() throws Exception;

   /**
   * 获取锁,直到超时
   * @param time超时时间
   * @param unit time参数的单位
   * @return是否获取到锁
   * @throws Exception
   */
   public boolean acquire (long time, TimeUnit unit) throws Exception;

   /**
    * 释放锁
    * @throws Exception
    */
   public void release() throws Exception;
}

2.定义一个简单的互斥锁

定义一个互斥锁类,实现以上定义的锁接口,同时继承一个基类BaseDistributedLock,该基类主要用于与Zookeeper交互,包含一个尝试获取锁的方法和一个释放锁。

/**锁接口的具体实现,主要借助于继承的父类BaseDistributedLock来实现的接口方法
 * 该父类是基于Zookeeper实现分布式锁的具体细节实现*/
public class SimpleDistributedLockMutex extends BaseDistributedLock implements DistributedLock {
  /*用于保存Zookeeper中实现分布式锁的节点,如名称为locker:/locker,
 *该节点应该是持久节点,在该节点下面创建临时顺序节点来实现分布式锁 */
 private final String basePath;

 /*锁名称前缀,locker下创建的顺序节点例如都以lock-开头,这样便于过滤无关节点
 *这样创建后的节点类似:lock-00000001,lock-000000002*/
 private staticfinal String LOCK_NAME ="lock-";

 /*用于保存某个客户端在locker下面创建成功的顺序节点,用于后续相关操作使用(如判断)*/
 private String ourLockPath;

 /**
 * 用于获取锁资源,通过父类的获取锁方法来获取锁
 * @param time获取锁的超时时间
 * @param unit time的时间单位
 * @return是否获取到锁
 * @throws Exception
 */
 private boolean internalLock (long time, TimeUnit unit) throws Exception {
  //如果ourLockPath不为空则认为获取到了锁,具体实现细节见attemptLock的实现
  ourLockPath = attemptLock(time, unit);
  return ourLockPath !=null;
 }

 /**
  * 传入Zookeeper客户端连接对象,和basePath
  * @param client Zookeeper客户端连接对象
  * @param basePath basePath是一个持久节点
  */
 public SimpleDistributedLockMutex(ZkClientExt client, String basePath){
  /*调用父类的构造方法在Zookeeper中创建basePath节点,并且为basePath节点子节点设置前缀
  *同时保存basePath的引用给当前类属性*/
  super(client,basePath,LOCK_NAME);
  this.basePath = basePath;
 }

 /**获取锁,直到超时,超时后抛出异常*/
 public void acquire() throws Exception {
  //-1表示不设置超时时间,超时由Zookeeper决定
  if (!internalLock(-1,null)){
   throw new IOException("连接丢失!在路径:'"+basePath+"'下不能获取锁!");
  }
 }

 /**
 * 获取锁,带有超时时间
 */
 public boolean acquire(long time, TimeUnit unit) throws Exception {
  return internalLock(time, unit);
 }

 /**释放锁*/
 public void release()throws Exception {
  releaseLock(ourLockPath);
 }
}

3. 分布式锁的实现细节

获取分布式锁的重点逻辑在于BaseDistributedLock,实现了基于Zookeeper实现分布式锁的细节。

public class BaseDistributedLock {

 private final ZkClientExt client;
 private final String path;
 private final String basePath;
 private final String lockName;
 private static final Integer MAX_RETRY_COUNT = 10;

 public BaseDistributedLock(ZkClientExt client, String path, String lockName){
 this.client = client;
 this.basePath = path;
 this.path = path.concat("/").concat(lockName);
 this.lockName = lockName;
 }

 private void deleteOurPath(String ourPath) throws Exception{
 client.delete(ourPath);
 }

 private String createLockNode(ZkClient client, String path) throws Exception{
 return client.createEphemeralSequential(path, null);
 }

 /**
 * 获取锁的核心方法
 * @param startMillis
 * @param millisToWait
 * @param ourPath
 * @return
 * @throws Exception
 */
 private boolean waitToLock(long startMillis, Long millisToWait, String ourPath) throws Exception{

 boolean haveTheLock = false;
 boolean doDelete = false;

 try{
  while ( !haveTheLock ) {
  //该方法实现获取locker节点下的所有顺序节点,并且从小到大排序
  List<String> children = getSortedChildren();
  String sequenceNodeName = ourPath.substring(basePath.length()+1);

  //计算刚才客户端创建的顺序节点在locker的所有子节点中排序位置,如果是排序为0,则表示获取到了锁
  int ourIndex = children.indexOf(sequenceNodeName);

  /*如果在getSortedChildren中没有找到之前创建的[临时]顺序节点,这表示可能由于网络闪断而导致
   *Zookeeper认为连接断开而删除了我们创建的节点,此时需要抛出异常,让上一级去处理
   *上一级的做法是捕获该异常,并且执行重试指定的次数 见后面的 attemptLock方法 */
  if ( ourIndex<0 ){
   throw new ZkNoNodeException("节点没有找到: " + sequenceNodeName);
  }

  //如果当前客户端创建的节点在locker子节点列表中位置大于0,表示其它客户端已经获取了锁
  //此时当前客户端需要等待其它客户端释放锁,
  boolean isGetTheLock = ourIndex == 0;

  //如何判断其它客户端是否已经释放了锁?从子节点列表中获取到比自己次小的哪个节点,并对其建立监听
  String pathToWatch = isGetTheLock ? null : children.get(ourIndex - 1);

  if ( isGetTheLock ){
   haveTheLock = true;
  }else{
   //如果次小的节点被删除了,则表示当前客户端的节点应该是最小的了,所以使用CountDownLatch来实现等待
   String previousSequencePath = basePath .concat( "/" ) .concat( pathToWatch );
   final CountDownLatch latch = new CountDownLatch(1);
   final IZkDataListener previousListener = new IZkDataListener() {

   //次小节点删除事件发生时,让countDownLatch结束等待
   //此时还需要重新让程序回到while,重新判断一次!
   public void handleDataDeleted(String dataPath) throws Exception {
    latch.countDown();
   }

   public void handleDataChange(String dataPath, Object data) throws Exception {
    // ignore
   }
   };

   try{
   //如果节点不存在会出现异常
   client.subscribeDataChanges(previousSequencePath, previousListener);

   if ( millisToWait != null ){
    millisToWait -= (System.currentTimeMillis() - startMillis);
    startMillis = System.currentTimeMillis();
    if ( millisToWait <= 0 ){
    doDelete = true; // timed out - delete our node
    break;
    }

    latch.await(millisToWait, TimeUnit.MICROSECONDS);
   }else{
    latch.await();
   }

   }catch ( ZkNoNodeException e ){
   //ignore
   }finally{
   client.unsubscribeDataChanges(previousSequencePath, previousListener);
   }
  }
  }
 }catch ( Exception e ){
  //发生异常需要删除节点
  doDelete = true;
  throw e;

 }finally{
  //如果需要删除节点
  if ( doDelete ){
  deleteOurPath(ourPath);
  }
 }
 return haveTheLock;
 } 

 private String getLockNodeNumber(String str, String lockName) {
 int index = str.lastIndexOf(lockName);
 if ( index >= 0 ){
  index += lockName.length();
  return index <= str.length() ? str.substring(index) : "";
 }
 return str;
 }

 private List<String> getSortedChildren() throws Exception {
 try{
  List<String> children = client.getChildren(basePath);
  Collections.sort(
  children,
  new Comparator<String>(){
   public int compare(String lhs, String rhs){
   return getLockNodeNumber(lhs, lockName).compareTo(getLockNodeNumber(rhs, lockName));
   }
  }
  );
  return children;

 }catch(ZkNoNodeException e){
  client.createPersistent(basePath, true);
  return getSortedChildren();
 }
 }

 protected void releaseLock(String lockPath) throws Exception{
 deleteOurPath(lockPath);
 }

 protected String attemptLock(long time, TimeUnit unit) throws Exception{
 final long startMillis = System.currentTimeMillis();
 final Long millisToWait = (unit != null) ? unit.toMillis(time) : null;

 String  ourPath = null;
 boolean  hasTheLock = false;
 boolean  isDone = false;
 int  retryCount = 0;

 //网络闪断需要重试一试
 while ( !isDone ){
  isDone = true;

  try{
  //createLockNode用于在locker(basePath持久节点)下创建客户端要获取锁的[临时]顺序节点
  ourPath = createLockNode(client, path);
  /**
   * 该方法用于判断自己是否获取到了锁,即自己创建的顺序节点在locker的所有子节点中是否最小
   * 如果没有获取到锁,则等待其它客户端锁的释放,并且稍后重试直到获取到锁或者超时
   */
  hasTheLock = waitToLock(startMillis, millisToWait, ourPath);

  }catch ( ZkNoNodeException e ){
  if ( retryCount++ < MAX_RETRY_COUNT ){
   isDone = false;
  }else{
   throw e;
  }
  }
 }

 if ( hasTheLock ){
  return ourPath;
 }

 return null;
}

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • 浅谈分布式锁的几种使用方式(redis、zookeeper、数据库)

    Q:一个业务服务器,一个数据库,操作:查询用户当前余额,扣除当前余额的3%作为手续费 synchronized lock dblock Q:两个业务服务器,一个数据库,操作:查询用户当前余额,扣除当前余额的3%作为手续费 分布式锁 我们需要怎么样的分布式锁? 可以保证在分布式部署的应用集群中,同一个方法在同一时间只能被一台机器上的一个线程执行. 这把锁要是一把可重入锁(避免死锁) 这把锁最好是一把阻塞锁(根据业务需求考虑要不要这条) 这把锁最好是一把公平锁(根据业务需求考虑要不要这条) 有高可用

  • Java分布式锁的概念与实现方式详解

    什么是分布式锁?在回答这个问题之前,我们先回答一下什么是锁. 普通的锁,即在单机多线程环境下,当多个线程需要访问同一个变量或代码片段时,被访问的变量或代码片段叫做临界区域,我们需要控制线程一个一个的顺序执行,否则会出现并发问题. 如何控制呢?就是设置一个各个线程都能看的见的标志.然后,每个线程想访问临界区域时,都要先查看标志,如果标志没有被占用,则说明目前没有线程在访问临界区域.如果标志被占用了,则说明目前有线程正在访问临界区域,则当前线程需要等待. 这个标志,就是锁. 在单机多线程的java程

  • java使用zookeeper实现的分布式锁示例

    使用zookeeper实现的分布式锁 分布式锁,实现了Lock接口 复制代码 代码如下: package com.concurrent; import java.io.IOException;import java.util.ArrayList;import java.util.Collections;import java.util.List;import java.util.concurrent.CountDownLatch;import java.util.concurrent.TimeU

  • 浅谈Java(SpringBoot)基于zookeeper的分布式锁实现

    通过zookeeper实现分布式锁 1.创建zookeeper的client 首先通过CuratorFrameworkFactory创建一个连接zookeeper的连接CuratorFramework client public class CuratorFactoryBean implements FactoryBean<CuratorFramework>, InitializingBean, DisposableBean { private static final Logger LOGG

  • 如何操作Redis和zookeeper实现分布式锁

    如何操作Redis和zookeeper实现分布式锁 在分布式场景下,有很多种情况都需要实现最终一致性.在设计远程上下文的领域事件的时候,为了保证最终一致性,在通过领域事件进行通讯的方式中,可以共享存储(领域模型和消息的持久化数据源),或者做全局XA事务(两阶段提交,数据源可分开),也可以借助消息中间件(消费者处理需要能幂等).通过Observer模式来发布领域事件可以提供很好的高并发性能,并且事件存储也能追溯更小粒度的事件数据,使各个应用系统拥有更好的自治性. 1.分布式锁 分布式锁一般用在分布

  • zookeeper实现分布式锁

    一.分布式锁介绍 分布式锁主要用于在分布式环境中保护跨进程.跨主机.跨网络的共享资源实现互斥访问,以达到保证数据的一致性. 二.架构介绍 在介绍使用Zookeeper实现分布式锁之前,首先看当前的系统架构图 解释: 左边的整个区域表示一个Zookeeper集群,locker是Zookeeper的一个持久节点,node_1.node_2.node_3是locker这个持久节点下面的临时顺序节点.client_1.client_2.client_n表示多个客户端,Service表示需要互斥访问的共享

  • 基于Zookeeper实现分布式锁详解

    目录 1.什么是Zookeeper? 2.Zookeeper节点类型 3.Zookeeper环境搭建 4.Zookeeper基本使用 5.Zookeeper应用场景 6.Zookeeper分布式锁 7.公平式Zookeeper分布式锁 8.zookeeper和Redis锁对比? 1.什么是Zookeeper? Zookeeper是一个分布式的,开源的分布式应用程序协调服务,是Hadoop和hbase的重要组件. 引用官网的图例: 特征: zookeeper的数据机构是一种节点树的数据结构,zNo

  • springboot+zookeeper实现分布式锁的示例代码

    目录 依赖 本地封装 配置 测试代码 JMeter测试 InterProcessMutex内部实现了zookeeper分布式锁的机制,所以接下来我们尝试使用这个工具来为我们的业务加上分布式锁处理的功能 zookeeper分布式锁的特点:1.分布式 2.公平锁 3.可重入 依赖 <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId>

  • Spring Boot整合Zookeeper实现分布式锁的场景分析

    目录 一.Java当中关于锁的概念 1.1.什么是锁 1.2.锁的使用场景 1.3.什么是分布式锁 1.4.分布式锁的使用场景 二.zk实现分布式锁 2.1.zk中锁的种类: 2.2.zk如何上读锁 2.3.zk如何上写锁 2.4.⽺群效应 三.springboot整合分布式锁 温馨提示:本篇文章要求掌握zk的数据结构,以及临时序号节点! zk实现分布式锁完全是依靠zk节点类型当中的临时序号节点来实现的 一.Java当中关于锁的概念 1.1.什么是锁 锁是用来控制多个线程访问共享资源的方式,一般

  • ZooKeeper 实现分布式锁的方法示例

    ZooKeeper 是一个典型的分布式数据一致性解决方案,分布式应用程序可以基于 ZooKeeper 实现诸如数据发布/订阅.负载均衡.分布式协调/通知.集群管理.Master 选举.分布式锁等功能. 节点 在介绍 ZooKeeper 分布式锁前需要先了解一下 ZooKeeper 中节点(Znode),ZooKeeper 的数据存储数据模型是一棵树(Znode Tree),由斜杠(/)的进行分割的路径,就是一个 Znode(如 /locks/my_lock).每个 Znode 上都会保存自己的数

  • 分析ZooKeeper分布式锁的实现

    目录 一.分布式锁方案比较 二.ZooKeeper实现分布式锁 2.1.方案一 2.2.方案二 一.分布式锁方案比较 方案 实现思路 优点 缺点 利用 MySQL 的实现方案 利用数据库自身提供的锁机制实现,要求数据库支持行级锁 实现简单 性能差,无法适应高并发场景:容易出现死锁的情况:无法优雅的实现阻塞式锁 利用 Redis 的实现方案 使用 Setnx 和 lua 脚本机制实现,保证对缓存操作序列的原子性 性能好 实现相对复杂,有可能出现死锁:无法优雅的实现阻塞式锁 利用 ZooKeeper

  • C# 实现Zookeeper分布式锁的参考示例

    目录 分布式锁 Zookeeper分布式锁原理 C#实现Zookeeper分布式锁 分布式锁 互联网初期,我们系统一般都是单点部署,也就是在一台服务器完成系统的部署,后期随着用户量的增加,服务器的压力也越来越大,响应速度越来越慢,甚至出现服务器崩溃的情况. 为解决服务器压力太大,响应慢的特点,分布式系统部署出现了. 简单的说,就是我们将系统资源部署到多台服务器中,然后使用一台服务器做入口代理,根据一些决策将接收到的请求转发到资源服务器,这也就是我们常说的 反向代理(一般就是使用nginx) 虽然

随机推荐