当前位置: 代码网 > it编程>前端脚本>Python > Python FastAPI+Celery+RabbitMQ实现分布式图片水印处理系统

Python FastAPI+Celery+RabbitMQ实现分布式图片水印处理系统

2025年04月04日 Python 我要评论
实现思路fastapi 服务器celery 任务队列rabbitmq 作为消息代理定时任务处理完整步骤首先创建项目结构:c:\users\administrator\desktop\meitu\├──

实现思路

  • fastapi 服务器
  • celery 任务队列
  • rabbitmq 作为消息代理
  • 定时任务处理

完整步骤

首先创建项目结构:

c:\users\administrator\desktop\meitu\
├── app/
│   ├── __init__.py
│   ├── main.py
│   ├── celery_app.py
│   ├── tasks.py
│   └── config.py
├── requirements.txt
└── celery_worker.py

1.首先创建 requirements.txt:

fastapi==0.104.1
uvicorn==0.24.0
celery==5.3.4
python-dotenv==1.0.0
requests==2.31.0

2.创建配置文件:

from dotenv import load_dotenv
import os

load_dotenv()

# rabbitmq配置
rabbitmq_host = os.getenv("rabbitmq_host", "localhost")
rabbitmq_port = os.getenv("rabbitmq_port", "5672")
rabbitmq_user = os.getenv("rabbitmq_user", "guest")
rabbitmq_pass = os.getenv("rabbitmq_pass", "guest")

# celery配置
celery_broker_url = f"amqp://{rabbitmq_user}:{rabbitmq_pass}@{rabbitmq_host}:{rabbitmq_port}//"
celery_result_backend = "rpc://"

# 定时任务配置
celery_beat_schedule = {
    'process-images-every-hour': {
        'task': 'app.tasks.process_images',
        'schedule': 3600.0,  # 每小时执行一次
    },
    'daily-cleanup': {
        'task': 'app.tasks.cleanup_old_images',
        'schedule': 86400.0,  # 每天执行一次
    }
}

3.创建 celery 应用:

from celery import celery
from app.config import celery_broker_url, celery_result_backend, celery_beat_schedule

celery_app = celery(
    'image_processing',
    broker=celery_broker_url,
    backend=celery_result_backend,
    include=['app.tasks']
)

# 配置定时任务
celery_app.conf.beat_schedule = celery_beat_schedule
celery_app.conf.timezone = 'asia/shanghai'

4.创建任务文件:

from app.celery_app import celery_app
from app.watermark import imagewatermarker
import os
from datetime import datetime, timedelta

@celery_app.task
def add_watermark_task(image_path, text, position='center'):
    """异步添加水印任务"""
    watermarker = imagewatermarker()
    try:
        result_path = watermarker.add_watermark(
            image_path=image_path,
            text=text,
            position=position
        )
        return {"status": "success", "output_path": result_path}
    except exception as e:
        return {"status": "error", "message": str(e)}

@celery_app.task
def process_images():
    """定时处理图片任务"""
    image_dir = "images/pending"
    if not os.path.exists(image_dir):
        return {"status": "error", "message": "pending directory not found"}

    processed = 0
    for image in os.listdir(image_dir):
        if image.lower().endswith(('.png', '.jpg', '.jpeg')):
            add_watermark_task.delay(
                os.path.join(image_dir, image),
                "自动处理水印",
                'center'
            )
            processed += 1

    return {"status": "success", "processed": processed}

@celery_app.task
def cleanup_old_images():
    """清理旧图片任务"""
    output_dir = "images/processed"
    if not os.path.exists(output_dir):
        return {"status": "error", "message": "output directory not found"}

    threshold_date = datetime.now() - timedelta(days=7)
    cleaned = 0

    for image in os.listdir(output_dir):
        image_path = os.path.join(output_dir, image)
        if os.path.getctime(image_path) < threshold_date.timestamp():
            os.remove(image_path)
            cleaned += 1

    return {"status": "success", "cleaned": cleaned}

5.创建 fastapi 应用:

from fastapi import fastapi, file, uploadfile, backgroundtasks
from fastapi.responses import jsonresponse
import os
from app.tasks import add_watermark_task
from app.celery_app import celery_app

app = fastapi(title="图片水印处理服务")

@app.post("/upload/")
async def upload_image(
    file: uploadfile = file(...),
    text: str = "水印文本",
    position: str = "center"
):
    # 保存上传的文件
    file_path = f"images/uploads/{file.filename}"
    os.makedirs(os.path.dirname(file_path), exist_ok=true)
    
    with open(file_path, "wb") as buffer:
        content = await file.read()
        buffer.write(content)
    
    # 创建异步任务
    task = add_watermark_task.delay(file_path, text, position)
    
    return jsonresponse({
        "status": "success",
        "message": "图片已上传并加入处理队列",
        "task_id": task.id
    })

@app.get("/task/{task_id}")
async def get_task_status(task_id: str):
    task = celery_app.asyncresult(task_id)
    if task.ready():
        return {"status": "completed", "result": task.result}
    return {"status": "processing"}

@app.get("/tasks/scheduled")
async def get_scheduled_tasks():
    return {"tasks": celery_app.conf.beat_schedule}

6.创建 celery worker 启动文件:

from app.celery_app import celery_app

if __name__ == '__main__':
    celery_app.start()

使用说明

首先安装依赖:

pip install -r requirements.txt

确保 rabbitmq 服务已启动

启动 fastapi 服务器:

uvicorn app.main:app --reload

启动 celery worker:

celery -a celery_worker.celery_app worker --loglevel=info

启动 celery beat(定时任务):

celery -a celery_worker.celery_app beat --loglevel=info

这个系统提供以下功能:

  • 通过 fastapi 接口上传图片并异步处理水印
  • 使用 celery 处理异步任务队列
  • 使用 rabbitmq 作为消息代理
  • 支持定时任务:
    • 每小时自动处理待处理图片
    • 每天清理一周前的旧图片
  • 支持任务状态查询
  • 支持查看计划任务列表

api 端点:

  • post /upload/ - 上传图片并创建水印任务
  • get /task/{task_id} - 查询任务状态
  • get /tasks/scheduled - 查看计划任务列表

以上就是python fastapi+celery+rabbitmq实现分布式图片水印处理系统的详细内容,更多关于python图片水印的资料请关注代码网其它相关文章!

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com