
Apache Arrow User Guide: Reading and Writing Parquet Files on HDFS with Apache Arrow


Install Hadoop and configure LD_LIBRARY_PATH

export HADOOP_VERSION=2.10.1
export HADOOP_HOME=/opt/hadoop-$HADOOP_VERSION

# add Hadoop Java libraries to your CLASSPATH, and
# add native libraries to LD_LIBRARY_PATH
export CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath --glob`
export HADOOP_OPTS="$HADOOP_OPTS -Djava.library.path=$HADOOP_HOME/lib/native"
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$HADOOP_HOME/lib/native/
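
Before going further, it is worth confirming that libhdfs.so is actually present in the native library directory, since the build below links against it. A quick check (assuming the Hadoop 2.10.1 layout configured above) could be:

# verify the native HDFS client library and the Hadoop installation
ls $HADOOP_HOME/lib/native/libhdfs.so
hadoop version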

The Java libraries themselves are not used directly here, but CMakeLists.txt will reference one library from the native directory, libhdfs.so; more on that below. With this in place, Apache Arrow can read and write Parquet files on HDFS. The code follows, starting with CMakeLists.txt:

cmake_minimum_required(VERSION 2.6)
project(lexical_cast)

add_definitions(-std=c++14)
set(ENV{ARROW_LIBHDFS_DIR} /opt/hadoop-2.10.1/lib/native)

include_directories("/usr/local/include" "/usr/include")
link_directories("/usr/local/lib" "/usr/lib/x86_64-linux-gnu" "/opt/hadoop-2.10.1/lib/native")
file(GLOB app_sources ${CMAKE_CURRENT_SOURCE_DIR}/*.cpp)
foreach(sourcefile ${app_sources})
    file(RELATIVE_PATH filename ${CMAKE_CURRENT_SOURCE_DIR} ${sourcefile})
    string(REPLACE ".cpp" "" file ${filename})
    add_executable(${file} ${sourcefile})
    target_link_libraries(${file} boost_filesystem boost_thread boost_system boost_serialization pthread boost_chrono arrow parquet hdfs)
endforeach()

Note that, compared with reading and writing local Parquet files, an extra hdfs library is linked here. It lives in /opt/hadoop-2.10.1/lib/native, the native library directory of the local Hadoop installation; without that link directory the build fails with a missing-library error.
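
As a rough sketch, the build and run steps might look like the following. This assumes an out-of-source build, and that the two programs below are saved as write_parquet.cpp and read_parquet.cpp (hypothetical file names; the CMakeLists.txt above produces one executable per .cpp file):

mkdir build && cd build
cmake ..
make
# the environment variables exported earlier (HADOOP_HOME, CLASSPATH, LD_LIBRARY_PATH)
# must be set in the shell that runs these, otherwise libhdfs.so or the Hadoop jars
# will not be found at runtime
./write_parquet
./read_parquet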

Writing a Parquet file to HDFS

#include <arrow/io/hdfs.h>

#include <parquet/stream_writer.h>

#include <iostream>
#include <string>
#include <vector>
#include <unordered_map>

struct article {
    std::string name;
    float price;
    int quantity;
};

std::vector<article> get_articles() {
    std::vector<article> articles {
        article {"南昌好景色", 35.0f, 20},
        article {"武汉好风景", 24.0f, 30},
        article {"北京王府井", 50.0f, 10}
    };
    return std::move(articles);
}

int main(int argc, char* argv[]) {

    // connect to HDFS (namenode host, port 0 = default, user "root")
    std::shared_ptr<arrow::io::HadoopFileSystem> fs;
    std::unordered_map<std::string, std::string> extraconf;
    arrow::io::HdfsConnectionConfig connectcfg {"172.18.0.2", 0, "root", "", extraconf};

    auto connectres = arrow::io::HadoopFileSystem::Connect(&connectcfg, &fs);

    if(!connectres.ok()) {
        std::cerr << "Failed to connect to HDFS, error: " << connectres.message() << std::endl;
        return -1;
    }

    // open an output stream on HDFS (append = false)
    std::shared_ptr<arrow::io::HdfsOutputStream> out_file;
    auto streamres = fs->OpenWritable("/test.parquet", false, &out_file);

    if(!streamres.ok()) {
        std::cerr << "Failed to open HDFS output stream, error: " << streamres.message() << std::endl;
        return -2;
    }

    // build the Parquet schema: optional UTF8 name, required float price, required int32 quantity
    parquet::WriterProperties::Builder builder;
    parquet::schema::NodeVector fields;

    fields.push_back(parquet::schema::PrimitiveNode::Make(
        "name", parquet::Repetition::OPTIONAL, parquet::Type::BYTE_ARRAY,
        parquet::ConvertedType::UTF8));

    fields.push_back(parquet::schema::PrimitiveNode::Make(
        "price", parquet::Repetition::REQUIRED, parquet::Type::FLOAT,
        parquet::ConvertedType::NONE, -1));

    fields.push_back(parquet::schema::PrimitiveNode::Make(
        "quantity", parquet::Repetition::REQUIRED, parquet::Type::INT32,
        parquet::ConvertedType::INT_32, -1));

    std::shared_ptr<parquet::schema::GroupNode> schema = std::static_pointer_cast<parquet::schema::GroupNode>(
        parquet::schema::GroupNode::Make("schema", parquet::Repetition::REQUIRED, fields));

    // stream the rows into the Parquet file
    parquet::StreamWriter os {parquet::ParquetFileWriter::Open(out_file, schema, builder.build())};

    for(const auto& a: get_articles()) {
        os << a.name << a.price << a.quantity << parquet::EndRow;
    }
    
    return 0;
}
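
If the write succeeds, the file should now exist on HDFS. A quick way to confirm (assuming the hdfs command-line client from the same installation is on the PATH) is:

hdfs dfs -ls /test.parquet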

Reading a Parquet file from HDFS

#include <arrow/io/hdfs.h>

#include <parquet/stream_reader.h>

#include <iostream>
#include <string>
#include <unordered_map>

struct article {
    std::string name;
    float price;
    int quantity;
};

int main(int argc, char* argv[]) {

    // connect to HDFS (namenode host, port 0 = default, user "root")
    std::shared_ptr<arrow::io::HadoopFileSystem> fs;
    std::unordered_map<std::string, std::string> extraconf;
    arrow::io::HdfsConnectionConfig connectcfg {"172.18.0.2", 0, "root", "", extraconf};

    auto connectres = arrow::io::HadoopFileSystem::Connect(&connectcfg, &fs);

    if(!connectres.ok()) {
        std::cerr << "Failed to connect to HDFS, error: " << connectres.message() << std::endl;
        return -1;
    }

    // open the Parquet file on HDFS for reading
    std::shared_ptr<arrow::io::HdfsReadableFile> infile;
    auto streamres = fs->OpenReadable("/test.parquet", &infile);

    if(!streamres.ok()) {
        std::cerr << "Failed to open HDFS file for reading, error: " << streamres.message() << std::endl;
        return -2;
    }

    // stream the rows back out of the Parquet file
    parquet::StreamReader is {parquet::ParquetFileReader::Open(infile)};

    article arti;

    while(!is.eof()) {
        is >> arti.name >> arti.price >> arti.quantity >> parquet::EndRow;
        std::cout << arti.name << " " << arti.price << " " << arti.quantity << std::endl;
    }
    
    return 0;
}
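
Given the three rows written by the program above, the reader should print something like the following (with default std::cout formatting):

南昌好景色 35 20
武汉好风景 24 30
北京王府井 50 10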