java.util.concurrent的工具

本篇主要继续盘点 java.util.concurrent 中的工具包

ReentrantLock/Semaphore/CountDownLatch/CyclicBarrier/Phaser/Exchanger

如上一篇文章中写到 ReentrantLock 提供了重入锁的调用, 通过 lock()/unlock() 实现加锁/解锁的操作. 例如

class LockTest implements Runnable {

ReentrantLock lock;

public LockTest(ReentrantLock lock) {

this.lock = lock;

}

@Override

public void run(){

try{

lock.lock();

//do something

}finally{

lock.unlock();

}

}

}

/* * * * * *

ReentrantLock lock = new ReentrantLock();

Thread t1 = new Thread(new LockTest(lock));

Thread t2 = new Thread(new LockTest(lock));

t1.start();

t2.start();

Semaphore

Semaphore 意思为旗语, 一般翻译为信号量, 完成资源的并发控制. 多个线程共享一个信号量变量, 通过 acquire() 竞争给定参数的资源, 通过 release() 释放. 非重入锁可以理解为参数为 1 的信号量.

CountDonwLatch

CountDownLatch 可以实现在完成一组其他线程中执行的操作之前, 它允许线程一直等待. 通过 CountDonwLatch(int args) 进行初始化, 并且传递到线程中, 使用 countDown() 方法使计数器减一, 使用 await() 方法产生等待. 在计数器归零之后继续执行. 使用 getCount() 方法可以获取计数器当前数值.

CountDonwLatch 只能进入一次, 被清零之后便无法继续使用. 在源码层, CountDonwLatch 最开始初始一个 state 为 args 的 SHARED 模式的 AQS, 每一次 countDown() 便进行一次 release, 每一次 await() 便执行一次 tryAcquire(), 在 countDown 清零时, 便会以 SHARED 方式 获取执行权

CyclicBarrier

CyclicBarrier 允许两个或者多个线程在某一个点进行同步. 与 CountDonwLatch 曲边在于, 线程在抵达指定点后, 线程会睡眠等待其他所有线程到达. 当最后一个线程抵达时, 所有线程会被唤醒. CyclicBarrier 可以传入一个 Runnable 对象进行初始化, 使其支持分治算法.

CyclicBarrier 与 CountDownLatch 区别还在于, CyclicBarrier 可以被重置其初始化状态.

CyclicBarrier 内部使用了ReentrantLock, 在 await() 中调用 dowait() 方法

private int dowait(boolean timed, long nanos)

throws InterruptedException, BrokenBarrierException,

TimeoutException {

final ReentrantLock lock = this.lock;

lock.lock();

try {

final Generation g = generation;

if (g.broken)

throw new BrokenBarrierException();

if (Thread.interrupted()) {

breakBarrier();

throw new InterruptedException();

}

int index = --count;

if (index == 0) { // tripped

boolean ranAction = false;

try {

final Runnable command = barrierCommand;

if (command != null)

command.run();

ranAction = true;

nextGeneration();//signalAll()

return 0;

} finally {

if (!ranAction)

breakBarrier(); //

}

}

// loop until tripped, broken, interrupted, or timed out

for (;;) {

try {

if (!timed)

trip.await();//挂起, 并最终释放锁

else if (nanos > 0L)

nanos = trip.awaitNanos(nanos);

} catch (InterruptedException ie) {

if (g == generation && ! g.broken) {

breakBarrier();

throw ie;

} else {

// We're about to finish waiting even if we had not

// been interrupted, so this interrupt is deemed to

// "belong" to subsequent execution.

Thread.currentThread().interrupt();

}

}

if (g.broken)

throw new BrokenBarrierException();

if (g != generation)

return index;

if (timed && nanos <= 0L) {

breakBarrier();

throw new TimeoutException();

}

}

} finally {

lock.unlock();

}

}

以上代码逻辑比较简单, 主要以检测是否broken, 否则抛出异常, 是否所有线程都调用 await(), 如果不是则挂起并释放锁, 否则会通知所有挂起的线程.

Phaser

Phaser 与 CyclicBarrier 类似, 但是允许线程执行多阶段的方法, Phaser 允许每一步结束时对线程进行同步, 当所有线程都完成了这一步允许执行下一步.

Phaser 有两个变量: party/phase 分别描述的是最开始注册的线程数, 与共的阶段数.

Phaser 有以下几个方法:

arrive() 在当前的 phase 到达并注册, 不等待其他 parties 到来. 如果没有可用 register (也就是这个阶段没有线程等待的位置了), 抛出异常. 方法返回 phase 的值, 如果 Phaser 已经完结则返回负数.

arriveAndDeregister() 到达并注销一个 register, 会使party数减一. (可以理解为这个线程到这里就撂挑子了不打算继续往前了, ヽ(●-`Д´-)ノ) 接下来的 phase register 会少一个.

awaitAdvance(int phase) 将会在指定 phase 的位置等待其他所有 parties 都到达, 如果指定的 phase 与当前 phase 不一致, 则会立即返回.

arriveAndAwaitAdvance() 相当于awaitAdvance(arrive()), 到达, 注册, 并等待.

除此之外还有 Interruptibly 版本的方法, 是可以抛出 InterruptedException 的.

Phaser 的源码较为复杂, 也是通过 UNSAFE 包的 CAS 操作, 对自定义的内部变量 state 进行修改, 篇幅所限不多赘述.

Exchanger

Exchanger 允许并发任务之间交换数据. Exchanger 通过在两个线程之间定义了同步点, 当两个线程都到达同步点时, 可以交换数据结构.

Exchanger 初始化时需要指定容器类型, 例如

Exchanger> exchanger = new Exchanger>();

接下来调用 exchange(List args) 方法, 如果已经调用过, 则产生交换, 否则则会阻塞直到第二次调用.

dual stack 可以实现exchange, dual stack 的栈顶元素有两个 slot. 发生 exchange 时, 如果栈顶的 slot 均为空, 则将待交换的数据结构 CAS 放入第一个 slot, 并自旋等待第二个 slot 非空.

此时如果第二个待交换的数据结构发现栈顶, 则获取栈顶, 获取第一个slot的元素(作为返回值), 并且将数据结构 CAS 塞入第二个 slot, 最后 CAS 更新栈顶为null.

此时前一个线程检测到第二个 slot 非空, 则获取第二个 slot 并返回. 在 Java 5 中如此实现:

Object exchange(Object x, boolean tiemd, long patience)throws TimeoutException {

boolean success = false;

long start = System.nanoTime();

Node mine = new Node(x);

for(;;){

Node top = stack.getTop();

if(top == null){

if(stack.casTop(null, mine)){

while(null == mine.hole){

if(timedOut(start, time, patience)) {

if(mine.casHole(null, FAIL))

throw new TimeoutException;

break;

}

/* else spin */

} 

return mine.hole.item;

}

else{

success = top.casHole(null, mine);

stack.casTop(top, null);

if(success)

return top.item;

}

}

}

}

在更高版本的 Java 中, 为了更好的多线程的特性, 将 Stack/Nodes 升级为 Arena/Slot, 方便更多线程同时产生交换的情景. 具体的代码涉及的情况较多, slot 进行了 cache line 填充, 防止出现伪共享的情况.

假设一个类的两个相互独立的属性a和b在内存地址上是连续的(比如FIFO队列的头尾指针),那么它们通常会被加载到相同的cpu cache line里面。并发情况下,如果一个线程修改了a,会导致整个cache line失效(包括b),这时另一个线程来读b,就需要从内存里再次加载了,这种多线程频繁修改ab的情况下,虽然a和b看似独立,但它们会互相干扰,非常影响性能。

详细代码篇幅所限不赘述, 利用了锁分离的思想, 同ConcurrentHashMap类型, Exchange没有只定义一个slot, 而是定义了一个slot的数组. 这样在多线程调用exchange的时候, 可以各自在不同的slot里面进行匹配.

exchange的基本思路如下:

i. 根据每个线程的thread id, hash计算出自己所在的slot index

ii. 如果运气好, 这个slot被人占着(slot里面有node) 并且有人正在等待交换,那就和它进行交换

iii. slot为空的(slot里面没有node), 自己占着,等人交换. 没人交换, 向前挪个位置, 把当前slot里面内容取消, index减半, 再看有没有交换

iv. 挪到0这个位置, 还没有人交互, 那就阻塞, 一直等着. 别的线程, 也会一直挪动, 直到0这个位置.

Leave a Reply

Your email address will not be published. Required fields are marked *

This site uses Akismet to reduce spam. Learn how your comment data is processed.