2. 分布式文件系统 hdfs
1. 引入hdfs【面试点】
问题一:如果一个文件中有 10 个数值,一行一个,并且都可以用 int 来度量。现在求 10 个数值的和
思路:
- 逐行读取文件的内容
- 把读取到的内容转换成
int
类型 - 把转换后的数据进行相加
- 输出最后的一个累加和
问题二:10000 个文件,每个文件 2t,文件里的内容依然是每行一个数值,求这一堆文件的所有数值的和
思路与方案:
- 使用单进程的程序执行,即一行一行读取(串行),可能会计算出结果,但效率很低,且大概率是算不出来
- 将串行改为并行,即分布式运算:
- 第一阶段:先把大的任务切分成小的任务,然后将集群中的每个节点都可以对这些小任务进行计算
- 第二阶段:将之前中间性的结果进行汇总
问题三:问题二中的 10000 个 2t 的文件应该怎么分布才能让这 10000 个任务的执行效率达到最高?
思路:
- 如果集群有 10000 个节点,每个节点都放了一个文件,然后对每个节点上的数据启动计算引擎进行任务的计算,这样效率高
- 计算在 a 节点,存储的数据在 b 节点,这样效率不高;计算和存储在同一个节点效率高。因为数据传输肯定有延迟,从而降低效率
问题四:数据的处理(存储和计算)是这么设计的?
答:存储和计算相互依赖。在涉及存储时必须考虑计算,反之相同
存储:hdfs;计算:mapreduce
hdfs 设计思想:把存入到 hdfs 集群的数据均匀分散的存储到整个集群中
说明:集群的配置是去全局的
案例1: 100g 数据分多少集群节点存储的比较
- 都是 100g 数据,假设 1g 的数据需要 1秒 的运算时间
序号 | 集群节点数 | 切分存储块的大小 | 存储方式 | 运算所需时间(秒) |
---|---|---|---|---|
1 | 100 | 1g | 每个节点 1g 数据量 | 1 |
2 | 90 | 1g | 10 台存 2g,80 台存 1g | 2 |
3 | 90 | 512m | 20 台存 1.5g,70 台存 1g | 1.5 |
- 上述案例得出:切分的块是不是越小越好?但有弊端:小文件很多时,会有问题
案例2: 大文件 access.log 100g 的切分方法
- 第一种切分法:block0 50g + block1 50g
- 第二种切分法:block0 20g + block1 20g + block2 20g + block3 20g + block4 20g
-
对于用户来说,一个文件是完整的存储到 hdfs 进来的,所以用户再去下载该文件时要的是完整的文件整体,要把所有的块合并起来且顺序不能错。块越少拼接越容易
-
上述案例得出:切分的块是不是越大越好?
总结:不大不小最好。不大不小:hdfs 在设计时考虑到不同的应用场景,在每个不同的应用场景中可能需要的块的大小不一样,可以自己配置。
hdfs 块的默认大小为:
- hadoop2.x 版本以前,默认块的大小:64m
- hadoop2.x 版本(含)以后,默认块的大小:128m
让大数据能够存储到 hdfs 集群,并考虑计算的效率问题,让文件切分存储,并让这些块均匀分散的存储到整个集群中
hdfs 集群存储的使用场景:
- 数据量特别多
- 前期数据量不大,后期数据量快速增长,可能导致数据量快速增多
hdfs 集群理论上可无限制的增加节点,但有上限:
- hdfs 集群是主从架构,主节点 namenode
- 加的机器的性能一般(数据安全)
问题五:hdfs 如何保障数据安全?
解决:配置多份
多份数据分布的原则:
- 数据备份的数量由用户指定
- 如果一个文件存储多份,这多份数据完全没必要存储在一个节点上
小问题: 若集群有 3 个存储节点,但用户指定存储 4 份,则 hdfs 上最终有几份数据?3 份
结论:hdfs 集群中的任何一个节点,肯定没有完全相同的两份数据
问题六:hdfs 核心思想:分而治之,冗余备份
- 分散存储: 一个大的文件要存储,必须要借助分布式的存储系统,将大文件进行 分而治之(分治)
- 冗余备份: 整个 hdfs 集群架设在不是特别牢靠的服务器上,所以要保证数据安全。采用 副本 的策略,针对用户上传的整个文件,将该文件切分出来的多个块备份多份
冗余备份的默认值:3 份。备份数量的配置文件路径:
/software/hadoop/etc/hadoop/hdfs-site.xml 更改后重启服务生效
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
知识点1:如果节点机器性能有差异怎么均匀分散?
数据节点机器性能差异不是特别多,若某一台机器的性能比较差,可设置该机器少存一些数据。设置:
hadoop fs -setrep [-r] [-w] <numpeplicas> <path>
知识点2:block 块的大小设置多少?
默认 128m,实际生产最多 256m。若不懂就按照默认的,大部分都是按默认的
知识点3:hdfs 集群节点很多会导致什么情况
元数据信息 fsimage 很多,加载到内存中的时间越来越长
datanode 节点多,节点保存的数据块的个数也多
知识点4:跨网络肯定有数据延迟和丢失问题
知识点5:hdfs 不适合存储小文件
原因:存储一亿个小文件,大小仅仅 1t,但要消耗 20g 左右的内存
- 文件存储在硬盘上,存储文件元信息(比如文件的创建者、文件的创建日期和文件的大小等)的区域叫 inode(中文译名为 “索引节点” ),inode 是有限的。当有成千上万个小文件存储于服务器的文件系统中时,最先消耗完的肯定不是磁盘的空间,而是 inode,这会导致大量空闲磁盘的空间无法使用。小文件带来的问题归根结底是由于其小且数量巨大
解决方案:对小文件进行合并,或对小文件提前做处理
- 将一定数量的小文件合并为一个个的大文件,并且只存储合并后的大文件,那存储系统中的文件数量就会大大减少,通过一定方式再从合并后的大文件中分离出小文件,按需获取想要的数据即可
知识点:文件存储在硬盘上,硬盘的最小存储单位叫做 “扇区”(sector)。每个扇区存储512字节(相当于0.5kb)。操作系统读取硬盘时,不会一个个扇区的读取,这样效率太低,而是一次性连续读取多个扇区,即一次性读取一个 “块”(block)。这种由多个扇区组成的 “块”,是文件存储的最小单位。“块” 的大小,最常见的是 4kb,即连续 8 个 sector 组成一个 block。文件数据都存储在 “块” 中,那么很显然,我们还必须找到一个地方存储文件的元信息,比如文件的创建者、文件的创建日期、文件的大小等等。这种存储文件元信息的区域就叫做 inode。
2. hdfs 概述
hdfs 是大数据存储的基础,几乎所有的大数据分布式存储需求都会使用到。
1. hdfs 设计思路
hdfs 被设计成使用低廉的服务器进行海量数据的存储,如何做到?分散存储
- 大文件被切割成小文件,使用分而治之的思想对同一个文件进行管理
- 每个切分后的块都进行冗余备份,高可用不丢失
2. hdfs 架构
主从架构。下边三个节点的架构是最基础的,高可用会有 standbynamenode,用于防止 namenode 宕机。
- namenode 主节点:掌管文件系统的目录树,处理客户端的请求,保存元数据信息
- datanode 从节点:存储实际数据,处理真正的读写
- secondarynamenode(单机/伪分布式/分布式):分担 namenode 的压力,协助合并元数据信息
3. hdfs 优缺点
优点:
- 可构建在廉价机器上,通过多个副本来提高可靠性,文件切分多个块进行存储
- 高容错性。数据可自动保存多个副本,副本丢失后可自动恢复
- 适合批处理。移动计算比移动数据更方便
- 流式文件访问。一次写入,多次读取。可以保证数据一致性
缺点:(不适合以下操作)
- 要求高的数据访问。比如毫秒级
- 小文件存储。寻道时间超过读取时间
- 并发写入,文件随机修改。一个文件只能有一个写,仅支持追加写入
3. hdfs 操作
1. hdfs 的 shell 操作【重点】
命令 | 功能 | 举例 |
---|---|---|
hadoop fs hdfs dfs | 两种方式操作 hdfs 文件的命令前缀 | |
-help | 输出这个命令参数手册 | hadoop fs -help |
-ls | 显示目录信息 | hadoop fs -ls hdfs://ip:9000/ hadoop fs -ls / |
-put | 本地文件上传至 hdfs | 把当前目录下的 a.txt 上传到 hdfs:hadoop fs -put a.txt /hdfspath |
-get | 从 hdfs 下载文件到本地 | hadoop fs -get /a.txt localpath |
-cp | 从 hdfs 的一个路径拷贝到另一个路径 | 把 /a.txt 拷贝到 /aa 下,并更名为 a2.txthadoop fs -cp /a.txt /aa/a2.txt |
-mv | 在 hdfs 目录中移动文件 | hadoop fs -mv /a.txt /aa |
-mkdir | 创建文件夹 | hadoop fs -mkdir /b |
-rm | 删除文件或文件夹 | hadoop fs -rm -r /aa/bb |
-rmdir | 删除空目录 | hadoop fs -rmdir /aa/bb |
-movefromlocal | 从本地剪切到 hdfs | hadoop fs -movefromlocal /home/a.txt /aa/bb |
-movetolocal | 从 hdfs 剪切到本地 | hadoop fs -movetolocal /aa/bb/a.txt /home |
-copyfromlocal | 从本地文件系统中拷贝文件到 hdfs | hadoop fs -copyfromlocal ./a.txt /aa |
-copytolocal | 从 hdfs 拷贝到本地 | hadoop fs -copytolocal /a.txt . |
-appendtofile | 追加一个文件到已经存在的文件末尾 | hadoop fs -appendtofile ./a.txt /a.txt |
-cat | 显示文件内容 | hadoop fs -cat /aa/a.txt |
-tail | 显示一个文件的末尾 | hadoop fs -tail /aa/a.txt |
-text | 以字符形式打印一个文件的内容 | hadoop fs -text /aa/a.txt |
-chmod | 与 linux 文件系统的用法一样,对文件设置权限 | hadoop fs -chmod 666 /aa/a.txt |
-df | 统计文件夹的大小信息 | hadoop fs -df -sh /aa/* |
-count | 统计一个指定目录下的文件节点数量 | hadoop fs -count /aa |
-setrep | 设置 hdfs 中文本的副本数量 | hadoop fs -setrep 3 /aa/a.txt |
hdfs dfsadmin -report | 查看 hdfs 集群工作状态 | live datanodes (2) 说明有两台是正常运行的数据节点 |
2. hdfs 的 api 操作
hdfs 的 api 操作所需的 maven
依赖导入 pom.xml
文件的 <artifactid>module_name</artifactid>
和 </project>
之间,并等待下载完成
<dependencies>
<dependency>
<groupid>org.apache.hadoop</groupid>
<artifactid>hadoop-common</artifactid>
<version>2.7.4</version>
</dependency>
<dependency>
<groupid>org.apache.hadoop</groupid>
<artifactid>hadoop-client</artifactid>
<version>2.7.4</version>
</dependency>
<dependency>
<groupid>org.apache.hadoop</groupid>
<artifactid>hadoop-hdfs</artifactid>
<version>2.7.4</version>
</dependency>
<dependency>
<groupid>org.apache.hadoop</groupid>
<artifactid>hadoop-mapreduce-client-core</artifactid>
<version>2.7.4</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupid>org.apache.maven.plugins</groupid>
<artifactid>maven-compiler-plugin</artifactid>
<version>3.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>utf-8</encoding>
<!-- <verbal>true</verbal>-->
</configuration>
</plugin>
<plugin>
<groupid>org.apache.maven.plugins</groupid>
<artifactid>maven-shade-plugin</artifactid>
<version>2.4.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<minimizejar>true</minimizejar>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
<properties>
<maven.compiler.source>16</maven.compiler.source>
<maven.compiler.target>16</maven.compiler.target>
</properties>
1. 访问数据
1. 获取 filesystem
// filesystem.get()
import org.apache.hadoop.conf.configuration;
import org.apache.hadoop.fs.filesystem;
import java.io.ioexception;
public class hdfs01getfilesystem {
public static void main(string[] args) throws ioexception {
// 1. 创建configuration对象
configuration conf = new configuration();
// 2. 设置文件系统类型
// 第二个参数是访问域名,做过域名解析可设置成 hdfs://hadoop0:8020
conf.set("fs.defaultfs", "hdfs://hadoop0:8020");
// 3. 获取指定文件系统
filesystem filesystem = filesystem.get(conf);
// 4. 打印输出
system.out.println(filesystem);
}}
- 执行上述代码返回下图所示结果即成功:
2. 文件的遍历
// filesystem.listfiles() + for循环
import org.apache.hadoop.conf.configuration;
import org.apache.hadoop.fs.*;
import java.io.ioexception;
import java.net.uri;
import java.net.urisyntaxexception;
public class traversefile {
public static void main(string[] args) throws ioexception, urisyntaxexception, interruptedexception {
// 1. 获取filesystem,默认端口8020
filesystem filesystem = filesystem.get(new uri("hdfs://hadoop0:8020"), new configuration(), "root");
// 2. 调用 listfile()方法 获取根目录下所有的文件信息
remoteiterator<locatedfilestatus> iterator = filesystem.listfiles(new path("/"), true);
// 3. 遍历迭代器
while (iterator.hasnext()) {
locatedfilestatus filestatus = iterator.next();
// 获取文件的绝对路径:hdfs://172.16.15.100/xxx
system.out.println(filestatus.getpath() + "===" + filestatus.getpath().getname());
// 文件的block信息
blocklocation[] blocklocations = filestatus.getblocklocations();
for (blocklocation blocklocation : blocklocations) {
string[] hosts = blocklocation.gethosts();
for (string host : hosts) {
system.out.println("主机为:" + host);
}}
system.out.println("block数量为:" + blocklocations.length);
}}}
- 输出结果:
hdfs://hadoop0:8020/0320/data.txt===data.txt
主机为hadoop1
主机为hadoop2
block数量为:1
hdfs://hadoop0:8020/0320/merge.txt===merge.txt
主机为hadoop2
主机为hadoop1
block数量为:1
...
3. 创建文件夹
// filesystem.mkdirs()
import org.apache.hadoop.conf.configuration;
import org.apache.hadoop.fs.filesystem;
import org.apache.hadoop.fs.path;
import java.io.ioexception;
import java.net.uri;
import java.net.urisyntaxexception;
public class hdfs03createfolder {
public static void main(string[] args) throws ioexception, urisyntaxexception, interruptedexception {
// 1. 获取filesystem
filesystem filesystem = filesystem.get(new uri("hdfs://hadoop0:8020"), new configuration(), "root");
// 2. 创建文件夹
filesystem.mkdirs(new path("/0320"));
// 3. 关闭filesystem
filesystem.close();
}}
- 执行结果:
4. 文件的上传
// filesystem.copyfromlocalfile()
import org.apache.hadoop.conf.configuration;
import org.apache.hadoop.fs.filesystem;
import org.apache.hadoop.fs.path;
import java.io.ioexception;
import java.net.uri;
import java.net.urisyntaxexception;
public class hdfs04fileupload {
public static void main(string[] args) throws interruptedexception, ioexception, urisyntaxexception {
hdfs04fileupload fileupload = new hdfs04fileupload();
fileupload.fileupload();
}
/* 定义上传文件的方法 */
public void fileupload() throws urisyntaxexception, ioexception, interruptedexception {
// 1. 获取文件系统
filesystem filesystem = filesystem.get(new uri("hdfs://hadoop0:8020"), new configuration(), "root");
// 2. 上传文件
filesystem.copyfromlocalfile(new path("/users/jason93/desktop/bigdata/file/data.txt"), new path("/0320"));
// 3. 关闭filesystem
filesystem.close();
}}
- 执行结果:
5. 文件的下载
// ioutils.copy()
import org.apache.commons.io.ioutils;
import org.apache.hadoop.conf.configuration;
import org.apache.hadoop.fs.fsdatainputstream;
import org.apache.hadoop.fs.filesystem;
import org.apache.hadoop.fs.path;
import java.io.fileoutputstream;
import java.io.ioexception;
import java.net.uri;
import java.net.urisyntaxexception;
public class hdfs05filedownload {
public static void main(string[] args) throws ioexception, urisyntaxexception, interruptedexception {
// 1. 获取filesystem
filesystem filesystem = filesystem.get(new uri("hdfs://hadoop0:8020"), new configuration(), "root");
// 2. 获取hdfs的输入流
fsdatainputstream inputstream = filesystem.open(new path("/0320/data.txt"));
// 3. 获取本地文件的输出流
fileoutputstream outputstream = new fileoutputstream("/users/jason93/desktop/bigdata/file/hdfs/datadown.txt");
// 4. 文件的拷贝
ioutils.copy(inputstream, outputstream);
// 5. 关闭流
ioutils.closequietly(inputstream);
ioutils.closequietly(outputstream);
filesystem.close();
}}
- 运行结果:(其文件内容与 data.txt 一样)
2. 合并小文件
1. 合并小文件上传
- 首先准备几个小文件
# /users/jason93/desktop/bigdata/file/hdfs/merge/
# data1.txt
hello,world
# data2.txt
hello,hadoop
# data3.txt
hello,hdfs
- 代码:
// ioutils.copy()
import org.apache.commons.io.ioutils;
import org.apache.hadoop.conf.configuration;
import org.apache.hadoop.fs.*;
import java.io.ioexception;
import java.net.uri;
import java.net.urisyntaxexception;
public class hdfs06mergefileupload {
public static void main(string[] args) throws urisyntaxexception, ioexception, interruptedexception {
// 1. 获取filesystem
filesystem filesystem =
filesystem.get(new uri("hdfs://hadoop0:8020"), new configuration(), "root");
// 2. 获取hdfs大文件的输出流
fsdataoutputstream fsdataoutputstream =
filesystem.create(new path("/0320/hdfs/merge.txt"));
// 3. 获取一个本地文件系统
localfilesystem localfilesystem = filesystem.getlocal(new configuration());
// 4. 获取本地文件夹下所有文件的详情
filestatus[] filestatuses = localfilesystem.liststatus(new path("/users/jason93/desktop/bigdata/file/hdfs/merge"));
// 5. 遍历每个文件,获取每个文件的输入流
for (filestatus filestatus : filestatuses) {
fsdatainputstream fsdatainputstream = localfilesystem.open(filestatus.getpath());
// 6. 将小文件的内容复制到大文件
ioutils.copy(fsdatainputstream, fsdataoutputstream);
ioutils.closequietly(fsdatainputstream);
}
// 7. 关闭流
ioutils.closequietly(fsdataoutputstream);
localfilesystem.close();
filesystem.close();
}}
- 运行后看结果
2. 合并小文件下载
方式一:通过命令行方式
-
将 hdfs 的
/0320/hdfs/
下的三个文件合并下载到本地 -
说明:若本地该文件不存在则创建写入,若存在则覆盖文件的原内容
# 合并指定目录下的所有文件
hadoop fs -getmerge /0320/hdfs/* /home/data/hdfs/mergedown.txt
# 合并目录下的指定文件也可以(相对路径)
hadoop fs -getmerge /0320/hdfs/data1.txt /0320/hdfs/data3.txt mergedown13.txt
# 查看结果:
[root@hadoop0 hdfs]# ls
mergedown13.txt mergedown.txt merge.txt
[root@hadoop0 hdfs]# cat mergedown.txt
hello,world
hello,hadoop
hello,hdfs
[root@hadoop0 hdfs]# cat mergedown13.txt
hello,world
hello,hdfs
方式二:通过 java api方式
// ioutils.copy()
import org.apache.commons.io.ioutils;
import org.apache.hadoop.conf.configuration;
import org.apache.hadoop.fs.*;
import java.io.ioexception;
import java.net.uri;
import java.net.urisyntaxexception;
public class hdfs07mergefiledownload {
public static void main(string[] args) throws urisyntaxexception, ioexception, interruptedexception {
// 1. 获取filesystem
filesystem filesystem = filesystem.get(new uri("hdfs://hadoop0:8020"), new configuration(), "root");
// 2. 获取一个本地文件系统
localfilesystem localfilesystem = filesystem.getlocal(new configuration());
// 3. 获取本地大文件的输出流
fsdataoutputstream outputstream = localfilesystem.create(new path("/users/jason93/desktop/bigdata/file/hdfs/mergedown.txt"), true);
// 4. 获取hdfs下的所有小文件
remoteiterator<locatedfilestatus> listfiles = filesystem.listfiles(new path("/0320/hdfs"), true);
// 5. 遍历
while (listfiles.hasnext()) {
locatedfilestatus locatedfilestatus = listfiles.next();
fsdatainputstream inputstream = filesystem.open(locatedfilestatus.getpath());
// 6. 将小文件复制到大文件中
ioutils.copy(inputstream, outputstream);
ioutils.closequietly(inputstream);
}
// 7. 关闭流
ioutils.closequietly(outputstream);
localfilesystem.close();
filesystem.close();
}}
- 运行结果:
4. hdfs 原理
1. hdfs 的启动流程【重要】
- 先启动 namenode 进程
- 加载 namenode 文件夹中存储的磁盘的元数据信息(fsimage + edits_inprogress)
- namenode 在启动完毕后,会在 namenode 节点启动一个服务,该服务会等待所有 datanode 上线后汇报块信息
- datanode 一旦上线,就会通过心跳机制把自身所持有的所有块信息汇报给 namenode
- 只有 namenode 等到了所有的 datanode 的上线以及把所有的块信息都汇报完毕后,最后 namenode 才能得知:当前集群中所有文件的所有块的副本的分布。这样才代表 namenode 正常启动
2. datanode 页面信息介绍
信息说明:
- node:地址和端口
- last contact:最近通讯时间(正常是 0/1/2 的间隔,若不正常则为最后一次的通讯时间)
- admin state:管理员状态
- capacity:hdfs 容量
- used:已使用容量
- non dfs used:非 hdfs 使用容量
- remaining:剩余容量
- blocks:块容量
- block pool used:块使用占比
- failed volumes:失败卷的个数,确定当前数据节点停止服务允许卷出错的服务,0 代表任何卷出错都停止服务
- version:版本
3. hdfs 的读写剖析
1. hdfs 读数据流程
- 客户端把要读取的文件路径发送给 namenode
- namenode 获取文件的元信息(主要信息是 block 块的存放位置)返回给客户端
- namenode 根据 block 块所在节点与客户端的距离判断返回哪个节点,哪个节点离客户端最近就返回哪个
- 客户端根据返回的信息找到相应 datanode,然后逐个获取文件的 block,并在客户端本地进行数据追加合并,从而获得整个文件
异常情况: hdfs 在读取文件时,如果其中一个块突然坏掉了怎么办?
- 客户端在 datanode 上读取完后,会对读取到的数据进行 checksum 验证(该验证就是将读取到数据和 hdfs 块的元数据进行校验),如果校验过程中发现了错误,则说明该 datanode 读取的数据不完整,可能这个 datanode 坏掉了,这时客户端会跟 namenode 通讯,告诉它存在异常的 datanode,而客户端可以从拥有该 block 备份的其他 datanode 上重新读取文件
- 当 datanode 确认数据异常后,将会启动异步删除,并同时告诉 namenode 更新元数据信息,若没有其余副本,则需通过 secondarynamenode 进行数据恢复
2. hdfs 写数据流程
具体步骤:
-
client 发送写数据请求
-
namenode 响应请求,然后做一系列校验,如果能上传该数据则返回该文件的所有切块应该被存放在哪些 datanode 上的 datanode 列表
block-001: hadoop2 hadoop3 block-002: hadoop3 hadoop4
-
client 拿到 datanode 列表后,开始传数据
-
首先传第一个block-001,datanode 列表就是 hadoop2 和 hadoop3,client 就把 block-001 传到 hadoop2 和 hadoop3 上
-
以此类推,用传第一个数据块的方式传其他的数据
-
当所有的数据块都传完后,client 会给 namenode 返回一个状态信息,表示数据已全部写入成功,或者失败
-
namenode 接收到 client 返回的状态信息来判断当次写入数据的请求是否成功,若成功则更新元数据信息
异常情况:
场景一:hdfs 在上传文件时,若其中一个 datanode 突然挂掉了怎么办?
- 客户端上传文件时与 datanode 建立 pipeline 管道,管道正向是客户端向 datanode 发送的数据包,管道反向是 datanode 向客户端发送 ack 确认,也就是正确接收到数据包后发送一个已确认接收到的应答
- 当 datanode 突然挂掉了,客户端接收不到该 datanode 发送的 ack 确认,此时不会立刻终止写入(如果立刻终止,易用性和可用性都太不友好),客户端会通知 namenode,namenode 检查该块的副本与规定的不符,会通知其他 datanode 去复制副本,并将挂掉的 datanode 作下线处理,不再让它参与文件上传与下载,该过程称为 pipeline recovery
场景二:hdfs 向 datanode 写入数据失败怎么办?(上传 100mb 的文件,上传到 50mb,突然断了,或 block 由于网络等原因异常了,hdfs 会怎么处理?)
- pipeline 数据流管道会被关闭,ack 队列中的 packets 会被添加到数据队列的前面以确保数据包不丢失
- 在已正常存储 block 块的 datanode 的 blockid 版本会更新(升级),这样发生故障的 datanode 节点上的 block 数据会在节点恢复正常后被删除,失效节点也会从 pipeline 中删除
- 剩下的数据会被写入到 pipeline 数据流管道中的其他节点上
5. hdfs 三大机制(核心设计)
hdfs 三大核心机制:心跳机制、安全模式、副本存放策略
1. hdfs 心跳机制【重要】
hadoop 是 master/slave 架构,master 中有 namenode 和 resourcemanager,slave 中有 datanode 和 nodemanager。
【心跳机制】:datanode 每隔一段时间(默认 3 秒)就会跟 namenode 取得一次联系,从而证明自己还活着,让 namenode 能够识别到当前集群中有多少存活的节点。
- 详细点:master 启动时会启动一个 ipc(inter-process communication,进程间通信)server 服务,等待 slave 连接;slave 启动时会主动连接 master 的 ipc server 服务,并且每隔 3 秒连接一次 master,这个每隔一段时间去连接一次的机制称为心跳。slave 通过心跳给 master 汇报自己的信息,master 也通过心跳给 slave 下达命令。namenode 通过心跳得知 datanode 的状态,resourcemanager 通过心跳得知 nodemanager 的状态。如果 master 长时间都没收到 slave 的心跳,就认为该 slave 挂掉了。
namenode 判断 datanode 是否宕机需要一个标准:超时
timeout(超时时长) = 10 * 心跳时长(3秒) + 2 * 检测心跳是否正常工作的间隔(5分钟)
- 即 10 * 3 + 2 * 5 * 60 = 630 秒
超时时间可在 hdfs-site.xml 文件中配置 dfs.heartbeat.interval
参数,或使用 zookeeper 做一个监控,有节点宕机可迅速感知。
心跳机制分两个方面:
- 命令:namenode 给 datanode 发
- 汇报:datanode 给 namenode 发
心跳机制作用:
- 让 namenode 能够识别当前各个 datanode 的状态
- datanode 向 namenode 传送 心跳数据包
心跳数据包:
- 该节点自身状态:磁盘使用量、block 块的数量、block 块的状态
- 该 datanode 节点保存的所有 block 块的信息
- block 块的信息在 linux 系统的文件位置:
2. hdfs 安全模式【重要】
在正常的启动范围内,hdfs 集群会进入安全模式,无法对外提供服务。安全模式下,客户端不能对任何数据进行操作,只能查看元数据信息。
1. 进入安全模式
进入安全模式的场景:
- 大概率是因为集群出现问题时进入安全模式
- 当 hdfs 集群中部分 datanode 节点宕机后,hdfs 启动服务做恢复
- 当丢失数据的比例超过
0.1%
时会进入安全模式- 丢失率 可手动配置
- 默认是:
dfs.safemode.threshold.pct=0.999f
- 新版本的配置是:
dfs.namenode.safemode.threshold-pct=0.999f
- 默认是:
- 丢失率 可手动配置
若要强制对外提供服务,可使用hdfs命令操作:
hdfs dfsadmin -safemode leave # 退出安全模式
hdfs dfsadmin -safemode enter # 进入安全模式
hdfs dfsadmin -safemode get # 获取安全模式状态
hdfs dfsadmin -safemode wait # 等待
2. 退出安全模式
hdfs dfsadmin -safemode leave
说明:
- 找到集群的问题进行修复(比如修复宕机的 datanode),修复好了会自动退出安全模式
- 手动强行退出安全模式,并没有真正解决数据丢失的问题
3. 副本存放策略
决定一个数据块的那几个副本(默认是 3)到底该存储到哪些服务器上
原则:
- 任意一个节点上不可能存储两个一样的副本块
- 如果一个数据块要保存完整的 3 个副本块,则至少有 3 个节点
副本存放策略:
策略:
- 第一个副本块选取和客户端相同的节点
- 第二个副本块选取跟第一个副本块存储节点相邻的机架(rack)上面的任意一个节点
- 第三个副本块存放在和第二个副本块所在机架不同的节点上
策略是一个参考,不是硬性标准。所以实际选取存储空间大、不忙的节点
**方法:**将每个文件的数据分块存储,每一个数据块又保存多个副本,这些数据块副本分布在不同的机器节点上
作用:数据分块存储和副本存放,是保证可靠性和高性能的关键
6. hdfs 三大组件
重点:组件的职责、元数据
1. namenode 主节点
1. 职责
- 维护元数据(查询、修改)
- 响应客户端的读写数据请求
- 配置副本存放策略
- 管理集群数据库负载均衡问题
2. 元数据
如何管理元数据? 使用 wal(write-ahead logging)预写日志系统
wal:数据库中一种高效的日志算法,对于非内存数据库而言,磁盘 i/o 操作是数据库效率的一大瓶颈。在相同的数据量下,采用 wal 日志的数据库系统在事务提交时,磁盘写操作只有传统的回滚日志的一半左右,大大提高了数据库磁盘 i/o 操作的效率,从而提高了数据库的性能。
说明:mysql 实现了 wal,所有的事务操作都会记录日志,若某张表的数据丢失后,可根据该日志拿到对应数据,对表进行恢复
元数据信息的位置:${hadoop_home}/data/namenode/current/
。示例如下:
相关说明:
(1)edits_inprogress_000… 文件:它是时刻操作的文件,按一定时间或一定大小(不同版本有差异)分割为若干 edits_000… 文件
- edits 和 fsimage 的关系:操作性文件 edits_00…-000… 合并起来为镜像文件 fsimage_00… 。比如 fsimage_000…0013725,表示 edits_000…0013725 及之前所有的 edits_000xxx 文件合并后的文件;fsimage_000…0013727 表示 edits_000…0013727 及之前所有的 edits_000xxx 文件合并后的文件。至于什么时候合并,有个 checkpoint 检查点。后一个 fsimage 包含前一个 fsimage 和更新的 edits 文件,生成两个 fsimage 是起到 备份 的作用。若合并 edits_000…0013727 时失败,则在 hdfs 冷启动时加载 fsimage_000…0013726、edits_000…0013727 和 edits_inprogress_000… 三个文件即可;当然若合并 edits_000…0013727 成功了,则只加载 fsimage_000…0013727 和 edits_inprogress_000… 两个文件即可
- 通过生成一个可查看的 xml 文件查看 edits 和 fsimage 文件信息:
# edits 文件:
hdfs oev -i edits_0000000000000013664-0000000000000013665 -o edits.xml
cat edits.xml
# fsimage 文件:
hdfs oiv -i fsimage_0000000000000013725 -p xml -o fsimage.xml
cat fsimage.xml
(2)seen_txid:存放 edits_inprogress_00… 日志最新的 id(存放 transactionid 的文件),比如 edits_inprogress_00xxx0013728,则 (3)seen_txid 为13728。format 之后是 0
(4)version:存放 hdfs 集群的版本信息
(5)fsimage_000xxx.md5:校验性文件
namenode 元数据存储机制:
- 内存 中的元数据信息:metadata,内存中一份完整的元数据信息(目录树结构 + 文件块映射 + 数据库和 datanode 的映射)
- 目录树结构:文件地址的目录信息
- 文件块映射:文件切分成哪些块
- 磁盘 中的元数据镜像:fsimage 快照 + edits 编辑日志 + edits_inprogress(实时操作变化日志),在 namenode 的工作目录中
- 用于衔接内存 metadata 和持久化元数据镜像 fsimage 之间的操作日志(edits 文件)
- 当客户端对 hdfs 中的文件进行新增或修改时,操作记录首先被写入 edits 日志文件中,当客户端操作成功后,相应的元数据会更新到内存 metadata 中
元数据合并的好处【面试点】
- 大大缩小操作日志的大小
- 合并后的镜像磁盘文件可以被快速加载到内存中去。可以不用加载所有的操作性文件,只加载 fsimage 和 edits_inprogress 两个文件,有利于加快程序的冷启动
元数据的 checkpoint: 每隔一段时间,会有 secondarynamenode 将 namenode 上积累的所有 edits 和一个最新的 fsimage 下载到本地,并加载到内存中进行 merge(合并),该过程称为 checkpoint。
2. datanode 从节点
1. 职责
- 维护 namenode 分配给它的 block 块(存储管理用户的文件块数据)
- 通过心跳机制汇报自身所有的块信息给 namenode
- 真正的提供读写数据
数据块的两个参数: 块的大小、副本的个数
data 数据的存放目录:
${hadoop_home}/data/datanode/current/bp-1365453085-172.16.15.103-1646548673937/current/finalized/subdir0/
2. datanode 上下线
例1:一个集群有 500 个节点,现增加 10 个节点。hdfs 如何表现?
- 新增加的 datanode 启动后,会按照配置文件寻找 hdfs 集群的 namenode 进行汇报
- 新上线的 datanode 没有任何数据块的信息,只有自身的状态信息
- 原来的 datanode 和新加的 datanode 之间存在数据倾斜的问题
解决数据倾斜的方法:负载均衡
- 负载均衡类型:服务器之间的负载均衡、磁盘之间的负载均衡
- 说明:启动负载均衡需要手工启动一个 start-balance 的进程
- 负载均衡举例:比如一个节点 4 个磁盘,每个盘 2t,该节点存储了 1t 的数据,若该 1t 的数据都在第一个磁盘上,就意味着其他 3 个磁盘没用到,这时最好做负载均衡:每个磁盘 256g
例2:一个集群 500 个节点,现减少 10 个节点,这 10 个节点上的数据块信息丢失。hdfs 如何表现?
- hdfs 集群会利用自身的恢复机制恢复到原来副本块的个数
知识点:
- 下线节点,在被动情况下,某个块的所有副本所在节点都宕机了,怎么处理?
- 若之前做过
异地灾备
,可以从异地机房做数据恢复 - 若之前没做过异地灾备,那数据就丢失了
- 若之前做过
- 假设一个节点异常,数据被负载到其他节点上了,后来该节点又恢复了,那数据会重新分配吗?
- 某个数据所在的节点异常,在一个时间间隔(630秒)之内数据不会进行恢复;超过该时间后,若原来是 3 个数据块副本,现在是 2 个,则启动恢复模式恢复成 3 个数据块副本。原节点恢复过来后,会在一个时间间隔后向 namenode 汇报,namenode 检测到该数据块副本已经正常了,则恢复过来的节点就不起作用 了
3. secondarynamenode
**职责:**分担 namenode 合并元数据信息(镜像文件和操作日志)的压力
**注意:**secondarynamenode 不要和 namenode 配置在一个节点上
说明:
- secondarynamenode 并不是 namenode 的热备份,所以当 namenode 挂掉后不能代替 namenode 工作,对外提供服务
- secondarynamenode 只在单机模式、伪分布模式和分布式模式中使用,在高可用、联邦集群中由 standbynamenode 取代,所以 secondarynamenode 与 standbynamenode 是互斥的关系,二者存且仅存一个
- secondarynamenode 只能帮助 namenode 恢复部分数据,因为当 secondarynamenode 接收到 namenode 的编辑日志 edits 和镜像文件 fsimage 之后,namenode 之中的操作还会记录到它本身的编辑日志(edits)中,不会同步到 secondarynamenode,所以,secondarynamenode 只有 checkpoint 之前的数据,只能恢复部分的数据,如果 namenode 将 checkpoint 之后的数据丢失则无法恢复
工作机制:
- secondarynamenode 向 namenode 发出请求,看 namenode 是否需要进行 checkpoint 活动
- namenode 返回自己是否需要 checkpoint 活动的结果,若需要则继续,若不需要就没有后续了
- secondarynamenode 在接收到 namenode 需要进行 checkpoint 的请求后,会向 namenode 发起 checkpoint 请求
- namenode 接收到请求后,对编辑日志(edits)进行回滚,然后将编辑日志(edits)和镜像文件(fsimage)拷贝到 secondarynamenode 中
- secondarynamenode 将 namenode 拷贝过来的 fsimage 和 edits 加载到内存中进行合并,生成新的 fsimage.chkpoint
- secondarynamenode 将 fsimage.chkpoint 拷贝到 namenode 中
- namenode 将 fsimage.chkpoint 重新命名为 fsimage
7. hdfs 的高可用和联邦【重点】
1. 高可用(ha)
集群要对外提供服务,首先要保证 namenode 正常,不能宕机。因为企业一般都 7*24 小时不间断提供服务。保证 namenode 实时提供服务而不宕机的机制:ha(high available)高可用
spof(single point of failure)单点故障,是主从架构存在的通性问题
单点故障具体解决方案:做备份
为防止 active 的 namenode 宕机,在旁边准备一台 standby 节点。假设 active 的 namenode 节点是 hadoop0,standby 的 namenode 节点是 hadoop4,若 hadoop0 宕机了,hadoop4 会代替它运行。
hdfs 高可用功能,用配置过 active/standby 两个 namenode 实现在集群中对 namenode 的热备份来解决 namenode 机器宕机或软硬件升级导致集群无法使用的问题。
元数据信息在 namenode 节点 hadoop0 中,当它宕机后,hadoop4 要迅速取代 hadoop0,也就意味着 hadoop4 和 hadoop0 要存储一模一样的元数据信息,即 hadoop4 是 hadoop0 的一个热备份。【重要】
不管 active 节点做了什么操作,standby 节点都要 时刻保持同步。
保持同步的方法:创建 journalnode 集群,namenode(active)写入该集群,namenode(standby)从该集群中读取。journalnode 集群的各个节点跟 zoopeeper 集群类似,每个节点都有可能成为主节点,因此不存在单点故障。至于区分 active 和 standby,由 zookeeper 集群的文件目录树决定。该目录树是一个 lock
,两个 namenode 谁先抢到谁就是 active。
为了保险起见,设置多个 standby 是否可以?可以,但有条件,也不建议特别多,个位数。
- 条件:hadoop2.x 版本中不行,一个 active 只能对应一个 standby;hadoop3.x 版本中可以
多主多从:主节点是一个小集群,从节点也是一个集群(比如 kudu)
2. 联邦(federation)
元数据信息加载到内存中,有可能内存放不下,导致 内存受限。解决内存受限问题:联邦
hdfs federation:指 hdfs 集群可同时存在多个 namenode,包含多组 ha,每组 ha 中各 namenode 存储相同元数据,元数据分多份,均分到各组 ha 的 namenode中。
这种设计可解决单 namenode 存在的以下问题:
- hdfs 集群扩展性
- 性能更高效
- 良好的隔离性
hdfs federation方案:
发表评论