当前位置: 代码网 > it编程>编程语言>Java > java8 Future异步调用实现方式

java8 Future异步调用实现方式

2025年09月30日 Java 我要评论
一、同步与异步调用概念同步api:调用方在调用某个方法后,等待被调用方返回结果;调用方在取得被调用方的返回值后,再继续运行。调用方顺序执行,同步等待被调用方的返回值,这就是阻塞式调用。异步api:调用

一、同步与异步调用概念

  • 同步api:调用方在调用某个方法后,等待被调用方返回结果;调用方在取得被调用方的返回值后,再继续运行。调用方顺序执行,同步等待被调用方的返回值,这就是阻塞式调用。
  • 异步api:调用方在调用某个方法后,直接返回,不需要等待被调用方返回结果;被调用方开启一个线程处理任务,调用方可以同时去处理其他工作。调用方和被调用方是异步的,这就是非阻塞式调用。

在java种,future用来完成异步工作任务,极大地提高了程序的运行效率。

二、future实现异步调用

2.1 future实现异步调用的方式

在java8之前,直接使用future以异步的方式执行一个耗时的操作。通过这种编程方式,调用方线程使用executorservice,以并发方式调用另一个线程,在执行耗时操作的同时,去执行一些其他的任务。

代码示例:

package com.mvp.test;

import org.junit.test;

import java.util.arrays;
import java.util.concurrent.callable;
import java.util.concurrent.executionexception;
import java.util.concurrent.executorservice;
import java.util.concurrent.executors;
import java.util.concurrent.future;
import java.util.concurrent.timeunit;
import java.util.concurrent.timeoutexception;

public class futureusetest {
    //创建executor-service,实现向线程池提交任务
    executorservice executor = executors.newcachedthreadpool();

    @test
    public void futuretest() {
        long start = system.nanotime();
        //向 executor-service 提交一个callable 对象
        future<double> future = executor.submit(new callable<double>() {
            public double call() {
                //以异步方式在新的线程中执行耗时的操作
                return dosomelongcomputation(start);
            }
        });
        //异步操作进行的同时,你可以做其他的事情
        dosomethingelse(start);
        double result = 0.0;
        try {
            //获取异步操作的结果,如果最终被阻塞,无法得到结果,那么在最多等待1秒钟之后退出
            result = future.get(2, timeunit.seconds);
//            result = future.get();
        } catch (executionexception ee) {
            system.out.println("executionexception="  + arrays.tostring(ee.getstacktrace()));
            // 计算抛出一个异常
        } catch (interruptedexception ie) {
            system.out.println("interruptedexception="  + arrays.tostring(ie.getstacktrace()));
            // 当前线程在等待过程中被中断
        } catch (timeoutexception te) {
            system.out.println("timeoutexception="  + arrays.tostring(te.getstacktrace()));
            // 在future对象完成之前超过已过期
        }
        system.out.println("全部计算完成,耗时:"+ (system.nanotime() - start) / 1_000_000 + " msecs");
        system.out.println("result=" + result);
    }

    public double dosomelongcomputation(long start) {
        delaylong();
        system.out.println("异步执行一个长的计算,耗时:" + (system.nanotime() - start) / 1_000_000 + " msecs");
        return 65000.00;
    }
    public void dosomethingelse(long start) {
        delay();
        system.out.println("当前线程做别的计算,耗时:"+ (system.nanotime() - start) / 1_000_000 + " msecs");
    }

    private void delay() {
        try {
            thread.sleep( (long) (math.random() * 1000));
        } catch (interruptedexception e) {
            throw new runtimeexception(e);
        }
    }

    private void delaylong() {
        try {
            thread.sleep(1500);
        } catch (interruptedexception e) {
            throw new runtimeexception(e);
        }
    }
}

运行结果为:

当前线程做别的计算,耗时:403 msecs
异步执行一个长的计算,耗时:1506 msecs
全部计算完成,耗时:1506 msecs
result=65000.0

2.2 使用completablefuture来实现异步调用

在java中,引入了completablefuture,更为方便地实现异步调用。

代码示例为:

package com.mvp.test;

import org.junit.test;

import java.util.random;
import java.util.concurrent.completablefuture;
import java.util.concurrent.future;

public class completablefutureusettest {

    private double calculateasyncprice(string product) {
        delaylong();
        double price = new random().nextdouble()* 1000 + 150;
        // system.out.println("calculate price of " + product + "is: " + price);
        return price;
    }

    private double calculateprice(string product) {
        delay();
        double price = new random().nextdouble()* 1000 + 150;
        // system.out.println("calculate price of " + product + "is: " + price);
        return price;
    }

    public future<double> getpriceasync(string product) {
        // 创建 completablefuture对象,它会包含计算的结果
        completablefuture<double> futureprice = new completablefuture<>();
        // 在另一个线程中以异步方式执行计算
        new thread( () -> {
            system.out.println("异步线程处理中");
            try {
                // 如果价格计算正常结束,完成future操作,并设置商品价格
                double price = calculateasyncprice(product);
                // 设置future的返回值,用以获得需长时间计算的任务的结果
                futureprice.complete(price);
            } catch (exception ex) {
                // 若存在导致失败的异常,则强制这次future操作异常结束,并抛出future完成异常
                futureprice.completeexceptionally(ex);
            }
        }).start();
        // 无需等待,直接返回 future 对象
        return futureprice;
    }

    public double getpricedirect(long start, string product) {
        double price = calculateprice(product);
        system.out.println("当前线程去查询" + product + "的价格,耗时:"+ (system.nanotime() - start) / 1_000_000 + " msecs");
        return price;
    }

    @test
    public void completablefuturetest() {
        // 执行异步任务
        long startnanotime = system.nanotime();
        future<double> futureprice = getpriceasync("篮球");
        long returnfuturenanotime = system.nanotime();
        long invocationtime = ((returnfuturenanotime - startnanotime) / 1_000_000);
        system.out.println("调用getpriceasyc方法直接返回,耗时: " + invocationtime + " msecs");

        // 执行同步任务
        double pricedirect = getpricedirect(returnfuturenanotime, "足球");

        double priceasync = 0.0;
        try {
            priceasync = futureprice.get();
//            priceasync = futureprice.get(1, timeunit.seconds);
        } catch (exception e) {
            //throw new runtimeexception(e);
            system.out.println("exception=" + e.tostring());
        }
        system.out.printf("篮球和足球的总价格是: %.2f, futureprice.get()耗时=%s msecs %n", priceasync + pricedirect, (system.nanotime() - returnfuturenanotime) / 1_000_000);

        long retrievaltime = ((system.nanotime() - startnanotime) / 1_000_000);
        system.out.println("总耗时:" + retrievaltime + " msecs");
    }

    private void delay() {
        try {
            //thread.sleep( (long) (math.random() * 1000));
            thread.sleep( 200);
        } catch (interruptedexception e) {
            throw new runtimeexception(e);
        }
    }

    private void delaylong() {
        try {
            thread.sleep( 1500);
        } catch (interruptedexception e) {
            throw new runtimeexception(e);
        }
    }
}

运行结果为:

调用getpriceasyc方法直接返回,耗时: 199 msecs
异步线程处理中
当前线程去查询足球的价格,耗时:201 msecs
篮球和足球的总价格是: 914.33, futureprice.get()耗时=1500 msecs
总耗时:1704 msecs

completablefuture类提供了大量精巧的工厂方法,使用这些方法能更容易地完成整个流程,不需担心实现的细节。

例如,在采用supplyasync方法后,可以用一行语句重写上例中的getpriceasync方法,如下所示:

package com.mvp.test;

import org.junit.test;

import java.util.random;
import java.util.concurrent.completablefuture;
import java.util.concurrent.future;

public class completablefuturesupplyasynctest {

    private double calculateasyncprice(string product) {
        delaylong();
        double price = new random().nextdouble()* 1000 + 150;
        //system.out.println("calculate price of " + product + "is: " + price);
        return price;
    }

    private double calculateprice(string product) {
        delay();
        double price = new random().nextdouble()* 1000 + 150;
        //system.out.println("calculate price of " + product + "is: " + price);
        return price;
    }

    //使用工厂方法 supplyasync 创建 completablefuture 对象
    public future<double> getpriceasync(string product) {
        return completablefuture.supplyasync(() -> calculateasyncprice(product));
    }

    public double getpricedirect(long start, string product) {
        double price = calculateprice(product);
        system.out.println("当前线程去查询" + product + "的价格, 耗时:"+ (system.nanotime() - start) / 1_000_000 + " msecs");
        return price;
    }

    @test
    public void futuresupplyasynctest() {
        // 执行异步任务
        long startnanotime = system.nanotime();
        future<double> futureprice = getpriceasync("篮球");
        long returnfuturenanotime = system.nanotime();
        long invocationtime = ((returnfuturenanotime - startnanotime) / 1_000_000);
        system.out.println("调用getpriceasyc方法直接返回,耗时: " + invocationtime + " msecs");

        // 执行同步任务
        long startsyncnanotime = system.nanotime();
        double pricedirect = getpricedirect(startsyncnanotime, "足球");

        double priceasync = 0.0;
        try {
            priceasync = futureprice.get();
            // priceasync = futureprice.get(1, timeunit.seconds);
        } catch (exception e) {
            //throw new runtimeexception(e);
            system.out.println("exception=" + e.tostring());
        }
        system.out.printf("篮球和足球的总价格是: %.2f, futureprice.get() 耗时:%s msecs %n", priceasync + pricedirect, (system.nanotime() - returnfuturenanotime) / 1_000_000);

        long retrievaltime = ((system.nanotime() - startnanotime) / 1_000_000);
        system.out.println("总耗时:" + retrievaltime + " msecs");
    }

    private void delay() {
        try {
            //thread.sleep( (long) (math.random() * 1000));
            thread.sleep( 200);
        } catch (interruptedexception e) {
            throw new runtimeexception(e);
        }
    }

    private void delaylong() {
        try {
            thread.sleep( 1500);
        } catch (interruptedexception e) {
            throw new runtimeexception(e);
        }
    }
}

运行结果为:

调用getpriceasyc方法直接返回,耗时: 176 msecs
当前线程去查询足球的价格, 耗时:204 msecs
篮球和足球的总价格是: 997.99, futureprice.get() 耗时:1502 msecs 
总耗时:1681 msecs

supplyasync方法接受一个生产者(supplier)作为参数,返回一个completablefuture对象(在完成异步执行后,该对象会读取异步方法的返回值)。

异步方法会交由forkjoinpool池中的某个执行器(executor)运行,也可以使用supplyasync方法的重载版本,传递第2个参数指定不同的执行器(executor)执行异步方法。

一般而言,向completablefuture的工厂方法传递可选参数,指定异步方法的执行器。

三、流顺序执行、并行、并发–异步执行、并发–自定义异步执行比较

对流顺序执行、并行、并发–异步执行、并发–自定义异步执行进行比较,代码如下:

package com.mvp.test;

import org.junit.test;

import java.util.arrays;
import java.util.list;
import java.util.random;
import java.util.concurrent.completablefuture;
import java.util.concurrent.executor;
import java.util.concurrent.executors;
import java.util.concurrent.threadfactory;
import java.util.stream.collectors;

public class compareparallelfutureusetest {
    list<string> shopnames = arrays.aslist("北京华联", "华润", "沃尔玛", "大润发", "万果园", "一峰");

    private double calculateprice(string product) {
        double price = new random().nextdouble()* 1000 + 150;
        //system.out.println("calculate price of " + product + "is: " + price);
        return price;
    }

    public double getprice(string product) {
        return calculateprice(product);
    }

    /**
     * 使用流顺序计算
     * @param product 商品名称
     * @return 列表
     */
    public list<string> findprices(string product) {
        return shopnames.stream()
                .map(shopname -> string.format("%s 价格: %.2f", shopname, getprice(product)))
                .collect(collectors.tolist());
    }

    /**
     * 使用流并行计算
     * @param product 商品名称
     * @return 列表
     */
    public list<string> findpricesparallel(string product) {
        return shopnames.parallelstream()
                .map(shopname -> string.format("%s 价格: %.2f", shopname, getprice(product)))
                .collect(collectors.tolist());
    }

    /**
     * 异步运算
     * @param product 商品名称
     * @return 列表
     */
    public list<string> findpricesfuture(string product) {
        list<completablefuture<string>> pricefutures = shopnames.stream()
                .map(shopname -> completablefuture.supplyasync(() -> string.format("%s 价格: %.2f", shopname, getprice(product))))
                .collect(collectors.tolist());
        //completablefuture类中的join方法 和 future接口中的get方法 有相同的含义
        return pricefutures.stream()
                .map(completablefuture::join)
                .collect(collectors.tolist());
    }

    //创建一个线程池,其线程数目为100和商店数目二者中较小的一个值
    private final executor executor1 = executors.newfixedthreadpool(math.min(shopnames.size(), 10),
            new threadfactory() {
                public thread newthread(runnable r) {
                    thread t = new thread(r);
                    // 使用守护线程。这种方式不会阻止程序的关停。
                    t.setdaemon(true);
                    return t;
                }
        });

    /**
     * 异步运算:使用定制的执行器(调整线程池的大小)
     * @param product 商品名称
     * @return 列表
     */
    public list<string> findpricesfuturecustom(string product) {
        list<completablefuture<string>> pricefutures = shopnames.stream()
                .map(shopname -> completablefuture.supplyasync(() -> string.format("%s 价格: %.2f", shopname, getprice(product)), executor1))
                .collect(collectors.tolist());

        return pricefutures.stream()
                .map(completablefuture::join)
                .collect(collectors.tolist());
    }

    @test
    public void futurecomparetest() {
        long start = system.nanotime();
        system.out.println(findprices("羽毛球"));
        system.out.println("使用流顺序计算 done in " + (system.nanotime() - start) / 1_000_000 + " msecs");

        start = system.nanotime();
        system.out.println(findpricesparallel("羽毛球"));
        system.out.println("使用流并行计算 done in " + (system.nanotime() - start) / 1_000_000 + " msecs");

        start = system.nanotime();
        system.out.println(findpricesfuture("羽毛球"));
        system.out.println("并发future异步运算(默认执行器) done in " + (system.nanotime() - start) / 1_000_000 + " msecs");

        //并行和并发不相伯仲,究其原因都一样:它们内部采用的是同样的通用线程池,默认都使用固定数目的线程,具体线程数取决于
        // runtime.getruntime().availableprocessors() 的返回值。
        // 然而,completablefuture具有一定的优势,因为它允许你对执行器(executor)进行配置,尤其是线程池的大小。
        start = system.nanotime();
        system.out.println(findpricesfuturecustom("羽毛球"));
        system.out.println("并发future异步运算(定制执行器:调整线程池的大小) done in " + (system.nanotime() - start) / 1_000_000 + " msecs");
    }
}

运行结果为:

[北京华联 价格: 552.91, 华润 价格: 173.53, 沃尔玛 价格: 981.30, 大润发 价格: 339.54, 万果园 价格: 872.71, 一峰 价格: 338.87]
使用流顺序计算 done in 148 msecs
[北京华联 价格: 475.23, 华润 价格: 991.62, 沃尔玛 价格: 469.81, 大润发 价格: 1140.04, 万果园 价格: 199.57, 一峰 价格: 210.05]
使用流并行计算 done in 5 msecs
[北京华联 价格: 723.78, 华润 价格: 546.76, 沃尔玛 价格: 979.16, 大润发 价格: 402.02, 万果园 价格: 770.86, 一峰 价格: 601.99]
并发future异步运算(默认执行器) done in 5 msecs
[北京华联 价格: 854.24, 华润 价格: 1000.75, 沃尔玛 价格: 1103.58, 大润发 价格: 355.49, 万果园 价格: 849.84, 一峰 价格: 1051.99]
并发future异步运算(定制执行器:调整线程池的大小) done in 4 msecs

从运行结果可以看出,流并行计算、异步运算、自定义执行器异步运算的效率比流顺序计算要高很多。

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持代码网。

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com