当前位置: 代码网 > it编程>前端脚本>Python > Python asyncio异步编程中5大踩坑实录

Python asyncio异步编程中5大踩坑实录

2026年03月18日 Python 我要评论
上周接了个需求,把一个同步爬虫改成异步的。老板说「应该很快吧,不就是加几个 async await 嘛」。我当时也觉得是,结果整整踩了三天坑,有两天搞到凌晨一点多。今天把这些坑整理出来,希望后面的兄弟

上周接了个需求,把一个同步爬虫改成异步的。老板说「应该很快吧,不就是加几个 async await 嘛」。我当时也觉得是,结果整整踩了三天坑,有两天搞到凌晨一点多。今天把这些坑整理出来,希望后面的兄弟们少走点弯路。

先说结论

问题描述严重程度排查耗时
在 async 函数里调同步阻塞代码⭐⭐⭐⭐⭐4h
忘了 await 导致拿到协程对象⭐⭐⭐30min
aiohttp session 没正确关闭⭐⭐⭐⭐2h
事件循环嵌套(loop 里套 loop)⭐⭐⭐⭐⭐5h
并发量没控制导致被封 ip⭐⭐⭐⭐1h(加上等解封的时间就不止了)

下面一个一个说。

坑 1:async 函数里混入了同步阻塞代码

最致命的坑,因为它不报错,只是慢。

我原来的代码长这样:

import asyncio
import requests  # 注意这是同步库

async def fetch_page(url: str) -> str:
    # 看起来很正常对吧?但 requests.get 是同步阻塞的
    resp = requests.get(url, timeout=10)
    return resp.text

async def main():
    urls = [f"https://httpbin.org/delay/1" for _ in range(10)]
    tasks = [fetch_page(url) for url in urls]
    results = await asyncio.gather(*tasks)
    print(f"抓了 {len(results)} 个页面")

asyncio.run(main())

跑完一看,10 个请求花了 10 秒多。不对啊,不是异步吗,不应该 1 秒多就完事?

requests.get() 是同步阻塞调用。放在 async 函数里也一样,执行的时候还是会阻塞整个事件循环。asyncio 的事件循环是单线程的,一个任务阻塞了,其他任务全得等着。

给函数加 async 关键字不会让里面的同步代码变成异步的,这只是声明了这个函数是个协程。

正确做法是换成 aiohttp:

import asyncio
import aiohttp

async def fetch_page(session: aiohttp.clientsession, url: str) -> str:
    async with session.get(url, timeout=aiohttp.clienttimeout(total=10)) as resp:
        return await resp.text()

async def main():
    urls = [f"https://httpbin.org/delay/1" for _ in range(10)]
    async with aiohttp.clientsession() as session:
        tasks = [fetch_page(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        print(f"抓了 {len(results)} 个页面")

asyncio.run(main())

这回 10 个请求 1.2 秒搞定。

如果实在没法替换同步库——比如某些数据库驱动只有同步版本——可以用 asyncio.to_thread() 把同步调用丢到线程池里:

import asyncio
import requests

def sync_fetch(url: str) -> str:
    """这是个普通同步函数"""
    resp = requests.get(url, timeout=10)
    return resp.text

async def fetch_page(url: str) -> str:
    # python 3.9+ 可用,把同步函数丢到线程池执行
    return await asyncio.to_thread(sync_fetch, url)

async def main():
    urls = [f"https://httpbin.org/delay/1" for _ in range(10)]
    tasks = [fetch_page(url) for url in urls]
    results = await asyncio.gather(*tasks)
    print(f"抓了 {len(results)} 个页面")

asyncio.run(main())

asyncio.to_thread 是 python 3.9 加的,还在用 3.8 的话(该升了兄弟),用 loop.run_in_executor(none, sync_fetch, url) 也行。

坑 2:忘了 await,拿到一个协程对象

刚写 asyncio 的时候真的很容易犯:

import asyncio

async def get_data():
    await asyncio.sleep(1)
    return {"status": "ok", "count": 42}

async def main():
    data = get_data()  # 忘了 await!
    print(data)         # <coroutine object get_data at 0x...>
    print(data["status"])  # typeerror: 'coroutine' object is not subscriptable

asyncio.run(main())

控制台还会给你一个 warning:runtimewarning: coroutine 'get_data' was never awaited

这个 warning 其实挺明显的,但日志多的时候,或者在 jupyter 里跑,可能就淹没了。

解决方案就是别忘了 await:

async def main():
    data = await get_data()  # 加上 await
    print(data["status"])     # ok

我后来养成了一个习惯:凡是调用 async 函数,ide 没有高亮 await 关键字的,都多看一眼。用 pycharm 或者 cursor 的话,忘了 await 会有提示,这个功能真的能救命。

坑 3:aiohttp session 没正确关闭

这个坑比较隐蔽。代码跑完会报一个 warning:

unclosed client session
client_session: <aiohttp.client.clientsession object at 0x...>

我一开始的写法:

import aiohttp
import asyncio

async def fetch(url: str) -> str:
    session = aiohttp.clientsession()  # 每次调用都创建新 session
    resp = await session.get(url)
    text = await resp.text()
    # 忘了关 session
    return text

async def main():
    urls = ["https://httpbin.org/get"] * 50
    tasks = [fetch(url) for url in urls]
    results = await asyncio.gather(*tasks)
    print(f"完成 {len(results)} 个请求")

asyncio.run(main())

这段代码有两个问题。每次请求都创建新 session:aiohttp 的 session 内部维护了连接池,频繁创建销毁等于放弃了连接复用,性能白白浪费。session 没关闭:会导致底层连接泄漏,请求量大了之后文件描述符耗尽,直接崩。

正确写法:

import aiohttp
import asyncio

async def fetch(session: aiohttp.clientsession, url: str) -> str:
    async with session.get(url) as resp:
        return await resp.text()

async def main():
    urls = ["https://httpbin.org/get"] * 50
    # 用 async with 确保 session 最终被关闭
    async with aiohttp.clientsession() as session:
        tasks = [fetch(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        print(f"完成 {len(results)} 个请求")

asyncio.run(main())

一个 session 搞定所有请求,用 async with 保证关闭。

坑 4:事件循环嵌套,这个真的折磨人

这个坑出现在我想在已有的 flask 项目里调用 asyncio 代码的时候。

import asyncio

async def async_work():
    await asyncio.sleep(1)
    return "done"

def sync_handler():
    # 在同步代码里调异步函数
    result = asyncio.run(async_work())  # 第一次调没问题
    return result

# 但如果外层已经有事件循环在跑(比如 jupyter、某些框架内部):
# runtimeerror: asyncio.run() cannot be called from a running event loop

在 jupyter notebook 里这个问题 100% 必现,因为 jupyter 自己就有一个事件循环在跑。

我试过几种方案:

方案 a:nest_asyncio(快速解决,但不太优雅)

import nest_asyncio
nest_asyncio.apply()  # 允许事件循环嵌套

import asyncio

async def async_work():
    await asyncio.sleep(1)
    return "done"

# 现在 jupyter 里也能用了
result = asyncio.run(async_work())
print(result)

这个库就是打了个猴子补丁让嵌套合法化,jupyter 里用用可以,生产环境我不太敢。

方案 b:用线程跑独立的事件循环(推荐)

import asyncio
from concurrent.futures import future
import threading

def run_async_in_thread(coro):
    """在独立线程中启动新的事件循环来执行协程"""
    result_future: future = future()
    
    def _run():
        try:
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            result = loop.run_until_complete(coro)
            result_future.set_result(result)
        except exception as e:
            result_future.set_exception(e)
        finally:
            loop.close()
    
    thread = threading.thread(target=_run)
    thread.start()
    thread.join()
    return result_future.result()

async def async_work():
    await asyncio.sleep(1)
    return "done"

# 在同步代码里安全调用异步函数
result = run_async_in_thread(async_work())
print(result)  # done

这个方案在 flask 项目里跑得挺稳。当然如果项目可以全面切异步框架(fastapi、starlette),就没这个问题了。我后来把那个 flask 服务迁到 fastapi 了,世界清净了很多。

坑 5:并发量不控制,直接被封 ip

这个坑跟 asyncio 本身关系不大,但用了 asyncio 之后几乎必然会遇到。

同步爬虫天然就慢,很少触发限流。换成异步以后,几百个请求瞬间打出去,对面服务器直接把你封了。

import asyncio
import aiohttp

# 用信号量控制并发数
sem = asyncio.semaphore(10)  # 最多 10 个并发

async def fetch(session: aiohttp.clientsession, url: str) -> str:
    async with sem:  # 获取信号量,超过 10 个就等着
        print(f"开始请求: {url}")
        async with session.get(url) as resp:
            text = await resp.text()
        # 加个随机延迟,别太暴力
        await asyncio.sleep(0.5)
        return text

async def main():
    urls = [f"https://httpbin.org/get?page={i}" for i in range(100)]
    
    connector = aiohttp.tcpconnector(limit=20)  # 连接池也限制一下
    async with aiohttp.clientsession(connector=connector) as session:
        tasks = [fetch(session, url) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=true)
        
        success = sum(1 for r in results if not isinstance(r, exception))
        failed = sum(1 for r in results if isinstance(r, exception))
        print(f"成功: {success}, 失败: {failed}")

asyncio.run(main())

几个关键点:

  • asyncio.semaphore:控制并发数的核心,比自己手写队列靠谱多了
  • tcpconnector(limit=20):限制底层 tcp 连接数
  • return_exceptions=true:让 gather 不会因为一个任务报错就全部取消,失败的任务会返回异常对象
  • 加延迟:await asyncio.sleep() 是异步的,不会阻塞别的任务。time.sleep() 会阻塞整个循环——回到坑 1

额外说一个:异步代码的异常处理

不算坑但容易忽略。asyncio.gather 默认行为是一个任务抛异常就取消其他所有任务:

import asyncio

async def good_task():
    await asyncio.sleep(1)
    return "我执行完了"

async def bad_task():
    await asyncio.sleep(0.5)
    raise valueerror("我炸了")

async def main():
    try:
        # 默认行为:bad_task 一炸,good_task 也被取消
        results = await asyncio.gather(good_task(), bad_task())
    except valueerror as e:
        print(f"捕获到异常: {e}")
    
    print("---")
    
    # 加 return_exceptions=true:不会互相影响
    results = await asyncio.gather(
        good_task(), bad_task(), return_exceptions=true
    )
    for r in results:
        if isinstance(r, exception):
            print(f"任务失败: {r}")
        else:
            print(f"任务成功: {r}")

asyncio.run(main())

生产环境基本都要加 return_exceptions=true,不然一个请求失败整批全废,太亏了。

小结

回过头来看,asyncio 的核心概念不复杂:事件循环 + 协程 + await。但坑基本都出在异步和同步的边界上:同步代码混进异步函数会阻塞整个循环;同步环境调异步代码会循环嵌套冲突;session 忘关会泄漏;并发量不控制下游扛不住。

我个人的经验是,小项目别硬上 asyncio。只是写个脚本抓十几个页面,多线程 + requests 完全够用,代码还好理解。asyncio 真正发挥威力的场景是高并发 io 密集型服务,比如 api 网关、websocket 服务、大批量数据采集。

还有就是,写异步代码之前先确认用到的所有库都有异步版本。requests → aiohttp,psycopg2 → asyncpg,redis-py 现在自带 async 支持了。如果核心依赖没有异步版本,硬上 asyncio 意义不大,到处 to_thread 反而更乱。

到此这篇关于python asyncio异步编程中5大踩坑实录的文章就介绍到这了,更多相关python asyncio异步编程内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2026  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com