一、celery依赖安装
# python 3.11版本 pip install celery redis django-celery-beat django_celery_results eventlet
二、celery 参数配置
django项目的settings.py中新增如下celery配置
##
installed_apps = [
......
'firewall_app',
'django_celery_beat', # 添加celery应用
'django_celery_results', # 添加celery结果展示应用
]
# celery configuration options
# 使用 redis 作为消息代理
celery_broker_url = 'redis://localhost:6379/0' # 或 'amqp://guest:guest@localhost:5672//' 如果使用 rabbitmq
celery_result_backend = 'django-db'
celery_accept_content = ['json']
celery_task_serializer = 'json'
celery_result_serializer = 'json'
celery_timezone = 'asia/shanghai' # 设置时区
celery_enable_utc = true
# celery beat settings (如果使用定时任务)
celery_beat_scheduler = 'django_celery_beat.schedulers:databasescheduler' # 如果希望在 django admin 中管理定时任务,需要安装 django-celery-beat
# 或者使用默认的本地调度器:
# celery_beat_scheduler = 'celery.beat:persistentscheduler'
celery_broker_connection_retry_on_startup = true
#日志输出配置
logging = {
'version': 1,
'disable_existing_loggers': false,
'handlers': {
'file': {
'level': 'info',
'class': 'logging.filehandler',
'filename': 'celery.log', # 日志文件路径
},
},
'loggers': {
'firewall_app.tasks': { # 匹配您的任务模块
'handlers': ['file'],
'level': 'info',
'propagate': true,
},
},
}三、celery定时任务
1、新建tasks.py
import logging
import time
from celery import shared_task
logger = logging.getlogger(__name__)
def run_crawler_logic():
print("执行爬虫任务...")
# 在这里调用 fortinetcrawler 或相关爬虫函数
# crawler = fortinetcrawler()
# crawler.run()
time.sleep(10) # 模拟任务执行
print("爬虫任务完成.")
def run_mapping_logic():
print("执行漏洞映射任务...")
# 在这里调用 map_vulnerabilities_for_all_firewalls 或相关函数
# map_vulnerabilities_for_all_firewalls()
# 推迟导入爬虫函数,避免循环引用
time.sleep(5) # 模拟任务执行
print("漏洞映射任务完成.")
@shared_task
def run_crawler_task():
"""celery task for running the web crawler."""
# 确保 django 环境已设置 (如果任务需要访问 django 模型)
# 如果 celery worker 和 django 运行在同一环境,通常不需要手动设置
# 但为了保险起见,可以加上
# if not django.apps.apps.ready:
# os.environ.setdefault('django_settings_module', 'firewall_monitor.settings')
# django.setup()
logger.info("漏洞爬虫任务开始执行")
run_crawler_logic()
logger.info("漏洞爬虫任务完成")
return "漏洞爬虫任务成功执行"
@shared_task
def run_firewall_mapping_task():
"""celery task for running the firewall vulnerability mapping."""
# 同上,确保 django 环境
# if not django.apps.apps.ready:
# os.environ.setdefault('django_settings_module', 'firewall_monitor.settings')
# django.setup()
logger.info("防火墙漏洞映射任务开始执行")
run_mapping_logic()
logger.info("防火墙漏洞映射任务完成")
return "防火墙漏洞映射任务成功执行" 2、新建celery.py
import os
from celery import celery
os.environ.setdefault("django_settings_module", "mysite.settings")
app = celery('firewall_monitor')
# 使用 django settings.py 中的配置
app.config_from_object('django.conf:settings', namespace='celery')
# 自动发现应用中的任务(最好指定tasks路径)
app.autodiscover_tasks(['mysite.tasks'])
@app.task(bind=true)
def debug_task(self):
print(f'request: {self.request!r}')(3) __init__.py添加
from .celery import app as celery_app
__all__ = ('celery_app',)四、celery任务启动
# 1、redis-x64-5.0.14.1 window版本 # redis启动 redis-server.exe redis.windows.conf # 2、启动djiango项目 python manage.py runserver 8000 # 3、celery beat启动 celery -a mysite beat -l info --scheduler django_celery_beat.schedulers:databasescheduler # 4、celery 任务启动 celery -a mysite worker --pool=solo -l info -p eventlet
五、celery任务配置
启动djiango项目后,在django-admin后台配置定时任务。

六、celery项目结构

到此这篇关于django celery定时任务的文章就介绍到这了,更多相关django celery定时任务内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论