实现思路
- fastapi 服务器
- celery 任务队列
- rabbitmq 作为消息代理
- 定时任务处理
完整步骤
首先创建项目结构:
c:\users\administrator\desktop\meitu\
├── app/
│ ├── __init__.py
│ ├── main.py
│ ├── celery_app.py
│ ├── tasks.py
│ └── config.py
├── requirements.txt
└── celery_worker.py
1.首先创建 requirements.txt:
fastapi==0.104.1 uvicorn==0.24.0 celery==5.3.4 python-dotenv==1.0.0 requests==2.31.0
2.创建配置文件:
from dotenv import load_dotenv import os load_dotenv() # rabbitmq配置 rabbitmq_host = os.getenv("rabbitmq_host", "localhost") rabbitmq_port = os.getenv("rabbitmq_port", "5672") rabbitmq_user = os.getenv("rabbitmq_user", "guest") rabbitmq_pass = os.getenv("rabbitmq_pass", "guest") # celery配置 celery_broker_url = f"amqp://{rabbitmq_user}:{rabbitmq_pass}@{rabbitmq_host}:{rabbitmq_port}//" celery_result_backend = "rpc://" # 定时任务配置 celery_beat_schedule = { 'process-images-every-hour': { 'task': 'app.tasks.process_images', 'schedule': 3600.0, # 每小时执行一次 }, 'daily-cleanup': { 'task': 'app.tasks.cleanup_old_images', 'schedule': 86400.0, # 每天执行一次 } }
3.创建 celery 应用:
from celery import celery from app.config import celery_broker_url, celery_result_backend, celery_beat_schedule celery_app = celery( 'image_processing', broker=celery_broker_url, backend=celery_result_backend, include=['app.tasks'] ) # 配置定时任务 celery_app.conf.beat_schedule = celery_beat_schedule celery_app.conf.timezone = 'asia/shanghai'
4.创建任务文件:
from app.celery_app import celery_app from app.watermark import imagewatermarker import os from datetime import datetime, timedelta @celery_app.task def add_watermark_task(image_path, text, position='center'): """异步添加水印任务""" watermarker = imagewatermarker() try: result_path = watermarker.add_watermark( image_path=image_path, text=text, position=position ) return {"status": "success", "output_path": result_path} except exception as e: return {"status": "error", "message": str(e)} @celery_app.task def process_images(): """定时处理图片任务""" image_dir = "images/pending" if not os.path.exists(image_dir): return {"status": "error", "message": "pending directory not found"} processed = 0 for image in os.listdir(image_dir): if image.lower().endswith(('.png', '.jpg', '.jpeg')): add_watermark_task.delay( os.path.join(image_dir, image), "自动处理水印", 'center' ) processed += 1 return {"status": "success", "processed": processed} @celery_app.task def cleanup_old_images(): """清理旧图片任务""" output_dir = "images/processed" if not os.path.exists(output_dir): return {"status": "error", "message": "output directory not found"} threshold_date = datetime.now() - timedelta(days=7) cleaned = 0 for image in os.listdir(output_dir): image_path = os.path.join(output_dir, image) if os.path.getctime(image_path) < threshold_date.timestamp(): os.remove(image_path) cleaned += 1 return {"status": "success", "cleaned": cleaned}
5.创建 fastapi 应用:
from fastapi import fastapi, file, uploadfile, backgroundtasks from fastapi.responses import jsonresponse import os from app.tasks import add_watermark_task from app.celery_app import celery_app app = fastapi(title="图片水印处理服务") @app.post("/upload/") async def upload_image( file: uploadfile = file(...), text: str = "水印文本", position: str = "center" ): # 保存上传的文件 file_path = f"images/uploads/{file.filename}" os.makedirs(os.path.dirname(file_path), exist_ok=true) with open(file_path, "wb") as buffer: content = await file.read() buffer.write(content) # 创建异步任务 task = add_watermark_task.delay(file_path, text, position) return jsonresponse({ "status": "success", "message": "图片已上传并加入处理队列", "task_id": task.id }) @app.get("/task/{task_id}") async def get_task_status(task_id: str): task = celery_app.asyncresult(task_id) if task.ready(): return {"status": "completed", "result": task.result} return {"status": "processing"} @app.get("/tasks/scheduled") async def get_scheduled_tasks(): return {"tasks": celery_app.conf.beat_schedule}
6.创建 celery worker 启动文件:
from app.celery_app import celery_app if __name__ == '__main__': celery_app.start()
使用说明
首先安装依赖:
pip install -r requirements.txt
确保 rabbitmq 服务已启动
启动 fastapi 服务器:
uvicorn app.main:app --reload
启动 celery worker:
celery -a celery_worker.celery_app worker --loglevel=info
启动 celery beat(定时任务):
celery -a celery_worker.celery_app beat --loglevel=info
这个系统提供以下功能:
- 通过 fastapi 接口上传图片并异步处理水印
- 使用 celery 处理异步任务队列
- 使用 rabbitmq 作为消息代理
- 支持定时任务:
- 每小时自动处理待处理图片
- 每天清理一周前的旧图片
- 支持任务状态查询
- 支持查看计划任务列表
api 端点:
- post /upload/ - 上传图片并创建水印任务
- get /task/{task_id} - 查询任务状态
- get /tasks/scheduled - 查看计划任务列表
以上就是python fastapi+celery+rabbitmq实现分布式图片水印处理系统的详细内容,更多关于python图片水印的资料请关注代码网其它相关文章!
发表评论