当前位置: 代码网 > it编程>前端脚本>Python > Python处理SSH和Redis的示例代码详解

Python处理SSH和Redis的示例代码详解

2026年01月23日 Python 我要评论
一、python处理ssh基于python代码直接可以远程操作linux服务器(此类技术的运用:利用python完成远程应用系统部署,远程监控,文件传输,无代理模式)安装第三方库:paramikopi

一、python处理ssh

基于python代码直接可以远程操作linux服务器(此类技术的运用:利用python完成远程应用系统部署,远程监控,文件传输,无代理模式)

安装第三方库:paramiko

pip install paramiko

python代码实现

第一阶段:最基础的ssh连接

import paramiko

# 第一步:创建ssh客户端
ssh_client = paramiko.sshclient()

# 第二步:设置自动接受未知主机密钥(学习时使用,生产环境不建议)
ssh_client.set_missing_host_key_policy(paramiko.autoaddpolicy())

# 第三步:连接到服务器
ssh_client.connect(
    hostname='192.168.102.131',
    port=22,           # ssh默认端口
    username='yt',
    password='123.com'
)

print("连接成功!")

# 第四步:执行一个简单命令
# stdin,stdout,stderr = ssh_client.exec_command('pwd')
# stdin,stdout,stderr = ssh_client.exec_command('ls -la')
# stdin,stdout,stderr = ssh_client.exec_command('cd /usr/ && pwd')
cmd = '''
cd /usr
ls -la
pwd
whoami
'''
stdin,stdout,stderr = ssh_client.exec_command(cmd)

# 第五步:读取命令输出
output = stdout.read().decode('utf-8')
print(f"命令输出结果:{output}")

# 第六步:关闭连接
ssh_client.close()
print("连接已关闭")

第二阶段:处理命令输出和错误

import paramiko

# 创建一个执行命令的函数
def execute_ssh_command(host, user, password, command):
    ssh = paramiko.sshclient()
    ssh.set_missing_host_key_policy(paramiko.autoaddpolicy())

    try:
        ssh.connect(hostname=host, username=user, password=password)

        # 执行命令
        stdin, stdout, stderr = ssh.exec_command(command)

        # 等待命令完成,并获取退出状态码
        exit_status = stdout.channel.recv_exit_status()

        # 读取输出
        output = stdout.read().decode('utf-8')
        error_output = stderr.read().decode('utf-8')

        print(f"命令: {command}")
        print(f"退出状态码: {exit_status} (0表示成功)")

        if output:
            print("输出结果:")
            print(output)

        if error_output:
            print("错误信息:")
            print(error_output)

    finally:
        ssh.close()


# 测试不同的命令
commands = ['pwd', 'ls /nonexistent', 'date']

for cmd in commands:
    execute_ssh_command('192.168.102.131', 'yt', '123.com', cmd)
    print("-" * 50)

第三阶段:传输文件

import paramiko

def upload_file():
    """上传文件到服务器"""
    # 先建立连接
    ssh = paramiko.sshclient()
    ssh.set_missing_host_key_policy(paramiko.autoaddpolicy())
    ssh.connect('服务器ip', username='用户名', password='密码')
    
    # 打开sftp会话
    sftp = ssh.open_sftp()
    
    # 上传本地文件到远程服务器
    local_file = 'test.txt'        # 本地文件
    remote_file = '/tmp/test.txt'  # 远程路径
    
    sftp.put(local_file, remote_file)
    print(f"文件上传成功:{local_file} -> {remote_file}")
    
    # 关闭连接
    sftp.close()
    ssh.close()

def download_file():
    """从服务器下载文件"""
    ssh = paramiko.sshclient()
    ssh.set_missing_host_key_policy(paramiko.autoaddpolicy())
    ssh.connect('服务器ip', username='用户名', password='密码')
    
    sftp = ssh.open_sftp()
    
    # 下载文件
    remote_file = '/var/log/syslog'  # 远程文件
    local_file = 'syslog_copy.log'   # 保存到本地
    
    sftp.get(remote_file, local_file)
    print(f"文件下载成功:{remote_file} -> {local_file}")
    
    sftp.close()
    ssh.close()

def list_directory():
    """列出远程目录内容"""
    ssh = paramiko.sshclient()
    ssh.set_missing_host_key_policy(paramiko.autoaddpolicy())
    ssh.connect('服务器ip', username='用户名', password='密码')
    
    sftp = ssh.open_sftp()
    
    # 列出目录内容
    remote_dir = '/tmp'
    files = sftp.listdir(remote_dir)
    
    print(f"目录 {remote_dir} 中的内容:")
    for file in files:
        print(f"  - {file}")
    
    sftp.close()
    ssh.close()

第四阶段:封装成实用工具

import paramiko

class simplesshclient:
    def __init__(self, host, username, password=none, key_path=none):
        self.host = host
        self.username = username
        self.password = password
        self.key_path = key_path
        self.client = none
    
    def connect(self):
        """建立连接"""
        self.client = paramiko.sshclient()
        self.client.set_missing_host_key_policy(paramiko.autoaddpolicy())
        
        if self.key_path:
            # 使用密钥认证
            key = paramiko.rsakey.from_private_key_file(self.key_path)
            self.client.connect(
                hostname=self.host,
                username=self.username,
                pkey=key
            )
        else:
            # 使用密码认证
            self.client.connect(
                hostname=self.host,
                username=self.username,
                password=self.password
            )
        print(f"已连接到 {self.host}")
    
    def run_command(self, command):
        """执行命令并返回结果"""
        if not self.client:
            print("错误:请先调用connect()方法连接")
            return none
        
        stdin, stdout, stderr = self.client.exec_command(command)
        
        # 获取结果
        exit_code = stdout.channel.recv_exit_status()
        output = stdout.read().decode('utf-8')
        error = stderr.read().decode('utf-8')
        
        return {
            'command': command,
            'exit_code': exit_code,
            'output': output,
            'error': error
        }
    
    def disconnect(self):
        """断开连接"""
        if self.client:
            self.client.close()
            print("连接已关闭")
    
    def upload(self, local_path, remote_path):
        """上传文件"""
        sftp = self.client.open_sftp()
        sftp.put(local_path, remote_path)
        sftp.close()
        print(f"上传完成:{local_path} -> {remote_path}")
    
    def download(self, remote_path, local_path):
        """下载文件"""
        sftp = self.client.open_sftp()
        sftp.get(remote_path, local_path)
        sftp.close()
        print(f"下载完成:{remote_path} -> {local_path}")

# 使用示例
def main():
    # 创建客户端(使用密码)
    ssh = simplesshclient(
        host='服务器ip',
        username='用户名',
        password='密码'
    )
    
    # 或者使用密钥
    # ssh = simplesshclient(
    #     host='服务器ip',
    #     username='用户名',
    #     key_path='/path/to/private_key'
    # )
    
    # 连接
    ssh.connect()
    
    # 执行命令
    result = ssh.run_command('ls -la')
    print(f"命令: {result['command']}")
    print(f"退出码: {result['exit_code']}")
    print(f"输出:\n{result['output']}")
    
    # 上传文件
    ssh.upload('本地文件.txt', '/tmp/远程文件.txt')
    
    # 断开连接
    ssh.disconnect()

if __name__ == "__main__":
    main()

二、python处理redis

1.验证安装

import redis
# 测试连接
try:
    r = redis.redis(host='192.168.102.136', password='123.com',port=6379, db=0)
    r.ping()
    print("✅ redis 连接成功!")
except exception as e:
    print(f"❌ 连接失败: {e}")

2. 连接 redis

import redis

# 基本连接
r = redis.redis(
    host='192.168.102.136',  # 主机
    port=6379,         # 端口
    db=0,              # 数据库编号
    password='123.com',     # 密码(如果有)
    decode_responses=true  # 自动解码为字符串
)

# 连接池方式(推荐)
pool = redis.connectionpool(
    host='localhost',
    port=6379,
    db=0,
    max_connections=10  # 最大连接数
)
r = redis.redis(connection_pool=pool)

3.基础操作

import redis

# 连接池方式(推荐)
pool = redis.connectionpool(
    host='192.168.102.136',
    password='123.com',
    port=6379,
    db=0,
    max_connections=10  # 最大连接数
)
r = redis.redis(connection_pool=pool)

# 设置键值对
r.set('name', 'alice')
r.set('age', '25')
r.setex('temp_key', 60, '临时数据')  # 60秒后过期

# 获取值
name = r.get('name')
print(f"姓名: {name}")  # alice

# 批量处理
r.mset({'key1':'value1','key2':'value2'})
values = r.mget(['key1','key2'])
print(f"批量获取:{values}")

# 自增/自减
r.set('counter', 0)
r.incr('counter')  # 增加到1
r.incrby('counter', 5)  # 增加到6
r.decr('counter')  # 减少到5
print(r.get('counter'))

print("="*20)
# 列表操作

# 添加元素
r.lpush('mylist', 'first')   # 左侧插入
r.rpush('mylist', 'second')  # 右侧插入
r.lpush('mylist', 'zero')

# 获取元素
print(r.lrange('mylist', 0, -1))  # 获取所有
print(r.lindex('mylist', 1))      # 获取索引1的元素

# 弹出元素
left_item = r.lpop('mylist')
right_item = r.rpop('mylist')
print(r.lrange('mylist', 0, -1))  # 获取所有

# 列表长度
length = r.llen('mylist')
print(length)

print("="*20)

# 设置哈希字段
r.hset('user:1001', 'name', 'bob')
r.hset('user:1001', 'age', 30)
r.hset('user:1001', 'city', 'beijing')

# 批量设置
r.hmset('user:1002', {'name': 'charlie', 'age': 25})

# 获取字段
name = r.hget('user:1001', 'name')
print(name)
all_fields = r.hgetall('user:1001')
print(f"所有字段: {all_fields}")

# 删除字段
r.hdel('user:1001', 'city')
all_fields = r.hgetall('user:1001')
print(f"所有字段: {all_fields}")

# 集合操作
# 添加元素
r.sadd('tags', 'python', 'redis', 'database')
r.sadd('tags', 'python')  # 重复元素不会添加

# 获取所有元素
tags = r.smembers('tags')
print(f"标签: python
ssh
redis
")

# 集合运算
r.sadd('set1', 'a', 'b', 'c')
r.sadd('set2', 'b', 'c', 'd')

# 交集
inter = r.sinter('set1', 'set2')
print(f"交集: {inter}")

# 并集
union = r.sunion('set1', 'set2')
print(f"并集: {union}")

# 差集
diff = r.sdiff('set1', 'set2')
print(f"差集: {diff}")

# 有序集合
# 添加带分数的元素
r.zadd('scores', {'alice': 90, 'bob': 85, 'charlie': 95})

# 获取排名
rank = r.zrank('scores', 'bob')  # 升序排名
rev_rank = r.zrevrank('scores', 'bob')  # 降序排名

# 获取分数范围
top3 = r.zrevrange('scores', 0, 2, withscores=true)
print(f"前三名: {top3}")

# 获取分数
score = r.zscore('scores', 'alice')

官方文档https://redis.io/documentation

python redis 文档https://redis-py.readthedocs.io/

到此这篇关于python处理ssh和redis的示例代码详解的文章就介绍到这了,更多相关python处理ssh和redis内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2026  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com