Skip to main content.

公告

搜索

最新评论

Hadoop如何在MapReduce中使用压缩

在考虑如何压缩那些将由MapReduce处理的数据时,考虑压缩格式是否支持分割是很重要的。考虑存储在HDFS中的未压缩的文件,其大小为1GB,HDFS的块大小为64MB,所以该文件将被存储为16块,将此文件用作输入的MapReduce作业会创建1个输人分片(split ,也称为“分块”。对于block,我们统一称为“块”。)每个分片都被作为一个独立map任务的输入单独进行处理。

现在假设。该.文件是一个grip格式的压缩文件,压缩后的大小为1GB。和前面一样,HDFS将此文件存储为16块。然而,针对每一块创建一个分块是没有用的,因为不可能从gzip数据流中的任意点开始读取,map任务也不可能独立于其他分块只读取一个分块中的数据。gzip格式使用DEFLATE来存储压缩过的数据,DEFLATE将数据作为一系列压缩过的块进行存储。问题是,每块的开始没有指定用户在数据流中任意点定位到下一个块的起始位置,而是其自身与数据流同步。因此,gzip不支持分割(块)机制。

在这种情况下,MapReduce不分割gzip格式的文件,因为它知道输入是gzip压缩格式的(通过文件扩展名得知),而gzip压缩机制不支持分割机制。这样是以牺牲本地化为代价:一个map任务将处理16个HDFS块。大都不是map的本地数据。与此同时,因为map任务少,所以作业分割的粒度不够细,从而导致运行时间变长。

在我们假设的例子中,如果是一个LZO格式的文件,我们会碰到同样的问题,因为基本压缩格式不为reader提供方法使其与流同步。但是,bzip2格式的压缩文件确实提供了块与块之间的同步标记(一个48位的PI近似值),因此它支持分割机制。(上一篇中我们列出了每种压缩格式是否支持分割。)

对于文件的收集,这些问题会稍有不同。ZIP是存档格式,因此它可以将多个文件合并为一个ZIP文件。每个文件单独压缩,所有文档的存储位置存储在ZIP文件的尾部。这个属性表明ZIP文件支持文件边界处分割,每个分片中包括ZIP压缩文件中的一个或多个文件。

在MapReduce我们应该使用哪种压缩格式?

根据应用的具体情况来决定应该使用哪种压缩格式。就个人而言,更趋向于使用最快的速度压缩,还是使用最优的空间压缩?一般来说,应该尝试不同的策略,并用具有代表性的数据集进行测试,从而找到最佳方法。对于那些大型的、没有边界的文件,如日志文件,有以下选项。

  • 存储未压缩的文件。
  • 使用支持分割机制的压缩格式,如bzip2。
  • 在应用中将文件分割成几个大的数据块,然后使用任何一种支持的压缩格式单独压缩每个数据块(可不用考虑压缩格式是否支持分割)。在这里,需要选择数据块的大小使压缩后的数据块在大小上相当于HDFS的块。
  • 使用支持压缩和分割的Sequence File(序列文件)。

对于大型文件,不要对整个文件使用不支持分割的压缩格式,因为这样会损失本地性优势,从而使降低MapReduce应用的性能。

那么我们如何在MapReduce中使用压缩呢?

如前所述,如果输入的文件是压缩过的,那么在被MapReduce读取时,它们会被自动解压,根据文件扩展名来决定应该使用哪一个压缩解码器。

如果要压缩MapReduce作业的输出,请在作业配置文件中将mapred.output.compress属性设置为true。将mapred.output.compression.codec属性设置为自己打算使用的压缩编码/解码器的类名,如下例所示,此应用程序运行最高气温作业从而产生压缩的输出结果:

public class MaxTemperatureWithCompression {
     public static void main(String[] args) throws IOException {
         if (args.length != 2) {
            System.err.println("Usage: MaxTemperatureWithCompression <input path> " + "<output path>");
            System.exit(-1);
         }
         JobConf conf = new JobConf(MaxTemperatureWithCompression.class);
         conf.setJobName("Max temperature with output compression");
         FileInputFormat.addInputPath(conf, new Path(args[0]));
         FileOutputFormat.setOutputPath(conf, new Path(args[1]));
         conf.setOutputKeyClass(Text.class);
         conf.setOutputValueClass(IntWritable.class);
         conf.setBoolean("mapred.output.compress", true);
         conf.setClass("mapred.output.compression.codec", GzipCodec.class,
         CompressionCodec.class);
         conf.setMapperClass(MaxTemperatureMapper.class);
         conf.setCombinerClass(MaxTemperatureReducer.class);
         conf.setReducerClass(MaxTemperatureReducer.class);
         JobClient.runJob(conf);
     }
}
我们使用压缩过的输入来运行此应用程序(其实不必像它一样使用和输人相同的格式压缩输出),如下所示:

% hadoop MaxTemperatureWithCompression input/ncdc/sample.txt.gz output

最终输出的每部分都是压缩过的。在本例中。只有一部分:

% gunzip -c output/part-00000.gz
1949 111
1950 22

如果为输出使用了一系列文件,可以设置mapred.output.compression.type属性来控制压缩类型,默认为RECORD,它压缩单独的记录。将它改为BLOCK,则可以压缩一组记录。由于它有更好的压缩比,所以推荐使用。

map作业输出结果的压缩

即使MapReduce应用使用非压缩的数据来读取和写入,我们也可以受益于压缩map阶段的中间输出。因为map作业的输出会被写入磁盘并通过网络传输到reducer节点,所以如果使用LZO之类的快速压缩,能得到更好的性能,因为传输的数据量大大减少了。下表显示了启用rnap输出压缩和设置压缩格式的配置属性。

 属性名称  类型  默认值  描述
 mapred.compress.map.output  布尔  false  压缩map输出
 mapred.map.output.compression.codec  类  org.apache.hadoop.io.compress.DefaultCodec  map输出使用的压缩编码/解码器

下面几行代码用于在map作业中启用gzip格式来压缩输出结果:

conf.setCompressMapOutput(true);
conf.setMapOutputCompressorClass(GzipCodec.class);

标签: Hadoop, MapReduce, 压缩
Posted by 架构点滴 @ 2011-12-10 11:38:18 阅读(741) 评论(0)
上一篇:Hadoop中的编码器和解码器
下一篇:Hadoop序列化中的Writable接口概述

Feedback

你还可以输入600/600个字符 发表评论
称呼: (必填) 登录 | 开通博客
邮箱: (选填) 你的邮箱地址不会被公开
网站: (选填)
验证码: (必填)