java通过kerberos权限认证集成hdfs,并操作hdfs实现增删查、赋权、目录配额等功能
1、pom文件中引入hdfs包
<dependency>
<groupid>org.apache.hadoop</groupid>
<artifactid>hadoop-client</artifactid>
<version>2.7.3</version>
</dependency>
<dependency>
<groupid>org.apache.hadoop</groupid>
<artifactid>hadoop-common</artifactid>
<version>2.7.3</version>
</dependency>
<dependency>
<groupid>org.apache.hadoop</groupid>
<artifactid>hadoop-hdfs</artifactid>
<version>2.7.3</version>
</dependency>
2、从集群中下载认证过可以登录的keytab文件,以及krb5.conf文件还有core-site.xml、hdfs-site.xml、hive-site.xml、yarn-site.xml放到项目的resources目录下
(xml文件按照自己项目需要下载)
3、代码实现
package com.example.demo.hdfs;
import org.apache.hadoop.conf.configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.permission.fspermission;
import org.apache.hadoop.hdfs.client.hdfsadmin;
import org.apache.hadoop.io.ioutils;
import org.apache.hadoop.security.usergroupinformation;
import java.io.file;
import java.io.fileinputstream;
import java.io.fileoutputstream;
import java.io.ioexception;
import java.security.privilegedexceptionaction;
/**
* 主要用于操作hdfs
*/
public class hdfsutil {
//文件系统
private static filesystem filesystem;
public static usergroupinformation ugi = null;
public static configuration conf = new configuration();
public static void main(string[] args) throws ioexception {
connectionhdfs();//连接hdfs
//createdir("/user/hdfs/test_username");//创建hdfs目录路径
//delete("/user/hdfs/test_username");//删除hdfs目录路径
//setfilepermission();//设置hdfs文件的权限
//uploadfile("d:\\testtxt.txt","/user/hdfs/test_username/");//本地文件上传到服务器hdfs路径下
//downloadfile("/user/hdfs/test.sql","d:\\");//服务器hdfs路径文件下载文件到本地
//setspacequota(new path("/user/hdfs/test_username"),104857600);//设置hdfs指定目录目录空间大小 100m
//clrallquota(new path("/user/hdfs/test_username"));//清除指定hdfs目录的所有配额限制
gethdfsdata("/user/hdfs/");//查询hdfs下面的文件
}
public static void connectionhdfs(){
string principal = "hdfs"; //kerberos principal,如果不然realm,则使用krb5.conf默认的
string keytabfile = "src/main/resources/hdfs.keytab"; //kdc生成的keytab文件
conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.distributedfilesystem");
conf.set("fs.defaultfs", "hdfs://192.168.0.110:8020"); //hdfs地址
//需要设置krb5.conf文件
system.setproperty("java.security.krb5.conf", "src/main/resources/krb5.conf");
try {
usergroupinformation.loginuserfromkeytab(principal, keytabfile);
ugi = usergroupinformation.getloginuser();
//使用usergroupinformation来获filesystem
try {
filesystem = ugi.doas(new privilegedexceptionaction<filesystem>() {
@override
public filesystem run() throws exception {
return filesystem.get(conf);
}
});
} catch (interruptedexception e) {
e.printstacktrace();
}
} catch (ioexception e) {
e.printstacktrace();
}
}
/***
* 查询hdfs下面的文件
* @param path
* @throws ioexception
*/
public static void gethdfsdata(string path) throws ioexception {
filestatus[] filestatus = filesystem.liststatus(new path(path));
for (filestatus fst : filestatus) {
system.out.println(fst.getpath());
}
// 得到一个迭代器:装有指定目录下所有的文件信息
/*remoteiterator<locatedfilestatus> remoteiterator = filesystem.listfiles(new path(path), true);
// 遍历迭代器
while (remoteiterator.hasnext()) {
locatedfilestatus locatedfilestatus = remoteiterator.next();
// 文件名称
string filename = locatedfilestatus.getpath().getname();
// 长度
long len = locatedfilestatus.getlen();
// 权限
fspermission permission = locatedfilestatus.getpermission();
// 所属组
string group = locatedfilestatus.getgroup();
// 所属用户
string owner = locatedfilestatus.getowner();
system.out.println(filename + "\t" + len + "\t" + permission + "\t" + group + "\t" + owner);
system.out.println("================================");
// 块信息
blocklocation[] blocklocations = locatedfilestatus.getblocklocations();
for (blocklocation blocklocation : blocklocations) {
string[] hosts = blocklocation.gethosts();
for (string host : hosts) {
system.out.println("主机名称:" + host);
}
}
}*/
}
/**
* 创建一个文件夹方法
* @param path
*/
public static void createdir(string path) throws ioexception {
boolean flag = false;
if (!filesystem.exists(new path(path))){//如果文件夹不存在
flag = filesystem.mkdirs(new path(path));
}
if(flag){
system.out.println("文件夹已经创建成功。。。");
}else{
system.out.println("文件夹已经存在。。。。");
}
}
/**
* 删除文件夹以及文件
* @param path
*/
public static void delete(string path) throws ioexception {
boolean flag = false;
if(filesystem.exists(new path(path))){
flag = filesystem.delete(new path(path),true);
}
if(flag){
system.out.println("文件或者文件夹已经删除");
}else{
system.out.println("文件或者文件夹不存在");
}
}
/**
* 上传到hdfs上
* @param srcpath
* @param destpath
* @throws ioexception
*/
public static void uploadfile(string srcpath,string destpath) throws ioexception {
filesystem.copyfromlocalfile(false,true,new path(srcpath),new path(destpath));
system.out.println("文件上传成功!!!");
}
public static void downloadfile(string srcpath,string destpath) throws ioexception {
// boolean delsrc 指是否将原文件删除
// path src 指要下载的文件路径
// path dst 指将文件下载到的路径
// boolean userawlocalfilesystem 是否开启文件校验
filesystem.copytolocalfile(false,new path(srcpath),new path(destpath),true);
system.out.println("文件下载成功");
}
/**
* 通过流的方式上传文件到hdfs
* @param localpath 本地路径
* @param hdfspath hdfs路径
* @param isoverwrite 是否覆盖
*/
public static void uploadstream(string localpath, string hdfspath, boolean isoverwrite) throws exception {
fsdataoutputstream outputstream = filesystem.create(new path(hdfspath), isoverwrite);
fileinputstream inputstream = new fileinputstream(localpath);
ioutils.copybytes(inputstream, outputstream, 4096);
}
/**
* 下载hdfs文件
* @param hdfspath hdfs路径
* @param localpath 本地路径
*/
public static void downloadstream(string hdfspath, string localpath) throws illegalargumentexception, ioexception {
//先获取一个文件的输入流----针对hdfs上的
fsdatainputstream in = filesystem.open(new path(hdfspath));
//再构造一个文件的输出流----针对本地的
fileoutputstream out = new fileoutputstream(new file(localpath));
//再将输入流中数据传输到输出流
ioutils.copybytes(in, out, 4096);
}
/**
* 设置hdfs文件的权限
*/
public static void setfilepermission(){
try {
filesystem.setpermission(new path("/user/hdfs/test_username"), new fspermission("700"));
} catch (ioexception e) {
e.printstacktrace();
}
}
/**
* 设置hdfs指定目录目录空间大小
* hdfs dfsadmin -setspacequota 104857600 hdfs://192.168.0.110/user/hdfs/test_username
* @param path
* @param quota
*/
public static void setspacequota(path path, long quota) {
hdfsadmin hdfsadmin = null;
try {
hdfsadmin = new hdfsadmin(filesystem.geturi(), conf);
} catch (ioexception e) {
e.printstacktrace();
}
try {
hdfsadmin.setspacequota(path, quota);
system.out.println("成功设置hdfs的" + path.getname() + "目录空间大小为:" + quota);
} catch (ioexception e) {
e.printstacktrace();
}
}
/**
* 清除指定hdfs目录的所有配额限制
* @param path
*/
public static void clrallquota(path path) {
hdfsadmin hdfsadmin = null;
try {
hdfsadmin = new hdfsadmin(filesystem.geturi(), conf);
} catch (ioexception e) {
e.printstacktrace();
}
try {
hdfsadmin.clearspacequota(path);
hdfsadmin.clearquota(path);
system.out.println("成功清除hdfs的" + path.getname() + "目录的配额限制");
} catch (ioexception e) {
e.printstacktrace();
}
}
}
发表评论