在 java 中,concurrentlinkedqueue
、arrayblockingqueue
、linkedblockingqueue
等并发队列通常用于生产者-消费者模式。这些队列使用锁和条件(condition
)来实现线程间的通信。
以下是基于 arrayblockingqueue
或 linkedblockingqueue
的生产者-消费者模式中,消费者如何知道队列中有元素可用的基本原理:
-
锁(lock): 队列内部维护了一个锁,通常是
reentrantlock
。这个锁用于同步对队列的访问,确保在任何时刻只有一个线程可以修改队列。 -
条件(condition): 锁关联了两个条件,通常称为
notempty
和notfull
。notempty
条件用于通知消费者队列非空,而notfull
条件用于通知生产者队列未满。 -
消费者等待: 当队列为空时,消费者线程会调用条件
notempty
的await()
方法。这将导致消费者线程释放锁并等待,直到另一个线程(生产者)在队列中插入一个元素并调用signal()
或signalall()
方法来唤醒等待的消费者线程。 -
生产者通知: 当生产者向队列中添加一个元素时,它会调用
notempty
条件的signal()
方法来唤醒一个(或所有)等待的消费者线程。
以下是一个简化的例子:
import java.util.concurrent.locks.condition;
import java.util.concurrent.locks.lock;
import java.util.concurrent.locks.reentrantlock;
public class boundedbuffer {
final lock lock = new reentrantlock();
final condition notempty = lock.newcondition();
final condition notfull = lock.newcondition();
final object[] items = new object[100]; // 假设缓冲区大小为100
int putptr, takeptr, count;
public void put(object x) throws interruptedexception {
lock.lock();
try {
while (count == items.length) // 如果队列已满,则等待
notfull.await();
items[putptr] = x; // 在这里插入元素
if (++putptr == items.length) putptr = 0;
++count;
notempty.signal(); // 通知消费者队列非空
} finally {
lock.unlock();
}
}
public object take() throws interruptedexception {
lock.lock();
try {
while (count == 0) // 如果队列为空,则等待
notempty.await();
object x = items[takeptr]; // 在这里取出元素
if (++takeptr == items.length) takeptr = 0;
--count;
notfull.signal(); // 通知生产者队列未满
return x;
} finally {
lock.unlock();
}
}
}
在这个例子中,消费者在 notempty
条件上等待,而生产者在 notfull
条件上等待。当队列状态改变时(例如,生产者添加了一个元素或消费者取出一个元素),相应的条件会被信号唤醒,这样等待的线程就可以重新获取锁并继续执行。
发表评论