方式一

使用 CountDownLatch 让所有线程在同一时刻开始执行,以此模拟并发场景;用 synchronized 配合 wait/notifyAll 进行多线程间的同步:队列已满时生产者等待,队列为空时消费者等待,从而保障线程安全。

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * Producer/consumer demo built on an intrinsic lock ({@code synchronized}) with
 * {@code wait()} / {@code notifyAll()}.
 *
 * <p>{@code threadNum} producer threads and {@code threadNum} consumer threads are
 * started; all of them block on a {@link CountDownLatch} until the main thread
 * releases it, so that they contend for the lock at roughly the same time.
 * Each producer adds exactly one item and each consumer removes and prints
 * exactly one item.
 */
public class Main1 {
    /** Number of items produced so far; only read and written while holding the shared lock. */
    static int count = 0;

    /**
     * Starts the producer and consumer threads and then opens the start latch.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        final int full = 5;
        final int threadNum = 6;
        final Object lock = new Object();
        // Used as a plain FIFO buffer here; all access is guarded by `lock`.
        final LinkedBlockingQueue<String> link = new LinkedBlockingQueue<>();
        final CountDownLatch countDownLatch = new CountDownLatch(1);

        Runnable product = new Runnable() {
            @Override
            public void run() {
                try {
                    // Hold every worker back until main() releases the latch.
                    countDownLatch.await();
                    synchronized (lock) {
                        // Loop (not "if") so the condition is re-checked after every wake-up.
                        while (link.size() >= full) {
                            lock.wait();
                        }
                        link.add(Thread.currentThread().getName() + " product: " + count);
                        count++;
                        // Producers and consumers share one monitor, so wake everyone.
                        lock.notifyAll();
                    }
                } catch (InterruptedException e) {
                    // Abandon the task and keep the interrupt visible to the thread's owner
                    // instead of swallowing it and producing anyway.
                    Thread.currentThread().interrupt();
                }
            }
        };

        Runnable consumer = new Runnable() {
            @Override
            public void run() {
                try {
                    countDownLatch.await();
                    synchronized (lock) {
                        while (link.isEmpty()) {
                            lock.wait();
                        }
                        System.out.println(Thread.currentThread().getName() + " consumer: " + link.poll());
                        lock.notifyAll();
                    }
                } catch (InterruptedException e) {
                    // Same policy as the producer: stop and restore the interrupt status.
                    Thread.currentThread().interrupt();
                }
            }
        };

        for (int i = 0; i < threadNum; i++) {
            new Thread(product).start();
            new Thread(consumer).start();
        }
        // Release all workers at once.
        countDownLatch.countDown();
    }
}

输出

image-20210306193021365

方式二

采用重入锁(ReentrantLock)机制来实现,使用了两个 Condition:notFull 用于让生产者在队列已满时等待,notEmpty 用于让消费者在队列为空时等待。这样生产者只需唤醒消费者,消费者只需唤醒生产者,分别控制,互不干扰。

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Producer/consumer demo built on a {@link ReentrantLock} with two
 * {@link Condition}s: {@code notFull} (producers wait on it while the buffer
 * holds {@code full} items) and {@code notEmpty} (consumers wait on it while
 * the buffer is empty).
 *
 * <p>{@code threadNum} producer threads and {@code threadNum} consumer threads are
 * started; all of them block on a {@link CountDownLatch} until the main thread
 * releases it. Each producer adds exactly one item and each consumer removes
 * and prints exactly one item.
 */
public class Main2 {
    /** Number of items produced so far; only read and written while holding the shared lock. */
    static int count = 0;

    /**
     * Starts the producer and consumer threads and then opens the start latch.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        final int full = 5;
        final int threadNum = 6;
        final ReentrantLock lock = new ReentrantLock();
        final Condition notFull = lock.newCondition();
        final Condition notEmpty = lock.newCondition();

        // Used as a plain FIFO buffer here; all access is guarded by `lock`.
        final LinkedBlockingQueue<String> link = new LinkedBlockingQueue<>();
        final CountDownLatch countDownLatch = new CountDownLatch(1);

        Runnable product = new Runnable() {
            @Override
            public void run() {
                try {
                    // Hold every worker back until main() releases the latch.
                    countDownLatch.await();
                } catch (InterruptedException e) {
                    // Do not produce after an interrupt; keep the status for the owner.
                    Thread.currentThread().interrupt();
                    return;
                }
                lock.lock();
                try {
                    // Loop (not "if") so the condition is re-checked after every wake-up.
                    while (link.size() >= full) {
                        notFull.await();
                    }
                    link.add(Thread.currentThread().getName() + " product: " + count);
                    count++;
                    // One new item is available: wake a single waiting consumer.
                    notEmpty.signal();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    lock.unlock();
                }
            }
        };

        Runnable consumer = new Runnable() {
            @Override
            public void run() {
                try {
                    countDownLatch.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                lock.lock();
                try {
                    while (link.isEmpty()) {
                        notEmpty.await();
                    }
                    System.out.println(Thread.currentThread().getName() + " consumer: " + link.poll());
                    // One slot was freed: wake a single waiting producer.
                    notFull.signal();
                } catch (InterruptedException e) {
                    // Same policy as the producer: stop and restore the interrupt status.
                    Thread.currentThread().interrupt();
                } finally {
                    lock.unlock();
                }
            }
        };

        for (int i = 0; i < threadNum; i++) {
            new Thread(product).start();
            new Thread(consumer).start();
        }
        // Release all workers at once.
        countDownLatch.countDown();
    }

}

输出

image-20210306193529183

当然,除了以上两种方式之外,还可以用 Semaphore(信号量)共享锁机制来实现,或者直接使用阻塞队列(BlockingQueue)自带的 put/take 方法来实现,感兴趣的读者可以自己动手实现一下。