1. Basic download:
import requests

def download_file(url, save_path):
    response = requests.get(url, stream=True)
    if response.status_code == 200:
        with open(save_path, 'wb') as f:
            f.write(response.content)
        return True
    return False

# Usage example
url = "https://example.com/file.pdf"
download_file(url, "file.pdf")
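A note on this version: response.content buffers the entire response in memory, which is fine for small files; stream=True only pays off once you read in chunks, as in the next example. If you would rather get an exception than a boolean on HTTP errors, requests provides raise_for_status(). A minimal sketch, with an illustrative function name:

import requests

def download_file_strict(url, save_path):
    # Raises requests.HTTPError on 4xx/5xx instead of returning False
    response = requests.get(url)
    response.raise_for_status()
    with open(save_path, 'wb') as f:
        f.write(response.content)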
2. Chunked download for large files:
import requests
from tqdm import tqdm

def download_large_file(url, save_path):
    response = requests.get(url, stream=True)
    if response.status_code == 200:
        file_size = int(response.headers.get('content-length', 0))
        # Show a progress bar; the total is in bytes, so advance it by
        # the size of each chunk rather than by chunk count
        progress = tqdm(total=file_size, unit='B', unit_scale=True)
        with open(save_path, 'wb') as f:
            for data in response.iter_content(chunk_size=8192):
                f.write(data)
                progress.update(len(data))
        progress.close()
        return True
    return False
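For reference, another streaming option is to copy the underlying raw stream to disk with the standard library's shutil. This is a sketch, not part of the original article; note that response.raw bypasses requests' automatic content decoding (e.g. gzip), so the saved bytes are exactly what came over the wire:

import shutil
import requests

def download_via_raw(url, save_path):
    # Stream the raw urllib3 file object straight to disk
    with requests.get(url, stream=True) as response:
        response.raise_for_status()
        with open(save_path, 'wb') as f:
            shutil.copyfileobj(response.raw, f)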
3. Download with resume support:
import requests
import os

def resume_download(url, save_path):
    # Size of any partially downloaded file
    initial_pos = os.path.getsize(save_path) if os.path.exists(save_path) else 0
    # Ask the server for the remaining bytes only
    headers = {'Range': f'bytes={initial_pos}-'}
    response = requests.get(url, stream=True, headers=headers)
    # Append if part of the file is already on disk
    mode = 'ab' if initial_pos > 0 else 'wb'
    with open(save_path, mode) as f:
        for chunk in response.iter_content(chunk_size=8192):
            if chunk:
                f.write(chunk)
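One caveat worth adding here: not every server honors the Range header. A server that ignores it replies 200 with the full body, and appending that to a partial file would corrupt it; a 206 Partial Content status confirms the range was accepted. A hedged sketch of that guard (the function name is illustrative):

import requests
import os

def resume_download_safe(url, save_path):
    initial_pos = os.path.getsize(save_path) if os.path.exists(save_path) else 0
    headers = {'Range': f'bytes={initial_pos}-'} if initial_pos > 0 else {}
    response = requests.get(url, stream=True, headers=headers)
    # 206 means the server honored the Range request; anything else
    # means we must restart the download from the beginning
    if initial_pos > 0 and response.status_code != 206:
        initial_pos = 0
    mode = 'ab' if initial_pos > 0 else 'wb'
    with open(save_path, mode) as f:
        for chunk in response.iter_content(chunk_size=8192):
            if chunk:
                f.write(chunk)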
4. Download with timeout and retries:
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

def download_with_retry(url, save_path, max_retries=3, timeout=30):
    session = requests.Session()
    # Retry policy: retry transient server errors with exponential
    # backoff between attempts
    retries = Retry(total=max_retries,
                    backoff_factor=1,
                    status_forcelist=[500, 502, 503, 504])
    session.mount('http://', HTTPAdapter(max_retries=retries))
    session.mount('https://', HTTPAdapter(max_retries=retries))
    try:
        response = session.get(url, stream=True, timeout=timeout)
        with open(save_path, 'wb') as f:
            for chunk in response.iter_content(chunk_size=8192):
                if chunk:
                    f.write(chunk)
        return True
    except Exception as e:
        print(f"download failed: {str(e)}")
        return False
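A quick usage sketch (the URL is a placeholder). With backoff_factor=1, urllib3 sleeps with roughly doubling delays between retry attempts, and the timeout applies to each individual request rather than to the download as a whole:

# Usage sketch: up to 5 retries, 10-second connect/read timeout
ok = download_with_retry("https://example.com/file.pdf", "file.pdf",
                         max_retries=5, timeout=10)
print("saved" if ok else "download failed after retries")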
5. A complete downloader implementation:
import requests
from tqdm import tqdm
import os
from pathlib import Path
import hashlib

class FileDownloader:
    def __init__(self, chunk_size=8192):
        self.chunk_size = chunk_size
        self.session = requests.Session()

    def get_file_size(self, url):
        response = self.session.head(url)
        return int(response.headers.get('content-length', 0))

    def get_file_hash(self, file_path):
        # Compute the SHA-256 digest of a file, reading in blocks
        sha256_hash = hashlib.sha256()
        with open(file_path, "rb") as f:
            for byte_block in iter(lambda: f.read(4096), b""):
                sha256_hash.update(byte_block)
        return sha256_hash.hexdigest()

    def download(self, url, save_path, verify_hash=None):
        save_path = Path(save_path)
        # Create the target directory
        save_path.parent.mkdir(parents=True, exist_ok=True)
        # Get the file size for the progress bar
        file_size = self.get_file_size(url)
        # Set up the progress bar
        progress = tqdm(total=file_size, unit='B', unit_scale=True,
                        desc=save_path.name)
        try:
            response = self.session.get(url, stream=True)
            with save_path.open('wb') as f:
                for chunk in response.iter_content(chunk_size=self.chunk_size):
                    if chunk:
                        f.write(chunk)
                        progress.update(len(chunk))
            progress.close()
            # Verify file integrity
            if verify_hash:
                downloaded_hash = self.get_file_hash(save_path)
                if downloaded_hash != verify_hash:
                    raise ValueError("file hash verification failed")
            return True
        except Exception as e:
            progress.close()
            print(f"download failed: {str(e)}")
            # Remove the partial file so a later retry starts clean
            if save_path.exists():
                save_path.unlink()
            return False

    def download_multiple(self, url_list, save_dir):
        results = []
        for url in url_list:
            filename = url.split('/')[-1]
            save_path = Path(save_dir) / filename
            success = self.download(url, save_path)
            results.append({
                'url': url,
                'success': success,
                'save_path': str(save_path)
            })
        return results

# Usage example
downloader = FileDownloader()

# Single-file download
url = "https://example.com/file.pdf"
downloader.download(url, "downloads/file.pdf")

# Multi-file download
urls = [
    "https://example.com/file1.pdf",
    "https://example.com/file2.pdf"
]
results = downloader.download_multiple(urls, "downloads")
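To use the integrity check, pass the file's published SHA-256 digest as verify_hash; the digest below is a placeholder, not a real value:

# Placeholder digest: substitute the checksum published for your file
expected_sha256 = "0123456789abcdef..."  # hypothetical value
downloader.download(url, "downloads/file.pdf", verify_hash=expected_sha256)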
Summary
That concludes this article on common ways to download files with Python requests. For more on downloading files with requests, search 代码网's earlier articles or browse the related posts below. We hope you will keep supporting 代码网!