hadoop 概述
hadoop 是一个由apache 软件基金会开发的分布式基础架构。用户可以在不了解分布式底层细节的情况下,开发分布式程序,充分利用集群的威力进行高速运算和存储。
hadoop 实现一个分布式文件系统(hadoop distributed file system, hdfs)。hdfs具有高容错性的特点,并设计它用来部署在廉价的硬件上,而且它提供高吞吐量来访问应用程序的数据,适合那些有着超大数据集的应用程序。hadoop 框架的核心设计是hdfs和mapreduce。hdfs为海量数据提供了存储,而mapreduce则为海量的数据提供了计算。
hadoop 核心三大组件
- hdfs
- mapreduce
- yarn
基于linux 安装hadoop 3 伪分布式版本/单机版本
请参考文章:centos7 安装hadoop3 单机版本(伪分布式版本)
hdfs 原理
hdfs采用master/slave架构。
hdfs系统架构图,主要有三个角色,client、namenode、datanode。
hdfs的一些关键元素
- block:将文件分块,通常为64m。
- namenode:是master节点,是大领导。管理数据块映射;处理客户端的读写请求;配置副本策略;管理hdfs的名称空间。保存整个文件系统的目录信息、文件信息及分块信息,由唯一一台主机专门保存。
- secondarynamenode:是一个小弟,分担大哥namenode的工作量;是namenode的冷备份;合并fsimage和fsedits然后再发给namenode。(热备份:b是a的热备份,如果a坏掉。那么b马上运行代替a的工作。冷备份:b是a的冷备份,如果a坏掉。那么b不能马上代替a工作。但是b上存储a的一些信息,减少a坏掉之后的损失。)
- datanode:是slave节点,奴隶,干活的。负责存储client发来的数据块block;执行数据块的读写操作。
- fsimage:元数据镜像文件(文件系统的目录树)
- edits:元数据的操作日志(针对文件系统做的修改操作记录)
hdfs设计重点
- hdfs 数据备份hdfs被设计成一个可以在大集群中、跨机器、可靠的存储海量数据的框架。它将所有文件存储成block块组成的序列,除了最后一个block块,所有的block块大小都是一样的。
- hdfs中的文件默认规则是write one(一次写、多次读)的,并且严格要求在任何时候只有一个writer。
- namenode全权管理数据块的复制,它周期性地从集群中的每个datanode接受心跳信号和块状态报告(blockreport)。接收到心跳信号以为该datanode工作正常,块状态报告包含了一个该datanode上所有数据块的列表。
- namenode内存中存储的是=fsimage+edits。secondarynamenode负责定时(默认1小时)从namenode上,获取fsimage和edits来进行合并,然后再发送给namenode。减少namenode的工作量。
hdfs 文件写入
client向namenode发起文件写入的请求。
- namenode根据文件大小和文件块配置情况,返回给client它所管理部分datanode的信息。
- client将文件划分为多个block块,并根据datanode的地址信息,按顺序写入到每一个datanode块中。
实例讲解:有一个文件filea,100m大小。client将filea写入到hdfs上。
hadoop 环境配置说明:
- hdfs分布在三个机架上rack1,rack2,rack3。
文件写入过程如下:
- client将filea按64m分块。分成两块,block1和block2;
- client向namenode发送写数据请求,如图蓝色虚线①------>。
- namenode节点,记录block信息。并返回可用的datanode,如粉色虚线②--------->。
- block1: host2,host1,host3
- block2: host7,host8,host4
- 原理:
- namenode具有rackaware机架感知功能,这个可以配置。
- 若client为datanode节点,那存储block时,规则为:副本1,同client的节点上;副本2,不同机架节点上;副本3,同第二个副本机架的另一个节点上;其他副本随机挑选。
- 若client不为datanode节点,那存储block时,规则为:副本1,随机选择一个节点上;副本2,不同副本1,机架上;副本3,同副本2相同的另一个节点上;其他副本随机挑选。
- client向datanode发送block1;发送过程是以流式写入。流式写入过程如下:
- 将64m的block1按64k的package划分;
- 然后将第一个package发送给host2;
- host2接收完后,将第一个package发送给host1,同时client想host2发送第二个package;
- host1接收完第一个package后,发送给host3,同时接收host2发来的第二个package。
- 以此类推,如图红线实线所示,直到将block1发送完毕。
- host2,host1,host3向namenode,host2向client发送通知,说“消息发送完了”。如图粉红颜色实线所示。
- client收到host2发来的消息后,向namenode发送消息,说我写完了。这样就真完成了。如图黄色粗实线
- 发送完block1后,再向host7、host8、host4发送block2,如图蓝色实线所示。
- 发送完block2后,host7、host8、host4向namenode,host7向client发送通知,如图浅绿色实线所示。
- client向namenode发送消息,说我写完了,如图黄色粗实线。。。这样就完毕了。
- 分析:通过写过程,我们可以了解到
- 写1t文件,我们需要3t的存储,3t的网络流量贷款。
- 在执行读或写的过程中,namenode和datanode通过heartbeat进行保存通信,确定datanode活着。如果发现datanode死掉了,就将死掉的datanode上的数据,放到其他节点去。读取时,要读其他节点去。
- 挂掉一个节点,没关系,还有其他节点可以备份;甚至,挂掉某一个机架,也没关系;其他机架上,也有备份。
hdfs 文件读取
client向namenode发起文件读取的请求。
- client向namenode发起文件读取的请求。
- namenode返回文件存储的block块信息、及其block块所在datanode的信息。
- client读取文件信息。
如图所示,client要从datanode上,读取filea。而filea由block1和block2组成。读操作流程如下:
- client向namenode发送读请求。
- namenode查看metadata信息,返回filea的block的位置。
- block1:host2,host1,host3
- block2:host7,host8,host4
- block的位置是有先后顺序的,先读block1,再读block2。而且block1去host2上读取;然后block2,去host7上读取。
上面例子中,client位于机架外,那么如果client位于机架内某个datanode上,例如,client是host6。那么读取的时候,遵循的规律是:优选读取本机架上的数据。
hdfs 数据备份
备份数据的存放是hdfs可靠性和性能的关键。hdfs采用一种称为rack-aware的策略来决定备份数据的存放。
通过一个称为rack awareness的过程,namenode决定每个datanode所属rack id。
缺省情况下,一个block块会有三个备份:
- 一个在namenode指定的datanode上
- 一个在指定datanode非同一rack的datanode上
- 一个在指定datanode同一rack的datanode上。
这种策略综合考虑了同一rack失效、以及不同rack之间数据复制性能问题。
副本的选择:为了降低整体的带宽消耗和读取延时,hdfs会尽量读取最近的副本。如果在同一个rack上有一个副本,那么就读该副本。如果一个hdfs集群跨越多个数据中心,那么将首先尝试读本地数据中心的副本。
hdfs shell 命令
hdfs shell 简介
调用文件系统(hdfs)shell命令应使用 bin/hadoop fs <args>的形式。 所有的的fs shell命令使用uri路径作为参数。uri格式是scheme://authority/path。对hdfs文件系统,scheme是hdfs,对本地文件系统,scheme是file。其中scheme和authority参数都是可选的,如果未加指定,就会使用配置中指定的默认scheme。
hdfs shell 命令详解
cat
使用方法:hadoop fs -cat uri [uri …]
将路径指定文件的内容输出到stdout。
示例:
- hadoop fs -cat hdfs://host1:port1/file1 hdfs://host2:port2/file2
- hadoop fs -cat file:///file3 /user/hadoop/file4
返回值:
成功返回0,失败返回-1。
chgrp
使用方法:hadoop fs -chgrp [-r] group uri [uri …] change group association of files. with -r, make the change recursively through the directory structure. the user must be the owner of files, or else a super-user. additional information is in the permissions user guide. -->
改变文件所属的组。使用-r将使改变在目录结构下递归进行。命令的使用者必须是文件的所有者或者超级用户。更多的信息请参见hdfs权限用户指南。
chmod
使用方法:hadoop fs -chmod [-r] <mode[,mode]... | octalmode> uri [uri …]
改变文件的权限。使用-r将使改变在目录结构下递归进行。命令的使用者必须是文件的所有者或者超级用户。更多的信息请参见hdfs权限用户指南。
chown
使用方法:hadoop fs -chown [-r] [owner][:[group]] uri [uri ]
改变文件的拥有者。使用-r将使改变在目录结构下递归进行。命令的使用者必须是超级用户。更多的信息请参见hdfs权限用户指南。
copyfromlocal
使用方法:hadoop fs -copyfromlocal <localsrc> uri
除了限定源路径是一个本地文件外,和put命令相似。
copytolocal
使用方法:hadoop fs -copytolocal [-ignorecrc] [-crc] uri <localdst>
除了限定目标路径是一个本地文件外,和get命令类似。
cp
使用方法:hadoop fs -cp uri [uri …] <dest>
将文件从源路径复制到目标路径。这个命令允许有多个源路径,此时目标路径必须是一个目录。
示例:
- hadoop fs -cp /user/hadoop/file1 /user/hadoop/file2
- hadoop fs -cp /user/hadoop/file1 /user/hadoop/file2 /user/hadoop/dir
返回值:
成功返回0,失败返回-1。
du
使用方法:hadoop fs -du uri [uri …]
显示目录中所有文件的大小,或者当只指定一个文件时,显示此文件的大小。
示例:
hadoop fs -du /user/hadoop/dir1 /user/hadoop/file1 hdfs://host:port/user/hadoop/dir1
返回值:
成功返回0,失败返回-1。
dus
使用方法:hadoop fs -dus <args>
显示文件的大小。
expunge
使用方法:hadoop fs -expunge
清空回收站。请参考hdfs设计文档以获取更多关于回收站特性的信息。
get
使用方法:hadoop fs -get [-ignorecrc] [-crc] <src> <localdst>
复制文件到本地文件系统。可用-ignorecrc选项复制crc校验失败的文件。使用-crc选项复制文件以及crc信息。
示例:
- hadoop fs -get /user/hadoop/file localfile
- hadoop fs -get hdfs://host:port/user/hadoop/file localfile
返回值:
成功返回0,失败返回-1。
getmerge
使用方法:hadoop fs -getmerge <src> <localdst> [addnl]
接受一个源目录和一个目标文件作为输入,并且将源目录中所有的文件连接成本地目标文件。addnl是可选的,用于指定在每个文件结尾添加一个换行符。
ls
使用方法:hadoop fs -ls <args>
如果是文件,则按照如下格式返回文件信息:
文件名 <副本数> 文件大小 修改日期 修改时间 权限 用户id 组id
如果是目录,则返回它直接子文件的一个列表,就像在unix中一样。目录返回列表的信息如下:
目录名 <dir> 修改日期 修改时间 权限 用户id 组id
示例:
hadoop fs -ls /user/hadoop/file1 /user/hadoop/file2 hdfs://host:port/user/hadoop/dir1 /nonexistentfile
返回值:
成功返回0,失败返回-1。
lsr
使用方法:hadoop fs -lsr <args>
ls命令的递归版本。类似于unix中的ls -r。
mkdir
使用方法:hadoop fs -mkdir <paths>
接受路径指定的uri作为参数,创建这些目录。其行为类似于unix的mkdir -p,它会创建路径中的各级父目录。
示例:
- hadoop fs -mkdir /user/hadoop/dir1 /user/hadoop/dir2
- hadoop fs -mkdir hdfs://host1:port1/user/hadoop/dir hdfs://host2:port2/user/hadoop/dir
返回值:
成功返回0,失败返回-1。
movefromlocal
使用方法:dfs -movefromlocal <src> <dst>
输出一个”not implemented“信息。
mv
使用方法:hadoop fs -mv uri [uri …] <dest>
将文件从源路径移动到目标路径。这个命令允许有多个源路径,此时目标路径必须是一个目录。不允许在不同的文件系统间移动文件。
示例:
- hadoop fs -mv /user/hadoop/file1 /user/hadoop/file2
- hadoop fs -mv hdfs://host:port/file1 hdfs://host:port/file2 hdfs://host:port/file3 hdfs://host:port/dir1
返回值:
成功返回0,失败返回-1。
put
使用方法:hadoop fs -put <localsrc> ... <dst>
从本地文件系统中复制单个或多个源路径到目标文件系统。也支持从标准输入中读取输入写入目标文件系统。
- hadoop fs -put localfile /user/hadoop/hadoopfile
- hadoop fs -put localfile1 localfile2 /user/hadoop/hadoopdir
- hadoop fs -put localfile hdfs://host:port/hadoop/hadoopfile
- hadoop fs -put - hdfs://host:port/hadoop/hadoopfile
从标准输入中读取输入。
返回值:
成功返回0,失败返回-1。
rm
使用方法:hadoop fs -rm uri [uri …]
删除指定的文件。只删除非空目录和文件。请参考rmr命令了解递归删除。
示例:
- hadoop fs -rm hdfs://host:port/file /user/hadoop/emptydir
返回值:
成功返回0,失败返回-1。
rmr
使用方法:hadoop fs -rmr uri [uri …]
delete的递归版本。
示例:
- hadoop fs -rmr /user/hadoop/dir
- hadoop fs -rmr hdfs://host:port/user/hadoop/dir
返回值:
成功返回0,失败返回-1。
setrep
使用方法:hadoop fs -setrep [-r] <path>
改变一个文件的副本系数。-r选项用于递归改变目录下所有文件的副本系数。
示例:
- hadoop fs -setrep -w 3 -r /user/hadoop/dir1
返回值:
成功返回0,失败返回-1。
stat
使用方法:hadoop fs -stat uri [uri …]
返回指定路径的统计信息。
示例:
- hadoop fs -stat path
返回值:
成功返回0,失败返回-1。
tail
使用方法:hadoop fs -tail [-f] uri
将文件尾部1k字节的内容输出到stdout。支持-f选项,行为和unix中一致。
示例:
- hadoop fs -tail pathname
返回值:
成功返回0,失败返回-1。
test
使用方法:hadoop fs -test -[ezd] uri
选项:
-e 检查文件是否存在。如果存在则返回0。
-z 检查文件是否是0字节。如果是则返回0。
-d 如果路径是个目录,则返回1,否则返回0。
示例:
- hadoop fs -test -e filename
text
使用方法:hadoop fs -text <src>
将源文件输出为文本格式。允许的格式是zip和textrecordinputstream。
touchz
使用方法:hadoop fs -touchz uri [uri …]
创建一个0字节的空文件。
示例:
- hadoop -touchz pathname
返回值:
成功返回0,失败返回-1。
hdfs python api
hdfs 除了通过hdfs shell 命令的方式进行操作,还可以通过java api、python api、c++ api等方式进行编程操作。在使用python api 编程前,需要安装hdfs 依赖的第三方库:pyhdfs
pyhdfs
官方文档地址:https://pyhdfs.readthedocs.io/en/latest/pyhdfs.html
原文简介:
webhdfs client with support for nn ha and automatic error checking
for details on the webhdfs endpoints, see the hadoop documentation:
https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/webhdfs.html
https://hadoop.apache.org/docs/current/api/org/apache/hadoop/fs/filesystem.html
https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/filesystem/filesystem.html
大致意思是:pyhdfs 模块/库使用webhdfs 客户端连接hdfs,支持nn ha 和自动错误检测,详细使用参考官方文档地址。
pyhdfs 核心类
python 3 使用 pyhdfs(重点是hdfsclient 类)
安装pyhdfs 模块/库
pip install pyhdfs
pyhdfs 使用实例
# _*_ coding : utf-8_*_
# 开发者 : zhuozhiwengang
# 开发时间 : 2023/8/11 22:35
# 文件名称 : pythonhdfs_1
# 开发工具 : pycharm
import pyhdfs
# 基于pyhdfs 模块, 连接hadoop 主机:9870 端口
fs = pyhdfs.hdfsclient(hosts="192.168.43.11:9870", user_name="root")
# 返回用户根目录
print(fs.get_home_directory())
# 返回可用namenode节点
print(fs.get_active_namenode())
# 返回指定目录下所有文件
print(fs.listdir("/"))
# hadoop 创建指定目录
fs.mkdirs('/uploads')
# 再次执行返货指定目录下所有文件
print(fs.listdir("/"))
# 执行本地文件上传hadoop 指定目录
# fs.copy_from_local("d:\one.txt", '/uploads/one.txt')
# 执行hadoop 文件下载
# fs.copy_to_local("/uploads/one.txt", r'd:\two.txt')
# 判断目录是否存在
print(fs.exists("/uploads"))
# 返回目录下的所有目录,路径,文件名
print(list(fs.walk('/uploads')))
# 删除目录/文件
fs.delete("/uploads", recursive=true) # 删除目录 recursive=true
fs.delete("/uploads/one.txt") # 删除文件
python 3 通用封装 pyhdfs
# _*_ coding : utf-8_*_
# 开发者 : zhuozhiwengang
# 开发时间 : 2023/8/11 22:35
# 文件名称 : pythonhdfs
# 开发工具 : pycharm
import sys
import pyhdfs
class hdfs:
def __init__(self, host='192.168.43.11',user_name='root'):
self.host = host
self.user_name=user_name
def get_con(self):
try:
hdfs = pyhdfs.hdfsclient(hosts = self.host,user_name = self.user_name)
return hdfs
except pyhdfs.hdfsexception,e:
print "error:%s" % e
# 返回指定目录下的所有文件
def listdir(self,oper):
try:
client = self.get_con()
dirs = client.listdir(oper)
for row in dirs:
print row
except pyhdfs.hdfsexception, e:
print "error:%s" % e
# 返回用户的根目录
def get_home_directory(self):
try:
client = self.get_con()
print client.get_home_directory()
except pyhdfs.hdfsexception, e:
print "error:%s" % e
# 返回可用的namenode节点
def get_active_namenode(self):
try:
client = self.get_con()
print client.get_active_namenode()
except pyhdfs.hdfsexception, e:
print "error:%s" % e
# 创建新目录
def mkdirs(self,oper):
try:
client = self.get_con()
print client.mkdirs(oper)
except pyhdfs.hdfsexception, e:
print "error:%s" % e
# 从集群上copy到本地
def copy_to_local(self, dest,localsrc):
try:
client = self.get_con()
print client.copy_to_local(dest,localsrc)
except pyhdfs.hdfsexception, e:
print "error:%s" % e
# 从本地上传文件至集群
def copy_from_local(self, localsrc, dest):
try:
client = self.get_con()
print client.copy_from_local(localsrc, dest)
except pyhdfs.hdfsexception, e:
print "error:%s" % e
# 查看文件内容
def read_files(self,oper):
try:
client = self.get_con()
response = client.open(oper)
print response.read()
except pyhdfs.hdfsexception, e:
print "error:%s" % e
# 向一个已经存在的文件追加内容
def append_files(self, file,content):
try:
client = self.get_con()
print client.append(file,content)
except pyhdfs.hdfsexception, e:
print "error:%s" % e
# 查看是否存在文件
def check_files(self,oper):
try:
client = self.get_con()
print client.exists(oper)
except pyhdfs.hdfsexception, e:
print "error:%s" % e
# 查看文件的校验和
def get_file_checksum(self,oper):
try:
client = self.get_con()
print client.get_file_checksum(oper)
except pyhdfs.hdfsexception, e:
print "error:%s" % e
# 查看路径总览信息
def get_content_summary(self,oper):
try:
client = self.get_con()
print client.get_content_summary(oper)
except pyhdfs.hdfsexception, e:
print "error:%s" % e
# 查看当前路径的状态
def list_status(self,oper):
try:
client = self.get_con()
print client.list_status(oper)
except pyhdfs.hdfsexception, e:
print "error:%s" % e
# 删除文件
def delete_files(self,path):
try:
client = self.get_con()
print client.delete(path)
except pyhdfs.hdfsexception, e:
print "error:%s" % e
if __name__ == '__main__':
db = hdfs('hadoop3-master','root')
# db.listdir('/user')
# db.get_home_directory()
# db.get_active_namenode()
# db.mkdirs('/dave')
# db.copy_from_local("d:/one.txt","/uploads/two.txt")
# db.listdir('/uploads')
# db.read_files('/uploads/two.txt')
# db.check_files('/uploads/two.txt')
# db.get_file_checksum('/uploads/two.txt')
# db.get_content_summary('/')
# db.list_status('/')
# db.list_status('/uploads/two.txt')
# db.copy_to_local("/uploads/two.txt","d:/one.txt")
# db.append_files('/uploads/two.txt',"88, csdn 博客")
# db.read_files('/uploads/two.txt')
# db.copy_from_local("d:/three.txt", "/uploads/two.txt")
db.listdir('/uploads')
# db.delete_files('/uploads/two.txt')
发表评论