Contents
  1. 1. 项目实战
  2. 2. 注意事项
  3. 3. 代码逻辑
  4. 4. 代码讲解
  5. 5. 核心函数

分布式锁实现含源码,跑通过
https://blog.csdn.net/peace1213/article/details/52571445

实现的源码
https://github.com/huangzhenshi/DistributedLockZk/tree/master

项目实战

  1. 实现方案
  • 数据库锁实现
  • zk实现
  • redis实现
  1. zk的实现项目逻辑
    写2个集群的微服务,循环获取分布式锁,消费相同数据库中的存票余量,来检测分布式锁的实现效果。
    获取锁的时候输出一段话,做一些模拟的消费操作,然后查询数据库,获取count,然后-1再save到数据库中。最后打印释放锁和释放的时间。

    注意事项

  2. 每一个线程都应该创建一个单独的zkClient,这样当他释放锁时zk.close(),就会删除掉当前的临时节点
  3. 每个线程就是一个Znode,就有一个单独的zkClient,对他之前的Znode都绑定了一个监听,监听里面有解除当前线程阻塞的CountDownLatch.countdown()
    ConcurrentTask[] tasks = new ConcurrentTask[5];
    for(int i=0;i<tasks.length;i++){
    ConcurrentTask task3 = new ConcurrentTask(){
    public void run() {
    DistributedLock lock = null;
    try {
    lock = new DistributedLock("127.0.0.1:2181","lock2");
    lock.lock();
    System.out.println("Thread " + Thread.currentThread().getId() + " running");
    Thread.sleep(1000);
    } catch (Exception e) {
    e.printStackTrace();
    }
    finally {
    lock.unlock();
    lock=null;
    }
    }
    };
    tasks[i] = task3;
    }
    new ConcurrentTest(tasks);//执行task

代码逻辑

trylock的时候会触发在zk中创建一个唯一的顺序节点,由ZK来保证创建一个不会重复的顺序节点(确保了原子性),并且在排队节点时,在上一个节点上绑定一个Watcher,通过阻塞的方式直到获取到分布式锁。

  • 每个线程会初始化一个zkClient,并且该zkClient带有一个Watcher
  • 获取分布式锁的时候会创建一个临时的Znode,并且判断当前节点是否是首节点,是的话,表示获取锁成功,执行业务逻辑,不是的话,给上一个顺序节点绑定一个监听器,当前线程阻塞,这里面还引入了一个countdownLatch,来实现触发器的解禁阻塞的功能。
  • 解禁之后表示当前线程获取分布式锁,执行业务代码
  • 方法调用结束后,删除当前节点,关闭zkClient
  • 实现的分布式锁是严格的按照顺序访问的并发锁

代码讲解

使用:先初始化一个锁,然后调用锁(可能会获取到,可能会在排队阻塞,最后获取锁,最后释放锁

final DistributedLock lock = new DistributedLock("127.0.0.1:2181","lock");
Runnable task1 = new Runnable(){
public void run() {
try {
lock.lock();
System.out.println("进入锁成功");
Thread.sleep(2000);
System.out.println("===Thread " + Thread.currentThread().getId() + " running");
} catch (Exception e) {
e.printStackTrace();
}
finally {
if(lock != null)
lock.unlock();
}
}
};
new Thread(task1).start();

核心函数

//构造一个以特定lockName的分布式锁
public DistributedLock(String config, String lockName)
//这个方法的执行就是获取分布式锁,结果两种,一种是tryLock成功,立马执行业务操作,一种是线程阻塞,直到waitForLock解禁,然后再执行业务操作
public void lock() --> tryLock()、waitForLock(waitNode, sessionTimeout);
//释放锁,删除当前节点,关闭zk(其实不用手动删除也可以,因为是临时节点,但是手动删除会更快的触发监听器)
public void unlock()
//如果获取分布式锁成功则返回true,如果获取失败的话,则会把成员变量里面的waitNode设置为当前node的前一个node
public boolean tryLock()
//一个线程阻塞的方法,会给lower节点设置监听器,直到监听器触发解禁,当前线程才解除阻塞,获取分布式锁成功
private boolean waitForLock(String lower, long waitTime)

ZKClient的监听函数

public void process(WatchedEvent event) {
System.out.println("已经触发了" + event.getType() + "事件!");
//建立连接用
if(event.getState()==KeeperState.SyncConnected&&event.getType()==EventType.None){
connectedSignal.countDown();
return;
}
if(this.latch != null) {
this.latch.countDown();
}
}

Contents
  1. 1. 项目实战
  2. 2. 注意事项
  3. 3. 代码逻辑
  4. 4. 代码讲解
  5. 5. 核心函数