基础环境准备
1. 安装必要的包
首先,我们需要安装 Python 的 Kubernetes 客户端库:
pip install kubernetes
pip install openshift  # 可选,用于 OpenShift 集群
2. 配置文件准备
import os

from kubernetes import client, config

# Load the kubeconfig configuration
config.load_kube_config()
python kubernetes客户端介绍
主要模块说明
from kubernetes import client, config, watch
from kubernetes.client import ApiClient
from kubernetes.client.rest import ApiException
主要模块功能:
- client: 提供各种 API 操作接口
- config: 处理配置文件加载
- watch: 用于监控资源变化
- ApiClient: 底层 API 客户端
- ApiException: 异常处理
连接kubernetes集群
示例1:基础连接配置
from kubernetes import client, config
def connect_kubernetes():
    """Load the local kubeconfig and return a verified ``CoreV1Api`` client.

    Connectivity is probed by listing at most one pod across all namespaces.

    Returns:
        A ``client.CoreV1Api`` instance on success, or ``None`` when loading
        the configuration or the probe request raises.
    """
    try:
        # Load the local kubeconfig
        config.load_kube_config()
        # Create the API client
        v1 = client.CoreV1Api()
        # Probe the connection
        ret = v1.list_pod_for_all_namespaces(limit=1)
        print("连接成功!发现 {} 个pod".format(len(ret.items)))
        return v1
    except Exception as e:
        # Deliberately broad: configuration loading and the HTTP call raise
        # unrelated exception types, and this helper reports all of them.
        print(f"连接失败:{str(e)}")
        return None


# Test the connection
api = connect_kubernetes()
示例2:多集群配置
def connect_multiple_clusters():
    """Create one ``CoreV1Api`` client per configured cluster.

    Each kubeconfig is loaded into its own ``ApiClient`` through
    ``config.new_client_from_config`` so that connecting to one cluster does
    not overwrite the process-wide default configuration used by other
    ``client.*Api()`` instances.

    Returns:
        A dict mapping cluster name to its ``client.CoreV1Api``. Clusters
        whose configuration could not be loaded are omitted.
    """
    clusters = {
        'prod': '/path/to/prod-kubeconfig',
        'dev': '/path/to/dev-kubeconfig'
    }
    apis = {}
    for cluster_name, config_file in clusters.items():
        try:
            api_client = config.new_client_from_config(config_file=config_file)
            apis[cluster_name] = client.CoreV1Api(api_client=api_client)
            print(f"成功连接到{cluster_name}集群")
        except Exception as e:
            # Keep going: one unreachable cluster must not block the others.
            print(f"连接{cluster_name}集群失败:{str(e)}")
    return apis
pod操作实战
示例3:创建pod
from kubernetes import client, config
def create_pod(name, image, namespace="default"):
    """Create a single-container pod that exposes container port 80.

    Args:
        name: Used as both the pod name and the container name.
        image: Container image reference.
        namespace: Namespace to create the pod in.

    Returns:
        The API response for the created pod, or ``None`` if the API call
        raises ``ApiException``.
    """
    # Build the pod object
    pod = client.V1Pod(
        metadata=client.V1ObjectMeta(name=name),
        spec=client.V1PodSpec(
            containers=[
                client.V1Container(
                    name=name,
                    image=image,
                    ports=[client.V1ContainerPort(container_port=80)]
                )
            ]
        )
    )
    # Get the API instance
    v1 = client.CoreV1Api()
    try:
        # Create the pod
        api_response = v1.create_namespaced_pod(
            namespace=namespace,
            body=pod
        )
        print(f"pod {name} 创建成功")
        return api_response
    except ApiException as e:
        print(f"pod创建失败:{str(e)}")
        return None


# Usage example
create_pod("nginx-pod", "nginx:latest")
示例4:查询pod状态
def get_pod_status(name, namespace="default"):
    """Read a pod and summarise its status.

    Args:
        name: Pod name.
        namespace: Namespace the pod lives in.

    Returns:
        A dict with the keys ``name``, ``status`` (the pod phase),
        ``pod_ip``, ``host_ip``, ``start_time`` and ``conditions`` (a list of
        ``{"type", "status"}`` dicts, empty when the pod reports none), or
        ``None`` if the API call raises ``ApiException``.
    """
    v1 = client.CoreV1Api()
    try:
        pod = v1.read_namespaced_pod(name=name, namespace=namespace)
        return {
            "name": pod.metadata.name,
            "status": pod.status.phase,
            "pod_ip": pod.status.pod_ip,
            "host_ip": pod.status.host_ip,
            "start_time": pod.status.start_time,
            "conditions": [
                {
                    "type": condition.type,
                    "status": condition.status
                }
                # conditions may be None, hence the fallback to an empty list
                for condition in pod.status.conditions or []
            ]
        }
    except ApiException as e:
        print(f"获取pod状态失败:{str(e)}")
        return None


# Usage example
status = get_pod_status("nginx-pod")
print(status)
deployment管理
示例5:创建deployment
def create_deployment(name, image, replicas=3, namespace="default"):
    """Create a Deployment whose pods run one container on port 80.

    The selector and the pod template share the label ``app=<name>``.

    Args:
        name: Deployment name; also the container name and ``app`` label value.
        image: Container image reference.
        replicas: Desired number of pod replicas.
        namespace: Namespace to create the Deployment in.

    Returns:
        The API response for the created Deployment, or ``None`` if the API
        call raises ``ApiException``.
    """
    # Build the Deployment object
    deployment = client.V1Deployment(
        metadata=client.V1ObjectMeta(name=name),
        spec=client.V1DeploymentSpec(
            replicas=replicas,
            selector=client.V1LabelSelector(
                match_labels={"app": name}
            ),
            template=client.V1PodTemplateSpec(
                metadata=client.V1ObjectMeta(
                    labels={"app": name}
                ),
                spec=client.V1PodSpec(
                    containers=[
                        client.V1Container(
                            name=name,
                            image=image,
                            ports=[client.V1ContainerPort(container_port=80)]
                        )
                    ]
                )
            )
        )
    )
    # Get the API instance
    apps_v1 = client.AppsV1Api()
    try:
        # Create the Deployment
        api_response = apps_v1.create_namespaced_deployment(
            namespace=namespace,
            body=deployment
        )
        print(f"deployment {name} 创建成功")
        return api_response
    except ApiException as e:
        print(f"deployment创建失败:{str(e)}")
        return None


# Usage example
create_deployment("nginx-deployment", "nginx:latest")
示例6:更新deployment
def update_deployment(name, new_image, namespace="default"):
    """Change the image of a Deployment's first container.

    The Deployment is read, its first container's image is replaced, and the
    modified object is sent back as a patch.

    Args:
        name: Deployment name.
        new_image: Image reference to set on the first container.
        namespace: Namespace the Deployment lives in.

    Returns:
        The API response for the patched Deployment, or ``None`` if either
        API call raises ``ApiException``.
    """
    apps_v1 = client.AppsV1Api()
    try:
        # Fetch the existing Deployment
        deployment = apps_v1.read_namespaced_deployment(name, namespace)
        # Update the image (only the first container is touched)
        deployment.spec.template.spec.containers[0].image = new_image
        # Apply the update
        api_response = apps_v1.patch_namespaced_deployment(
            name=name,
            namespace=namespace,
            body=deployment
        )
        print(f"deployment {name} 更新成功")
        return api_response
    except ApiException as e:
        print(f"deployment更新失败:{str(e)}")
        return None


# Usage example
update_deployment("nginx-deployment", "nginx:1.19")
service资源操作
示例7:创建service
def create_service(name, selector, port, target_port, namespace="default"):
    """Create a Service with a single port mapping.

    Args:
        name: Service name.
        selector: Label dict selecting the pods behind the Service.
        port: Port exposed by the Service.
        target_port: Port on the selected pods that traffic is sent to.
        namespace: Namespace to create the Service in.

    Returns:
        The API response for the created Service, or ``None`` if the API call
        raises ``ApiException``.
    """
    # Build the Service object
    service = client.V1Service(
        metadata=client.V1ObjectMeta(name=name),
        spec=client.V1ServiceSpec(
            selector=selector,
            ports=[client.V1ServicePort(
                port=port,
                target_port=target_port
            )]
        )
    )
    v1 = client.CoreV1Api()
    try:
        # Create the Service
        api_response = v1.create_namespaced_service(
            namespace=namespace,
            body=service
        )
        print(f"service {name} 创建成功")
        return api_response
    except ApiException as e:
        print(f"service创建失败:{str(e)}")
        return None


# Usage example
create_service(
    "nginx-service",
    {"app": "nginx-deployment"},
    80,
    80
)
configmap和secret管理
示例8:创建configmap
def create_configmap(name, data, namespace="default"):
    """Create a ConfigMap from a dict of string data.

    Args:
        name: ConfigMap name.
        data: Dict of key to string content stored in the ConfigMap.
        namespace: Namespace to create the ConfigMap in.

    Returns:
        The API response for the created ConfigMap, or ``None`` if the API
        call raises ``ApiException``.
    """
    # Build the ConfigMap object
    configmap = client.V1ConfigMap(
        metadata=client.V1ObjectMeta(name=name),
        data=data
    )
    v1 = client.CoreV1Api()
    try:
        # Create the ConfigMap
        api_response = v1.create_namespaced_config_map(
            namespace=namespace,
            body=configmap
        )
        print(f"configmap {name} 创建成功")
        return api_response
    except ApiException as e:
        print(f"configmap创建失败:{str(e)}")
        return None


# Usage example
config_data = {
    "app.properties": """
app.name=myapp
app.env=production
"""
}
create_configmap("app-config", config_data)
示例9:创建secret
import base64
def create_secret(name, data, namespace="default"):
    """Create an ``Opaque`` Secret from plain-text values.

    Args:
        name: Secret name.
        data: Dict of key to plain-text string; every value is
            base64-encoded before being sent.
        namespace: Namespace to create the Secret in.

    Returns:
        The API response for the created Secret, or ``None`` if the API call
        raises ``ApiException``.
    """
    # Encode the values: the Secret ``data`` field carries base64 strings
    encoded_data = {
        k: base64.b64encode(v.encode()).decode()
        for k, v in data.items()
    }
    # Build the Secret object
    secret = client.V1Secret(
        metadata=client.V1ObjectMeta(name=name),
        type="Opaque",  # Secret type names are case-sensitive
        data=encoded_data
    )
    v1 = client.CoreV1Api()
    try:
        # Create the Secret
        api_response = v1.create_namespaced_secret(
            namespace=namespace,
            body=secret
        )
        print(f"secret {name} 创建成功")
        return api_response
    except ApiException as e:
        print(f"secret创建失败:{str(e)}")
        return None


# Usage example
secret_data = {
    "username": "admin",
    "password": "secret123"
}
create_secret("app-secrets", secret_data)
自定义资源定义(crd)操作
示例10:操作crd资源
def create_custom_resource(group, version, plural, namespace, body):
    """Create a namespaced custom resource through ``CustomObjectsApi``.

    Args:
        group: API group of the custom resource definition.
        version: API version within that group.
        plural: Plural resource name as registered by the CRD.
        namespace: Namespace to create the object in.
        body: The resource manifest as a dict.

    Returns:
        The API response for the created object, or ``None`` if the API call
        raises ``ApiException``.
    """
    # Get the CustomObjectsApi
    custom_api = client.CustomObjectsApi()
    try:
        # Create the custom resource
        api_response = custom_api.create_namespaced_custom_object(
            group=group,
            version=version,
            namespace=namespace,
            plural=plural,
            body=body
        )
        print("自定义资源创建成功")
        return api_response
    except ApiException as e:
        print(f"自定义资源创建失败:{str(e)}")
        return None


# Usage example
# Manifest field names and the kind are case-sensitive.
custom_resource = {
    "apiVersion": "stable.example.com/v1",
    "kind": "CronTab",
    "metadata": {
        "name": "my-crontab"
    },
    "spec": {
        "cronSpec": "* * * * */5",
        "image": "my-cron-image"
    }
}
create_custom_resource(
    group="stable.example.com",
    version="v1",
    plural="crontabs",
    namespace="default",
    body=custom_resource
)
事件监听和watch操作
示例11:监听pod事件
from kubernetes import watch
def watch_pods(namespace="default"):
    """Stream pod events for a namespace and print each one.

    Runs until the stream ends, the API raises ``ApiException``, or the user
    interrupts with Ctrl-C (in which case the watch is stopped explicitly).

    Args:
        namespace: Namespace whose pods are watched.
    """
    v1 = client.CoreV1Api()
    w = watch.Watch()
    try:
        for event in w.stream(v1.list_namespaced_pod, namespace=namespace):
            pod = event['object']
            event_type = event['type']
            print(f"事件类型: {event_type}")
            print(f"pod名称: {pod.metadata.name}")
            print(f"pod状态: {pod.status.phase}")
            print("-------------------")
    except ApiException as e:
        print(f"监听失败:{str(e)}")
    except KeyboardInterrupt:
        w.stop()
        print("监听已停止")


# Usage example
# watch_pods()  # runs continuously until interrupted
高级应用场景
示例12:批量操作和错误处理
def batch_create_resources(resources):
    """Create several Deployments and Services, collecting per-item results.

    Args:
        resources: Iterable of dicts with the keys ``kind`` (``'deployment'``
            or ``'service'``), ``namespace`` and ``spec`` (the model object
            to create). Entries with any other ``kind`` are skipped.

    Returns:
        A dict with two lists: ``success`` holds ``{'kind', 'name'}`` entries
        for created resources, ``failed`` holds ``{'kind', 'name', 'error'}``
        entries for those whose API call raised ``ApiException``.
    """
    results = {
        'success': [],
        'failed': []
    }
    for resource in resources:
        try:
            if resource['kind'] == 'deployment':
                apps_v1 = client.AppsV1Api()
                apps_v1.create_namespaced_deployment(
                    namespace=resource['namespace'],
                    body=resource['spec']
                )
                results['success'].append({
                    'kind': 'deployment',
                    'name': resource['spec'].metadata.name
                })
            elif resource['kind'] == 'service':
                v1 = client.CoreV1Api()
                v1.create_namespaced_service(
                    namespace=resource['namespace'],
                    body=resource['spec']
                )
                results['success'].append({
                    'kind': 'service',
                    'name': resource['spec'].metadata.name
                })
        except ApiException as e:
            # Record the failure and continue with the remaining resources.
            results['failed'].append({
                'kind': resource['kind'],
                'name': resource['spec'].metadata.name,
                'error': str(e)
            })
    return results
# Usage example: input list for batch_create_resources
resources = [
    {
        'kind': 'deployment',
        'namespace': 'default',
        'spec': client.V1Deployment(
            metadata=client.V1ObjectMeta(name="nginx-deployment"),
            spec=client.V1DeploymentSpec(
                replicas=3,
                selector=client.V1LabelSelector(
                    match_labels={"app": "nginx"}
                ),
                template=client.V1PodTemplateSpec(
                    metadata=client.V1ObjectMeta(
                        labels={"app": "nginx"}
                    ),
                    spec=client.V1PodSpec(
                        containers=[
                            client.V1Container(
                                name="nginx",
                                image="nginx:latest"
                            )
                        ]
                    )
                )
            )
        )
    }
]
示例13:资源清理和垃圾回收
def cleanup_resources(namespace="default", label_selector=None):
    """Delete the pods, Deployments and Services in a namespace.

    Resources are processed in that order. A failure to delete one object is
    recorded and does not stop the run; a failure to list a resource type
    aborts the whole cleanup.

    Args:
        namespace: Namespace to clean up.
        label_selector: Optional label selector string (e.g. ``"app=nginx"``)
            passed to every list call; ``None`` applies no filter.

    Returns:
        A dict with the lists ``pods``, ``deployments`` and ``services``
        (names of deleted objects) and ``errors`` (one message per failed
        delete), or ``None`` if a list call raises ``ApiException``.
    """
    v1 = client.CoreV1Api()
    apps_v1 = client.AppsV1Api()
    cleanup_results = {
        'pods': [],
        'deployments': [],
        'services': [],
        'errors': []
    }
    # (result key, error-message prefix, list call, delete call)
    targets = [
        ('pods', 'pod',
         v1.list_namespaced_pod, v1.delete_namespaced_pod),
        ('deployments', 'deployment',
         apps_v1.list_namespaced_deployment,
         apps_v1.delete_namespaced_deployment),
        ('services', 'service',
         v1.list_namespaced_service, v1.delete_namespaced_service),
    ]
    try:
        for result_key, label, list_call, delete_call in targets:
            found = list_call(
                namespace=namespace,
                label_selector=label_selector
            )
            for item in found.items:
                item_name = item.metadata.name
                try:
                    delete_call(name=item_name, namespace=namespace)
                    cleanup_results[result_key].append(item_name)
                except ApiException as e:
                    cleanup_results['errors'].append(
                        f"{label} {item_name}: {str(e)}"
                    )
        return cleanup_results
    except ApiException as e:
        print(f"清理资源时发生错误:{str(e)}")
        return None


# Usage example
cleanup_result = cleanup_resources(namespace="default", label_selector="app=nginx")
print("清理结果:", cleanup_result)
示例14:资源健康检查和自动修复
import time
from datetime import datetime, timezone
from typing import Dict, List, Optional


class ResourceHealthChecker:
    """Find unhealthy pods and Deployments in one namespace and repair them."""

    def __init__(self, namespace: str = "default"):
        self.namespace = namespace
        self.v1 = client.CoreV1Api()
        self.apps_v1 = client.AppsV1Api()

    def check_pod_health(self) -> Optional[Dict[str, List[str]]]:
        """Classify the namespace's pods by phase.

        Returns:
            A dict with ``unhealthy`` (names of pods in phase ``Failed``) and
            ``pending`` (names of pods in phase ``Pending``), or ``None`` if
            the list call raises ``ApiException``.
        """
        unhealthy_pods = []
        pending_pods = []
        try:
            pods = self.v1.list_namespaced_pod(namespace=self.namespace)
            for pod in pods.items:
                # Pod phase values are capitalised and case-sensitive.
                if pod.status.phase == 'Failed':
                    unhealthy_pods.append(pod.metadata.name)
                elif pod.status.phase == 'Pending':
                    pending_pods.append(pod.metadata.name)
            return {
                'unhealthy': unhealthy_pods,
                'pending': pending_pods
            }
        except ApiException as e:
            print(f"检查pod健康状态时发生错误:{str(e)}")
            return None

    def check_deployment_health(self) -> Optional[Dict[str, List[str]]]:
        """List Deployments whose ready replica count differs from the total.

        Returns:
            A dict with ``unhealthy`` (Deployment names), or ``None`` if the
            list call raises ``ApiException``.
        """
        unhealthy_deployments = []
        try:
            deployments = self.apps_v1.list_namespaced_deployment(namespace=self.namespace)
            for deployment in deployments.items:
                if deployment.status.ready_replicas != deployment.status.replicas:
                    unhealthy_deployments.append(deployment.metadata.name)
            return {
                'unhealthy': unhealthy_deployments
            }
        except ApiException as e:
            print(f"检查deployment健康状态时发生错误:{str(e)}")
            return None

    def auto_repair(self):
        """Delete failed pods and restart unhealthy Deployments.

        A Deployment is restarted by patching a timestamp annotation onto its
        pod template.

        Returns:
            A list of human-readable messages, one per attempted action.
        """
        repair_actions = []
        # Check and repair pods
        pod_health = self.check_pod_health()
        if pod_health:
            for unhealthy_pod in pod_health['unhealthy']:
                try:
                    self.v1.delete_namespaced_pod(
                        name=unhealthy_pod,
                        namespace=self.namespace
                    )
                    repair_actions.append(f"删除不健康的pod: {unhealthy_pod}")
                except ApiException as e:
                    repair_actions.append(f"修复pod {unhealthy_pod} 失败: {str(e)}")
        # Check and repair Deployments
        deployment_health = self.check_deployment_health()
        if deployment_health:
            for unhealthy_deployment in deployment_health['unhealthy']:
                try:
                    # Restart the Deployment by changing a template annotation
                    patch = {
                        "spec": {
                            "template": {
                                "metadata": {
                                    "annotations": {
                                        "kubectl.kubernetes.io/restartedAt":
                                            datetime.now(timezone.utc).isoformat()
                                    }
                                }
                            }
                        }
                    }
                    self.apps_v1.patch_namespaced_deployment(
                        name=unhealthy_deployment,
                        namespace=self.namespace,
                        body=patch
                    )
                    repair_actions.append(f"重启deployment: {unhealthy_deployment}")
                except ApiException as e:
                    repair_actions.append(f"修复deployment {unhealthy_deployment} 失败: {str(e)}")
        return repair_actions


# Usage example
health_checker = ResourceHealthChecker("default")
repair_results = health_checker.auto_repair()
print("修复操作:", repair_results)
示例15:自定义控制器实现
from kubernetes import watch
import threading
import queue
class CustomController:
    """Minimal controller that watches pods and Deployments in a namespace.

    Two watcher threads push events onto a queue; a third thread takes them
    off the queue and dispatches them to per-resource handlers.
    """

    def __init__(self, namespace="default"):
        self.namespace = namespace
        self.v1 = client.CoreV1Api()
        self.apps_v1 = client.AppsV1Api()
        self.event_queue = queue.Queue()
        self.running = False

    def start(self):
        """Start the event-processing thread and both watcher threads."""
        self.running = True
        # Start the event-processing thread
        threading.Thread(target=self._process_events).start()
        # Start the resource watchers
        threading.Thread(target=self._watch_pods).start()
        threading.Thread(target=self._watch_deployments).start()

    def stop(self):
        """Ask all threads to stop.

        Only the ``running`` flag is cleared. A watcher checks it when the
        next event arrives or its stream ends, so it may not exit at once.
        """
        self.running = False

    def _watch_resource(self, resource_type, list_func):
        """Stream events from ``list_func`` into the queue until stopped."""
        w = watch.Watch()
        while self.running:
            try:
                for event in w.stream(list_func, namespace=self.namespace):
                    if not self.running:
                        break
                    self.event_queue.put((resource_type, event))
            except Exception as e:
                # Broad on purpose: any stream failure is logged and retried.
                print(f"{resource_type}监控异常:{str(e)}")
                if self.running:
                    time.sleep(5)  # wait before retrying after an error

    def _watch_pods(self):
        """Watch pod changes."""
        self._watch_resource('pod', self.v1.list_namespaced_pod)

    def _watch_deployments(self):
        """Watch Deployment changes."""
        self._watch_resource(
            'deployment', self.apps_v1.list_namespaced_deployment
        )

    def _process_events(self):
        """Take events off the queue and handle them until stopped."""
        while self.running:
            try:
                # The timeout lets the loop re-check ``running`` every second.
                resource_type, event = self.event_queue.get(timeout=1)
                self._handle_event(resource_type, event)
            except queue.Empty:
                continue
            except Exception as e:
                print(f"事件处理异常:{str(e)}")

    def _handle_event(self, resource_type, event):
        """Log an event and dispatch it to the handler for its resource type."""
        event_type = event['type']
        obj = event['object']
        print(f"收到{resource_type}事件:")
        print(f" 类型: {event_type}")
        print(f" 名称: {obj.metadata.name}")
        if resource_type == 'pod':
            self._handle_pod_event(event_type, obj)
        elif resource_type == 'deployment':
            self._handle_deployment_event(event_type, obj)

    def _handle_pod_event(self, event_type, pod):
        """Delete a pod that was modified into the ``Failed`` phase."""
        # Watch event types are upper-case; pod phases are capitalised.
        if event_type == 'MODIFIED':
            if pod.status.phase == 'Failed':
                print(f"检测到pod {pod.metadata.name} 失败,尝试重启")
                try:
                    self.v1.delete_namespaced_pod(
                        name=pod.metadata.name,
                        namespace=self.namespace
                    )
                except ApiException as e:
                    print(f"重启pod失败:{str(e)}")

    def _handle_deployment_event(self, event_type, deployment):
        """Report a modified Deployment whose ready and total replicas differ."""
        if event_type == 'MODIFIED':
            if deployment.status.ready_replicas != deployment.status.replicas:
                print(f"检测到deployment {deployment.metadata.name} 副本不一致")
                # Custom handling logic can be added here


# Usage example
controller = CustomController("default")
controller.start()
# Stop after running for a while
# time.sleep(3600)
# controller.stop()
示例16:资源指标监控
from kubernetes.client import customobjectsapi
import time
class MetricsCollector:
    """Read node and pod usage from the ``metrics.k8s.io/v1beta1`` API."""

    def __init__(self):
        self.custom_api = CustomObjectsApi()

    def get_node_metrics(self):
        """Fetch CPU and memory usage for every node.

        Returns:
            A dict mapping node name to ``{'cpu': ..., 'memory': ...}`` with
            the usage values as returned by the API, or ``None`` if the API
            call raises ``ApiException``.
        """
        try:
            metrics = self.custom_api.list_cluster_custom_object(
                group="metrics.k8s.io",
                version="v1beta1",
                plural="nodes"
            )
            node_metrics = {}
            for item in metrics['items']:
                node_name = item['metadata']['name']
                node_metrics[node_name] = {
                    'cpu': item['usage']['cpu'],
                    'memory': item['usage']['memory']
                }
            return node_metrics
        except ApiException as e:
            print(f"获取节点指标失败:{str(e)}")
            return None

    def get_pod_metrics(self, namespace="default"):
        """Fetch per-container CPU and memory usage for pods in a namespace.

        Args:
            namespace: Namespace whose pods are queried.

        Returns:
            A dict mapping pod name to a dict of container name to
            ``{'cpu': ..., 'memory': ...}``, or ``None`` if the API call
            raises ``ApiException``.
        """
        try:
            metrics = self.custom_api.list_namespaced_custom_object(
                group="metrics.k8s.io",
                version="v1beta1",
                namespace=namespace,
                plural="pods"
            )
            pod_metrics = {}
            for item in metrics['items']:
                pod_name = item['metadata']['name']
                containers = {}
                for container in item['containers']:
                    containers[container['name']] = {
                        'cpu': container['usage']['cpu'],
                        'memory': container['usage']['memory']
                    }
                pod_metrics[pod_name] = containers
            return pod_metrics
        except ApiException as e:
            print(f"获取pod指标失败:{str(e)}")
            return None

    def monitor_resources(self, interval=30):
        """Print node and pod usage forever, pausing between rounds.

        Pod metrics are read from the ``default`` namespace. This method
        never returns.

        Args:
            interval: Seconds to sleep between rounds.
        """
        while True:
            print("\n=== 资源使用情况 ===")
            # Node metrics
            node_metrics = self.get_node_metrics()
            if node_metrics:
                print("\n节点资源使用情况:")
                for node_name, metrics in node_metrics.items():
                    print(f"\n节点: {node_name}")
                    print(f"cpu使用: {metrics['cpu']}")
                    print(f"内存使用: {metrics['memory']}")
            # Pod metrics
            pod_metrics = self.get_pod_metrics()
            if pod_metrics:
                print("\npod资源使用情况:")
                for pod_name, containers in pod_metrics.items():
                    print(f"\npod: {pod_name}")
                    for container_name, metrics in containers.items():
                        print(f"容器: {container_name}")
                        print(f"cpu使用: {metrics['cpu']}")
                        print(f"内存使用: {metrics['memory']}")
            time.sleep(interval)


# Usage example
collector = MetricsCollector()
# collector.monitor_resources()  # monitors continuously
最佳实践和注意事项
1.错误处理
- 始终使用try-except块处理api调用
- 实现重试机制处理临时性故障
- 记录详细的错误信息便于调试
2.性能优化
- 使用批量操作代替单个操作
- 实现合适的缓存机制
- 避免频繁的api调用
3.安全考虑
- 使用最小权限原则
- 保护敏感信息(如密钥和证书)
- 实现适当的认证和授权机制
4.可维护性
- 模块化代码结构
- 完善的日志记录
- 清晰的代码注释
总结
本文详细介绍了如何使用 Python 操作 Kubernetes 集群,包括:
- 基础环境配置
- 常见资源操作
- 高级应用场景
- 自动化运维实践
- 监控和告警实现
通过这些示例和最佳实践,可以构建强大的kubernetes自动化工具和运维系统。
以上就是python操作kubernetes集群的完全指南的详细内容,更多关于python操作kubernetes集群的资料请关注代码网其它相关文章!
发表评论