目录
1.前言
本文是作者大数据专栏系列的其中一篇,前文中已经详细聊过分布式文件系统hdfs和分布式数据库hbase了,本文将会是它们的实操讲解。
hdfs相关前文:
【大数据】大数据概论与hadoop_大数据导论与hadoop-csdn博客
hbase相关前文:
2.hdfs
2.1.指令操作
创建目录:
递归创建目录:
上传文件到hdfs:
下载文件到本地:
删除文件:
递归删除目录:
查看目录内容:
递归查看目录内容:
查看文件详细信息:
移动或重命名文件:
复制文件、目录:
查看文件内容:
hdfs dfs -cat /user/mydir/file.txt
2.2.java api
首先这里有个巨坑:
一定要把core-site.xml里面的fs.defaultfs换成真实ip地址,不能用localhsot
<configuration
<property>
<name>hadoop.tmp.version</name>
<value>file:/usr/local/hadoop/tmp</value>
</property>
<property>
<name>fs.defaultfs</name>
<value>hdfs://localhost:9000</value>
</property>
</configuration>
如果java api的client端会先找hdfs拿到fs.defaultfs,然后再去访问拿到的地址上的hdfs,如果java api的client端和hdfs不在一台机器上,java api的client就会去访问它本地的localhost的9000端口上的服务,会直接报错:
connection refused: no further information
依赖:
代码示例:
import java.io.*;
import org.apache.hadoop.conf.configuration;
import org.apache.hadoop.fs.filesystem;
import org.apache.hadoop.fs.path;
import org.apache.hadoop.io.ioutils;
public class hdfssample {
public static void main(string[] args) throws ioexception {
configuration conf = new configuration();
filesystem fs = filesystem.get(conf);
// 创建目录
createdirectory(fs, "/user/hadoop/testdir");
// 上传文件
uploadfile(fs, "/user/hadoop/testfile.txt", "c:/localfile.txt");
// 下载文件
downloadfile(fs, "/user/hadoop/testfile.txt", "c:/downloadedfile.txt");
// 列出目录内容
listdirectory(fs, "/user/hadoop");
// 删除文件
deletefile(fs, "/user/hadoop/testfile.txt");
// 删除目录
deletedirectory(fs, "/user/hadoop/testdir");
// 关闭文件系统
fs.close();
}
private static void createdirectory(filesystem fs, string dirpath) throws ioexception {
fs.mkdirs(new path(dirpath));
system.out.println("directory created: " + dirpath);
}
private static void uploadfile(filesystem fs, string hdfspath, string localfilepath) throws ioexception {
path hdfspathobj = new path(hdfspath);
path localpathobj = new path(localfilepath);
fs.copyfromlocalfile(false, true, localpathobj, hdfspathobj);
system.out.println("file uploaded: " + localfilepath + " to " + hdfspath);
}
private static void downloadfile(filesystem fs, string hdfspath, string localfilepath) throws ioexception {
path hdfspathobj = new path(hdfspath);
path localpathobj = new path(localfilepath);
fs.copytolocalfile(true, hdfspathobj, localpathobj);
system.out.println("file downloaded: " + hdfspath + " to " + localfilepath);
}
private static void listdirectory(filesystem fs, string dirpath) throws ioexception {
for (filestatus file : fs.liststatus(new path(dirpath))) {
system.out.println("file/directory: " + file.getpath().tostring());
}
}
private static void deletefile(filesystem fs, string filepath) throws ioexception {
path filepathobj = new path(filepath);
if (fs.exists(filepathobj)) {
fs.delete(filepathobj, false);
system.out.println("file deleted: " + filepath);
} else {
system.out.println("file not found: " + filepath);
}
}
private static void deletedirectory(filesystem fs, string dirpath) throws ioexception {
path dirpathobj = new path(dirpath);
if (fs.exists(dirpathobj)) {
fs.delete(dirpathobj, true);
system.out.println("directory deleted: " + dirpath);
} else {
system.out.println("directory not found: " + dirpath);
}
}
}
3.hbase
3.1.指令操作
创建一个列族为info的student表:
往表里插数据:
查询单个:
get 'student', '1'
查询批量:
scan 'student'
条件批量查询:
scan 'student', {filter => "singlecolumnvaluefilter('info','age', >=, 'binary:20')"}
在hbase中,scan对象用于定义在表上进行扫描时的参数,包括哪些行和列需要被检索,以及如何处理这些数据。filter是scan的一部分,用于在服务器端对返回的数据进行过滤,以减少网络传输的数据量,提高查询效率。 filter类提供了一种方式来指定复杂的过滤逻辑,允许你基于行键(row key)、列族、列限定符和时间戳来筛选结果。以下是一些常见的filter类型及其用法:
-
rowfilter: 用于基于行键的比较,如rowfilter(=, 'binary:rowkey'),匹配特定的行键。
-
singlecolumnvaluefilter: 用于基于列族和列限定符的值进行比较,如singlecolumnvaluefilter('cf', 'qualifier', compareop.greater_or_equal,binarycomparator.valueof(bytes.tobytes(20))),匹配特定列族和列限定符的值大于或等于给定值的行。
-
prefixfilter: 用于匹配以特定前缀开头的行键,如prefixfilter(bytes.tobytes('row-prefix'))。
-
regexstringcomparator: 用于基于正则表达式匹配行键,如rowfilter(compareop.equal, regexstringcomparator('.pattern.'))。
-
multiplecolumnprefixfilter: 用于匹配具有相同前缀的多个列,如multiplecolumnprefixfilter(bytes.tobytes('col-prefix'))。
-
pagefilter: 用于限制返回结果的数量,这对于大数据量的扫描很有用,如pagefilter(pagesize),pagesize是你希望一次返回的最大行数。
-
timestampsfilter: 用于指定返回的行必须包含特定时间戳范围内的版本,如timestampsfilter(timestamps),timestamps是一个包含多个时间戳的列表。
-
valuefilter 和 qualifierfilter: 分别基于列值和列限定符进行过滤。
使用不同类型的过滤器的指令示例:
清理表:
3.2.java api
hbase也要注意和hdfs中相似的问题,hbase-site.xml中也要用真实的ip地址,不然java api的client端和hbase不在一台机器上的会,就会访问不到hbase,下面的代码中作为演示代码并没有用真实ip,仍然用的localhost,这点要注意。
依赖:
代码示例:
import org.apache.hadoop.conf.configuration;
import org.apache.hadoop.hbase.hbaseconfiguration;
import org.apache.hadoop.hbase.tablename;
import org.apache.hadoop.hbase.client.connection;
import org.apache.hadoop.hbase.client.connectionfactory;
import org.apache.hadoop.hbase.client.delete;
import org.apache.hadoop.hbase.client.get;
import org.apache.hadoop.hbase.client.put;
import org.apache.hadoop.hbase.client.result;
import org.apache.hadoop.hbase.client.table;
import org.apache.hadoop.hbase.util.bytes;
public class hbaseexample {
public static void main(string[] args) {
configuration config = hbaseconfiguration.create();
config.set("hbase.zookeeper.quorum", "localhost"); // 设置zookeeper地址
config.set("hbase.zookeeper.property.clientport", "2181"); // 设置zookeeper端口
try (connection connection = connectionfactory.createconnection(config);
table table = connection.gettable(tablename.valueof("students"))) {
// 创建表
table.createifnotexists();
// 插入数据
put put1 = new put(bytes.tobytes("student1"));
put1.addcolumn(bytes.tobytes("info"), bytes.tobytes("name"), bytes.tobytes("alice"));
put1.addcolumn(bytes.tobytes("info"), bytes.tobytes("age"), bytes.tobytes("20"));
put1.addcolumn(bytes.tobytes("info"), bytes.tobytes("major"), bytes.tobytes("cs"));
table.put(put1);
put put2 = new put(bytes.tobytes("student2"));
put2.addcolumn(bytes.tobytes("info"), bytes.tobytes("name"), bytes.tobytes("bob"));
put2.addcolumn(bytes.tobytes("info"), bytes.tobytes("age"), bytes.tobytes("21"));
put2.addcolumn(bytes.tobytes("info"), bytes.tobytes("major"), bytes.tobytes("math"));
table.put(put2);
// 查询数据
get get = new get(bytes.tobytes("student1"));
result result = table.get(get);
system.out.println("name: " + bytes.tostring(result.getvalue(bytes.tobytes("info"), bytes.tobytes("name"))));
system.out.println("age: " + bytes.toint(result.getvalue(bytes.tobytes("info"), bytes.tobytes("age"))));
system.out.println("major: " + bytes.tostring(result.getvalue(bytes.tobytes("info"), bytes.tobytes("major"))));
// 根据条件删除数据
delete delete = new delete(bytes.tobytes("student1"));
table.delete(delete);
} catch (ioexception e) {
e.printstacktrace();
}
}
}
发表评论