官方接单发单平台上线!有接单发单需求的请直接发布需求,或注册接单!点击此处查看详情!

结课作业实现MapReduce程序管理Hbase中的数据

时间:2024-09-10 浏览:145 分类:Java程序代做

91代做网-专注各种程序代做

包括但不限于:各类毕设课设、作业辅导、代码答疑、报告论文、商业程序开发、论文复现和小程序开发等。

也欢迎各行业程序员加入我们,具体请联系客服详聊:QQ号:,微信号:,接单Q群:

结课作业

结课作业  实现MapReduce程序管理Hbase中的数据

 实验概述

实现一个可以管理操作Hbase中数据的MapReduce程序

 实验目的

通过该实验后,能在MapReduce程序中存储,操作并处理Hbase数据。

 实验说明

本实验环境中已经配置好Hadoop与hbase集群环境,只需要在主服务器(namenode)上执行hdfs namenode -format 格式化命令后启动Hadoop集群,再在hbase目录下启动Hbase服务。

本实验开始时,读者会有四台Centos 7的linux主机,其中三台master ,slave1,slave2已经配置好Hadoop与hbase集群环境,master为namenode,slave1与slave2为datanode。另外一台develop-pc为用来开发的PC机,上面已经安装好jdk和eclipse。

 实验背景

员工销售业绩表(Employee)中一个员工的的销售业绩有多笔记录,公司现在希望将每个员工的总销售额计算出来并存储到TotalSale表中。

 实验步骤

步骤1  开发TotalSale程序

& 讲解

在develop-pc主机中,打开eclipse开发工具,使用Ecplise新建Maven项目MapReduce,并配置buildpath(主要是将jdk设置为1.8版本的),在实验2中我们已经介绍过如何使用Ecplise工具创建Maven项目了,这里不再详细介绍。

新建完Maven项目后:编辑pom.xml文件内容如下:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>learning</groupId>
  <artifactId>MapReduce</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  
  <properties>
        <hadoop.version>2.7.3</hadoop.version>
  </properties>
   <dependencies>
        <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-client</artifactId>
          <version>${hadoop.version}</version>
        </dependency>
<dependency>
  <groupId>org.apache.hbase</groupId>
  <artifactId>hbase-server</artifactId>
  <version>1.2.4</version>
       </dependency>
  </dependencies>
  <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <configuration>
                    <skip>true</skip>
                </configuration>
            </plugin>
        </plugins>
</build>  
</project>

保存后Eclipse会自动下载所需要的Jar包,如果没有自动下载就在项目上点击右键,选择Mavenèupdate project。

在 MapReduce项目中新建package:com.learning.MapReduce并在此包下新建TotalSale 类文件,(本例中共用了实验4中的项目)如下:

TotalSale.java内容如下(其中192.168.26.201是master服务器地址请根据自己实际情况修改):
package com.learning.mapreduce;
 
import java.io.IOException;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
 
public class TotalSale {
 
private static class Map extends TableMapper<Text, IntWritable> {
 
String userId = "";
int sale = 0;
 
public void map(ImmutableBytesWritable rowKey, Result columns, Context context)
throws IOException, InterruptedException {
String inkey = Bytes.toString(rowKey.get()); // 读取HBase用户表rowkey
userId = inkey.split("#")[0];
byte[] bsale = columns.getValue(Bytes.toBytes("cf1"), Bytes.toBytes("sales"));
String sales = new String(bsale);
sale = Integer.valueOf(sales);

context.write(new Text(Bytes.toBytes(userId)), new IntWritable(sale));

}
}
 
private static class Reducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {
 
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int total = 0;
for (IntWritable v : values) {
total += v.get();
}
Put put = new Put(Bytes.toBytes(key.toString()));
put.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("Total sales"), Bytes.toBytes(total));
context.write(null, put);
 
}
}

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException{
Configuration conf = HBaseConfiguration.create(new Configuration());
conf.set("hbase.zookeeper.property.clientPort", "2181");
conf.set("hbase.zookeeper.quorum", "192.168.26.201");
conf.set("hbase.master", "192.168.26.201:600000");

Job job = Job.getInstance(conf, "Total sale");

job.setJarByClass(TotalSale.class);   
    
Scan scan = new Scan();
scan.setCaching(500);        
scan.setCacheBlocks(false);

TableMapReduceUtil.initTableMapperJob(
"Employee",      // input table
scan,           // Scan instance to control CF and attribute selection
Map.class,   // mapper class
Text.class,           // mapper output key
IntWritable.class,           // mapper output value
job);
TableMapReduceUtil.initTableReducerJob(
"TotalSale",      // output table
Reducer.class,             // reducer class
job);

job.setNumReduceTasks(1);
        
System.exit(job.waitForCompletion(true) ? 0 : 1);
    
}
}

 

 

步骤2  导出jar文件

步骤同《实验4:实现MapReduce程序完成行统计》中的 导出 jar文件

 

步骤3  将jar文件上传到master服务器上

步骤同《实验4:实现MapReduce程序完成行统计》中的 将jar文件上传到master服务器上/usr/local/zhitu/mapreduce 目录下,(没有这个目录就mkdir命令新建)

 

步骤4  添加运行所需要的jar包

& 讲解

需要将运行TotalSale所需要的jar包加入到hadoop运行环境中,登陆master服务器,在/usr/local/zhitu/hadoop-2.7.3  目录下新建目录/contrib/capacity-scheduler,并将需要的jar包上传到该目录下:

创建目录命令:mkdir -p /usr/local/zhitu/hadoop-2.7.3/contrib/capacity-scheduler

需要的jar包有如下8个:

1 hadoop-common-2.7.3.jar  (在/usr/local/zhitu/hadoop-2.7.3/share/hadoop/common/ 目录下可以找到)

2 hbase-client-1.2.4.jar

3 hbase-common-1.2.4.jar

4 hbase-hadoop-compat-1.2.4.jar

5 hbase-procedure-1.2.4.jar

6 hbase-protocol-1.2.4.jar

7 hbase-server-1.2.4.jar

8 metrics-core-2.2.0.jar (以上7个jar文件在/usr/local/zhitu/hbase-1.2.4/lib 目录下可以找到)

 

并使用scp命令将该/contrib 文件夹以及文件夹下的所有内容复制到其他服务器上。

参考命令(slave1,slave2请换成自己从服务器名称或者ip):

scp -r /usr/local/zhitu/hadoop-2.7.3/contrib

root@slave1:/usr/local/zhitu/hadoop-2.7.3/

scp -r /usr/local/zhitu/hadoop-2.7.3/contrib

 root@slave2:/usr/local/zhitu/hadoop-2.7.3/

 

: 操作

实际操作:上传jar包


实际操作:复制jar包到其它从服务器

步骤5  启动Hadoop服务

& 讲解

hadoop的start-all.sh启动的服务包括hdfs和yarn服务,这里除了启动hadoop的hdfs和yarn服务外还需要启动hadoop的jobhistory服务。

启动完成后使用jps命令在各个服务器查看服务是否启动成功。jobhistory服务启动成功在主服务器上会看到JobHistoryServer服务。

参考命令:

cd /usr/local/zhitu/hadoop-2.7.3

bin/hdfs namenode -format

sbin/./start-all.sh

sbin/./mr-jobhistory-daemon.sh start historyserver

sbin/./yarn-daemon.sh start timelineserver

: 操作

实际操作:格式化并启动hadoop服务



实际操作:使用jps命令在各个服务器上确认服务已经启动成功

 

步骤6  启动Hbase服务

& 讲解

启动hbase服务只需要在主服务器上的hbase-1.2.4目录下执行bin/./start-hbase.sh即可。启动成功后,也可以使用jps命令验证服务是否启动成功。主服务器上会有HQuorumPeer服务和HMaster服务,从服务器上会有HRegionServer服务。

参考命令:

cd /usr/local/zhitu/hbase-1.2.4

bin/./start-hbase.sh

/usr/lib/java/jdk1.8/bin/jps



步骤7  准备测试数据

& 讲解

通过hbase shell命令,创建表“Employee”中,包含一个列族“cf1” ,和保存结果的表“TotalSale”,包含一个列族:"cf1"。

再向表Employee中添加如下四条记录

行键:'2014101#1','cf1:sales','150'

行键:'2014101#2','cf1:sales','250'

行键:'2014102#1','cf1:sales','350'

行键:'2014102#2','cf1:sales','750'

最后查询表中的所有记录,确定数据都已经添加到表Employee中。

使用exit命令可推出hbase shell命令行。

参考命令:

hbase shell

create 'Employee','cf1'

create 'TotalSale','cf1'

put 'Employee','2014101#1','cf1:sales','150'

put 'Employee','2014101#2','cf1:sales','250'

put 'Employee','2014102#1','cf1:sales','350'

put 'Employee','2014102#2','cf1:sales','750'

scan 'Employee'

 

  

步骤8  运行TotalSale

& 讲解

进入到hadoop-2.7.3目录下使用命令:bin/hadoop jar 来运行TotalSale程序,

命令:

 bin/hadoop jar /usr/local/zhitu/mapreduce/MapReduce.jar  

com.learning.mapreduce.TotalSale

   

步骤9  查看运行结果

& 讲解

进入到hbase shell命令行,查看表TotalSale里是否有已经生成的数据了。

在hbase中整型数据存储为字节码,如果要以整型查看需要使用org.apache.hadoop.hbase.util.Bytes工具类将其转换为整型。

参考命令:

cd /usr/local/zhitu/hbase-1.2.4

hbase shell

scan 'TotalSale'

org.apache.hadoop.hbase.util.Bytes.toInt("\x00\x00\x01\x90".to_java_bytes)

org.apache.hadoop.hbase.util.Bytes.toInt("\x00\x00\x04L".to_java_bytes)

   

 

 

 

   

 


客服