Hadoop
Hadoop介绍
Hadoop是什么
- Hadoop是一个由Apache基金会所开发的分布式系统基础架构。
- 主要解决,海量数据的存储和海量数据的分析计算问题。
- 广义上来说,Hadoop通常是指一个更广泛的概念–Hadoop生态圈。
Hadoop三大发行版本
Hadoop三大发行版本:Apache、Cloudera、Hortonworks。
- Apache版本最原始(最基础)的版本,对于入门学习最好。2006
- Cloudera内部集成了很多大数据框架,对应产品CDH。2008
- Hortonworks文档较好,对应产品HDP。2011
- Hortonworks现在已经被Cloudera公司收购,推出新的品牌CDP。
Hadoop优势
- 高可靠性:Hadoop底层维护多个数据副本,所以即使Hadoop某个计算元素或存储出现故障,也不会导致数据的丢失。
- 高扩展性:在集群间分配任务数据,可方便的扩展数以干计的节点。
- 高效性:在MapReduce的思想下,Hadoop是并行工作的,以加快任务处理速度。
- 高容错性:能够自动将失败的任务重新分配。
Hadoop组成
1.X
- MapReduce(计算+资源调度)
- HDFS(数据存储)
- Common(辅助工具)
2.X
- MapReduce(计算)
- Yarn(资源调度) - CPU和Free
- HDFS(数据存储)
- Common(辅助工具)
在Hadoop1.x时代,Hadoop中的MapReduce同时处理业务逻辑运算和资源的调度,耦合性较大。在Hadoop22.x时代,增
加了Yarn。Yarn只负责资源的调度,MapReduce只负责运算。
Hadoop3.x在组成上没有变化。
HDFS架构概述
Hadoop Distributed File System
,简称HDFS,是一个分布式文件系统。
NameNode
(n):存储文件的元数据,如文件名,文件目录结构,文件属性(生成时间、副本数、文件权限),以及每个文件的块列表和块所在的DataNode等。DataNode
(n):在本地文件系统存储文件块数据,以及块数据的校验和。Secondary NameNode
(2min):每隔一段时间对NameNode元数据备份。
YARN架构概述
Yet Another Resource Negotiator简称YARN,另一种资源协调者,是Hadoop的资源管理器。
ResourceManager
(RM):整个集群资源(内存、CPU等)的老大NodeManager
(NM):单个节点服务器资源老大ApplicationMaster
(AM):单个任务运行的老大Container
:容器,相当一台独立的服务器,里面封装了任务运行所需要的资源,如内存、CPU、磁盘、网络等。说明1:客户端可以有多个
说明2:集群上可以运行多个ApplicationMaster
说明3:每个NodeManager上可以有多个Container
MapReduce架构概述
MapReduce将计算过程分为两个阶段:Map和Reduces
Map阶段并行处理输入数据
Reduce阶段对Map结果进行汇总
HDFS、YARN、MapReduce三者关系
大数据技术生态体系
图中涉及的技术名词解释如下:
- Sqoop:Sqoop是一款开源的工具,主要用于在Hadoop、Hive与传统的数据库(MySQL)间进行数据的传递,可以将一个关系型数据库(例如:MySQL,Oracle等)中的数据导进到Hadoop的HDFS中,也可以将HDFS的数据导进到关系型数据库中。
- Flume:Flume是一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据;
- Kafka:Kafka是一种高吞吐量的分布式发布订阅消息系统;
- Spark:Spark是当前最流行的开源大数据内存计算框架。可以基于Hadoop上存储的大数据进行计算。
- Flinkk:Flink是当前最流行的开源大数据内存计算框架。用于实时计算的场景较多。
- Oozie:Oozie是一个管理Hadoop作业(job)的工作流程调度管理系统。
- HBase:HBase是一个分布式的、面向列的开源数据库。HBase不同于一般的关系数据库,它是一个适合于非结构化数据存储的数据库。
- Hive:Hive是基于Hadoop的一个数据仓库工具,可以将结构化的数据文件映射为一张数据库表,并提供简单的SQL查询功能,可以将SQL语句转换为MapReduce任务进行运行。其优点是学习成本低,可以通过类SQL语句快速实现简单的MapReduce统计,不必开发专门的MapReduce应用,十分适合数据仓库的统计分析。
- ZooKeeper:它是一个针对大型分布式系统的可靠协调系统,提供的功能包括:配置维护、名字服务、分布式同步、组服务等。
推荐系统架构图
Hadoop运行环境搭建
模板虚拟机环境准备
安装模板虚拟机,IP地址192.168.10.100、主机名称hadoop100、内存4G、硬盘50G
安装
epel-release
注:Extra Packages for Enterprise Linux是为“红帽系”的操作系统提供额外的软件包,适用于RHEL、CentOS和Scientific Linux。相当于是一个软件仓库,大多数rpm包在官方repository中是找不到的)1
yum install -y epel-release
注意:如果Linux安装的是最小系统版,还需要安装如下工具;如果安装的是Linux桌面标准版,不需要执行如下操作
net-tool:工具包集合,包含
ip addr
等命令1
yum install -y net-tools
vim 编辑器
1
yum install -y vim
关闭防火墙,关闭防火墙开机自启
1
2
3systemctl status firewalld.service # 查看当前防火墙是否开启
systemctl stop firewalld.service # 关闭防火墙
systemctl disable firewalld.service # 禁用防火墙注意:在企业开发时,通常单个服务器的防火墙时关闭的。公司整体对外会设置非常安全的防火墙
创建
hadoop
用户,并修改hadoop
用户的密码为hadoop
1
2useradd hadoop
passwd hadoop配置
hadoop
用户具有root权限,方便后期加sudo执行root权限的命令1
2
3
4vim /etc/sudoers
# 在%wheel ALL=(ALL) ALL后面添加下面这段
hadoop ALL=(ALL) NOPASSWD:ALL注意:hadoop这一行不要直接放到root行下面,因为所有用户都属于wheel组,你先配置了hadoop具有免密功能,但是程序执行到%wheel行时,该功能又被覆盖回需要密码。所以hadoop要放到
%wheel
这行下面。在
/opt
目录下创建文件夹,并修改所属主和所属组在/opt目录下创建module、software文件夹
1
sudo mkdir module software
修改所属组
1
sudo chown hadoop:hadoop /opt/module/ /opt/software/
御载虚拟机自带的JDK(最小化安装不需要)
1
rpm -ga | grep -i java | xargs -n1 rpm -e --nodeps
rpm -ga
:查询所安装的所有rpm软件包grep -i
:忽略大小写xags -n1
:表示每次只传递一个参数rpm -e --nodeps
:强制卸载软件
重启虚拟机
1
reboot
Hadoop运行模式
- Hadoop官方网站:https://hadoop.apache.org
- Hadoop运行模式包括:本地模式、伪分布式模式以及完全分布式模式。
- 本地模式:单机运行,只是用来演示一下官方案例。生产环境不用。
- 伪分布式模式:也是单机运行,但是具备Hadoop集群的所有功能,一台服务器模拟一个分布式的环境。个别缺钱的公司用来测试,生产环境不用。
- 完全分布式模式:多台服务器组成分布式环境。生产环境使用。
本地运行模式(官方WordCount)
创建在hadoop-3.3.5文件下面创建一个wcinput文件夹
1
mkdir wcinput
在wcinput文件下创建一个word.txt文件
1
cd wcinput
编辑word.txt文件
1
vim word.txt
在文件中输入如下内容
1
2
3
4
5ss ss
cls cls
banzhang
bobo
yangge执行一下命令(
/opt/module/hadoop-3.3.5
下执行)1
bin/hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-3.3.5.jar wordcount wcinput/ ./wcoutput/
查看结果
1
2
3
4
5
6
7cat wcoutput/part-r-00000
banzhang 1
bobo 1
cls 2
ss 2
yangge 1
完全分布式运行模式(开发重点)
虚拟机准备
准备3台客户机(关闭防火墙、静态IP、主机名称)
安装JDK
配置环境变量
1
2
3
4[hadoop@hadoop102 hadoop-3.3.5]$ cat /etc/profile.d/jdk.sh
# JAVA_HOME
export JAVA_HOME=/opt/module/jdk1.8.0_361
export PATH=$PATH:$JAVA_HOME/bin安装Hadoop
配置环境变量
1
2
3
4
5
6[hadoop@hadoop102 hadoop-3.3.5]$ cat /etc/profile.d/hadoop.sh
# hadoop
export HADOOP_HOME=/opt/module/hadoop-3.3.5
export PATH=$PATH:$HADOOP_HOME/bin
export PATH=$PATH:$HADOOP_HOME/sbin配置集群
单点启动
配置ssh
群起并测试集群
编写集群分发脚本xsync
scp(secure copy)安全拷贝
scp定义
scp可以实现服务器与服务器之间的数据拷贝。(from server1 to server2)
基本语法
1
2scp -r $pdir/$fname $user@$host:$pdir/$fname
# 命令 递归 要拷贝的文件路径/名称 目的地用户@主机:目的地路径/名称案例实操
前提:在hadoop102、hadoop103、hadoop104都已经创建好的
/opt/module
、opt/software
两个目录,并且已经把这两个目录修改为hadoop:hadoop
1
sudo chown hadoop:hadoop -R /opt/module
在hadoop102上,将hadoop102中
/opt/module/jdk1.8.0_361
目录拷贝到hadoop103上1
scp -r /opt/module/jdk1.8.0_361 hadoop@hadoop103:/opt/module
在hadoop103上,将hadoop102中
/opt/module/hadoop-3.3.5
目录拷贝到hadoop103上1
scp -r hadoop@hadoop102:/opt/module/hadoop-3.3.5 /opt/module
在hadoop103上,将hadoop102中
/opt/module/*
目录拷贝到hadoop104上1
scp -r hadoop@hadoop102:/opt/module/* hadoop@hadoop104:/opt/module
rsync远程同步工具
rsync主要用于备份和镜像。具有速度快、避免复制相同内容和支持符号链接的优点。
rsync和scp区别:用rsync做文件的复制要比scp的速度快,rsync只对差异文件做更新,scp是把所有文件都复制过去。
注:命令不存在执行一下命令安装(三台机器都要安装
)
1 |
|
基本语法
1
2rsync -av $pdir/$fname $user@$host:$pdir/$fname
# 命令 选项参数 要拷贝的文件路径/名称 目的地用户@主机:目的地路径/名称选项参数说明
选项 功能 -a
归档拷贝 -v
显示复制过程 案例实操
删除hadoop103中
/opt/module/hadoop-3.3.5/wcinput
1
rm -rf /opt/module/hadoop-3.3.5/wcinput/ /opt/module/hadoop-3.3.5/wcoutput/
同步hadoopl102中的
/opt/module/hadoop-3.3.5
到hadoop1031
rsync -av /opt/module/hadoop-3.3.5/ hadoop@hadoop103:/opt/module/hadoop-3.3.5/
xsync集群分发脚本
循环复制文件到所有节点的相同目录下
1 |
|
1 |
|
分发JDK与Hadoop环境变量
1
sudo ~/bin/xsync /etc/profile.d/jdk.sh /etc/profile.d/hadoop.sh
分别刷新三台机器的环境变量
1
source /etc/profile
SSH免密登录
来到
hadoop102
的home目录,ls -al
查看所有的隐藏文件,可以看到.ssh
文件(访问过其他服务器才有)1
2
3
4
5cd ~/.ssh
ssh-keygen -t rsa # 三次回车,可以看到有三个文件 id_rsa.pub 公钥 id_rsa 私钥
ssh-copy-id hadoop103 # 将hadoop102的公钥拷贝到hadoop103
ssh-copy-id hadoop104 # 将hadoop102的公钥拷贝到hadoop102
ssh-copy-id hadoop102 # 允许自己访问 ssh hadoop102 不用输入密码hadoop103、hadoop104
如上操作查看三台机器
/home/hadoop/.ssh/authorized_keys
文件,可以发现三台机器都可以互相访问切换
root
用户,配置root
免密登录也大致如上操作
集群配置
集群部署规划
注意:NameNode
和SecondaryNameNode
不要安装在同一台服务器ResourceManager
也很消耗内存,不要和NameNode、SecondaryNameNode
配置在同一台机器上
hadoop102 hadoop103 hadoop104 HDFS NameNode
DateNodeDateNode SecondaryNameNode
DateNodeYARN NodeManager ResourceManager
NodeManagerNodeManager 配置文件说明
Hadoop配置文件分两类:默认配置文件和自定义配置文件,只有用户想修改某一默认配置值时,才需要修改自定义配置文件,更改相应属性值。默认配置文件:
要获取的默认文件 文件存放在Hadoop的jar包中的位置 [core-default.xml] hadoop-common-3.3.5.jar/core-default.xml [hdfs-default.xml] hadoop-hdfs-3.3.5.jar/hdfs-default.xml [yarn-default.xml] hadoop-yarn-common-3.3.5.jar/yarn-default.xml [mapred-default.xml] hadoop-mapreduce-client-core-3.3.5.jar/mapred-default.xml 自定义配置文件::
core-site.xml、hdfs-site,xml、yarn-site,xml、mapred-site.xml四个配置文件存放在
$HADOOP_HOME/etc/hadoop
这个路径上,用户可以根据项目需求重新进行修改配置。
配置集群
核心配置文件
core-site.xml
1
vim $HADOOP_HOME/etc/hadoop/core-site.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<!-- 指定NameNode的地址 -->
<property>
<name>fs.defaultFS</name>
<value>hdfs://hadoop102:8020</value>
</property>
<!-- 指定hadoop数据的存储目录 -->
<property>
<name>hadoop.tmp.dir</name>
<value>/opt/module/hadoop-3.3.5/data</value>
</property>
<!-- 配置HDFS网页登录使用的静态用户为hadoop -->
<property>
<name>hadoop.http.staticuser.user</name>
<value>hadoop</value>
</property>
</configuration>HDFS配置文件
hdfs-site.xml
1
vim $HADOOP_HOME/etc/hadoop/hdfs-site.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<!-- 指定NameNode web端访问的地址 -->
<property>
<name>dfs.namenode.http-address</name>
<value>hadoop102:9870</value>
</property>
<!-- SecondaryNameNode web端Http访问地址 -->
<property>
<name>dfs.namenode.secondary.http-address</name>
<value>hadoop104:9868</value>
</property>
</configuration>YARN配置文件
yarn-site.xml
1
vim $HADOOP_HOME/etc/hadoop/yarn-site.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<!-- 指定MapReduce走shuffle -->
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
<!-- 指定ResourceManager的地址 -->
<property>
<name>yarn.resourcemanager.hostname</name>
<value>hadoop103</value>
</property>
<!-- 环境变量的继承 3.2.x 一下版本需要配置,少了HADOOP_MAPRED_HOME环境变量-->
<property>
<name>yarn.nodemanager.env-whitelist</name>
<value>JAVA_HOME,HADOOP_COMMON_HOME,HADOOP_HDES_HOME,HADOOP_CONF_DIR,CLASSPATH_PREPEND_DISTCACHE,HADOOP_YARN_HOME,HADOOP_MAPRED_HOME</value>
</property>
</configuration>MapReduce配置文件
mapred-site.xml
1
vim $HADOOP_HOME/etc/hadoop/mapred-site.xml
1
2
3
4
5
6
7
8
9<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<!-- 指定MapReduce程序运行在Yarn上 -->
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
</configuration>文件分发
1
xsync /opt/module/hadoop-3.3.5/etc/hadoop
群起集群
配置workers
1
vim /opt/module/hadoop-3.3.5/etc/hadoop/workers
1
2
3hadoop102
hadoop103
hadoop104注意:该文件中添加的内容结尾不允许有空格,文件中不允许有空行。
1
xsync /opt/module/hadoop-3.3.5/etc/hadoop
启动集群
如果集群是第一次启动,需要在
hadoop102
节点格式化NameNode(注意:格式化NameNode,会产生新的集群id,导致NameNode和DataNode的集群id不一致,集群找不到已往数据。如果集群在运行过程中报错,需要重新格式化NameNode的话,一定要先停止namenode和datanode进程,并且要删除所有机器的data和logs目录,然后再进行格式化)1
hdfs namenode -format
启动HDFS
1
sbin/start-dfs.sh
**在配置了ResourceManager的节点(hadoop103)**启动YARN
1
sbin/start-yarn.sh
Web端查看HDFS的NameNode
- 浏览器中输入:http://hadoop102:9870
- 查看HDFS上存储的数据信息
Web端查看YARN的ResourceManager
- 浏览器中输入:http://hadoop103:8088
- 查看YARN上运行的Job信息
集群测试
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26# 创建文件夹
[hadoop@hadoop102 hadoop-3.3.5]$ hadoop fs -mkdir /wcinput
# 上传word.txt文件到刚刚创建的文件夹 http://hadoop102:9870/explorer.html#/wcinput
[hadoop@hadoop102 hadoop-3.3.5]$ hadoop fs -put wcinput/word.txt /wcinput
# 统计词数
[hadoop@hadoop102 hadoop-3.3.5]$ hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-3.3.5.jar wordcount /input /output
# 传递一个大的文件jdk
[hadoop@hadoop102 hadoop-3.3.5]$ hadoop fs -put /opt/software/jdk-8u361-linux-x64.tar.gz /wcinput
# 查看HDFS文件存储的路径
[hadoop@hadoop102 subdir0]$ pwd
/opt/module/hadoop-3.3.5/data/dfs/data/current/BP-1561999921-192.168.10.102-1680530974017/current/finalized/subdir0/subdir0
# 查看HDFS在磁盘存储的内容
[hadoop@hadoop102 subdir0]$ cat blk_1073741825
ss ss
cls cls
banzhang
bobo
yangge
# 拼接文件
[hadoop@hadoop102 subdir0]$ cat blk_1073741826 >> tmp.tar.gz
[hadoop@hadoop102 subdir0]$ cat blk_1073741827 >> tmp.tar.gz
# 解压文件,发现是刚刚的jdk
[hadoop@hadoop102 subdir0]$ tar -zvxf tmp.tar.gz
配置历史服务器
为了查看程序的历史运行情况,需要配置一下历史服务器。具体配置步骤如下:
配置
mapred-site.xml
1
vim $HADOOP_HOME/etc/hadoop/mapred-site.xml
在该文件里面增加如下配置。
1
2
3
4
5
6
7
8
9
10<!-- 历史服务器端地址 -->
<property>
<name>mapreduce.jobhistory.address</name>
<value>hadoop102:10020</value>
</property>
<!-- 历史服务器web端地址 -->
<property>
<name>mapreduce.jobhistory.webapp.address</name>
<value>hadoop102:19888</value>
</property>分发配置
1
xsync $HADOOP_HOME/etc/hadoop/mapred-site.xml
在hadoop102启动历史服务器
1
bin/mapred --daemon start historyserver
查看历史服务器是否启动
1
jps
配置日志的聚集
日志聚集概念:应用运行完成以后,将程序运行日志信息上传到HDFS系统上。
日志聚集功能好处:可以方便的查看到程序运行详情,方便开发调试。
注意:开启日志聚集功能,需要重新启动NodeManager、ResourceManager和HistoryServer
开启日志聚集功能具体步骤如下:
配置
yarn-site.xml
1
vim $HADOOP_HOME/etc/hadoop/yarn-site.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15<!-- 开启日志聚集功能 -->
<property>
<name>yarn.log-aggregation-enable</name>
<value>true</value>
</property>
<!-- 二设置日志聚集服务器地址 -->
<property>
<name>yarn.log.server.url</name>
<value>http://hadoop102:19888/jobhistory/logs</value>
</property>
<!-- 三设置日志保留时间为7天 -->
<property>
<name>yarn.log-aggregation.retain-seconds</name>
<value>604800</value>
</property>文件分发
1
xsync $HADOOP_HOME/etc/hadoop/yarn-site.xml
重启历史服务器
1
2mapred --daemon stop historyserver
mapred --daemon start historyserver重启
Yarn
服务器(hadoop103)1
2sbin/stop-yarn.sh
sbin/start-yarn.sh
集群启动/停止方式总结
各个模块分开启动/停止(配置ssh是前提)常用
整体启动/停止HDFS
1
start-dfs.sh/stop-dfs.sh
整体启动/停止YARN
1
start-yarn.sh/stop-yarn.sh
各个服务组件逐一启动/停止
分别启动/停止HDFS组件
1
hdfs --daemon start/stop namenode/datanode/secondarynamenoded
启动/停止YARN
1
yarn --daemon start/stop resourcemanager/nodemanager
编写Hadoop集群常用脚本
Hadoop集群启停脚本(包含
HDFS
,Yarn
,Historyserver
): hadoop_shell1
2
3vim ~/bin/hadoop_shell
# 赋予可执行权限
chmod +x vim ~/bin/hadoop_shell1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85#!/bin/bash
# 定义颜色变量
RED='\033[0;31m' # 红色
GREEN='\033[0;32m' # 绿色
YELLOW='\033[0;33m' # 黄色
BLUE='\033[0;34m' # 蓝色
MAGENNTA='\033[0;35m' # 紫红色
CYAN='\033[0;36m' # 青色
NC='\033[0m' # NC表示no color,用于将颜色重置为默认值
HADOOP_HOME="/opt/module/hadoop-3.3.5"
if [ $# -lt 1 ]
then
echo -e " ${RED}No Args Input...${NC} "
exit;
fi
function start() {
echo -e " ${BLUE}==================启动Hadoop集群 ==================${NC} "
echo -e " ${BLUE}------------------ 启动 HDFS ------------------${NC} "
ssh hadoop102 "${HADOOP_HOME}/sbin/start-dfs.sh"
echo -e " ${BLUE}------------------ 启动 Yarn ------------------${NC} "
ssh hadoop103 "${HADOOP_HOME}/sbin/start-yarn.sh"
echo -e " ${BLUE}------------------ 启动 HistoryServer ------------------${NC} "
ssh hadoop104 "${HADOOP_HOME}/bin/mapred --daemon start historyserver"
}
function stop() {
echo -e " ${RED}================== 关闭 Hadoop 集群 ==================${NC} "
echo -e " ${RED}------------------ 关闭 HistoryServer ------------------${NC} "
ssh hadoop102 "${HADOOP_HOME}/bin/mapred --daemon stop historyserver"
echo -e " ${RED}------------------ 关闭 Yarn ------------------${NC} "
ssh hadoop103 "${HADOOP_HOME}/sbin/stop-yarn.sh"
echo -e " ${RED}------------------ 关闭 HDFS ------------------${NC} "
ssh hadoop104 "${HADOOP_HOME}/sbin/stop-dfs.sh"
}
function help() {
echo -e " ${YELLOW} start 启动Hadoop集群 ${NC} "
echo -e " ${YELLOW} stop 停止Hadoop集群 ${NC} "
echo -e " ${YELLOW} restart 重启Hadoop集群 ${NC} "
echo -e " ${YELLOW} jpsall 查看Hadoop集群进程 ${NC} "
}
function jpsall() {
for host in hadoop102 hadoop103 hadoop104
do
echo -e " ${CYAN}========================= $host =========================${NC} "
ssh $host jps
done
}
case $1 in
"start")
start
;;
"stop")
stop
;;
"jpsall")
jpsall
;;
"restart")
stop
start
;;
"help")
help
;;
*)
echo -e " ${RED}有效参数如下${NC} "
help
;;
esac脚本分发
1
xsync ~/bin/hadoop_shell
注意:对应机器赋予脚本可执行权限
常用端口号说明
端口名称 | Hadoop2.x | Hadoop3.x |
---|---|---|
NameNode内部通信端口 | 8020/9000 | 8020/9000/9820 |
NameNode HTTP Ul | 50070 | 9870 |
MapReduce查看执行任务端口 | 8088 | 8088 |
历史服务器通信端口 | 19888 | 19888 |
集群时间同步
关于CTS时区的可能:
美国中部时间:Central Standard Time (USA) UT-6:00
澳大利亚中部时间:Central Standard Time (Australia) UT+9:30
中国标准时间:China Standard Time UT+8:00
古巴标准时间:Cuba Standard Time UT-4:00
外网同步
1
2
3
4
5
6
7
8
9# 安装时间同步插件
yum install ntpdate
# 开启时间同步
service ntpdate restart
# 设置时区
# 删除本地时间(美国时间)
rm -rf /etc/localtime
# 设置时区为上海
ln -s /usr/share/zoneinfo/Asia/Shanghai /etc/localtime内网时间服务器配置(必须root用户)
- 查看所有节点npd服务状态和开机自启动状态
1
2
3
4
5
6
7sudo systemctl status ntpdate # 查看ntpdate状态
sudo systemctl start ntpdate # 启动ntpdate
sudo systemctl stop ntpdate # 停止ntpdate
sudo systemctl is-enabled ntpdate # 查看开机是否自启
sudo systemctl enable ntpdate # 开机自启
sudo systemctl disable ntpdate # 移除开机自启
sudo systemctl list-unit-files|grep ntpdate # 查看是否开机自启
- 查看所有节点npd服务状态和开机自启动状态
Hadoop之HDFS
HDFS概述
HDFS产出背景及定义
HDFS产生背景
随着数据量越来越大,在一个操作系统存不下所有的数据,那么就分配到更多的操作系统管理的磁盘中,但是不方便管理和维护,迫切需要一种系统来管理多台机器上的文件,这
就是分布式文件管理系统。HDFS只是分布式文件管理系统中的一种。HDFS定义
HDFS(Hadoop Distributed File System),它是一个文件系统,用于存储文件,通过目录树来定位文件;其次,它是分布式的,由很多服务器联合起来实现其功能,集群中的服务
器有各自的角色。HDFS的使用场景:适合一次写入,多次读出的场景。一个文件经过创建、写入和关闭之后就不需要改变。
HDFS优缺点
优点
- 高容错性
- 数据自动保存多个副本。它通过增加副本的形式,提高容错性。
- 某一个副本丢失以后,它可以自动恢复。
- 适合处理大数据
- 数据规模:能够处理数据规模达到GB、TB、甚至PB级别的数据
- 文件规模:能够处理百万规模以上的文件数量,数量相当之大。
- 可构建在廉价机器上,通过多副本机制,提高可靠性。
缺点
- 不适合低延时数据访问,比如毫秒级的存储数据,是做不到的。
- 无法高效的对大量小文件进行存储。
- 存储大量小文件的话,它会占用NameNode大量的内存来存储文件目录和块信息。这样是不可取的,因为NameNode的内存总是有限的;
- 小文件存储的寻址时间会超过读取时间,它违反了HDFS的设计目标
- 不支持并发写入、文件随机修改。
- 一个文件只能有一个写,不允许多个线程同时写
- **仅支持数据append(追加)**,不支持文件的随机修改。
HDFS架构组成
NameNode(nn)
:就是Master,它是一个主管、管理者。- 管理HDFS的名称空间;
- 配置副本策略;
- 管理数据块(Block)映射信息;
- 处理客户端的读写请求
DataNode
:就是Slave。NameNode下达命令,DataNode执行实际的操作。- 存储实际的数据块;
- 执行数据块的读/写操作
Client
:就是客户端。- 文件切分。文件上传HDFS的时候,Client将文件切分成一个一个的Block,然后进行上传:
- 与NameNode交互,获取文件的位置信息;
- 与DataNode交互,读取或者写入据:
- Client提供一些命令来管理HDFS,比如NameNode格式化;
- Client可以通过一些命令来访问HDFS,比如对HDFS增删查改操作;
Secondary NameNode
:并非NameNode的热备。当NameNode挂掉的时候,它并不能马上替换NameNode并提供服务。- 辅助NameNode,分担其工作量,比如定期合并
Fsimage
和Edits
,并推送给NameNode; - 在紧急情况下,可辅助恢复NameNode。
- 辅助NameNode,分担其工作量,比如定期合并
HDFS文件块大小
HDFS中的文件在物理上是分块存储(Block),块的大小可以通过配置参数(dfs.blocksize
)来规定,默认大小在Hadoop2.x/3.x版本中是128M,1.x版本中是64M。
- 集群中的Block
- 如果寻址时间约为10ms,即查找到目标block的时间为10ms。
- 寻址时间为传输时间的1%时,则为最佳状态。因此,传输时间=10ms/0.01=1000ms=1s
- 而目前磁盘的传输速率普遍为100MB/s
- Block大小=1s*100MB/s=100MB
思考:为什么块的大小不能设置太小,也不能设置太大?
- HDFS的块设置太小,会增加寻址时间,程序一直在找块的开始位置:
- 如果块设置的太大,从磁盘传输数据的时间会明显大于定位这个块开始位置所需的时间。导致程序在处理这块数据时,会非常慢。
总结:HDFS块的大小设置主要取决于磁盘传输速率。
HDFS的Shell操作
基本语法
hadoop fs 具体命令 OR hdfs dfs具体命令
两个是完全相同的。
常用命令
- 确保Hadoop集群已启动
-help
:输出这个命令参数hadoop fs -help rm
- 创建
/sanguo
文件夹hadoop fs -mkdir /sanguo
上传
-moveFromLocal
:从本地剪切粘贴到HDFS1
2
3
4
5# 创建文件,插入值 shuguo
vim shuguo.txt
# 执行成功查看当前路径,发现本地shuguo.txt文件已经没有了
hadoop fs -moveFromLocal ./shuguo.txt /sanguo-copyFromLocal
:从本地文件系统中拷贝文件到HDFS路径去1
2
3
4
5# 创建文件,插入值 weiguo
vim weiguo.txt
# 执行成功查看当前路径,发现本地weiguo.txt文件还存在
hadoop fs -copyFromLocal ./weiguo.txt /sanguo-put
:等同于-copyFromLocal
生产环境更习惯用put1
2
3
4# 创建文件,插入值 wuguo
vim wuguo.txt
hadoop fs -put ./wuguo.txt /sanguo-appendToFile
:追加一个文件到已经存在的文件末尾1
2
3
4
5# 创建文件,插入值 liubei
vim liubei.txt
# 把 liubei.txt 文件中的值(liubei)追加到 HDFS的/sanguo/shuguo.txt 文件中
hadoop fs -appendToFile ./liubei.txt /sanguo/shuguo.txt
下载
-copyToLocal
:从HDFS拷贝文件到本地1
2# 从文系统的/sanguo/shuguo.txt文件拷贝到当前路径
hadoop fs -copyToLocal /sanguo/shuguo.txt ./-get
: 等同于-copyToLocal
,生产环境更习惯用get1
2# 从文系统的/sanguo/shuguo.txt文件拷贝到当前路径并且下载的文件夹名为shuguo2.txt
hadoop fs -copyToLocal /sanguo/shuguo.txt ./shuguo2.txt
HDFS直接操作
-ls
:显示日志信息1
2# 查看文件系统/sanguo目录下的文件信息
hadoop fs -ls /sanguo-cat
:显示文件内容1
2# 查看文件系统/sanguo/shuguo.txt 文件的内容
hadoop fs -cat /sanguo/shuguo.txt-chgrp、-chmod、-chown
:Linux文件系统中的用法一样,修改文件所属权限1
2hadoop fs -chmod 666 /sanguo/shuguo.txt
hadoop fs -chown root:root /sanguo/shuguo.txt-mkdir
:创建路径1
hadoop fs -mkdir /jinguo
-cp
:从HDFS的一个路径拷贝到HDFS的另一个路径1
hadoop fs -cp /sanguo/shuguo.txt /jinguo
-mv
:在HDFS目录中移动文件1
2hadoop fs -mv /sanguo/weiguo.txt /jinguo
hadoop fs -mv /sanguo/wuguo.txt /jinguo-tail
:显示一个文件的末尾1kb的数据1
hadoop fs -tail /jinguo/shuguo.txt
-rm
:删除文件或文件夹1
hadoop fs -rm /sanguo/shuguo.txt
-rm -r
:递归删除文件及文件里面的内容1
hadoop fs -rm -r /sanguo
-du
:统计文件夹的大小信息1
2
3
4
5
6
7
8[hadoop@hadoop102 hadoop-3.3.5]$ hadoop fs -du -s -h /jinguo
27 81 /jinguo
[hadoop@hadoop102 hadoop-3.3.5]$ hadoop fs -du -h /jinguo
14 42 /jinguo/shuguo.txt
7 21 /jinguo/weiguo.txt
6 18 /jinguo/wuguo.txt
# 说明:27表示文件大小;81表示27*3个副本;/jinguo表示查看的目录-setrep
:设置HDFS中文件的副本数量1
hadoop fs -setrep 10 /jinguo/shuguo.txt
这里设置的副本数只是记录在
NameNode
的元数据中,是否真的会有这么多副本,还得看DataNode
的数量。因为目前只有3台设备,最多也就3个副本,只有节点数的增加到l0
台时,副本数才能达到10。
HDFS的API操作
环境变量及文件参考:https://github.com/cdarlint/winutils
创建一个SpringBoot项目
pom.xml
1 |
|
1 |
|
HDFS的读写流程
HDFS的写数据流程
- 客户端创建
Distributed FileSystem
,该模块向HDFS
的老大哥NameNode
请求上传文件ss.avi
- 老大哥
NameNode
是个做事很守规矩的人,所以他要检查权限和目录结构是否存在,如果存在就向Distributed FileSystem
响应可以上传文件 - 但是
Distributed FileSystem
并不知道我上传的文件应该传给哪个小老弟(DataNode1、2、3
),就问老大哥:”大哥,我应该上传到哪个DataNodee
小老弟服务器啊?“ - 老大哥
NameNode
需要想一下文件应该存到哪个机器上(这个过程叫做副本存储节点选择
,下文有写),想好之后就返回dn1、2、3
说”这三个可以存文件“ - 这时候客户端就要创建
文件输出流FSDataOutPutStream
让他去上传数据 - 数据是需要通过
管道
进行传输的,所以文件输出流
就需要先铺设管道,它首先请求dn1
、dn1
再调用dn2
,最后dn2
调用dn3
,将这个通信管道建立完成,dn1、2、3
逐级应答客户端建立完毕 - 这时候,客户端就开始往
dn1
上传第一个Block
,以Packet
为单位,dn1
收到一个Packet
就会传给dn2
,dn2
再传给dn3
。为了保证数据不丢失,三个机器在传数据的时候,都有自己的一个应答对列放Packet
,直到收到下一台机器传回的ack
应答信息,才把Packet
删掉。 - 当一个
Block
传输完成之后,客户端会再次请求NameNode
上传第二个Block
的服务器。重复执行上面的操作,直到所有的数据都上传完成
网络拓扑 - 节点距离计算
在HDFS
写数据的过程中,NameNode
会选择距离待上传数据最近距离的DataNode
接收数据,需要计算出这个距离。
节点距离:两个节点到达最近的功能祖先的距离总和
现实生活中,服务器都会在机架
上放着,然后形成一个图
,看下图:
我们要想计算节点距离,可以把他抽象成一个图:
如果我们要算机器d1->r2->n1
到d2->r6->n0
的距离,则需要找到他们的共同祖先,然后把把路径相加就可以了,如下图所示:
机架感知(副本存储节点选择)
For the common case, when the replication factor is three, HDFS’s placement policy is to put one replica on the local machine if the writer is on a datanode, otherwise on a random datanode in the same rack as that of the writer, another replica on a node in a different (remote) rack, and the last on a different node in the same remote rack. This policy cuts the inter-rack write traffic which generally improves write performance. The chance of rack failure is far less than that of node failure; this policy does not impact data reliability and availability guarantees. However, it does not reduce the aggregate network bandwidth used when reading data since a block is placed in only two unique racks rather than three. With this policy, the replicas of a block do not evenly distribute across the racks. Two replicas are on different nodes of one rack and the remaining replica is on a node of one of the other racks. This policy improves write performance without compromising data reliability or read performance.
对于常见的情况,当复制系数为3时,HDFS的放置策略是,如果写作者在数据节点上,则将一个副本放在本地机器上,否则放在与写作者相同机架的随机数据节点上,另一个副本放在不同(远程)机架的节点上,最后一个放在同一远程机架的不同节点上。这种策略减少了机架间的写入流量,通常会提高写入性能。机架故障的几率远远小于节点故障的几率;这个策略不影响数据的可靠性和可用性保证。然而,它并没有减少读取数据时使用的总网络带宽,因为一个区块只被放在两个唯一的机架上,而不是三个。在这种策略下,一个区块的副本不会均匀地分布在机架上。两个副本在一个机架的不同节点上,其余的副本在其他机架的一个节点上。这个策略在不影响数据可靠性或读取性能的情况下提高了写入性能。
源码说明:org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault#chooseTargetInOrder
HDFS
写文件会把第一个副本存储在客户端所处节点上,第二个副本在另一个机架上的随机一个节点,第三个副本在第二个副本所在机架的另外一个随机节点上。如下图所示:
HDFS读数据流程
- 客户端通过
DistributedFileSystem
向NameNode
请求下载文件,NameNode
通过查询元数据,找到文件块所在的DataNode
地址 - 挑选一台
DataNode
(就近原则,然后随机)服务器,请求读取数据 -
DataNode
开始传输数据给客户端(从磁盘里面读取数据输入流,以Packet
为单位来做校验) - 客户端以
Packet
为单位接收,现在本地缓存,然后写入目标文件
NameNode与SecondaryNameNode
NameNode
简称为NN
,SecondaryNameNode
简称为2NN
NN和2NN工作机制
元数据在什么地方?
NameNode
中有元数据,那么元数据是存在什么地方呢?
如果元数据存储在NN
的内存中,内存有两个特点就是:不可靠(断电即失)、计算速度快;这样保证不了数据的可靠性,一旦断电、元数据丢失,整个集群就无法工作了
如果元数据存储在NN
的硬盘中,硬盘的特点是:可靠、计算速度慢;这样虽然保证了数据的可靠性,但是它的计算速度就慢了。
因此HDFS
采用了内存 + 磁盘
的方式,并且引入了两个概念FsImage与Edits
:FsImage
负责存储数据;Edits
负责追加内容(不计算)。每当元数据有更新或者或者添加元数据时,修改内存中的元数据并追加到Edits
中。 这样,一旦NN
节点断电,就可以通过FsImage
和Edits
的合并,合成元数据。
但是随着Edits
文件数据过大,它的效率降低,一旦断电、恢复元数据的时间也过长,因此需要定期进行FsImage
和Edits
的合并,这个工作如果只有NN
做,又会效率过低。所以引入一个新的节点2NN
专门用于FsImage
和Edits
的合并。
NN和2NN的工作机制
下面一张图解释了NN
和2NN
的工作机制:
它分为两个阶段:
第一阶段:NameNode
启动:
- 第一次启动
NN
格式化后,创建FsImage
和Edits
文件,如果不是第一次启动,直接加载FsImage
和Edits
到内存 - 客户端对元数据进行增删改的请求
-
NN
的Edits
记录操作日志,更新滚动日志 -
NN
在内存中对元数据进行增删改
第二阶段:Secondary NameNode
工作:
-
2NN
会定期或当Edits
数据满的时候询问NN
是否需要CheckPoint
(检查是否需要合并) -
2NN
请求执行CheckPoint
-
NN
滚动正在写的Edits
日志 - 然后将滚动前的编辑日志和镜像文件拷贝到
2NN
-
2NN
加载编辑日志和镜像文件到内存,并合并 - 生成新的镜像文件
fsimage.chkpoint
- 拷贝
fsimage.chkpoint
到NN
-
NN
将fsimage.chkpoint
重新命名为fsimage
FsImage和Edits解析
我们可以查看NN
所在的hadoop102
和2NN
所在的hadoop104
他们的fsimage
和edits
文件:
hadoop102
路径:/opt/module/hadoop-3.3.5/data/dfs/name/current
hadoop104
路径:/opt/module/hadoop-3.3.5/data/dfs/namesecondary/current
可以发现NN
比2NN
多一个文件edits_inprogress...
,如果NN
宕机,那么2NN
可以帮助恢复NN
元数据,但是只能恢复一部分,缺少的部分正是该文件。该文件存储的是正在执行的操作
文件解读:
-
Fsimage文件
:HDFS
文件系统元数据的一个永久性的检查点,其中包含HDFS
文件系统的所有目录和文件inode
的序列化信息 -
Edits
文件:存放HDFS
文件系统的所有更新操作的路径,文件系统客户端执行的所有写操作首先会被记录到Edits
文件中 -
seen_txid
文件保存的是一个数字,就是最后一个edits_
的数字 - 每次
NN
启动的时候都会将Fsimage
文件读入内存,加载Edits
里面的更新操作,保证内存中的元数据信息是最新的、同步的,可以看成NN
启动的时候就将Fsimage
和Edits
文件进行了合并
oiv查看Fsimage文件
语法:hdfs oiv -p 文件类型 -i 镜像文件 -o 转换后文件输出路径
现在使用这个命令查看一下fsimage_0000000000000000254
:
1 |
|
执行完成之后,会输出一个文件 cat /tmp/fsimage.xml
1 |
|
oev查看Edits文件
语法:hdfs oev -p 文件类型 -i 编辑日志 -o 转换后文件输出路径
现在使用这个命令查看一下edits_inprogress...
1 |
|
已经有了该文件: cat /tmp/edits.xml
1 |
|
CheckPoint时间设置
通常情况下2NN
每隔一个小时执行一次检查,可以查看hdfs-default.xml
:
1 |
|
当操作数达到一百万,2NN
执行一次:
1 |
|
如果想要更换时间或者更改操作数,可以配置到hdfs-site.xml
HDFS的DataNode
DataNode工作机制
它的工作流程是:
- 当
DataNode
启动后,必须向NameNode
汇报自己的块信息,然后定期(6个小时)扫描、上报自己所有块的信息。块信息包括:数据、数据长度、校验和(即数据完整性)、时间戳 - 每个
DataNode
必须定期向NameNode
汇报说:我还活着。这个过程叫做心跳,心跳每三秒一次;如果超过10分钟+30秒NameNode
没有收到DataNode
的心跳,就会认为DataNode
挂掉了
DN向NN汇报当前块信息的时间间隔,默认6个小时,在hdfs-default.xml
文件中有配置:
1 |
|
DN扫描自己自己节点块的信息的时间,默认6个小时,同样在hdfs-default.xml
文件中有配置:
1 |
|
如果想改变时间间隔,可以将上述两个配置信息配置到hdfs-site.xml
中,然后分发配置重启hadoop
集群,配置就会生效
数据完整性
数据完整性就是要保证数据在网络传输中不发生错误,所以要采取一些校验数据的手段,比如奇偶校验、CRC循环冗余校验等
比如我们要在网络中传输
第一个数据:
1000100
第二个数据:
1000101
奇偶校验就是:数所有的位中有多少个1
,如果1
的个数是偶数,那么就在末尾添加0
;反之,如果1
的个数是奇数,那么就在末尾添加1
。对于上面的两个数据,他们应该各自加:
第一个数据:
1000100 | 0
第二个数据:
1000101 | 1
这样只要在传输过去之后,再次计算校验位,然后与携带过来的校验位进行对比,就可以知道数据有没有传输失误了。
但是,使用奇偶校验有一个很明显的问题,那就是如果两个位发生了改变,最后得出的校验位还是原来的数字。比如第一个数据:1000100在传输过程中,变成了1100110,数据发生了改变,但是校验位依旧是0,显然是有很大的问题的
所有Hadoop
就采用了另外一种更安全的校验方法CRC循环冗余校验
。这种校验法会随机生成一个多项式
,然后把原始数据与多项式进行除法操作,最后把得出的余数一起发送过去。接收端再对原始数据除一下多项式,如果得出的余数和发送端一样,就没有任何问题。
掉线时限参数设置
在前文提到的心跳
中,如果DataNode
在10分钟+30秒
内不向NameNode
心跳一下,就会认为DataNode
挂掉了
那么这个10分钟+30秒
是怎么得来的呢?
它的计算公式是:TimeOut = 2 * dfs.namenode.heartbeat.recheck-interval + 10 * dfs.heartbeat.interval
看一下hdfs-default.xml
的默认设置:其中dfs.namenode.heartbeat.recheck-interval
的默认时间是300000毫秒
即五分钟,dfs.heartbeat.interval
的默认时间是3秒
,于是超时时间就是10分钟+30秒
1 |
|
如果想要修改这些配置,可以把这些配置信息都配置到hdfs-site.xml
中
Hadoop之MapReduce
Hadoop
中的MapReduce
是一种编程模型,用于大规模数据集的并行运算
MapReduce 概述
MapReduce定义
MapReduce
是Hadoop
的一个分布式(多台机器)运算程序的编程框架,也是Hadoop
的核心框架
如果让我们用程序实现一个多台机器并发计算的程序,我们可能需要关心到数据传输、如何并发等很多底层的问题,MapReduce
为我们提供了一系列的框架,我们可以直接调用
MapReduce
核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个Hadoop
集群上
综上所述:MapReduce = 自己写的业务逻辑代码 + 它自身默认的底层代码
MapReduce的优缺点
优点
- 易于编程:因为
MapReduce
已经封装好了底层代码,我们只需要关心业务逻辑代码,实现框架的接口就好了 - 有良好的扩展性:如果服务器资源枯竭,可以动态增加服务器,解决计算资源不够的问题
- 高容错性:任何一台机器挂掉,都可以将任务转移到其他节点
- 适合海量数据计算:TB/PB级别,几千台服务器共同计算
缺点
当然因为MapReduce
把中间结果存放在了硬盘,所以它不适合用于:
- 不适用于实时计算
- 不擅长流式计算(Spark Streaming、Flink适合)
- 不擅长DAG有向无环图计算
MapReduce核心编程思想
拿一个需求来举例说明MapReduce
的思想:有个300M的文件,统计每一个单词出现的总次数,要求查询结果a-p
的为一个文件,q-z
的为一个文件
如果拿到现实中,要统计这么多单词并且按照a-p,p-z
分区,会拿出两张纸,一张纸记录a-p
的、另一张纸记录p-z
的,但是300M
的文件太大了,自己一个人肯定完不成,所以找来两个人来帮忙,他们都拿出两张纸按照同样的方法进行统计,到最后他们各自会拿出满满的两页纸,上面记录着各个单词,每个单词出现一次就记录为(hadoop->1
)这样的键值对。这个就是Map
阶段,他们三个人每个人做的都是MapTask
任务。
上面的任务做完了,就需要专门派两个人来统计a-p
与p-z
开头的单词,把相同的单词后面数字相加,这样就得到了最后的结果。这个阶段是Reduce
聚合阶段,这两个人做的是ReduceTask
任务。
换成专业的说法,看下图:
综上所述:
- 分布式的运算程序王王需要分成至少两个阶段
- 第一个阶段的
MapTask
并发实例,完全并行运行,互不相干 - 第二个阶段的
ReduceTask
并发实例互不相干,但是他们依赖于MapTask
的结果 -
MapReduce
编程模型只能包含一个Map
阶段和一个Reduce
阶段,如果用户的业务逻辑非常复杂,那么只好编写多个MapReduce
程序,但是这些都是串行执行的
MapReduce进程
一个完整的MapReduce程序在分布式运行时有三类实例进程:
MrAppMaster
:负责整个程序的过程调度及状态协调。MapTask
:负责Map阶段的整个数据处理流程。ReduceTask
:责Reduce阶段的整个数据处理流程。
官方WordCount源码
常用数据序列化类型
看一下WordCount代码:
从上面的代码中,我们可以看到有很多之前没有见过的数据类型,这些类型都是Hadoop
自己的类型,下表总结了Java
类型与Hadoop
数据类型的对比:
可以发现除了String
对应的是Text
,其他的类型只不过是在最后加了关键字Writable
,所以Hadoop
的数据类型还是很好记忆与掌握的
MapReduce编程规范
从上面的案例代码中可以看到整个WordCount
程序分为了三个部分(Mapper
、Reducer
和Driver
),下面把他们的方法签名都抽取出来:
1 |
|
其中main
对应的是Driver
阶段;IntSumReducer
对应的是Reduce
阶段,继承了Reducer
类;TokenizerMapper
对应的是Map
阶段,继承了Mapper
类
可以看到继承的类后面跟了很多的泛型,接下来逐个击破!
Mapper阶段
- 用户自定义的
Mapper
要继承自己的父类,即继承了Mapper
类 Mapper
后面跟的泛型,前两个是一个k-v
键值对(用户可自定义),对应的是输入数据
Mapper
的输出数据也是一个K-V
键值对,对应的是后面两个泛型
Mapper
中的业务逻辑写在map()
方法中,map()即MapTask进程
方法对每一个k-v
调用一次,看下图:
Reducer阶段
- 用户自定义的
Reducer
要继承自己的父类Reducer
Reducer
的输入数据类型对应Mapper
的输出数据类型,也是K-V
键值对,如下图:
Reducer
的业务逻辑写在reduce()
方法中,ReduceTask
进程对每一组相同的k
的k-v
组调用一次reduce()
方法
Driver阶段
相当于YARN
集群的客户端,用于提交整个程序到YARN
集群,提交的是封装了MapReduce
程序相关运行参数的job
对象。
WordCount案例
需求分析
统计给定文本中每一个单词出现的个数
准备的文本数据:
1 |
|
期望的输出数据:
1 |
|
根据MapReduce
的编程规范,应该将该程序分为三个部分:
Mapper
类:负责数据的拆分- 将内容先转换为
String
- 使用
String.split()
将这一行的数据切分成单词 - 将单词封装为
<xue,1>
- 将内容先转换为
Reducer
类:汇总数据,即聚合,统计出出现次数- 汇总各个
key
的个数 - 输出该
key
的总次数
- 汇总各个
Driver
类:固定的七大步套路- 获取
Job
- 设置本程序
jar
包所在的本地路径 - 关联
Mapper
和Reducer
类 - 指定
Mapper
输出数据的k-v
类型 - 指定最终输出的数据的
k-v
类型 - 指定
job
输入与输出文件路径 - 提交作业
- 获取
环境准备
创建三个类WordCountMapper
、WordCountReducer
、WordCountDriver
pom.xml
新增如下:
1 |
|
编写程序
Mapper类
在下面的编写代码中,切记不要导错包,一定要导mapreduce包下的包,不要导入mapred的
该类需要继承Mapper
类,我们先写上:
1 |
|
按住ctrl+b
,点击Mapper
查看源码:
首先看Contxt
这个抽象内部类:
然后找到run
方法,这个方法是用于执行Mapper
操作的:
在源码中setup
和cleanup
都是空方法,什么都不做:
编写Mapper类代码
1 |
|
Reducer类
Reducer源码
该类需要继承Reducer
类,先写上:
1 |
|
按住ctrl+B
,点击Reducer
,查看源码:
看run
方法:
编写Reducer类
该类需要继承Reducer
,同样的他也需要加四个泛型,两个k-v
键值对
第一个键值对,对应的是Mapper
的输出类型,应该填:Text,IntWritable
第二个键值对,是最终输出的类型,这里填写我们期望输出数据的类型:Text,IntWritable
1 |
|
这个类需要重写reduce
方法,直接输入reduce
按下回车:
在这个方法里面,我们只需要做两件事:循环遍历Iterable<IntWritable>
集合,把值相加;最终通过Context
把值输出,这里同样做一次变量提升:
1 |
|
Driver类
前文的需求分析,已经提到了七大步,这里直接给出代码:
1 |
|
查看结果
._SUCCESS.crc
:校验文件.part-r-00000.crc
:校验文件_SUCCESS
:标志成功part-r-00000
:真实数据输出文件
part-r-00000
文件内容:
1 |
|
Hadoop序列化
序列化概述
什么是序列化
序列化就是把内存中的对象,转换成字节序列,然后用户存储到磁盘或网络传输
反序列化就是把收到的序列化序列,转换成内存的对象,两者是相反的
一方面,因为MapReduce
在生产环境中是分布式工作的,那么就避免不了内存中的数据在网络上的传输,使用序列化是为了更加方便的传输
另一方面,Hadoop
提供的序列化的数据类型太少, 并不能满足生产的需求。比如有时候期望输出的是一组数据,那么就需要一个bean
对象存储这些基本的数据类型,这时候把bean
实现序列化,放在Mapper
和Reducer
的泛型上就好了
Java序列化机制与Hadoop序列化的对比
Java
序列化机制是由Serializable
实现的,它的序列化会附加各种校验信息、头信息、继承体系等,如果用于网络传输,那么要传的数据太大了
Hadoop
序列化机制只附加了简单校验信息,数据量小,有利于网络传输;如下图所示:
Hadoop
序列化的特点:
- 紧凑:高效使用存储空间
- 快速:读写数据的额外开销小
- 互操作性强:支持多语言的交互
自定义bean对象实现序列化(writable
)
-
Bean
对象要实现Writable
接口 - 重写接口中的序列化和反序列化方法,值得注意的是:序列化的顺序要和反序列化的顺序完全一致
- 必须要由空参构造方法
- 重写
toString
方法,用\t
把数据分开 -
Hadoop
的key
自带排序,如果把自定义的bean
对象放在key
中传输,则还需要实现Comparable
接口,因为MapReduce框中的Shuffle过程要求对key必须能排序
序列化案例实操
需求分析
统计每一个手机号耗费的总上行流量、总下行流量、总流量
1 |
|
在Map
阶段:
- 需要给四个泛型,前两个泛型就是固定的偏移量和这一行的数据,即
LongWritable,Text
- 后两个泛型用
手机号
表示key
,自定义bean
实现序列化接口表示上行流量、下行流量、总流量
- 在此阶段,首先需要读取一行数据,按照
\t
切分数据,抽取三个数据,再封装到context
在Reduce
阶段:
- 同样需要给四个泛型,前两个与
Map
保持一致 - 后两个泛型与我们期望的输出数据也一致
- 在此阶段,只需要累加上、下行流量,得到总流量,最后封装就好了
代码实操
自定义bean
对象,写入上行、下行、总流量属性,并添加setter与getter
方法:此处使用Lombok
1 |
|
Mapper、Reducer、Driver编写
Mapper
类:
1 |
|
Reducer
类:
1 |
|
Driver
类:
1 |
|
查看结果:part-r-00000
文件
1 |
|
MapReduce 框架原理
InputFormat数据输入
切片与MapTask并行度决定机制
MapTask
的并行度决定Map
阶段的任务处理并发读,进而影响到整个Job
的处理速度,引入两个概念:
- 数据块:
Block
是HDFS
物理上把数据分成一块一块,数据块是HDFS
存储数据单位 - 数据切片: 只是在逻辑上对输入进行分片,并不会在磁盘上将其切分成片进行存储。数据切片是MapReduce程序计算输入数据的单位,一个切片会对应启动一个
MapTask
Job提交流程
提交一个Job
要经过:
- 建立连接
connect()
- 在这里会判断该
Job
是本地运行环境还是YARN
集群运行环境
- 在这里会判断该
- 提交
Job
,submitJobInternal()
- 创建给集群提交数据的
Stag
路径—getStagingDir()
- 获取
JobId
,并创建Job
路径—getNewJobID()
- 拷贝
jar
包到集群—copyAndConfigureFiles()
与uploadFiles
- 计算切片,生成切片的规划文件—
writeSplits
- 向
Stag
路径写XML
配置文件—writreXml()
- 最后提交
Job
,返回提交状态
- 创建给集群提交数据的
切片执行流程解析
org.apache.hadoop.mapreduce.JobSubmitter#writeSplits
程序先找到你数据存储的目录。
开始遍历处理(规划切片)目录下的每一个文件
遍历第一个文件
input.txt
获取文件大小
fs.sizeOf(input.txt)
计算切片大小
org.apache.hadoop.mapreduce.lib.input.FileInputFormat#computeSplitSize
1
2
3
4
5
6
7
8
9
10/**
* @param blockSize 本地模式32M 集群模式128M
* @param minSize 1
* @param maxSize Long的最大值
* @return 需要的切片大小
*/
protected long computeSplitSize(long blockSize, long minSize,
long maxSize) {
return Math.max(minSize, Math.min(maxSize, blockSize));
}默认情况下,切片大小=blocksize
开始切,形成第1个切片:
input.txt0:128M
、第2个切片input.txt一128:256M
、第3个切片input.txt一256M:300M
(每次切片时,都要判断切完剩下的部分是否大于块的1.1倍,不大于1.1倍就划分一块切片)将切片信息写到一个切片规划文件中
整个切片的核心过程在
org.apache.hadoop.mapreduce.InputFormat#getSplits
方法中完成InputSplit
只记录了切片的元数据信息,比如起始位置、长度以及所在的节点列表等。
提交切片规划文件到YARN上,YARN上的MrAppMaster就可以根据切片规划文件计算开启MapTask个数
FileInputFormat切片机制
切片机制:
- 简单地按照文件的内容长度进行切片
- 切片的大小默认等于
Block
大小 - 切片时不考虑数据集整体,而是逐个针对每一个文件单独切片
比如输入数据有两个文件:
1 |
|
经过FileInputFormat
的切片机制运算后,形成的切片信息如下:
1 |
|
源码中计算切片大小的公式:
1 |
|
默认情况下,切片大小=blocksize
切片大小设置:
-
maxsize
(切片最大值):参数如果调的比blockSize
小,则会让切片变小,而且就等于配置的这个参数值 -
minsize
(切片最小值):参数调的比blockSize
大,则可以让切片变得比blockSize
还大
获取切片信息API:
1 |
|
TextInputFormat
在运行MapReuce
程序时,输入的文件格式包括:基于行的日志文件、二进制格式文件、数据库表等。针对不同的数据类型,MapReduce
给用户提供了很多的接口
FileInputFormat
常见的实现类包括:TextInputFormat
、KeyValueTextInputFormat
、NLineInputFormat
、CombineTextInputFormat
和自定义InputFormat
等
TextInputFormat
是默认的 FileInputFormat
实现类。按行读取每条记录。键是存储该行在整个文件中的起始字节偏移量, LongWritable
类型。值是这行的内容,不包括任何行终止 符(换行符和回车符),Text
类型。
以下是一个示例,比如,一个分片包含了如下 4 条文本记录。
1 |
|
每条记录表示为以下键/值对:
1 |
|
CombineTextInputFormat切片机制
框架默认的TextInputFormat切片机制是对任务按文件规划切片,不管文件多小,都会是一个单独的切片,都会交给一个MapTask,这样如果有大量小文件,就会产生大量的
MapTask,处理效率极其低下。
应用场景:
CombineTextInputFormat用于小文件过多的场景,它可以将多个小文件从逻辑上规划到一个切片中,这样,多个小文件就可以交给一个MapTask处理。
虚拟存储切片最大值设置
CombineTextInputFormat.setMaxInputSplitSize(job,4194304); //4m
注意:虚拟存储切片最大值设置最好根据实际的小文件大小情况来设置具体的值。
切片机制
生成切片过程包括:虚拟存储过程和切片过程二部分。
虚拟存储过程:
将输入目录下所有文件大小,依次和设置的
setMaxInputSplitSize
值比较,如果不大于设置的最大值,逻辑上划分一个块。如果输入文件大于设置的最大值且大于两倍,
那么以最大值切割一块;当剩余数据大小超过设置的最大值且不大于最大值2倍,此时将文件均分成2个虚拟存储块(防止出现太小切片)。 例如
setMaxInputSplitSize
值为4M,输入文件大小为8.02M,则先逻辑上分成一个4M。剩余的大小为4.02M,如果按照4M逻辑划分,就会出现0.02M的小的虚拟存储文件,所以将剩余的4.02M文件切分成(2.01M和2.01M)两个文件。切片过程:
判断虚拟存储的文件大小是否大于setMaxInputSplitSize值,大于等于则单独形成一个切片。
如果不大于则跟下一个虚拟存储文件进行合并,共同形成一个切片。
测试举例:有4个小文件大小分别为1.7M、5.1M、3.4M以及6.8M这四个小文件,则虚拟存储之后形成6个文件块,大小分别为:
1.7M,(2.55M、2.55M),3.4M以及(3.4M、3.4M)
最终会形成3个切片,大小分别为:
(1.7+2.55)M,(2.55+3.4)M,(3.4+3.4)M