当前位置: 代码网 > it编程>编程语言>Java > 【大数据】HDFS、HBase操作教程(含指令和JAVA API)

【大数据】HDFS、HBase操作教程(含指令和JAVA API)

2024年07月31日 Java 我要评论
详解HDFS和HBASE的指令操作以及JAVA API。

目录

1.前言

2.hdfs

2.1.指令操作

2.2.java api

3.hbase

3.1.指令操作

3.2.java api


1.前言

本文是作者大数据专栏系列的其中一篇,前文中已经详细聊过分布式文件系统hdfs和分布式数据库hbase了,本文将会是它们的实操讲解。

hdfs相关前文:

【大数据】分布式文件系统hdfs-csdn博客

【大数据】大数据概论与hadoop_大数据导论与hadoop-csdn博客

hbase相关前文:

【大数据】分布式数据库hbase-csdn博客

【大数据】分布式数据库hbase下载安装教程-csdn博客

2.hdfs

2.1.指令操作

创建目录:

递归创建目录:

上传文件到hdfs:

下载文件到本地:

删除文件:

递归删除目录:

查看目录内容:

递归查看目录内容:

查看文件详细信息:

移动或重命名文件:

复制文件、目录:

查看文件内容:

hdfs dfs -cat /user/mydir/file.txt

2.2.java api

首先这里有个巨坑:

一定要把core-site.xml里面的fs.defaultfs换成真实ip地址,不能用localhsot

<configuration
        <property>
                <name>hadoop.tmp.version</name>
                <value>file:/usr/local/hadoop/tmp</value>
        </property>
        <property>
                <name>fs.defaultfs</name>
                <value>hdfs://localhost:9000</value>
        </property>
</configuration>

如果java api的client端会先找hdfs拿到fs.defaultfs,然后再去访问拿到的地址上的hdfs,如果java api的client端和hdfs不在一台机器上,java api的client就会去访问它本地的localhost的9000端口上的服务,会直接报错:

connection refused: no further information

依赖:

代码示例:

import java.io.*;
import org.apache.hadoop.conf.configuration;
import org.apache.hadoop.fs.filesystem;
import org.apache.hadoop.fs.path;
import org.apache.hadoop.io.ioutils;
​
public class hdfssample {
​
    public static void main(string[] args) throws ioexception {
        configuration conf = new configuration();
        filesystem fs = filesystem.get(conf);
​
        // 创建目录
        createdirectory(fs, "/user/hadoop/testdir");
​
        // 上传文件
        uploadfile(fs, "/user/hadoop/testfile.txt", "c:/localfile.txt");
​
        // 下载文件
        downloadfile(fs, "/user/hadoop/testfile.txt", "c:/downloadedfile.txt");
​
        // 列出目录内容
        listdirectory(fs, "/user/hadoop");
​
        // 删除文件
        deletefile(fs, "/user/hadoop/testfile.txt");
​
        // 删除目录
        deletedirectory(fs, "/user/hadoop/testdir");
​
        // 关闭文件系统
        fs.close();
    }
​
    private static void createdirectory(filesystem fs, string dirpath) throws ioexception {
        fs.mkdirs(new path(dirpath));
        system.out.println("directory created: " + dirpath);
    }
​
    private static void uploadfile(filesystem fs, string hdfspath, string localfilepath) throws ioexception {
        path hdfspathobj = new path(hdfspath);
        path localpathobj = new path(localfilepath);
        fs.copyfromlocalfile(false, true, localpathobj, hdfspathobj);
        system.out.println("file uploaded: " + localfilepath + " to " + hdfspath);
    }
​
    private static void downloadfile(filesystem fs, string hdfspath, string localfilepath) throws ioexception {
        path hdfspathobj = new path(hdfspath);
        path localpathobj = new path(localfilepath);
        fs.copytolocalfile(true, hdfspathobj, localpathobj);
        system.out.println("file downloaded: " + hdfspath + " to " + localfilepath);
    }
​
    private static void listdirectory(filesystem fs, string dirpath) throws ioexception {
        for (filestatus file : fs.liststatus(new path(dirpath))) {
            system.out.println("file/directory: " + file.getpath().tostring());
        }
    }
​
    private static void deletefile(filesystem fs, string filepath) throws ioexception {
        path filepathobj = new path(filepath);
        if (fs.exists(filepathobj)) {
            fs.delete(filepathobj, false);
            system.out.println("file deleted: " + filepath);
        } else {
            system.out.println("file not found: " + filepath);
        }
    }
​
    private static void deletedirectory(filesystem fs, string dirpath) throws ioexception {
        path dirpathobj = new path(dirpath);
        if (fs.exists(dirpathobj)) {
            fs.delete(dirpathobj, true);
            system.out.println("directory deleted: " + dirpath);
        } else {
            system.out.println("directory not found: " + dirpath);
        }
    }
}

3.hbase

3.1.指令操作

创建一个列族为info的student表:

往表里插数据:

查询单个:

get 'student', '1'

查询批量:

scan 'student'

条件批量查询:

scan 'student', {filter => "singlecolumnvaluefilter('info','age', >=, 'binary:20')"}

在hbase中,scan对象用于定义在表上进行扫描时的参数,包括哪些行和列需要被检索,以及如何处理这些数据。filter是scan的一部分,用于在服务器端对返回的数据进行过滤,以减少网络传输的数据量,提高查询效率。 filter类提供了一种方式来指定复杂的过滤逻辑,允许你基于行键(row key)、列族、列限定符和时间戳来筛选结果。以下是一些常见的filter类型及其用法:

  • rowfilter: 用于基于行键的比较,如rowfilter(=, 'binary:rowkey'),匹配特定的行键。

  • singlecolumnvaluefilter: 用于基于列族和列限定符的值进行比较,如singlecolumnvaluefilter('cf', 'qualifier', compareop.greater_or_equal,binarycomparator.valueof(bytes.tobytes(20))),匹配特定列族和列限定符的值大于或等于给定值的行。

  • prefixfilter: 用于匹配以特定前缀开头的行键,如prefixfilter(bytes.tobytes('row-prefix'))。

  • regexstringcomparator: 用于基于正则表达式匹配行键,如rowfilter(compareop.equal, regexstringcomparator('.pattern.'))。

  • multiplecolumnprefixfilter: 用于匹配具有相同前缀的多个列,如multiplecolumnprefixfilter(bytes.tobytes('col-prefix'))。

  • pagefilter: 用于限制返回结果的数量,这对于大数据量的扫描很有用,如pagefilter(pagesize),pagesize是你希望一次返回的最大行数。

  • timestampsfilter: 用于指定返回的行必须包含特定时间戳范围内的版本,如timestampsfilter(timestamps),timestamps是一个包含多个时间戳的列表。

  • valuefilter 和 qualifierfilter: 分别基于列值和列限定符进行过滤。

使用不同类型的过滤器的指令示例:

清理表:

3.2.java api

hbase也要注意和hdfs中相似的问题,hbase-site.xml中也要用真实的ip地址,不然java api的client端和hbase不在一台机器上的会,就会访问不到hbase,下面的代码中作为演示代码并没有用真实ip,仍然用的localhost,这点要注意。

依赖:

代码示例:

import org.apache.hadoop.conf.configuration;
import org.apache.hadoop.hbase.hbaseconfiguration;
import org.apache.hadoop.hbase.tablename;
import org.apache.hadoop.hbase.client.connection;
import org.apache.hadoop.hbase.client.connectionfactory;
import org.apache.hadoop.hbase.client.delete;
import org.apache.hadoop.hbase.client.get;
import org.apache.hadoop.hbase.client.put;
import org.apache.hadoop.hbase.client.result;
import org.apache.hadoop.hbase.client.table;
import org.apache.hadoop.hbase.util.bytes;

public class hbaseexample {

    public static void main(string[] args) {
        configuration config = hbaseconfiguration.create();
        config.set("hbase.zookeeper.quorum", "localhost"); // 设置zookeeper地址
        config.set("hbase.zookeeper.property.clientport", "2181"); // 设置zookeeper端口

        try (connection connection = connectionfactory.createconnection(config);
             table table = connection.gettable(tablename.valueof("students"))) {

            // 创建表
            table.createifnotexists();

            // 插入数据
            put put1 = new put(bytes.tobytes("student1"));
            put1.addcolumn(bytes.tobytes("info"), bytes.tobytes("name"), bytes.tobytes("alice"));
            put1.addcolumn(bytes.tobytes("info"), bytes.tobytes("age"), bytes.tobytes("20"));
            put1.addcolumn(bytes.tobytes("info"), bytes.tobytes("major"), bytes.tobytes("cs"));
            table.put(put1);

            put put2 = new put(bytes.tobytes("student2"));
            put2.addcolumn(bytes.tobytes("info"), bytes.tobytes("name"), bytes.tobytes("bob"));
            put2.addcolumn(bytes.tobytes("info"), bytes.tobytes("age"), bytes.tobytes("21"));
            put2.addcolumn(bytes.tobytes("info"), bytes.tobytes("major"), bytes.tobytes("math"));
            table.put(put2);

            // 查询数据
            get get = new get(bytes.tobytes("student1"));
            result result = table.get(get);
            system.out.println("name: " + bytes.tostring(result.getvalue(bytes.tobytes("info"), bytes.tobytes("name"))));
            system.out.println("age: " + bytes.toint(result.getvalue(bytes.tobytes("info"), bytes.tobytes("age"))));
            system.out.println("major: " + bytes.tostring(result.getvalue(bytes.tobytes("info"), bytes.tobytes("major"))));

            // 根据条件删除数据
            delete delete = new delete(bytes.tobytes("student1"));
            table.delete(delete);

        } catch (ioexception e) {
            e.printstacktrace();
        }
    }
}

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com