当前位置: 代码网 > it编程>编程语言>Java > java并发中的同步器使用方式

java并发中的同步器使用方式

2025年06月10日 Java 我要评论
同步器java 并发包中的同步器是一些用于协调多个线程执行的工具,用于实现线程之间的同步和互斥操作。这些同步器提供了不同的机制来控制线程的访问和执行顺序,以实现线程安全和并发控制。1、semaphor

同步器

java 并发包中的同步器是一些用于协调多个线程执行的工具,用于实现线程之间的同步和互斥操作。这些同步器提供了不同的机制来控制线程的访问和执行顺序,以实现线程安全和并发控制。

1、semaphore(信号量)

  • semaphore 是 java 并发包中的同步器之一,用于控制对临界区资源的访问数量。它允许多个线程同时访问临界区资源,但限制了同一时间内可以访问资源的线程数量
  • semaphore 维护一个许可证计数,线程可以获取和释放这些许可证。当许可证数量为零时,线程需要等待,直到其他线程释放许可证。

semaphore 基本用法

import java.util.concurrent.semaphore;

public class semaphoreexample {
    public static void main(string[] args) {
        semaphore semaphore = new semaphore(3); // 初始化信号量,允许同时访问的线程数量为3

        // 创建多个线程来模拟访问临界区资源
        for (int i = 1; i <= 5; i++) {
            int threadid = i;
            thread thread = new thread(() -> {
                try {
                    semaphore.acquire(); // 获取许可证,如果没有许可证则阻塞
                    system.out.println("thread " + threadid + " acquired a permit and is accessing the resource.");
                    thread.sleep(2000); // 模拟访问临界区资源的耗时操作
                } catch (interruptedexception e) {
                    e.printstacktrace();
                } finally {
                    semaphore.release(); // 释放许可证
                    system.out.println("thread " + threadid + " released the permit.");
                }
            });
            thread.start();
        }
    }
}

运行结果:

thread 1 acquired a permit and is accessing the resource.
thread 3 acquired a permit and is accessing the resource.
thread 2 acquired a permit and is accessing the resource.
thread 2 released the permit.
thread 4 acquired a permit and is accessing the resource.
thread 5 acquired a permit and is accessing the resource.
thread 3 released the permit.
thread 1 released the permit.
thread 4 released the permit.
thread 5 released the permit.

在上述示例中,我们创建了一个 semaphore 实例,并初始化许可证数量为 3。然后创建了多个线程,每个线程在获取许可证后访问临界区资源,模拟耗时操作后释放许可证。由于许可证数量有限,只有一部分线程能够同时访问资源,其他线程需要等待。

semaphore 适用场景

  1. 有限资源的并发访问,如数据库连接池、线程池等。
  2. 控制对某个资源的同时访问数量,以避免资源竞争和过度消耗。

2、countdownlatch

countdownlatch 是 java 并发包中的同步器之一,用于实现一种等待机制,允许一个或多个线程等待其他线程完成一组操作后再继续执行。

它通过维护一个计数器来实现等待和通知的机制。

在创建 countdownlatch 时,需要指定初始计数值,每次调用 countdown() 方法会减少计数值,当计数值达到零时,等待的线程会被唤醒继续执行。

countdownlatch 基本用法

import java.util.concurrent.countdownlatch;

public class countdownlatchexample {
    public static void main(string[] args) {
        int numberoftasks = 3;
        countdownlatch latch = new countdownlatch(numberoftasks);

        // 创建多个线程来模拟完成任务
        for (int i = 1; i <= numberoftasks; i++) {
            int taskid = i;
            thread thread = new thread(() -> {
                try {
                    system.out.println("task " + taskid + " is executing...");
                    thread.sleep(2000); // 模拟任务执行耗时
                    system.out.println("task " + taskid + " is completed.");
                } catch (interruptedexception e) {
                    e.printstacktrace();
                } finally {
                    latch.countdown(); // 完成任务后减少计数
                }
            });
            thread.start();
        }

        try {
            system.out.println("main thread is waiting for tasks to complete...");
            latch.await(); // 等待所有任务完成
            system.out.println("all tasks are completed. main thread continues.");
        } catch (interruptedexception e) {
            e.printstacktrace();
        }
    }
}

运行结果:

task 1 is executing...
main thread is waiting for tasks to complete...
task 2 is executing...
task 3 is executing...
task 1 is completed.
task 2 is completed.
task 3 is completed.
all tasks are completed. main thread continues.

在上述示例中,我们创建了一个 countdownlatch 实例,并初始化计数值为 3。然后创建了多个线程来模拟完成任务,每个线程执行完任务后调用 countdown() 方法减少计数。主线程在执行 latch.await() 时等待计数值为零,等待所有任务完成后继续执行。

使用 countdownlatch 可以实现多个线程之间的协调,确保某些操作在其他操作完成后再继续执行。

countdownlatch 适用场景

  1. 主线程等待多个子线程完成任务后再继续执行。
  2. 等待多个线程完成初始化工作后再开始并行操作。

3、cyclicbarrier

cyclicbarrier 是 java 并发包中的同步器之一,用于实现一组线程在达到一个共同点之前等待彼此,并在达到共同点后继续执行。它可以被重置并重新使用,适用于需要多个线程协同工作的场景。

cyclicbarrier 维护一个计数器和一个栅栏动作(barrier action)。当线程调用 await() 方法时,计数器减少,当计数器达到零时,所有等待的线程会被唤醒并继续执行,同时会执行栅栏动作计数器可以被重置,并且可以设置栅栏动作,在达到共同点后执行。

cyclicbarrier 基本用法

import java.util.concurrent.brokenbarrierexception;
import java.util.concurrent.cyclicbarrier;

public class cyclicbarrierexample {
    public static void main(string[] args) {
        int numberofthreads = 3;
        runnable barrieraction = () -> system.out.println("all threads reached the barrier!");

        cyclicbarrier barrier = new cyclicbarrier(numberofthreads, barrieraction);

        // 创建多个线程来模拟并行执行任务
        for (int i = 1; i <= numberofthreads; i++) {
            int threadid = i;
            thread thread = new thread(() -> {
                try {
                    system.out.println("thread " + threadid + " is performing its task.");
                    thread.sleep(2000); // 模拟任务执行耗时
                    system.out.println("thread " + threadid + " has reached the barrier.");
                    barrier.await(); // 等待其他线程达到栅栏点
                    system.out.println("thread " + threadid + " continues after the barrier.");
                } catch (interruptedexception | brokenbarrierexception e) {
                    e.printstacktrace();
                }
            });
            thread.start();
        }
    }
}

运行结果

thread 1 is performing its task.
thread 3 is performing its task.
thread 2 is performing its task.
thread 2 has reached the barrier.
thread 3 has reached the barrier.
thread 1 has reached the barrier.
all threads reached the barrier!
thread 1 continues after the barrier.
thread 2 continues after the barrier.
thread 3 continues after the barrier.

在上述示例中,我们创建了一个 cyclicbarrier 实例,初始化等待的线程数量为 3,并设置了栅栏动作。

然后创建多个线程,每个线程模拟执行任务后等待其他线程达到栅栏点,当所有线程都达到栅栏点时,栅栏动作会被执行。

使用 cyclicbarrier 可以实现多线程协同工作的场景,确保所有线程在某个共同点之前等待彼此,并在达到共同点后继续执行。

cyclicbarrier 计数器重置用法

package com.lf.java.basic.concurrent;

import java.util.concurrent.brokenbarrierexception;
import java.util.concurrent.cyclicbarrier;

public class multiplecyclicbarrierexample {
    public static void main(string[] args) {
        int numberofthreads = 3;
        int numberofrounds = 3;
        runnable barrieraction = () -> system.out.println("all threads reached the barrier!");

        for (int round = 1; round <= numberofrounds; round++) {
            cyclicbarrier barrier = new cyclicbarrier(numberofthreads, barrieraction);

            system.out.println("round " + round + ": starting tasks");

            // 创建多个线程来模拟并行执行任务
            for (int i = 1; i <= numberofthreads; i++) {
                int threadid = i;
                int finalround = round;
                thread thread = new thread(() -> {
                    try {
                        system.out.println("thread " + threadid + " is performing its task for round " + finalround);
                        thread.sleep(2000); // 模拟任务执行耗时
                        system.out.println("thread " + threadid + " has reached the barrier for round " + finalround);
                        barrier.await(); // 等待其他线程达到栅栏点
                        system.out.println("thread " + threadid + " continues after the barrier for round " + finalround);
                    } catch (interruptedexception | brokenbarrierexception e) {
                        e.printstacktrace();
                    }
                });
                thread.start();
            }

            // 等待所有线程完成当前轮次的任务
            try {
                thread.sleep(3000); // 等待一段时间以观察效果
            } catch (interruptedexception e) {
                e.printstacktrace();
            }

            system.out.println("round " + round + ": all tasks completed\n");

            // 让当前轮次的所有线程都离开栅栏点,以便重新使用
            barrier.reset();
        }
    }
}

运行结果:

round 1: starting tasks
thread 1 is performing its task for round 1
thread 2 is performing its task for round 1
thread 3 is performing its task for round 1
thread 3 has reached the barrier for round 1
thread 2 has reached the barrier for round 1
thread 1 has reached the barrier for round 1
all threads reached the barrier!
thread 2 continues after the barrier for round 1
thread 1 continues after the barrier for round 1
thread 3 continues after the barrier for round 1
round 1: all tasks completed

round 2: starting tasks
thread 1 is performing its task for round 2
thread 2 is performing its task for round 2
thread 3 is performing its task for round 2
thread 3 has reached the barrier for round 2
thread 2 has reached the barrier for round 2
thread 1 has reached the barrier for round 2
all threads reached the barrier!
thread 1 continues after the barrier for round 2
thread 3 continues after the barrier for round 2
thread 2 continues after the barrier for round 2
round 2: all tasks completed

round 3: starting tasks
thread 1 is performing its task for round 3
thread 2 is performing its task for round 3
thread 3 is performing its task for round 3
thread 1 has reached the barrier for round 3
thread 2 has reached the barrier for round 3
thread 3 has reached the barrier for round 3
all threads reached the barrier!
thread 3 continues after the barrier for round 3
thread 1 continues after the barrier for round 3
thread 2 continues after the barrier for round 3
round 3: all tasks completed
 

在上述示例中,我们模拟了多轮任务协同。每一轮都创建一个新的 cyclicbarrier 实例,用于协调线程的等待和通知。在每一轮的任务完成后,我们使用 barrier.reset() 来重置计数器,以便进行下一轮的任务协同。

运行这个示例可以看到多轮任务协同的效果,每一轮的任务都会等待所有线程完成后再继续,然后重置计数器以准备下一轮。

cyclicbarrier 适用场景

  1. 将多个线程分成阶段进行,每个阶段需要等待其他线程完成后再继续。
  2. 并行计算中的分治操作,等待所有线程完成分治任务后进行合并计算。

4、phaser

phaser 是 java 并发包中的同步器之一,它提供了更灵活的多阶段线程协调机制,适用于需要分阶段进行多个任务的并行执行和协调的场景。phaser 可以用于更复杂的同步需求,例如循环的多阶段任务协同

phaser 维护了一个计数器和多个阶段(phase)。在每个阶段,线程可以注册、等待和注销,以及在某个阶段到达时执行特定的操作

phaser 基本用法

import java.util.concurrent.phaser;

public class phaserexample {
    public static void main(string[] args) {
        int numberofthreads = 3;
        int numberofphases = 3;

        phaser phaser = new phaser(numberofthreads) {
            @override
            protected boolean onadvance(int phase, int registeredparties) {
                system.out.println("phase " + phase + " completed.");
                return phase == numberofphases - 1 || registeredparties == 0;
            }
        };

        // 创建多个线程来模拟并行执行任务
        for (int i = 0; i < numberofthreads; i++) {
            int threadid = i;
            thread thread = new thread(() -> {
                for (int phase = 0; phase < numberofphases; phase++) {
                    system.out.println("thread " + threadid + " is in phase " + phase);
                    phaser.arriveandawaitadvance(); // 等待其他线程到达当前阶段
                }
            });
            thread.start();
        }
    }
}

运行结果:

thread 0 is in phase 0
thread 1 is in phase 0
thread 2 is in phase 0
phase 0 completed.
thread 2 is in phase 1
thread 1 is in phase 1
thread 0 is in phase 1
phase 1 completed.
thread 1 is in phase 2
thread 2 is in phase 2
thread 0 is in phase 2
phase 2 completed.
 

在上述示例中,我们创建了一个 phaser 实例,设置初始注册线程数量为 3。然后,我们创建多个线程来模拟并行执行任务,每个线程都会在每个阶段调用 phaser.arriveandawaitadvance() 等待其他线程到达当前阶段。当所有线程都到达后,onadvance() 方法会被调用,用于执行阶段结束后的操作。

phaser 提供了更灵活的多阶段协同机制,适用于需要多个阶段的任务协同和并行执行的场景。它还支持动态添加或删除等待线程,使其更适用于动态变化的并发需求。

phaser 适用场景

  1. 需要分阶段执行的任务,每个阶段可以有不同的线程数。
  2. 需要动态添加或删除等待线程的场景。

5、reentrantlock

reentrantlock 是 java 并发包中的同步器之一,它是一个可重入的互斥锁,提供了与 synchronized 关键字相似的功能,但更为灵活。

与 synchronized 不同,reentrantlock 具有更多的控制选项和功能,例如公平性、可中断性、超时等待等。

reentrantlock 基本用法

import java.util.concurrent.locks.lock;
import java.util.concurrent.locks.reentrantlock;

public class reentrantlockexample {
    public static void main(string[] args) {
        lock lock = new reentrantlock();
        
        // 创建多个线程来模拟使用锁
        for (int i = 1; i <= 5; i++) {
            int threadid = i;
            thread thread = new thread(() -> {
                try {
                    lock.lock(); // 获取锁
                    system.out.println("thread " + threadid + " acquired the lock.");
                    thread.sleep(2000); // 模拟临界区操作耗时
                } catch (interruptedexception e) {
                    e.printstacktrace();
                } finally {
                    lock.unlock(); // 释放锁
                    system.out.println("thread " + threadid + " released the lock.");
                }
            });
            thread.start();
        }
    }
}

运行结果:

thread 1 acquired the lock.
thread 1 released the lock.
thread 2 acquired the lock.
thread 2 released the lock.
thread 3 acquired the lock.
thread 3 released the lock.
thread 4 acquired the lock.
thread 4 released the lock.
thread 5 acquired the lock.
thread 5 released the lock.

在上述示例中,我们创建了一个 reentrantlock 实例,并在多个线程中使用它来模拟对共享资源的访问。每个线程在访问资源前调用 lock.lock() 来获取锁,访问资源后调用 lock.unlock() 来释放锁。

需要注意的是,为了避免死锁,应该在 finally 块中释放锁,以确保无论是否发生异常,锁都会被释放。

reentrantlock 还提供了其他方法,如 trylock()(尝试获取锁,如果锁可用则获取,否则返回 false)、lockinterruptibly()(可中断的获取锁,可响应线程中断)等,使其更加灵活和强大。

reentrantlock 中断获取锁用法

package com.lf.java.basic.concurrent;

import java.util.concurrent.locks.lock;
import java.util.concurrent.locks.reentrantlock;

public class interruptiblelockexample {
    public static void main(string[] args) {
        reentrantlock lock = new reentrantlock();

        // 创建线程尝试获取锁
        thread thread = new thread(() -> {
            try {
                lock.lockinterruptibly(); // 可中断获取锁
                system.out.println("thread acquired the lock.");
                thread.sleep(5000); // 模拟临界区操作耗时
            } catch (interruptedexception e) {
                //中断唤醒线程
                system.out.println("thread interrupted while waiting for the lock.");
            } finally {
                if (lock.isheldbycurrentthread()) {
                    lock.unlock(); // 释放锁
                    system.out.println("thread released the lock.");
                }
            }
        });

        // 启动线程
        thread.start();

        // 主线程等待一段时间后尝试中断线程
        try {
            thread.sleep(2000);
            system.out.println("thread interrupt before");
            thread.interrupt(); // 中断线程的等待
            system.out.println("thread interrupt after");
        } catch (interruptedexception e) {
            system.out.println("interruptedexception catch");
            e.printstacktrace();
        }
    }
}

运行结果:

thread acquired the lock.
thread interrupt before
thread interrupt after
thread interrupted while waiting for the lock.
thread released the lock.

在上述示例中,创建了一个线程尝试获取锁,但是主线程在启动线程后等待了一段时间后中断了该线程的等待。

由于我们使用了 lock.lockinterruptibly() 来获取锁,线程在等待锁的过程中可以响应中断,一旦被中断,它会抛出 interruptedexception,从而可以捕获中断事件并做出相应处理。

reentrantlock 适用场景:

  1. 需要更精细的同步控制,例如在某些情况下需要手动释放锁。
  2. 需要可中断或超时等待的线程。

6、readwritelock

readwritelock 是 java 并发包中的同步器之一,用于实现读写分离的锁机制,提供了更高效的并发访问控制。

它允许多个线程同时读取共享资源,但在写入资源时只允许一个线程进行,从而提高了并发性能。

readwritelock 包含两种锁:读锁和写锁。

  1. 读锁(readlock):多个线程可以同时获取读锁,只要没有线程持有写锁。在没有写锁的情况下,多个线程可以并发读取共享资源,从而提高并发性能。
  2. 写锁(write lock):写锁是独占的,只有一个线程可以持有写锁。在一个线程持有写锁时,其他线程无法获取读锁或写锁,从而确保对共享资源的写操作是互斥的。

readwritelock 基本用法

import java.util.concurrent.locks.readwritelock;
import java.util.concurrent.locks.reentrantreadwritelock;

public class readwritelockexample {
    public static void main(string[] args) {
        readwritelock readwritelock = new reentrantreadwritelock();
        
        // 创建多个读线程
        for (int i = 1; i <= 5; i++) {
            int threadid = i;
            thread thread = new thread(() -> {
                readwritelock.readlock().lock(); // 获取读锁
                try {
                    system.out.println("thread " + threadid + " is reading.");
                    thread.sleep(2000); // 模拟读取操作
                } catch (interruptedexception e) {
                    e.printstacktrace();
                } finally {
                    readwritelock.readlock().unlock(); // 释放读锁
                }
            });
            thread.start();
        }

        // 创建一个写线程
        thread writethread = new thread(() -> {
            readwritelock.writelock().lock(); // 获取写锁
            try {
                system.out.println("write thread is writing.");
                thread.sleep(2000); // 模拟写入操作
            } catch (interruptedexception e) {
                e.printstacktrace();
            } finally {
                readwritelock.writelock().unlock(); // 释放写锁
            }
        });
        writethread.start();
    }
}

运行结果:

thread 1 is reading.
thread 2 is reading.
thread 4 is reading.
thread 3 is reading.
thread 5 is reading.
write thread is writing

在上述示例中,我们创建了一个 readwritelock 实例,然后创建多个读线程和一个写线程来模拟读写操作。

读线程在执行时调用 readwritelock.readlock().lock() 来获取读锁,写线程在执行时调用 readwritelock.writelock().lock() 来获取写锁。

使用 readwritelock 可以提高对共享资源的并发访问性能,适用于读操作频繁,写操作较少的场景。

readwritelock 适用场景

  1. 读操作频繁,写操作较少的情况,以提高并发性能。
  2. 允许多个线程同时读取资源,但在写入资源时确保互斥。

7、 condition

condition 是 java 并发包中的同步器之一,它提供了更灵活的线程等待和通知机制,用于在多线程环境下实现精细的线程协调。

condition 是与 lock 结合使用的,它可以替代传统的 wait() 和 notify() 方法,提供更多的控制选项和功能。

通过 condition,我们可以实现更精确的等待和通知,以及更灵活的线程唤醒机制。

condition 基本用法

import java.util.concurrent.locks.condition;
import java.util.concurrent.locks.lock;
import java.util.concurrent.locks.reentrantlock;

public class conditionexample {
    public static void main(string[] args) {
        lock lock = new reentrantlock();
        condition condition = lock.newcondition();

        // 创建一个等待线程
        thread waitingthread = new thread(() -> {
            lock.lock();
            try {
                system.out.println("waiting thread is waiting...");
                condition.await(); // 等待条件满足
                system.out.println("waiting thread is awake.");
            } catch (interruptedexception e) {
                e.printstacktrace();
            } finally {
                lock.unlock();
            }
        });

        // 创建一个唤醒线程
        thread signalingthread = new thread(() -> {
            lock.lock();
            try {
                thread.sleep(2000); // 模拟等待一段时间
                system.out.println("signaling thread is awake.");
                condition.signal(); // 唤醒等待线程
            } catch (interruptedexception e) {
                e.printstacktrace();
            } finally {
                lock.unlock();
            }
        });

        // 启动线程
        waitingthread.start();
        signalingthread.start();
    }
}

运行结果:

waiting thread is waiting...
signaling thread is awake.
waiting thread is awake.

在上述示例中,我们创建了一个 reentrantlock 实例和一个与之关联的 condition,然后创建了一个等待线程和一个唤醒线程。

等待线程在调用 condition.await() 后进入等待状态,直到唤醒线程调用 condition.signal() 来唤醒它。

通过使用 condition,我们可以更加精确地控制线程的等待和通知,使线程协调更加灵活。

condition 实现阻塞队列

import java.util.linkedlist;
import java.util.queue;
import java.util.concurrent.locks.condition;
import java.util.concurrent.locks.lock;
import java.util.concurrent.locks.reentrantlock;

public class blockingqueuewithcondition<t> {
    private final queue<t> queue = new linkedlist<>();
    private final int capacity;

    private final lock lock = new reentrantlock();
    private final condition notfull = lock.newcondition();
    private final condition notempty = lock.newcondition();

    public blockingqueuewithcondition(int capacity) {
        this.capacity = capacity;
    }

    public void put(t item) throws interruptedexception {
        lock.lock();
        try {
            while (queue.size() == capacity) {
                notfull.await();
            }
            queue.offer(item);
            notempty.signal();
        } finally {
            lock.unlock();
        }
    }

    public t take() throws interruptedexception {
        lock.lock();
        try {
            while (queue.isempty()) {
                notempty.await();
            }
            t item = queue.poll();
            notfull.signal();
            return item;
        } finally {
            lock.unlock();
        }
    }

    public int size() {
        lock.lock();
        try {
            return queue.size();
        } finally {
            lock.unlock();
        }
    }

    public static void main(string[] args) {
        blockingqueuewithcondition<integer> queue = new blockingqueuewithcondition<>(5);

        thread producerthread = new thread(() -> {
            try {
                for (int i = 1; i <= 10; i++) {
                    queue.put(i);
                    system.out.println("produced: " + i);
                    thread.sleep(1000);
                }
            } catch (interruptedexception e) {
                e.printstacktrace();
            }
        });

        thread consumerthread = new thread(() -> {
            try {
                for (int i = 1; i <= 10; i++) {
                    int item = queue.take();
                    system.out.println("consumed: " + item);
                    thread.sleep(1500);
                }
            } catch (interruptedexception e) {
                e.printstacktrace();
            }
        });

        producerthread.start();
        consumerthread.start();
    }
}

运行结果:

produced: 1
consumed: 1
produced: 2
consumed: 2
produced: 3
consumed: 3
produced: 4
produced: 5
consumed: 4
produced: 6
consumed: 5
produced: 7
produced: 8
consumed: 6
produced: 9
consumed: 7
produced: 10
consumed: 8
consumed: 9
consumed: 10

在上述示例中,我们使用 condition 来实现了一个阻塞队列,其中 put() 方法用于向队列中放入元素,take() 方法用于从队列中取出元素。

当队列满时,生产者线程会等待 notfull 条件,当队列为空时,消费者线程会等待 notempty 条件。

这个示例展示了如何使用 condition 来实现线程之间的协调,以及如何实现一个简单的阻塞队列。注意,这个示例并没有处理所有的边界情况和异常情况,实际使用时需要考虑更多细节。

8、blockingqueue

blockingqueue 是 java 并发包中的一个接口,它提供了一种线程安全的队列实现,用于在多线程环境下进行数据的生产和消费。

blockingqueue 支持阻塞操作,当队列满或空时,线程会被阻塞,直到条件满足。

blockingqueue 提供了多种实现,包括:

  1. arrayblockingqueue:基于数组的有界阻塞队列。
  2. linkedblockingqueue:基于链表的可选有界阻塞队列。
  3. priorityblockingqueue:基于优先级的无界阻塞队列。
  4. delayqueue:基于延迟时间的无界阻塞队列。
  5. synchronousqueue:不存储元素的阻塞队列,用于直接传递数据。
  6. linkedtransferqueue:基于链表的无界阻塞队列,结合了 linkedblockingqueue 和synchronousqueue 特性。
  7. linkedblockingdeque:基于链表的双端阻塞队列。

blockingqueue 基本用法

import java.util.concurrent.arrayblockingqueue;
import java.util.concurrent.blockingqueue;

public class blockingqueueexample {
    public static void main(string[] args) {
        blockingqueue<integer> queue = new arrayblockingqueue<>(5);

        thread producerthread = new thread(() -> {
            try {
                for (int i = 1; i <= 10; i++) {
                    queue.put(i);
                    system.out.println("produced: " + i);
                    thread.sleep(1000);
                }
            } catch (interruptedexception e) {
                e.printstacktrace();
            }
        });

        thread consumerthread = new thread(() -> {
            try {
                for (int i = 1; i <= 10; i++) {
                    int item = queue.take();
                    system.out.println("consumed: " + item);
                    thread.sleep(1500);
                }
            } catch (interruptedexception e) {
                e.printstacktrace();
            }
        });

        producerthread.start();
        consumerthread.start();
    }
}

运行结果:

consumed: 1
produced: 1
produced: 2
consumed: 2
produced: 3
consumed: 3
produced: 4
produced: 5
consumed: 4
produced: 6
consumed: 5
produced: 7
produced: 8
consumed: 6
produced: 9
consumed: 7
produced: 10
consumed: 8
consumed: 9
consumed: 10

在上述示例中,我们使用了 arrayblockingqueue 来实现阻塞队列,其中生产者线程使用 put() 方法向队列中放入元素,消费者线程使用 take() 方法从队列中取出元素。

当队列满或空时,线程会被阻塞,直到条件满足。

blockingqueue 是实现线程安全的生产者-消费者模式的常用工具,它简化了线程之间的协调和通信。

9、blockingdeque

blockingdeque(阻塞双端队列)是 java 并发包中的一个接口,它是 blockingqueue 接口的扩展,提供了双端队列的功能,并支持阻塞操作。blockingdeque 可以在队列的两端插入和删除元素,同时支持阻塞操作,使得在多线程环境下更容易实现数据的生产和消费。

blockingdeque 接口的实现类包括:

  1. linkedblockingdeque:基于链表的阻塞双端队列,可选有界或无界。
  2. linkedblockingdeque:基于链表的双端阻塞队列,无界。

blockingdeque基本用法

import java.util.concurrent.blockingdeque;
import java.util.concurrent.linkedblockingdeque;

public class blockingdequeexample {
    public static void main(string[] args) {
        blockingdeque<integer> deque = new linkedblockingdeque<>(5);

        thread producerthread = new thread(() -> {
            try {
                for (int i = 1; i <= 10; i++) {
                    deque.put(i);
                    system.out.println("produced: " + i);
                    thread.sleep(1000);
                }
            } catch (interruptedexception e) {
                e.printstacktrace();
            }
        });

        thread consumerthread = new thread(() -> {
            try {
                for (int i = 1; i <= 10; i++) {
                    int item = deque.take();
                    system.out.println("consumed: " + item);
                    thread.sleep(1500);
                }
            } catch (interruptedexception e) {
                e.printstacktrace();
            }
        });

        producerthread.start();
        consumerthread.start();
    }
}

运行结果:

produced: 1
consumed: 1
produced: 2
consumed: 2
produced: 3
consumed: 3
produced: 4
produced: 5
consumed: 4
produced: 6
consumed: 5
produced: 7
produced: 8
consumed: 6
produced: 9
consumed: 7
produced: 10
consumed: 8
consumed: 9
consumed: 10

在上述示例中,我们使用了 linkedblockingdeque 来实现阻塞双端队列,生产者线程使用 put() 方法向队列中放入元素,消费者线程使用 take() 方法从队列中取出元素。

与 blockingqueue 类似,当队列满或空时,线程会被阻塞,直到条件满足。

blockingdeque 可以更灵活地实现在队列两端插入和删除元素,适用于更多种类的场景,例如双向数据传输和窗口滑动等。

以下是一些常用的在队列两端插入和删除元素的方法:

  1. 在队列头部插入元素:

void addfirst(e e): 将元素添加到队列的头部,如果队列已满,则抛出异常。

boolean offerfirst(e e): 将元素添加到队列的头部,如果队列已满,则返回 false。

void putfirst(e e): 将元素添加到队列的头部,如果队列已满,则阻塞等待直到有空间。

  1. 在队列尾部插入元素:

void addlast(e e):将元素添加到队列的尾部,如果队列已满,则抛出异常。

boolean offerlast(e e):将元素添加到队列的尾部,如果队列已满,则返回 false。

void putlast(e e):将元素添加到队列的尾部,如果队列已满,则阻塞等待直到有空间。

  1. 从队列头部删除元素:

e removefirst(): 移除并返回队列头部的元素,如果队列为空,则抛出异常。

e pollfirst(): 移除并返回队列头部的元素,如果队列为空,则返回 null。

e takefirst(): 移除并返回队列头部的元素,如果队列为空,则阻塞等待直到有元素。

  1. 从队列尾部删除元素:

e removelast():移除并返回队列尾部的元素,如果队列为空,则抛出异常。

e polllast(): 移除并返回队列尾部的元素,如果队列为空,则返回 null。

e takelast(): 移除并返回队列尾部的元素,如果队列为空,则阻塞等待直到有元素。

这些方法使得你可以在双端队列的头部和尾部执行插入和删除操作,根据具体的需求选择合适的方法来实现线程安全的双端队列操作。

10、locksupport

locksupport 是 java 并发包中提供的工具类,用于线程的阻塞和唤醒操作。

它提供了一种基于许可(permit)的方式来控制线程的阻塞和唤醒,相对于传统的 wait() 和 notify() 方法,locksupport 更加灵活和可靠。

主要的方法包括:

  • void park():阻塞当前线程,直到获得许可。
  • void park(object blocker):阻塞当前线程,并将 blocker 关联到当前线程,用于监控和诊断工具。
  • void parknanos(long nanos):阻塞当前线程,最多等待指定的纳秒数,直到获得许可。
  • void parknanos(object blocker, long nanos):阻塞当前线程,并将 blocker关联到当前线程,最多等待指定的纳秒数。
  • void parkuntil(long deadline):阻塞当前线程,直到指定的时间戳,直到获得许可。
  • void parkuntil(object blocker, long deadline):阻塞当前线程,并将 blocker关联到当前线程,直到指定的时间戳。
  • void unpark(thread thread):唤醒指定的线程,如果线程被阻塞,则解除阻塞状态。

locksupport基本用法

import java.util.concurrent.locks.locksupport;

public class locksupportexample {
    public static void main(string[] args) {
        thread thread = new thread(() -> {
            system.out.println("thread is going to be parked.");
            locksupport.park(); // 阻塞当前线程
            system.out.println("thread is unparked.");
        });

        thread.start();

        try {
            thread.sleep(2000);
        } catch (interruptedexception e) {
            e.printstacktrace();
        }

        system.out.println("main thread is unparking the parked thread.");
        locksupport.unpark(thread); // 唤醒被阻塞的线程
    }
}

运行结果:

thread is going to be parked.
main thread is unparking the parked thread.
thread is unparked.

在上述示例中,我们创建了一个新线程,调用了 locksupport.park() 来阻塞该线程。

然后,主线程等待 2 秒后,调用了 locksupport.unpark(thread) 来唤醒被阻塞的线程。与传统的 wait() 和 notify() 方法不同,locksupport 是基于许可的,不需要获取某个特定对象的锁来进行阻塞和唤醒操作。

locksupport 提供了一种更直接、灵活和可控的线程阻塞和唤醒机制,适用于各种多线程协调的场景。

11、exchanger

exchanger 是 java 并发包中的同步器之一,用于实现两个线程之间交换数据。

它提供了一个同步点,当两个线程都到达这个同步点时,它们可以交换数据。exchanger 可以用于实现线程间的数据传递和协作

exchanger 提供了两个线程之间交换数据的功能,但仅限于两个线程。当两个线程都到达 exchanger 同步点时,它们可以通过 exchange() 方法交换数据,然后各自继续执行。

exchanger基本用法

import java.util.concurrent.exchanger;

public class exchangerexample {
    public static void main(string[] args) {
        exchanger<string> exchanger = new exchanger<>();

        // 创建一个线程来发送数据
        thread senderthread = new thread(() -> {
            try {
                string datatosend = "hello from sender";
                system.out.println("sender is sending: " + datatosend);
                exchanger.exchange(datatosend); // 发送数据并等待接收数据
            } catch (interruptedexception e) {
                e.printstacktrace();
            }
        });

        // 创建一个线程来接收数据
        thread receiverthread = new thread(() -> {
            try {
                string receiveddata = exchanger.exchange(null); // 等待接收数据并发送数据
                system.out.println("receiver received: " + receiveddata);
            } catch (interruptedexception e) {
                e.printstacktrace();
            }
        });

        // 启动线程
        senderthread.start();
        receiverthread.start();
    }
}

运行结果

sender is sending: hello from sender
receiver received: hello from sender

在上述示例中,我们创建了一个 exchanger 实例,然后创建了一个发送数据的线程和一个接收数据的线程。

当发送数据的线程调用 exchange() 方法时,它会发送数据并等待接收数据;而接收数据的线程调用 exchange() 方法时,它会等待接收数据并发送数据。当两个线程都到达 exchanger 同步点时,它们会交换数据,并继续执行。

需要注意的是,exchanger 只适用于两个线程之间的数据交换。如果需要更多线程之间的数据交换,可能需要组合使用多个 exchanger。

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持代码网。

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com