同步器
java 并发包中的同步器是一些用于协调多个线程执行的工具,用于实现线程之间的同步和互斥操作。这些同步器提供了不同的机制来控制线程的访问和执行顺序,以实现线程安全和并发控制。
1、semaphore(信号量)
- semaphore 是 java 并发包中的同步器之一,用于控制对临界区资源的访问数量。它允许多个线程同时访问临界区资源,但限制了同一时间内可以访问资源的线程数量。
- semaphore 维护一个许可证计数,线程可以获取和释放这些许可证。当许可证数量为零时,线程需要等待,直到其他线程释放许可证。
semaphore 基本用法
import java.util.concurrent.semaphore; public class semaphoreexample { public static void main(string[] args) { semaphore semaphore = new semaphore(3); // 初始化信号量,允许同时访问的线程数量为3 // 创建多个线程来模拟访问临界区资源 for (int i = 1; i <= 5; i++) { int threadid = i; thread thread = new thread(() -> { try { semaphore.acquire(); // 获取许可证,如果没有许可证则阻塞 system.out.println("thread " + threadid + " acquired a permit and is accessing the resource."); thread.sleep(2000); // 模拟访问临界区资源的耗时操作 } catch (interruptedexception e) { e.printstacktrace(); } finally { semaphore.release(); // 释放许可证 system.out.println("thread " + threadid + " released the permit."); } }); thread.start(); } } }
运行结果:
thread 1 acquired a permit and is accessing the resource.
thread 3 acquired a permit and is accessing the resource.
thread 2 acquired a permit and is accessing the resource.
thread 2 released the permit.
thread 4 acquired a permit and is accessing the resource.
thread 5 acquired a permit and is accessing the resource.
thread 3 released the permit.
thread 1 released the permit.
thread 4 released the permit.
thread 5 released the permit.
在上述示例中,我们创建了一个 semaphore 实例,并初始化许可证数量为 3。然后创建了多个线程,每个线程在获取许可证后访问临界区资源,模拟耗时操作后释放许可证。由于许可证数量有限,只有一部分线程能够同时访问资源,其他线程需要等待。
semaphore 适用场景
- 有限资源的并发访问,如数据库连接池、线程池等。
- 控制对某个资源的同时访问数量,以避免资源竞争和过度消耗。
2、countdownlatch
countdownlatch 是 java 并发包中的同步器之一,用于实现一种等待机制,允许一个或多个线程等待其他线程完成一组操作后再继续执行。
它通过维护一个计数器来实现等待和通知的机制。
在创建 countdownlatch 时,需要指定初始计数值,每次调用 countdown() 方法会减少计数值,当计数值达到零时,等待的线程会被唤醒继续执行。
countdownlatch 基本用法
import java.util.concurrent.countdownlatch; public class countdownlatchexample { public static void main(string[] args) { int numberoftasks = 3; countdownlatch latch = new countdownlatch(numberoftasks); // 创建多个线程来模拟完成任务 for (int i = 1; i <= numberoftasks; i++) { int taskid = i; thread thread = new thread(() -> { try { system.out.println("task " + taskid + " is executing..."); thread.sleep(2000); // 模拟任务执行耗时 system.out.println("task " + taskid + " is completed."); } catch (interruptedexception e) { e.printstacktrace(); } finally { latch.countdown(); // 完成任务后减少计数 } }); thread.start(); } try { system.out.println("main thread is waiting for tasks to complete..."); latch.await(); // 等待所有任务完成 system.out.println("all tasks are completed. main thread continues."); } catch (interruptedexception e) { e.printstacktrace(); } } }
运行结果:
task 1 is executing...
main thread is waiting for tasks to complete...
task 2 is executing...
task 3 is executing...
task 1 is completed.
task 2 is completed.
task 3 is completed.
all tasks are completed. main thread continues.
在上述示例中,我们创建了一个 countdownlatch 实例,并初始化计数值为 3。然后创建了多个线程来模拟完成任务,每个线程执行完任务后调用 countdown() 方法减少计数。主线程在执行 latch.await() 时等待计数值为零,等待所有任务完成后继续执行。
使用 countdownlatch 可以实现多个线程之间的协调,确保某些操作在其他操作完成后再继续执行。
countdownlatch 适用场景
- 主线程等待多个子线程完成任务后再继续执行。
- 等待多个线程完成初始化工作后再开始并行操作。
3、cyclicbarrier
cyclicbarrier 是 java 并发包中的同步器之一,用于实现一组线程在达到一个共同点之前等待彼此,并在达到共同点后继续执行。它可以被重置并重新使用,适用于需要多个线程协同工作的场景。
cyclicbarrier 维护一个计数器和一个栅栏动作(barrier action)。当线程调用 await() 方法时,计数器减少,当计数器达到零时,所有等待的线程会被唤醒并继续执行,同时会执行栅栏动作。计数器可以被重置,并且可以设置栅栏动作,在达到共同点后执行。
cyclicbarrier 基本用法
import java.util.concurrent.brokenbarrierexception; import java.util.concurrent.cyclicbarrier; public class cyclicbarrierexample { public static void main(string[] args) { int numberofthreads = 3; runnable barrieraction = () -> system.out.println("all threads reached the barrier!"); cyclicbarrier barrier = new cyclicbarrier(numberofthreads, barrieraction); // 创建多个线程来模拟并行执行任务 for (int i = 1; i <= numberofthreads; i++) { int threadid = i; thread thread = new thread(() -> { try { system.out.println("thread " + threadid + " is performing its task."); thread.sleep(2000); // 模拟任务执行耗时 system.out.println("thread " + threadid + " has reached the barrier."); barrier.await(); // 等待其他线程达到栅栏点 system.out.println("thread " + threadid + " continues after the barrier."); } catch (interruptedexception | brokenbarrierexception e) { e.printstacktrace(); } }); thread.start(); } } }
运行结果:
thread 1 is performing its task.
thread 3 is performing its task.
thread 2 is performing its task.
thread 2 has reached the barrier.
thread 3 has reached the barrier.
thread 1 has reached the barrier.
all threads reached the barrier!
thread 1 continues after the barrier.
thread 2 continues after the barrier.
thread 3 continues after the barrier.
在上述示例中,我们创建了一个 cyclicbarrier 实例,初始化等待的线程数量为 3,并设置了栅栏动作。
然后创建多个线程,每个线程模拟执行任务后等待其他线程达到栅栏点,当所有线程都达到栅栏点时,栅栏动作会被执行。
使用 cyclicbarrier 可以实现多线程协同工作的场景,确保所有线程在某个共同点之前等待彼此,并在达到共同点后继续执行。
cyclicbarrier 计数器重置用法
package com.lf.java.basic.concurrent; import java.util.concurrent.brokenbarrierexception; import java.util.concurrent.cyclicbarrier; public class multiplecyclicbarrierexample { public static void main(string[] args) { int numberofthreads = 3; int numberofrounds = 3; runnable barrieraction = () -> system.out.println("all threads reached the barrier!"); for (int round = 1; round <= numberofrounds; round++) { cyclicbarrier barrier = new cyclicbarrier(numberofthreads, barrieraction); system.out.println("round " + round + ": starting tasks"); // 创建多个线程来模拟并行执行任务 for (int i = 1; i <= numberofthreads; i++) { int threadid = i; int finalround = round; thread thread = new thread(() -> { try { system.out.println("thread " + threadid + " is performing its task for round " + finalround); thread.sleep(2000); // 模拟任务执行耗时 system.out.println("thread " + threadid + " has reached the barrier for round " + finalround); barrier.await(); // 等待其他线程达到栅栏点 system.out.println("thread " + threadid + " continues after the barrier for round " + finalround); } catch (interruptedexception | brokenbarrierexception e) { e.printstacktrace(); } }); thread.start(); } // 等待所有线程完成当前轮次的任务 try { thread.sleep(3000); // 等待一段时间以观察效果 } catch (interruptedexception e) { e.printstacktrace(); } system.out.println("round " + round + ": all tasks completed\n"); // 让当前轮次的所有线程都离开栅栏点,以便重新使用 barrier.reset(); } } }
运行结果:
round 1: starting tasks
thread 1 is performing its task for round 1
thread 2 is performing its task for round 1
thread 3 is performing its task for round 1
thread 3 has reached the barrier for round 1
thread 2 has reached the barrier for round 1
thread 1 has reached the barrier for round 1
all threads reached the barrier!
thread 2 continues after the barrier for round 1
thread 1 continues after the barrier for round 1
thread 3 continues after the barrier for round 1
round 1: all tasks completedround 2: starting tasks
thread 1 is performing its task for round 2
thread 2 is performing its task for round 2
thread 3 is performing its task for round 2
thread 3 has reached the barrier for round 2
thread 2 has reached the barrier for round 2
thread 1 has reached the barrier for round 2
all threads reached the barrier!
thread 1 continues after the barrier for round 2
thread 3 continues after the barrier for round 2
thread 2 continues after the barrier for round 2
round 2: all tasks completedround 3: starting tasks
thread 1 is performing its task for round 3
thread 2 is performing its task for round 3
thread 3 is performing its task for round 3
thread 1 has reached the barrier for round 3
thread 2 has reached the barrier for round 3
thread 3 has reached the barrier for round 3
all threads reached the barrier!
thread 3 continues after the barrier for round 3
thread 1 continues after the barrier for round 3
thread 2 continues after the barrier for round 3
round 3: all tasks completed
在上述示例中,我们模拟了多轮任务协同。每一轮都创建一个新的 cyclicbarrier 实例,用于协调线程的等待和通知。在每一轮的任务完成后,我们使用 barrier.reset() 来重置计数器,以便进行下一轮的任务协同。
运行这个示例可以看到多轮任务协同的效果,每一轮的任务都会等待所有线程完成后再继续,然后重置计数器以准备下一轮。
cyclicbarrier 适用场景
- 将多个线程分成阶段进行,每个阶段需要等待其他线程完成后再继续。
- 并行计算中的分治操作,等待所有线程完成分治任务后进行合并计算。
4、phaser
phaser 是 java 并发包中的同步器之一,它提供了更灵活的多阶段线程协调机制,适用于需要分阶段进行多个任务的并行执行和协调的场景。phaser 可以用于更复杂的同步需求,例如循环的多阶段任务协同。
phaser 维护了一个计数器和多个阶段(phase)。在每个阶段,线程可以注册、等待和注销,以及在某个阶段到达时执行特定的操作。
phaser 基本用法
import java.util.concurrent.phaser; public class phaserexample { public static void main(string[] args) { int numberofthreads = 3; int numberofphases = 3; phaser phaser = new phaser(numberofthreads) { @override protected boolean onadvance(int phase, int registeredparties) { system.out.println("phase " + phase + " completed."); return phase == numberofphases - 1 || registeredparties == 0; } }; // 创建多个线程来模拟并行执行任务 for (int i = 0; i < numberofthreads; i++) { int threadid = i; thread thread = new thread(() -> { for (int phase = 0; phase < numberofphases; phase++) { system.out.println("thread " + threadid + " is in phase " + phase); phaser.arriveandawaitadvance(); // 等待其他线程到达当前阶段 } }); thread.start(); } } }
运行结果:
thread 0 is in phase 0
thread 1 is in phase 0
thread 2 is in phase 0
phase 0 completed.
thread 2 is in phase 1
thread 1 is in phase 1
thread 0 is in phase 1
phase 1 completed.
thread 1 is in phase 2
thread 2 is in phase 2
thread 0 is in phase 2
phase 2 completed.
在上述示例中,我们创建了一个 phaser 实例,设置初始注册线程数量为 3。然后,我们创建多个线程来模拟并行执行任务,每个线程都会在每个阶段调用 phaser.arriveandawaitadvance() 等待其他线程到达当前阶段。当所有线程都到达后,onadvance() 方法会被调用,用于执行阶段结束后的操作。
phaser 提供了更灵活的多阶段协同机制,适用于需要多个阶段的任务协同和并行执行的场景。它还支持动态添加或删除等待线程,使其更适用于动态变化的并发需求。
phaser 适用场景
- 需要分阶段执行的任务,每个阶段可以有不同的线程数。
- 需要动态添加或删除等待线程的场景。
5、reentrantlock
reentrantlock 是 java 并发包中的同步器之一,它是一个可重入的互斥锁,提供了与 synchronized 关键字相似的功能,但更为灵活。
与 synchronized 不同,reentrantlock 具有更多的控制选项和功能,例如公平性、可中断性、超时等待等。
reentrantlock 基本用法
import java.util.concurrent.locks.lock; import java.util.concurrent.locks.reentrantlock; public class reentrantlockexample { public static void main(string[] args) { lock lock = new reentrantlock(); // 创建多个线程来模拟使用锁 for (int i = 1; i <= 5; i++) { int threadid = i; thread thread = new thread(() -> { try { lock.lock(); // 获取锁 system.out.println("thread " + threadid + " acquired the lock."); thread.sleep(2000); // 模拟临界区操作耗时 } catch (interruptedexception e) { e.printstacktrace(); } finally { lock.unlock(); // 释放锁 system.out.println("thread " + threadid + " released the lock."); } }); thread.start(); } } }
运行结果:
thread 1 acquired the lock.
thread 1 released the lock.
thread 2 acquired the lock.
thread 2 released the lock.
thread 3 acquired the lock.
thread 3 released the lock.
thread 4 acquired the lock.
thread 4 released the lock.
thread 5 acquired the lock.
thread 5 released the lock.
在上述示例中,我们创建了一个 reentrantlock 实例,并在多个线程中使用它来模拟对共享资源的访问。每个线程在访问资源前调用 lock.lock() 来获取锁,访问资源后调用 lock.unlock() 来释放锁。
需要注意的是,为了避免死锁,应该在 finally 块中释放锁,以确保无论是否发生异常,锁都会被释放。
reentrantlock 还提供了其他方法,如 trylock()(尝试获取锁,如果锁可用则获取,否则返回 false)、lockinterruptibly()(可中断的获取锁,可响应线程中断)等,使其更加灵活和强大。
reentrantlock 中断获取锁用法
package com.lf.java.basic.concurrent; import java.util.concurrent.locks.lock; import java.util.concurrent.locks.reentrantlock; public class interruptiblelockexample { public static void main(string[] args) { reentrantlock lock = new reentrantlock(); // 创建线程尝试获取锁 thread thread = new thread(() -> { try { lock.lockinterruptibly(); // 可中断获取锁 system.out.println("thread acquired the lock."); thread.sleep(5000); // 模拟临界区操作耗时 } catch (interruptedexception e) { //中断唤醒线程 system.out.println("thread interrupted while waiting for the lock."); } finally { if (lock.isheldbycurrentthread()) { lock.unlock(); // 释放锁 system.out.println("thread released the lock."); } } }); // 启动线程 thread.start(); // 主线程等待一段时间后尝试中断线程 try { thread.sleep(2000); system.out.println("thread interrupt before"); thread.interrupt(); // 中断线程的等待 system.out.println("thread interrupt after"); } catch (interruptedexception e) { system.out.println("interruptedexception catch"); e.printstacktrace(); } } }
运行结果:
thread acquired the lock.
thread interrupt before
thread interrupt after
thread interrupted while waiting for the lock.
thread released the lock.
在上述示例中,创建了一个线程尝试获取锁,但是主线程在启动线程后等待了一段时间后中断了该线程的等待。
由于我们使用了 lock.lockinterruptibly() 来获取锁,线程在等待锁的过程中可以响应中断,一旦被中断,它会抛出 interruptedexception,从而可以捕获中断事件并做出相应处理。
reentrantlock 适用场景:
- 需要更精细的同步控制,例如在某些情况下需要手动释放锁。
- 需要可中断或超时等待的线程。
6、readwritelock
readwritelock 是 java 并发包中的同步器之一,用于实现读写分离的锁机制,提供了更高效的并发访问控制。
它允许多个线程同时读取共享资源,但在写入资源时只允许一个线程进行,从而提高了并发性能。
readwritelock 包含两种锁:读锁和写锁。
- 读锁(readlock):多个线程可以同时获取读锁,只要没有线程持有写锁。在没有写锁的情况下,多个线程可以并发读取共享资源,从而提高并发性能。
- 写锁(write lock):写锁是独占的,只有一个线程可以持有写锁。在一个线程持有写锁时,其他线程无法获取读锁或写锁,从而确保对共享资源的写操作是互斥的。
readwritelock 基本用法
import java.util.concurrent.locks.readwritelock; import java.util.concurrent.locks.reentrantreadwritelock; public class readwritelockexample { public static void main(string[] args) { readwritelock readwritelock = new reentrantreadwritelock(); // 创建多个读线程 for (int i = 1; i <= 5; i++) { int threadid = i; thread thread = new thread(() -> { readwritelock.readlock().lock(); // 获取读锁 try { system.out.println("thread " + threadid + " is reading."); thread.sleep(2000); // 模拟读取操作 } catch (interruptedexception e) { e.printstacktrace(); } finally { readwritelock.readlock().unlock(); // 释放读锁 } }); thread.start(); } // 创建一个写线程 thread writethread = new thread(() -> { readwritelock.writelock().lock(); // 获取写锁 try { system.out.println("write thread is writing."); thread.sleep(2000); // 模拟写入操作 } catch (interruptedexception e) { e.printstacktrace(); } finally { readwritelock.writelock().unlock(); // 释放写锁 } }); writethread.start(); } }
运行结果:
thread 1 is reading.
thread 2 is reading.
thread 4 is reading.
thread 3 is reading.
thread 5 is reading.
write thread is writing
在上述示例中,我们创建了一个 readwritelock 实例,然后创建多个读线程和一个写线程来模拟读写操作。
读线程在执行时调用 readwritelock.readlock().lock() 来获取读锁,写线程在执行时调用 readwritelock.writelock().lock() 来获取写锁。
使用 readwritelock 可以提高对共享资源的并发访问性能,适用于读操作频繁,写操作较少的场景。
readwritelock 适用场景
- 读操作频繁,写操作较少的情况,以提高并发性能。
- 允许多个线程同时读取资源,但在写入资源时确保互斥。
7、 condition
condition 是 java 并发包中的同步器之一,它提供了更灵活的线程等待和通知机制,用于在多线程环境下实现精细的线程协调。
condition 是与 lock 结合使用的,它可以替代传统的 wait() 和 notify() 方法,提供更多的控制选项和功能。
通过 condition,我们可以实现更精确的等待和通知,以及更灵活的线程唤醒机制。
condition 基本用法
import java.util.concurrent.locks.condition; import java.util.concurrent.locks.lock; import java.util.concurrent.locks.reentrantlock; public class conditionexample { public static void main(string[] args) { lock lock = new reentrantlock(); condition condition = lock.newcondition(); // 创建一个等待线程 thread waitingthread = new thread(() -> { lock.lock(); try { system.out.println("waiting thread is waiting..."); condition.await(); // 等待条件满足 system.out.println("waiting thread is awake."); } catch (interruptedexception e) { e.printstacktrace(); } finally { lock.unlock(); } }); // 创建一个唤醒线程 thread signalingthread = new thread(() -> { lock.lock(); try { thread.sleep(2000); // 模拟等待一段时间 system.out.println("signaling thread is awake."); condition.signal(); // 唤醒等待线程 } catch (interruptedexception e) { e.printstacktrace(); } finally { lock.unlock(); } }); // 启动线程 waitingthread.start(); signalingthread.start(); } }
运行结果:
waiting thread is waiting...
signaling thread is awake.
waiting thread is awake.
在上述示例中,我们创建了一个 reentrantlock 实例和一个与之关联的 condition,然后创建了一个等待线程和一个唤醒线程。
等待线程在调用 condition.await() 后进入等待状态,直到唤醒线程调用 condition.signal() 来唤醒它。
通过使用 condition,我们可以更加精确地控制线程的等待和通知,使线程协调更加灵活。
condition 实现阻塞队列
import java.util.linkedlist; import java.util.queue; import java.util.concurrent.locks.condition; import java.util.concurrent.locks.lock; import java.util.concurrent.locks.reentrantlock; public class blockingqueuewithcondition<t> { private final queue<t> queue = new linkedlist<>(); private final int capacity; private final lock lock = new reentrantlock(); private final condition notfull = lock.newcondition(); private final condition notempty = lock.newcondition(); public blockingqueuewithcondition(int capacity) { this.capacity = capacity; } public void put(t item) throws interruptedexception { lock.lock(); try { while (queue.size() == capacity) { notfull.await(); } queue.offer(item); notempty.signal(); } finally { lock.unlock(); } } public t take() throws interruptedexception { lock.lock(); try { while (queue.isempty()) { notempty.await(); } t item = queue.poll(); notfull.signal(); return item; } finally { lock.unlock(); } } public int size() { lock.lock(); try { return queue.size(); } finally { lock.unlock(); } } public static void main(string[] args) { blockingqueuewithcondition<integer> queue = new blockingqueuewithcondition<>(5); thread producerthread = new thread(() -> { try { for (int i = 1; i <= 10; i++) { queue.put(i); system.out.println("produced: " + i); thread.sleep(1000); } } catch (interruptedexception e) { e.printstacktrace(); } }); thread consumerthread = new thread(() -> { try { for (int i = 1; i <= 10; i++) { int item = queue.take(); system.out.println("consumed: " + item); thread.sleep(1500); } } catch (interruptedexception e) { e.printstacktrace(); } }); producerthread.start(); consumerthread.start(); } }
运行结果:
produced: 1
consumed: 1
produced: 2
consumed: 2
produced: 3
consumed: 3
produced: 4
produced: 5
consumed: 4
produced: 6
consumed: 5
produced: 7
produced: 8
consumed: 6
produced: 9
consumed: 7
produced: 10
consumed: 8
consumed: 9
consumed: 10
在上述示例中,我们使用 condition 来实现了一个阻塞队列,其中 put() 方法用于向队列中放入元素,take() 方法用于从队列中取出元素。
当队列满时,生产者线程会等待 notfull 条件,当队列为空时,消费者线程会等待 notempty 条件。
这个示例展示了如何使用 condition 来实现线程之间的协调,以及如何实现一个简单的阻塞队列。注意,这个示例并没有处理所有的边界情况和异常情况,实际使用时需要考虑更多细节。
8、blockingqueue
blockingqueue 是 java 并发包中的一个接口,它提供了一种线程安全的队列实现,用于在多线程环境下进行数据的生产和消费。
blockingqueue 支持阻塞操作,当队列满或空时,线程会被阻塞,直到条件满足。
blockingqueue 提供了多种实现,包括:
- arrayblockingqueue:基于数组的有界阻塞队列。
- linkedblockingqueue:基于链表的可选有界阻塞队列。
- priorityblockingqueue:基于优先级的无界阻塞队列。
- delayqueue:基于延迟时间的无界阻塞队列。
- synchronousqueue:不存储元素的阻塞队列,用于直接传递数据。
- linkedtransferqueue:基于链表的无界阻塞队列,结合了 linkedblockingqueue 和synchronousqueue 特性。
- linkedblockingdeque:基于链表的双端阻塞队列。
blockingqueue 基本用法
import java.util.concurrent.arrayblockingqueue; import java.util.concurrent.blockingqueue; public class blockingqueueexample { public static void main(string[] args) { blockingqueue<integer> queue = new arrayblockingqueue<>(5); thread producerthread = new thread(() -> { try { for (int i = 1; i <= 10; i++) { queue.put(i); system.out.println("produced: " + i); thread.sleep(1000); } } catch (interruptedexception e) { e.printstacktrace(); } }); thread consumerthread = new thread(() -> { try { for (int i = 1; i <= 10; i++) { int item = queue.take(); system.out.println("consumed: " + item); thread.sleep(1500); } } catch (interruptedexception e) { e.printstacktrace(); } }); producerthread.start(); consumerthread.start(); } }
运行结果:
consumed: 1
produced: 1
produced: 2
consumed: 2
produced: 3
consumed: 3
produced: 4
produced: 5
consumed: 4
produced: 6
consumed: 5
produced: 7
produced: 8
consumed: 6
produced: 9
consumed: 7
produced: 10
consumed: 8
consumed: 9
consumed: 10
在上述示例中,我们使用了 arrayblockingqueue 来实现阻塞队列,其中生产者线程使用 put() 方法向队列中放入元素,消费者线程使用 take() 方法从队列中取出元素。
当队列满或空时,线程会被阻塞,直到条件满足。
blockingqueue 是实现线程安全的生产者-消费者模式的常用工具,它简化了线程之间的协调和通信。
9、blockingdeque
blockingdeque(阻塞双端队列)是 java 并发包中的一个接口,它是 blockingqueue 接口的扩展,提供了双端队列的功能,并支持阻塞操作。blockingdeque 可以在队列的两端插入和删除元素,同时支持阻塞操作,使得在多线程环境下更容易实现数据的生产和消费。
blockingdeque 接口的实现类包括:
- linkedblockingdeque:基于链表的阻塞双端队列,可选有界或无界。
- linkedblockingdeque:基于链表的双端阻塞队列,无界。
blockingdeque基本用法
import java.util.concurrent.blockingdeque; import java.util.concurrent.linkedblockingdeque; public class blockingdequeexample { public static void main(string[] args) { blockingdeque<integer> deque = new linkedblockingdeque<>(5); thread producerthread = new thread(() -> { try { for (int i = 1; i <= 10; i++) { deque.put(i); system.out.println("produced: " + i); thread.sleep(1000); } } catch (interruptedexception e) { e.printstacktrace(); } }); thread consumerthread = new thread(() -> { try { for (int i = 1; i <= 10; i++) { int item = deque.take(); system.out.println("consumed: " + item); thread.sleep(1500); } } catch (interruptedexception e) { e.printstacktrace(); } }); producerthread.start(); consumerthread.start(); } }
运行结果:
produced: 1
consumed: 1
produced: 2
consumed: 2
produced: 3
consumed: 3
produced: 4
produced: 5
consumed: 4
produced: 6
consumed: 5
produced: 7
produced: 8
consumed: 6
produced: 9
consumed: 7
produced: 10
consumed: 8
consumed: 9
consumed: 10
在上述示例中,我们使用了 linkedblockingdeque 来实现阻塞双端队列,生产者线程使用 put() 方法向队列中放入元素,消费者线程使用 take() 方法从队列中取出元素。
与 blockingqueue 类似,当队列满或空时,线程会被阻塞,直到条件满足。
blockingdeque 可以更灵活地实现在队列两端插入和删除元素,适用于更多种类的场景,例如双向数据传输和窗口滑动等。
以下是一些常用的在队列两端插入和删除元素的方法:
- 在队列头部插入元素:
void addfirst(e e): 将元素添加到队列的头部,如果队列已满,则抛出异常。
boolean offerfirst(e e): 将元素添加到队列的头部,如果队列已满,则返回 false。
void putfirst(e e): 将元素添加到队列的头部,如果队列已满,则阻塞等待直到有空间。
- 在队列尾部插入元素:
void addlast(e e):将元素添加到队列的尾部,如果队列已满,则抛出异常。
boolean offerlast(e e):将元素添加到队列的尾部,如果队列已满,则返回 false。
void putlast(e e):将元素添加到队列的尾部,如果队列已满,则阻塞等待直到有空间。
- 从队列头部删除元素:
e removefirst(): 移除并返回队列头部的元素,如果队列为空,则抛出异常。
e pollfirst(): 移除并返回队列头部的元素,如果队列为空,则返回 null。
e takefirst(): 移除并返回队列头部的元素,如果队列为空,则阻塞等待直到有元素。
- 从队列尾部删除元素:
e removelast():移除并返回队列尾部的元素,如果队列为空,则抛出异常。
e polllast(): 移除并返回队列尾部的元素,如果队列为空,则返回 null。
e takelast(): 移除并返回队列尾部的元素,如果队列为空,则阻塞等待直到有元素。
这些方法使得你可以在双端队列的头部和尾部执行插入和删除操作,根据具体的需求选择合适的方法来实现线程安全的双端队列操作。
10、locksupport
locksupport 是 java 并发包中提供的工具类,用于线程的阻塞和唤醒操作。
它提供了一种基于许可(permit)的方式来控制线程的阻塞和唤醒,相对于传统的 wait() 和 notify() 方法,locksupport 更加灵活和可靠。
主要的方法包括:
- void park():阻塞当前线程,直到获得许可。
- void park(object blocker):阻塞当前线程,并将 blocker 关联到当前线程,用于监控和诊断工具。
- void parknanos(long nanos):阻塞当前线程,最多等待指定的纳秒数,直到获得许可。
- void parknanos(object blocker, long nanos):阻塞当前线程,并将 blocker关联到当前线程,最多等待指定的纳秒数。
- void parkuntil(long deadline):阻塞当前线程,直到指定的时间戳,直到获得许可。
- void parkuntil(object blocker, long deadline):阻塞当前线程,并将 blocker关联到当前线程,直到指定的时间戳。
- void unpark(thread thread):唤醒指定的线程,如果线程被阻塞,则解除阻塞状态。
locksupport基本用法
import java.util.concurrent.locks.locksupport; public class locksupportexample { public static void main(string[] args) { thread thread = new thread(() -> { system.out.println("thread is going to be parked."); locksupport.park(); // 阻塞当前线程 system.out.println("thread is unparked."); }); thread.start(); try { thread.sleep(2000); } catch (interruptedexception e) { e.printstacktrace(); } system.out.println("main thread is unparking the parked thread."); locksupport.unpark(thread); // 唤醒被阻塞的线程 } }
运行结果:
thread is going to be parked.
main thread is unparking the parked thread.
thread is unparked.
在上述示例中,我们创建了一个新线程,调用了 locksupport.park() 来阻塞该线程。
然后,主线程等待 2 秒后,调用了 locksupport.unpark(thread) 来唤醒被阻塞的线程。与传统的 wait() 和 notify() 方法不同,locksupport 是基于许可的,不需要获取某个特定对象的锁来进行阻塞和唤醒操作。
locksupport 提供了一种更直接、灵活和可控的线程阻塞和唤醒机制,适用于各种多线程协调的场景。
11、exchanger
exchanger 是 java 并发包中的同步器之一,用于实现两个线程之间交换数据。
它提供了一个同步点,当两个线程都到达这个同步点时,它们可以交换数据。exchanger 可以用于实现线程间的数据传递和协作
exchanger 提供了两个线程之间交换数据的功能,但仅限于两个线程。当两个线程都到达 exchanger 同步点时,它们可以通过 exchange() 方法交换数据,然后各自继续执行。
exchanger基本用法
import java.util.concurrent.exchanger; public class exchangerexample { public static void main(string[] args) { exchanger<string> exchanger = new exchanger<>(); // 创建一个线程来发送数据 thread senderthread = new thread(() -> { try { string datatosend = "hello from sender"; system.out.println("sender is sending: " + datatosend); exchanger.exchange(datatosend); // 发送数据并等待接收数据 } catch (interruptedexception e) { e.printstacktrace(); } }); // 创建一个线程来接收数据 thread receiverthread = new thread(() -> { try { string receiveddata = exchanger.exchange(null); // 等待接收数据并发送数据 system.out.println("receiver received: " + receiveddata); } catch (interruptedexception e) { e.printstacktrace(); } }); // 启动线程 senderthread.start(); receiverthread.start(); } }
运行结果
sender is sending: hello from sender
receiver received: hello from sender
在上述示例中,我们创建了一个 exchanger 实例,然后创建了一个发送数据的线程和一个接收数据的线程。
当发送数据的线程调用 exchange() 方法时,它会发送数据并等待接收数据;而接收数据的线程调用 exchange() 方法时,它会等待接收数据并发送数据。当两个线程都到达 exchanger 同步点时,它们会交换数据,并继续执行。
需要注意的是,exchanger 只适用于两个线程之间的数据交换。如果需要更多线程之间的数据交换,可能需要组合使用多个 exchanger。
总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持代码网。
发表评论