一、503 错误产生的原因
在 http 协议中,503 错误表示服务器当前无法处理请求,通常是因为服务器暂时过载或维护。在多线程爬虫场景下,503 错误可能由以下几种原因引起:
- 服务器负载过高:当多个线程同时向服务器发送请求时,服务器可能因负载过高而拒绝部分请求,返回 503 错误。
- 请求频率过快:如果爬虫的请求频率超过了服务器的处理能力,服务器可能会认为这是一种攻击行为,从而返回 503 错误。
- 服务器配置问题:某些服务器可能配置了特定的防护机制,如防火墙或反爬虫策略,当检测到异常请求时会返回 503 错误。
- 网络问题:网络不稳定或代理服务器故障也可能导致 503 错误。
二、503 错误处理的最佳实践
(一)合理控制并发线程数量
过多的并发线程会增加服务器的负载,导致 503 错误。因此,合理控制并发线程的数量是避免 503 错误的关键。可以通过设置线程池来限制并发线程的数量。
import concurrent.futures
import requests
def fetch_url(url):
try:
response = requests.get(url)
response.raise_for_status()
return response.text
except requests.exceptions.httperror as e:
if e.response.status_code == 503:
print(f"503 error occurred for {url}")
# handle 503 error
else:
raise
def main():
urls = ["http://example.com/page1", "http://example.com/page2", ...]
max_workers = 10 # 控制并发线程数量
with concurrent.futures.threadpoolexecutor(max_workers=max_workers) as executor:
futures = [executor.submit(fetch_url, url) for url in urls]
for future in concurrent.futures.as_completed(futures):
try:
data = future.result()
# process data
except exception as e:
print(f"error: {e}")
if __name__ == "__main__":
main()
(二)设置合理的请求间隔
为了避免因请求频率过快导致的 503 错误,可以在请求之间设置合理的间隔时间。这可以通过在请求代码中添加 <font style="color:rgba(0, 0, 0, 0.9);background-color:rgba(0, 0, 0, 0.03);">time.sleep()</font> 来实现。
import time
import requests
def fetch_url(url):
try:
response = requests.get(url)
response.raise_for_status()
return response.text
except requests.exceptions.httperror as e:
if e.response.status_code == 503:
print(f"503 error occurred for {url}")
# handle 503 error
else:
raise
def main():
urls = ["http://example.com/page1", "http://example.com/page2", ...]
for url in urls:
fetch_url(url)
time.sleep(1) # 设置请求间隔为 1 秒
if __name__ == "__main__":
main()
(三)使用代理服务器和用户代理
使用代理服务器可以隐藏爬虫的真实 ip 地址,减少被服务器封禁的风险。同时,代理服务器可以分散请求,降低单个 ip 的请求频率。服务器可能会根据请求的用户代理(user-agent)来判断请求是否来自爬虫。通过设置随机的用户代理,可以降低被服务器识别为爬虫的风险。
import requests
import time
from requests.adapters import httpadapter
from urllib3.util.retry import retry
# 代理配置
proxyhost = "www.16yun.cn"
proxyport = "5445"
proxyuser = "16qmsoml"
proxypass = "280651"
# 用户代理池
user_agents = [
"mozilla/5.0 (windows nt 10.0; win64; x64) applewebkit/537.36 (khtml, like gecko) chrome/58.0.3029.110 safari/537.3",
"mozilla/5.0 (windows nt 6.1; wow64) applewebkit/537.36 (khtml, like gecko) chrome/51.0.2704.103 safari/537.36",
"mozilla/5.0 (macintosh; intel mac os x 10_15_7) applewebkit/605.1.15 (khtml, like gecko) version/14.0.3 safari/605.1.15",
"mozilla/5.0 (windows nt 10.0; win64; x64; rv:88.0) gecko/20100101 firefox/88.0"
]
def get_proxy():
"""获取认证代理"""
return f"http://{proxyuser}:{proxypass}@{proxyhost}:{proxyport}"
def create_session():
"""创建带有重试机制的会话"""
session = requests.session()
retry_strategy = retry(
total=3,
backoff_factor=1,
status_forcelist=[500, 502, 503, 504]
)
adapter = httpadapter(max_retries=retry_strategy)
session.mount("http://", adapter)
session.mount("https://", adapter)
return session
def fetch_url(url):
"""获取url内容"""
session = create_session()
proxy = get_proxy()
headers = {"user-agent": random.choice(user_agents)}
try:
response = session.get(
url,
proxies={"http": proxy, "https": proxy},
headers=headers,
timeout=10
)
response.raise_for_status()
print(f"成功获取: {url} [状态码: {response.status_code}]")
return response.text
except requests.exceptions.httperror as e:
if e.response.status_code == 503:
print(f"503错误: {url} - 服务器暂时不可用")
# 可以在这里添加重试逻辑或记录到日志
else:
print(f"http错误 {e.response.status_code}: {url}")
raise
except exception as e:
print(f"请求异常: {url} - {str(e)}")
raise
def main():
"""主函数"""
urls = [
"http://example.com/page1",
"http://example.com/page2",
"http://example.com/page3"
]
for url in urls:
try:
fetch_url(url)
time.sleep(1) # 请求间隔
except exception as e:
print(f"处理 {url} 时出错: {e}")
continue
if __name__ == "__main__":
import random # 为user_agents随机选择
main()
(四)重试机制
当遇到 503 错误时,可以设置重试机制,等待一段时间后再次尝试请求。这可以通过 <font style="color:rgba(0, 0, 0, 0.9);background-color:rgba(0, 0, 0, 0.03);">requests</font> 库的 <font style="color:rgba(0, 0, 0, 0.9);background-color:rgba(0, 0, 0, 0.03);">session</font> 对象和 <font style="color:rgba(0, 0, 0, 0.9);background-color:rgba(0, 0, 0, 0.03);">retry</font> 类来实现。
import requests
from requests.adapters import httpadapter
from requests.packages.urllib3.util.retry import retry
def fetch_url(url):
session = requests.session()
retries = retry(total=5, backoff_factor=1, status_forcelist=[503])
session.mount("http://", httpadapter(max_retries=retries))
try:
response = session.get(url)
response.raise_for_status()
return response.text
except requests.exceptions.httperror as e:
if e.response.status_code == 503:
print(f"503 error occurred for {url}")
# handle 503 error
else:
raise
def main():
urls = ["http://example.com/page1", "http://example.com/page2", ...]
for url in urls:
fetch_url(url)
if __name__ == "__main__":
main()
三、综合实践案例
以下是一个综合运用上述最佳实践的完整代码示例:
import concurrent.futures
import requests
import time
import random
from requests.adapters import httpadapter
from requests.packages.urllib3.util.retry import retry
user_agents = [
"mozilla/5.0 (windows nt 10.0; win64; x64) applewebkit/537.36 (khtml, like gecko) chrome/58.0.3029.110 safari/537.3",
"mozilla/5.0 (windows nt 6.1; wow64) applewebkit/537.36 (khtml, like gecko) chrome/51.0.2704.103 safari/537.36",
# 添加更多用户代理
]
proxies = ["http://proxy1.example.com:8080", "http://proxy2.example.com:8080", ...]
def fetch_url(url):
headers = {"user-agent": random.choice(user_agents)}
session = requests.session()
retries = retry(total=5, backoff_factor=1, status_forcelist=[503])
session.mount("http://", httpadapter(max_retries=retries))
try:
response = session.get(url, headers=headers, proxies=random.choice(proxies))
response.raise_for_status()
return response.text
except requests.exceptions.httperror as e:
if e.response.status_code == 503:
print(f"503 error occurred for {url}")
# handle 503 error
else:
raise
def main():
urls = ["http://example.com/page1", "http://example.com/page2", ...]
max_workers = 10 # 控制并发线程数量
with concurrent.futures.threadpoolexecutor(max_workers=max_workers) as executor:
futures = [executor.submit(fetch_url, url) for url in urls]
for future in concurrent.futures.as_completed(futures):
try:
data = future.result()
# process data
except exception as e:
print(f"error: {e}")
time.sleep(1) # 设置请求间隔为 1 秒
if __name__ == "__main__":
main()
四、总结
在 python 爬虫多线程并发时,503 错误是一个常见的问题。通过合理控制并发线程数量、设置合理的请求间隔、使用代理服务器、添加重试机制和伪装用户代理等方法,可以有效降低 503 错误的发生概率,提高爬虫的稳定性和可靠性。在实际开发中,开发者需要根据目标网站的具体情况,灵活运用这些最佳实践方法,以确保爬虫的高效运行。
到此这篇关于python多线程并发时出现503错误的最佳处理的文章就介绍到这了,更多相关python处理503错误内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论