bboyjing's blog

Zookeeper学习笔记八【Curator典型使用场景(二)】

分布式锁

在分布式环境中,为了保证数据的一致性,经常在程序的某个运行点(例如,减库存操作或流水号生成等)需要进行同步控制。以流水号生成为例,普通的后台应用通常都是使用时间戳方式来生成,但是在用户并发量非常大的情况下,可能会出现并发问题,下面来演示下如何出现以及解决问题。

时间戳生成的并发问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class RecipesNoLock {
public static void main(String[] args) throws Exception {
final CountDownLatch down = new CountDownLatch(1);
for (int i = 0; i < 10; i++) {
new Thread(() -> {
try {
down.await();
} catch (Exception e) {
}
SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss|SSS");
String orderNo = sdf.format(new Date());
System.err.println("生成的订单号是 : " + orderNo);
}).start();
}
down.countDown();
}
}

从输出结果可以看出订单号有很多的重复。

使用Curator实现分布式锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
//使用Curator实现分布式锁功能
public class RecipesLock {
static String lock_path = "/curator_recipes_lock_path";
static CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("localhost:2181")
.retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
public static void main(String[] args) throws Exception {
client.start();
final InterProcessMutex lock = new InterProcessMutex(client, lock_path);
final CountDownLatch down = new CountDownLatch(1);
for (int i = 0; i < 30; i++) {
new Thread(() -> {
try {
down.await();
//分布式锁:锁节点
lock.acquire();
} catch (Exception e) {
}
SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss|SSS");
String orderNo = sdf.format(new Date());
System.out.println("生成的订单号是 : " + orderNo);
try {
//分布式锁:释放节点
lock.release();
} catch (Exception e) {
}
}).start();
}
down.countDown();
}
}

分布式计数器

有了上述分布式锁的基础之后,我们就很容易基于其实现一个分布式计数器,基于Zookeeper的分布式计数器的实现思路叶非常简单:指定一个Zookeeper节点作为计数器,多个应用实例在分布式锁的控制下,通过更新该数据节点的内容来实现技术功能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 使用Curator实现分布式计数器
public class RecipesDistAtomicInt {
static String distatomicint_path = "/curator_recipes_distatomicint_path";
static CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("localhost:2181")
.retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
public static void main(String[] args) throws Exception {
client.start();
//DistributedAtomicInteger封装了具体逻辑
DistributedAtomicInteger atomicInteger =
new DistributedAtomicInteger(client, distatomicint_path, new RetryNTimes(3, 1000));
AtomicValue<Integer> rc = atomicInteger.add(8);
System.out.println("Result: " + rc.succeeded());
}
}

分布式Barrier

Barrier时一种用来控制多线程之间同步的经典方式,JDK中也自带了CyclicBarrier实现。下面分别看下JDK的实现和Zookeeper的实现。

JDK自带的CyclicBarrier实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
//单个JVM中可以使用CyclicBarrier处理此类的线程同步问题
public class JDKCyclicBarrier {
public static CyclicBarrier barrier = new CyclicBarrier( 3 );
public static void main( String[] args ) throws IOException, InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool( 3 );
executor.submit( new Thread( new Runner( "1号选手" ) ) );
executor.submit( new Thread( new Runner( "2号选手" ) ) );
executor.submit( new Thread( new Runner( "3号选手" ) ) );
executor.shutdown();
}
}
class Runner implements Runnable {
private String name;
public Runner( String name ) {
this.name = name;
}
public void run() {
System.out.println( name + " 准备好了." );
try {
JDKCyclicBarrier.barrier.await();
} catch ( Exception e ) {}
System.out.println( name + " 起跑!" );
}
}

分布式Barrier实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class RecipesBarrier {
static String barrierPath = "/curator_recipes_barrier_path";
public static void main(String[] args) throws Exception {
for (int i = 0; i < 5; i++) {
new Thread(() -> {
try {
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("localhost:2181")
.retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
client.start();
DistributedDoubleBarrier barrier = new DistributedDoubleBarrier(client,barrierPath,5);
Thread.sleep(Math.round(Math.random() * 3000));
//enter方法进入等待
System.out.println(Thread.currentThread().getName() + "号进入barrier");
barrier.enter();
//当等待进入的成员达到预期数量后,同时启动
System.out.println("启动...");
Thread.sleep(Math.round(Math.random() * 3000));
//leave再次同时进入等待退出状态
barrier.leave();
//当等待进入的成员达到预期数量后,同时退出
System.out.println("退出...");
} catch (Exception e) {
}
}).start();
}
}
}