安装一下 Hadoop 并配置一下 LD_LIBRARY_PATH
# Environment-variable names are case-sensitive: HADOOP_HOME, CLASSPATH,
# HADOOP_OPTS and LD_LIBRARY_PATH must be upper-case, and the JVM flag is
# -Djava.library.path (capital D). Lowercased forms are silently ignored.
export HADOOP_VERSION=2.10.1
export HADOOP_HOME=/opt/hadoop-$HADOOP_VERSION
# Add the Hadoop Java libraries to your CLASSPATH, and
# add the native libraries to LD_LIBRARY_PATH.
export CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath --glob`
export HADOOP_OPTS="$HADOOP_OPTS -Djava.library.path=$HADOOP_HOME/lib/native"
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$HADOOP_HOME/lib/native/
这几个库目前用不到,但是 CMakeLists.txt 里面会用到 libhdfs.so,后面再说。配置完成以后就可以用 Apache Arrow 来读写 HDFS 上的 Parquet 文件了。代码如下,CMakeLists.txt:
# Build one executable per .cpp file in this directory, each linked against
# Boost, Arrow/Parquet and libhdfs (from the Hadoop native directory).
cmake_minimum_required(VERSION 3.5)
project(lexical_cast CXX)

# Prefer the standard variables over injecting -std=c++14 via add_definitions.
set(CMAKE_CXX_STANDARD 14)
set(CMAKE_CXX_STANDARD_REQUIRED ON)

# Arrow's libhdfs driver locates libhdfs.so through this environment
# variable at runtime; setting it here only affects the configure step,
# so export it in your shell as well.
set(ENV{ARROW_LIBHDFS_DIR} /opt/hadoop-2.10.1/lib/native)

include_directories("/usr/local/include" "/usr/include")
link_directories("/usr/local/lib" "/usr/lib/x86_64-linux-gnu" "/opt/hadoop-2.10.1/lib/native")

# NOTE: GLOB does not notice newly added files until re-configure; listing
# sources explicitly is preferred, kept here to preserve the original layout.
file(GLOB APP_SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/*.cpp)
foreach(sourcefile ${APP_SOURCES})
  file(RELATIVE_PATH filename ${CMAKE_CURRENT_SOURCE_DIR} ${sourcefile})
  # Strip the .cpp suffix to get the executable name.
  string(REPLACE ".cpp" "" file ${filename})
  add_executable(${file} ${sourcefile})
  target_link_libraries(${file} PRIVATE
    boost_filesystem boost_thread boost_system boost_serialization
    pthread boost_chrono arrow parquet hdfs)
endforeach()
注意到这里比读写本地parquet文件多了一个hdfs库,位于/opt/hadoop-2.10.1/lib/native目录,就是本地hdfs安装的目录,否则会出现找不到链接库文件错误。
写入hdfs parquet文件
#include <arrow/io/hdfs.h>
#include <parquet/stream_writer.h>

#include <iostream>
#include <map>
#include <memory>
#include <string>
#include <unordered_map>
#include <vector>
// One row of the Parquet file: matches the three-column schema
// (name: UTF-8 byte array, price: float, quantity: int32) built in main().
struct article {
std::string name;   // product name (UTF-8)
float price;        // unit price
int quantity;       // stock quantity
};
// Builds the sample rows that will be written to the Parquet file.
// Return the local by value: wrapping it in std::move() disables NRVO
// and forces a move where the compiler could elide the copy entirely.
std::vector<article> get_articles() {
    std::vector<article> articles{
        article{"南昌好景色", 35.0f, 20},
        article{"武汉好风景", 24.0f, 30},
        article{"北京王府井", 50.0f, 10}
    };
    return articles;
}
int main(int argc, char* argv[]) {
std::shared_ptr<arrow::io::hadoopfilesystem> fs;
std::unordered_map<std::string, std::string> extraconf;
arrow::io::hdfsconnectionconfig connectcfg {"172.18.0.2", 0, "root", "", extraconf};
auto connectres = arrow::io::hadoopfilesystem::connect(&connectcfg , &fs);
if(!connectres.ok()) {
std::cerr << "连接到hdfs失败, error: " << connectres.message() << std::endl;
return -1;
}
std::shared_ptr<arrow::io::hdfsoutputstream> out_file;
auto streamres = fs->openwritable("/test.parquet", false, &out_file);
if(!streamres.ok()) {
std::cerr << "连接到hdfs失败, error: " << streamres.message() << std::endl;
return -2;
}
parquet::writerproperties::builder builder;
parquet::schema::nodevector fields;
fields.push_back(parquet::schema::primitivenode::make(
"name", parquet::repetition::optional, parquet::type::byte_array,
parquet::convertedtype::utf8));
fields.push_back(parquet::schema::primitivenode::make(
"price", parquet::repetition::required, parquet::type::float,
parquet::convertedtype::none, -1));
fields.push_back(parquet::schema::primitivenode::make(
"quantity", parquet::repetition::required, parquet::type::int32,
parquet::convertedtype::int_32, -1));
std::shared_ptr<parquet::schema::groupnode> schema = std::static_pointer_cast<parquet::schema::groupnode>(
parquet::schema::groupnode::make("schema", parquet::repetition::required, fields));
parquet::streamwriter os {parquet::parquetfilewriter::open(out_file, schema, builder.build())};
for(const auto& a: get_articles()) {
os << a.name << a.price << a.quantity << parquet::endrow;
}
return 0;
}
读出hdfs parquet文件
#include <arrow/io/hdfs.h>
#include <parquet/stream_reader.h>
#include <iostream>
// One row read back from the Parquet file; must match the schema the
// writer used (name: UTF-8 byte array, price: float, quantity: int32).
struct article {
std::string name;   // product name (UTF-8)
float price;        // unit price
int quantity;       // stock quantity
};
int main(int argc, char* argv[]) {
std::shared_ptr<arrow::io::hadoopfilesystem> fs;
std::unordered_map<std::string, std::string> extraconf;
arrow::io::hdfsconnectionconfig connectcfg {"172.18.0.2", 0, "root", "", extraconf};
auto connectres = arrow::io::hadoopfilesystem::connect(&connectcfg , &fs);
if(!connectres.ok()) {
std::cerr << "连接到hdfs失败, error: " << connectres.message() << std::endl;
return -1;
}
std::shared_ptr<arrow::io::hdfsreadablefile> infile;
auto streamres = fs->openreadable("/test.parquet", false, &infile);
if(!streamres.ok()) {
std::cerr << "连接到hdfs失败, error: " << streamres.message() << std::endl;
return -2;
}
parquet::streamreader is {parquet::parquetfilereader::open(infile)};
article arti;
while(!is.eof()) {
is >> arti.name >> arti.price >> arti.quantity >> parquet::endrow;
std::cout << arti.name << " " << arti.price << " " << arti.quantity << std::endl;
}
return 0;
}
发表评论