结课作业 实现MapReduce程序管理Hbase中的数据
实验概述
实现一个可以管理操作Hbase中数据的MapReduce程序
实验目的
通过该实验后,能在MapReduce程序中存储,操作并处理Hbase数据。
实验说明
本实验环境中已经配置好Hadoop与hbase集群环境,只需要在主服务器(namenode)上执行hdfs namenode -format 格式化命令后启动Hadoop集群,再在hbase目录下启动Hbase服务。
本实验开始时,读者会有四台Centos 7的linux主机,其中三台master ,slave1,slave2已经配置好Hadoop与hbase集群环境,master为namenode,slave1与slave2为datanode。另外一台develop-pc为用来开发的PC机,上面已经安装好jdk和eclipse。
实验背景
员工销售业绩表(Employee)中一个员工的的销售业绩有多笔记录,公司现在希望将每个员工的总销售额计算出来并存储到TotalSale表中。
实验步骤
步骤1 开发TotalSale程序
& 讲解
在develop-pc主机中,打开eclipse开发工具,使用Ecplise新建Maven项目MapReduce,并配置buildpath(主要是将jdk设置为1.8版本的),在实验2中我们已经介绍过如何使用Ecplise工具创建Maven项目了,这里不再详细介绍。
新建完Maven项目后:编辑pom.xml文件内容如下:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>learning</groupId> <artifactId>MapReduce</artifactId> <version>0.0.1-SNAPSHOT</version> <properties> <hadoop.version>2.7.3</hadoop.version> </properties> <dependencies> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>${hadoop.version}</version> </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-server</artifactId> <version>1.2.4</version> </dependency> </dependencies> <build> <plugins> <plugin> <artifactId>maven-compiler-plugin</artifactId> <version>3.0</version> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-surefire-plugin</artifactId> <configuration> <skip>true</skip> </configuration> </plugin> </plugins> </build> </project>
保存后Eclipse会自动下载所需要的Jar包,如果没有自动下载就在项目上点击右键,选择Mavenèupdate project。
在 MapReduce项目中新建package:com.learning.MapReduce并在此包下新建TotalSale 类文件,(本例中共用了实验4中的项目)如下:
TotalSale.java内容如下(其中192.168.26.201是master服务器地址请根据自己实际情况修改): package com.learning.mapreduce; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.hbase.mapreduce.TableMapper; import org.apache.hadoop.hbase.mapreduce.TableReducer; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; public class TotalSale { private static class Map extends TableMapper<Text, IntWritable> { String userId = ""; int sale = 0; public void map(ImmutableBytesWritable rowKey, Result columns, Context context) throws IOException, InterruptedException { String inkey = Bytes.toString(rowKey.get()); // 读取HBase用户表rowkey userId = inkey.split("#")[0]; byte[] bsale = columns.getValue(Bytes.toBytes("cf1"), Bytes.toBytes("sales")); String sales = new String(bsale); sale = Integer.valueOf(sales); context.write(new Text(Bytes.toBytes(userId)), new IntWritable(sale)); } } private static class Reducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable> { public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int total = 0; for (IntWritable v : values) { total += v.get(); } Put put = new Put(Bytes.toBytes(key.toString())); put.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("Total sales"), Bytes.toBytes(total)); context.write(null, put); } } public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException{ Configuration conf = HBaseConfiguration.create(new Configuration()); conf.set("hbase.zookeeper.property.clientPort", "2181"); conf.set("hbase.zookeeper.quorum", "192.168.26.201"); conf.set("hbase.master", "192.168.26.201:600000"); Job job = Job.getInstance(conf, "Total sale"); job.setJarByClass(TotalSale.class); Scan scan = new Scan(); scan.setCaching(500); scan.setCacheBlocks(false); TableMapReduceUtil.initTableMapperJob( "Employee", // input table scan, // Scan instance to control CF and attribute selection Map.class, // mapper class Text.class, // mapper output key IntWritable.class, // mapper output value job); TableMapReduceUtil.initTableReducerJob( "TotalSale", // output table Reducer.class, // reducer class job); job.setNumReduceTasks(1); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
步骤2 导出jar文件
步骤同《实验4:实现MapReduce程序完成行统计》中的 导出 jar文件
步骤3 将jar文件上传到master服务器上
步骤同《实验4:实现MapReduce程序完成行统计》中的 将jar文件上传到master服务器上/usr/local/zhitu/mapreduce 目录下,(没有这个目录就mkdir命令新建)
步骤4 添加运行所需要的jar包
& 讲解
需要将运行TotalSale所需要的jar包加入到hadoop运行环境中,登陆master服务器,在/usr/local/zhitu/hadoop-2.7.3 目录下新建目录/contrib/capacity-scheduler,并将需要的jar包上传到该目录下:
创建目录命令:mkdir -p /usr/local/zhitu/hadoop-2.7.3/contrib/capacity-scheduler
需要的jar包有如下8个:
1 hadoop-common-2.7.3.jar (在/usr/local/zhitu/hadoop-2.7.3/share/hadoop/common/ 目录下可以找到)
2 hbase-client-1.2.4.jar
3 hbase-common-1.2.4.jar
4 hbase-hadoop-compat-1.2.4.jar
5 hbase-procedure-1.2.4.jar
6 hbase-protocol-1.2.4.jar
7 hbase-server-1.2.4.jar
8 metrics-core-2.2.0.jar (以上7个jar文件在/usr/local/zhitu/hbase-1.2.4/lib 目录下可以找到)
并使用scp命令将该/contrib 文件夹以及文件夹下的所有内容复制到其他服务器上。
参考命令(slave1,slave2请换成自己从服务器名称或者ip):
scp -r /usr/local/zhitu/hadoop-2.7.3/contrib
root@slave1:/usr/local/zhitu/hadoop-2.7.3/
scp -r /usr/local/zhitu/hadoop-2.7.3/contrib
root@slave2:/usr/local/zhitu/hadoop-2.7.3/
: 操作
实际操作:上传jar包
实际操作:复制jar包到其它从服务器
步骤5 启动Hadoop服务
& 讲解
hadoop的start-all.sh启动的服务包括hdfs和yarn服务,这里除了启动hadoop的hdfs和yarn服务外还需要启动hadoop的jobhistory服务。
启动完成后使用jps命令在各个服务器查看服务是否启动成功。jobhistory服务启动成功在主服务器上会看到JobHistoryServer服务。
参考命令:
cd /usr/local/zhitu/hadoop-2.7.3
bin/hdfs namenode -format
sbin/./start-all.sh
sbin/./mr-jobhistory-daemon.sh start historyserver
sbin/./yarn-daemon.sh start timelineserver
: 操作
实际操作:格式化并启动hadoop服务
实际操作:使用jps命令在各个服务器上确认服务已经启动成功
步骤6 启动Hbase服务
& 讲解
启动hbase服务只需要在主服务器上的hbase-1.2.4目录下执行bin/./start-hbase.sh即可。启动成功后,也可以使用jps命令验证服务是否启动成功。主服务器上会有HQuorumPeer服务和HMaster服务,从服务器上会有HRegionServer服务。
参考命令:
cd /usr/local/zhitu/hbase-1.2.4
bin/./start-hbase.sh
/usr/lib/java/jdk1.8/bin/jps
步骤7 准备测试数据
& 讲解
通过hbase shell命令,创建表“Employee”中,包含一个列族“cf1” ,和保存结果的表“TotalSale”,包含一个列族:"cf1"。
再向表Employee中添加如下四条记录
行键:'2014101#1','cf1:sales','150'
行键:'2014101#2','cf1:sales','250'
行键:'2014102#1','cf1:sales','350'
行键:'2014102#2','cf1:sales','750'
最后查询表中的所有记录,确定数据都已经添加到表Employee中。
使用exit命令可推出hbase shell命令行。
参考命令:
hbase shell
create 'Employee','cf1'
create 'TotalSale','cf1'
put 'Employee','2014101#1','cf1:sales','150'
put 'Employee','2014101#2','cf1:sales','250'
put 'Employee','2014102#1','cf1:sales','350'
put 'Employee','2014102#2','cf1:sales','750'
scan 'Employee'
步骤8 运行TotalSale
& 讲解
进入到hadoop-2.7.3目录下使用命令:bin/hadoop jar 来运行TotalSale程序,
命令:
bin/hadoop jar /usr/local/zhitu/mapreduce/MapReduce.jar
com.learning.mapreduce.TotalSale
步骤9 查看运行结果
& 讲解
进入到hbase shell命令行,查看表TotalSale里是否有已经生成的数据了。
在hbase中整型数据存储为字节码,如果要以整型查看需要使用org.apache.hadoop.hbase.util.Bytes工具类将其转换为整型。
参考命令:
cd /usr/local/zhitu/hbase-1.2.4
hbase shell
scan 'TotalSale'
org.apache.hadoop.hbase.util.Bytes.toInt("\x00\x00\x01\x90".to_java_bytes)
org.apache.hadoop.hbase.util.Bytes.toInt("\x00\x00\x04L".to_java_bytes)
鄂ICP备2023011697号-1 | Powered By 91代做