一、项目初始化
1. 创建虚拟环境并安装依赖
# Create and activate a virtual environment
python3 -m venv env
source env/bin/activate

# Install dependencies.
# django-celery-results backs the 'django-db' result backend and
# djangorestframework provides the serializers/viewsets/token auth used below.
# NOTE(review): confirm the Aliyun SDK package names against PyPI before installing.
pip install django celery redis django-celery-beat django-celery-results djangorestframework aliyun-python-sdk-core-v3 aliyun-python-sdk-mq mysqlclient
2. 创建 django 项目和应用
# Create the project
django-admin startproject rocketmq_manager
cd rocketmq_manager

# Create the app
python manage.py startapp rocketmq
3. 配置 mysql 数据库(rocketmq_manager/settings.py)
# MySQL connection used by the Django ORM.
# Django only picks up settings whose names are upper-case, and the
# per-database keys (ENGINE, NAME, ...) must be upper-case as well.
DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.mysql',
        'NAME': 'rocketmq_manager',   # database name
        'USER': 'your_username',      # user name
        'PASSWORD': 'your_password',  # password (placeholder; do not commit real credentials)
        'HOST': 'localhost',          # host
        'PORT': '3306',               # port
        'OPTIONS': {
            # Executed on every new connection to turn on strict SQL mode.
            'init_command': "set sql_mode='strict_trans_tables'",
        },
    }
}
4. 配置项目其他设置(rocketmq_manager/settings.py)
import os

INSTALLED_APPS = [
    # ... (keep Django's default apps here)
    # DRF and its token app are required by the TokenAuthentication-protected
    # viewsets and the obtain_auth_token endpoint.
    'rest_framework',
    'rest_framework.authtoken',
    'django_celery_beat',
    'django_celery_results',
    'rocketmq',
]

# Celery configuration. The CELERY_ prefix matches the namespace the Celery
# app uses when loading configuration from Django settings.
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'django-db'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Asia/Shanghai'  # IANA zone names are case-sensitive on most systems

# Aliyun configuration, read from environment variables.
# The variable names are kept exactly as they are exported in the
# "set environment variables" step of this guide.
ALIYUN_ACCESS_KEY_ID = os.environ.get('aliyun_access_key_id')
ALIYUN_ACCESS_KEY_SECRET = os.environ.get('aliyun_access_key_secret')
ALIYUN_REGION_ID = os.environ.get('aliyun_region_id', 'cn-hangzhou')
二、celery 集成配置
1. 创建 celery 应用(rocketmq_manager/celery.py)
from __future__ import absolute_import, unicode_literals import os from celery import celery os.environ.setdefault('django_settings_module', 'rocketmq_manager.settings') app = celery('rocketmq_manager') app.config_from_object('django.conf:settings', namespace='celery') app.autodiscover_tasks() @app.task(bind=true) def debug_task(self): print(f'request: {self.request!r}')
2. 初始化 celery(rocketmq_manager/__init__.py)
"""Expose the Celery app at package level so it is loaded when the package is imported."""
from __future__ import absolute_import, unicode_literals

from .celery import app as celery_app

__all__ = ('celery_app',)
三、model 开发
创建 rocketmq 实例模型(rocketmq/models.py)
python
运行
from django.db import models
from django.utils import timezone


# NOTE(review): the model class names are kept lower-case because the other
# modules in this guide import them under exactly these names; rename them to
# PascalCase only together with every importer.
class rocketmqinstance(models.Model):
    """A RocketMQ instance as last seen by the sync task."""

    instance_id = models.CharField('实例id', max_length=100, unique=True)
    instance_name = models.CharField('实例名称', max_length=200, blank=True, null=True)
    instance_type = models.CharField('实例类型', max_length=50, blank=True, null=True)
    region_id = models.CharField('区域id', max_length=50)
    status = models.CharField('状态', max_length=50, blank=True, null=True)
    create_time = models.DateTimeField('创建时间', blank=True, null=True)
    expire_time = models.DateTimeField('过期时间', blank=True, null=True)
    tags = models.JSONField('标签', blank=True, null=True)
    # Refreshed automatically on every save.
    last_updated = models.DateTimeField('最后更新时间', auto_now=True)

    class Meta:
        verbose_name = 'rocketmq实例'
        verbose_name_plural = 'rocketmq实例列表'
        indexes = [
            models.Index(fields=['instance_id', 'region_id']),
        ]

    def __str__(self):
        return f"{self.instance_name} ({self.instance_id})"


class instancesynclog(models.Model):
    """One row per sync attempt, recording outcome and duration."""

    # Set once when the row is created.
    sync_time = models.DateTimeField('同步时间', auto_now_add=True)
    instance_count = models.IntegerField('实例数量', default=0)
    success = models.BooleanField('是否成功', default=True)
    error_message = models.TextField('错误信息', blank=True, null=True)
    execution_time = models.FloatField('执行时间(秒)', blank=True, null=True)

    class Meta:
        verbose_name = '实例同步日志'
        verbose_name_plural = '实例同步日志列表'
        ordering = ['-sync_time']  # newest first

    def __str__(self):
        return f"同步记录 - {self.sync_time}"
迁移数据库
# Generate migration files for the new models, then apply them
python manage.py makemigrations
python manage.py migrate
四、定时任务代码
创建阿里云 api 客户端(rocketmq/aliyun_client.py)
import json
import os
import time

from aliyunsdkcore.acs_exception.exceptions import ClientException
from aliyunsdkcore.acs_exception.exceptions import ServerException
from aliyunsdkcore.client import AcsClient
# NOTE(review): module path and request class are restored from the article's
# lower-cased text; confirm them against the installed Aliyun SDK package.
from aliyunsdkmq.model.v20190513 import DescribeInstancesRequest


# NOTE(review): class name kept lower-case because the tasks module imports it
# under this name.
class aliyunrocketmqclient:
    """Thin wrapper around the Aliyun SDK client for listing RocketMQ instances."""

    def __init__(self):
        """Build the SDK client from credentials held in environment variables."""
        self.access_key_id = os.environ.get('aliyun_access_key_id')
        self.access_key_secret = os.environ.get('aliyun_access_key_secret')
        self.region_id = os.environ.get('aliyun_region_id', 'cn-hangzhou')
        self.client = AcsClient(self.access_key_id, self.access_key_secret, self.region_id)

    def get_instances(self):
        """Fetch the instance list and return the decoded JSON response.

        The request is attempted up to three times, sleeping 2s and then 4s
        between attempts when the SDK raises a client or server exception.

        Raises:
            ClientException, ServerException: when the final attempt fails.
            Exception: anything else is printed and re-raised unchanged.
        """
        try:
            request = DescribeInstancesRequest.DescribeInstancesRequest()
            request.set_accept_format('json')

            max_retries = 3
            for attempt in range(max_retries):
                try:
                    response = self.client.do_action_with_exception(request)
                    return json.loads(response)
                except (ClientException, ServerException) as e:
                    if attempt >= max_retries - 1:
                        # Out of attempts: let the outer handler report it.
                        raise
                    wait_time = (attempt + 1) * 2
                    print(f"请求失败,{wait_time}秒后重试: {str(e)}")
                    time.sleep(wait_time)
        except Exception as e:
            print(f"获取实例信息失败: {str(e)}")
            raise
定义定时任务(rocketmq/tasks.py)
import logging
import time
from datetime import datetime

from celery import shared_task
from django.db import transaction

from .aliyun_client import aliyunrocketmqclient
from .models import rocketmqinstance, instancesynclog

logger = logging.getLogger(__name__)


def _ms_to_datetime(millis):
    """Convert epoch milliseconds to a datetime; return None when missing or zero."""
    return datetime.fromtimestamp(millis / 1000) if millis else None


def _parse_instances(response):
    """Turn the API response into a list of dicts keyed by model field name.

    NOTE(review): the response keys are restored to PascalCase from the
    article's lower-cased text — confirm them against a real API response.
    """
    data = response.get('Data') or {}
    parsed = []
    for item in data.get('InstanceDoList') or []:
        instance_id = item.get('InstanceId', '')
        if not instance_id:
            # An empty id would collapse every such item into one bogus row.
            logger.warning("Skipping instance entry without an id: %r", item)
            continue
        parsed.append({
            'instance_id': instance_id,
            'instance_name': item.get('InstanceName', ''),
            'instance_type': item.get('InstanceType', ''),
            'region_id': item.get('RegionId', ''),
            'status': item.get('InstanceStatus', ''),
            'create_time': _ms_to_datetime(item.get('CreateTime')),
            'expire_time': _ms_to_datetime(item.get('ExpireTime')),
            'tags': item.get('Tags', {}),
        })
    return parsed


@shared_task(bind=True, autoretry_for=(Exception,), retry_backoff=5,
             retry_kwargs={'max_retries': 3})
def sync_rocketmq_instances(self):
    """Pull the instance list from Aliyun and upsert it into the database.

    Every attempt, successful or not, writes one instancesynclog row with the
    elapsed time. On failure the exception is re-raised so Celery's autoretry
    (up to 3 retries with backoff) takes over.

    Returns:
        A human-readable summary string.
    """
    start_time = time.time()
    try:
        response = aliyunrocketmqclient().get_instances()
        instance_list = _parse_instances(response)

        # All upserts succeed or fail together.
        with transaction.atomic():
            # Optional: prune rows for instances the API no longer returns by
            # excluding the fetched instance ids and deleting the remainder.
            for instance_data in instance_list:
                rocketmqinstance.objects.update_or_create(
                    instance_id=instance_data['instance_id'],
                    defaults=instance_data,
                )

        execution_time = time.time() - start_time
        instancesynclog.objects.create(
            instance_count=len(instance_list),
            success=True,
            execution_time=execution_time,
        )
        logger.info(f"成功同步 {len(instance_list)} 个rocketmq实例,耗时: {execution_time:.2f}秒")
        return f"同步完成,共 {len(instance_list)} 个实例,耗时: {execution_time:.2f}秒"
    except Exception as e:
        execution_time = time.time() - start_time
        instancesynclog.objects.create(
            success=False,
            error_message=str(e),
            execution_time=execution_time,
        )
        logger.error(f"同步rocketmq实例失败: {str(e)},耗时: {execution_time:.2f}秒")
        raise
五、接口开发
1. 创建序列化器(rocketmq/serializers.py)
from rest_framework import serializers

from .models import rocketmqinstance, instancesynclog


class rocketmqinstanceserializer(serializers.ModelSerializer):
    """Serialize every field of a RocketMQ instance; last_updated is read-only."""

    class Meta:
        model = rocketmqinstance
        fields = '__all__'
        read_only_fields = ['last_updated']


class instancesynclogserializer(serializers.ModelSerializer):
    """Serialize sync log rows; all listed fields are read-only."""

    class Meta:
        model = instancesynclog
        fields = '__all__'
        read_only_fields = ['sync_time', 'instance_count', 'success',
                            'error_message', 'execution_time']
2. 创建视图集(rocketmq/views.py)
from rest_framework import viewsets, status
from rest_framework.authentication import TokenAuthentication
from rest_framework.decorators import action
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response

from .models import rocketmqinstance, instancesynclog
from .serializers import rocketmqinstanceserializer, instancesynclogserializer
from .tasks import sync_rocketmq_instances


class rocketmqinstanceviewset(viewsets.ModelViewSet):
    """CRUD endpoints for RocketMQ instances, protected by token authentication."""

    queryset = rocketmqinstance.objects.all()
    serializer_class = rocketmqinstanceserializer
    authentication_classes = [TokenAuthentication]
    permission_classes = [IsAuthenticated]

    @action(detail=False, methods=['post'])
    def sync_now(self, request):
        """Queue an instance sync immediately and return the Celery task id."""
        task = sync_rocketmq_instances.delay()
        return Response({'task_id': task.id, 'message': '同步任务已启动'},
                        status=status.HTTP_202_ACCEPTED)

    @action(detail=False, methods=['get'])
    def regions(self, request):
        """Return the distinct region ids present in the instance table."""
        regions = rocketmqinstance.objects.values_list('region_id', flat=True).distinct()
        # Materialize the queryset so the payload is a plain list.
        return Response(list(regions), status=status.HTTP_200_OK)


class instancesynclogviewset(viewsets.ReadOnlyModelViewSet):
    """Read-only endpoints for sync logs, newest first."""

    queryset = instancesynclog.objects.all().order_by('-sync_time')
    serializer_class = instancesynclogserializer
    authentication_classes = [TokenAuthentication]
    permission_classes = [IsAuthenticated]
3. 配置 url(rocketmq/urls.py)
from django.urls import include, path
from rest_framework import routers

from .views import rocketmqinstanceviewset, instancesynclogviewset

# The router generates the list/detail routes plus the extra @action routes.
router = routers.DefaultRouter()
router.register(r'instances', rocketmqinstanceviewset)
router.register(r'sync-logs', instancesynclogviewset)

urlpatterns = [
    path('', include(router.urls)),
]
4. 项目 url 配置(rocketmq_manager/urls.py)
"""Project-level URL routing: admin site, the rocketmq API, and token issuance."""
from django.contrib import admin
from django.urls import path, include
from rest_framework.authtoken.views import obtain_auth_token

urlpatterns = [
    path('admin/', admin.site.urls),
    path('api/', include('rocketmq.urls')),
    # Exchange username/password for an auth token.
    path('api/token/', obtain_auth_token, name='api_token_auth'),
]
六、配置定时任务
在settings.py中添加定时任务配置
# Periodic tasks picked up by celery beat. The name must be upper-case with
# the CELERY_ prefix for it to be read from Django settings.
CELERY_BEAT_SCHEDULE = {
    'sync-rocketmq-instances': {
        'task': 'rocketmq.tasks.sync_rocketmq_instances',
        'schedule': 3600.0,  # seconds: run once an hour
        'args': (),
    },
}
七、启动服务
1. 设置环境变量
# Credentials and region read by the settings module and the Aliyun client.
export aliyun_access_key_id=your_access_key_id
export aliyun_access_key_secret=your_access_key_secret
# Adjust the region to match your deployment
export aliyun_region_id=cn-hangzhou
2. 启动 redis
# Start Redis; the Celery broker URL in settings points at redis://localhost:6379/0
redis-server
3. 启动 celery worker
# -A (capital) selects the Celery application module
celery -A rocketmq_manager worker --loglevel=info --pool=prefork --concurrency=4
4. 启动 celery beat
# Run the scheduler with django-celery-beat's database-backed scheduler class
celery -A rocketmq_manager beat --loglevel=info --scheduler django_celery_beat.schedulers:DatabaseScheduler
5. 启动 django 开发服务器
# Start Django's development server (for local development only)
python manage.py runserver
八、api 测试
1. 获取认证令牌
# -X sets the HTTP method (lower-case -x would set a proxy instead)
curl -X POST -d "username=your_username&password=your_password" http://localhost:8000/api/token/
2. 获取 rocketmq 实例列表
# -H adds a request header (lower-case -h prints curl's help)
curl -H "authorization: token your_token_here" http://localhost:8000/api/instances/
3. 获取同步日志
# -H adds a request header (lower-case -h prints curl's help)
curl -H "authorization: token your_token_here" http://localhost:8000/api/sync-logs/
4. 手动触发同步
# -X sets the HTTP method and -H adds a request header
curl -X POST -H "authorization: token your_token_here" http://localhost:8000/api/instances/sync_now/
项目结构
rocketmq_manager/
├── rocketmq_manager/
│   ├── __init__.py
│   ├── celery.py
│   ├── settings.py
│   ├── urls.py
│   └── wsgi.py
├── rocketmq/
│   ├── migrations/
│   ├── __init__.py
│   ├── admin.py
│   ├── apps.py
│   ├── aliyun_client.py
│   ├── models.py
│   ├── serializers.py
│   ├── tasks.py
│   ├── urls.py
│   └── views.py
├── manage.py
└── db.sqlite3
关键特性说明
- mysql 存储:使用 mysql 数据库存储 rocketmq 实例信息和同步日志
- 定时同步:每小时自动拉取阿里云 rocketmq 实例信息
- 数据持久化:将实例信息存储到数据库,支持索引加速查询
- 手动触发:提供 api 接口支持手动触发同步
- 错误处理:任务失败自动重试,记录详细的同步日志和执行时间
- 权限控制:使用 token 认证保护 api 接口
扩展建议
- 添加更多阿里云 api 调用,获取更详细的实例指标(如 tps、消息堆积量等)
- 实现多区域支持,同时监控多个地域的 rocketmq 实例
- 添加告警机制,当实例状态异常或同步失败时发送通知
- 集成缓存系统(如 redis),提高接口响应速度
- 添加 api 限流功能,防止恶意请求
- 实现实例信息的导出功能,支持数据报表生成
这个实现提供了一个完整的 Django + Celery 定时拉取阿里云 RocketMQ 实例信息的解决方案,使用 MySQL 存储数据,支持权限控制和手动触发同步。注意:用于生产环境之前,需要将 runserver 开发服务器替换为正式的 WSGI/ASGI 服务器,并妥善管理数据库密码与阿里云访问密钥。
总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持代码网。
发表评论