题目
现有100个请求需要发送,请设计一个算法,使用promise来控制并发(并发数量最大为10),来完成100个请求;
首先先模拟下 100 个请求:
// 请求列表 const requestlist = []; // 为了方便查看,i从1开始计数 for (let i = 1; i <= 100; i++) { requestlist.push( () => new promise(resolve => { settimeout(() => { console.log('done', i); resolve(i); }, math.random() * 1000); }), ); }
promise.all()
初次 看到这个问题,相信大部分同学第一个想到的肯定是 promise.all
,因为它是最常见的并发请求方式,下面来实现一下:
const parallelrun = async max => { const requestslicelist = []; for (let i = 0; i < requestlist.length; i += max) { requestslicelist.push(requestlist.slice(i, i + max)); } for (let i = 0; i < requestslicelist.length; i++) { const group = requestslicelist[i]; try { const res = await promise.all(group.map(fn => fn())); console.log('接口返回值为:', res); } catch (error) { console.error(error); } } };
看下效果:
效果不错!!
每次都是并发 10 个请求,当这 10 个请求都完成返回时,继续下一个 10 个请求,完美实现需求;
可是此时面试官问:如果这里边有一个请求失败了会怎样?
我:额.......,不确定
面试官:回去等通知吧!
虽然回家等通知了,但这道面试题还是得弄明白,修改下模拟请求,使其随机产生一个错误,修改如下:
// 请求列表 const requestlist = []; for (let i = 1; i <= 100; i++) { requestlist.push( () => new promise((resolve, reject) => { settimeout(() => { if (i === 92) { reject(new error('出错了,出错请求:' + i)); } else { console.log('done', i); resolve(i); } }, math.random() * 1000); }), ); }
控制台看下运行结果:
有一个请求失败了,这个 promise.all
就失败了,没有返回值
一组中一个请求失败就无法获取改组其他成员的返回值,这对于不需要判断返回值的情况倒是可以,但是实际业务中,返回值是一个很重要的数据
我们可以接受某个接口失败了没有返回值,但是无法接受一个请求失败了,跟它同组的其他 9 个请求也没有返回值
既然,失败的请求会打断 promise.all
,那有没有一种方法可以不被失败打断呢?
还真有,它就是 promise.allsettled
!
promise.allsettled()
先来看下权威的 mdn 的介绍
promise.allsettled() 方法是 promise 并发方法之一。在你有多个不依赖于彼此成功完成的异步任务时,或者你总是想知道每个 promise 的结果时,使用 promise.allsettled()
简单说就是:每个请求都会返回结果,不管失败还是成功
使用 promise.allsettled()
替换下 promise.all()
:
const parallelrun = async max => { const requestslicelist = []; for (let i = 0; i < requestlist.length; i += max) { requestslicelist.push(requestlist.slice(i, i + max)); } for (let i = 0; i < requestslicelist.length; i++) { const group = requestslicelist[i]; try { // 使用 allsettled 替换 all const res = await promise.allsettled(group.map(fn => fn())); console.log('接口返回值为:', res); } catch (error) { console.error(error); } } };
看下返回结果:
可以看到,接口全部正常有返回值,返回值中会正常记录当前请求时成功还是失败
不错哦,感觉 promise.allsettled()
就是最优解了!
此时面试官又问:那如果有一个请求非常耗时,会出现什么情况?
答:有一个请求非常耗时,那组的请求返回就会很慢,会阻塞了后续的接口并发。
面试官:有没有什么方法可以解决这个问题?
我 :额...... 不知道......
面试官:回去等通知吧~~~
最优解
分析问题
使用 promise.all()
或是 promise.allsettled()
,每次并发 10 个请求,确实可以满足并发要求,但是效率较低:如果存在一个或多个慢接口,那么会出现以下两个问题:
- 有慢接口的并发组返回会很慢,一个慢接口拖慢了其他 9 个接口,得不偿失
- 本来我们是可以并发 10 个请求的,但是一个慢接口导致该组的其他 9 个并发位置都被浪费了,这会导致这 100 个接口的并发时间被无情拉长
- 慢接口组后续的并发组都被阻塞了,更慢了
解决方法
有没有办法解决上述问题呢,答案是肯定的:
可以维护一个运行池和一个等待队列,运行池始终保持 10 个请求并发,当运行池中有一个请求完成时,就从等待队列中拿出一个新请求放到运行池中运行,这样就可以保持运行池始终是满负荷运行,即使有一个慢接口,也不会阻塞后续的接口入池
代码实现
// 运行池 const pool = new set(); // 等待队列 const waitqueue = []; /** * @description: 限制并发数量的请求 * @param {*} reqfn:请求方法 * @param {*} max:最大并发数 */ const request = (reqfn, max) => { return new promise((resolve, reject) => { // 判断运行吃是否已满 const isfull = pool.size >= max; // 包装的新请求 const newreqfn = () => { reqfn() .then(res => { resolve(res); }) .catch(err => { reject(err); }) .finally(() => { // 请求完成后,将该请求从运行池中删除 pool.delete(newreqfn); // 从等待队列中取出一个新请求放入等待运行池执行 const next = waitqueue.shift(); if (next) { pool.add(next); next(); } }); }; if (isfull) { // 如果运行池已满,则将新的请求放到等待队列中 waitqueue.push(newreqfn); } else { // 如果运行池未满,则向运行池中添加一个新请求并执行该请求 pool.add(newreqfn); newreqfn(); } }); }; requestlist.foreach(async item => { const res = await request(item, 10); console.log(res); });
效果
可以看到,100 个接口不断执行,并没有任何等待或是被阻塞的现象,完美!
其他优秀库
社区已有很多优秀的并发限制库,这里重点介绍下 p-limit
安装:
npm install p-limit -s
使用方法:
import plimit from 'p-limit'; const limit = plimit(10); requestlist.foreach(async item => { const res = await limit(item); console.log(res); });
运行效果与上面的队列的运行效果是一致的。下面看下库源码(精简后):
import queue from 'yocto-queue'; export default function plimit(concurrency) { const queue = new queue(); let activecount = 0; const next = () => { activecount--; if (queue.size > 0) { queue.dequeue()(); } }; const run = async (function_, resolve, arguments_) => { activecount++; const result = (async () => function_(...arguments_))(); resolve(result); try { await result; } catch {} next(); }; const enqueue = (function_, resolve, arguments_) => { queue.enqueue(run.bind(undefined, function_, resolve, arguments_)); (async () => { // this function needs to wait until the next microtask before comparing // `activecount` to `concurrency`, because `activecount` is updated asynchronously // when the run function is dequeued and called. the comparison in the if-statement // needs to happen asynchronously as well to get an up-to-date value for `activecount`. await promise.resolve(); if (activecount < concurrency && queue.size > 0) { queue.dequeue()(); } })(); }; const generator = (function_, ...arguments_) => new promise(resolve => { enqueue(function_, resolve, arguments_); }); return generator; }
短短 60 行代码就实现了一个功能强大的并发处理库,真是厉害,下面分析下具体实现:
- 首先 p-limit 库默认导出一个函数
plimit
,该函数接收一个数字,表示最大并发数 plimit
函数函数返回一个generator
函数,该函数返回一个promise
,并且其中调用了enqueue
函数enqueue
函数主要是将run
函数加入队列queue
中,之后判断下activecount < concurrency && queue.size > 0
,表示当前队列大小小于最大并发数且队列不为空,则需要从队列中取出一个请求执行,即执行run
函数run
函数执行时需要先将activecount
加一,之后执行真正的请求函数(async () => function_(...arguments_))()
- 之后等待请求完成
await result;
之后执行next
函数 next
函数主要从队列中取出一个新请求执行并将activecount
减一
总结
本文主要总结了 100 个请求限制并发的方法:
promise.all()
最简单的控制并发,但是请求出错会导致该组无返回值promise.allsettled()
解决了promise.all()
的问题,但是却存在慢接口阻塞后续请求,且浪费其余并发位置的问题- 通过维护一个运行池,当运行池中有请求完成时便从等待队列中取一个心情求入池执行,直到所有的请求都入池
- 介绍了社区的
p-limit
库的使用方法和实现原理
到此这篇关于javascript实现控制并发请求的方法详解的文章就介绍到这了,更多相关javascript控制并发请求内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论