首页 编程 软件学院 查看内容

基于Zookeeper的分布式共享锁

2015-7-9 14:49 |来自: http://itindex.net/ 1596 0

摘要: 首先,说说我们的场景,订单服务是做成集群的,当两个以上结点同时收到一个相同订单的创建指令,这时并发就产生了,系统就会重复创建订单。等等......场景。这时,分布式共享锁就闪亮登场了。共享锁在同一个进程中是 ...
关键词: CuratorFramework LockZookeeperCli public version import dependency basePath private apache void

首先,说说我们的场景,订单服务是做成集群的,当两个以上结点同时收到一个相同订单的创建指令,这时并发就产生了,系统就会重复创建订单。等等......场景。这时,分布式共享锁就闪亮登场了。

共享锁在同一个进程中是很容易实现的,但在跨进程或者在不同Server之间就不好实现了。Zookeeper就很容易实现。具体的实现原理官网和其它网站也有翻译,这里就不在赘述了。

官网资料: http://zookeeper.apache.org/doc/r3.4.5/recipes.html

中文资料: https://www.ibm.com/developerworks/cn/opensource/os-cn-zookeeper

详见Locks章节。

原理都知道了,网上一搜索Apache上面已经有提供了,既然已经有轮子了,哪我们也没必要重复造轮子了吧!直接使用 Curator 。但是,我们在测试中发现,用于共享锁的结点无法自动回收,除了最末一级的临时结点会在锁释放和session超时的时候能自动回收外,其它结点均无法自动回收。我们的订单一天有好几万,遇到618和双十一的时候每天的订单量超50W,如果结点长期不回收的话,肯定会影响Zookeeper的性能。这时,我们就想到了一句话“自己动手,丰衣足食”。下面直接上代码:

首先,创建一个Maven工程,在pom文件里导入下面的包:

<dependencies>
    <dependency>
      <groupId>org.apache.zookeeper</groupId>
      <artifactId>zookeeper</artifactId>
      <version>3.4.6</version>
    </dependency>
    <dependency>
      <groupId>org.apache.curator</groupId>
      <artifactId>curator-client</artifactId>
      <version>2.8.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.curator</groupId>
      <artifactId>curator-recipes</artifactId>
      <version>2.8.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.curator</groupId>
      <artifactId>curator-framework</artifactId>
      <version>2.8.0</version>
    </dependency>
    <dependency>
      <groupId>commons-beanutils</groupId>
      <artifactId>commons-beanutils</artifactId>
      <version>1.9.2</version>
    </dependency>
    <dependency>
      <groupId>commons-logging</groupId>
      <artifactId>commons-logging</artifactId>
      <version>1.2</version>
    </dependency>
    <dependency>
      <groupId>commons-lang</groupId>
      <artifactId>commons-lang</artifactId>
      <version>2.6</version>
    </dependency>
  </dependencies>

LockZookeeperClient接口:

package com.**.framework.lock;

import org.apache.curator.framework.CuratorFramework;

/**
 * 
 * description
 * 
 * @author Roadrunners
 * @version 1.0, 2015年7月9日
 */
public inte**ce LockZookeeperClient {
  /**
   * 
   * @return
   */
  CuratorFramework getCuratorFramework();

  /**
   * 
   * @return
   */
  String getBasePath();

  /**
   * garbage collector
   * 
   * @param gcPath
   */
  void gc(String gcPath);
}

LockZookeeperClient接口的实现LockZookeeperClientFactory:

package com.**.framework.lock;

import java.util.Date;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentSkipListSet;

import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

/**
 * 
 * description
 *  
 * @author Roadrunners
 * @version 1.0, 2015年7月9日
 */
public class LockZookeeperClientFactory implements LockZookeeperClient {
  private static final Log LOG = LogFactory.getLog(LockZookeeperClientFactory.class);

  private boolean hasGc = true;
  private Timer gcTimer;
  private TimerTask gcTimerTask;
  private ConcurrentSkipListSet<String> gcPaths = new ConcurrentSkipListSet<String>();
  private int gcIntervalSecond = 60;

  private CuratorFramework curatorFramework;
  private String zookeeperIpPort = "localhost:2181";
  private int sessionTimeoutMs = 10000;
  private int connectionTimeoutMs = 10000;
  private String basePath = "/locks";

  public void setHasGc(boolean hasGc) {
    this.hasGc = hasGc;
  }

  public void setGcIntervalSecond(int gcIntervalSecond) {
    this.gcIntervalSecond = gcIntervalSecond;
  }

  public void setZookeeperIpPort(String zookeeperIpPort) {
    this.zookeeperIpPort = zookeeperIpPort;
  }

  public void setSessionTimeoutMs(int sessionTimeoutMs) {
    this.sessionTimeoutMs = sessionTimeoutMs;
  }

  public void setConnectionTimeoutMs(int connectionTimeoutMs) {
    this.connectionTimeoutMs = connectionTimeoutMs;
  }

  public void setBasePath(String basePath) {
    basePath = basePath.trim();
    if (basePath.endsWith("/")) {
      basePath = basePath.substring(0, basePath.length() - 1);
    }

    this.basePath = basePath;
  }

  public void init() {
    if(StringUtils.isBlank(zookeeperIpPort)){
      throw new NullPointerException("zookeeperIpPort");
    }

    ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);
    curatorFramework = CuratorFrameworkFactory.newClient(zookeeperIpPort.trim(), sessionTimeoutMs, connectionTimeoutMs, retryPolicy);
    curatorFramework.start();
    LOG.info("CuratorFramework initialise succeed.");

    if (hasGc) {
      gc();
    }
  }

  public void destroy() {
    gcPaths.clear();
    gcPaths = null;
    gcStop();
    curatorFramework.close();
    curatorFramework = null;
  }

  @Override
  public void gc(String gcPath) {
    if (hasGc && StringUtils.isNotBlank(gcPath)) {
      gcPaths.add(gcPath.trim());
    }
  }

  @Override
  public CuratorFramework getCuratorFramework() {
    return this.curatorFramework;
  }

  @Override
  public String getBasePath() {
    return this.basePath;
  }

  private synchronized void gc() {
    gcStop();

    try {
      scanningGCNodes();
    } catch (Throwable e) {
      LOG.warn(e);
    }

    gcTimerTask = new TimerTask() {
      @Override
      public void run() {
        doingGc();
      }
    };

    Date begin = new Date();
    begin.setTime(begin.getTime() + (10 * 1000L));
    gcTimer = new Timer("lock-gc", true);
    gcTimer.schedule(gcTimerTask, begin, gcIntervalSecond * 1000L);
  }

  private synchronized void gcStop() {
    if (null != gcTimer) {
      gcTimer.cancel();
      gcTimer = null;
    }

    if (null != gcTimerTask) {
      gcTimerTask.cancel();
      gcTimerTask = null;
    }
  }

  private synchronized void scanningGCNodes() throws Exception {
    if (null == curatorFramework.checkExists().forPath(basePath)) {
      return;
    }

    List<String> paths = curatorFramework.getChildren().forPath(basePath);
    if (CollectionUtils.isEmpty(paths)) {
      gcPaths.add(basePath);
      return;
    }

    for (String path : paths) {
      try{
        String tmpPath = basePath + "/" + path;
        if (null == curatorFramework.checkExists().forPath(tmpPath)) {
          continue;
        }
        
        gcPaths.add(tmpPath);
      } catch(Throwable e){
        LOG.warn("scanning gc nodes error.", e);
      }
    }
  }
  
  private synchronized void doingGc() {
    LOG.debug("GC beginning.");

    if (CollectionUtils.isNotEmpty(gcPaths)) {
      for (String path : gcPaths) {
        try {
          if (null != curatorFramework.checkExists().forPath(path)) {
            if (CollectionUtils.isEmpty(curatorFramework.getChildren().forPath(path))) {
              curatorFramework.delete().forPath(path);
              gcPaths.remove(path);
              LOG.debug("GC " + path);
            }
          } else {
            gcPaths.remove(path);
          }
        } catch (Throwable e) {
          gcPaths.remove(path);
          LOG.warn(e);
        }
      }
    }

    LOG.debug("GC ended.");
  }

}

SharedLock共享锁:

package com.**.framework.lock.shared;

import java.util.concurrent.TimeUnit;

import org.apache.commons.lang.StringUtils;
import org.apache.curator.framework.recipes.locks.InterProcessLock;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;

import com.**.framework.lock.LockZookeeperClient;

/**
 * 
 * description
 * 
 * @author Roadrunners
 * @version 1.0, 2015年7月9日
 */
public class SharedLock {
  private InterProcessLock interProcessLock;

  public SharedLock(LockZookeeperClient lockZookeeperClient, String resourceId) {
    super();
    
    if (StringUtils.isBlank(resourceId)) {
      throw new NullPointerException("resourceId");
    }
    String path = lockZookeeperClient.getBasePath();
    path += ("/" + resourceId.trim());

    interProcessLock = new InterProcessMutex(lockZookeeperClient.getCuratorFramework(), path);
    lockZookeeperClient.gc(path);
  }
  
    /**
     * Acquire the mutex - blocking until it's available. Each call to acquire must be balanced by a call
     * to {@link #release()}
     *
     * @throws Exception ZK errors, connection interruptions
     */
  public void acquire() throws Exception {
    interProcessLock.acquire();
  }

    /**
     * Acquire the mutex - blocks until it's available or the given time expires. Each call to acquire that returns true must be balanced by a call
     * to {@link #release()}
     *
     * @param time time to wait
     * @param unit time unit
     * @return true if the mutex was acquired, false if not
     * @throws Exception ZK errors, connection interruptions
     */
  public boolean acquire(long time, TimeUnit unit) throws Exception {
    return interProcessLock.acquire(time, unit);
  }

    /**
     * Perform one release of the mutex.
     *
     * @throws Exception ZK errors, interruptions, current thread does not own the lock
     */
  public void release() throws Exception {
    interProcessLock.release();
  }
  
    /**
     * Returns true if the mutex is acquired by a thread in this JVM
     *
     * @return true/false
     */
  public boolean isAcquiredInThisProcess() {
    return interProcessLock.isAcquiredInThisProcess();
  }
}

到此代码已经完成。下面写一个简单的Demo:

//LockZookeeperClientFactory通常是通过Spring配置注入的,此处是为了Demo的简单明了才这样写的,不建议这样写
    LockZookeeperClientFactory lzc = new LockZookeeperClientFactory();
    lzc.setZookeeperIpPort("10.100.15.1:8900");
    lzc.setBasePath("/locks/sharedLock/");
    lzc.init();

    SharedLock sharedLock = new SharedLock(lzc, "sharedLock1");
    try {
      if (sharedLock.acquire(100, TimeUnit.MILLISECONDS)) {
        System.out.println("sharedLock1 get");
      }
    } catch (Exception e) {
      e.printStackTrace();
    } finally {
      try {
        sharedLock.release();
      } catch (Exception e) {
        e.printStackTrace();
      }
    }

    lzc.destroy();

就这样,系统就会每隔一分钟去回收一次没有使用的结点。

声明:文章版权归原作者所有 部分文章转自互联网 如有侵权请联系 [邮箱地址] 删除
1

路过

雷人

握手

鲜花

鸡蛋

刚表态过的朋友 (1 人)

最新评论

返回顶部