Hadoop3环境搭建_伪分布模式
1、在Linux中安装和配置JDK8
2、Hadoop3安装和配置(伪分布模式)
3、Hadoop3安装测试
掌握Hadoop3伪分布模式环境的搭建、文件配置和环境测试。
Hadoop的运行模式分为3种:本地运行模式,伪分布运行模式,集群运行模式。
伪分布模式是将守护程序运行在本地主机,模拟一个小规模集群。这里不但需要本地模式的操作过程,需要对配置文件进行配置。
Hadoop3的运行需要JDK 8的支持,所以需要先安装JDK 8。
1、对JDK 8的安装包解压缩。
在终端窗口中,执行以下命令,进入软件包所在文件夹中,然后将JDK8解压安装到“/data/bigdata/”目录下:
$ cd /data/software$ tar -zxvf jdk-8u73-linux-x64.tar.gz -C /data/bigdata/
查看解压后的JDK安装文件:
$ ls /data/bigdata/
2、配置环境变量。
在终端窗口中,执行以下命令,打开系统文件:
$ vi /etc/profile
移动到文件的末尾,添加如下的内容到文件中(替换原来的即可):
export JAVA_HOME=/data/bigdata/jdk1.8.0_73
export PATH=$PATH:$JAVA_HOME/bin
保存文件内容。然后在终端窗口中,执行如下命令,使编辑后的系统文件生效:
$ source /etc/profile
3、测试JDK的安装。
在终端窗口中,执行如下命令,查看JDK的版本号,检验安装是否正确,如下所示:
$ Java -version
如果出现如下的版本信息,说明JDK8的安装和配置正确。
java version "1.8.0_73"Java(TM) SE Runtime Environment (build 1.8.0_73-b02)Java HotSpot(TM) 64-Bit Server VM (build 23-b02, mixed mode)
至此,JDK8的安装完成。
安装SSH服务,实现集群中SSH无密码连接(从主节点到从节点的无密码登录):
1、SSH免密配置
Hadoop在启动过程中是通过SSH远程操作的,所以在不做特殊配置下,每次启动到相关节点时,都要输入密码,如果想避免每次都输入密码,需要设置免密登录。
使用命令,生成密钥(过程中会出现输入提示,直接回车即可):
rm -rf ~/.ssh (本设备已经提前配置,需要先删除)
ssh-keygen
结果如下所示:
root@localhost:/data/bigdata/# ssh-keygen
Generating public/private rsa key pair.
Enter file in which to save the key (/root/.ssh/id_rsa):
Enter passphrase (empty for no passphrase):
Enter same passphrase again:
Your identification has been saved in /root/.ssh/id_rsa.
Your public key has been saved in /root/.ssh/id_rsa.pub.
The key fingerprint is:
SHA256:RjkQ5wqbj43LDrODrSsi4xEeoaaNqMSXE+byA0lZo6w root@e744c0e5069c
The key's randomart image is:
+---[RSA 2048]----+
| o.. |
| o + . |
|..+ o = |
|.=. + o . |
|++.oo . S |
|E== o= . |
|+O==o o |
|O B*.. |
|O=.+= |
+----[SHA256]-----+
在目录“~/.ssh/”下会生成两个文件:
id_rsa: 私钥
id_rsa.pub:公钥
将公钥导入到认证文件中,下次再登录则可以免密登录:
cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys
验证免密登录是否成功:
root@localhost:/data/bigdata/# ssh localhost
Welcome to Ubuntu 16.04.5 LTS (GNU/Linux 3.10.0-327.36.3.el7.x86_64 x86_64)
* Documentation: https://help.ubuntu.com
* Management: https://landscape.canonical.com
* Support: https://ubuntu.com/advantage
Last login: Tue Nov 12 04:11:54 2019 from 192.168.14.183
root@localhost:~# exit
注销
Connection to localhost closed.
整个登录过程不需要输入密码,则表示配置成功。
1、在终端窗口中,执行如下命令,进入软件包所在文件夹中:
$ cd /data/software
将Hadoop3解压安装到“/data/bigdata/”目录下:
$ tar -zxvf hadoop-3.3.0.tar.gz -C /data/bigdata/
2、在终端窗口中,执行如下命令,查看解压后的Hadoop3安装文件:
$ ls /data/bigdata/
3、打开"/etc/profile"配置文件,配置hadoop3环境变量(注意,标点符号全都要是英文半角)将hadoop下的环境变量注释掉。
export HADOOP_HOME=/data/bigdata/hadoop-3.3.0export PATH=$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$PATHexport HADOOP_OPTS="-Djava.library.path=$HADOOP_HOME/lib:$HADOOP_HOME/lib/native"
然后保存。
4、执行"/etc/profile",让配置生效:
$ source /etc/profile
5、测试hadoop安装。在终端窗口中,执行如下命令:
$ hadoop version
如果安装正确,可以看到如下的版本信息:
Hadoop 3.3.0Source code repository https://gitbox.apache.org/repos/asf/hadoop.git -r aa96f1871bfd858f9bac59cf2a81ec470da649afCompiled by brahma on 2020-07-06T18:44ZCompiled with protoc 3.7.1From source with checksum 5dc29b802d6ccd77b262ef9d04d19c4This command was run using /home/hduser/bigdata/hadoop-3.3.0/share/hadoop/common/hadoop-common-3.3.0.jar
配置Hadoop,共需要配置5个文件(另外可选地配置workers文件),均位于Hadoop安装目录下的"etc/hadoop/"子目录下。首先进入到该目录下:
$ cd /data/bigdata/hadoop-3.3.0/etc/hadoop
1、配置hadoop-env.sh文件:
$ vi hadoop-env.sh
然后找到并修改JAVA_HOME属性的值:
export JAVA_HOME=/data/bigdata/jdk1.8.0_73
2、配置core-site.xml文件:
$ vi core-site.xml
找到其中的"<configuration></configuration>"标签,在其中指定各个配置参数,如下所示:
<configuration>
<property>
<name>fs.defaultFS</name>
<value>hdfs://localhost:9000/</value>
</property>
<property>
<name>hadoop.tmp.dir</name>
<value>/data/bigdata/hadoop-3.3.0/tmp</value>
</property>
<property>
<name>hadoop.proxyuser.hduser.hosts</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.hduser.groups</name>
<value>*</value>
</property></configuration>
3、配置hdfs-site.xml文件:
$ vi hdfs-site.xml
找到其中的"<configuration></configuration>"标签,在其中指定各个配置参数,如下所示:
<configuration>
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
<property>
<name>dfs.datanode.max.transfer.threads</name>
<value>4096</value>
</property></configuration>
4、配置mapred-site.xml文件
$ vi mapred-site.xml
找到其中的"<configuration></configuration>"标签,在其中指定各个配置参数,如下所示:
<configuration>
<property>
<name>mapreduce.job.tracker</name>
<value>hdfs://localhost:8001</value>
<final>true</final>
</property></configuration>
5、配置yarn-site.xml文件
$ vi yarn-site.xml
找到其中的"<configuration></configuration>"标签,在其中指定各个配置参数,如下所示:
<configuration>
<property>
<name>yarn.nodemanager.resource.memory-mb</name>
<value>3072</value>
</property>
<property>
<name>yarn.nodemanager.resource.cpu-vcores</name>
<value>2</value>
</property>
<property>
<name>yarn.scheduler.minimum-allocation-mb</name>
<value>256</value>
</property></configuration>
格式化HDFS(仅需执行格式化一次)。在终端窗口,执行下面的命令:
$ cd /data/bigdata/hadoop-3.3.0/bin/$ ./hdfs namenode -format
注:如果因为某些原因需要从头重新配置集群,那么在重新格式化HDFS之前,先把Haoop下的tmp目录删除。 这个目录是在hdfs-site.xml文件中自己指定的,其下有两个子目录name和data,重新格式化之前必须删除它们。
格式化namenode,实际上就是在namenode上创建一块命名空间。在创建过程中,会加载所配置的文件,检验是否配置正确。
1、修改启动文件
$ cd /data/bigdata/hadoop-3.3.0/sbin$ vi start-dfs.sh$ vi stop-dfs.sh
分别添加内容:
#将文件内容放在脚本开头HDFS_DATANODE_USER=rootHDFS_SECURE_DN_USER=hdfsHDFS_NAMENODE_USER=rootHDFS_SECONDARYNAMENODE_USER=root
2、首先启动HDFS集群:
$ ./start-dfs.sh
3、使用jps命令查看当前节点上运行的服务:
$ jps
4、成功启动后,可以通过Web界面查看NameNode 和 Datanode 信息和HDFS文件系统。
NameNode Web接口:http://localhost:9870
5、修改启动文件
$ cd /data/bigdata/hadoop-3.3.0/sbin$ vi start-yarn.sh$ vi stop-yarn.sh
分别添加内容:
YARN_RESOURCEMANAGER_USER=rootHADOOP_SECURE_DN_USER=yarnYARN_NODEMANAGER_USER=root
6、启动yarn:
$ ./start-yarn.sh$ jps
7、启动historyserver历史服务器和timelineserver时间线服务器:
$ cd ../bin/$ ./mapred --daemon start historyserver$ ./yarn --daemon start timelineserver
8、运行pi程序:
先进入到程序示例.jar包所在的目录,然后运行MR程序:
$ cd /data/bigdata/hadoop-3.3.0/share/hadoop/mapreduce
$ hadoop jar hadoop-mapreduce-examples-3.3.0.jar pi 1 2
在输出内容中,可以找到计算出的PI值。
9、可以通过 Web 界面查看:
打开浏览器,在地址栏输入:http://localhost:8088
查看任务进度:http://localhost:8088/cluster ,在 Web 界面点击 “Tracking UI” 这一列的 History 连接,可以看到任务的运行信息。
10、关闭集群:
$ cd /data/bigdata/hadoop-3.3.0/sbin$ ./stop-yarn.sh$ ./stop-dfs.sh$ cd ../bin$ ./mapred --daemon stop historyserver$ ./yarn --daemon stop timelineserver
硬件:Ubuntu16.04
软件:JDK-1.8、Hadoop-3.3
数据存放路径:/data/dataset
tar包路径:/data/software
tar包压缩路径:/data/bigdata
软件安装路径:/opt
实验设计创建文件:/data/resource
第2章
Hadoop3分布式文件系统HDFS
2-1 HDFSShell文件操作
1、查看hdfs shell帮助命令
2、使用shell命令创建和查看目录
3、使用shell命令上传文件和文件夹
4、使用shell命令下载文件和文件夹
5、使用shell命令重命名文件和文件夹
6、使用shell命令移动文件和文件夹
6、使用shell命令删除文件和文件夹
熟练掌握HDFS Shell的常用操作命令
硬件:Ubuntu16.04
软件:JDK-1.8、Hadoop-3.3
数据存放路径:/data/dataset
tar包路径:/data/software
tar包压缩路径:/data/bigdata
软件安装路径:/opt
实验设计创建文件:/data/resource
1、启动HDFS:
$ cd /opt/hadoop-3.3.0/sbin/$ ./start-dfs.sh
查看HDFS的守护进程,如下图所示:
$ jps
如果看到如下进程,说明HDFS服务已经正确启动。
2626 DataNode4471 Jps2488 NameNode2812 SecondaryNameNode
注意:需要在配置文件/etc/profile文件中打开hadoop3的相关环境变量设置。
1、在终端窗口中,执行以下命令,查看hdfs shell帮助信息:
$ hdfs dfs -help
如下图所示:
Usage: hadoop fs [generic options]
[-appendToFile <localsrc> ... <dst>]
[-cat [-ignoreCrc] <src> ...]
[-checksum [-v] <src> ...]
[-chgrp [-R] GROUP PATH...]
[-chmod [-R] <MODE[,MODE]... | OCTALMODE> PATH...]
[-chown [-R] [OWNER][:[GROUP]] PATH...]
[-copyFromLocal [-f] [-p] [-l] [-d] [-t <thread count>] <localsrc> ... <dst>]
[-copyToLocal [-f] [-p] [-ignoreCrc] [-crc] <src> ... <localdst>]
[-count [-q] [-h] [-v] [-t [<storage type>]] [-u] [-x] [-e] <path> ...]
[-cp [-f] [-p | -p[topax]] [-d] <src> ... <dst>]
[-createSnapshot <snapshotDir> [<snapshotName>]]
[-deleteSnapshot <snapshotDir> <snapshotName>]
[-df [-h] [<path> ...]]
[-du [-s] [-h] [-v] [-x] <path> ...]
[-expunge [-immediate] [-fs <path>]]
[-find <path> ... <expression> ...]
[-get [-f] [-p] [-ignoreCrc] [-crc] <src> ... <localdst>]
[-getfacl [-R] <path>]
[-getfattr [-R] {-n name | -d} [-e en] <path>]
[-getmerge [-nl] [-skip-empty-file] <src> <localdst>]
[-head <file>]
[-help [cmd ...]]
[-ls [-C] [-d] [-h] [-q] [-R] [-t] [-S] [-r] [-u] [-e] [<path> ...]]
[-mkdir [-p] <path> ...]
[-moveFromLocal <localsrc> ... <dst>]
[-moveToLocal <src> <localdst>]
[-mv <src> ... <dst>]
[-put [-f] [-p] [-l] [-d] <localsrc> ... <dst>]
[-renameSnapshot <snapshotDir> <oldName> <newName>]
[-rm [-f] [-r|-R] [-skipTrash] [-safely] <src> ...]
[-rmdir [--ignore-fail-on-non-empty] <dir> ...]
[-setfacl [-R] [{-b|-k} {-m|-x <acl_spec>} <path>]|[--set <acl_spec> <path>]]
[-setfattr {-n name [-v value] | -x name} <path>]
[-setrep [-R] [-w] <rep> <path> ...]
[-stat [format] <path> ...]
[-tail [-f] [-s <sleep interval>] <file>]
[-test -[defswrz] <path>]
[-text [-ignoreCrc] <src> ...]
[-touch [-a] [-m] [-t TIMESTAMP ] [-c] <path> ...]
[-touchz <path> ...]
[-truncate [-w] <length> <path> ...]
[-usage [cmd ...]]
......
会列出所有的HDFS文件系统操作的命令,以及每个命令的用法。
2、可以进一步查询某个命令的具体使用方法。例如,要查看"ls"命令的用法,在终端窗口执行如下命令:
$ hdfs dfs -help ls
窗口中会给出"ls"这个命令的具体用法说明,如下所示:
-ls [-C] [-d] [-h] [-q] [-R] [-t] [-S] [-r] [-u] [-e] [<path> ...] :
List the contents that match the specified file pattern. If path is not
specified, the contents of /user/<currentUser> will be listed. For a directory a
list of its direct children is returned (unless -d option is specified).
Directory entries are of the form:
permissions - userId groupId sizeOfDirectory(in bytes)
modificationDate(yyyy-MM-dd HH:mm) directoryName
and file entries are of the form:
permissions numberOfReplicas userId groupId sizeOfFile(in bytes)
modificationDate(yyyy-MM-dd HH:mm) fileName
-C Display the paths of files and directories only.
-d Directories are listed as plain files.
-h Formats the sizes of files in a human-readable fashion
rather than a number of bytes.
-q Print ? instead of non-printable characters.
-R Recursively list the contents of directories.
-t Sort files by modification time (most recent first).
-S Sort files by size.
-r Reverse the order of the sort.
-u Use time of last access instead of modification for
display and sorting.
-e Display the erasure coding policy of files and directories.
3、例如,要查看"get"和"put"两个命令的用法,在终端窗口执行如下命令:
$ hdfs dfs -help get put
窗口中会给出"ls"这个命令的具体用法说明,如下所示:
-get [-f] [-p] [-ignoreCrc] [-crc] <src> ... <localdst> :
Copy files that match the file pattern <src> to the local name. <src> is kept.
When copying multiple files, the destination must be a directory. Passing -f
overwrites the destination if it already exists and -p preserves access and
modification times, ownership and the mode.
-put [-f] [-p] [-l] [-d] <localsrc> ... <dst> :
Copy files from the local file system into fs. Copying fails if the file already
exists, unless the -f flag is given.
Flags:
-p Preserves access and modification times, ownership and the mode.
-f Overwrites the destination if it already exists.
-l Allow DataNode to lazily persist the file to disk. Forces
replication factor of 1. This flag will result in reduced
durability. Use with care.
-d Skip creation of temporary file(<dst>._COPYING_).
1、查看HDFS文件系统的根目录。在终端窗口中,执行如下命令:
$ hdfs dfs -ls /
2、查看HDFS文件系统的用户主目录。在终端窗口中,执行如下命令:
$ hdfs dfs -ls /user
3、在HDFS文件系统的根目录下创建子目录data。在终端窗口中,执行如下命令:
$ hdfs dfs -mkdir /data
然后查看HDFS文件系统的根目录和/data/目录:
$ hdfs dfs -ls /$ hdfs dfs -ls /data
4、在HDFS文件系统的"/data"目录下创建嵌套子目录"hadoop/mr",需要使用参数"-p"。在终端窗口中,执行如下命令:
$ hdfs dfs -mkdir -p /data/hadoop/mr
然后查看HDFS文件系统的根目录和/data/目录:
$ hdfs dfs -ls /data$ hdfs dfs -ls /data/hadoop$ hdfs dfs -ls /data/hadoop/mr
1、首先在本地编辑一个文本文件。在终端窗口中,执行如下命令:
$ mkdir ~/hello$ cd ~/hello$ vi study.txt
编辑study.txt文本文件的内容如下:
good good study
day day upto be or not to be
this is a question
2、将study.txt上传到HDFS上。在终端窗口中,执行如下命令:
$ hdfs dfs -put ~/hello/study.txt /data/
使用如下命令查看上传到HDFS的文件:
$ hdfs dfs -ls /data/$ hdfs dfs -cat /data/study.txt
3、将hello文件夹及其中的文件上传到HDFS上。在终端窗口中,执行如下命令:
$ hdfs dfs -put ~/hello /data/
使用如下命令查看上传到HDFS的文件:
$ hdfs dfs -ls /data/$ hdfs dfs -ls /data/hello$ hdfs dfs -cat /data/hello/study.txt
当内容过多的时候,只想查看前几条内容或者后几条内容时,操作命令如下:
$ hdfs dfs -cat /data/hello/study.txt|head -2
当内容过多的时候,只想查看最后1K的内容时,操作命令如下:
$ hdfs dfs -tail /data/hello/study.txt
也可以将HDFS上的文件或文件夹下载到本地文件系统。
1、在终端窗口,执行如下命令,将HDFS上的文件下载到本地,并改名:
$ hdfs dfs -get /data/study.txt ~/study2.txt
查看下载到本地的文件内容:
$ cat ~/study2.txt
2、在终端窗口,执行如下命令,将HDFS上的文件夹下载到本地,并改名:
$ hdfs dfs -get /data ~/
查看下载到本地的文件夹内容:
$ ls ~/data/
3、在终端窗口,执行如下命令,将HDFS上的文件夹下载到本地,并改名:
$ hdfs dfs -get /data ~/data2
查看下载到本地的文件内容:
$ ls ~/data2
可以在HDFS中对文件、文件夹重命名或移动。
1、将HDFS中的"/data/study.txt"重命名为"wc.txt"。在终端窗口中,执行如下命令:
$ hdfs dfs -mv /data/study.txt /data/wc.txt
查看重命名的文件,如下图所示:
$ hdfs dfs -ls /data/
2、将HDFS中的"/data/hadoop/mr"目录重命名为"mapreduce"。在终端窗口中,执行如下命令:
$ hdfs dfs -mkdir -p /data/hadoop/mr$ hdfs dfs -mv /data/hadoop/mr /data/hadoop/mapreduce
查看重命名的目录:
$ hdfs dfs -ls /data/hadoop/
1、将HDFS上的"/data/wc.txt"文件移动到HDFS的根目录下:
$ hdfs dfs -mv /data/wc.txt /
查看移动后的文件是否存在:
$ hdfs dfs -ls /
2、将HDFS上的"/data/hadoop"文件夹移动到HDFS的根目录下:
$ hdfs dfs -mv /data/hadoop /
查看移动后的文件是否存在:
$ hdfs dfs -ls /
1、在终端窗口中,使用如下的命令,删除HDFS根目录下的"wc.txt"文件:
$ hdfs dfs -rm /wc.txt
查看是否删除文件,如下图所示:
$ hdfs dfs -ls /
2、在终端窗口中,使用如下的命令,删除HDFS根目录下的"hadoop"文件夹及其内容:
$ hdfs dfs -rm -r /hadoop
查看是否删除文件夹,如下图所示:
$ hdfs dfs -ls /
搭建好Hadoop3环境之后,可以使用HDFS shell命令对HDFS文件系统进行操作。
调用文件系统(HDFS)shell命令应使用bin/hdfs dfs 的形式。
所有的HDFS shell命令使用URI路径作为参数。URI格式是scheme: //authority/path。HDFS的scheme是HDFS,对本地文件系统,scheme是file。其中scheme和authority参数都是可选的,如果未加指定,就会使用配置中指定的默认scheme。
命令行
2-2HDFSShell文件权限操作
1、修改根目录的所有文件的权限
2、修改文件属主
3、修改文件的操作权限
4、修改文件的属组和属主
5、HDFS安全模式管理
熟练掌握HDFS的文件权限操作命令。
硬件:Ubuntu16.04
软件:JDK-1.8、Hadoop-3.3
数据存放路径:/data/dataset
tar包路径:/data/software
tar包压缩路径:/data/bigdata
软件安装路径:/opt
实验设计创建文件:/data/resource
1、启动HDFS:
$ cd /opt/hadoop-3.3.0/sbin/$ ./start-dfs.sh
查看HDFS的守护进程,如下图所示:
$ jps
如果看到如下进程,说明HDFS服务已经正确启动。
2626 DataNode4471 Jps2488 NameNode2812 SecondaryNameNode
注意:需要在配置文件/etc/profile文件中打开hadoop3的相关环境变量设置。
1、在终端窗口中,执行以下命令,查看HDFS文件和目录的权限:
$ hdfs dfs -mkdir /data$ hdfs dfs -put /data/dataset/user.txt /$ hdfs dfs -ls /
其中以"d"开头的是文件夹,以"-"开头的是文件。
drwxr-xr-x - root supergroup 0 2020-09-04 14:01 /data
drwx------ - root supergroup 0 2020-09-02 11:01 /tmp
drwxr-xr-x - root supergroup 0 2020-09-02 11:01 /user
-rw-r--r-- 1 root supergroup 245 2020-09-04 14:06 /user.txt
2、修改"/data"目录的所有文件的权限(-R表示目录下所有文件)。在终端窗口执行如下命令:
$ hdfs dfs -chmod -R 777 /data
查看修改后的文件权限,如下所示:
drwxrwxrwx - root supergroup 0 2020-09-04 14:01 /data
drwx------ - root supergroup 0 2020-09-02 11:01 /tmp
drwxr-xr-x - root supergroup 0 2020-09-02 11:01 /user
-rw-r--r-- 1 root supergroup 245 2020-09-04 14:06 /user.txt
3、修改文件所有者(属主)。
首先查看修改前的所有者信息。在终端窗口执行如下命令:
$ hdfs dfs -ls /
输出结果如下所示:
drwxrwxrwx - root supergroup 0 2020-09-04 13:55 /data
将"/data"的所有者修改为"zhangsan"。在终端窗口中,执行如下的命令:
$ hdfs dfs -chown zhangsan /data
查看修改后的所有者信息。在终端窗口执行如下命令:
$ hdfs dfs -ls /
输出结果如下所示:
drwxrwxrwx - zhangsan supergroup 0 2020-09-04 14:01 /data
drwx------ - root supergroup 0 2020-09-02 11:01 /tmp
drwxr-xr-x - root supergroup 0 2020-09-02 11:01 /user
-rw-r--r-- 1 root supergroup 245 2020-09-04 14:06 /user.txt
可以看到,HDFS上的"/data"文件所有者已经修改为了"zhangsan"。
4、修改文件的属组。
首先查看修改前的所属组信息。在终端窗口执行如下命令:
$ hdfs dfs -ls /
输出结果如下所示:
drwxrwxrwx - zhangsan supergroup 0 2020-09-04 14:01 /data
drwx------ - root supergroup 0 2020-09-02 11:01 /tmp
drwxr-xr-x - root supergroup 0 2020-09-02 11:01 /user
-rw-r--r-- 1 root supergroup 245 2020-09-04 14:06 /user.txt
将"/user.txt"的所属组修改为"zhangsan"。在终端窗口中,执行如下的命令:
$ hdfs dfs -chgrp zhangsan /user.txt
查看修改后的所属性组信息。在终端窗口执行如下命令:
$ hdfs dfs -ls /
输出结果如下所示:
drwxrwxrwx - zhangsan supergroup 0 2020-09-04 14:01 /data
drwx------ - root supergroup 0 2020-09-02 11:01 /tmp
drwxr-xr-x - root supergroup 0 2020-09-02 11:01 /user
-rw-r--r-- 1 root zhangsan 245 2020-09-04 14:06 /user.txt
可以看到,HDFS上的"/user.txt"文件所属组已经修改为了"zhangsan"。
5、同时修改文件/文件夹的属主和属组。
将"/data"的属主和属组都修改为"hduser"。在终端窗口中,执行如下的命令:
$ hdfs dfs -chown hduser:hduser /data
查看修改后的所属性组信息。在终端窗口执行如下命令:
$ hdfs dfs -ls /
输出结果如下所示:
drwxrwxrwx - hduser hduser 0 2020-09-04 14:01 /data
drwx------ - root supergroup 0 2020-09-02 11:01 /tmp
drwxr-xr-x - root supergroup 0 2020-09-02 11:01 /user
-rw-r--r-- 1 root zhangsan 245 2020-09-04 14:06 /user.txt
可以看到,HDFS上的"/data"文件所属组已经修改为了"hduser"。
1、查看HDFS文件系统当前是否处在安全模式。在终端窗口中,执行如下命令:
$ hdfs dfsadmin -safemode get
输出内容如下:
Safe mode is OFF
说明当前安全模式已关闭,HDFS文件系统并非处于安全模式。
2、让HDFS文件系统进入安全模式。在终端窗口中,执行如下命令:
$ hdfs dfsadmin -safemode enter
再次查看当前HDFS是否处于安全模式。在终端窗口中,执行如下命令:
$ hdfs dfsadmin -safemode get
输出内容如下:
Safe mode is ON
说明当前安全模式已打开,HDFS文件系统正处于安全模式。
3、让HDFS文件系统离开安全模式。在终端窗口中,执行如下命令:
$ hdfs dfsadmin -safemode leave
说明HDFS文件系统已经离开(关闭)安全模式。
4、可以使用如下命令强制HDFS退出安全模式。在终端窗口中,执行如下命令:
$ hdfs dfsadmin -safemode forceExit
Hadoop文件的操作权限和linux非常类似,可以通过hadoop的命令进行修改,从而控制文件的操作权限,增加集群文件的保密性。在日常的工作中安全对企业是非常重要的。
桌面运行
第3章
Hadoop3 I/O
3-1序列化_Writable接口
1、实现字节数组的序列化和反序列化操作
2、实现HDFS输入输出流的序列化和反序列化操作
理解序列化和反序列的概念;掌握Writable类型的序列化和反序列化操作。
硬件:Ubuntu16.04
软件:JDK-1.8、Hadoop-3.3
数据存放路径:/data/dataset
tar包路径:/data/software
tar包压缩路径:/data/bigdata
软件安装路径:/opt
实验设计创建文件:/data/resource
注意:需要在配置文件/etc/profile文件中打开hadoop3的相关环境变量设置。
#### 启动HDFS集群服务
1、在终端窗口中,执行如下命令,启动HDFS集群。
$ cd /opt/hadoop-3.3.0/sbin/$ ./start-all.sh
2、在终端窗口中,执行如下命令,查看HDFS服务启动情况:
$ jps
1、启动开发工具Eclipse。
2、打开eclipse开发工具后,在菜单栏中,选择【File】->【New】->【Project】->【Java project】创建Java项目并命名为【Hadoop3Demo】,点击【Finish】完成创建,如下图所示:
3、导入hadoop相关的jar包,首先右击项目选择【New】—>【Folder】创建一个【lib】文件夹并把指定位置中(/data/software/hadoop3_lib/)的包放入该文件中。如下图所示:
4、把lib下所有的jar包导入到环境变量,首先全选【lib】文件夹下的jar包文件,右键点击,选择【build path】->【add to build path】,添加后,发现在项目下多一个列表项【Referenced Libraries】。如下图所示:
Hadoop提供了自己的Text数据类型,用来存储UTF8文本。
1、在项目【src】目录下,单击右键,创建名为"com.simple.IntWritableDemo"的Java类。
2、在IntWritableDemo类中,编写两个辅助方法,分别实现对IntWritable类型数据的序列化和反序列化操作。实现代码如下:
package com.simple;
import java.io.ByteArrayInputStream;import java.io.ByteArrayOutputStream;import java.io.DataInputStream;import java.io.DataOutputStream;import java.io.IOException;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Writable;import org.apache.hadoop.util.StringUtils;public class IntWritableDemo {
// 序列化:辅助方法,捕获序列化流中的字节
public static byte[] serialize(Writable writable) throws IOException {
ByteArrayOutputStream out = new ByteArrayOutputStream();
DataOutputStream dataOut = new DataOutputStream(out);
writable.write(dataOut);
dataOut.close();
return out.toByteArray();
}
// 反序列化
public static byte[] deserialize(Writable writable, byte[] bytes) throws IOException {
ByteArrayInputStream in = new ByteArrayInputStream(bytes);
DataInputStream dataIn = new DataInputStream(in);
writable.readFields(dataIn);
dataIn.close();
return bytes;
}
}
3、编辑IntWritableDemo类的main方法,先序列化一个IntWritable类型的整数,然后再对其输出进行反序列化操作。实现代码如下所示:
package com.simple;
import java.io.ByteArrayInputStream;import java.io.ByteArrayOutputStream;import java.io.DataInputStream;import java.io.DataOutputStream;import java.io.IOException;
import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Writable;import org.apache.hadoop.util.StringUtils;
public class IntWritableDemo {
public static void main(String[] args) throws IOException {
// Writable类型的整数
IntWritable writable = new IntWritable(163);
// 序列化
byte[] bytes = serialize(writable);
// 十六进制表示
System.out.println(StringUtils.byteToHexString(bytes));
// 反序列化
// 构造一个新的、无值的IntWritable,然后调用deserialize()从刚刚写入的输出数据中读取数据。
IntWritable newWritable = new IntWritable();
deserialize(newWritable, bytes);
System.out.println(newWritable.get());
}
// 序列化:辅助方法,捕获序列化流中的字节
public static byte[] serialize(Writable writable) throws IOException {
ByteArrayOutputStream out = new ByteArrayOutputStream();
DataOutputStream dataOut = new DataOutputStream(out);
writable.write(dataOut);
dataOut.close();
return out.toByteArray();
}
// 反序列化
public static byte[] deserialize(Writable writable, byte[] bytes) throws IOException {
ByteArrayInputStream in = new ByteArrayInputStream(bytes);
DataInputStream dataIn = new DataInputStream(in);
writable.readFields(dataIn);
dataIn.close();
return bytes;
}
}
4、在代码的任意空白处,单击右键,在弹出的环境菜单中,选择"【Run As】->【Java Application】"菜单项,运行程序。操作如下图所示:
3、如果一切正常,则可以观察到Eclipse控制台输出信息如下:
000000a3163
Writable用于在Hadoop中创建序列化的数据类型。Hadoop的基于Writable的序列化框架提供了对数据更有效且可自定义的序列化和表示。与Java的序列化相比,Hadoop的Writable框架并不写入每个对象的类型名称,这使得序列化处理更快,结果更紧凑。
1、在项目【src】目录下,单击右键,创建名为"com.simple.IntWritableDemo2"的Java类,并编辑源代码如下:
package com.simple;
import java.io.IOException;import java.net.URI;
import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FSDataInputStream;import org.apache.hadoop.fs.FSDataOutputStream;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.zookeeper.common.IOUtils;
public class IntWritableDemo2 {
static final String hdfsurl = "hdfs://localhost:9000";
public static void main(String[] args) throws IOException {
Configuration conf = new Configuration(); // 获取环境变量
FileSystem fs = FileSystem.get(URI.create(hdfsurl), conf);// 获取文件系统实例
// (1) 序列化过程
// Writable类型的整数
IntWritable writable = new IntWritable(163);
// 构造HDFS输出流
FSDataOutputStream fsout = fs.create(new Path("/int.data"));
writable.write(fsout); // 把序列化值写入到int.data文件中
IOUtils.closeStream(fsout);
// (2) 反序列化过程
// 构造一个新的、无值的IntWritable
IntWritable newWritable = new IntWritable();
// 构造HDFS输入流
FSDataInputStream fsin = fs.open(new Path("/int.data")); // 建立输入流
newWritable.readFields(fsin); // 反序列化流中的数据
System.out.println(newWritable.get());
IOUtils.closeStream(fsin);
}
}
在上面的代码中,我们先把一个IntWritable类型的变量序列化到HDFS文件系统中存储(通过调用writable.write(fsout)方法),然后又通过一个反序列化过程将存储的序列化数据反序列化到一个新的、无值的IntWritable变量中,最后通过其get方法提取其封装的整数值。
2、在代码的任意空白处,单击右键,在弹出的环境菜单中,选择"【Run As】->【Java Application】"菜单项,运行程序。操作如下图所示:
3、如果一切正常,则可以观察到Eclipse控制台输出信息如下:
163
4、在终端窗口中,执行如下代码,查看在HDFS文件系统中存储序列化数据的文件:
$ hdfs dfs -ls /
可以观察到,在HDFS文件系统中已经生成了"int.data"数据文件,如下所示:
-rw-r--r-- 3 hduser supergroup 4 2020-08-25 19:01 /int.data
Hadoop提供了一个WritableComparator类,它是RawComparator的一个通用实现,用于对两个WritableComparable类进行比较。它不但实现了原始的compare方法,并且可以作为RawComparator实例的工厂类。
1、在项目【src】目录下,单击右键,创建名为"com.simple.WritableComparatorDemo"的Java类,并编辑源代码如下:
package com.simple;
import java.io.ByteArrayOutputStream;import java.io.DataOutputStream;import java.io.IOException;
import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.RawComparator;import org.apache.hadoop.io.Writable;import org.apache.hadoop.io.WritableComparator;
public class WritableComparatorDemo {
public static void main(String[] args) throws IOException {
RawComparator<IntWritable> comparator = WritableComparator.get(IntWritable.class);
// 比较两个IntWritable对象
IntWritable w1 = new IntWritable(163);
IntWritable w2 = new IntWritable(67);
IntWritable w3 = new IntWritable(163);
IntWritable w4 = new IntWritable(267);
int result = comparator.compare(w1, w2);
System.out.println(result); // 1,w1大于w2
result = comparator.compare(w1, w3);
System.out.println(result); // 0,w1等于w2
result = comparator.compare(w1, w4);
System.out.println(result); // -1,w1小于w2
// 或者比较它们的序列化表示
byte[] b1 = serialize(w1);
byte[] b2 = serialize(w2);
int result2 = comparator.compare(b1, 0, b1.length, b2, 0, b2.length);
System.out.println(result2); // 1, b1等于b2
}
// 序列化:辅助方法,捕获序列化流中的字节
public static byte[] serialize(Writable writable) throws IOException {
ByteArrayOutputStream out = new ByteArrayOutputStream();
DataOutputStream dataOut = new DataOutputStream(out);
writable.write(dataOut);
dataOut.close();
return out.toByteArray();
}
}
2、在代码的任意空白处,单击右键,在弹出的环境菜单中,选择"【Run As】->【Java Application】"菜单项,运行程序。操作如下图所示:
3、如果一切正常,则可以观察到Eclipse控制台输出信息如下:
1
0-1
1
Hadoop使用基于Writable接口的类作为HDFS文件中数据的数据类型。该Writable接口定义了在传输和存储该数据时Hadoop应该怎样序列化和反序列化这个值。
相对于使用Java的本地序列化框架,Hadoop的基于Writable的序列化框架提供了对数据更有效且可自定义的序列化和表示。与 Java的序列化相比,Hadoop的Writable框架并不写入每个对象的类型名称,这使得序列化处理更快,结果更紧凑,可随机访问的序列化数据格式易于被非Java客户端解释。Hadoop的基于Writable的序列化还通过重用该Writable对象来减少对象创建负载,而Java的本地化序列框架是不可能做到这一点的。
3-2序列化_Writable类
1、使用Hadoop的Text数据类型
2、使用Hadoop的IntWritable数据类型
3、掌握自定义Hadoop序列化数据类型
掌握Hadoop基于Writable接口的序列化数据类型的使用。
硬件:Ubuntu16.04
软件:JDK-1.8、Hadoop-3.3
数据存放路径:/data/dataset
tar包路径:/data/software
tar包压缩路径:/data/bigdata
软件安装路径:/opt
实验设计创建文件:/data/resource
注意:需要在配置文件/etc/profile文件中打开hadoop3的相关环境变量设置。
1、在终端窗口中,执行如下命令,启动HDFS集群。
$ cd /opt/hadoop-3.3.0/sbin/$ ./start-all.sh
2、在终端窗口中,执行如下命令,查看HDFS服务启动情况:
$ jps
1、启动开发工具Eclipse。
2、打开eclipse开发工具后,在菜单栏中,选择【File】->【New】->【Project】->【Java project】创建Java项目并命名为【Hadoop3Demo】,点击【Finish】完成创建,如下图所示:
3、导入hadoop相关的jar包,首先右击项目选择【New】—>【Folder】创建一个【lib】文件夹并把指定位置中(/data/software/hadoop3_lib/)的包放入该文件中。如下图所示:
4、把lib下所有的jar包导入到环境变量,首先全选【lib】文件夹下的jar包文件,右键点击,选择【build path】->【add to build path】,添加后,发现在项目下多一个列表项【Referenced Libraries】。如下图所示:
Hadoop提供了自己的Text数据类型,用来存储UTF8文本。
1、在项目【src】目录下,单击右键,创建名为"com.simple.TextDemo"的Java类,并编辑源代码如下:
package com.simple;import org.apache.hadoop.io.Text;
// I/O中序列化类型public class TextDemo {
public static void main(String[] args) {
Text text = new Text("hello world"); // 定义Text类型
System.out.println(text.getLength()); // 定义Text类型长度 ——11
System.out.println(text.find("lo")); // 获取lo对应的位置——3
System.out.println(text.find("world")); // 获取world对应的位置——6
System.out.println(text.charAt(0)); // 获取第0个字符量——104
Text text1 = new Text(); // 创建一个Text类型实例
text1.set("had"); // 进行赋值
text1.append("oop".getBytes(), 0, "oop".getBytes().length); // 追加数据
System.out.println(text1); // 输出结果
}
}
2、在代码的任意空白处,单击右键,在弹出的环境菜单中,选择"【Run As】->【Java Application】"菜单项,运行程序。操作如下图所示:
3、如果一切正常,则可以观察到Eclipse控制台输出信息如下:
11
3
6
104
hadoop
虽然在序列化和反序列化时使用Text类型来封装字符串数据,但在处理Text类型的数据时,通常将其转为Java String类型,目的是为了利用Java String类型丰富的字符串操作方法。
1、在项目【src】目录下,单击右键,创建名为"com.simple.TextToUpper"的Java类,并编辑源代码如下:
package com.simple;
import org.apache.hadoop.io.Text;
public class TextToUpper {
public static void main(String[] args) {
Text text = new Text("hello is the best cloud language");// 设置Text值
String str = text.toString(); // 从Hadoop Text类型转为Java String类型
String[] temp = str.split(" "); // 对字符串进行拆分
for (int i = 0; i < temp.length; i++) { // 开始进行处理
// 实现tem[1]串首字母变大写
temp[i] = temp[i].substring(0, 1).toUpperCase() + temp[i].substring(1);
System.out.println(temp[i].toString()); //打印输出
}
StringBuffer sb = new StringBuffer(); // 设置辅助可变字符串
for (String string : temp) { // 转化为可变字符串
sb.append(string);
sb.append(" ");
}
text.set(sb.toString()); // 重新设置Text内容
System.out.println(text.toString()); // 打印输出
}
}
2、在代码的任意空白处,单击右键,在弹出的环境菜单中,选择"【Run As】->【Java Application】"菜单项,运行程序。操作如下图所示:
3、如果一切正常,则可以观察到Eclipse控制台输出信息如下:
Hello
Is
TheBestCloudLanguage
Hello Is The Best Cloud Language
Hadoop提供了自己的IntWritable数据类型,用来存储int类型整数。
1、在项目【src】目录下,单击右键,创建名为"com.simple.IntWritableDemo"的Java类,并编辑源代码如下:
package com.simple;
import java.io.IOException;import org.apache.hadoop.io.IntWritable;
public class IntWritableDemo {
public static void main(String[] args) throws IOException {
IntWritable iw = new IntWritable(123);
System.out.println(iw.get()); // 提取存储的int值
System.out.println(iw.compareTo(iw)); // 比较,结果为-1,0,1
}
}
2、在代码的任意空白处,单击右键,在弹出的环境菜单中,选择"【Run As】->【Java Application】"菜单项,运行程序。操作如下图所示:
3、如果一切正常,则可以观察到Eclipse控制台输出信息如下:
123
0
BytesWritable是一个二进制数据数组的包装器。它的序列化格式是一个4字节的整数字段,指定后面跟着字节数和字节本身。
1、在项目【src】目录下,单击右键,创建名为"com.simple.ByteWritableDemo"的Java类,并编辑源代码如下:
package com.simple;
import java.io.ByteArrayOutputStream;import java.io.DataOutputStream;import java.io.IOException;
import org.apache.hadoop.io.BytesWritable;import org.apache.hadoop.io.Writable;import org.apache.hadoop.util.StringUtils;
public class ByteWritableDemo {
public static void main(String[] args) throws IOException {
// BytesWritable是一个二进制数据数组的包装器。
// 它的序列化格式是一个4字节的整数字段,指定后面跟着字节数和字节本身。
BytesWritable b = new BytesWritable(new byte[] { 3, 5, 7});
byte[] bytes = serialize(b);
String hexStr = StringUtils.byteToHexString(bytes);
System.out.println(hexStr);
// BytesWritable是可变的,它的值可以通过调用它的set()方法来更改
byte[] b2 = new byte[] {9,11,13,15};
b.set(b2, 0, b2.length);
hexStr = StringUtils.byteToHexString(serialize(b));
System.out.println(hexStr);
// 可以通过调用getLength()来确定BytesWritable的大小
b.setCapacity(11);
System.out.println(b.getLength());
System.out.println(b.getBytes().length);
}
// 序列化:辅助方法,捕获序列化流中的字节
public static byte[] serialize(Writable writable) throws IOException {
ByteArrayOutputStream out = new ByteArrayOutputStream();
DataOutputStream dataOut = new DataOutputStream(out);
writable.write(dataOut);
dataOut.close();
return out.toByteArray();
}
}
2、在代码的任意空白处,单击右键,在弹出的环境菜单中,选择"【Run As】->【Java Application】"菜单项,运行程序。操作如下图所示:
3、如果一切正常,则可以观察到Eclipse控制台输出信息如下:
00000003030507
00000004090b0d0f
4
11
在org.apache.hadoop.io包中包括六种Writable 集合类型:
- ArrayWritable
- ArrayPrimitiveWritable
- twodarrayrayraywritable
- MapWritable
- SortedMapWritable
- EnumSetWritable
1、在项目【src】目录下,单击右键,创建名为"com.simple.MapWritableDemo"的Java类,并编辑源代码如下:
package com.simple;
import java.io.IOException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.MapWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.io.VIntWritable;import org.apache.hadoop.util.ReflectionUtils;
public class MapWritableDemo {
public static void main(String[] args) throws IOException {
MapWritable src = new MapWritable();
src.put(new IntWritable(1), new Text("cat"));
src.put(new VIntWritable(2), new LongWritable(163));
MapWritable dest = new MapWritable();
// 使用序列化将Writable对象复制到另一个MapWritable
Configuration conf = new Configuration();
ReflectionUtils.copy(conf, src, dest);
Text text1 = (Text) dest.get(new IntWritable(1)); // "cat"
System.out.println(text1.toString());
LongWritable long2 = (LongWritable) dest.get(new VIntWritable(2)); // 163
System.out.println(long2.get());
}
}
2、在代码的任意空白处,单击右键,在弹出的环境菜单中,选择"【Run As】->【Java Application】"菜单项,运行程序。操作如下图所示:
3、如果一切正常,则可以观察到Eclipse控制台输出信息如下:
cat
163
Hadoop提供了大量的Writable实现类, 均位于org.apache.hadoop.io包中。
除了char类型(可存储在IntWritable中)之外,所有Java基本类型都有Writable的包装器。它们都有一个get()和set()方法来检索和存储包装后的值。
这些Java基本类型的Writable包装器类如下表所示:
3-3序列化_自定义Writable类
1、自定义一个可在HDFS中存储和可比较的订单类;
2、将该订单类实例序列化和反序列化到HDFS中。
掌握如何自定义Writable类型。
硬件:Ubuntu16.04
软件:JDK-1.8、Hadoop-3.3
数据存放路径:/data/dataset
tar包路径:/data/software
tar包压缩路径:/data/bigdata
软件安装路径:/opt
实验设计创建文件:/data/resource
注意:需要在配置文件/etc/profile文件中打开hadoop3的相关环境变量设置。
#### 启动HDFS集群服务
1、在终端窗口中,执行如下命令,启动HDFS集群。
$ cd /opt/hadoop-3.3.0/sbin/$ ./start-all.sh
2、在终端窗口中,执行如下命令,查看HDFS服务启动情况:
$ jps
1、启动开发工具Eclipse。
2、打开eclipse开发工具后,在菜单栏中,选择【File】->【New】->【Project】->【Java project】创建Java项目并命名为【Hadoop3Demo】,点击【Finish】完成创建,如下图所示:
3、导入hadoop相关的jar包,首先右击项目选择【New】—>【Folder】创建一个【lib】文件夹并把指定位置中(/data/software/hadoop3_lib/)的包放入该文件中。如下图所示:
4、把lib下所有的jar包导入到环境变量,首先全选【lib】文件夹下的jar包文件,右键点击,选择【build path】->【add to build path】,添加后,发现在项目下多一个列表项【Referenced Libraries】。如下图所示:
1、在项目【src】目录下,单击右键,创建名为"com.simple.OrderWritable"的Java类,并编辑源代码如下:
package com.simple;
import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;import java.util.Date;
import org.apache.hadoop.io.WritableComparable;
// 自定义Writable类public class OrderWritable implements WritableComparable<OrderWritable> {
private long orderId; // 订单号
private Date orderDate; // 订单日期
private double money; // 订单金额
private String userName;
public OrderWritable() {
super();
}
public OrderWritable(long orderId, Date orderDate, double money, String userName) {
super();
this.orderId = orderId;
this.orderDate = orderDate;
this.money = money;
this.userName = userName;
}
public long getOrderId() {
return orderId;
}
public void setOrderId(long orderId) {
this.orderId = orderId;
}
public Date getOrderDate() {
return orderDate;
}
public void setOrderDate(Date orderDate) {
this.orderDate = orderDate;
}
public double getMoney() {
return money;
}
public void setMoney(double money) {
this.money = money;
}
public String getUserName() {
return userName;
}
public void setUserName(String userName) {
this.userName = userName;
}
/**
* 只对比orderId
*/
@Override
public boolean equals(Object obj) {
if (obj instanceof OrderWritable) {
OrderWritable other = (OrderWritable) obj;
return other.getOrderId() == this.getOrderId();
}
return false;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + (int) (orderId ^ (orderId >>> 32));
return result;
}
/**
* 序列化
*/
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(this.orderId);
out.writeUTF(this.userName);
out.writeDouble(this.money);
out.writeLong(this.orderDate.getTime());
}
/**
* 反序列化
*/
@Override
public void readFields(DataInput in) throws IOException {
this.orderId = in.readLong();
this.userName = in.readUTF();
this.money = in.readDouble();
this.orderDate = new Date(in.readLong());
}
/**
* 只比较orderId
*/
@Override
public int compareTo(OrderWritable obj) {
int result = -1;
if (obj instanceof OrderWritable) {
OrderWritable other = (OrderWritable) obj;
if (this.getOrderId() > other.getOrderId()) {
result = 1;
} else if (this.getOrderId() == other.getOrderId()) {
result = 0;
}
}
return result;
}
@Override
public String toString() {
return "订单: [orderId=" + orderId
+ ", orderDate=" + orderDate.toLocaleString()
+ ", money=" + money + ", userName="
+ userName + "]";
}
}
1、在项目【src】目录下,单击右键,创建名为"com.simple.OrderWritableDemo2"的Java类,并编辑源代码如下:
package com.simple;import java.io.IOException;import java.net.URI;import java.util.Date;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FSDataInputStream;import org.apache.hadoop.fs.FSDataOutputStream;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IOUtils;public class OrderWritableDemo2 {
static String hdfsurl = "hdfs://localhost:9000";
public static void main(String[] args) throws IOException, InterruptedException {
// 生成两个订单数据
OrderWritable orderOne = new OrderWritable(1001, new Date(), 102345.00, "订单-one");
OrderWritable orderTwo = new OrderWritable(1002, new Date(), 357685.00, "订单-two");
// 连接HDFS文件系统
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(hdfsurl), conf);
// (1) 将第一个订单序列化保存到HDFS中
FSDataOutputStream fsout = fs.create(new Path("/order.data"));
orderOne.write(fsout); // 把序列化值写入到int.data文件中
IOUtils.closeStream(fsout);
// (2) 将存储在HDFS中的序列化订单数据反序列化到一个新的、无值的IntWritable
OrderWritable newOrder = new OrderWritable();
FSDataInputStream fsin = fs.open(new Path("/order.data")); // 建立输入流
newOrder.readFields(fsin);
System.out.println(newOrder); // 输出反序列化的订单信息
IOUtils.closeStream(fsin);
// 对比两个订单是否相同
if(orderOne == orderTwo) {
System.out.println("两个订单相同");
}else {
System.out.println("两个订单不相同");
}
}
}
2、在代码的任意空白处,单击右键,在弹出的环境菜单中,选择"【Run As】->【Java Application】"菜单项,运行程序。操作如下图所示:
3、如果一切正常,则可以观察到Eclipse控制台输出信息如下:
订单: [orderId=1001, orderDate=2020-8-25 21:00:47, money=102345.0, userName=订单-one]
两个订单不相同
4、在终端窗口中,执行如下的命令,查看序列化到HDFS中的文件:
$ hdfs dfs -ls /
应该可以看到生成的序列化文件,如下所示:
-rw-r--r-- 3 hduser supergroup 36 2020-08-25 21:00 /order.data
有可能会出现这样的情形:没有一个内置的数据类型满足你的业务需求,或者一个经过优化的自定义数据类型有可能比Hadoop内置的数据类型性能更好。在这种场景下,我们可以通过实现org.apache.hadoop.io.Writable接口很容易地编写一个自定义的Writable数据类型。
在Hadoop中,系统节点间的进程间通信是使用远程过程调用(remote procedure call, RPC)实现的。RPC协议使用序列化将消息呈现为将发送到远程节点的二进制流,然后远程节点将二进制流反序列化为原始消息。
Hadoop框架的核心部分,也就是shuffle和sort阶段,如果不使用Writable,将无法执行。
1、写入一个SequenceFile文件;
2、读取一个SequenceFile文件。
3、排序和合并SequenceFile文件。
掌握如何SequenceFile文件。
硬件:Ubuntu16.04
软件:JDK-1.8、Hadoop-3.3
数据存放路径:/data/dataset
tar包路径:/data/software
tar包压缩路径:/data/bigdata
软件安装路径:/opt
实验设计创建文件:/data/resource
注意:需要在配置文件/etc/profile文件中打开hadoop3的相关环境变量设置。
1、在/opt/hadoop-3.3.0/etc/hadoop目录下修改mapred-site.xml
<configuration>
<property>
<name>mapreduce.job.tracker</name>
<value>hdfs://localhost:8001</value>
<final>true</final>
</property></configuration>
修改yarn-site.xml
<configuration>
<property>
<name>yarn.nodemanager.resource.memory-mb</name>
<value>3072</value>
</property>
<property>
<name>yarn.nodemanager.resource.cpu-vcores</name>
<value>2</value>
</property>
<property>
<name>yarn.scheduler.minimum-allocation-mb</name>
<value>256</value>
</property></configuration>
在终端窗口中,执行如下命令,启动HDFS集群。
$ cd /opt/hadoop-3.3.0/sbin/$ ./start-all.sh
2、在终端窗口中,执行如下命令,查看HDFS服务启动情况:
$ jps
1、启动开发工具Eclipse。
2、打开eclipse开发工具后,在菜单栏中,选择【File】->【New】->【Project】->【Java project】创建Java项目并命名为【Hadoop3Demo】,点击【Finish】完成创建,如下图所示:
3、导入hadoop相关的jar包,首先右击项目选择【New】—>【Folder】创建一个【lib】文件夹并把指定位置中(/data/software/hadoop3_lib/)的包放入该文件中。如下图所示:
4、把lib下所有的jar包导入到环境变量,首先全选【lib】文件夹下的jar包文件,右键点击,选择【build path】->【add to build path】,添加后,发现在项目下多一个列表项【Referenced Libraries】。如下图所示:
1、在项目【src】目录下,单击右键,创建名为"com.simple.SequenceFileWriteDemo"的Java类,并编辑源代码如下:
package com.simple;import java.io.IOException;import java.net.URI;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IOUtils;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.SequenceFile;import org.apache.hadoop.io.Text;
public class SequenceFileWriteDemo {
private static final String[] DATA = {
"One, two, buckle my shoe",
"Three, four, shut the door",
"Five, six, pick up sticks",
"Seven, eight, lay them straight",
"Nine, ten, a big fat hen"
};
private static final String hdfsurl = "hdfs://localhost:9000";
public static void main(String[] args) throws IOException {
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(hdfsurl), conf);
IntWritable key = new IntWritable(); // key
Text value = new Text(); // value
Path path = new Path(hdfsurl + "/data.txt");
SequenceFile.Writer writer = null;
try {
// 旧的方法
// writer = SequenceFile.createWriter(fs, conf, path, key.getClass(), value.getClass());
// 新的方法
SequenceFile.Writer.Option optionfile = SequenceFile.Writer.file(path);
SequenceFile.Writer.Option optionkey = SequenceFile.Writer.keyClass(key.getClass());
SequenceFile.Writer.Option optionvalue = SequenceFile.Writer.valueClass(value.getClass());
writer = SequenceFile.createWriter(conf, optionfile, optionkey, optionvalue);
for (int i = 0; i < 100; i++) {
key.set(100 - i);
value.set(DATA[i % DATA.length]);
System.out.printf("[%s]\t%s\t%s\n", writer.getLength(), key, value);
writer.append(key, value);
}
} finally {
IOUtils.closeStream(writer);
}
}
}
2、在代码的任意空白处,单击右键,在弹出的环境菜单中,选择"【Run As】->【Java Application】"菜单项,运行程序。操作如下图所示:
3、如果一切正常,则可以观察到Eclipse控制台输出信息如下:
......
INFO [main] - Got brand-new compressor [.deflate][128] 100 One, two, buckle my shoe[173] 99 Three, four, shut the door[220] 98 Five, six, pick up sticks[264] 97 Seven, eight, lay them straight[314] 96 Nine, ten, a big fat hen[359] 95 One, two, buckle my shoe[404] 94 Three, four, shut the door[451] 93 Five, six, pick up sticks[495] 92 Seven, eight, lay them straight[545] 91 Nine, ten, a big fat hen[590] 90 One, two, buckle my shoe[635] 89 Three, four, shut the door[682] 88 Five, six, pick up sticks[726] 87 Seven, eight, lay them straight[776] 86 Nine, ten, a big fat hen
......
4、在终端窗口中,执行如下的命令,查看序列化到HDFS中的文件:
$ hdfs dfs -ls /
应该可以看到生成的序列化文件,如下所示:
-rw-r--r-- 3 hduser supergroup 4788 2020-08-26 14:02 /data.txt
下面这个示例演示了如何读取具有Writable的键和值的序列文件。
1、在项目【src】目录下,单击右键,创建名为"com.simple.SequenceFileReadDemo"的Java类,并编辑源代码如下:
package com.simple;import java.io.IOException;import java.net.URI;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IOUtils;import org.apache.hadoop.io.SequenceFile;import org.apache.hadoop.io.Writable;import org.apache.hadoop.util.ReflectionUtils;
public class SequenceFileReadDemo {
private static final String hdfsurl = "hdfs://localhost:9000";
public static void main(String[] args) throws IOException {
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(hdfsurl), conf);
Path path = new Path(hdfsurl + "/data.txt");
SequenceFile.Reader reader = null;
try {
// 旧方法
// reader = new SequenceFile.Reader(fs, path, conf);
// 新方法
SequenceFile.Reader.Option optionfile = SequenceFile.Reader.file(path);
reader = new SequenceFile.Reader(conf, optionfile);
Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
long position = reader.getPosition();
while (reader.next(key, value)) {
String syncSeen = reader.syncSeen() ? "*" : "";
System.out.printf("[%s%s]\t%s\t%s\n", position, syncSeen, key, value);
position = reader.getPosition(); // 下一条记录的开始
}
} finally {
IOUtils.closeStream(reader);
}
}
}
2、在代码的任意空白处,单击右键,在弹出的环境菜单中,选择"【Run As】->【Java Application】"菜单项,运行程序。操作如下图所示:
3、如果一切正常,则可以观察到Eclipse控制台输出信息如下:
......
INFO [main] - Got brand-new decompressor [.deflate][128] 100 One, two, buckle my shoe[173] 99 Three, four, shut the door[220] 98 Five, six, pick up sticks[264] 97 Seven, eight, lay them straight[314] 96 Nine, ten, a big fat hen[359] 95 One, two, buckle my shoe[404] 94 Three, four, shut the door[451] 93 Five, six, pick up sticks[495] 92 Seven, eight, lay them straight[545] 91 Nine, ten, a big fat hen
......[4512] 6 Nine, ten, a big fat hen[4557] 5 One, two, buckle my shoe[4602] 4 Three, four, shut the door[4649] 3 Five, six, pick up sticks[4693] 2 Seven, eight, lay them straight[4743] 1 Nine, ten, a big fat hen
......
hdfs dfs命令有一个-text选项,用于以文本形式显示序列文件。
1、在终端窗口中,执行以下命令,查看序列文件的内容:
$ hdfs dfs -text /data.txt | head
可以看到如下的内容:
排序和合并SequenceFile
对一个或多个序列文件进行排序(和合并)的最强大的方法是使用MapReduce。
1、我们可以通过指定输入和输出是序列文件,并设置key和value类型,共使用Hadoop附带的排序示例。在终端窗口中,执行以下命令:
$ cd /opt/
$ rm -rf hadoop
$ rm -rf hadoop-2.7.3 (并在/etc/profile中删除对应hadoop2环境变量,打开hadoop3环境变量,然后执行source /etc/profile使其生效)
$ cd /opt/hadoop-3.3.0
$ hadoop jar ./share/hadoop/mapreduce/hadoop-mapreduce-examples-*.jar \
sort -r 1 \
-inFormat org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat \
-outFormat org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat \
-outKey org.apache.hadoop.io.IntWritable \
-outValue org.apache.hadoop.io.Text \
/data.txt /sorted
3、在终端窗口中,执行以下命令,查看排序后生成的序列文件的内容:
$ hdfs dfs -text /sorted/part-r-00000 | head
可以看到如下的内容:
对于某些应用程序,我们需要一个专门的数据结构来保存数据。Hadoop的SequenceFile类为二进制key-value对提供了一个持久的数据结构。
SequenceFile是Hadoop API 提供的一种二进制文件,它将数据以字节流的形式序列化到文件中。这种二进制文件内部使用Hadoop 的标准的Writable 接口实现序列化和反序列化。
SequenceFile也可以作为较小文件的容器。HDFS和MapReduce针对大文件进行了优化,因此将文件打包成SequenceFile可以使存储和处理小文件更高效。
第四章 Hadoop3MapReduce初级编程
4-1 MapReduce编程——单词计数
理解mapreduce执行原理
掌握mapreduce程序开发技术
熟悉mapreduce作业提交流程
1、准备数据文件
2、mapreduce程序编写
3、程序测试及运行
MapReducer的主要过程主要分为map阶段与Reduce阶段,首先通过Map读取HDFS中的数据,然后经过拆分,将每个文件中的每行数据分拆成键值对,最后输出作为Reduce的输入,通过Reduce进行数据逻辑上的处理。
编写一个mapreduce程序进行wordcount统计,其中一个map类继承了Mapper类,一个reduce类继承了Reducer类,还有一个主类用来提交程序
对原始数据进行处理,把文档中所有的英文单词进行统计所有单词的个数。
首先对待处理的信息进行拆分,拆分之后在map阶段,拆分后计算出单词个数并作为map方法的输出值,而map的方法输出键作为NullWritable即可,最后在reduce阶段对每个键的值集合进行遍历并把遍历的值进行相加,输出结果即可。
硬件:Ubuntu16.04
软件:JDK-1.8、Hadoop-3.3、Eclipse
数据存放路径:/data/dataset
tar包路径:/data/software
tar包压缩路径:/data/bigdata
软件安装位置:/opt
实验设计创建文件:/data/resource
注意:需要在配置文件/etc/profile文件中打开hadoop3的相关环境变量设置。
1、在终端窗口中,执行如下命令,启动HDFS集群。
$ cd /opt/hadoop-3.3.0/sbin/$ ./start-all.sh
2、在终端窗口中,执行如下命令,查看HDFS服务启动情况:
$ jps
1、编辑数据文件。在终端窗口中,执行如下命令,编辑数据文件"word.txt":
$ cd /data/dataset/$ vi word.txt
在"word.txt"文件中,输入如下内容,单词间以空格分隔:
good good studyday day up
保存并退出文件编辑。
2、将数据文件"word.txt"上传至HDFS的根目录下。在终端窗口中,执行如下命令:
$ hdfs dfs -put /data/dataset/word.txt /
1、启动开发工具Eclipse。
2、打开eclipse开发工具后,在菜单栏中,选择【File】->【New】->【Project】->【Java project】创建Java项目并命名为【Hadoop3Demo】,点击【Finish】完成创建,如下图所示:
3、导入hadoop相关的jar包,首先右击项目选择【New】—>【Folder】创建一个【lib】文件夹并把指定位置中(/data/software/hadoop3_lib/)的包放入该文件中。如下图所示:
4、把lib下所有的jar包导入到环境变量,首先全选【lib】文件夹下的jar包文件,右键点击,选择【build path】->【add to build path】,添加后,发现在项目下多一个列表项【Referenced Libraries】。如下图所示:
1、编写Maper类,完成对单词的切分处理,并以(k,v)的形式输出到Reduce阶段
在项目【src】目录上,单击右键,创建名为”com.simple.WordCountMapper”的Java类,并编辑源代码如下:
package com.simple;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
private final IntWritable one = new IntWritable(1);
private Text word = new Text();
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
StringTokenizer token = new StringTokenizer(line);
while (token.hasMoreTokens()) {
word.set(token.nextToken());
context.write(word, one);
}
}
}
2、编写WordCountReducer类代码,实现对单词个数的统计。
在项目【src】目录上,单击右键,创建名为”com.simple.WordCountReducer”的Java类,并编辑源代码如下:
package com.simple;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
context.write(key, new IntWritable(sum));
}
}
3、创建驱动程序类WordCountDriver,提交和运行作业。
在项目【src】目录上,单击右键,创建名为"com.simple.WordCountDriver"的Java类,并编辑源代码如下:
package com.simple;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.examples.WordCount.TokenizerMapper;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer;
public class WordCountDriver {
public static void main(String[] args) throws Exception {
final String hdfsurl = "hdfs://localhost:9000";
// 组织一个job,并提交
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCountDriver.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
// 如果map输出的中间结果类型,与reduce输出的结果类型相同时,可省略map的输出类型设置
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 指定要处理的输入数据文件的路径,执行时传入的第一个参数指定
FileInputFormat.addInputPath(job, new Path(hdfsurl+"/word.txt"));
// 指定最后reducer输出结果保存的文件路径,执行时传入的第二个参数指定
FileOutputFormat.setOutputPath(job, new Path(hdfsurl+"/word-output"));
// 参数true:是否在控制台打印执行过程的详细信息
boolean flag = job.waitForCompletion(false);
System.exit(flag?0:1);
}
}
1、在"WordCountDriver"类文件的任意空白处,单击右键,在弹出的环境菜单中,选择"【Run As】->【Java Application】"菜单项,运行程序。操作如下图所示:
2、如果一切正常,则可以在HDFS上查看统计的结果文件。在终端窗口中,执行如下命令:
$ hdfs dfs -cat /word-output/part-r-00000
可以看到单词计数的结果如下:
day 2good 2study 1up 1
4-2 MapReduce编程——数据去重
掌握去重的原理并使用MapReduce进行编程
1、启动Hadoop服务并查看处理数据
2、程序编写
目标:原始数据中出现次数超过一次的数据在输出文件中只出现一次。
算法思想:根据reduce的过程特性,会自动根据key来计算输入的value集合,把数据作为key输出给reduce,无论这个数据出现多少次,reduce最终结果中key只能输出一次。
1.实例中每个数据代表输入文件中的一行内容,map阶段采用Hadoop默认的作业输入方式。将value设置为key,并直接输出。 map输出数据的key为数据,将value设置成空值
2.在MapReduce流程中,map的输出<key,value>经过shuffle过程聚集成<key,value-list>后会交给reduce
3.reduce阶段不管每个key有多少个value,它直接将输入的key复制为输出的key,并输出(输出中的value被设置成空)。
硬件:Ubuntu16.04
软件:JDK-1.8、Hadoop-3.3、Eclipse
数据存放路径:/data/dataset
tar包路径:/data/software
tar包压缩路径:/data/bigdata
软件安装路径:/opt
实验设计创建文件:/data/resource
注意:需要在配置文件/etc/profile文件中打开hadoop3的相关环境变量设置。
1、在终端窗口中,执行如下命令,启动HDFS集群。
$ cd /opt/hadoop-3.3.0/sbin/$ ./start-all.sh
2、在终端窗口中,执行如下命令,查看HDFS服务启动情况:
$ jps
1、查看源数据文件内容。在终端窗口中,执行如下命令:
$ cat /data/dataset/Deduplicationinfo.txt
可以看到,文件内容如下:
2012-3-1 a2012-3-2 b2012-3-3 c2012-3-4 d2012-3-5 a2012-3-6 b2012-3-7 c2012-3-3 c2012-3-1 b2012-3-2 a2012-3-3 b2012-3-4 d2012-3-5 a2012-3-7 d2012-3-3 c
2、将数据源文件上传至HDFS的根目录下。在终端窗口中,执行如下命令:
$ hdfs dfs -put /data/dataset/Deduplicationinfo.txt /
1、启动开发工具Eclipse。
2、打开eclipse开发工具后,在菜单栏中,选择【File】->【New】->【Project】->【Java project】创建Java项目并命名为【Hadoop3Demo】,点击【Finish】完成创建,如下图所示:
3、导入hadoop相关的jar包,首先右击项目选择【New】—>【Folder】创建一个【lib】文件夹并把指定位置中(/data/software/hadoop3_lib/)的包放入该文件中。如下图所示:
4、把lib下所有的jar包导入到环境变量,首先全选【lib】文件夹下的jar包文件,右键点击,选择【build path】->【add to build path】,添加后,发现在项目下多一个列表项【Referenced Libraries】。如下图所示:
1、在项目src目录下,右键点击,选择【New】->【Class】创建一个类文件名称为"com.simple.DeduplicationMapper"。
2、让类【DeduplicationMapper】继承类Mapper同时指定需要的参数类型,根据业务逻辑修改map类的内容如下。
package com.simple;
import java.io.IOException;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;
public class DeduplicationMapper extends Mapper<LongWritable, Text, Text, Text> {
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
//按行读取信息并作为mapper的输出键,mapper的输出值置为空文本即可
Text line = value;
context.write(line, new Text(""));
}
}
3、在项目src目录下右键点击,新建一个类名为"com.simple.DeduplicationReducer"并继承Reducer类,然后添加该类中的代码内容如下所示。
package com.simple;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class DeduplicationReducer extends Reducer<Text, Text, Text, Text> {
@Override
protected void reduce(Text key, Iterable<Text> value, Context context)
throws IOException, InterruptedException {
//Reducer阶段直接按键输出即可,键直接可以实现去重
context.write(key, new Text(""));
}
}
4、在项目src目录下右键点击,新建一个测试主类名为"com.simple.TestDeduplication"并指定main主方法,测试代码如下所示:
package com.simple;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class TestDeduplication {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://localhost:9000");
//获取作业对象
Job job = Job.getInstance(conf);
//设置主类
job.setJarByClass(TestDeduplication.class);
//设置job参数
job.setMapperClass(DeduplicationMapper.class);
job.setReducerClass(DeduplicationReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
//设置job输入输出
FileInputFormat.addInputPath(job, new Path("/Deduplicationinfo.txt"));
FileOutputFormat.setOutputPath(job, new Path("/simple/output"));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
1、在"WordCountDriver"类文件的任意空白处,单击右键,在弹出的环境菜单中,选择"【Run As】->【Java Application】"菜单项,运行程序。操作如下图所示:
程序运行后,控制台打印如下图所示,且无错误日志产生,程序运行完毕。
2、程序执行完毕之后,查看对数据处理后产生的结果。如下图所示:
4-3 MapReduce编程——数据排序
理解排序的原理并使用MapReduce编写程序
1、查看数据并启动Hadoop服务
2、程序编写
在MapReduce操作时,传递的< key,value >会按照key的大小进行排序,最后输出的结果是按照key排过序的。在key排序的基础上,对value也进行排序。这种需求就是二次排序。二次排序是在框架在对key2排序后再对reduce输出结果的结果value3进行二次排序的需求。
在map阶段,使用job.setInputFormatClass定义的InputFormat将输入的数据集分割成小数据块splites,同时InputFormat提供一个RecordReder的实现。本例子中使用的是TextInputFormat,他提供的RecordReader会将文本的字节偏移量作为key,这一行的文本作为value。
核心总结:
1、map最后阶段进行partition分区,一般使用job.setPartitionerClass设置的类,如果没有自定义Key的hashCode()方法进行排序。
2、(第一次排序)每个分区内部调用job.setSortComparatorClass设置的key的比较函数类进行排序,如果没有则使用Key的实现的compareTo方法。
3、(第二次排序)当reduce接收到所有map传输过来的数据之后,调用job.setSortComparatorClass设置的key比较函数类对所有数据对排序,如果没有则使用Key的实现的compareTo方法。
4、紧接着使用job.setGroupingComparatorClass设置的分组函数类,进行分组,同一个Key的value放在一个迭代器里面。
硬件:Ubuntu16.04
软件:JDK-1.8、Hadoop-3.3、Eclipse
数据存放路径:/data/dataset
tar包路径:/data/software
tar包压缩路径:/data/bigdata
软件安装路径:/opt
实验设计创建文件:/data/resource
注意:需要在配置文件/etc/profile文件中打开hadoop3的相关环境变量设置。
1、在终端窗口中,执行如下命令,启动HDFS集群。
$ cd /opt/hadoop-3.3.0/sbin/$ ./start-all.sh
2、在终端窗口中,执行如下命令,查看HDFS服务启动情况:
$ jps
1、查看源数据文件内容。在终端窗口中,执行如下命令:
$ cat /data/dataset/SecondarySort.txt
可以看到,文件内容如下:
20 2150 5150 5250 5350 5460 5160 5360 5260 5660 5770 5860 6170 5470 5570 5670 5770 581 23 45 67 82203 2150 51250 52250 53530 5440 51120 5320 52260 5660 57740 5863 61730 5471 5571 5673 5774 5812 21131 4250 627 8
2、将数据源文件上传至HDFS的根目录下。在终端窗口中,执行如下命令:
$ hdfs dfs -put /data/dataset/SecondarySort.txt /
1、启动开发工具Eclipse。
2、打开eclipse开发工具后,在菜单栏中,选择【File】->【New】->【Project】->【Java project】创建Java项目并命名为【Hadoop3Demo】,点击【Finish】完成创建,如下图所示:
3、导入hadoop相关的jar包,首先右击项目选择【New】—>【Folder】创建一个【lib】文件夹并把指定位置中(/data/software/hadoop3_lib/)的包放入该文件中。如下图所示:
1、在项目【src】目录下,右键点击,选择"New"创建一个类文件名称为"com.simple.IntPair",该类是对给定数据的两列值的封装,并作为mapper的输出键对象 。实现代码如下:
package com.simple;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
public class IntPair implements WritableComparable<IntPair> {
private int first;
private int second;
public IntPair() {
super();
}
public IntPair(int first, int second) {
super();
this.first = first;
this.second = second;
}
public int getFirst() {
return first;
}
public void setFirst(int first) {
this.first = first;
}
public int getSecond() {
return second;
}
public void setSecond(int second) {
this.second = second;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + first;
result = prime * result + second;
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
IntPair other = (IntPair) obj;
if (first != other.first)
return false;
if (second != other.second)
return false;
return true;
}
@Override
public String toString() {
return "IntPair [first=" + first + ", second=" + second + "]";
}
@Override
public int compareTo(IntPair intPair) {
//首先比较第一个数,当第一个数不一样时,对第一个数进行比较,设置排序规则
if(first-intPair.getFirst()!=0) {
return first>intPair.first?1:-1;
}else {
//当第一个数一样时,比较第二个数,并设置排序规则
return second>intPair.second?1:-1;
}
}
@Override
//readFiedls方法用于序列化过程中的数据读取
public void readFields(DataInput in) throws IOException {
this.first=in.readInt();
this.second=in.readInt();
}
@Override
//write方法用于序列化过程中的数据写出
public void write(DataOutput out) throws IOException {
// TODO Auto-generated method stub
out.writeInt(first);
out.writeInt(second);
}
}
2、在项目【src】目录下,右键点击创建一个类文件名称为"com.simple.FirstPartitioner",该类是对数据处理后的结果进行分区设置 。代码实现如下:
package com.simple;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
/*
* 分区函数类
实现其自定义分区功能
*/
public class FirstPartitioner extends Partitioner<IntPair, Text> {
@Override
public int getPartition(IntPair key, Text value, int numPartitions) {
//这里取key的hashcode值*127,然后取其绝对值,对numPartitions取模,这里numPartitions与ReduceTask数保持一致
return Math.abs(key.getFirst() * 127) % numPartitions;
}
}
3、在项目src目录下,右键点击创建一个类文件名称为"com.simple.GroupingComparator",该类是对处理的数据进行分组设置 。实现代码如下:
package com.simple;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
/*
* 分组函数类
*/
public class GroupingComparator extends WritableComparator {
// 必须要有这个构造器,构造器中必须要实现这个
protected GroupingComparator() {
super(IntPair.class, true);
}
// 重载 compare:对组合键按第一个自然键排序分组
@SuppressWarnings("rawtypes")
public int compare(WritableComparable w1, WritableComparable w2) {
IntPair ip1 = (IntPair) w1;
IntPair ip2 = (IntPair) w2;
return ip1.compareTo(ip2);
}
}
4、在项目【src】目录下,右键点击创建一个类文件名称为"com.simple.SecondarySortMapper",继承类Mapper同时指定需要的参数类型,根据业务逻辑修改map类的内容如下:
package com.simple;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class SecondarySortMapper extends Mapper<LongWritable, Text, IntPair, Text> {
private final IntPair keyPair = new IntPair();
String[] lineArr = null;
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
//获取行的内容并以一个空格进行分割,然后将切割后的第一个字段赋值给keyPair的first,
// 第二个字段赋值给keyPair的second,并以keyPair作为k,value作为v,写出
String line = value.toString();
lineArr = line.split(" ", -1);
keyPair.setFirst(Integer.parseInt(lineArr[0]));
keyPair.setSecond(Integer.parseInt(lineArr[1]));
context.write(keyPair, value);
}
}
5、在项目【src】目录下右键点击,新建一个类名为"com.simple.SecondarySortReducer"并继承Reducer类,然后添加该类中的代码内容如下所示:
package com.simple;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class SecondarySortReducer extends Reducer<IntPair, Text, Text, Text> {
private static final Text SEPARATOR = new Text("---------------------");
public void reduce(IntPair key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
//对每一个IntPair输出一个"-------"划分观察
context.write(SEPARATOR, null);
//迭代输出
for (Text val : values) {
context.write(null, val);
}
}
}
6、在项目【src】目录下右键点击,新建一个测试主类名为"com.simple.SecondarySortJob",并指定main主方法。测试代码如下所示:
package com.simple;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class SecondarySortJob {
public static void main(String[] args) throws Exception {
// 获取作业对象
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://localhost:9000");
Job job = Job.getInstance(conf);
// 设置主类
job.setJarByClass(SecondarySortJob.class);
// 设置job参数
job.setMapperClass(SecondarySortMapper.class);
job.setReducerClass(SecondarySortReducer.class);
job.setMapOutputKeyClass(IntPair.class);
job.setMapOutputValueClass(Text.class);
// 设置分区
job.setPartitionerClass(FirstPartitioner.class);
// 设置分组
job.setGroupingComparatorClass(GroupingComparator.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
// 设置job输入输出
FileInputFormat.setInputPaths(job, new Path("/SecondarySort.txt"));
FileOutputFormat.setOutputPath(job, new Path("/simple/output"));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
1、在"SecondarySortJob"类文件的任意空白处,单击右键,在弹出的环境菜单中,选择"【Run As】->【Java Application】"菜单项,运行程序。操作如下图所示:
2、查看控制台显示内容查看是否正确执行。如下图所示:
3、程序执行完毕之后,查看对数据处理后产生的结果。如下图所示:
4-4 MapReduce编程——数据分区
掌握分区的原理以及使用mapreduce进行编程
1、启动Hadoop服务并查看处理数据
2、程序编写
Hadoop采用的派发方式默认是根据散列值来派发,当数据进行map转换后,根据map后数据的key值进行散列派发,这样的一个弊端就是当数据key的值过于相似且集中时,大部分的数据就会分到同一个reducer中,从而造成数据倾斜,影响程序的运行效率。所以需要我们自己定制partition来根据自己的要求,选择记录的reducer。自定义partitioner很简单,只要自定义一个类,并且继承Partitioner类,重写其getPartition方法就好了,在使用的时候通过调用Job的setPartitionerClass指定一下即可。Map的结果,会通过partition分发到Reducer上。如果设置了Combiner,Map的结果会先送到Combiner进行合并,再将合并后数据发送给Reducer。
Mapper最终处理的键值对<key,value>,是需要送到Reducer去合并的,合并的时候,有相同key的键/值对会送到同一个Reducer。哪个key到哪个Reducer的分配过程,是由Partitioner规定的。它只有一个方法,
getPartition(Text key, Text value, int numPartitions)
系统缺省的Partitioner是HashPartitioner,它以key的Hash值对Reducer的数目取模,得到对应的Reducer。这样就保证如果有相同的key值,肯定被分配到同一个reducre上。
硬件:Ubuntu16.04
软件:JDK-1.8、Hadoop-3.3、Eclipse
数据存放路径:/data/dataset
tar包路径:/data/software
tar包压缩路径:/data/bigdata
软件安装路径:/opt
实验设计创建文件:/data/resource
注意:需要在配置文件/etc/profile文件中打开hadoop3的相关环境变量设置。
1、在终端窗口中,执行如下命令,启动HDFS集群。
$ cd /opt/hadoop-3.3.0/sbin/$ ./start-all.sh
2、在终端窗口中,执行如下命令,查看HDFS服务启动情况:
$ jps
1、查看源数据文件内容。在终端窗口中,执行如下命令:
$ cat /data/dataset/StuAgeCata.txt
可以看到,文件内容如下:
tom 13jerry 28lisa 34marry 22tonny 17kaisa 18bruce 29
2、将数据源文件上传至HDFS的根目录下。在终端窗口中,执行如下命令:
$ hdfs dfs -put /data/dataset/StuAgeCata.txt /
1、启动开发工具Eclipse。
2、打开eclipse开发工具后,在菜单栏中,选择【File】->【New】->【Project】->【Java project】创建Java项目并命名为【Hadoop3Demo】,点击【Finish】完成创建,如下图所示:
3、导入hadoop相关的jar包,首先右击项目选择【New】—>【Folder】创建一个【lib】文件夹并把指定位置中(/data/software/hadoop3_lib/)的包放入该文件中。如下图所示:
4、把lib下所有的jar包导入到环境变量,首先全选【lib】文件夹下的jar包文件,右键点击,选择【build path】->【add to build path】,添加后,发现在项目下多一个列表项【Referenced Libraries】。如下图所示:
1、在项目【src】目录下,右键点击,选择【New】->【Class】创建一个类文件名称为"com.simple.StudentWritable"。该类是对给定数据的三列值的封装,并作为mapper的输出键值对象 。实现代码如下:
package com.simple;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
public class StudentWritable implements Writable {
private String name;
private int age;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
public StudentWritable() {
}
public StudentWritable(String name, int age) {
this.name = name;
this.age = age;
}
@Override
public String toString() {
return "StudentWritable [name=" + name + ", age=" + age + "]";
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(name);
out.writeInt(age);
}
@Override
public void readFields(DataInput in) throws IOException {
this.name = in.readUTF();
this.age = in.readInt();
}
}
2、在项目src目录下,右键点击选择【New】—>【Class】创建一个类文件名称为"com.simple.StuPartitioner",该类是对数据处理后的结果进行分区设置 。代码实现如下:
package com.simple;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;
public class StuPartitioner extends Partitioner<NullWritable, StudentWritable> {
@Override
public int getPartition(NullWritable key, StudentWritable value, int numPartitions) {
//按年龄进行分区,分区条件为大于18岁和小于18岁
if (value.getAge() >= 18) {
return 1;
} else {
return 0;
}
}
}
3、让类【StudentMapper】继承类Mapper同时指定需要的参数类型,根据业务逻辑修改map类的内容如下:
package com.simple;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class StudentMapper extends Mapper<LongWritable, Text, NullWritable, StudentWritable> {
@Override
protected void map(LongWritable key, Text value,
Mapper<LongWritable, Text, NullWritable, StudentWritable>.Context context)
throws IOException, InterruptedException {
//以空格切分
String stuArr[] = value.toString().split(" ");
context.write(NullWritable.get(), new StudentWritable(stuArr[0], Integer.parseInt(stuArr[1])));
}
}
4、在项目【src】目录下右键点击,新建一个类名为"com.simple.StudentReducer"并继承Reducer类,然后添加该类中的代码内容如下所示:
package com.simple;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
public class StudentReducer extends Reducer<NullWritable, StudentWritable, NullWritable, Text> {
@Override
protected void reduce(NullWritable key, Iterable<StudentWritable> iter,
Reducer<NullWritable, StudentWritable, NullWritable, Text>.Context context)
throws IOException, InterruptedException {
// 遍历数据
Iterator<StudentWritable> it = iter.iterator();
while (it.hasNext()) {
context.write(NullWritable.get(), new Text(it.next().toString()));
}
}
}
5、在项目【src】目录下右键点击,新建一个测试主类名为"com.simple.TestStuMapReducer",并指定main主方法,编写测试代码如下:
package com.simple;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class TestStuMapReducer {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://localhost:9000");
//获取一个Job实例
Job job = Job.getInstance(conf);
// 设置主类
job.setJarByClass(TestStuMapReducer.class);
// 设置Mapper类和Reducer类
job.setMapperClass(StudentMapper.class);
job.setReducerClass(StudentReducer.class);
job.setPartitionerClass(StuPartitioner.class);
job.setNumReduceTasks(2);
//设置map、reduce的输出类型
job.setMapOutputKeyClass(NullWritable.class);
job.setMapOutputValueClass(StudentWritable.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Text.class);
//设置输入输出目录或文件
FileInputFormat.setInputPaths(job, new Path("/StuAgeCata.txt"));
FileOutputFormat.setOutputPath(job, new Path("/simple/output"));
//提交任务
job.waitForCompletion(true);
}
}
1、在"WordCountDriver"类文件的任意空白处,单击右键,在弹出的环境菜单中,选择"【Run As】->【Java Application】"菜单项,运行程序。操作如下图所示:
2、查看控制台显示内容查看是否正确执行。如下图所示:
3、程序执行完毕之后,查看对数据处理后产生的结果。如下图所示:
鄂ICP备2023011697号-1 | Powered By 91代做