在 java 中,阻塞队列(blockingqueue) 是一种线程安全的队列结构,用于实现生产者-消费者模式。它的核心特性是在队列为空时阻塞消费者线程,在队列满时阻塞生产者线程,从而自动协调线程之间的协作。
阻塞队列的核心特性
- 线程安全:所有操作(如
put
、take
)都是线程安全的。 - 阻塞操作:
- 队列满时:生产者线程阻塞(等待消费者消费)。
- 队列空时:消费者线程阻塞(等待生产者生产)。
- 超时控制:支持带超时时间的操作(如
offer(timeout, unit)
和poll(timeout, unit)
)。 - 公平策略:部分实现(如
arrayblockingqueue
)支持公平锁,防止线程饥饿。
java 中的 7 种阻塞队列
以下是 java 提供的 7 种阻塞队列及其特点和适用场景:
1. arrayblockingqueue(有界队列)
- 特点:
- 基于数组实现,容量固定(初始化时指定)。
- 支持公平锁(默认非公平锁)。
- 适用场景:
- 需要严格限制队列容量的场景(如任务量可控的线程池)。
- 示例代码:
blockingqueue<string> queue = new arrayblockingqueue<>(3); queue.put("a"); // 队列满时阻塞 string task = queue.take(); // 队列空时阻塞
2. linkedblockingqueue(有界/无界队列)
- 特点:
- 基于链表实现,默认容量为
integer.max_value
(可视为无界)。 - 入队和出队使用独立锁(
putlock
和takelock
),提高并发性能。
- 基于链表实现,默认容量为
- 适用场景:
- 任务量不可预测的高吞吐量场景(如线程池默认队列)。
- 示例代码:
blockingqueue<integer> queue = new linkedblockingqueue<>(100); // 指定容量 queue.put(1); // 队列满时阻塞 integer task = queue.take(); // 队列空时阻塞
3. priorityblockingqueue(无界优先级队列)
- 特点:
- 无界队列,基于优先级堆实现。
- 元素必须实现
comparable
接口或提供比较器。
- 适用场景:
- 需要按优先级处理任务(如紧急任务优先)。
- 示例代码:
blockingqueue<prioritytask> queue = new priorityblockingqueue<>(); queue.put(new prioritytask(1)); // 任务优先级由 compareto() 决定 prioritytask task = queue.take();
4. delayqueue(延迟队列)
- 特点:
- 无界队列,元素必须实现
delayed
接口。 - 只有在延迟时间到达后,任务才能被取出。
- 无界队列,元素必须实现
- 适用场景:
- 定时任务(如缓存过期、订单超时)。
- 示例代码:
delayqueue<delayedtask> queue = new delayqueue<>(); queue.put(new delayedtask(5000)); // 5 秒后可用 delayedtask task = queue.take(); // 等待任务延迟时间到期
5. synchronousqueue(同步队列)
- 特点:
- 无容量队列,每个插入操作必须等待一个对应的移除操作。
- 生产者和消费者直接传递数据。
- 适用场景:
- 高吞吐量的短任务场景(如
newcachedthreadpool
)。
- 高吞吐量的短任务场景(如
- 示例代码:
blockingqueue<string> queue = new synchronousqueue<>(); queue.put("x"); // 阻塞直到有消费者调用 take() string task = queue.take(); // 阻塞直到有生产者调用 put()
6. linkedtransferqueue(无界队列)
- 特点:
- 支持“预占模式”(生产者和消费者直接交互)。
- 高性能的无界队列。
- 适用场景:
- 需要高效传递任务的场景(如快速响应的系统)。
- 示例代码:
blockingqueue<integer> queue = new linkedtransferqueue<>(); queue.put(100); // 直接传递给消费者 integer task = queue.take(); // 立即获取任务
7. linkedblockingdeque(双向阻塞队列)
- 特点:
- 双端队列,支持从两端插入/移除元素。
- 可作为有界或无界队列。
- 适用场景:
- 工作窃取算法(如
forkjoinpool
)。
- 工作窃取算法(如
- 示例代码:
blockingqueue<string> queue = new linkedblockingdeque<>(100); queue.put("a"); // 从尾部插入 string task = queue.take(); // 从头部移除
阻塞队列的核心方法
方法 | 行为 | 抛出异常 | 返回布尔值 | 阻塞 | 超时阻塞 |
---|---|---|---|---|---|
add(e e) | 插入元素 | 队列满时抛异常 | - | 否 | 否 |
offer(e e) | 插入元素 | 队列满时返回 false | - | 否 | 否 |
offer(e e, long timeout, timeunit unit) | 插入元素 | 超时后返回 false | - | 是 | 是 |
put(e e) | 插入元素 | - | - | 是 | 否 |
remove() | 移除元素 | 队列空时抛异常 | - | 否 | 否 |
poll() | 移除元素 | 队列空时返回 null | - | 否 | 否 |
poll(long timeout, timeunit unit) | 移除元素 | 超时后返回 null | - | 是 | 是 |
take() | 移除元素 | - | - | 是 | 否 |
如何选择阻塞队列?
- 有界 vs 无界:
- 有界队列(如 arrayblockingqueue):防止内存溢出,需合理设置容量。
- 无界队列(如 linkedblockingqueue):可能导致任务堆积,需结合拒绝策略使用。
- 优先级需求:
- 使用 priorityblockingqueue 实现优先级排序。
- 延迟任务:
- 使用 delayqueue 实现定时或延迟执行。
- 高性能场景:
- synchronousqueue 适合高吞吐量的短任务。
- linkedtransferqueue 适合快速传递任务的场景。
示例:使用阻塞队列实现生产者-消费者模型
import java.util.concurrent.*; public class producerconsumerexample { public static void main(string[] args) { blockingqueue<string> queue = new linkedblockingqueue<>(10); // 生产者线程 thread producer = new thread(() -> { try { for (int i = 0; i < 10; i++) { string task = "task-" + i; queue.put(task); // 队列满时阻塞 system.out.println("produced: " + task); thread.sleep(100); } } catch (interruptedexception e) { thread.currentthread().interrupt(); } }); // 消费者线程 thread consumer = new thread(() -> { try { while (true) { string task = queue.take(); // 队列空时阻塞 system.out.println("consumed: " + task); thread.sleep(200); } } catch (interruptedexception e) { thread.currentthread().interrupt(); } }); producer.start(); consumer.start(); } }
注意事项
- 避免内存泄漏:
- 使用无界队列时,需结合线程池的拒绝策略(如
callerrunspolicy
)。
- 使用无界队列时,需结合线程池的拒绝策略(如
- 公平性:
arrayblockingqueue
支持公平锁(构造函数传入true
)。
- 线程中断:
- 阻塞操作(如
put
/take
)会响应中断,需捕获interruptedexception
。
- 阻塞操作(如
通过合理选择阻塞队列类型,可以高效地实现线程间的协作,解决生产者-消费者问题。
到此这篇关于java 阻塞队列的7种类型小结的文章就介绍到这了,更多相关java 阻塞队列内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论