[TOC]

简介

常见的并发工具类有这几个:CountDownLatch,CyclicBarrier,Semaphore。除此之外,还有一个不常用的线程同步器类Exchanger。

CountDownLatch

CountDownLatch是一个同步计数器,当计数器等于0时,开始触发。

应用场景:

  • 让多个线程同时阻塞在某一位置, 等待信号到来,再同时继续执行,模拟并发场景;
  • 让单个线程等待,合并多个线程结果;

例子1:

image-20210212152635201

输出

image-20210212152709098

例子2:

image-20210212153137792

输出:

image-20210212153211891

源码分析

从countDown的源码中我们可以看到,CountDownLatch内部使用了共享模式;

public void countDown() {
sync.releaseShared(1);
}

在构造函数中创建了Sync实例;

public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}

Sync继承于AbstractQueuedSynchronizer,在构造方法中将计数器 通过setState方法设置下去了,最终给到了state变量。

private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;

Sync(int count) {
setState(count);
}

int getCount() {
return getState();
}

protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}

protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}

CountDownLatch有个注意事项,当计数器减到0后,便不可以再次使用,需要再次new一个CountDownLatch实例对象。若要强制使用,则需要修改源码,将state值进行重置。此外,有另一个类也实现了CountDownLatch类似的功能,并且是可以重用,那就是CyclicBarrier。

CyclicBarrier

利用CyclicBarrier类可以实现一组线程相互等待,当所有线程都到达某个屏障点后再进行后续的操作。

例子:

image-20210212160846010

输出:

image-20210212160910543

通过例子,可以看出,CyclicBarrier的效果是和CountDownLatch一致的。

源码分析

public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;

if (g.broken)
throw new BrokenBarrierException();

if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}

int index = --count;
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}

// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}

if (g.broken)
throw new BrokenBarrierException();

if (g != generation)
return index;

if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}

执行wait操作时,会将count值进行自减一次,当减到0时,进行触发。

public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}

观察其构造函数,可以发现,barrierAction是一个runnable对象,最终在触发的时候被调用了run方法,并没有交给子线程去做,说明还是在主线程中做的事情。

通过观察,我们发现在触发后执行了nextGeneration()方法,一探究竟,发现count值被复原了,这就是为什么CyclicBarrier可以重用的原因了。

private void nextGeneration() {
// signal completion of last generation
trip.signalAll();
// set up next generation
count = parties;
generation = new Generation();
}

除了CyclicBarrier之外,还有一个和CyclicBarrier类似,且计数可变,那就是Phaser,后面介绍。

Semaphore

Semaphore 信号量,用于控制在一段时间内,可并发访问执行的线程数量。控制访问特定资源的线程数目,底层依赖AQS的State。Semaphore 在计数器不为 0 的时候对线程就放行,当为0时,所有请求将被阻塞。

Semaphore 有两种模式,公平模式非公平模式 ,默认是非公平模式。

  • 公平模式就是调用 acquire 的顺序就是获取许可证的顺序,遵循 FIFO;
  • 非公平模式是抢占式的;

Semaphore的应用场景

可以用来做对公共资源的流量限制,如数据库连接。

释放公共资源许可的时候,有两种方法,一个是release,直接释放;另一个是reducePermits,释放并减少总的许可数量。

例子:

image-20210212163929797

源码分析

默认采用非公平锁;

public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;

NonfairSync(int permits) {
super(permits);
}

protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}

静态内部类Sync继承AbstractQueuedSynchronizer;setState和getState对许可数进行设置和访问;

abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933L;

Sync(int permits) {
setState(permits);
}

final int getPermits() {
return getState();
}
...

reducePermits方法

reducePermits可以动态控制总的许可证数量;

final void reducePermits(int reductions) {
for (;;) {
int current = getState();
int next = current - reductions;
if (next > current) // underflow
throw new Error("Permit count underflow");
if (compareAndSetState(current, next))
return;
}
}

Exchanger

Exchanger它提供一个同步点,在这个同步点两个线程可以交换彼此的数据。

想要两个线程之间进行数据交换,必然存在一个交换的时间点,在代码中用exchange来标记交互数据的位置。当两个线程均执行到exchange点时,便开启线程间数据的交换。“交换”二字也客观说明了执行该动作的是至少是两个线程,成对出现。若一个线程到达交换的时间点,而另一个线程并未到达,则该线程进行等待另一个线程。

其应用场景主要有:遗传算法、多线程数据校对等。

例子:

image-20210212041536863

输出:

image-20210212041847191

注意事项,在多个线程中(大于两个)使用同一个exchange,导致的结果随机选择到达交换时间点的线程进行信息交换,主要影响因素是CPU的线程调度;

源码解析

public Exchanger() {
participant = new Participant();
}
static final class Participant extends ThreadLocal<Node> {
public Node initialValue() { return new Node(); }
}

exchange方法

public V exchange(V x, long timeout, TimeUnit unit)
throws InterruptedException, TimeoutException {
Object v;
Object item = (x == null) ? NULL_ITEM : x;
long ns = unit.toNanos(timeout);
if ((arena != null ||
(v = slotExchange(item, true, ns)) == null) &&
((Thread.interrupted() ||
(v = arenaExchange(item, true, ns)) == null)))
throw new InterruptedException();
if (v == TIMED_OUT)
throw new TimeoutException();
return (v == NULL_ITEM) ? null : (V)v;
}

slotExchange方法

private final Object slotExchange(Object item, boolean timed, long ns) {
Node p = participant.get();
Thread t = Thread.currentThread();
if (t.isInterrupted()) // preserve interrupt status so caller can recheck
return null;

for (Node q;;) {
if ((q = slot) != null) {
//进行自旋
if (U.compareAndSwapObject(this, SLOT, q, null)) {
Object v = q.item;
q.match = item;
Thread w = q.parked;
if (w != null)
U.unpark(w);
return v;
}
// create arena on contention, but continue until slot null
if (NCPU > 1 && bound == 0 &&
U.compareAndSwapInt(this, BOUND, 0, SEQ))
arena = new Node[(FULL + 2) << ASHIFT];
}
else if (arena != null)
return null; // caller must reroute to arenaExchange
else {
p.item = item;
if (U.compareAndSwapObject(this, SLOT, null, p))
break;
p.item = null;
}
}

// await release
int h = p.hash;
long end = timed ? System.nanoTime() + ns : 0L;
int spins = (NCPU > 1) ? SPINS : 1;
Object v;
while ((v = p.match) == null) {
if (spins > 0) {
h ^= h << 1; h ^= h >>> 3; h ^= h << 10;
if (h == 0)
h = SPINS | (int)t.getId();
else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0)
Thread.yield();
}
else if (slot != p)
spins = SPINS;
else if (!t.isInterrupted() && arena == null &&
(!timed || (ns = end - System.nanoTime()) > 0L)) {
U.putObject(t, BLOCKER, this);
p.parked = t;
if (slot == p)
U.park(false, ns);
p.parked = null;
U.putObject(t, BLOCKER, null);
}
else if (U.compareAndSwapObject(this, SLOT, p, null)) {
v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null;
break;
}
}
U.putOrderedObject(p, MATCH, null);
p.item = null;
p.hash = h;
return v;
}

第一个线程到达交换时间点后,进行自选操作,等待另一个线程进行值交换;如果等待时间超时了,那么抛出超时的中断异常。

总之,每个工具类都有自己的优势和劣势,在实际开发场景中,还应根据它们的优缺点进行合适的选择。