I. Project Initialization
1. Create a virtual environment and install dependencies
# Create a virtual environment
python3 -m venv myenv
source myenv/bin/activate

# Install dependencies (djangorestframework and django-celery-results are also needed by the settings and views below)
pip install django celery redis django-celery-beat django-celery-results djangorestframework
2. Create the Django project and app
# Create the project
django-admin startproject task_manager
cd task_manager

# Create the app
python manage.py startapp tasks
3. Configure the project (task_manager/settings.py)
INSTALLED_APPS = [
    # ...
    'rest_framework',
    'django_celery_beat',
    'django_celery_results',
    'tasks',
]

# Database configuration
DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.sqlite3',
        'NAME': BASE_DIR / 'db.sqlite3',
    }
}

# Celery configuration
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'django-db'  # store results with django-celery-results
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Asia/Shanghai'
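CELERY_TIMEZONE only affects Celery. Assuming the default settings.py generated by startproject (this addition is not part of the original steps), keeping Django on the same timezone makes next_run_time values and execution logs line up with the beat schedule:

# Optional: align Django's timezone with Celery's (assumption, not in the original article)
TIME_ZONE = 'Asia/Shanghai'
USE_TZ = True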
II. Celery Integration
1. Create the Celery application (task_manager/celery.py)
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'task_manager.settings')

app = Celery('task_manager')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()


@app.task(bind=True)
def debug_task(self):
    print(f'Request: {self.request!r}')
2. Initialize Celery (task_manager/__init__.py)
from __future__ import absolute_import, unicode_literals
from .celery import app as celery_app

__all__ = ('celery_app',)
III. Model Development
Create the task models (tasks/models.py)
from django.db import models
from django.utils import timezone


class ScheduledTask(models.Model):
    TASK_TYPES = (
        ('periodic', '周期性任务'),
        ('one_time', '一次性任务'),
    )

    name = models.CharField('任务名称', max_length=100)
    task_type = models.CharField('任务类型', max_length=20, choices=TASK_TYPES)
    task_function = models.CharField('任务函数', max_length=200)
    cron_expression = models.CharField('cron表达式', max_length=100, blank=True, null=True)
    interval_seconds = models.IntegerField('间隔秒数', blank=True, null=True)
    next_run_time = models.DateTimeField('下次执行时间', blank=True, null=True)
    is_active = models.BooleanField('是否激活', default=True)
    created_at = models.DateTimeField('创建时间', auto_now_add=True)
    updated_at = models.DateTimeField('更新时间', auto_now=True)

    def __str__(self):
        return self.name

    class Meta:
        verbose_name = '定时任务'
        verbose_name_plural = '定时任务列表'


class TaskExecutionLog(models.Model):
    task = models.ForeignKey(ScheduledTask, on_delete=models.CASCADE, related_name='logs')
    execution_time = models.DateTimeField('执行时间', auto_now_add=True)
    status = models.CharField('执行状态', max_length=20, choices=(
        ('success', '成功'),
        ('failed', '失败'),
    ))
    result = models.TextField('执行结果', blank=True, null=True)
    error_message = models.TextField('错误信息', blank=True, null=True)

    def __str__(self):
        return f"{self.task.name} - {self.execution_time}"

    class Meta:
        verbose_name = '任务执行日志'
        verbose_name_plural = '任务执行日志列表'
Run the database migrations
python manage.py makemigrations
python manage.py migrate
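The project tree and the feature list later in this article mention managing scheduled tasks through the Django Admin, but the contents of tasks/admin.py are never shown. A minimal registration sketch (an assumption, not part of the original code) could look like this:

# tasks/admin.py -- minimal sketch, not shown in the original article
from django.contrib import admin
from .models import ScheduledTask, TaskExecutionLog


@admin.register(ScheduledTask)
class ScheduledTaskAdmin(admin.ModelAdmin):
    # Columns in the changelist; field names come from the model above
    list_display = ('name', 'task_type', 'task_function', 'is_active', 'next_run_time')
    list_filter = ('task_type', 'is_active')
    search_fields = ('name', 'task_function')


@admin.register(TaskExecutionLog)
class TaskExecutionLogAdmin(admin.ModelAdmin):
    list_display = ('task', 'status', 'execution_time')
    list_filter = ('status',)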
IV. API Development
1. Create the serializers (tasks/serializers.py)
from rest_framework import serializers
from .models import ScheduledTask, TaskExecutionLog


class ScheduledTaskSerializer(serializers.ModelSerializer):
    class Meta:
        model = ScheduledTask
        fields = '__all__'


class TaskExecutionLogSerializer(serializers.ModelSerializer):
    class Meta:
        model = TaskExecutionLog
        fields = '__all__'
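The model allows interval_seconds and cron_expression to be blank, so a periodic task could be created without an interval and the view in the next step would then fail when building the schedule. One way to guard against that, sketched here as an assumption rather than part of the original code, is a cross-field validate method on ScheduledTaskSerializer:

# Sketch: a variant of ScheduledTaskSerializer with cross-field validation (not in the original article)
class ScheduledTaskSerializer(serializers.ModelSerializer):
    class Meta:
        model = ScheduledTask
        fields = '__all__'

    def validate(self, attrs):
        # On partial updates, fall back to the existing instance values
        task_type = attrs.get('task_type', getattr(self.instance, 'task_type', None))
        interval = attrs.get('interval_seconds', getattr(self.instance, 'interval_seconds', None))
        next_run = attrs.get('next_run_time', getattr(self.instance, 'next_run_time', None))
        if task_type == 'periodic' and not interval:
            raise serializers.ValidationError('interval_seconds is required for periodic tasks')
        if task_type == 'one_time' and not next_run:
            raise serializers.ValidationError('next_run_time is required for one-time tasks')
        return attrs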
2. Create the viewsets (tasks/views.py)
from rest_framework import viewsets, status
from rest_framework.response import Response
from .models import ScheduledTask, TaskExecutionLog
from .serializers import ScheduledTaskSerializer, TaskExecutionLogSerializer
from celery import current_app
from django_celery_beat.models import PeriodicTask, IntervalSchedule, CrontabSchedule
import json


class ScheduledTaskViewSet(viewsets.ModelViewSet):
    queryset = ScheduledTask.objects.all()
    serializer_class = ScheduledTaskSerializer

    def create(self, request, *args, **kwargs):
        serializer = self.get_serializer(data=request.data)
        serializer.is_valid(raise_exception=True)
        # Create the matching Celery schedule entry
        task = serializer.save()
        self._create_celery_task(task)
        headers = self.get_success_headers(serializer.data)
        return Response(serializer.data, status=status.HTTP_201_CREATED, headers=headers)

    def update(self, request, *args, **kwargs):
        partial = kwargs.pop('partial', False)
        instance = self.get_object()
        serializer = self.get_serializer(instance, data=request.data, partial=partial)
        serializer.is_valid(raise_exception=True)
        # Update the matching Celery schedule entry
        task = serializer.save()
        self._update_celery_task(task)
        return Response(serializer.data)

    def destroy(self, request, *args, **kwargs):
        instance = self.get_object()
        # Remove the matching Celery schedule entry
        self._delete_celery_task(instance)
        self.perform_destroy(instance)
        return Response(status=status.HTTP_204_NO_CONTENT)

    def _create_celery_task(self, task):
        if task.task_type == 'periodic':
            # Create an interval schedule
            schedule, _ = IntervalSchedule.objects.get_or_create(
                every=task.interval_seconds,
                period=IntervalSchedule.SECONDS,
            )
            PeriodicTask.objects.create(
                interval=schedule,
                name=task.name,
                task=task.task_function,
                enabled=task.is_active,
                # Pass the ScheduledTask id so that sample_task(task_id) receives it
                args=json.dumps([task.id]),
                kwargs=json.dumps({}),
            )
        elif task.task_type == 'one_time':
            # One-time tasks are dispatched with an ETA (see the sketch after this block)
            pass

    def _update_celery_task(self, task):
        try:
            periodic_task = PeriodicTask.objects.get(name=task.name)
            if task.task_type == 'periodic':
                schedule, _ = IntervalSchedule.objects.get_or_create(
                    every=task.interval_seconds,
                    period=IntervalSchedule.SECONDS,
                )
                periodic_task.interval = schedule
                periodic_task.enabled = task.is_active
                periodic_task.save()
        except PeriodicTask.DoesNotExist:
            self._create_celery_task(task)

    def _delete_celery_task(self, task):
        try:
            periodic_task = PeriodicTask.objects.get(name=task.name)
            periodic_task.delete()
        except PeriodicTask.DoesNotExist:
            pass


class TaskExecutionLogViewSet(viewsets.ReadOnlyModelViewSet):
    queryset = TaskExecutionLog.objects.all()
    serializer_class = TaskExecutionLogSerializer
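The one_time branch above is left as a stub, and the imported CrontabSchedule is never used even though the model has a cron_expression field. Both gaps could be filled along the following lines; this is only a sketch, assuming cron_expression uses the standard five-field format and that a one-time task should simply be queued with an ETA via current_app.send_task:

# Sketch: possible handling of cron-based and one-time tasks inside _create_celery_task
# (assumptions, not part of the original code; the interval branch is omitted here)
def _create_celery_task(self, task):
    if task.task_type == 'periodic' and task.cron_expression:
        # Five-field expression: "minute hour day_of_month month day_of_week"
        minute, hour, day_of_month, month_of_year, day_of_week = task.cron_expression.split()
        schedule, _ = CrontabSchedule.objects.get_or_create(
            minute=minute,
            hour=hour,
            day_of_month=day_of_month,
            month_of_year=month_of_year,
            day_of_week=day_of_week,
        )
        PeriodicTask.objects.create(
            crontab=schedule,
            name=task.name,
            task=task.task_function,
            enabled=task.is_active,
            args=json.dumps([task.id]),
            kwargs=json.dumps({}),
        )
    elif task.task_type == 'one_time':
        # Queue the task once, to run at next_run_time; this bypasses beat entirely
        current_app.send_task(task.task_function, args=[task.id], eta=task.next_run_time)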
3. Configure the URLs (tasks/urls.py)
from django.urls import include, path
from rest_framework import routers
from .views import ScheduledTaskViewSet, TaskExecutionLogViewSet

router = routers.DefaultRouter()
router.register(r'tasks', ScheduledTaskViewSet)
router.register(r'logs', TaskExecutionLogViewSet)

urlpatterns = [
    path('', include(router.urls)),
]
4. Project URL configuration (task_manager/urls.py)
from django.contrib import admin
from django.urls import path, include

urlpatterns = [
    path('admin/', admin.site.urls),
    path('api/', include('tasks.urls')),
]
V. Create a Sample Task
Define the task function (tasks/tasks.py)
from celery import shared_task
from django.utils import timezone
from .models import ScheduledTask, TaskExecutionLog
import logging

logger = logging.getLogger(__name__)


@shared_task(bind=True, autoretry_for=(Exception,), retry_backoff=3, retry_kwargs={'max_retries': 3})
def sample_task(self, task_id):
    try:
        task = ScheduledTask.objects.get(id=task_id)
        # Simulate task execution; timezone.now() gives a wall-clock timestamp for the result message
        result = f"任务 {task.name} 执行成功,时间:{timezone.now()}"
        # Record a success log entry
        TaskExecutionLog.objects.create(
            task=task,
            status='success',
            result=result
        )
        logger.info(f"任务执行成功: {task.name}")
        return result
    except Exception as e:
        # Record a failure log entry if the task row still exists
        task = ScheduledTask.objects.filter(id=task_id).first()
        if task:
            TaskExecutionLog.objects.create(
                task=task,
                status='failed',
                error_message=str(e)
            )
        logger.error(f"任务执行失败: {str(e)}")
        raise
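Before wiring up beat, the task can be smoke-tested by hand. Assuming a worker is already running and at least one ScheduledTask row exists, from python manage.py shell:

# Manual smoke test (assumes a running worker and an existing ScheduledTask row)
from tasks.tasks import sample_task
from tasks.models import ScheduledTask

task = ScheduledTask.objects.first()
async_result = sample_task.delay(task.id)   # queue the task on the worker
print(async_result.get(timeout=10))         # wait for and print the result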
VI. Start the Services
1. Start Redis
redis-server
2. Start the Celery worker
celery -a task_manager worker --loglevel=info --pool=prefork --concurrency=4
3. Start Celery beat
celery -a task_manager beat --loglevel=info --scheduler django_celery_beat.schedulers:DatabaseScheduler
4. Start the Django development server
python manage.py runserver
VII. API Testing
1. Create a periodic task
curl -X POST http://localhost:8000/api/tasks/ \
  -H "Content-Type: application/json" \
  -d '{
    "name": "示例周期性任务",
    "task_type": "periodic",
    "task_function": "tasks.tasks.sample_task",
    "interval_seconds": 60,
    "is_active": true
  }'
2. List tasks
curl http://localhost:8000/api/tasks/
3. View execution logs
curl http://localhost:8000/api/logs/
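Because the update view propagates is_active to the PeriodicTask's enabled flag, a task can also be paused, resumed, or removed straight from the API. For example, assuming the task created above was given id 1:

# Pause task 1 (send is_active: true again to resume)
curl -X PATCH http://localhost:8000/api/tasks/1/ \
  -H "Content-Type: application/json" \
  -d '{"is_active": false}'

# Delete task 1 together with its schedule entry
curl -X DELETE http://localhost:8000/api/tasks/1/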
Project Structure
task_manager/
├── task_manager/
│   ├── __init__.py
│   ├── celery.py
│   ├── settings.py
│   ├── urls.py
│   └── wsgi.py
├── tasks/
│   ├── migrations/
│   ├── __init__.py
│   ├── admin.py
│   ├── apps.py
│   ├── models.py
│   ├── serializers.py
│   ├── tasks.py
│   ├── urls.py
│   └── views.py
├── manage.py
└── db.sqlite3
Key Features
- Dynamic task management: create, update, and delete scheduled tasks through the API
- Execution logging: task results and status are recorded automatically
- Retry on failure: failed tasks are retried automatically (up to 3 times)
- Multiple scheduling modes: supports periodic and one-time tasks
- Visual management: manage scheduled tasks through the Django Admin interface (see the admin.py sketch in section III)
Suggested Extensions
- Add support for task parameters, so arguments can be passed when a task is created
- Implement task pause / resume (see the sketch after this list)
- Add task priority and queue configuration
- Integrate a monitoring stack (e.g. Prometheus + Grafana)
- Send asynchronous notifications of task results (email, SMS, etc.)
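As a starting point for the pause / resume suggestion, a pair of extra endpoints could be added to ScheduledTaskViewSet with DRF's @action decorator. This is only a sketch; the endpoint names and behaviour are assumptions, not part of the original article:

# Sketch: pause/resume endpoints on ScheduledTaskViewSet (names and behaviour are assumptions)
from rest_framework.decorators import action

class ScheduledTaskViewSet(viewsets.ModelViewSet):
    # ... existing queryset, serializer_class and methods from section IV ...

    @action(detail=True, methods=['post'])
    def pause(self, request, pk=None):
        task = self.get_object()
        task.is_active = False
        task.save()
        self._update_celery_task(task)   # syncs enabled on the PeriodicTask entry
        return Response({'status': 'paused'})

    @action(detail=True, methods=['post'])
    def resume(self, request, pk=None):
        task = self.get_object()
        task.is_active = True
        task.save()
        self._update_celery_task(task)   # syncs enabled on the PeriodicTask entry
        return Response({'status': 'resumed'})

With the default router these become POST /api/tasks/{id}/pause/ and POST /api/tasks/{id}/resume/.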
This implementation provides a complete Django + Celery scheduled task system with dynamic management and execution monitoring; with a production-grade database and broker configuration it can serve as the basis for a production deployment.
Summary
The above is based on my personal experience; I hope it serves as a useful reference, and I hope you will continue to support 代码网.