Hadoop MapReduce

MapReduce是一个框架,我们可以编写应用程序,以可靠的方式并行处理大量商品硬件集群上的大量数据。

什么是MapReduce?

MapReduce是一种基于java的分布式计算处理技术和程序模型。MapReduce算法包含两个重要的任务,即Map和Reduce。Map接收一组数据并将其转换为另一组数据,其中各个元素被分解为元组(键/值对)。其次,减少任务,它将地图的输出作为输入,并将这些数据元组合成一组较小的元组。正如MapReduce名称的顺序所暗示的,reduce任务总是在映射作业之后执行。

MapReduce的主要优点是可以轻松扩展多个计算节点上的数据处理。在MapReduce模型下,数据处理原语被称为映射器和缩减器。将数据处理应用程序分解为映射器和还原器有时是非常重要的。但是,一旦我们在MapReduce表单中编写应用程序,扩展应用程序以在群集中运行数百,数千甚至数万台机器仅仅是一次配置更改。这种简单的可伸缩性吸引了许多程序员使用MapReduce模型。

 

算法

  • 一般而言,MapReduce范例基于将计算机发送到数据所在的位置!
  • MapReduce程序分三个阶段执行,分别是map阶段,shuffle阶段和reduce阶段。
    • 地图阶段 :地图或制图人的工作是处理输入数据。通常,输入数据采用文件或目录的形式,并存储在Hadoop文件系统(HDFS)中。输入文件逐行传递给映射器函数。映射程序处理数据并创建几个小块数据。
    • 减少阶段 :这个阶段是 Shuffle 阶段和 Reduce 阶段的结合。Reducer的工作是处理来自映射器的数据。处理完成后,它会生成一组新的输出,并存储在HDFS中。
  • 在MapReduce作业期间,Hadoop将Map和Reduce任务发送到群集中的相应服务器。
  • 该框架管理数据传递的所有细节,例如发布任务,验证任务完成以及在节点之间的集群周围复制数据。
  • 大多数计算发生在具有本地磁盘上数据的节点上,以减少网络流量。
  • 完成给定任务后,群集收集并减少数据以形成适当的结果,并将其发送回Hadoop服务器。

MapReduce算法

 

输入和输出(Java透视)

MapReduce框架在<key,value>对上运行,即框架将作业的输入视为一组<key,value>对,并生成一组<key,value>对作为作业的输出,可以想象不同的类型。

键和值类应该由框架以序列化的方式进行,因此需要实现Writable接口。此外,关键类必须实现Writable- Comparable接口,以方便框架进行排序。MapReduce作业的输入和输出类型:(输入)<k1,v1> \- > map - > <k2,v2> \-> reduce - > <k3,v3>(输出)。

  输入 输出
Map   list ()
Reduce   list ()

 

术语

  • PayLoad - 应用程序实现Map和Reduce功能,并构成工作的核心。
  • 映射器 - 映射器将输入键/值对映射到一组中间键/值对。
  • NamedNode - 管理Hadoop分布式文件系统(HDFS)的节点。
  • DataNode - 在进行任何处理之前预先显示数据的节点。
  • MasterNode - JobTracker运行的节点,它接受来自客户端的作业请求。
  • SlaveNode - Map和Reduce程序运行的节点。
  • JobTracker - 安排作业并跟踪任务跟踪器的作业。
  • 任务跟踪器 - 跟踪任务并将状态报告给JobTracker。
  • 作业 - 程序是跨数据集执行Mapper和Reducer。
  • 任务 - 在一部分数据上执行Mapper或Reducer。
  • 任务尝试 - 尝试在SlaveNode上执行任务的特定实例。

 

示例场景

以下给出的是有关组织电力消耗的数据。它包含每年的电力消耗量和不同年份的年平均值。

  一月 二月 损伤 四月 可能 七月 八月 九月 十月 十一月 十二月 平均
1979年 23 23 2 43 24 25 26 26 26 26 25 26 25
1980年 26 27 28 28 28 三十 31 31 31 三十 三十 三十 29
1981年 31 32 32 32 33 34 35 36 36 34 34 34 34
1984年 39 38 39 39 39 41 42 43 40 39 38 38 40
1985年 38 39 39 39 39 41 41 41 00 40 39 39 45

如果以上数据作为输入数据,我们必须编写应用程序来处理它并产生结果,例如查找最大使用年份,最小使用年份等。这对于有限数量的记录的程序员来说是一次散步。他们只会编写逻辑来产生所需的输出,并将数据传递给所写的应用程序。

但是,考虑一下代表特定国家所有大型工业自成立以来的耗电量的数据。

当我们编写应用程序来处理这种批量数据时,

  • 他们需要很多时间来执行。
  • 当我们将数据从源服务器移动到网络服务器时,网络流量会很大,依此类推。

为了解决这些问题,我们有了MapReduce框架。

输入数据

以上数据保存为 sample.txt 并作为输入。输入文件如下所示。

1979   23   23   2   43   24   25   26   26   26   26   25   26  25
1980   26   27   28  28   28   30   31   31   31   30   30   30  29
1981   31   32   32  32   33   34   35   36   36   34   34   34  34
1984   39   38   39  39   39   41   42   43   40   39   38   38  40
1985   38   39   39  39   39   41   41   41   00   40   39   39  45

示例程序

下面给出的是使用MapReduce框架的示例数据程序。

package hadoop;

import java.util.*;

import java.io.IOException;
import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

public class ProcessUnits
{
   //Mapper class
   public static class E_EMapper extends MapReduceBase implements
   Mapper<LongWritable ,/*Input key Type */
   Text,                /*Input value Type*/
   Text,                /*Output key Type*/
   IntWritable>        /*Output value Type*/
   {

      //Map function
      public void map(LongWritable key, Text value,
      OutputCollector<Text, IntWritable> output,   
      Reporter reporter) throws IOException
      {
         String line = value.toString();
         String lasttoken = null;
         StringTokenizer s = new StringTokenizer(line,"\t");
         String year = s.nextToken();

         while(s.hasMoreTokens())
            {
               lasttoken=s.nextToken();
            }

         int avgprice = Integer.parseInt(lasttoken);
         output.collect(new Text(year), new IntWritable(avgprice));
      }
   }


   //Reducer class
   public static class E_EReduce extends MapReduceBase implements
   Reducer< Text, IntWritable, Text, IntWritable >
   {  

      //Reduce function
      public void reduce( Text key, Iterator <IntWritable> values,
         OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException
         {
            int maxavg=30;
            int val=Integer.MIN_VALUE;

            while (values.hasNext())
            {
               if((val=values.next().get())>maxavg)
               {
                  output.collect(key, new IntWritable(val));
               }
            }

         }
   }  


   //Main function
   public static void main(String args[])throws Exception
   {
      JobConf conf = new JobConf(ProcessUnits.class);

      conf.setJobName("max_eletricityunits");
      conf.setOutputKeyClass(Text.class);
      conf.setOutputValueClass(IntWritable.class);
      conf.setMapperClass(E_EMapper.class);
      conf.setCombinerClass(E_EReduce.class);
      conf.setReducerClass(E_EReduce.class);
      conf.setInputFormat(TextInputFormat.class);
      conf.setOutputFormat(TextOutputFormat.class);

      FileInputFormat.setInputPaths(conf, new Path(args[0]));
      FileOutputFormat.setOutputPath(conf, new Path(args[1]));

      JobClient.runJob(conf);
   }
}

将上面的程序保存为 ProcessUnits.java。 该程序的编译和执行如下所述。

 

编制和执行过程单位方案

让我们假设我们在Hadoop用户的主目录中(例如/ home / hadoop)。

按照以下步骤编译并执行上述程序。

步骤1

以下命令是创建一个目录来存储已编译的Java类。

$ mkdir units

第2步

下载 Hadoop-core-1.2.1.jar, 用于编译和执行MapReduce程序。访问以下链接http://mvnrepository.com/artifact/org.apache.hadoop/hadoop- core/1.2.1以下载jar。让我们假设下载的文件夹是 / home / hadoop /。

第3步

以下命令用于编译 ProcessUnits.java 程序并为程序创建一个jar。

$ javac -classpath hadoop-core-1.2.1.jar -d units ProcessUnits.java
$ jar -cvf units.jar -C units/ .

步骤4

以下命令用于在HDFS中创建输入目录。

$HADOOP_HOME/bin/hadoop fs -mkdir input_dir

第5步

以下命令用于复制HDFS输入目录下名为 sample.txt 的输入文件。

$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/sample.txt input_dir

第6步

以下命令用于验证输入目录中的文件。

$HADOOP_HOME/bin/hadoop fs -ls input_dir/

第7步

以下命令用于通过从输入目录获取输入文件来运行Eleunit_max应用程序。

$HADOOP_HOME/bin/hadoop jar units.jar hadoop.ProcessUnits input_dir output_dir

等待一段时间,直到文件被执行。执行后,如下所示,输出将包含输入拆分的数量,Map任务的数量,还原器任务的数量等。

INFO mapreduce.Job: Job job_1414748220717_0002
completed successfully
14/10/31 06:02:52
INFO mapreduce.Job: Counters: 49
File System Counters

FILE: Number of bytes read=61
FILE: Number of bytes written=279400
FILE: Number of read operations=0
FILE: Number of large read operations=0   
FILE: Number of write operations=0
HDFS: Number of bytes read=546
HDFS: Number of bytes written=40
HDFS: Number of read operations=9
HDFS: Number of large read operations=0
HDFS: Number of write operations=2 Job Counters


   Launched map tasks=2  
   Launched reduce tasks=1
   Data-local map tasks=2  
   Total time spent by all maps in occupied slots (ms)=146137
   Total time spent by all reduces in occupied slots (ms)=441   
   Total time spent by all map tasks (ms)=14613
   Total time spent by all reduce tasks (ms)=44120
   Total vcore-seconds taken by all map tasks=146137

   Total vcore-seconds taken by all reduce tasks=44120
   Total megabyte-seconds taken by all map tasks=149644288
   Total megabyte-seconds taken by all reduce tasks=45178880

Map-Reduce Framework

Map input records=5  
   Map output records=5   
   Map output bytes=45  
   Map output materialized bytes=67  
   Input split bytes=208
   Combine input records=5  
   Combine output records=5
   Reduce input groups=5  
   Reduce shuffle bytes=6  
   Reduce input records=5  
   Reduce output records=5  
   Spilled Records=10  
   Shuffled Maps =2  
   Failed Shuffles=0  
   Merged Map outputs=2  
   GC time elapsed (ms)=948  
   CPU time spent (ms)=5160  
   Physical memory (bytes) snapshot=47749120  
   Virtual memory (bytes) snapshot=2899349504  
   Total committed heap usage (bytes)=277684224

File Output Format Counters

   Bytes Written=40

第8步

以下命令用于验证输出文件夹中的结果文件。

$HADOOP_HOME/bin/hadoop fs -ls output_dir/

第9步

以下命令用于查看 Part-00000 文件中的输出。这个文件是由HDFS生成的。

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000

以下是MapReduce程序生成的输出。

1981    34
1984    40
1985    45

第10步

以下命令用于将输出文件夹从HDFS复制到本地文件系统进行分析。

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000/bin/hadoop dfs get output_dir /home/hadoop

 

重要的命令

所有Hadoop命令都由 $ HADOOP_HOME / bin / hadoop 命令调用。不带任何参数运行Hadoop脚本将打印所有命令的描述。

用法 :hadoop [--config confdir]命令

下表列出了可用选项及其说明。

选项 描述
namenode -format 格式化DFS文件系统。
secondarynamenode 运行DFS辅助名称节点。
namenode 运行DFS名称节点。
datanode 运行DFS数据节点。
dfsadmin 运行DFS管理客户端。
mradmin 运行Map-Reduce管理客户端。
fsck 运行DFS文件系统检查实用程序。
fs 运行通用文件系统用户客户端。
balancer 运行集群平衡实用程序。
oiv 将离线fsimage查看器应用于fsimage。
fetchdt 从NameNode获取授权令牌。
jobtracker 运行MapReduce作业跟踪器节点。
pipes 运行管道作业。
tasktracker 运行MapReduce任务跟踪器节点。
historyserver 将作业历史记录服务器作为独立守护程序运行。
job 操作MapReduce作业。
queue 获取有关JobQueues的信息。
version 打印版本。
jar 运行一个jar文件。
distcp 递归复制文件或目录。
distcp2 DistCp版本2。
archive -archiveName NAME -p 创建一个hadoop存档。
*  
classpath 打印获取Hadoop jar和所需库所需的类路径。
daemonlog 获取/设置每个守护进程的日志级别

 

如何与MapReduce作业进行交互

用法:hadoop作业[GENERIC_OPTIONS]

以下是Hadoop作业中可用的通用选项。

GENERIC_OPTIONS 描述
-submit 提交工作。
-status 打印地图并减少完成百分比和所有工作计数器。
-counter 打印计数器值。
-kill 杀死这份工作。
-events 打印jobtracker针对给定范围收到的事件详细信息。
-history [all] - history < jobOutputDir> 打印作业详细信息,失败并终止提示细节。可以通过指定[all]选项来查看有关作业的更多详细信息,如成功执行任务和为每个任务执行的任务尝试。
-list[all] 显示所有作业。-list仅显示尚未完成的作业。
-kill-task 杀死任务。杀死的任务不计入失败的尝试。
-fail-task 失败的任务。失败的任务计入失败的尝试中。
-set-priority 改变工作的优先级。允许的优先级值为VERY_HIGH,HIGH,NORMAL,LOW,VERY_LOW

查看工作状态

$ $HADOOP_HOME/bin/hadoop job -status <JOB-ID>
e.g.
$ $HADOOP_HOME/bin/hadoop job -status job_201310191043_0004

查看工作输出的历史记录

$ $HADOOP_HOME/bin/hadoop job -history <DIR-NAME>
e.g.
$ $HADOOP_HOME/bin/hadoop job -history /user/expert/output

杀死这份工作

$ $HADOOP_HOME/bin/hadoop job -kill <JOB-ID>
e.g.
$ $HADOOP_HOME/bin/hadoop job -kill job_201310191043_0004

Hadoop流媒体是Hadoop发行版附带的实用程序。此实用程序允许您使用任何可执行文件或脚本作为映射器和/或缩减器来创建和运行Map / Reduce作业。使用Python的例子对于Hadoop流媒体,我们正在考虑单 ...