当前位置: 代码网 > it编程>前端脚本>Python > Python操作Kubernetes集群的完全指南

Python操作Kubernetes集群的完全指南

2026年05月13日 Python 我要评论
基础环境准备:1. 安装必要的包。首先,我们需要安装Python的Kubernetes客户端库:pip install kubernetes;pip install openshift(可选,用于OpenShift集群)……

基础环境准备

1. 安装必要的包

首先,我们需要安装Python的Kubernetes客户端库:

pip install kubernetes
pip install openshift # 可选,用于openshift集群

2. 配置文件准备

import os
from kubernetes import client, config

# Load the kubeconfig configuration
config.load_kube_config()

python kubernetes客户端介绍

主要模块说明

from kubernetes import client, config, watch
from kubernetes.client import ApiClient
from kubernetes.client.rest import ApiException

主要模块功能:

  • client: 提供各种api操作接口
  • config: 处理配置文件加载
  • watch: 用于监控资源变化
  • ApiClient: 底层API客户端
  • ApiException: 异常处理

连接kubernetes集群

示例1:基础连接配置

from kubernetes import client, config

def connect_kubernetes():
    """Load the local kubeconfig and return a verified ``CoreV1Api`` client.

    Returns:
        A ``client.CoreV1Api`` instance when the probe request succeeds,
        otherwise ``None`` (the failure is printed, not raised).
    """
    try:
        # Load the local kubeconfig
        config.load_kube_config()

        # Create the API client
        v1 = client.CoreV1Api()

        # Probe the connection; one item is enough to prove it works
        ret = v1.list_pod_for_all_namespaces(limit=1)
    except Exception as e:
        print(f"连接失败:{str(e)}")
        return None

    print("连接成功!发现 {} 个pod".format(len(ret.items)))
    return v1


# Test the connection
api = connect_kubernetes()

示例2:多集群配置

def connect_multiple_clusters():
    """Create one ``CoreV1Api`` client per configured cluster.

    Each cluster gets its own ``ApiClient`` built from its kubeconfig file, so
    loading a later cluster's configuration never affects a client that was
    created for an earlier one.

    Returns:
        dict mapping cluster name to its ``client.CoreV1Api`` instance.
        Clusters that fail to load are reported on stdout and left out.
    """
    clusters = {
        'prod': '/path/to/prod-kubeconfig',
        'dev': '/path/to/dev-kubeconfig'
    }

    apis = {}
    for cluster_name, config_file in clusters.items():
        try:
            # Build a dedicated ApiClient rather than mutating the global default
            api_client = config.new_client_from_config(config_file=config_file)
            apis[cluster_name] = client.CoreV1Api(api_client=api_client)
            print(f"成功连接到{cluster_name}集群")
        except Exception as e:
            print(f"连接{cluster_name}集群失败:{str(e)}")

    return apis

pod操作实战

示例3:创建pod

from kubernetes import client, config

def create_pod(name, image, namespace="default"):
    """Create a single-container pod that exposes container port 80.

    Args:
        name: Used as both the pod name and the container name.
        image: Container image reference.
        namespace: Namespace to create the pod in.

    Returns:
        The API response from ``create_namespaced_pod``, or ``None`` when the
        API call raises ``ApiException`` (the error is printed).
    """
    # Build the pod object
    container = client.V1Container(
        name=name,
        image=image,
        ports=[client.V1ContainerPort(container_port=80)],
    )
    pod = client.V1Pod(
        metadata=client.V1ObjectMeta(name=name),
        spec=client.V1PodSpec(containers=[container]),
    )

    # Get the API instance
    v1 = client.CoreV1Api()

    try:
        # Create the pod
        api_response = v1.create_namespaced_pod(
            namespace=namespace,
            body=pod
        )
    except ApiException as e:
        print(f"pod创建失败:{str(e)}")
        return None

    print(f"pod {name} 创建成功")
    return api_response


# Usage example
create_pod("nginx-pod", "nginx:latest")

示例4:查询pod状态

def get_pod_status(name, namespace="default"):
    """Read a pod and summarise its status fields.

    Args:
        name: Pod name.
        namespace: Namespace the pod lives in.

    Returns:
        dict with the keys ``name``, ``status``, ``pod_ip``, ``host_ip``,
        ``start_time`` and ``conditions`` (a list of ``type``/``status``
        dicts), or ``None`` when the API call raises ``ApiException``.
    """
    v1 = client.CoreV1Api()
    try:
        pod = v1.read_namespaced_pod(name=name, namespace=namespace)
    except ApiException as e:
        print(f"获取pod状态失败:{str(e)}")
        return None

    status = pod.status
    return {
        "name": pod.metadata.name,
        "status": status.phase,
        "pod_ip": status.pod_ip,
        "host_ip": status.host_ip,
        "start_time": status.start_time,
        # conditions may be None, so fall back to an empty list
        "conditions": [
            {"type": condition.type, "status": condition.status}
            for condition in status.conditions or []
        ],
    }


# Usage example
status = get_pod_status("nginx-pod")
print(status)

deployment管理

示例5:创建deployment

def create_deployment(name, image, replicas=3, namespace="default"):
    """Create a deployment whose pods are labelled ``app=<name>``.

    Args:
        name: Deployment name; also used as the container name and as the
            value of the ``app`` label shared by the selector and template.
        image: Container image reference.
        replicas: Desired replica count.
        namespace: Namespace to create the deployment in.

    Returns:
        The API response from ``create_namespaced_deployment``, or ``None``
        when the API call raises ``ApiException`` (the error is printed).
    """
    labels = {"app": name}

    # Build the deployment object
    pod_template = client.V1PodTemplateSpec(
        metadata=client.V1ObjectMeta(labels=labels),
        spec=client.V1PodSpec(
            containers=[
                client.V1Container(
                    name=name,
                    image=image,
                    ports=[client.V1ContainerPort(container_port=80)],
                )
            ]
        ),
    )
    deployment = client.V1Deployment(
        metadata=client.V1ObjectMeta(name=name),
        spec=client.V1DeploymentSpec(
            replicas=replicas,
            # The selector must match the template labels
            selector=client.V1LabelSelector(match_labels=labels),
            template=pod_template,
        ),
    )

    # Get the API instance
    apps_v1 = client.AppsV1Api()

    try:
        # Create the deployment
        api_response = apps_v1.create_namespaced_deployment(
            namespace=namespace,
            body=deployment
        )
    except ApiException as e:
        print(f"deployment创建失败:{str(e)}")
        return None

    print(f"deployment {name} 创建成功")
    return api_response


# Usage example
create_deployment("nginx-deployment", "nginx:latest")

示例6:更新deployment

def update_deployment(name, new_image, namespace="default"):
    """Change the image of a deployment's first container.

    Args:
        name: Deployment name.
        new_image: Image reference to set on ``containers[0]``.
        namespace: Namespace the deployment lives in.

    Returns:
        The API response from ``patch_namespaced_deployment``, or ``None``
        when reading or patching raises ``ApiException`` (the error is
        printed).
    """
    apps_v1 = client.AppsV1Api()

    try:
        # Fetch the existing deployment
        deployment = apps_v1.read_namespaced_deployment(name, namespace)

        # Update the image of the first container only
        deployment.spec.template.spec.containers[0].image = new_image

        # Apply the update
        api_response = apps_v1.patch_namespaced_deployment(
            name=name,
            namespace=namespace,
            body=deployment
        )
    except ApiException as e:
        print(f"deployment更新失败:{str(e)}")
        return None

    print(f"deployment {name} 更新成功")
    return api_response


# Usage example
update_deployment("nginx-deployment", "nginx:1.19")

service资源操作

示例7:创建service

def create_service(name, selector, port, target_port, namespace="default"):
    """Create a service with a single port mapping.

    Args:
        name: Service name.
        selector: Label dict selecting the backing pods.
        port: Port exposed by the service.
        target_port: Port on the selected pods that traffic is forwarded to.
        namespace: Namespace to create the service in.

    Returns:
        The API response from ``create_namespaced_service``, or ``None`` when
        the API call raises ``ApiException`` (the error is printed).
    """
    # Build the service object
    service_port = client.V1ServicePort(port=port, target_port=target_port)
    service = client.V1Service(
        metadata=client.V1ObjectMeta(name=name),
        spec=client.V1ServiceSpec(selector=selector, ports=[service_port]),
    )

    v1 = client.CoreV1Api()

    try:
        # Create the service
        api_response = v1.create_namespaced_service(
            namespace=namespace,
            body=service
        )
    except ApiException as e:
        print(f"service创建失败:{str(e)}")
        return None

    print(f"service {name} 创建成功")
    return api_response


# Usage example
create_service(
    "nginx-service",
    {"app": "nginx-deployment"},
    80,
    80
)

configmap和secret管理

示例8:创建configmap

def create_configmap(name, data, namespace="default"):
    """Create a ConfigMap from a dict of string data.

    Args:
        name: ConfigMap name.
        data: Dict passed as the ConfigMap ``data`` field.
        namespace: Namespace to create the ConfigMap in.

    Returns:
        The API response from ``create_namespaced_config_map``, or ``None``
        when the API call raises ``ApiException`` (the error is printed).
    """
    # Build the ConfigMap object
    configmap = client.V1ConfigMap(
        metadata=client.V1ObjectMeta(name=name),
        data=data
    )

    v1 = client.CoreV1Api()

    try:
        # Create the ConfigMap
        api_response = v1.create_namespaced_config_map(
            namespace=namespace,
            body=configmap
        )
    except ApiException as e:
        print(f"configmap创建失败:{str(e)}")
        return None

    print(f"configmap {name} 创建成功")
    return api_response


# Usage example
config_data = {
    "app.properties": """
    app.name=myapp
    app.env=production
    """
}
create_configmap("app-config", config_data)

示例9:创建secret

import base64

def create_secret(name, data, namespace="default"):
    """Create an ``Opaque`` secret from plain-text string values.

    Args:
        name: Secret name.
        data: Dict of ``str`` keys to ``str`` values; each value is
            base64-encoded before being sent.
        namespace: Namespace to create the secret in.

    Returns:
        The API response from ``create_namespaced_secret``, or ``None`` when
        the API call raises ``ApiException`` (the error is printed).
    """
    # The ``data`` field of a Secret carries base64-encoded values
    encoded_data = {
        key: base64.b64encode(value.encode()).decode()
        for key, value in data.items()
    }

    # Build the secret object; the built-in generic type is spelled "Opaque"
    secret = client.V1Secret(
        metadata=client.V1ObjectMeta(name=name),
        type="Opaque",
        data=encoded_data
    )

    v1 = client.CoreV1Api()

    try:
        # Create the secret
        api_response = v1.create_namespaced_secret(
            namespace=namespace,
            body=secret
        )
    except ApiException as e:
        print(f"secret创建失败:{str(e)}")
        return None

    print(f"secret {name} 创建成功")
    return api_response


# Usage example
secret_data = {
    "username": "admin",
    "password": "secret123"
}
create_secret("app-secrets", secret_data)

自定义资源定义(crd)操作

示例10:操作crd资源

def create_custom_resource(group, version, plural, namespace, body):
    """Create a namespaced custom object through ``CustomObjectsApi``.

    Args:
        group: API group of the custom resource.
        version: API version of the custom resource.
        plural: Plural resource name used in the API path.
        namespace: Namespace to create the object in.
        body: The object manifest as a dict.

    Returns:
        The API response from ``create_namespaced_custom_object``, or ``None``
        when the API call raises ``ApiException`` (the error is printed).
    """
    # Get the CustomObjectsApi
    custom_api = client.CustomObjectsApi()

    try:
        # Create the custom resource
        api_response = custom_api.create_namespaced_custom_object(
            group=group,
            version=version,
            namespace=namespace,
            plural=plural,
            body=body
        )
    except ApiException as e:
        print(f"自定义资源创建失败:{str(e)}")
        return None

    print(f"自定义资源创建成功")
    return api_response


# Usage example. Manifest keys are case-sensitive: "apiVersion", not "apiversion".
custom_resource = {
    "apiVersion": "stable.example.com/v1",
    "kind": "CronTab",
    "metadata": {
        "name": "my-crontab"
    },
    "spec": {
        "cronSpec": "* * * * */5",
        "image": "my-cron-image"
    }
}

create_custom_resource(
    group="stable.example.com",
    version="v1",
    plural="crontabs",
    namespace="default",
    body=custom_resource
)

事件监听和watch操作

示例11:监听pod事件

from kubernetes import watch

def watch_pods(namespace="default"):
    """Print every pod event in a namespace until interrupted.

    The watch stops when the stream ends, when the API raises
    ``ApiException`` (printed), or when the user presses Ctrl-C.

    Args:
        namespace: Namespace whose pods are watched.
    """
    v1 = client.CoreV1Api()
    w = watch.Watch()

    try:
        for event in w.stream(v1.list_namespaced_pod, namespace=namespace):
            pod = event['object']
            event_type = event['type']

            print(f"事件类型: {event_type}")
            print(f"pod名称: {pod.metadata.name}")
            print(f"pod状态: {pod.status.phase}")
            print("-------------------")

    except ApiException as e:
        print(f"监听失败:{str(e)}")
    except KeyboardInterrupt:
        print("监听已停止")
    finally:
        # Stop the watch on every exit path, not only on Ctrl-C
        w.stop()

# Usage example
# watch_pods()  # runs until interrupted

高级应用场景

示例12:批量操作和错误处理

def batch_create_resources(resources):
    """Create several deployments/services and collect per-item outcomes.

    Args:
        resources: Iterable of dicts with the keys ``kind`` (``'deployment'``
            or ``'service'``), ``namespace`` and ``spec`` (the model object to
            create; its ``metadata.name`` is used for reporting).

    Returns:
        dict with two lists: ``success`` (``kind``/``name`` entries) and
        ``failed`` (``kind``/``name``/``error`` entries). A resource with an
        unsupported kind is reported under ``failed`` instead of being
        silently ignored.
    """
    results = {
        'success': [],
        'failed': []
    }

    # Map each supported kind to the API call that creates it
    creators = {
        'deployment': client.AppsV1Api().create_namespaced_deployment,
        'service': client.CoreV1Api().create_namespaced_service,
    }

    for resource in resources:
        kind = resource['kind']
        body = resource['spec']
        name = body.metadata.name

        create = creators.get(kind)
        if create is None:
            results['failed'].append({
                'kind': kind,
                'name': name,
                'error': f"unsupported kind: {kind}"
            })
            continue

        try:
            create(namespace=resource['namespace'], body=body)
        except ApiException as e:
            results['failed'].append({
                'kind': kind,
                'name': name,
                'error': str(e)
            })
        else:
            results['success'].append({'kind': kind, 'name': name})

    return results

# Usage example: one deployment described with the client's model classes
resources = [
    {
        'kind': 'deployment',
        'namespace': 'default',
        'spec': client.V1Deployment(
            metadata=client.V1ObjectMeta(name="nginx-deployment"),
            spec=client.V1DeploymentSpec(
                replicas=3,
                selector=client.V1LabelSelector(
                    match_labels={"app": "nginx"}
                ),
                template=client.V1PodTemplateSpec(
                    metadata=client.V1ObjectMeta(
                        labels={"app": "nginx"}
                    ),
                    spec=client.V1PodSpec(
                        containers=[
                            client.V1Container(
                                name="nginx",
                                image="nginx:latest"
                            )
                        ]
                    )
                )
            )
        )
    }
]
		
示例13:资源清理和垃圾回收

def cleanup_resources(namespace="default", label_selector=None):
    """Delete pods, deployments and services in a namespace.

    Resources are processed in that order. A failure to delete one item is
    recorded and the run continues; a failure to list a resource type aborts
    the whole run.

    Args:
        namespace: Namespace to clean up.
        label_selector: Optional label selector string (e.g. ``"app=nginx"``)
            forwarded to every list call.

    Returns:
        dict with the keys ``pods``, ``deployments``, ``services`` (names that
        were deleted) and ``errors`` (messages for items that failed), or
        ``None`` when a list call raises ``ApiException``.
    """
    v1 = client.CoreV1Api()
    apps_v1 = client.AppsV1Api()

    cleanup_results = {
        'pods': [],
        'deployments': [],
        'services': [],
        'errors': []
    }

    # (result key, label used in error messages, list call, delete call)
    targets = [
        ('pods', 'pod',
         v1.list_namespaced_pod, v1.delete_namespaced_pod),
        ('deployments', 'deployment',
         apps_v1.list_namespaced_deployment, apps_v1.delete_namespaced_deployment),
        ('services', 'service',
         v1.list_namespaced_service, v1.delete_namespaced_service),
    ]

    try:
        for result_key, label, list_call, delete_call in targets:
            listing = list_call(
                namespace=namespace,
                label_selector=label_selector
            )
            for item in listing.items:
                item_name = item.metadata.name
                try:
                    delete_call(name=item_name, namespace=namespace)
                except ApiException as e:
                    cleanup_results['errors'].append(f"{label} {item_name}: {str(e)}")
                else:
                    cleanup_results[result_key].append(item_name)
    except ApiException as e:
        print(f"清理资源时发生错误:{str(e)}")
        return None

    return cleanup_results

# Usage example
cleanup_result = cleanup_resources(namespace="default", label_selector="app=nginx")
print("清理结果:", cleanup_result)

示例14:资源健康检查和自动修复

import time
from datetime import datetime, timezone
from typing import Dict, List, Optional


class ResourceHealthChecker:
    """Detect unhealthy pods/deployments in one namespace and try to repair them."""

    def __init__(self, namespace: str = "default"):
        """Create the API clients used for all checks in ``namespace``."""
        self.namespace = namespace
        self.v1 = client.CoreV1Api()
        self.apps_v1 = client.AppsV1Api()

    def check_pod_health(self) -> Optional[Dict[str, List[str]]]:
        """Classify the namespace's pods by phase.

        Returns:
            dict with ``unhealthy`` (pods in phase ``Failed``) and ``pending``
            (pods in phase ``Pending``) name lists, or ``None`` when the list
            call raises ``ApiException``.
        """
        try:
            pods = self.v1.list_namespaced_pod(namespace=self.namespace)
        except ApiException as e:
            print(f"检查pod健康状态时发生错误:{str(e)}")
            return None

        unhealthy_pods = []
        pending_pods = []
        for pod in pods.items:
            # Phase values are capitalised in the API ("Failed", "Pending")
            if pod.status.phase == 'Failed':
                unhealthy_pods.append(pod.metadata.name)
            elif pod.status.phase == 'Pending':
                pending_pods.append(pod.metadata.name)

        return {
            'unhealthy': unhealthy_pods,
            'pending': pending_pods
        }

    def check_deployment_health(self) -> Optional[Dict[str, List[str]]]:
        """Find deployments whose ready replica count differs from the total.

        Returns:
            dict with a single ``unhealthy`` name list, or ``None`` when the
            list call raises ``ApiException``.
        """
        try:
            deployments = self.apps_v1.list_namespaced_deployment(namespace=self.namespace)
        except ApiException as e:
            print(f"检查deployment健康状态时发生错误:{str(e)}")
            return None

        unhealthy_deployments = [
            deployment.metadata.name
            for deployment in deployments.items
            # Either counter may be None; treat a missing counter as zero
            if (deployment.status.ready_replicas or 0) != (deployment.status.replicas or 0)
        ]
        return {
            'unhealthy': unhealthy_deployments
        }

    def auto_repair(self):
        """Delete failed pods and trigger a restart of unhealthy deployments.

        Returns:
            list of human-readable strings, one per attempted repair action
            (successful or failed).
        """
        repair_actions = []

        # Check and repair pods
        pod_health = self.check_pod_health()
        if pod_health:
            for unhealthy_pod in pod_health['unhealthy']:
                try:
                    self.v1.delete_namespaced_pod(
                        name=unhealthy_pod,
                        namespace=self.namespace
                    )
                    repair_actions.append(f"删除不健康的pod: {unhealthy_pod}")
                except ApiException as e:
                    repair_actions.append(f"修复pod {unhealthy_pod} 失败: {str(e)}")

        # Check and repair deployments
        deployment_health = self.check_deployment_health()
        if deployment_health:
            for unhealthy_deployment in deployment_health['unhealthy']:
                try:
                    # Changing a pod-template annotation makes the deployment
                    # roll out new pods (same annotation key kubectl uses).
                    patch = {
                        "spec": {
                            "template": {
                                "metadata": {
                                    "annotations": {
                                        "kubectl.kubernetes.io/restartedAt":
                                            datetime.now(timezone.utc).isoformat()
                                    }
                                }
                            }
                        }
                    }
                    self.apps_v1.patch_namespaced_deployment(
                        name=unhealthy_deployment,
                        namespace=self.namespace,
                        body=patch
                    )
                    repair_actions.append(f"重启deployment: {unhealthy_deployment}")
                except ApiException as e:
                    repair_actions.append(f"修复deployment {unhealthy_deployment} 失败: {str(e)}")

        return repair_actions


# Backward-compatible alias for the original lowercase class name
resourcehealthchecker = ResourceHealthChecker

# Usage example
health_checker = resourcehealthchecker("default")
repair_results = health_checker.auto_repair()
print("修复操作:", repair_results)

示例15:自定义控制器实现

from kubernetes import watch
import threading
import queue

class CustomController:
    """Minimal controller: watches pods and deployments and reacts to events.

    Two watcher threads push ``(resource_type, event)`` tuples onto a queue;
    a third thread pops them and dispatches to the per-resource handlers.
    """

    def __init__(self, namespace="default"):
        """Create the API clients and the (initially stopped) event queue."""
        self.namespace = namespace
        self.v1 = client.CoreV1Api()
        self.apps_v1 = client.AppsV1Api()
        self.event_queue = queue.Queue()
        self.running = False

    def start(self):
        """Start the event-processing thread and both watcher threads."""
        self.running = True

        # Start the consumer first, then the two producers
        for target in (self._process_events, self._watch_pods, self._watch_deployments):
            threading.Thread(target=target).start()

    def stop(self):
        """Ask all threads to stop.

        The flag is only checked between events and queue timeouts, so a
        watcher blocked on an idle stream notices it on the next event.
        """
        self.running = False

    def _watch_resource(self, list_func, resource_type):
        """Stream events from ``list_func`` into the queue until stopped."""
        import time  # local import: this snippet does not import ``time`` itself

        w = watch.Watch()
        while self.running:
            try:
                for event in w.stream(list_func, namespace=self.namespace):
                    if not self.running:
                        break
                    self.event_queue.put((resource_type, event))
            except Exception as e:
                print(f"{resource_type}监控异常:{str(e)}")
                if self.running:
                    time.sleep(5)  # back off before retrying after an error
        w.stop()

    def _watch_pods(self):
        """Watch pod changes in the namespace."""
        self._watch_resource(self.v1.list_namespaced_pod, 'pod')

    def _watch_deployments(self):
        """Watch deployment changes in the namespace."""
        self._watch_resource(self.apps_v1.list_namespaced_deployment, 'deployment')

    def _process_events(self):
        """Drain the event queue, dispatching each event to its handler."""
        while self.running:
            try:
                # The 1s timeout lets the loop re-check ``self.running``
                resource_type, event = self.event_queue.get(timeout=1)
                self._handle_event(resource_type, event)
            except queue.Empty:
                continue
            except Exception as e:
                print(f"事件处理异常:{str(e)}")

    def _handle_event(self, resource_type, event):
        """Log one event and route it to the pod or deployment handler."""
        event_type = event['type']
        obj = event['object']

        print(f"收到{resource_type}事件:")
        print(f"  类型: {event_type}")
        print(f"  名称: {obj.metadata.name}")

        if resource_type == 'pod':
            self._handle_pod_event(event_type, obj)
        elif resource_type == 'deployment':
            self._handle_deployment_event(event_type, obj)

    def _handle_pod_event(self, event_type, pod):
        """Delete a pod that was modified into the ``Failed`` phase."""
        # Watch event types are upper-case; pod phases are capitalised
        if event_type != 'MODIFIED' or pod.status.phase != 'Failed':
            return

        print(f"检测到pod {pod.metadata.name} 失败,尝试重启")
        try:
            self.v1.delete_namespaced_pod(
                name=pod.metadata.name,
                namespace=self.namespace
            )
        except ApiException as e:
            print(f"重启pod失败:{str(e)}")

    def _handle_deployment_event(self, event_type, deployment):
        """Report a modified deployment whose ready replicas differ from total."""
        if event_type == 'MODIFIED':
            if deployment.status.ready_replicas != deployment.status.replicas:
                print(f"检测到deployment {deployment.metadata.name} 副本不一致")
                # Custom handling logic can be added here


# Backward-compatible alias for the original lowercase class name
customcontroller = CustomController

# Usage example
controller = customcontroller("default")
controller.start()

# Stop after running for a while
# time.sleep(3600)
# controller.stop()

示例16:资源指标监控

from kubernetes.client import CustomObjectsApi
import time

class MetricsCollector:
    """Read node and pod usage from the ``metrics.k8s.io/v1beta1`` API."""

    def __init__(self):
        """Create the custom-objects client used for every metrics query."""
        self.custom_api = client.CustomObjectsApi()

    def get_node_metrics(self):
        """Fetch CPU and memory usage for every node.

        Returns:
            dict mapping node name to ``{'cpu': ..., 'memory': ...}`` (values
            are passed through exactly as the API returns them), or ``None``
            when the API call raises ``ApiException``.
        """
        try:
            metrics = self.custom_api.list_cluster_custom_object(
                group="metrics.k8s.io",
                version="v1beta1",
                plural="nodes"
            )
        except ApiException as e:
            print(f"获取节点指标失败:{str(e)}")
            return None

        return {
            item['metadata']['name']: {
                'cpu': item['usage']['cpu'],
                'memory': item['usage']['memory']
            }
            for item in metrics['items']
        }

    def get_pod_metrics(self, namespace="default"):
        """Fetch per-container CPU and memory usage for pods in a namespace.

        Args:
            namespace: Namespace whose pod metrics are read.

        Returns:
            dict mapping pod name to a dict of container name to
            ``{'cpu': ..., 'memory': ...}``, or ``None`` when the API call
            raises ``ApiException``.
        """
        try:
            metrics = self.custom_api.list_namespaced_custom_object(
                group="metrics.k8s.io",
                version="v1beta1",
                namespace=namespace,
                plural="pods"
            )
        except ApiException as e:
            print(f"获取pod指标失败:{str(e)}")
            return None

        return {
            item['metadata']['name']: {
                container['name']: {
                    'cpu': container['usage']['cpu'],
                    'memory': container['usage']['memory']
                }
                for container in item['containers']
            }
            for item in metrics['items']
        }

    def monitor_resources(self, interval=30, namespace="default", max_iterations=None):
        """Print node and pod usage repeatedly.

        Args:
            interval: Seconds to sleep between rounds.
            namespace: Namespace whose pod metrics are printed.
            max_iterations: Number of rounds to run; ``None`` (the default)
                keeps monitoring forever.
        """
        completed = 0
        while max_iterations is None or completed < max_iterations:
            print("\n=== 资源使用情况 ===")

            # Node metrics
            node_metrics = self.get_node_metrics()
            if node_metrics:
                print("\n节点资源使用情况:")
                for node_name, metrics in node_metrics.items():
                    print(f"\n节点: {node_name}")
                    print(f"cpu使用: {metrics['cpu']}")
                    print(f"内存使用: {metrics['memory']}")

            # Pod metrics
            pod_metrics = self.get_pod_metrics(namespace)
            if pod_metrics:
                print("\npod资源使用情况:")
                for pod_name, containers in pod_metrics.items():
                    print(f"\npod: {pod_name}")
                    for container_name, metrics in containers.items():
                        print(f"容器: {container_name}")
                        print(f"cpu使用: {metrics['cpu']}")
                        print(f"内存使用: {metrics['memory']}")

            completed += 1
            # Skip the pause after the final round of a bounded run
            if max_iterations is None or completed < max_iterations:
                time.sleep(interval)


# Backward-compatible alias for the original lowercase class name
metricscollector = MetricsCollector

# Usage example
collector = metricscollector()
# collector.monitor_resources()  # continuous monitoring

最佳实践和注意事项

1.错误处理

  • 始终使用try-except块处理api调用
  • 实现重试机制处理临时性故障
  • 记录详细的错误信息便于调试

2.性能优化

  • 使用批量操作代替单个操作
  • 实现合适的缓存机制
  • 避免频繁的api调用

3.安全考虑

  • 使用最小权限原则
  • 保护敏感信息(如密钥和证书)
  • 实现适当的认证和授权机制

4.可维护性

  • 模块化代码结构
  • 完善的日志记录
  • 清晰的代码注释

总结

本文详细介绍了如何使用Python操作Kubernetes集群,包括:

  1. 基础环境配置
  2. 常见资源操作
  3. 高级应用场景
  4. 自动化运维实践
  5. 监控和告警实现

通过这些示例和最佳实践,可以构建强大的kubernetes自动化工具和运维系统。

以上就是python操作kubernetes集群的完全指南的详细内容,更多关于python操作kubernetes集群的资料请关注代码网其它相关文章!

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2026  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com