Hadoop

Hadoop介绍

Hadoop是什么

  1. Hadoop是一个由Apache基金会所开发的分布式系统基础架构
  2. 主要解决,海量数据的存储和海量数据的分析计算问题。
  3. 广义上来说,Hadoop通常是指一个更广泛的概念–Hadoop生态圈

Hadoop三大发行版本

Hadoop三大发行版本:Apache、Cloudera、Hortonworks。

  • Apache版本最原始(最基础)的版本,对于入门学习最好。2006
  • Cloudera内部集成了很多大数据框架,对应产品CDH。2008
  • Hortonworks文档较好,对应产品HDP。2011
    • Hortonworks现在已经被Cloudera公司收购,推出新的品牌CDP

Hadoop优势

  1. 高可靠性:Hadoop底层维护多个数据副本,所以即使Hadoop某个计算元素或存储出现故障,也不会导致数据的丢失。
  2. 高扩展性:在集群间分配任务数据,可方便的扩展数以干计的节点。
  3. 高效性:在MapReduce的思想下,Hadoop是并行工作的,以加快任务处理速度。
  4. 高容错性:能够自动将失败的任务重新分配。

Hadoop组成

1.X

  • MapReduce(计算+资源调度)
  • HDFS(数据存储)
  • Common(辅助工具)

2.X

  • MapReduce(计算)
  • Yarn(资源调度) - CPU和Free
  • HDFS(数据存储)
  • Common(辅助工具)

在Hadoop1.x时代,Hadoop中的MapReduce同时处理业务逻辑运算和资源的调度,耦合性较大。在Hadoop22.x时代,增
加了Yarn。Yarn只负责资源的调度,MapReduce只负责运算。

Hadoop3.x在组成上没有变化。

HDFS架构概述

Hadoop Distributed File System,简称HDFS,是一个分布式文件系统。

  1. NameNode(n):存储文件的元数据,如文件名文件目录结构文件属性(生成时间、副本数、文件权限),以及每个文件的块列表和块所在的DataNode等。
  2. DataNode(n):在本地文件系统存储文件块数据,以及块数据的校验和
  3. Secondary NameNode(2min):每隔一段时间对NameNode元数据备份

YARN架构概述

Yet Another Resource Negotiator简称YARN,另一种资源协调者,是Hadoop的资源管理器。

  1. ResourceManager(RM):整个集群资源(内存、CPU等)的老大

  2. NodeManager(NM):单个节点服务器资源老大

  3. ApplicationMaster(AM):单个任务运行的老大

  4. Container:容器,相当一台独立的服务器,里面封装了任务运行所需要的资源,如内存、CPU、磁盘、网络等

    说明1:客户端可以有多个

    说明2:集群上可以运行多个ApplicationMaster

    说明3:每个NodeManager上可以有多个Container

MapReduce架构概述

MapReduce将计算过程分为两个阶段:Map和Reduces

Map阶段并行处理输入数据

Reduce阶段对Map结果进行汇总

HDFS、YARN、MapReduce三者关系

HDFS、YARN、MapReduce三者关系

大数据技术生态体系

大数据技术生态体系

图中涉及的技术名词解释如下:

  1. Sqoop:Sqoop是一款开源的工具,主要用于在Hadoop、Hive与传统的数据库(MySQL)间进行数据的传递,可以将一个关系型数据库(例如:MySQL,Oracle等)中的数据导进到Hadoop的HDFS中,也可以将HDFS的数据导进到关系型数据库中。
  2. Flume:Flume是一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据;
  3. Kafka:Kafka是一种高吞吐量的分布式发布订阅消息系统;
  4. Spark:Spark是当前最流行的开源大数据内存计算框架。可以基于Hadoop上存储的大数据进行计算。
  5. Flinkk:Flink是当前最流行的开源大数据内存计算框架。用于实时计算的场景较多。
  6. Oozie:Oozie是一个管理Hadoop作业(job)的工作流程调度管理系统。
  7. HBase:HBase是一个分布式的、面向列的开源数据库。HBase不同于一般的关系数据库,它是一个适合于非结构化数据存储的数据库。
  8. Hive:Hive是基于Hadoop的一个数据仓库工具,可以将结构化的数据文件映射为一张数据库表,并提供简单的SQL查询功能,可以将SQL语句转换为MapReduce任务进行运行。其优点是学习成本低,可以通过类SQL语句快速实现简单的MapReduce统计,不必开发专门的MapReduce应用,十分适合数据仓库的统计分析。
  9. ZooKeeper:它是一个针对大型分布式系统的可靠协调系统,提供的功能包括:配置维护、名字服务、分布式同步、组服务等。

推荐系统架构图

推荐系统架构图

Hadoop运行环境搭建

模板虚拟机环境准备

  1. 安装模板虚拟机,IP地址192.168.10.100、主机名称hadoop100、内存4G、硬盘50G

  2. 安装epel-release
    注:Extra Packages for Enterprise Linux是为“红帽系”的操作系统提供额外的软件包,适用于RHEL、CentOS和Scientific Linux。相当于是一个软件仓库,大多数rpm包在官方repository中是找不到的)

    1
    yum install -y epel-release
  3. 注意:如果Linux安装的是最小系统版,还需要安装如下工具;如果安装的是Linux桌面标准版,不需要执行如下操作

    • net-tool:工具包集合,包含ip addr等命令

      1
      yum install -y net-tools
    • vim 编辑器

      1
      yum install -y vim
  4. 关闭防火墙,关闭防火墙开机自启

    1
    2
    3
    systemctl status firewalld.service # 查看当前防火墙是否开启
    systemctl stop firewalld.service # 关闭防火墙
    systemctl disable firewalld.service # 禁用防火墙

    注意:在企业开发时,通常单个服务器的防火墙时关闭的。公司整体对外会设置非常安全的防火墙

  5. 创建hadoop用户,并修改hadoop用户的密码为hadoop

    1
    2
    useradd hadoop
    passwd hadoop
  6. 配置hadoop用户具有root权限,方便后期加sudo执行root权限的命令

    1
    2
    3
    4
    vim /etc/sudoers

    # 在%wheel ALL=(ALL) ALL后面添加下面这段
    hadoop ALL=(ALL) NOPASSWD:ALL

    注意:hadoop这一行不要直接放到root行下面,因为所有用户都属于wheel组,你先配置了hadoop具有免密功能,但是程序执行到%wheel行时,该功能又被覆盖回需要密码。所以hadoop要放到%wheel这行下面。

  7. /opt目录下创建文件夹,并修改所属主和所属组

    1. 在/opt目录下创建module、software文件夹

      1
      sudo mkdir module software
    2. 修改所属组

      1
      sudo chown hadoop:hadoop /opt/module/ /opt/software/
  8. 御载虚拟机自带的JDK(最小化安装不需要)

    1
    rpm -ga | grep -i java | xargs -n1 rpm -e --nodeps
    • rpm -ga:查询所安装的所有rpm软件包
    • grep -i:忽略大小写
    • xags -n1:表示每次只传递一个参数
    • rpm -e --nodeps:强制卸载软件
  9. 重启虚拟机

    1
    reboot

Hadoop运行模式

  1. Hadoop官方网站:https://hadoop.apache.org
  2. Hadoop运行模式包括:本地模式伪分布式模式以及完全分布式模式
    • 本地模式:单机运行,只是用来演示一下官方案例。生产环境不用。
    • 伪分布式模式:也是单机运行,但是具备Hadoop集群的所有功能,一台服务器模拟一个分布式的环境。个别缺钱的公司用来测试,生产环境不用。
    • 完全分布式模式:多台服务器组成分布式环境。生产环境使用。

本地运行模式(官方WordCount)

  1. 创建在hadoop-3.3.5文件下面创建一个wcinput文件夹

    1
    mkdir wcinput
  2. 在wcinput文件下创建一个word.txt文件

    1
    cd wcinput
  3. 编辑word.txt文件

    1
    vim word.txt

    在文件中输入如下内容

    1
    2
    3
    4
    5
    ss ss
    cls cls
    banzhang
    bobo
    yangge
  4. 执行一下命令(/opt/module/hadoop-3.3.5下执行)

    1
    bin/hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-3.3.5.jar wordcount wcinput/ ./wcoutput/
  5. 查看结果

    1
    2
    3
    4
    5
    6
    7
    cat wcoutput/part-r-00000 

    banzhang 1
    bobo 1
    cls 2
    ss 2
    yangge 1

完全分布式运行模式(开发重点)

虚拟机准备

  1. 准备3台客户机(关闭防火墙、静态IP、主机名称)

  2. 安装JDK

  3. 配置环境变量

    1
    2
    3
    4
    [hadoop@hadoop102 hadoop-3.3.5]$ cat /etc/profile.d/jdk.sh 
    # JAVA_HOME
    export JAVA_HOME=/opt/module/jdk1.8.0_361
    export PATH=$PATH:$JAVA_HOME/bin
  4. 安装Hadoop

  5. 配置环境变量

    1
    2
    3
    4
    5
    6
    [hadoop@hadoop102 hadoop-3.3.5]$ cat /etc/profile.d/hadoop.sh
    # hadoop
    export HADOOP_HOME=/opt/module/hadoop-3.3.5

    export PATH=$PATH:$HADOOP_HOME/bin
    export PATH=$PATH:$HADOOP_HOME/sbin
  6. 配置集群

  7. 单点启动

  8. 配置ssh

  9. 群起并测试集群

编写集群分发脚本xsync

scp(secure copy)安全拷贝

  1. scp定义

    scp可以实现服务器与服务器之间的数据拷贝。(from server1 to server2)

  2. 基本语法

    1
    2
      scp		-r 			$pdir/$fname					$user@$host:$pdir/$fname
    # 命令 递归 要拷贝的文件路径/名称 目的地用户@主机:目的地路径/名称
  3. 案例实操

    • 前提:在hadoop102、hadoop103、hadoop104都已经创建好的/opt/moduleopt/software两个目录,并且已经把这两个目录修改为hadoop:hadoop

      1
      sudo chown hadoop:hadoop -R /opt/module
    • hadoop102上,将hadoop102中/opt/module/jdk1.8.0_361目录拷贝到hadoop103上

      1
      scp -r /opt/module/jdk1.8.0_361 hadoop@hadoop103:/opt/module
    • hadoop103上,将hadoop102中/opt/module/hadoop-3.3.5目录拷贝到hadoop103上

      1
      scp -r hadoop@hadoop102:/opt/module/hadoop-3.3.5 /opt/module
    • hadoop103上,将hadoop102中/opt/module/*目录拷贝到hadoop104上

      1
      scp -r hadoop@hadoop102:/opt/module/* hadoop@hadoop104:/opt/module

rsync远程同步工具

rsync主要用于备份和镜像。具有速度快、避免复制相同内容和支持符号链接的优点。

rsync和scp区别:用rsync做文件的复制要比scp的速度快,rsync只对差异文件做更新,scp是把所有文件都复制过去。

注:命令不存在执行一下命令安装(三台机器都要安装)

1
sudo yum install -y rsync
  1. 基本语法

    1
    2
       rsync 		-av 			$pdir/$fname 						$user@$host:$pdir/$fname
    # 命令 选项参数 要拷贝的文件路径/名称 目的地用户@主机:目的地路径/名称

    选项参数说明

    选项 功能
    -a 归档拷贝
    -v 显示复制过程
  2. 案例实操

    • 删除hadoop103/opt/module/hadoop-3.3.5/wcinput

      1
      rm -rf /opt/module/hadoop-3.3.5/wcinput/  /opt/module/hadoop-3.3.5/wcoutput/ 
    • 同步hadoopl102中的/opt/module/hadoop-3.3.5hadoop103

      1
      rsync -av /opt/module/hadoop-3.3.5/ hadoop@hadoop103:/opt/module/hadoop-3.3.5/

xsync集群分发脚本

循环复制文件到所有节点的相同目录下

1
2
3
4
5
6
mkdir bin
cd bin/
vim xsync

# 赋予可执行权限
chmod +x xsync
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
#!/bin/bash

#1.判断参数个数
if [ $# -lt 1 ]
then
echo Not Enough Arquement!
exit;
fi

#2.遍历集群所有机器
for host in hadoop102 hadoop103 hadoop104
do
echo ================= $host =================
#3.遍历所有目录,挨个发送
for file in $@
do
#4.判断文件是否存在
if -e $file
then
#5.获取父目录
pdir=$(cd -P $(dirname $file); pwd)

#6.获取当前文件的名称
fname=$(basename $file)
ssh $host "mkdir -p $pdir"
rsync -av $pdir/$fname $host:$pdir
else
echo $file does not exists!
fi
done
done

  1. 分发JDK与Hadoop环境变量

    1
    sudo ~/bin/xsync /etc/profile.d/jdk.sh /etc/profile.d/hadoop.sh
  2. 分别刷新三台机器的环境变量

    1
    source /etc/profile

SSH免密登录

  1. 来到hadoop102home目录, ls -al查看所有的隐藏文件,可以看到.ssh文件(访问过其他服务器才有)

    1
    2
    3
    4
    5
    cd ~/.ssh
    ssh-keygen -t rsa # 三次回车,可以看到有三个文件 id_rsa.pub 公钥 id_rsa 私钥
    ssh-copy-id hadoop103 # 将hadoop102的公钥拷贝到hadoop103
    ssh-copy-id hadoop104 # 将hadoop102的公钥拷贝到hadoop102
    ssh-copy-id hadoop102 # 允许自己访问 ssh hadoop102 不用输入密码
  2. hadoop103、hadoop104如上操作

  3. 查看三台机器/home/hadoop/.ssh/authorized_keys文件,可以发现三台机器都可以互相访问

  4. 切换root用户,配置root免密登录也大致如上操作

集群配置

  1. 集群部署规划
    注意:

    • NameNodeSecondaryNameNode不要安装在同一台服务器
    • ResourceManager也很消耗内存,不要和NameNode、SecondaryNameNode配置在同一台机器上
    hadoop102 hadoop103 hadoop104
    HDFS NameNode
    DateNode
    DateNode SecondaryNameNode
    DateNode
    YARN NodeManager ResourceManager
    NodeManager
    NodeManager
  2. 配置文件说明
    Hadoop配置文件分两类:默认配置文件和自定义配置文件,只有用户想修改某一默认配置值时,才需要修改自定义配置文件,更改相应属性值。

    1. 默认配置文件:

      要获取的默认文件 文件存放在Hadoop的jar包中的位置
      [core-default.xml] hadoop-common-3.3.5.jar/core-default.xml
      [hdfs-default.xml] hadoop-hdfs-3.3.5.jar/hdfs-default.xml
      [yarn-default.xml] hadoop-yarn-common-3.3.5.jar/yarn-default.xml
      [mapred-default.xml] hadoop-mapreduce-client-core-3.3.5.jar/mapred-default.xml
    2. 自定义配置文件::

      core-site.xmlhdfs-site,xmlyarn-site,xmlmapred-site.xml四个配置文件存放在$HADOOP_HOME/etc/hadoop这个路径上,用户可以根据项目需求重新进行修改配置。

  3. 配置集群

    1. 核心配置文件core-site.xml

      1
      vim $HADOOP_HOME/etc/hadoop/core-site.xml
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      <?xml version="1.0" encoding="UTF-8"?>
      <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
      <configuration>
      <!-- 指定NameNode的地址 -->
      <property>
      <name>fs.defaultFS</name>
      <value>hdfs://hadoop102:8020</value>
      </property>
      <!-- 指定hadoop数据的存储目录 -->
      <property>
      <name>hadoop.tmp.dir</name>
      <value>/opt/module/hadoop-3.3.5/data</value>
      </property>
      <!-- 配置HDFS网页登录使用的静态用户为hadoop -->
      <property>
      <name>hadoop.http.staticuser.user</name>
      <value>hadoop</value>
      </property>
      </configuration>
    2. HDFS配置文件hdfs-site.xml

      1
      vim $HADOOP_HOME/etc/hadoop/hdfs-site.xml
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      <?xml version="1.0" encoding="UTF-8"?>
      <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
      <configuration>
      <!-- 指定NameNode web端访问的地址 -->
      <property>
      <name>dfs.namenode.http-address</name>
      <value>hadoop102:9870</value>
      </property>
      <!-- SecondaryNameNode web端Http访问地址 -->
      <property>
      <name>dfs.namenode.secondary.http-address</name>
      <value>hadoop104:9868</value>
      </property>
      </configuration>
    3. YARN配置文件yarn-site.xml

      1
      vim $HADOOP_HOME/etc/hadoop/yarn-site.xml
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      <?xml version="1.0" encoding="UTF-8"?>
      <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
      <configuration>
      <!-- 指定MapReduce走shuffle -->
      <property>
      <name>yarn.nodemanager.aux-services</name>
      <value>mapreduce_shuffle</value>
      </property>
      <!-- 指定ResourceManager的地址 -->
      <property>
      <name>yarn.resourcemanager.hostname</name>
      <value>hadoop103</value>
      </property>
      <!-- 环境变量的继承 3.2.x 一下版本需要配置,少了HADOOP_MAPRED_HOME环境变量-->
      <property>
      <name>yarn.nodemanager.env-whitelist</name>
      <value>JAVA_HOME,HADOOP_COMMON_HOME,HADOOP_HDES_HOME,HADOOP_CONF_DIR,CLASSPATH_PREPEND_DISTCACHE,HADOOP_YARN_HOME,HADOOP_MAPRED_HOME</value>
      </property>
      </configuration>
    4. MapReduce配置文件mapred-site.xml

      1
      vim $HADOOP_HOME/etc/hadoop/mapred-site.xml
      1
      2
      3
      4
      5
      6
      7
      8
      9
      <?xml version="1.0" encoding="UTF-8"?>
      <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
      <configuration>
      <!-- 指定MapReduce程序运行在Yarn上 -->
      <property>
      <name>mapreduce.framework.name</name>
      <value>yarn</value>
      </property>
      </configuration>
    5. 文件分发

      1
      xsync /opt/module/hadoop-3.3.5/etc/hadoop

群起集群

  1. 配置workers

    1
    vim /opt/module/hadoop-3.3.5/etc/hadoop/workers
    1
    2
    3
    hadoop102
    hadoop103
    hadoop104

    注意:该文件中添加的内容结尾不允许有空格,文件中不允许有空行。

    1
    xsync /opt/module/hadoop-3.3.5/etc/hadoop
  2. 启动集群

    1. 如果集群是第一次启动,需要在hadoop102节点格式化NameNode(注意:格式化NameNode,会产生新的集群id,导致NameNode和DataNode的集群id不一致,集群找不到已往数据。如果集群在运行过程中报错,需要重新格式化NameNode的话,一定要先停止namenode和datanode进程,并且要删除所有机器的data和logs目录,然后再进行格式化)

      1
      hdfs namenode -format
    2. 启动HDFS

      1
      sbin/start-dfs.sh
    3. **在配置了ResourceManager的节点(hadoop103)**启动YARN

      1
      sbin/start-yarn.sh
    4. Web端查看HDFS的NameNode

    5. Web端查看YARN的ResourceManager

  3. 集群测试

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    # 创建文件夹
    [hadoop@hadoop102 hadoop-3.3.5]$ hadoop fs -mkdir /wcinput
    # 上传word.txt文件到刚刚创建的文件夹 http://hadoop102:9870/explorer.html#/wcinput
    [hadoop@hadoop102 hadoop-3.3.5]$ hadoop fs -put wcinput/word.txt /wcinput
    # 统计词数
    [hadoop@hadoop102 hadoop-3.3.5]$ hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-3.3.5.jar wordcount /input /output
    # 传递一个大的文件jdk
    [hadoop@hadoop102 hadoop-3.3.5]$ hadoop fs -put /opt/software/jdk-8u361-linux-x64.tar.gz /wcinput

    # 查看HDFS文件存储的路径
    [hadoop@hadoop102 subdir0]$ pwd
    /opt/module/hadoop-3.3.5/data/dfs/data/current/BP-1561999921-192.168.10.102-1680530974017/current/finalized/subdir0/subdir0
    # 查看HDFS在磁盘存储的内容
    [hadoop@hadoop102 subdir0]$ cat blk_1073741825
    ss ss
    cls cls
    banzhang
    bobo
    yangge

    # 拼接文件
    [hadoop@hadoop102 subdir0]$ cat blk_1073741826 >> tmp.tar.gz
    [hadoop@hadoop102 subdir0]$ cat blk_1073741827 >> tmp.tar.gz

    # 解压文件,发现是刚刚的jdk
    [hadoop@hadoop102 subdir0]$ tar -zvxf tmp.tar.gz

配置历史服务器

为了查看程序的历史运行情况,需要配置一下历史服务器。具体配置步骤如下:

  1. 配置mapred-site.xml

    1
    vim $HADOOP_HOME/etc/hadoop/mapred-site.xml

    在该文件里面增加如下配置。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    <!-- 历史服务器端地址 -->
    <property>
    <name>mapreduce.jobhistory.address</name>
    <value>hadoop102:10020</value>
    </property>
    <!-- 历史服务器web端地址 -->
    <property>
    <name>mapreduce.jobhistory.webapp.address</name>
    <value>hadoop102:19888</value>
    </property>
  2. 分发配置

    1
    xsync $HADOOP_HOME/etc/hadoop/mapred-site.xml
  3. hadoop102启动历史服务器

    1
    bin/mapred --daemon start historyserver
  4. 查看历史服务器是否启动

    1
    jps
  5. 查看JobHistory

配置日志的聚集

日志聚集概念:应用运行完成以后,将程序运行日志信息上传到HDFS系统上。

日志聚集功能好处:可以方便的查看到程序运行详情,方便开发调试。

注意:开启日志聚集功能,需要重新启动NodeManager、ResourceManager和HistoryServer

开启日志聚集功能具体步骤如下:

  1. 配置yarn-site.xml

    1
    vim $HADOOP_HOME/etc/hadoop/yarn-site.xml
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    <!-- 开启日志聚集功能 -->
    <property>
    <name>yarn.log-aggregation-enable</name>
    <value>true</value>
    </property>
    <!-- 二设置日志聚集服务器地址 -->
    <property>
    <name>yarn.log.server.url</name>
    <value>http://hadoop102:19888/jobhistory/logs</value>
    </property>
    <!-- 三设置日志保留时间为7天 -->
    <property>
    <name>yarn.log-aggregation.retain-seconds</name>
    <value>604800</value>
    </property>
  2. 文件分发

    1
    xsync $HADOOP_HOME/etc/hadoop/yarn-site.xml
  3. 重启历史服务器

    1
    2
    mapred --daemon stop historyserver
    mapred --daemon start historyserver
  4. 重启Yarn服务器(hadoop103)

    1
    2
    sbin/stop-yarn.sh
    sbin/start-yarn.sh

集群启动/停止方式总结

  1. 各个模块分开启动/停止(配置ssh是前提)常用

    • 整体启动/停止HDFS

      1
      start-dfs.sh/stop-dfs.sh
    • 整体启动/停止YARN

      1
      start-yarn.sh/stop-yarn.sh
  2. 各个服务组件逐一启动/停止

    • 分别启动/停止HDFS组件

      1
      hdfs --daemon start/stop namenode/datanode/secondarynamenoded
    • 启动/停止YARN

      1
      yarn --daemon start/stop resourcemanager/nodemanager

编写Hadoop集群常用脚本

  1. Hadoop集群启停脚本(包含HDFS,Yarn,Historyserver): hadoop_shell

    1
    2
    3
    vim ~/bin/hadoop_shell
    # 赋予可执行权限
    chmod +x vim ~/bin/hadoop_shell
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    #!/bin/bash

    # 定义颜色变量
    RED='\033[0;31m' # 红色
    GREEN='\033[0;32m' # 绿色
    YELLOW='\033[0;33m' # 黄色
    BLUE='\033[0;34m' # 蓝色
    MAGENNTA='\033[0;35m' # 紫红色
    CYAN='\033[0;36m' # 青色
    NC='\033[0m' # NC表示no color,用于将颜色重置为默认值

    HADOOP_HOME="/opt/module/hadoop-3.3.5"

    if [ $# -lt 1 ]
    then
    echo -e " ${RED}No Args Input...${NC} "
    exit;
    fi

    function start() {
    echo -e " ${BLUE}==================启动Hadoop集群 ==================${NC} "

    echo -e " ${BLUE}------------------ 启动 HDFS ------------------${NC} "
    ssh hadoop102 "${HADOOP_HOME}/sbin/start-dfs.sh"
    echo -e " ${BLUE}------------------ 启动 Yarn ------------------${NC} "
    ssh hadoop103 "${HADOOP_HOME}/sbin/start-yarn.sh"
    echo -e " ${BLUE}------------------ 启动 HistoryServer ------------------${NC} "
    ssh hadoop104 "${HADOOP_HOME}/bin/mapred --daemon start historyserver"
    }

    function stop() {
    echo -e " ${RED}================== 关闭 Hadoop 集群 ==================${NC} "

    echo -e " ${RED}------------------ 关闭 HistoryServer ------------------${NC} "
    ssh hadoop102 "${HADOOP_HOME}/bin/mapred --daemon stop historyserver"
    echo -e " ${RED}------------------ 关闭 Yarn ------------------${NC} "
    ssh hadoop103 "${HADOOP_HOME}/sbin/stop-yarn.sh"
    echo -e " ${RED}------------------ 关闭 HDFS ------------------${NC} "
    ssh hadoop104 "${HADOOP_HOME}/sbin/stop-dfs.sh"
    }

    function help() {
    echo -e " ${YELLOW} start 启动Hadoop集群 ${NC} "
    echo -e " ${YELLOW} stop 停止Hadoop集群 ${NC} "
    echo -e " ${YELLOW} restart 重启Hadoop集群 ${NC} "
    echo -e " ${YELLOW} jpsall 查看Hadoop集群进程 ${NC} "
    }

    function jpsall() {
    for host in hadoop102 hadoop103 hadoop104
    do
    echo -e " ${CYAN}========================= $host =========================${NC} "
    ssh $host jps
    done
    }

    case $1 in
    "start")
    start
    ;;

    "stop")
    stop
    ;;

    "jpsall")
    jpsall
    ;;

    "restart")
    stop
    start
    ;;

    "help")
    help
    ;;

    *)
    echo -e " ${RED}有效参数如下${NC} "
    help
    ;;

    esac

  2. 脚本分发

    1
    xsync ~/bin/hadoop_shell

    注意:对应机器赋予脚本可执行权限

常用端口号说明

端口名称 Hadoop2.x Hadoop3.x
NameNode内部通信端口 8020/9000 8020/9000/9820
NameNode HTTP Ul 50070 9870
MapReduce查看执行任务端口 8088 8088
历史服务器通信端口 19888 19888

集群时间同步

关于CTS时区的可能:

美国中部时间:Central Standard Time (USA) UT-6:00
澳大利亚中部时间:Central Standard Time (Australia) UT+9:30
中国标准时间:China Standard Time UT+8:00
古巴标准时间:Cuba Standard Time UT-4:00

  1. 外网同步

    1
    2
    3
    4
    5
    6
    7
    8
    9
    # 安装时间同步插件
    yum install ntpdate
    # 开启时间同步
    service ntpdate restart
    # 设置时区
    # 删除本地时间(美国时间)
    rm -rf /etc/localtime
    # 设置时区为上海
    ln -s /usr/share/zoneinfo/Asia/Shanghai /etc/localtime
  2. 内网时间服务器配置(必须root用户)

    1. 查看所有节点npd服务状态和开机自启动状态
      1
      2
      3
      4
      5
      6
      7
      sudo systemctl status ntpdate		    								# 查看ntpdate状态
      sudo systemctl start ntpdate # 启动ntpdate
      sudo systemctl stop ntpdate # 停止ntpdate
      sudo systemctl is-enabled ntpdate # 查看开机是否自启
      sudo systemctl enable ntpdate # 开机自启
      sudo systemctl disable ntpdate # 移除开机自启
      sudo systemctl list-unit-files|grep ntpdate # 查看是否开机自启

Hadoop之HDFS

HDFS概述

HDFS产出背景及定义

  1. HDFS产生背景
    随着数据量越来越大,在一个操作系统存不下所有的数据,那么就分配到更多的操作系统管理的磁盘中,但是不方便管理和维护,迫切需要一种系统来管理多台机器上的文件,这
    就是分布式文件管理系统。HDFS只是分布式文件管理系统中的一种

  2. HDFS定义
    HDFS(Hadoop Distributed File System),它是一个文件系统,用于存储文件,通过目录树来定位文件;其次,它是分布式的,由很多服务器联合起来实现其功能,集群中的服务
    器有各自的角色。

    HDFS的使用场景:适合一次写入,多次读出的场景。一个文件经过创建、写入和关闭之后就不需要改变。

HDFS优缺点

优点

  1. 高容错性
    • 数据自动保存多个副本。它通过增加副本的形式,提高容错性。
    • 某一个副本丢失以后,它可以自动恢复。
  2. 适合处理大数据
    • 数据规模:能够处理数据规模达到GB、TB、甚至PB级别的数据
    • 文件规模:能够处理百万规模以上的文件数量,数量相当之大。
  3. 构建在廉价机器上,通过多副本机制,提高可靠性。

缺点

  1. 不适合低延时数据访问,比如毫秒级的存储数据,是做不到的。
  2. 无法高效的对大量小文件进行存储。
    • 存储大量小文件的话,它会占用NameNode大量的内存来存储文件目录和块信息。这样是不可取的,因为NameNode的内存总是有限的;
    • 小文件存储的寻址时间会超过读取时间,它违反了HDFS的设计目标
  3. 不支持并发写入、文件随机修改。
    • 一个文件只能有一个写,不允许多个线程同时写
    • **仅支持数据append(追加)**,不支持文件的随机修改。

HDFS架构组成

HDFS架构组成

  1. NameNode(nn):就是Master,它是一个主管、管理者。
    • 管理HDFS的名称空间;
    • 配置副本策略;
    • 管理数据块(Block)映射信息;
    • 处理客户端的读写请求
  2. DataNode:就是Slave。NameNode下达命令,DataNode执行实际的操作。
    • 存储实际的数据块;
    • 执行数据块的读/写操作
  3. Client:就是客户端。
    • 文件切分。文件上传HDFS的时候,Client将文件切分成一个一个的Block,然后进行上传:
    • 与NameNode交互,获取文件的位置信息;
    • 与DataNode交互,读取或者写入据:
    • Client提供一些命令来管理HDFS,比如NameNode格式化;
    • Client可以通过一些命令来访问HDFS,比如对HDFS增删查改操作;
  4. Secondary NameNode:并非NameNode的热备。当NameNode挂掉的时候,它并不能马上替换NameNode并提供服务。
    • 辅助NameNode,分担其工作量,比如定期合并FsimageEdits,并推送给NameNode;
    • 在紧急情况下,可辅助恢复NameNode。

HDFS文件块大小

HDFS中的文件在物理上是分块存储(Block),块的大小可以通过配置参数(dfs.blocksize)来规定,默认大小在Hadoop2.x/3.x版本中是128M,1.x版本中是64M。

  1. 集群中的Block
  2. 如果寻址时间约为10ms,即查找到目标block的时间为10ms。
  3. 寻址时间为传输时间的1%时,则为最佳状态。因此,传输时间=10ms/0.01=1000ms=1s
  4. 而目前磁盘的传输速率普遍为100MB/s
  5. Block大小=1s*100MB/s=100MB

思考:为什么块的大小不能设置太小,也不能设置太大?

  1. HDFS的块设置太小,会增加寻址时间,程序一直在找块的开始位置:
  2. 如果块设置的太大,从磁盘传输数据的时间会明显大于定位这个块开始位置所需的时间。导致程序在处理这块数据时,会非常慢。

总结:HDFS块的大小设置主要取决于磁盘传输速率。

HDFS的Shell操作

基本语法

hadoop fs 具体命令 OR hdfs dfs具体命令

两个是完全相同的。

常用命令

  1. 确保Hadoop集群已启动
  2. -help:输出这个命令参数hadoop fs -help rm
  3. 创建/sanguo文件夹 hadoop fs -mkdir /sanguo

上传

  1. -moveFromLocal:从本地剪切粘贴到HDFS

    1
    2
    3
    4
    5
     # 创建文件,插入值 shuguo
    vim shuguo.txt

    # 执行成功查看当前路径,发现本地shuguo.txt文件已经没有了
    hadoop fs -moveFromLocal ./shuguo.txt /sanguo
  2. -copyFromLocal:从本地文件系统中拷贝文件到HDFS路径去

    1
    2
    3
    4
    5
     # 创建文件,插入值 weiguo
    vim weiguo.txt

    # 执行成功查看当前路径,发现本地weiguo.txt文件还存在
    hadoop fs -copyFromLocal ./weiguo.txt /sanguo
  3. -put:等同于-copyFromLocal生产环境更习惯用put

    1
    2
    3
    4
     # 创建文件,插入值 wuguo
    vim wuguo.txt

    hadoop fs -put ./wuguo.txt /sanguo
  4. -appendToFile:追加一个文件到已经存在的文件末尾

    1
    2
    3
    4
    5
     # 创建文件,插入值 liubei
    vim liubei.txt

    # 把 liubei.txt 文件中的值(liubei)追加到 HDFS的/sanguo/shuguo.txt 文件中
    hadoop fs -appendToFile ./liubei.txt /sanguo/shuguo.txt

下载

  1. -copyToLocal:从HDFS拷贝文件到本地

    1
    2
    # 从文系统的/sanguo/shuguo.txt文件拷贝到当前路径
    hadoop fs -copyToLocal /sanguo/shuguo.txt ./
  2. -get: 等同于-copyToLocal,生产环境更习惯用get

    1
    2
    # 从文系统的/sanguo/shuguo.txt文件拷贝到当前路径并且下载的文件夹名为shuguo2.txt
    hadoop fs -copyToLocal /sanguo/shuguo.txt ./shuguo2.txt

HDFS直接操作

  1. -ls:显示日志信息

    1
    2
    # 查看文件系统/sanguo目录下的文件信息
    hadoop fs -ls /sanguo
  2. -cat:显示文件内容

    1
    2
    # 查看文件系统/sanguo/shuguo.txt 文件的内容
    hadoop fs -cat /sanguo/shuguo.txt
  3. -chgrp、-chmod、-chown:Linux文件系统中的用法一样,修改文件所属权限

    1
    2
    hadoop fs -chmod 666 /sanguo/shuguo.txt
    hadoop fs -chown root:root /sanguo/shuguo.txt
  4. -mkdir:创建路径

    1
    hadoop fs -mkdir /jinguo
  5. -cp:从HDFS的一个路径拷贝到HDFS的另一个路径

    1
    hadoop fs -cp /sanguo/shuguo.txt /jinguo
  6. -mv:在HDFS目录中移动文件

    1
    2
    hadoop fs -mv /sanguo/weiguo.txt /jinguo
    hadoop fs -mv /sanguo/wuguo.txt /jinguo
  7. -tail:显示一个文件的末尾1kb的数据

    1
    hadoop fs -tail /jinguo/shuguo.txt
  8. -rm:删除文件或文件夹

    1
    hadoop fs -rm /sanguo/shuguo.txt
  9. -rm -r:递归删除文件及文件里面的内容

    1
    hadoop fs -rm -r /sanguo
  10. -du:统计文件夹的大小信息

    1
    2
    3
    4
    5
    6
    7
    8
    [hadoop@hadoop102 hadoop-3.3.5]$ hadoop fs -du -s -h /jinguo
    27 81 /jinguo
    [hadoop@hadoop102 hadoop-3.3.5]$ hadoop fs -du -h /jinguo
    14 42 /jinguo/shuguo.txt
    7 21 /jinguo/weiguo.txt
    6 18 /jinguo/wuguo.txt

    # 说明:27表示文件大小;81表示27*3个副本;/jinguo表示查看的目录
  11. -setrep:设置HDFS中文件的副本数量

    1
    hadoop fs -setrep 10 /jinguo/shuguo.txt

    这里设置的副本数只是记录在NameNode的元数据中,是否真的会有这么多副本,还得看DataNode的数量。因为目前只有3台设备,最多也就3个副本,只有节点数的增加到l0
    台时,副本数才能达到10。

HDFS的API操作

环境变量及文件参考:https://github.com/cdarlint/winutils

创建一个SpringBoot项目

pom.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.0.5</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.hadoop.hdfs</groupId>
<artifactId>HDFSClinet</artifactId>
<version>0.0.1</version>
<name>HDFSClinet</name>
<description>HDFSClinet</description>
<properties>
<java.version>17</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs-client</artifactId>
<version>3.3.5</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>3.3.5</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.8.16</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
import cn.hutool.core.util.StrUtil;
import cn.hutool.system.SystemUtil;
import lombok.SneakyThrows;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;

import java.net.URI;
import java.util.Arrays;
import java.util.HashMap;

@SpringBootTest
class HdfsClinetApplicationTests {

private static FileSystem FILE_SYSTEM;

@SneakyThrows
@Test
void mkdir() {
FILE_SYSTEM.mkdirs(new Path("/xiyou/huaguoshan"));
}

@SneakyThrows
@Test
/**
* 文件上传
* hdsf-default.xml ===> hdfs.site.xml ===> 项目资源目录下配置文件 ===> 代码里面的配置
*/
void upload() {
/*
* delSrc -是否删除原数据
* overwrite -是否覆盖已有文件
* src- 原数据路径
* dst- 目的地路径
*/
FILE_SYSTEM.copyFromLocalFile(false, true, new Path("C:\\Users\\XiaoYu\\Downloads\\jdk-8u361-linux-x64.tar.gz"), new Path("hdfs://hadoop102/xiyou/huaguoshan"));
}

@Test
@SneakyThrows
void download() {
/*
* delSrc -源文件是否删除
* src- 原数据路径 HDFS 路径
* dst- 目的地路径 Windows路径
* useRawLocalFileSystem - 开启本地校验 下载下来的文件多了一个.crc文件,该文件是循环冗余校验 http://www.ip33.com/crc.html
*/
FILE_SYSTEM.copyToLocalFile(false, new Path("hdfs://hadoop102/xiyou/huaguoshan/jdk-8u361-linux-x64.tar.gz"), new Path("C:\\Users\\XiaoYu\\Downloads"), false);
}

@Test
@SneakyThrows
void del() {
/*
* f - 删除路径。
* recursive - 是否递归删除
*/
// 删除文件
// FILE_SYSTEM.delete(new Path("hdfs://hadoop102/xiyou/huaguoshan/jdk-8u361-linux-x64.tar.gz"), false);
// 删除空目录
// FILE_SYSTEM.delete(new Path("hdfs://hadoop102/xiyou/huaguoshan"), false);
// 删除非空目录
FILE_SYSTEM.delete(new Path("hdfs://hadoop102/jinguo"), true);
}

@Test
@SneakyThrows
/**
* 文件(夹)的更名和移动
*/
void move() {
/*
* src- 源文件(夹)路径
* dst- 目标文件(夹)路径
*/
// 文件(夹)名称的修改
FILE_SYSTEM.rename(new Path("/xiyou"), new Path("/xiyouji"));

// 文件(夹)的移动
// FILE_SYSTEM.rename(new Path("/xiyouji/jdk-8u361-linux-x64.tar.gz"), new Path("/jdk-8u361-linux-x64.tar.gz"));
}

@Test
@SneakyThrows
void fileDetail() {
// 获取所有的文件信息
RemoteIterator<LocatedFileStatus> iterator = FILE_SYSTEM.listFiles(new Path("/"), true);
log(31, 1, "========== 文件信息如下:==========");
while (iterator.hasNext()) {
LocatedFileStatus file = iterator.next();
log(33, 1, StrUtil.format("文件路径:{}", file.getPath()));
log(33, 1, StrUtil.format("文件名称:{}", file.getPath().getName()));
log(33, 1, StrUtil.format("文件权限:{}", file.getPermission()));
log(33, 1, StrUtil.format("文件所属者:{}", file.getOwner()));
log(33, 1, StrUtil.format("文件所属组:{}", file.getGroup()));
log(33, 1, StrUtil.format("文件大小:{}", file.getLen()));
log(33, 1, StrUtil.format("文件最后一次修改时间:{}", file.getModificationTime()));
log(33, 1, StrUtil.format("文件副本:{}", file.getReplication()));
log(33, 1, StrUtil.format("文件块大小:{}", file.getBlockSize()));

log(36, 1, StrUtil.format("↓↓↓↓↓↓↓ {} 块信息如下 ↓↓↓↓↓↓↓",file.getPath().getName()));
log(36, 1, Arrays.toString(file.getBlockLocations()));
}
}

@Test
@SneakyThrows
void file() {
FileStatus[] fileStatuses = FILE_SYSTEM.listStatus(new Path("/"));
for (FileStatus fileStatus : fileStatuses) {
if (fileStatus.isFile()) {
log(33, 1, StrUtil.format("文件:{}", fileStatus.getPath().getName()));
}else {
log(36, 3, StrUtil.format("目录:{}", fileStatus.getPath().getName()));
}
}
}

@SneakyThrows
@BeforeEach
void contextLoads() {
// 手动设置Hadoop环境变量
SystemUtil.set("HADOOP_HOME", "C:\\Users\\XiaoYu\\Desktop\\hadoop\\hadoop-3.2.2");
SystemUtil.set("hadoop.home.dir", SystemUtil.get("HADOOP_HOME"));
Configuration config = new Configuration();
// hdsf-default.xml ===> hdfs.site.xml ===> 项目资源目录下配置文件 ===> 代码里面的配置
// config.setInt("dfs.replication", 2);

FILE_SYSTEM = FileSystem.get(URI.create("hdfs://hadoop102:8020"), config, "hadoop");
}

@SneakyThrows
@AfterEach
void close() {
FILE_SYSTEM.close();
}

/**
* @param colour 颜色代号:背景颜色代号(41-46);前景色代号(31-36)
* @param fontType 样式代号:0无;1加粗;3斜体;4下划线
* @param content 要打印的内容
*/
void log(int colour, int fontType, String content) {
System.out.printf("\u001B[%d;%dm%s\u001B[0m%n", colour, fontType, content);
}

private static final HashMap<Integer, String> COLOR = new HashMap<>() {
{
put(31, "红色字体");
put(32, "绿色字体");
put(33, "黄色字体");
put(34, "蓝色字体");
put(35, "紫色字体");
put(36, "青色字体");
put(37, "灰色字体");
put(40, "黑色背景");
put(41, "红色背景");
put(42, "绿色背景");
put(43, "黄色背景");
put(44, "蓝色背景");
put(45, "紫色背景");
put(46, "青色背景");
put(47, "灰色背景");
}
};
}

HDFS的读写流程

HDFS的写数据流程

HDFS的写数据流程

  • 客户端创建Distributed FileSystem,该模块向HDFS的老大哥NameNode请求上传文件ss.avi
  • 老大哥NameNode是个做事很守规矩的人,所以他要检查权限和目录结构是否存在,如果存在就向Distributed FileSystem响应可以上传文件
  • 但是Distributed FileSystem并不知道我上传的文件应该传给哪个小老弟(DataNode1、2、3),就问老大哥:”大哥,我应该上传到哪个DataNodee小老弟服务器啊?“
  • 老大哥NameNode需要想一下文件应该存到哪个机器上(这个过程叫做副本存储节点选择,下文有写),想好之后就返回dn1、2、3说”这三个可以存文件“
  • 这时候客户端就要创建文件输出流FSDataOutPutStream让他去上传数据
  • 数据是需要通过管道进行传输的,所以文件输出流就需要先铺设管道,它首先请求dn1dn1再调用dn2,最后dn2调用dn3,将这个通信管道建立完成,dn1、2、3逐级应答客户端建立完毕
  • 这时候,客户端就开始往dn1上传第一个Block,以Packet为单位,dn1收到一个Packet就会传给dn2dn2再传给dn3为了保证数据不丢失,三个机器在传数据的时候,都有自己的一个应答对列放Packet,直到收到下一台机器传回的ack应答信息,才把Packet删掉。
  • 当一个Block传输完成之后,客户端会再次请求NameNode上传第二个Block的服务器。重复执行上面的操作,直到所有的数据都上传完成

网络拓扑 - 节点距离计算

HDFS写数据的过程中,NameNode会选择距离待上传数据最近距离的DataNode接收数据,需要计算出这个距离。

节点距离:两个节点到达最近的功能祖先的距离总和

现实生活中,服务器都会在机架上放着,然后形成一个,看下图:

我们要想计算节点距离,可以把他抽象成一个图:

如果我们要算机器d1->r2->n1d2->r6->n0的距离,则需要找到他们的共同祖先,然后把把路径相加就可以了,如下图所示:

机架感知(副本存储节点选择)

官方说明

For the common case, when the replication factor is three, HDFS’s placement policy is to put one replica on the local machine if the writer is on a datanode, otherwise on a random datanode in the same rack as that of the writer, another replica on a node in a different (remote) rack, and the last on a different node in the same remote rack. This policy cuts the inter-rack write traffic which generally improves write performance. The chance of rack failure is far less than that of node failure; this policy does not impact data reliability and availability guarantees. However, it does not reduce the aggregate network bandwidth used when reading data since a block is placed in only two unique racks rather than three. With this policy, the replicas of a block do not evenly distribute across the racks. Two replicas are on different nodes of one rack and the remaining replica is on a node of one of the other racks. This policy improves write performance without compromising data reliability or read performance.

对于常见的情况,当复制系数为3时,HDFS的放置策略是,如果写作者在数据节点上,则将一个副本放在本地机器上,否则放在与写作者相同机架的随机数据节点上,另一个副本放在不同(远程)机架的节点上最后一个放在同一远程机架的不同节点上。这种策略减少了机架间的写入流量,通常会提高写入性能。机架故障的几率远远小于节点故障的几率;这个策略不影响数据的可靠性和可用性保证。然而,它并没有减少读取数据时使用的总网络带宽,因为一个区块只被放在两个唯一的机架上,而不是三个。在这种策略下,一个区块的副本不会均匀地分布在机架上。两个副本在一个机架的不同节点上,其余的副本在其他机架的一个节点上。这个策略在不影响数据可靠性或读取性能的情况下提高了写入性能。

源码说明:org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault#chooseTargetInOrder

HDFS写文件会把第一个副本存储在客户端所处节点上,第二个副本在另一个机架上的随机一个节点,第三个副本在第二个副本所在机架的另外一个随机节点上。如下图所示:

HDFS读数据流程

HDFS读数据流程

  • 客户端通过DistributedFileSystemNameNode请求下载文件,NameNode通过查询元数据,找到文件块所在的DataNode地址
  • 挑选一台DataNode(就近原则,然后随机)服务器,请求读取数据
  • DataNode开始传输数据给客户端(从磁盘里面读取数据输入流,以Packet为单位来做校验)
  • 客户端以Packet为单位接收,现在本地缓存,然后写入目标文件

NameNode与SecondaryNameNode

NameNode简称为NNSecondaryNameNode简称为2NN

NN和2NN工作机制

元数据在什么地方?

NameNode中有元数据,那么元数据是存在什么地方呢?

如果元数据存储在NN的内存中,内存有两个特点就是:不可靠(断电即失)、计算速度快;这样保证不了数据的可靠性,一旦断电、元数据丢失,整个集群就无法工作了

如果元数据存储在NN的硬盘中,硬盘的特点是:可靠、计算速度慢;这样虽然保证了数据的可靠性,但是它的计算速度就慢了。

因此HDFS采用了内存 + 磁盘的方式,并且引入了两个概念FsImage与EditsFsImage负责存储数据;Edits负责追加内容(不计算)。每当元数据有更新或者或者添加元数据时,修改内存中的元数据并追加到Edits中。 这样,一旦NN节点断电,就可以通过FsImageEdits的合并,合成元数据。

但是随着Edits文件数据过大,它的效率降低,一旦断电、恢复元数据的时间也过长,因此需要定期进行FsImageEdits的合并,这个工作如果只有NN做,又会效率过低。所以引入一个新的节点2NN专门用于FsImageEdits的合并。

NN和2NN的工作机制

下面一张图解释了NN2NN的工作机制:

NameNode和SecondaryNameNode工作机制

它分为两个阶段:

第一阶段:NameNode启动:

  • 第一次启动NN格式化后,创建FsImageEdits文件,如果不是第一次启动,直接加载FsImageEdits到内存
  • 客户端对元数据进行增删改的请求
  • NNEdits记录操作日志,更新滚动日志
  • NN在内存中对元数据进行增删改

第二阶段:Secondary NameNode工作:

  • 2NN定期或当Edits数据满的时候询问NN是否需要CheckPoint(检查是否需要合并)
  • 2NN请求执行CheckPoint
  • NN滚动正在写的Edits日志
  • 然后将滚动前的编辑日志和镜像文件拷贝到2NN
  • 2NN加载编辑日志和镜像文件到内存,并合并
  • 生成新的镜像文件fsimage.chkpoint
  • 拷贝fsimage.chkpointNN
  • NNfsimage.chkpoint重新命名为fsimage

FsImage和Edits解析

我们可以查看NN所在的hadoop1022NN所在的hadoop104他们的fsimageedits文件:

hadoop102路径:/opt/module/hadoop-3.3.5/data/dfs/name/current

hadoop104路径:/opt/module/hadoop-3.3.5/data/dfs/namesecondary/current

可以发现NN2NN多一个文件edits_inprogress...,如果NN宕机,那么2NN可以帮助恢复NN元数据,但是只能恢复一部分,缺少的部分正是该文件。该文件存储的是正在执行的操作

文件解读:

  • Fsimage文件HDFS文件系统元数据的一个永久性的检查点,其中包含HDFS文件系统的所有目录和文件inode的序列化信息
  • Edits文件:存放HDFS文件系统的所有更新操作的路径,文件系统客户端执行的所有写操作首先会被记录到Edits文件中
  • seen_txid文件保存的是一个数字,就是最后一个edits_的数字
  • 每次NN启动的时候都会将Fsimage文件读入内存,加载Edits里面的更新操作,保证内存中的元数据信息是最新的、同步的,可以看成NN启动的时候就将FsimageEdits文件进行了合并

oiv查看Fsimage文件

语法:hdfs oiv -p 文件类型 -i 镜像文件 -o 转换后文件输出路径

现在使用这个命令查看一下fsimage_0000000000000000254

1
hdfs oiv -p XML -i fsimage_0000000000000000254 -o /tmp/fsimage.xml

执行完成之后,会输出一个文件 cat /tmp/fsimage.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
<?xml version="1.0"?>
<fsimage><version><layoutVersion>-66</layoutVersion><onDiskVersion>1</onDiskVersion><oivRevision>706d88266abcee09ed78fbaa0ad5f74d818ab0e9</oivRevision></version>
<NameSection><namespaceId>1978262501</namespaceId><genstampV1>1000</genstampV1><genstampV2>1032</genstampV2><genstampV1Limit>0</genstampV1Limit><lastAllocatedBlockId>1073741855</lastAllocatedBlockId><txid>254</txid></NameSection>
<ErasureCodingSection>
<erasureCodingPolicy>
<policyId>1</policyId><policyName>RS-6-3-1024k</policyName><cellSize>1048576</cellSize><policyState>DISABLED</policyState><ecSchema>
<codecName>rs</codecName><dataUnits>6</dataUnits><parityUnits>3</parityUnits></ecSchema>
</erasureCodingPolicy>

<erasureCodingPolicy>
<policyId>2</policyId><policyName>RS-3-2-1024k</policyName><cellSize>1048576</cellSize><policyState>DISABLED</policyState><ecSchema>
<codecName>rs</codecName><dataUnits>3</dataUnits><parityUnits>2</parityUnits></ecSchema>
</erasureCodingPolicy>

<erasureCodingPolicy>
<policyId>3</policyId><policyName>RS-LEGACY-6-3-1024k</policyName><cellSize>1048576</cellSize><policyState>DISABLED</policyState><ecSchema>
<codecName>rs-legacy</codecName><dataUnits>6</dataUnits><parityUnits>3</parityUnits></ecSchema>
</erasureCodingPolicy>

<erasureCodingPolicy>
<policyId>4</policyId><policyName>XOR-2-1-1024k</policyName><cellSize>1048576</cellSize><policyState>DISABLED</policyState><ecSchema>
<codecName>xor</codecName><dataUnits>2</dataUnits><parityUnits>1</parityUnits></ecSchema>
</erasureCodingPolicy>

<erasureCodingPolicy>
<policyId>5</policyId><policyName>RS-10-4-1024k</policyName><cellSize>1048576</cellSize><policyState>DISABLED</policyState><ecSchema>
<codecName>rs</codecName><dataUnits>10</dataUnits><parityUnits>4</parityUnits></ecSchema>
</erasureCodingPolicy>

</ErasureCodingSection>

<INodeSection><lastInodeId>16431</lastInodeId><numInodes>3</numInodes><inode><id>16385</id><type>DIRECTORY</type><name></name><mtime>1680658826458</mtime><permission>hadoop:supergroup:0755</permission><nsquota>9223372036854775807</nsquota><dsquota>-1</dsquota></inode>
<inode><id>16419</id><type>DIRECTORY</type><name>xiyouji</name><mtime>1680657803997</mtime><permission>hadoop:supergroup:0755</permission><nsquota>-1</nsquota><dsquota>-1</dsquota></inode>
<inode><id>16431</id><type>FILE</type><name>jdk-8u361-linux-x64.tar.gz</name><replication>3</replication><mtime>1680658228772</mtime><atime>1680658226757</atime><preferredBlockSize>134217728</preferredBlockSize><permission>hadoop:supergroup:0644</permission><blocks><block><id>1073741854</id><genstamp>1031</genstamp><numBytes>134217728</numBytes></block>
<block><id>1073741855</id><genstamp>1032</genstamp><numBytes>4544502</numBytes></block>
</blocks>
<storagePolicyId>0</storagePolicyId></inode>
</INodeSection>
<INodeReferenceSection></INodeReferenceSection><SnapshotSection><snapshotCounter>0</snapshotCounter><numSnapshots>0</numSnapshots></SnapshotSection>
<INodeDirectorySection><directory><parent>16385</parent><child>16431</child><child>16419</child></directory>
</INodeDirectorySection>
<FileUnderConstructionSection></FileUnderConstructionSection>
<SecretManagerSection><currentId>0</currentId><tokenSequenceNumber>0</tokenSequenceNumber><numDelegationKeys>0</numDelegationKeys><numTokens>0</numTokens></SecretManagerSection><CacheManagerSection><nextDirectiveId>1</nextDirectiveId><numDirectives>0</numDirectives><numPools>0</numPools></CacheManagerSection>
</fsimage>

oev查看Edits文件

语法:hdfs oev -p 文件类型 -i 编辑日志 -o 转换后文件输出路径

现在使用这个命令查看一下edits_inprogress...

1
hdfs oev -p XML -i edits_inprogress_0000000000000000255 -o /tmp/edits.xml

已经有了该文件: cat /tmp/edits.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<EDITS>
<EDITS_VERSION>-66</EDITS_VERSION>
<RECORD>
<OPCODE>OP_START_LOG_SEGMENT</OPCODE>
<DATA>
<TXID>255</TXID>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_ADD</OPCODE>
<DATA>
<TXID>256</TXID>
<LENGTH>0</LENGTH>
<INODEID>16432</INODEID>
<PATH>/xiyouji/Hadoop_Logo.png</PATH>
<REPLICATION>3</REPLICATION>
<MTIME>1680666513756</MTIME>
<ATIME>1680666513756</ATIME>
<BLOCKSIZE>134217728</BLOCKSIZE>
<CLIENT_NAME>DFSClient_NONMAPREDUCE_-1884544483_289</CLIENT_NAME>
<CLIENT_MACHINE>192.168.10.102</CLIENT_MACHINE>
<OVERWRITE>false</OVERWRITE>
<PERMISSION_STATUS>
<USERNAME>hadoop</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE>
</PERMISSION_STATUS>
<ERASURE_CODING_POLICY_ID>0</ERASURE_CODING_POLICY_ID>
<RPC_CLIENTID>6d903f8c-0a77-47a9-8117-3b636cba392f</RPC_CLIENTID>
<RPC_CALLID>16588</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_ALLOCATE_BLOCK_ID</OPCODE>
<DATA>
<TXID>257</TXID>
<BLOCK_ID>1073741856</BLOCK_ID>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_SET_GENSTAMP_V2</OPCODE>
<DATA>
<TXID>258</TXID>
<GENSTAMPV2>1033</GENSTAMPV2>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_ADD_BLOCK</OPCODE>
<DATA>
<TXID>259</TXID>
<PATH>/xiyouji/Hadoop_Logo.png</PATH>
<BLOCK>
<BLOCK_ID>1073741856</BLOCK_ID>
<NUM_BYTES>0</NUM_BYTES>
<GENSTAMP>1033</GENSTAMP>
</BLOCK>
<RPC_CLIENTID/>
<RPC_CALLID>-2</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_CLOSE</OPCODE>
<DATA>
<TXID>260</TXID>
<LENGTH>0</LENGTH>
<INODEID>0</INODEID>
<PATH>/xiyouji/Hadoop_Logo.png</PATH>
<REPLICATION>3</REPLICATION>
<MTIME>1680666513868</MTIME>
<ATIME>1680666513756</ATIME>
<BLOCKSIZE>134217728</BLOCKSIZE>
<CLIENT_NAME/>
<CLIENT_MACHINE/>
<OVERWRITE>false</OVERWRITE>
<BLOCK>
<BLOCK_ID>1073741856</BLOCK_ID>
<NUM_BYTES>131642</NUM_BYTES>
<GENSTAMP>1033</GENSTAMP>
</BLOCK>
<PERMISSION_STATUS>
<USERNAME>hadoop</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE>
</PERMISSION_STATUS>
</DATA>
</RECORD>
</EDITS>

CheckPoint时间设置

通常情况下2NN每隔一个小时执行一次检查,可以查看hdfs-default.xml

1
2
3
4
<property>
<name>dfs.namenode.checkpoint.period</name>
<value>3600s</value>
</property>

当操作数达到一百万,2NN执行一次:

1
2
3
4
5
6
7
8
9
10
<property>
<name>dfs.namenode.checkpoint.txns</name>
<value>1000000</value>
<description>操作动作次数</description>
</property>
<property>
<name>dfs.namenode.checkpoint.check.period</name>
<value>60s</value>
<description> 1 分钟检查一次操作次数</description>
</property>

如果想要更换时间或者更改操作数,可以配置到hdfs-site.xml

HDFS的DataNode

DataNode工作机制

它的工作流程是:

  • DataNode启动后,必须向NameNode汇报自己的块信息,然后定期(6个小时)扫描、上报自己所有块的信息。块信息包括:数据、数据长度、校验和(即数据完整性)、时间戳
  • 每个DataNode必须定期向NameNode汇报说:我还活着。这个过程叫做心跳,心跳每三秒一次;如果超过10分钟+30秒 NameNode没有收到DataNode的心跳,就会认为DataNode挂掉了

DN向NN汇报当前块信息的时间间隔,默认6个小时,在hdfs-default.xml文件中有配置:

1
2
3
4
5
<property>
<name>dfs.blockreport.intervalMsec</name>
<value>21600000</value>
<description>Determines block reporting interval in milliseconds.</description>
</property>

DN扫描自己自己节点块的信息的时间,默认6个小时,同样在hdfs-default.xml文件中有配置:

1
2
3
4
5
6
7
8
9
<property>
<name>dfs.datanode.directoryscan.interval</name>
<value>21600s</value>
<description>Interval in seconds for Datanode to scan data
directories and reconcile the difference between blocks in memory and on
the disk. Support multiple time unit suffix(case insensitive), as described
in dfs.heartbeat.interval.
</description>
</property>

如果想改变时间间隔,可以将上述两个配置信息配置到hdfs-site.xml中,然后分发配置重启hadoop集群,配置就会生效

数据完整性

数据完整性就是要保证数据在网络传输中不发生错误,所以要采取一些校验数据的手段,比如奇偶校验、CRC循环冗余校验

比如我们要在网络中传输

第一个数据:
1000100
第二个数据:
1000101

奇偶校验就是:数所有的位中有多少个1,如果1的个数是偶数,那么就在末尾添加0;反之,如果1的个数是奇数,那么就在末尾添加1。对于上面的两个数据,他们应该各自加:

第一个数据:
1000100 | 0
第二个数据:
1000101 | 1

这样只要在传输过去之后,再次计算校验位,然后与携带过来的校验位进行对比,就可以知道数据有没有传输失误了。

但是,使用奇偶校验有一个很明显的问题,那就是如果两个位发生了改变,最后得出的校验位还是原来的数字。比如第一个数据:1000100在传输过程中,变成了1100110,数据发生了改变,但是校验位依旧是0,显然是有很大的问题的

所有Hadoop就采用了另外一种更安全的校验方法CRC循环冗余校验。这种校验法会随机生成一个多项式,然后把原始数据与多项式进行除法操作,最后把得出的余数一起发送过去。接收端再对原始数据除一下多项式,如果得出的余数和发送端一样,就没有任何问题。

掉线时限参数设置

在前文提到的心跳中,如果DataNode10分钟+30秒内不向NameNode心跳一下,就会认为DataNode挂掉了

那么这个10分钟+30秒是怎么得来的呢?

它的计算公式是:TimeOut = 2 * dfs.namenode.heartbeat.recheck-interval + 10 * dfs.heartbeat.interval

看一下hdfs-default.xml的默认设置:其中dfs.namenode.heartbeat.recheck-interval的默认时间是300000毫秒即五分钟,dfs.heartbeat.interval的默认时间是3秒,于是超时时间就是10分钟+30秒

1
2
3
4
5
6
7
8
<property>
<name>dfs.namenode.heartbeat.recheck-interval</name>
<value>300000</value>
</property>
<property>
<name>dfs.heartbeat.interval</name>
<value>3s</value>
</property>

如果想要修改这些配置,可以把这些配置信息都配置到hdfs-site.xml

Hadoop之MapReduce

Hadoop中的MapReduce是一种编程模型,用于大规模数据集的并行运算

MapReduce 概述

MapReduce定义

MapReduceHadoop的一个分布式(多台机器)运算程序的编程框架,也是Hadoop的核心框架

如果让我们用程序实现一个多台机器并发计算的程序,我们可能需要关心到数据传输、如何并发等很多底层的问题,MapReduce为我们提供了一系列的框架,我们可以直接调用

MapReduce核心功能是将用户编写的业务逻辑代码自带默认组件整合成一个完整的分布式运算程序,并发运行在一个Hadoop集群上

综上所述:MapReduce = 自己写的业务逻辑代码 + 它自身默认的底层代码

MapReduce的优缺点

优点

  • 易于编程:因为MapReduce已经封装好了底层代码,我们只需要关心业务逻辑代码,实现框架的接口就好了
  • 有良好的扩展性:如果服务器资源枯竭,可以动态增加服务器,解决计算资源不够的问题
  • 高容错性:任何一台机器挂掉,都可以将任务转移到其他节点
  • 适合海量数据计算:TB/PB级别,几千台服务器共同计算

缺点

当然因为MapReduce把中间结果存放在了硬盘,所以它不适合用于:

  • 不适用于实时计算
  • 不擅长流式计算(Spark Streaming、Flink适合)
  • 不擅长DAG有向无环图计算

MapReduce核心编程思想

拿一个需求来举例说明MapReduce的思想:有个300M的文件,统计每一个单词出现的总次数,要求查询结果a-p的为一个文件,q-z的为一个文件

如果拿到现实中,要统计这么多单词并且按照a-p,p-z分区,会拿出两张纸,一张纸记录a-p的、另一张纸记录p-z的,但是300M的文件太大了,自己一个人肯定完不成,所以找来两个人来帮忙,他们都拿出两张纸按照同样的方法进行统计,到最后他们各自会拿出满满的两页纸,上面记录着各个单词,每个单词出现一次就记录为(hadoop->1)这样的键值对。这个就是Map阶段,他们三个人每个人做的都是MapTask任务。

上面的任务做完了,就需要专门派两个人来统计a-pp-z开头的单词,把相同的单词后面数字相加,这样就得到了最后的结果。这个阶段是Reduce聚合阶段,这两个人做的是ReduceTask任务。

换成专业的说法,看下图:

综上所述:

  • 分布式的运算程序王王需要分成至少两个阶段
  • 第一个阶段的MapTask并发实例,完全并行运行,互不相干
  • 第二个阶段的ReduceTask并发实例互不相干,但是他们依赖于MapTask的结果
  • MapReduce编程模型只能包含一个Map阶段和一个Reduce阶段,如果用户的业务逻辑非常复杂,那么只好编写多个MapReduce程序,但是这些都是串行执行的

MapReduce进程

一个完整的MapReduce程序在分布式运行时有三类实例进程:

  1. MrAppMaster:负责整个程序的过程调度及状态协调。
  2. MapTask:负责Map阶段的整个数据处理流程。
  3. ReduceTask:责Reduce阶段的整个数据处理流程。

官方WordCount源码

WordCount.java

常用数据序列化类型

看一下WordCount代码:

从上面的代码中,我们可以看到有很多之前没有见过的数据类型,这些类型都是Hadoop自己的类型,下表总结了Java类型与Hadoop数据类型的对比:

可以发现除了String对应的是Text,其他的类型只不过是在最后加了关键字Writable,所以Hadoop的数据类型还是很好记忆与掌握的

MapReduce编程规范

从上面的案例代码中可以看到整个WordCount程序分为了三个部分(MapperReducerDriver),下面把他们的方法签名都抽取出来:

1
2
3
public static void main(String[] args)
public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable>
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>

其中main对应的是Driver阶段;IntSumReducer对应的是Reduce阶段,继承了Reducer类;TokenizerMapper对应的是Map阶段,继承了Mapper

可以看到继承的类后面跟了很多的泛型,接下来逐个击破!

Mapper阶段

  • 用户自定义的Mapper要继承自己的父类,即继承了Mapper
  • Mapper后面跟的泛型,前两个是一个k-v键值对(用户可自定义),对应的是输入数据
  • Mapper的输出数据也是一个K-V键值对,对应的是后面两个泛型
  • Mapper中的业务逻辑写在map()方法中,map()即MapTask进程方法对每一个k-v调用一次,看下图:

Reducer阶段

  • 用户自定义的Reducer要继承自己的父类Reducer
  • Reducer的输入数据类型对应Mapper的输出数据类型,也是K-V键值对,如下图:
  • Reducer的业务逻辑写在reduce()方法中,ReduceTask进程对每一组相同的kk-v组调用一次reduce()方法

Driver阶段

相当于YARN集群的客户端,用于提交整个程序到YARN集群,提交的是封装了MapReduce程序相关运行参数的job对象。

WordCount案例

需求分析

统计给定文本中每一个单词出现的个数

准备的文本数据:

1
2
3
4
5
6
7
atguigu atguigu
ss ss
cls cls
jiao
banzhang
xue
hadoop

期望的输出数据:

1
2
3
4
5
6
7
atguigu	2
banzhang 1
cls 2
hadoop 1
jiao 1
ss 2
xue 1

根据MapReduce的编程规范,应该将该程序分为三个部分:

  • Mapper类:负责数据的拆分
    • 将内容先转换为String
    • 使用String.split()将这一行的数据切分成单词
    • 将单词封装为<xue,1>
  • Reducer类:汇总数据,即聚合,统计出出现次数
    • 汇总各个key的个数
    • 输出该key的总次数
  • Driver类:固定的七大步套路
    • 获取Job
    • 设置本程序jar包所在的本地路径
    • 关联MapperReducer
    • 指定Mapper输出数据的k-v类型
    • 指定最终输出的数据的k-v类型
    • 指定job输入与输出文件路径
    • 提交作业

环境准备

创建三个类WordCountMapperWordCountReducerWordCountDriver

pom.xml新增如下:

1
2
3
4
5
6
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-mapreduce-client-core -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>3.3.5</version>
</dependency>

编写程序

Mapper类

在下面的编写代码中,切记不要导错包,一定要导mapreduce包下的包,不要导入mapred

该类需要继承Mapper类,我们先写上:

1
2
3
4
5
6
7
8
/**
* KEYIN:map阶段输入的key的类型 LongWritable
* VALUEIN:map阶段输入的value的类型 Text
* KEYOUT:map阶段输出的key的类型 Text
* VALUEOUT:map阶段输出的value的类型 IntWritable
*/
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
}

按住ctrl+b,点击Mapper查看源码:

首先看Contxt这个抽象内部类:

然后找到run方法,这个方法是用于执行Mapper操作的:

在源码中setupcleanup都是空方法,什么都不做:

编写Mapper类代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

private Text outK = new Text();
private IntWritable outV = new IntWritable(1);

@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
// 1、获取 行
String line = value.toString();

// 2、将“行”的数据按照空格分隔为字符串数组
String[] words = line.split(StrUtil.SPACE);

// 3、循环遍历,往外写出
for (String word : words) {
// 设置K
outK.set(word);
// 写出,因为传递的是Hadoop的数据类型,需要先对String转义
context.write(outK, outV);
}
}
}
Reducer类

Reducer源码

该类需要继承Reducer类,先写上:

1
2
3
4
5
6
7
8
/**
* KEYIN:reducer阶段输入的key的类型 Text
* VALUEIN:reducer阶段输入的value的类型 IntWritable
* KEYOUT:reducer阶段输出的key的类型 Text
* VALUEOUT:reducer阶段输出的value的类型 IntWritable
*/
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
}

按住ctrl+B,点击Reducer,查看源码:

run方法:

编写Reducer类

该类需要继承Reducer,同样的他也需要加四个泛型,两个k-v键值对

第一个键值对,对应的是Mapper的输出类型,应该填:Text,IntWritable

第二个键值对,是最终输出的类型,这里填写我们期望输出数据的类型:Text,IntWritable

1
public class WordCountReducer extends Reducer<Text, IntWritable,Text,IntWritable>

这个类需要重写reduce方法,直接输入reduce按下回车:

在这个方法里面,我们只需要做两件事:循环遍历Iterable<IntWritable>集合,把值相加;最终通过Context把值输出,这里同样做一次变量提升:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable outK = new IntWritable();

@Override
protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable value : values) {
sum += value.get();
}
outK.set(sum);

context.write(key, outK);
}
}
Driver类

前文的需求分析,已经提到了七大步,这里直接给出代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
public class WordCountDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

// 手动设置Hadoop环境变量
SystemUtil.set("HADOOP_HOME", "C:\\Users\\XiaoYu\\Desktop\\hadoop\\hadoop-3.2.2");
SystemUtil.set("hadoop.home.dir", SystemUtil.get("HADOOP_HOME"));

// 1、获取Job
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);

// 2、设置jar包路径
job.setJarByClass(WordCountDriver.class);

// 3、关联Mapper和Reducer
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);

// 4、设置map输出的K-V类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);

// 5、设置最终输出K-V类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);

// 6、设置输入/输出路径
// 一定要导入正确的包
String inputPath = "C:\\Users\\XiaoYu\\Downloads\\input";
String outputPath = "C:\\Users\\XiaoYu\\Downloads\\output";

runningCheck(inputPath, outputPath);

FileInputFormat.setInputPaths(job, new Path(inputPath));
FileOutputFormat.setOutputPath(job, new Path(outputPath));

// 7、提交Job
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}

private static void runningCheck(String inputPath, String outputPath) {
if (FileUtil.exist(outputPath)) {
FileUtil.del(outputPath);
}

File file = FileUtil.file(inputPath);
if (!file.exists()) {
File inpputFile = FileUtil.touch(inputPath + "\\input.txt");
String input = "atguigu atguigu\n" +
"ss ss\n" +
"cls cls\n" +
"jiao\n" +
"banzhang\n" +
"xue\n" +
"hadoop";
FileWriter.create(inpputFile).write(input);
}
}
}
查看结果
  • ._SUCCESS.crc:校验文件
  • .part-r-00000.crc:校验文件
  • _SUCCESS:标志成功
  • part-r-00000:真实数据输出文件

part-r-00000文件内容:

1
2
3
4
5
6
7
atguigu	2
banzhang 1
cls 2
hadoop 1
jiao 1
ss 2
xue 1

Hadoop序列化

序列化概述

什么是序列化

序列化就是把内存中的对象,转换成字节序列,然后用户存储到磁盘或网络传输

反序列化就是把收到的序列化序列,转换成内存的对象,两者是相反的

一方面,因为MapReduce在生产环境中是分布式工作的,那么就避免不了内存中的数据在网络上的传输,使用序列化是为了更加方便的传输

另一方面,Hadoop提供的序列化的数据类型太少, 并不能满足生产的需求。比如有时候期望输出的是一组数据,那么就需要一个bean对象存储这些基本的数据类型,这时候把bean实现序列化,放在MapperReducer的泛型上就好了

Java序列化机制与Hadoop序列化的对比

Java序列化机制是由Serializable实现的,它的序列化会附加各种校验信息、头信息、继承体系等,如果用于网络传输,那么要传的数据太大了

Hadoop序列化机制只附加了简单校验信息,数据量小,有利于网络传输;如下图所示:

Java序列化机制与Hadoop序列化的对比

Hadoop序列化的特点:

  • 紧凑:高效使用存储空间
  • 快速:读写数据的额外开销小
  • 互操作性强:支持多语言的交互

自定义bean对象实现序列化(writable)

  • Bean对象要实现Writable接口
  • 重写接口中的序列化和反序列化方法,值得注意的是:序列化的顺序要和反序列化的顺序完全一致
  • 必须要由空参构造方法
  • 重写toString方法,用\t把数据分开
  • Hadoopkey自带排序,如果把自定义的bean对象放在key中传输,则还需要实现Comparable接口,因为MapReduce框中的Shuffle过程要求对key必须能排序

序列化案例实操

需求分析

统计每一个手机号耗费的总上行流量、总下行流量、总流量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
1	13736230513		192.196.100.1	www.atguigu.com		2481	24681	200
2 13846544121 192.196.100.2 264 0 200
3 13956435636 192.196.100.3 132 1512 200
4 13966251146 192.168.100.1 240 0 404
5 18271575951 192.168.100.2 www.atguigu.com 1527 2106 200
6 84188413 192.168.100.3 www.atguigu.com 4116 1432 200
7 13590439668 192.168.100.4 1116 954 200
8 15910133277 192.168.100.5 www.hao123.com 3156 2936 200
9 13729199489 192.168.100.6 240 0 200
10 13630577991 192.168.100.7 www.shouhu.com 6960 690 200
11 15043685818 192.168.100.8 www.baidu.com 3659 3538 200
12 15959002129 192.168.100.9 www.atguigu.com 1938 180 500
13 13560439638 192.168.100.10 918 4938 200
14 13470253144 192.168.100.11 180 180 200
15 13682846555 192.168.100.12 www.qq.com 1938 2910 200
16 13992314666 192.168.100.13 www.gaga.com 3008 3720 200
17 13509468723 192.168.100.14 www.qinghua.com 7335 110349 404
18 18390173782 192.168.100.15 www.sogou.com 9531 2412 200
19 13975057813 192.168.100.16 www.baidu.com 11058 48243 200
20 13768778790 192.168.100.17 120 120 200
21 13568436656 192.168.100.18 www.alibaba.com 2481 24681 200
22 13568436656 192.168.100.19 1116 954 200

Map阶段:

  • 需要给四个泛型,前两个泛型就是固定的偏移量和这一行的数据,即LongWritable,Text
  • 后两个泛型用手机号表示key,自定义bean实现序列化接口表示上行流量、下行流量、总流量
  • 在此阶段,首先需要读取一行数据,按照\t切分数据,抽取三个数据,再封装到context

Reduce阶段:

  • 同样需要给四个泛型,前两个与Map保持一致
  • 后两个泛型与我们期望的输出数据也一致
  • 在此阶段,只需要累加上、下行流量,得到总流量,最后封装就好了

代码实操

自定义bean对象,写入上行、下行、总流量属性,并添加setter与getter方法:此处使用Lombok

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package com.hadoop.hdfs.hdfsclinet.wirtable;

import cn.hutool.core.util.StrUtil;
import lombok.Data;
import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
* @author XiaoYu
* @description 1、定义类实现writable接口 2、重写序列化和反序列化方法 3、重写空参构造 4、toString方法
* @since 2023/4/11 8:37
*/
@Data
public class FlowBean implements Writable {
private long upFlow;
private long downFlow;
private long sumFlow;

public void setSumFlow() {
this.sumFlow = upFlow + downFlow;
}


@Override
public void write(DataOutput out) throws IOException {
out.writeLong(upFlow);
out.writeLong(downFlow);
out.writeLong(sumFlow);
}

@Override
public void readFields(DataInput in) throws IOException {
this.upFlow = in.readLong();
this.downFlow = in.readLong();
this.sumFlow = in.readLong();
}

@Override
public String toString() {
return upFlow + StrUtil.TAB + downFlow + StrUtil.TAB + sumFlow;
}
}

Mapper、Reducer、Driver编写

Mapper类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

private final Text outK = new Text();
private final FlowBean outV = new FlowBean();

@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, FlowBean>.Context context) throws IOException, InterruptedException {
//1、获取一行数据
//1 13736230513 192.196.100.1 www.atguigu.com 2481 24681 200
String line = value.toString();

//2、切分数据,为一个字符串数组
//1,13736230513,192.196.100.1,www.atguigu.com,2481,24681,200
String[] split = line.split(StrUtil.TAB);

//3、封装数据
//手机号:13736230513
//上行流量:2481
//下行流量:24681
outK.set(split[1]);
outV.setUpFlow(Long.parseLong(split[split.length - 3]));
outV.setDownFlow(Long.parseLong(split[split.length - 2]));
outV.setSumFlow();

//4、提交数据
context.write(outK, outV);
}
}

Reducer类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class FlowReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
private final FlowBean outV = new FlowBean();
@Override
protected void reduce(Text key, Iterable<FlowBean> values, Reducer<Text, FlowBean, Text, FlowBean>.Context context) throws IOException, InterruptedException {
//1、循环遍历values,累加上下行流量
long totalUpFlow = 0;
long totalDownFlow = 0;
for (FlowBean value : values) {
totalUpFlow += value.getUpFlow();
totalDownFlow += value.getDownFlow();
}

//2、封装数据
outV.setUpFlow(totalUpFlow);
outV.setDownFlow(totalDownFlow);
outV.setSumFlow();

//3、写数据
context.write(key, outV);
}
}

Driver类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
public class FlowDriver {

@SneakyThrows
public static void main(String[] args) {
// 手动设置Hadoop环境变量
SystemUtil.set("HADOOP_HOME", "C:\\Users\\XiaoYu\\Desktop\\hadoop\\hadoop-3.2.2");
SystemUtil.set("hadoop.home.dir", SystemUtil.get("HADOOP_HOME"));

// 1、获取Job
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);

// 2、设置jar包路径
job.setJarByClass(FlowDriver.class);

// 3、关联Mapper和Reducer
job.setMapperClass(FlowMapper.class);
job.setReducerClass(FlowReducer.class);

// 4、设置map输出的K-V类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FlowBean.class);

// 5、设置最终输出K-V类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);

// 6、设置输入/输出路径
// 一定要导入正确的包
String inputPath = "C:\\Users\\XiaoYu\\Downloads\\inputFlow";
String outputPath = "C:\\Users\\XiaoYu\\Downloads\\outputFlow";

runningCheck(inputPath, outputPath);

FileInputFormat.setInputPaths(job, new Path(inputPath));
FileOutputFormat.setOutputPath(job, new Path(outputPath));

// 7、提交Job
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}

private static void runningCheck(String inputPath, String outputPath) {
if (FileUtil.exist(outputPath)) {
FileUtil.del(outputPath);
}

File file = FileUtil.file(inputPath);
if (!file.exists()) {
File inpputFile = FileUtil.touch(inputPath + "\\phone_data.txt");
String input = "1\t13736230513\t192.196.100.1\twww.atguigu.com\t2481\t24681\t200\n" +
"2\t13846544121\t192.196.100.2\t264\t0\t200\n" +
"3 \t13956435636\t192.196.100.3\t132\t1512\t200\n" +
"4 \t13966251146\t192.168.100.1\t240\t0\t404\n" +
"5 \t18271575951\t192.168.100.2\twww.atguigu.com\t1527\t2106\t200\n" +
"6 \t84188413\t192.168.100.3\twww.atguigu.com\t4116\t1432\t200\n" +
"7 \t13590439668\t192.168.100.4\t1116\t954\t200\n" +
"8 \t15910133277\t192.168.100.5\twww.hao123.com\t3156\t2936\t200\n" +
"9 \t13729199489\t192.168.100.6\t240\t0\t200\n" +
"10 \t13630577991\t192.168.100.7\twww.shouhu.com\t6960\t690\t200\n" +
"11 \t15043685818\t192.168.100.8\twww.baidu.com\t3659\t3538\t200\n" +
"12 \t15959002129\t192.168.100.9\twww.atguigu.com\t1938\t180\t500\n" +
"13 \t13560439638\t192.168.100.10\t918\t4938\t200\n" +
"14 \t13470253144\t192.168.100.11\t180\t180\t200\n" +
"15 \t13682846555\t192.168.100.12\twww.qq.com\t1938\t2910\t200\n" +
"16 \t13992314666\t192.168.100.13\twww.gaga.com\t3008\t3720\t200\n" +
"17 \t13509468723\t192.168.100.14\twww.qinghua.com\t7335\t110349\t404\n" +
"18 \t18390173782\t192.168.100.15\twww.sogou.com\t9531\t2412\t200\n" +
"19 \t13975057813\t192.168.100.16\twww.baidu.com\t11058\t48243\t200\n" +
"20 \t13768778790\t192.168.100.17\t120\t120\t200\n" +
"21 \t13568436656\t192.168.100.18\twww.alibaba.com\t2481\t24681\t200\n" +
"22 \t13568436656\t192.168.100.19\t1116\t954\t200\n";
FileWriter.create(inpputFile).write(input);
}
}
}

查看结果:part-r-00000文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
13470253144	180	180	360
13509468723 7335 110349 117684
13560439638 918 4938 5856
13568436656 3597 25635 29232
13590439668 1116 954 2070
13630577991 6960 690 7650
13682846555 1938 2910 4848
13729199489 240 0 240
13736230513 2481 24681 27162
13768778790 120 120 240
13846544121 264 0 264
13956435636 132 1512 1644
13966251146 240 0 240
13975057813 11058 48243 59301
13992314666 3008 3720 6728
15043685818 3659 3538 7197
15910133277 3156 2936 6092
15959002129 1938 180 2118
18271575951 1527 2106 3633
18390173782 9531 2412 11943
84188413 4116 1432 5548

MapReduce 框架原理

InputFormat数据输入

切片与MapTask并行度决定机制

MapTask的并行度决定Map阶段的任务处理并发读,进而影响到整个Job的处理速度,引入两个概念:

  • 数据块:BlockHDFS物理上把数据分成一块一块,数据块是HDFS存储数据单位
  • 数据切片: 只是在逻辑上对输入进行分片,并不会在磁盘上将其切分成片进行存储。数据切片是MapReduce程序计算输入数据的单位,一个切片会对应启动一个MapTask

Job提交流程

提交一个Job要经过:

  • 建立连接connect()
    • 在这里会判断该Job是本地运行环境还是YARN集群运行环境
  • 提交JobsubmitJobInternal()
    • 创建给集群提交数据的Stag路径—getStagingDir()
    • 获取JobId,并创建Job路径—getNewJobID()
    • 拷贝jar包到集群—copyAndConfigureFiles()uploadFiles
    • 计算切片,生成切片的规划文件—writeSplits
    • Stag路径写XML配置文件—writreXml()
    • 最后提交Job,返回提交状态

切片执行流程解析

org.apache.hadoop.mapreduce.JobSubmitter#writeSplits

  1. 程序先找到你数据存储的目录。

  2. 开始遍历处理(规划切片)目录下的每一个文件

  3. 遍历第一个文件input.txt

    • 获取文件大小fs.sizeOf(input.txt)

    • 计算切片大小

      org.apache.hadoop.mapreduce.lib.input.FileInputFormat#computeSplitSize

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      /**
      * @param blockSize 本地模式32M 集群模式128M
      * @param minSize 1
      * @param maxSize Long的最大值
      * @return 需要的切片大小
      */
      protected long computeSplitSize(long blockSize, long minSize,
      long maxSize) {
      return Math.max(minSize, Math.min(maxSize, blockSize));
      }
    • 默认情况下,切片大小=blocksize

    • 开始切,形成第1个切片:input.txt0:128M、第2个切片input.txt一128:256M、第3个切片input.txt一256M:300M每次切片时,都要判断切完剩下的部分是否大于块的1.1倍,不大于1.1倍就划分一块切片)

    • 将切片信息写到一个切片规划文件中

    • 整个切片的核心过程在org.apache.hadoop.mapreduce.InputFormat#getSplits方法中完成

    • InputSplit只记录了切片的元数据信息,比如起始位置、长度以及所在的节点列表等。

  4. 提交切片规划文件到YARN上,YARN上的MrAppMaster就可以根据切片规划文件计算开启MapTask个数

FileInputFormat切片机制

切片机制:

  • 简单地按照文件的内容长度进行切片
  • 切片的大小默认等于Block大小
  • 切片时不考虑数据集整体,而是逐个针对每一个文件单独切片

比如输入数据有两个文件:

1
2
3
file1.txt			320M
file2.txt 10M

经过FileInputFormat的切片机制运算后,形成的切片信息如下:

1
2
3
4
5
file1.txt.split1			0~128
file1.txt.split2 128~256
file1.txt.split3 256~320
file2.txt.split1 0~19

源码中计算切片大小的公式:

1
2
3
4
Math.max(minSize,Math.max(maxSize,blockSize));
mapreduce.input.fileinputformat.split.minsize=1 // 默认值为1
mapreduce.input.fileinputformat.split.maxsize=Long.MAXValue // 默认值Long.MAXValue

默认情况下,切片大小=blocksize

切片大小设置:

  • maxsize(切片最大值):参数如果调的比blockSize小,则会让切片变小,而且就等于配置的这个参数值
  • minsize(切片最小值):参数调的比blockSize大,则可以让切片变得比blockSize还大

获取切片信息API:

1
2
3
4
5
//获取切片的文件名称
String name = inputSplit.getPath().getName();
//根据文件类型获取切片信息
FileSplit inputSplit = (FileSplit) context.getInputSplit();

TextInputFormat

在运行MapReuce程序时,输入的文件格式包括:基于行的日志文件、二进制格式文件、数据库表等。针对不同的数据类型,MapReduce给用户提供了很多的接口

FileInputFormat常见的实现类包括:TextInputFormatKeyValueTextInputFormatNLineInputFormatCombineTextInputFormat和自定义InputFormat

TextInputFormat 是默认的 FileInputFormat 实现类。按行读取每条记录。键是存储该行在整个文件中的起始字节偏移量, LongWritable 类型。值是这行的内容,不包括任何行终止 符(换行符和回车符),Text 类型。

以下是一个示例,比如,一个分片包含了如下 4 条文本记录。

1
2
3
4
5
Rich learning form
Intelligent learning engine
Learning more convenient
From the real demand for more close to the enterprise

每条记录表示为以下键/值对:

1
2
3
4
5
(0,Rich learning form)
(20,Intelligent learning engine)
(49,Learning more convenient)
(74,From the real demand for more close to the enterprise)

CombineTextInputFormat切片机制

框架默认的TextInputFormat切片机制是对任务按文件规划切片,不管文件多小,都会是一个单独的切片,都会交给一个MapTask,这样如果有大量小文件,就会产生大量的
MapTask,处理效率极其低下。

  1. 应用场景:

    CombineTextInputFormat用于小文件过多的场景,它可以将多个小文件从逻辑上规划到一个切片中,这样,多个小文件就可以交给一个MapTask处理。

  2. 虚拟存储切片最大值设置

    CombineTextInputFormat.setMaxInputSplitSize(job,4194304); //4m

    注意:虚拟存储切片最大值设置最好根据实际的小文件大小情况来设置具体的值。

  3. 切片机制

    生成切片过程包括:虚拟存储过程和切片过程二部分。

  1. 虚拟存储过程:

    ​ 将输入目录下所有文件大小,依次和设置的setMaxInputSplitSize值比较,如果不大于设置的最大值,逻辑上划分一个块。如果输入文件大于设置的最大值且大于两倍,
    那么以最大值切割一块;当剩余数据大小超过设置的最大值且不大于最大值2倍,此时将文件均分成2个虚拟存储块(防止出现太小切片)。

    ​ 例如setMaxInputSplitSize值为4M,输入文件大小为8.02M,则先逻辑上分成一个4M。剩余的大小为4.02M,如果按照4M逻辑划分,就会出现0.02M的小的虚拟存储文件,所以将剩余的4.02M文件切分成(2.01M和2.01M)两个文件。

  2. 切片过程:

    • 判断虚拟存储的文件大小是否大于setMaxInputSplitSize值,大于等于则单独形成一个切片。

    • 如果不大于则跟下一个虚拟存储文件进行合并,共同形成一个切片。

    • 测试举例:有4个小文件大小分别为1.7M、5.1M、3.4M以及6.8M这四个小文件,则虚拟存储之后形成6个文件块,大小分别为:

      1.7M,(2.55M、2.55M),3.4M以及(3.4M、3.4M)

      最终会形成3个切片,大小分别为:

      (1.7+2.55)M,(2.55+3.4)M,(3.4+3.4)M


Hadoop
https://xiaoyu72.com/articles/b3349d42/
Author
XiaoYu
Posted on
April 4, 2023
Updated on
August 28, 2023
Licensed under