使用zookeeper原生API实现分布式锁
分析Curator实现分布式锁的原理
实现带注册中心的RPC框架
使用zookeeper原生API实现分布式锁
什么是分布式锁?
我们在选择一个方案去实现分布式锁的时候,我们得知道我们要解决的是一个什么问题,或者说我们使用zookeeper实现对应的分布式锁是要解决什么问题。
在以前的以及现在的单进程的多线程模型中,我们会利用多线程的基础数据去提高我们程序的运行效率,在现在CPU多核心的背景之下,每个CPU的每个核心能够同时去运行同一个线程,那么这个时候就会存在一个线程的并行执行,如果说多线程并行去访问某一个共享资源的话,那么就会造成共享资源、共享变量的线程安全问题,这是多线程领域中线程安全的一个问题。我们会通过 Synchronized
或者 Lock
去解决,并发带来的线程安全的问题。
在分布式架构里边,我们的概念是相似的。如果我们把整个架构看作是一整个超级计算机的话,那么组成超级计算机的每一个独立的节点都是一个独立的进程、每一个进程都可以并行地去访问去执行一些操作。比如说我们在一个电商平台里的商品库存问题,我们现在要去做秒杀,库存有100个,用户可以去购买,在并行的情况下,再对库存资源进行一个处理的话,如何去避免超卖或者少卖的问题,这个时候我们需要用一些互斥的手段,防止彼此的一些干扰,这个是分布式环境下遇到的问题,我们怎么去解决?分布式环境下的Synchronized和lock不能够解决这些问题。因为他们没办法在多个跨进程的情况下实现锁的特性。
所以我们需要一些机制去实现分布式锁,
zookeeper、redis、数据库都可以去实现分布式锁
我们只需要依赖第三方的一个中间件去做一个锁的存储,锁的释放就好了。
常见的锁有很多种,有排它锁(独占锁),读写锁,共享锁。在不同的情况下,我们会使用不同的锁,去对我们的资源去做一些协调。这就是我们锁这一块使用背景和使用原因。
Zookeeper 能够实现分布式锁的原因在于 Zookeeper 本身有一个节点的特性。首先,我们要实现分布式锁的机制,我们首先要获得锁和释放锁。
Zookeeper 结构是一个节点特性的、文件系统风格的对应的树形的结构。我们现在要通过zookeeper去实现分布式锁的话。
我们现在有三个客户端 clientA,clientB,clientC 要访问某一个资源,我们在操作资源之前要获得锁,我们获得锁的方式有很多种。
实现分布式锁
第一种:我会在 /Locks
节点下去创建一个 lock 节点,每一个客户端都会去创建,按照 zookeeper 本身的节点特性,在同一节点下,它的名字是唯一的,那么,我们客户端三个去创建节点的时候,能够在这里创建成功的只有一个客户端,其他的都会失败,失败的话就去监听节点的变化,如果 /Locks/lock
发生了一个变化,就会触发一个 watcher 的通知,剩下的客户端就会收到通知,再一次去争抢这个锁,这种实现锁的方式会有一个惊群效应!!!(惊群效应:打个比方,我们现在有 30 个节点需要同时获得这个锁。如果说其中一个节点释放了锁,那么就会触发剩下的 29 个节点一起去争抢这个锁,就是一个锁的释放,会引起所有的参与的客户端都会发起一个争抢,也叫羊群效应。这个效应会导致在短时间之内会有大量的事件变更,watcherEvent 发给想要获得锁的客户端,那么实际情况下,只会有一个客户端去获得锁,因此这种方式下,集群的规模比较大的时候,我们的节点访问情况下比较多的情况下,我们不建议这种方式去实现,)
我们可以利用有序节点来实现分布式锁
每一个节点注册的节点都加上seq,这时候我们保证我们的每一个节点都能够注册一个节点上去,我要获得锁的话,我就去这个大的节点下获得一个最小的节点,获得最小的节点以后,表示当前这个客户端能够获得对应的锁,那其他的没有比它小的,没有获得锁的节点,它们之间是做一个相互的监听,加入说我们的 /lock_seq1 获得了锁,那么 /lock_seq2监听/lock_seq1,/lock_seq3会对这个/lock_seq2去做一个监听,只监听比我小一个值的节点,当比它小的节点发生变化的时候,它就可以收到一个事件,再去判断当前的这个节点是不是所有节点里边最小的节点,如果说是的话,那么它就获得锁,如果不是,那就继续等待,直到比它小的节点变化了,再做一个唤醒,那么在这样的一个背景下,我们就不需要去对所有的节点去做一个惊群的一个处理,而是我只对我所关注的节点去做一个处理就好了,这就是利用有序节点来实现分布式锁的概念和原理。
实现分布式锁代码:
首先引入zookeeper包,
可以实现lock,watcher接口
利用 zookeeper 的有序节点实现分布式锁
代码
DistributedLock zookeeper实现分布式锁
package com.gupao.study.vip;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
/**
* <br>类说明 :
* 基于 Lock 来实现分布式锁,
*
* <br>属性说明:
* <br>作者:Darian
**/
public class DistributedLock implements Lock, Watcher {
// 首先定义一个 zookeeper 的连接
private ZooKeeper zooKeeper = null;
// 定义根节点
private String ROOT_LOCK = "/locks";
// 等待前一个锁 ,得到监控的前一个锁的节点
private String WAIT_LOCK;
// 表示当前的锁
private String CURRENT_LOCK;
// 用来做处理
private CountDownLatch countDownLatch;
public DistributedLock() {
try {
zooKeeper = new ZooKeeper("192.168.136.128:2181",
4000, this);
// 判断根节点是否存在
Stat stat = zooKeeper.exists(ROOT_LOCK, false);
if (stat == null) {
// 当前的节点是不存在的
zooKeeper.create(ROOT_LOCK, "0".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
}
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
@Override
public void lock() {
if (this.tryLock()) {
// 如果获得锁成功
System.out.println(Thread.currentThread().getName()
+ "-->>"
+ CURRENT_LOCK
+ "获得锁成功");
return;
}
// 没有获得锁,继续等待获得锁
try {
waitForLock(WAIT_LOCK);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 这是一个持续阻塞去获得所得一个过程
**/
private boolean waitForLock(String prev) throws KeeperException, InterruptedException {
// 因为我要等待锁的话,我需要监听上一个锁的节点
// 通过 exists 去监听
// 监听当前节点的上一个节点
Stat stat = zooKeeper.exists(prev, true);
if (stat != null) {
// 表示上一个节点确实存在
System.out.println(Thread.currentThread().getName()
+ "-->>等待锁"
+ ROOT_LOCK
+ "/"
+ prev
+ "释放");
countDownLatch = new CountDownLatch(1);
countDownLatch.await();
System.out.println(Thread.currentThread().getName()
+ "-->>获得锁成功");
}
return true;
}
@Override
public void lockInterruptibly() throws InterruptedException {
}
@Override
public boolean tryLock() {
try {
// 创建临时有序节点
CURRENT_LOCK = zooKeeper.create(ROOT_LOCK + "/", "0".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(Thread.currentThread().getName()
+ "-->>"
+ CURRENT_LOCK
+ "尝试去竞争锁");
// 获取根节点下的所有子节点
List<String> childrens = zooKeeper.getChildren(ROOT_LOCK, false);
// 定义一个集合进行排序
SortedSet<String> sortedSet = new TreeSet<>();
for (String children : childrens) {
sortedSet.add(ROOT_LOCK + "/" + children);
}
// 获得所有子节点中最小的节点
String firstnode = sortedSet.first();
SortedSet<String> lessThenMe = ((TreeSet<String>) sortedSet).headSet(CURRENT_LOCK);
if (CURRENT_LOCK.equals(firstnode)) {
// 通过当前的节点和子节点中最小的节点进行比较,如果相等,标识获得锁成功
return true;
}
if (!lessThenMe.isEmpty()) {
// 如果没有比自己更小的节点
// 获得比当前节点更小的最后一个节点,设置给WAIT_LOCK
WAIT_LOCK = lessThenMe.last();
}
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return false;
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return false;
}
@Override
public void unlock() {
// 释放锁
System.out.println(Thread.currentThread().getName()
+ "-->>释放锁"
+ CURRENT_LOCK);
try {
// 删除掉这个节点,version = -1 表示不管三七二十一都删掉
zooKeeper.delete(CURRENT_LOCK, -1);
CURRENT_LOCK = null;
zooKeeper.close();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
@Override
public Condition newCondition() {
return null;
}
@Override
public void process(WatchedEvent watchedEvent) {
// 在这里边去处理监听事件
if (this.countDownLatch != null) {
// 去释放锁
this.countDownLatch.countDown();
}
}
}
package com.gupao.study.vip;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
/**
* <br>类说明 :
* 基于 Lock 来实现分布式锁,
*
* <br>属性说明:
* <br>作者:Darian
**/
public class DistributedLock implements Lock, Watcher {
// 首先定义一个 zookeeper 的连接
private ZooKeeper zooKeeper = null;
// 定义根节点
private String ROOT_LOCK = "/locks";
// 等待前一个锁 ,得到监控的前一个锁的节点
private String WAIT_LOCK;
// 表示当前的锁
private String CURRENT_LOCK;
// 用来做处理
private CountDownLatch countDownLatch;
public DistributedLock() {
try {
zooKeeper = new ZooKeeper("192.168.136.128:2181",
4000, this);
// 判断根节点是否存在
Stat stat = zooKeeper.exists(ROOT_LOCK, false);
if (stat == null) {
// 当前的节点是不存在的
zooKeeper.create(ROOT_LOCK, "0".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
}
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
@Override
public void lock() {
if (this.tryLock()) {
// 如果获得锁成功
System.out.println(Thread.currentThread().getName()
+ "-->>"
+ CURRENT_LOCK
+ "获得锁成功");
return;
}
// 没有获得锁,继续等待获得锁
try {
waitForLock(WAIT_LOCK);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 这是一个持续阻塞去获得所得一个过程
**/
private boolean waitForLock(String prev) throws KeeperException, InterruptedException {
// 因为我要等待锁的话,我需要监听上一个锁的节点
// 通过 exists 去监听
// 监听当前节点的上一个节点
Stat stat = zooKeeper.exists(prev, true);
if (stat != null) {
// 表示上一个节点确实存在
System.out.println(Thread.currentThread().getName()
+ "-->>等待锁"
+ ROOT_LOCK
+ "/"
+ prev
+ "释放");
countDownLatch = new CountDownLatch(1);
countDownLatch.await();
System.out.println(Thread.currentThread().getName()
+ "-->>获得锁成功");
}
return true;
}
@Override
public void lockInterruptibly() throws InterruptedException {
}
@Override
public boolean tryLock() {
try {
// 创建临时有序节点
CURRENT_LOCK = zooKeeper.create(ROOT_LOCK + "/", "0".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(Thread.currentThread().getName()
+ "-->>"
+ CURRENT_LOCK
+ "尝试去竞争锁");
// 获取根节点下的所有子节点
List<String> childrens = zooKeeper.getChildren(ROOT_LOCK, false);
// 定义一个集合进行排序
SortedSet<String> sortedSet = new TreeSet<>();
for (String children : childrens) {
sortedSet.add(ROOT_LOCK + "/" + children);
}
// 获得所有子节点中最小的节点
String firstnode = sortedSet.first();
SortedSet<String> lessThenMe = ((TreeSet<String>) sortedSet).headSet(CURRENT_LOCK);
if (CURRENT_LOCK.equals(firstnode)) {
// 通过当前的节点和子节点中最小的节点进行比较,如果相等,标识获得锁成功
return true;
}
if (!lessThenMe.isEmpty()) {
// 如果没有比自己更小的节点
// 获得比当前节点更小的最后一个节点,设置给WAIT_LOCK
WAIT_LOCK = lessThenMe.last();
}
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return false;
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return false;
}
@Override
public void unlock() {
// 释放锁
System.out.println(Thread.currentThread().getName()
+ "-->>释放锁"
+ CURRENT_LOCK);
try {
// 删除掉这个节点,version = -1 表示不管三七二十一都删掉
zooKeeper.delete(CURRENT_LOCK, -1);
CURRENT_LOCK = null;
zooKeeper.close();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
@Override
public Condition newCondition() {
return null;
}
@Override
public void process(WatchedEvent watchedEvent) {
// 在这里边去处理监听事件
if (this.countDownLatch != null) {
// 去释放锁
this.countDownLatch.countDown();
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
TestDistrubutedLock 测试类
package com.gupao.study.vip;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
/**
* <br>类说明 :
* <br>属性说明:
* <br>作者:Darian
**/
public class TestDistributedLock {
public static void main(String[] args) throws IOException {
final CountDownLatch countDownLatch = new CountDownLatch(10);
for (int i = 0; i < 10; i++) {
new Thread(()->{
try {
countDownLatch.await();
DistributedLock distributedLock = new DistributedLock();
// 获得锁
distributedLock.lock();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "Thread-"+i).start();
countDownLatch.countDown();
}
System.in.read();
}
}
package com.gupao.study.vip;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
/**
* <br>类说明 :
* <br>属性说明:
* <br>作者:Darian
**/
public class TestDistributedLock {
public static void main(String[] args) throws IOException {
final CountDownLatch countDownLatch = new CountDownLatch(10);
for (int i = 0; i < 10; i++) {
new Thread(()->{
try {
countDownLatch.await();
DistributedLock distributedLock = new DistributedLock();
// 获得锁
distributedLock.lock();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "Thread-"+i).start();
countDownLatch.countDown();
}
System.in.read();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
控制台
Thread-3-->>/locks/0000000033尝试去竞争锁
Thread-9-->>/locks/0000000030尝试去竞争锁
Thread-4-->>/locks/0000000034尝试去竞争锁
Thread-1-->>/locks/0000000031尝试去竞争锁
Thread-8-->>/locks/0000000036尝试去竞争锁
Thread-7-->>/locks/0000000032尝试去竞争锁
Thread-5-->>/locks/0000000035尝试去竞争锁
Thread-6-->>/locks/0000000039尝试去竞争锁
Thread-2-->>/locks/0000000038尝试去竞争锁
Thread-0-->>/locks/0000000037尝试去竞争锁
Thread-9-->>/locks/0000000030获得锁成功
Thread-4-->>等待锁/locks//locks/0000000033释放
Thread-8-->>等待锁/locks//locks/0000000035释放
Thread-2-->>等待锁/locks//locks/0000000037释放
Thread-5-->>等待锁/locks//locks/0000000034释放
Thread-3-->>等待锁/locks//locks/0000000032释放
Thread-7-->>等待锁/locks//locks/0000000031释放
Thread-0-->>等待锁/locks//locks/0000000036释放
Thread-1-->>等待锁/locks//locks/0000000030释放
Thread-6-->>等待锁/locks//locks/0000000038释放
Thread-3-->>/locks/0000000033尝试去竞争锁
Thread-9-->>/locks/0000000030尝试去竞争锁
Thread-4-->>/locks/0000000034尝试去竞争锁
Thread-1-->>/locks/0000000031尝试去竞争锁
Thread-8-->>/locks/0000000036尝试去竞争锁
Thread-7-->>/locks/0000000032尝试去竞争锁
Thread-5-->>/locks/0000000035尝试去竞争锁
Thread-6-->>/locks/0000000039尝试去竞争锁
Thread-2-->>/locks/0000000038尝试去竞争锁
Thread-0-->>/locks/0000000037尝试去竞争锁
Thread-9-->>/locks/0000000030获得锁成功
Thread-4-->>等待锁/locks//locks/0000000033释放
Thread-8-->>等待锁/locks//locks/0000000035释放
Thread-2-->>等待锁/locks//locks/0000000037释放
Thread-5-->>等待锁/locks//locks/0000000034释放
Thread-3-->>等待锁/locks//locks/0000000032释放
Thread-7-->>等待锁/locks//locks/0000000031释放
Thread-0-->>等待锁/locks//locks/0000000036释放
Thread-1-->>等待锁/locks//locks/0000000030释放
Thread-6-->>等待锁/locks//locks/0000000038释放
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
Xshell
:2181(CONNECTED) 1] ls /locks
[0000000034, 0000000033, 0000000036, 0000000035, 0000000038, 0000000037, 0000000039, 0000000030, 0000000032, 0000000031]
[zk: localhost:2181(CONNECTED) 2] delete /locks/0000000030
[zk: localhost:2181(CONNECTED) 3] delete /locks/0000000031
[zk: localhost:2181(CONNECTED) 4] delete /locks/0000000032
[zk: localhost:2181(CONNECTED) 5]
:2181(CONNECTED) 1] ls /locks
[0000000034, 0000000033, 0000000036, 0000000035, 0000000038, 0000000037, 0000000039, 0000000030, 0000000032, 0000000031]
[zk: localhost:2181(CONNECTED) 2] delete /locks/0000000030
[zk: localhost:2181(CONNECTED) 3] delete /locks/0000000031
[zk: localhost:2181(CONNECTED) 4] delete /locks/0000000032
[zk: localhost:2181(CONNECTED) 5]
2
3
4
5
6
控制台
Thread-3-->>/locks/0000000033尝试去竞争锁
Thread-9-->>/locks/0000000030尝试去竞争锁
Thread-4-->>/locks/0000000034尝试去竞争锁
Thread-1-->>/locks/0000000031尝试去竞争锁
Thread-8-->>/locks/0000000036尝试去竞争锁
Thread-7-->>/locks/0000000032尝试去竞争锁
Thread-5-->>/locks/0000000035尝试去竞争锁
Thread-6-->>/locks/0000000039尝试去竞争锁
Thread-2-->>/locks/0000000038尝试去竞争锁
Thread-0-->>/locks/0000000037尝试去竞争锁
Thread-9-->>/locks/0000000030获得锁成功
Thread-4-->>等待锁/locks//locks/0000000033释放
Thread-8-->>等待锁/locks//locks/0000000035释放
Thread-2-->>等待锁/locks//locks/0000000037释放
Thread-5-->>等待锁/locks//locks/0000000034释放
Thread-3-->>等待锁/locks//locks/0000000032释放
Thread-7-->>等待锁/locks//locks/0000000031释放
Thread-0-->>等待锁/locks//locks/0000000036释放
Thread-1-->>等待锁/locks//locks/0000000030释放
Thread-6-->>等待锁/locks//locks/0000000038释放
Thread-1-->>获得锁成功
Thread-7-->>获得锁成功
Thread-3-->>获得锁成功
Thread-3-->>/locks/0000000033尝试去竞争锁
Thread-9-->>/locks/0000000030尝试去竞争锁
Thread-4-->>/locks/0000000034尝试去竞争锁
Thread-1-->>/locks/0000000031尝试去竞争锁
Thread-8-->>/locks/0000000036尝试去竞争锁
Thread-7-->>/locks/0000000032尝试去竞争锁
Thread-5-->>/locks/0000000035尝试去竞争锁
Thread-6-->>/locks/0000000039尝试去竞争锁
Thread-2-->>/locks/0000000038尝试去竞争锁
Thread-0-->>/locks/0000000037尝试去竞争锁
Thread-9-->>/locks/0000000030获得锁成功
Thread-4-->>等待锁/locks//locks/0000000033释放
Thread-8-->>等待锁/locks//locks/0000000035释放
Thread-2-->>等待锁/locks//locks/0000000037释放
Thread-5-->>等待锁/locks//locks/0000000034释放
Thread-3-->>等待锁/locks//locks/0000000032释放
Thread-7-->>等待锁/locks//locks/0000000031释放
Thread-0-->>等待锁/locks//locks/0000000036释放
Thread-1-->>等待锁/locks//locks/0000000030释放
Thread-6-->>等待锁/locks//locks/0000000038释放
Thread-1-->>获得锁成功
Thread-7-->>获得锁成功
Thread-3-->>获得锁成功
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
Curator 是对 zookeeper 应用场景的一个封装。
Maven:org.apache.curator:curator-recipes:4.0.0
包结构
|org.apache.curator.framework.recipes
|-atomic
|-barriers
|-cache
|-leader
|-locks
|-nodes
|-queue
|-shared
|org.apache.curator.framework.recipes
|-atomic
|-barriers
|-cache
|-leader
|-locks
|-nodes
|-queue
|-shared
2
3
4
5
6
7
8
9
这些包都是对 zookeeper 的 API 做的一个高度的封装。
我们就不需要去写很复杂的东西。
利用curator实现分布式锁
代码
package com.gupao.study.vip;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
/**
* <br>类说明 :
* <br>属性说明:
* <br>作者:Darian
**/
public class CuratorDemo {
public static void main(String[] args) {
/**
* 获取分布式锁,
* 是利用最小节点的特性去实现的
* InterProcessMutex 是个可重入锁
*
**/
CuratorFramework curatorFramework =
CuratorFrameworkFactory.builder().build();
InterProcessMutex interProcessMutex =
new InterProcessMutex(curatorFramework,"/locks");
try {
// 去获得锁
interProcessMutex.acquire();
} catch (Exception e) {
e.printStackTrace();
}
}
}
package com.gupao.study.vip;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
/**
* <br>类说明 :
* <br>属性说明:
* <br>作者:Darian
**/
public class CuratorDemo {
public static void main(String[] args) {
/**
* 获取分布式锁,
* 是利用最小节点的特性去实现的
* InterProcessMutex 是个可重入锁
*
**/
CuratorFramework curatorFramework =
CuratorFrameworkFactory.builder().build();
InterProcessMutex interProcessMutex =
new InterProcessMutex(curatorFramework,"/locks");
try {
// 去获得锁
interProcessMutex.acquire();
} catch (Exception e) {
e.printStackTrace();
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
Curator实现分布式锁原理分析
InterPorcessMutex.class
public InterProcessMutex(CuratorFramework client, String path) {
this(client, path, new StandardLockInternalsDriver());
}
public InterProcessMutex(CuratorFramework client, String path) {
this(client, path, new StandardLockInternalsDriver());
}
2
3
InterPorcessMutex.class
public InterProcessMutex(CuratorFramework client, String path, LockInternalsDriver driver) {
this(client, path, "lock-", 1, driver);
}
public InterProcessMutex(CuratorFramework client, String path, LockInternalsDriver driver) {
this(client, path, "lock-", 1, driver);
}
2
3
InterPorcessMutex.class
InterProcessMutex(CuratorFramework client, String path, String lockName, int maxLeases, LockInternalsDriver driver) {
this.threadData = Maps.newConcurrentMap();
this.basePath = PathUtils.validatePath(path);
this.internals = new LockInternals(client, driver, path, lockName, maxLeases);
}
InterProcessMutex(CuratorFramework client, String path, String lockName, int maxLeases, LockInternalsDriver driver) {
this.threadData = Maps.newConcurrentMap();
this.basePath = PathUtils.validatePath(path);
this.internals = new LockInternals(client, driver, path, lockName, maxLeases);
}
2
3
4
5
获得锁
// 去获得锁
interProcessMutex.acquire();
// 去获得锁
interProcessMutex.acquire();
2
3
4
5
InterProcessMutex
public void acquire() throws Exception {
if (!this.internalLock(-1L, (TimeUnit)null)) {
throw new IOException("Lost connection while trying to acquire lock: " + this.basePath);
}
}
public void acquire() throws Exception {
if (!this.internalLock(-1L, (TimeUnit)null)) {
throw new IOException("Lost connection while trying to acquire lock: " + this.basePath);
}
}
2
3
4
5
interprocessMutx
获得锁的核心逻辑
private boolean internalLock(long time, TimeUnit unit) throws Exception
{
/*
Note on concurrency: a given lockData instance
can be only acted on by a single thread so locking isn't necessary
*/
Thread currentThread = Thread.currentThread();
// 因可重入锁,所以先判断当前那线程是不是存在
// 如果是的话,表示当前这个线程已经获得锁了
LockData lockData = threadData.get(currentThread);
if ( lockData != null )
{
// re-entering
// 它获得锁的计数做一个递增就好了
lockData.lockCount.incrementAndGet();
return true;
}
//所以就不需要去再次获得锁,这是可重入锁的一个特性
// 这是去尝试获得所得代码
String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
if ( lockPath != null )
{
LockData newLockData = new LockData(currentThread, lockPath);
threadData.put(currentThread, newLockData);
return true;
}
return false;
}
private boolean internalLock(long time, TimeUnit unit) throws Exception
{
/*
Note on concurrency: a given lockData instance
can be only acted on by a single thread so locking isn't necessary
*/
Thread currentThread = Thread.currentThread();
// 因可重入锁,所以先判断当前那线程是不是存在
// 如果是的话,表示当前这个线程已经获得锁了
LockData lockData = threadData.get(currentThread);
if ( lockData != null )
{
// re-entering
// 它获得锁的计数做一个递增就好了
lockData.lockCount.incrementAndGet();
return true;
}
//所以就不需要去再次获得锁,这是可重入锁的一个特性
// 这是去尝试获得所得代码
String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
if ( lockPath != null )
{
LockData newLockData = new LockData(currentThread, lockPath);
threadData.put(currentThread, newLockData);
return true;
}
return false;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
LockInternals
这是去尝试获得锁的一个代码
String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception
{
final long startMillis = System.currentTimeMillis();
final Long millisToWait = (unit != null) ? unit.toMillis(time) : null;
final byte[] localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
int retryCount = 0;
String ourPath = null;
boolean hasTheLock = false;
boolean isDone = false;
while ( !isDone )
{
isDone = true;
try
{
// 去创建一个节点
ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
// 去判断这个节点是否可以获得锁
hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
}
catch ( KeeperException.NoNodeException e )
{
// gets thrown by StandardLockInternalsDriver when it can't find the lock node
// this can happen when the session expires, etc. So, if the retry allows, just try it all again
if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) )
{
isDone = false;
}
else
{
throw e;
}
}
}
if ( hasTheLock )
{
return ourPath;
}
return null;
}
String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception
{
final long startMillis = System.currentTimeMillis();
final Long millisToWait = (unit != null) ? unit.toMillis(time) : null;
final byte[] localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
int retryCount = 0;
String ourPath = null;
boolean hasTheLock = false;
boolean isDone = false;
while ( !isDone )
{
isDone = true;
try
{
// 去创建一个节点
ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
// 去判断这个节点是否可以获得锁
hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
}
catch ( KeeperException.NoNodeException e )
{
// gets thrown by StandardLockInternalsDriver when it can't find the lock node
// this can happen when the session expires, etc. So, if the retry allows, just try it all again
if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) )
{
isDone = false;
}
else
{
throw e;
}
}
}
if ( hasTheLock )
{
return ourPath;
}
return null;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
StandardLockInternalsDriver
@Override
public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception
{
String ourPath;
if ( lockNodeBytes != null )
{
// 创建的是临时有序节点
ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, lockNodeBytes);
}
else
{
ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
}
return ourPath;
}
@Override
public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception
{
String ourPath;
if ( lockNodeBytes != null )
{
// 创建的是临时有序节点
ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, lockNodeBytes);
}
else
{
ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
}
return ourPath;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
InterProcessMutex
private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception
{
// 它去做了一个循环处理,还设置了过期时间。
// 获得锁的一个机制,等待锁的一个时间,它都有一个处理
boolean haveTheLock = false;
boolean doDelete = false;
try
{
if ( revocable.get() != null )
{
client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
}
while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )
// 判断当前的这个CuuatorFramework 是不是一个启动状态,是的话,死循化
// 去做一个处理,并且已经获得锁的话,就不会再做这个
{
// 得到排序过后的子节点
List<String> children = getSortedChildren();
String sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash
// 去判断节点是不是在这个节点里边最小的。
PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
if ( predicateResults.getsTheLock() )
{
// 获得了锁
haveTheLock = true;
}
else
{
// 没有获得锁
String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
synchronized(this)
{
try
{
// use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
client.getData().usingWatcher(watcher).forPath(previousSequencePath);
if ( millisToWait != null )
{
millisToWait -= (System.currentTimeMillis() - startMillis);
startMillis = System.currentTimeMillis();
if ( millisToWait <= 0 )
{
doDelete = true; // timed out - delete our node
break;
}
wait(millisToWait);
}
else
{
wait();
}
}
catch ( KeeperException.NoNodeException e )
{
// it has been deleted (i.e. lock released). Try to acquire again
}
}
}
}
}
catch ( Exception e )
{
ThreadUtils.checkInterrupted(e);
doDelete = true;
throw e;
}
finally
{
if ( doDelete )
{
deleteOurPath(ourPath);
}
}
return haveTheLock;
}
private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception
{
// 它去做了一个循环处理,还设置了过期时间。
// 获得锁的一个机制,等待锁的一个时间,它都有一个处理
boolean haveTheLock = false;
boolean doDelete = false;
try
{
if ( revocable.get() != null )
{
client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
}
while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )
// 判断当前的这个CuuatorFramework 是不是一个启动状态,是的话,死循化
// 去做一个处理,并且已经获得锁的话,就不会再做这个
{
// 得到排序过后的子节点
List<String> children = getSortedChildren();
String sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash
// 去判断节点是不是在这个节点里边最小的。
PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
if ( predicateResults.getsTheLock() )
{
// 获得了锁
haveTheLock = true;
}
else
{
// 没有获得锁
String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
synchronized(this)
{
try
{
// use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
client.getData().usingWatcher(watcher).forPath(previousSequencePath);
if ( millisToWait != null )
{
millisToWait -= (System.currentTimeMillis() - startMillis);
startMillis = System.currentTimeMillis();
if ( millisToWait <= 0 )
{
doDelete = true; // timed out - delete our node
break;
}
wait(millisToWait);
}
else
{
wait();
}
}
catch ( KeeperException.NoNodeException e )
{
// it has been deleted (i.e. lock released). Try to acquire again
}
}
}
}
}
catch ( Exception e )
{
ThreadUtils.checkInterrupted(e);
doDelete = true;
throw e;
}
finally
{
if ( doDelete )
{
deleteOurPath(ourPath);
}
}
return haveTheLock;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
LockInternals
public static List<String> getSortedChildren(CuratorFramework client, String basePath, final String lockName, final LockInternalsSorter sorter) throws Exception
{
// 得到子节点
List<String> children = client.getChildren().forPath(basePath);
// 去做一个排序
List<String> sortedList = Lists.newArrayList(children);
Collections.sort
(
sortedList,
new Comparator<String>()
{
@Override
public int compare(String lhs, String rhs)
{
return sorter.fixForSorting(lhs, lockName).compareTo(sorter.fixForSorting(rhs, lockName));
}
}
);
return sortedList;
}
public static List<String> getSortedChildren(CuratorFramework client, String basePath, final String lockName, final LockInternalsSorter sorter) throws Exception
{
// 得到子节点
List<String> children = client.getChildren().forPath(basePath);
// 去做一个排序
List<String> sortedList = Lists.newArrayList(children);
Collections.sort
(
sortedList,
new Comparator<String>()
{
@Override
public int compare(String lhs, String rhs)
{
return sorter.fixForSorting(lhs, lockName).compareTo(sorter.fixForSorting(rhs, lockName));
}
}
);
return sortedList;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
Curator 在监听上一个节点,手写的是根据路径去比较
StandardLockInternalsDriver
@Override
public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception
{
// 创建好的编号在 children 中的一个索引
int ourIndex = children.indexOf(sequenceNodeName);
validateOurIndex(sequenceNodeName, ourIndex);
// 如果当前的节点的索引值是所有节点里边的最小的,
// 那么就意味着获得锁成功,否则的话,得到一个需要去监控的节点。
boolean getsTheLock = ourIndex < maxLeases;
String pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases);
return new PredicateResults(pathToWatch, getsTheLock);
}
@Override
public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception
{
// 创建好的编号在 children 中的一个索引
int ourIndex = children.indexOf(sequenceNodeName);
validateOurIndex(sequenceNodeName, ourIndex);
// 如果当前的节点的索引值是所有节点里边的最小的,
// 那么就意味着获得锁成功,否则的话,得到一个需要去监控的节点。
boolean getsTheLock = ourIndex < maxLeases;
String pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases);
return new PredicateResults(pathToWatch, getsTheLock);
}
2
3
4
5
6
7
8
9
10
11
12
13
Curator也是利用最小节点的特性实现分布式锁
实现带注册中心的RPC框架
分布式架构现状:
注册中心引入
我们需要引入注册中心,来管理服务集群各个节点地址的维护。注册中心,能够管理地址,还能够动态地感知节点的上下线,感知宕机的变化。
分布式锁的实用性
实现了socket + 反射 实现 简单的 RPC
服务端控制台输出
log4j:WARN No appenders could be found for logger (org.apache.curator.utils.Compatibility).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
服务注册成功:/registrys/com.gupao.study.vip.RPC.IGPHello/127.0.0.1:8080
服务注册成功com.gupao.study.vip.RPC.IGPHello-->>127.0.0.1:8080
log4j:WARN No appenders could be found for logger (org.apache.curator.utils.Compatibility).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
服务注册成功:/registrys/com.gupao.study.vip.RPC.IGPHello/127.0.0.1:8080
服务注册成功com.gupao.study.vip.RPC.IGPHello-->>127.0.0.1:8080
2
3
4
5
客户端控制台输出
log4j:WARN No appenders could be found for logger (org.apache.curator.utils.Compatibility).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
interface com.gupao.study.vip.RPC.IGPHello
sun.misc.Launcher$AppClassLoader@18b4aac2
调用的方法的名字:public abstract java.lang.String com.gupao.study.vip.RPC.IGPHello.sayHello(java.lang.String)
要调用的接口的名字:com.gupao.study.vip.RPC.IGPHello
服务在zookeeper上的地址:/registrys/com.gupao.study.vip.RPC.IGPHello
创建一个新的连接
服务的IP: 127.0.0.1:8080
hello, Darian
log4j:WARN No appenders could be found for logger (org.apache.curator.utils.Compatibility).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
interface com.gupao.study.vip.RPC.IGPHello
sun.misc.Launcher$AppClassLoader@18b4aac2
调用的方法的名字:public abstract java.lang.String com.gupao.study.vip.RPC.IGPHello.sayHello(java.lang.String)
要调用的接口的名字:com.gupao.study.vip.RPC.IGPHello
服务在zookeeper上的地址:/registrys/com.gupao.study.vip.RPC.IGPHello
创建一个新的连接
服务的IP: 127.0.0.1:8080
hello, Darian
2
3
4
5
6
7
8
9
10
11
服务有多个版本怎么办?
控制台输出
interface com.gupao.study.vip.RPC.IGPHello
sun.misc.Launcher$AppClassLoader@18b4aac2
调用的方法的名字:public abstract java.lang.String com.gupao.study.vip.RPC.IGPHello.sayHello(java.lang.String)
要调用的接口的名字:com.gupao.study.vip.RPC.IGPHello
服务在zookeeper上的地址:/registrys/com.gupao.study.vip.RPC.IGPHello
创建一个新的连接
服务的IP: 127.0.0.1:8080
hello, Darian
=-----=版本二
interface com.gupao.study.vip.RPC.IGPHello
sun.misc.Launcher$AppClassLoader@18b4aac2
调用的方法的名字:public abstract java.lang.String com.gupao.study.vip.RPC.IGPHello.sayHello(java.lang.String)
要调用的接口的名字:com.gupao.study.vip.RPC.IGPHello
服务在zookeeper上的地址:/registrys/com.gupao.study.vip.RPC.IGPHello
创建一个新的连接
服务的IP: 127.0.0.1:8080
I'm version 2.0 ,hello, darian2
interface com.gupao.study.vip.RPC.IGPHello
sun.misc.Launcher$AppClassLoader@18b4aac2
调用的方法的名字:public abstract java.lang.String com.gupao.study.vip.RPC.IGPHello.sayHello(java.lang.String)
要调用的接口的名字:com.gupao.study.vip.RPC.IGPHello
服务在zookeeper上的地址:/registrys/com.gupao.study.vip.RPC.IGPHello
创建一个新的连接
服务的IP: 127.0.0.1:8080
hello, Darian
=-----=版本二
interface com.gupao.study.vip.RPC.IGPHello
sun.misc.Launcher$AppClassLoader@18b4aac2
调用的方法的名字:public abstract java.lang.String com.gupao.study.vip.RPC.IGPHello.sayHello(java.lang.String)
要调用的接口的名字:com.gupao.study.vip.RPC.IGPHello
服务在zookeeper上的地址:/registrys/com.gupao.study.vip.RPC.IGPHello
创建一个新的连接
服务的IP: 127.0.0.1:8080
I'm version 2.0 ,hello, darian2
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
实现的时序图:
zookeeper的扩容升级
zookeeper集群的扩容,比较麻烦,麻烦在于说,zookeeper有一个leader选举的机制。所以我们在一个本身的扩容的时候,会存在一个问题,我们需要去配置 zoo.cfg
。而且在zookeeper集群动态扩容的时候我们要了解它选举的过程,很多时候都是停机扩容,就是去对所有的位置做一个变更,再做一个停机,重启。
还有一个就是逐台重启,一个更改以后,去重启,就相当于挂掉,再重启的过程。