当前位置: 代码网 > it编程>编程语言>Java > java通过kerberos权限认证集成hdfs

java通过kerberos权限认证集成hdfs

2024年08月06日 Java 我要评论
java通过kerberos权限认证集成hdfs,并操作hdfs实现增删查、赋权、目录配额等功能

java通过kerberos权限认证集成hdfs,并操作hdfs实现增删查、赋权、目录配额等功能

1、pom文件中引入hdfs包

		<dependency>
            <groupid>org.apache.hadoop</groupid>
            <artifactid>hadoop-client</artifactid>
            <version>2.7.3</version>
        </dependency>
        <dependency>
            <groupid>org.apache.hadoop</groupid>
            <artifactid>hadoop-common</artifactid>
            <version>2.7.3</version>
        </dependency>
        <dependency>
            <groupid>org.apache.hadoop</groupid>
            <artifactid>hadoop-hdfs</artifactid>
            <version>2.7.3</version>
        </dependency>

2、从集群中下载认证过可以登录的keytab文件,以及krb5.conf文件还有core-site.xml、hdfs-site.xml、hive-site.xml、yarn-site.xml放到项目的resources目录下
(xml文件按照自己项目需要下载)
在这里插入图片描述

3、代码实现

package com.example.demo.hdfs;

import org.apache.hadoop.conf.configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.permission.fspermission;
import org.apache.hadoop.hdfs.client.hdfsadmin;
import org.apache.hadoop.io.ioutils;
import org.apache.hadoop.security.usergroupinformation;

import java.io.file;
import java.io.fileinputstream;
import java.io.fileoutputstream;
import java.io.ioexception;
import java.security.privilegedexceptionaction;


/**
 * 主要用于操作hdfs
 */
public class hdfsutil {
    //文件系统
    private static filesystem filesystem;
    public static usergroupinformation ugi = null;
    public static configuration conf = new configuration();

    public static void main(string[] args) throws ioexception {
        connectionhdfs();//连接hdfs
        //createdir("/user/hdfs/test_username");//创建hdfs目录路径
        //delete("/user/hdfs/test_username");//删除hdfs目录路径
        //setfilepermission();//设置hdfs文件的权限
        //uploadfile("d:\\testtxt.txt","/user/hdfs/test_username/");//本地文件上传到服务器hdfs路径下
        //downloadfile("/user/hdfs/test.sql","d:\\");//服务器hdfs路径文件下载文件到本地
        //setspacequota(new path("/user/hdfs/test_username"),104857600);//设置hdfs指定目录目录空间大小 100m
        //clrallquota(new path("/user/hdfs/test_username"));//清除指定hdfs目录的所有配额限制
        gethdfsdata("/user/hdfs/");//查询hdfs下面的文件

    }

    public static void connectionhdfs(){
        string principal = "hdfs";  //kerberos principal,如果不然realm,则使用krb5.conf默认的
        string keytabfile = "src/main/resources/hdfs.keytab";   //kdc生成的keytab文件

        conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.distributedfilesystem");
        conf.set("fs.defaultfs", "hdfs://192.168.0.110:8020");  //hdfs地址
        //需要设置krb5.conf文件
        system.setproperty("java.security.krb5.conf", "src/main/resources/krb5.conf");

        try {

            usergroupinformation.loginuserfromkeytab(principal, keytabfile);
            ugi = usergroupinformation.getloginuser();

            //使用usergroupinformation来获filesystem
            try {
                filesystem = ugi.doas(new privilegedexceptionaction<filesystem>() {
                    @override
                    public filesystem run() throws exception {
                        return filesystem.get(conf);
                    }
                });
            } catch (interruptedexception e) {
                e.printstacktrace();
            }

        } catch (ioexception e) {
            e.printstacktrace();
        }
    }

    /***
     * 查询hdfs下面的文件
     * @param path
     * @throws ioexception
     */
    public static void gethdfsdata(string path) throws ioexception {
        filestatus[] filestatus =  filesystem.liststatus(new path(path));
        for (filestatus fst : filestatus) {
            system.out.println(fst.getpath());
        }
        // 得到一个迭代器:装有指定目录下所有的文件信息
        /*remoteiterator<locatedfilestatus> remoteiterator = filesystem.listfiles(new path(path), true);
        // 遍历迭代器
        while (remoteiterator.hasnext()) {
            locatedfilestatus locatedfilestatus = remoteiterator.next();
            // 文件名称
            string filename = locatedfilestatus.getpath().getname();
            // 长度
            long len = locatedfilestatus.getlen();
            // 权限
            fspermission permission = locatedfilestatus.getpermission();
            // 所属组
            string group = locatedfilestatus.getgroup();
            // 所属用户
            string owner = locatedfilestatus.getowner();
            system.out.println(filename + "\t" + len + "\t" + permission + "\t" + group + "\t" + owner);
            system.out.println("================================");

            // 块信息
            blocklocation[] blocklocations = locatedfilestatus.getblocklocations();
            for (blocklocation blocklocation : blocklocations) {
                string[] hosts = blocklocation.gethosts();
                for (string host : hosts) {
                    system.out.println("主机名称:" + host);
                }
            }
        }*/

    }

    /**
     * 创建一个文件夹方法
     * @param path
     */
    public static void createdir(string path) throws ioexception {
        boolean flag = false;
        if (!filesystem.exists(new path(path))){//如果文件夹不存在
            flag = filesystem.mkdirs(new path(path));
        }
        if(flag){
            system.out.println("文件夹已经创建成功。。。");
        }else{
            system.out.println("文件夹已经存在。。。。");
        }
    }

    /**
     * 删除文件夹以及文件
     * @param path
     */
    public static void delete(string path) throws ioexception {
        boolean flag = false;
        if(filesystem.exists(new path(path))){
            flag = filesystem.delete(new path(path),true);
        }
        if(flag){
            system.out.println("文件或者文件夹已经删除");
        }else{
            system.out.println("文件或者文件夹不存在");
        }
    }

    /**
     * 上传到hdfs上
     * @param srcpath
     * @param destpath
     * @throws ioexception
     */
    public static void uploadfile(string srcpath,string destpath) throws ioexception {
        filesystem.copyfromlocalfile(false,true,new path(srcpath),new path(destpath));
        system.out.println("文件上传成功!!!");
    }
    public static void downloadfile(string srcpath,string destpath) throws ioexception {
        // boolean delsrc 指是否将原文件删除
        // path src 指要下载的文件路径
        // path dst 指将文件下载到的路径
        // boolean userawlocalfilesystem 是否开启文件校验
        filesystem.copytolocalfile(false,new path(srcpath),new path(destpath),true);
        system.out.println("文件下载成功");
    }

    /**
     * 通过流的方式上传文件到hdfs
     * @param localpath 本地路径
     * @param hdfspath hdfs路径
     * @param isoverwrite 是否覆盖
     */
    public static void uploadstream(string localpath, string hdfspath, boolean isoverwrite) throws exception {
        fsdataoutputstream outputstream = filesystem.create(new path(hdfspath), isoverwrite);
        fileinputstream inputstream = new fileinputstream(localpath);
        ioutils.copybytes(inputstream, outputstream, 4096);
    }

    /**
     * 下载hdfs文件
     * @param hdfspath hdfs路径
     * @param localpath 本地路径
     */
    public static void downloadstream(string hdfspath, string localpath) throws illegalargumentexception, ioexception {
        //先获取一个文件的输入流----针对hdfs上的
        fsdatainputstream in = filesystem.open(new path(hdfspath));
        //再构造一个文件的输出流----针对本地的
        fileoutputstream out = new fileoutputstream(new file(localpath));
        //再将输入流中数据传输到输出流
        ioutils.copybytes(in, out, 4096);
    }

    /**
     * 设置hdfs文件的权限
     */
    public static void setfilepermission(){
        try {
            filesystem.setpermission(new path("/user/hdfs/test_username"), new fspermission("700"));
        } catch (ioexception e) {
            e.printstacktrace();
        }
    }

    /**
     * 设置hdfs指定目录目录空间大小
     * hdfs dfsadmin -setspacequota 104857600  hdfs://192.168.0.110/user/hdfs/test_username
     * @param path
     * @param quota
     */
    public static void setspacequota(path path, long quota) {
        hdfsadmin hdfsadmin = null;
        try {
            hdfsadmin = new hdfsadmin(filesystem.geturi(), conf);
        } catch (ioexception e) {
            e.printstacktrace();
        }
        try {
            hdfsadmin.setspacequota(path, quota);
            system.out.println("成功设置hdfs的" + path.getname() + "目录空间大小为:" + quota);
        } catch (ioexception e) {
            e.printstacktrace();
        }
    }
    
      /**
     * 清除指定hdfs目录的所有配额限制
     * @param path
     */
    public static void clrallquota(path path) {
        hdfsadmin hdfsadmin = null;
        try {
            hdfsadmin = new hdfsadmin(filesystem.geturi(), conf);
        } catch (ioexception e) {
            e.printstacktrace();
        }
        try {
            hdfsadmin.clearspacequota(path);
            hdfsadmin.clearquota(path);
            system.out.println("成功清除hdfs的" + path.getname() + "目录的配额限制");
        } catch (ioexception e) {
            e.printstacktrace();
        }
    }

}

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com