<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<rss version="2.0" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:trackback="http://madskills.com/public/xml/rss/module/trackback/" xmlns:wfw="http://wellformedweb.org/CommentAPI/" xmlns:slash="http://purl.org/rss/1.0/modules/slash/">
<channel>
<title><![CDATA[架构之美]]></title>
<link><![CDATA[http://www.itivy.com/arch]]></link>
<description><![CDATA[<b>青藤园架构设计专题博客，分享软件设计、云计算、网站架构相关技术和思想</b>]]></description>
<language><![CDATA[zh-cn]]></language>
<copyright><![CDATA[]]></copyright>
<webMaster><![CDATA[]]></webMaster>
<generator><![CDATA[]]></generator>
<Image><![CDATA[]]></Image>
<item>
<link><![CDATA[http://www.itivy.com/arch/archive/2011/12/22/hadoop-mapreduce-python.html]]></link>
<title><![CDATA[用python实现hadoop中的map/reduce查询]]></title>
<author><![CDATA[架构点滴]]></author>
<category><![CDATA[]]></category>
<pubDate>Thu, 22 Dec 2011 23:19:26 GMT</pubDate>
<guid><![CDATA[]]></guid>
<description><![CDATA[<p>摘要：</p>
<p>通过上一篇总结Hadoop中的集群环境配置和使用技巧的介绍，我们就假设已经拥有了一个可运行的Hadoop集群环境。以下的这篇文章主要是用python实现hadoop中的map/reduce查询。原文链接：<a target="_blank" href="http://slaytanic.blog.51cto.com/2057708/731750">http://slaytanic.blog.51cto.com/2057708/731750</a></p>
<p>条件，假设你已经装好了hadoop集群，配好了hdfs并可以正常运行。</p>
<p><pre class="brush:bash;">$hadoop dfs -ls /data/dw/explorer
Found 1 items
drwxrwxrwx     - rsync supergroup                    0 2011-11-30 01:06 /data/dw/explorer/20111129


$ hadoop dfs -ls /data/dw/explorer/20111129
Found 4 items
-rw-r--r--     3 rsync supergroup     12294748 2011-11-29 21:10 /data/dw/explorer/20111129/explorer_20111129_19_part-00000.lzo
-rw-r--r--     3 rsync supergroup             1520 2011-11-29 21:11 /data/dw/explorer/20111129/explorer_20111129_19_part-00000.lzo.index
-rw-r--r--     3 rsync supergroup     12337366 2011-11-29 22:09 /data/dw/explorer/20111129/explorer_20111129_20_part-00000.lzo
-rw-r--r--     3 rsync supergroup             1536 2011-11-29 22:10 /data/dw/explorer/20111129/explorer_20111129_20_part-00000.lzo.index</pre></p>
<div>数据格式如下</div>
<div><br />
</div>
<div>20111129/23:59:54 111.161.25.184 
182.132.25.243 &lt;Log_Explorer ProductVer="5.05.1026.1111" 
UUID="{C9B80A9B-704E-B106-9134-1ED3581D0123}"&gt;&lt;UserDoubleClick 
FileExt="mp3" AssociateKey="Audio.mp3" 
Count="1"/&gt;&lt;/Log_Explorer&gt;</div>
<div><br />
</div>
<div>1.map脚本取数据explorer_map.py<br />
<pre class="brush:python;">#!/usr/bin/python
#-*-coding:UTF-8 -*-
import sys
import cElementTree

debug = False#设置lzo文件偏移位
if debug:
        lzo = 0
else:
        lzo = 1

for line in sys.stdin:
        try:
                flags = line[:-1].split('\t')
#hadoop查询走标准输入，数据以\t分隔，去掉每行中的\n
                if len(flags) == 0:
                        break
                if len(flags) != 11+lzo:
#hadoop采用lzo则偏移位+1，lzo设置为False则+1
                        continue
                stat_date=flags[0+lzo]#日期
                stat_date_bar = stat_date[:4]+"-"+stat_date[4:6]+'-'+stat_date[6:8]#拼成2011-11-29格式
                version = flags[4+lzo]
                xmlstr = flags[10+lzo]
                #xmlstr=line
                dom = cElementTree.fromstring(xmlstr)
#xml字段对象，以下均为取值操作
                uuid = dom.attrib['UUID']
                node = dom.find('UserDoubleClick')
                associateKey=node.get('AssociateKey')
                associateKeys=associateKey.split('.')
                player = associateKeys[0]
                fileext=node.get('FileExt')
                count=node.get('Count')
                print stat_date_bar+','+version+','+fileext+','+player+','+associateKey+'\t'+count
#输出map后的数据，这里map不对数据做任何处理，只做取值，拼接操作
#将\t前的字符串作为key输入reduce，\t后的count作为reduce计算用的value
except Exception,e:
print e
#抛出异常 </pre>2.reduce脚本计算结果并输出explorer_red.py<br />
<pre class="brush:python;">#!/usr/bin/python
#-*-coding:UTF-8 -*-
import sys
import cElementTree
import os
import string

res = {}

for line in sys.stdin:
        try:
                flags = line[:-1].split('\t')
#拆分\t以获得map传过来的key和value
                if len(flags) != 2:
#\t切割后，如果数据有问题，元素多于2或者少于2则认为数据不合法，跳出继续下一行
                        continue
                skey= flags[0]
#取出第一个元素作为key
                count=int(flags[1])
#取出第二个元素作为value
                if res.has_key(skey) == False:
                        res[skey]=0
                res[skey] += count
#计算count总和
        except Exception,e:
                pass
#不抛出，继续执行

for key in res.keys():
        print key+','+'%s' % res[key]
#格式化输出，以放入临时文件</pre>3.放入crontab执行的脚本<br />
<pre class="brush:python;">#!/bin/sh

[ $1 ] &amp;&amp; day=$1 DATE=`date -d "$1" +%Y%m%d`
[ $1 ] || day=`date -d "1 day ago" +%Y%m%d`     DATE=`date -d "1 day ago" +%Y%m%d`
#取昨天日期

cd /opt/modules/hadoop/hadoop-0.20.203.0/
#进入hadoop工作目录
bin/hadoop jar contrib/streaming/hadoop-streaming-0.20.203.0.jar -file /home/rsync/explorer/explorer_map.py -file /home/rsync/explorer/explorer_red.py -mapper /home/rsync/explorer/explorer_map.py -reducer /home/rsync/explorer/explorer_red.py -inputformat com.hadoop.mapred.DeprecatedLzoTextInputFormat -input /data/dw/explorer/$DATE -output /tmp/explorer_$DATE
#执行map/reduce，并将排序完结果放入hdfs:///tmp/explorer

bin/hadoop fs -copyToLocal /tmp/explorer_$DATE /tmp
#将m/r结果从hdfs://tmp/explorer_$DATE 保存到本地/tmp下
bin/hadoop dfs -rmr /tmp/explorer_$DATE
#删除hdfs下临时文件夹

cd
#返回自身目录
cd explorer
#进入explorer文件夹
./rm.py $DATE
执行入库和删除临时文件夹脚本</pre>4.将/tmp生成的结果入库并删除临时文件夹<br />
<pre class="brush:python;">#!/usr/bin/python

import os
import sys
import string

if len(sys.argv) == 2:
                date = sys.argv[1:][0] #取脚本参数
                os.system ("mysql -h192.168.1.229 -ujobs -p223238 -P3306    bf5_data    -e \"load data local infile '/tmp/explorer_"+date+"/part-00000' into table explorer FIELDS TERMINATED
BY '\,' (stat_date,ver,FileExt,player,AssociateKey,count)\"")#执行入库sql语句，并用load方式将数据加载到统计表中
                os.system ("rm -rf /tmp/explorer_"+date)#删除map/reduce过的数据
else:
                print "Argv error"

#因为没有安装MySQLdb包，所以用运行脚本的方式加载数据。</pre>原始数据和最后完成的输出数据对比，红色为原数据，绿色为输出数据<br />
<pre class="brush:bash;">20111129/23:59:54 111.161.25.184 182.132.25.243 &lt;Log_Explorer ProductVer="5.05.1026.1111" UUID="{C9B80A9B-704E-B106-9134-1ED3581D0123}"&gt;&lt;UserDoubleClick FileExt="mp3" AssociateKey="Audio.mp3" Count="1"/&gt;&lt;/Log_Explorer&gt;

-----------------------------------------------------------

2011-11-29,5.05.1026.1111,mp3,Audio,Audio.mp3,1</pre><div>5.调试技巧</div>
<div><br />
</div>
<div>因为这种方式比较抽象，所以你很难得到一个直观的调试过程。建议调试如下<br />
<pre class="brush:python;">#将hadoop中的数据文本copy出来一个，lzo需要解压缩，然后将map中的debug模式置为True，也就是不加hadoop中的lzo偏移量。
#用head输入hadoop里的文件，通过管道操作放入map/reduce中执行，看输出结果

$head explorer_20111129 | explorer_map.py | explorer_red.py</pre>一天的数据大概几十个G，以前用awk和perl脚本跑需要至少半小时以上，改用map/reduce方式后，大概20几秒跑完，效率还是提高了很多的。</div>
</div>]]></description>
</item>

<item>
<link><![CDATA[http://www.itivy.com/arch/archive/2011/12/15/hadoop-cluster-config-usage.html]]></link>
<title><![CDATA[总结Hadoop中的集群环境配置和使用技巧]]></title>
<author><![CDATA[架构点滴]]></author>
<category><![CDATA[]]></category>
<pubDate>Thu, 15 Dec 2011 12:57:06 GMT</pubDate>
<guid><![CDATA[]]></guid>
<description><![CDATA[<h2>摘要：</h2>
<p>本文主要是和大家一起探讨Hadoop的集群配置（并非在单机上）的一些细节，通过对这些细节的详细描述，希望能帮助大家轻松搭建Hadoop集群环境，为搭建高性能Web打下坚实的基础。让我们一起来看看吧，原文如下：</p>
<h2>环境</h2>
<p>7台普通的机器，操作系统都是Linux。内存和CPU就不说了，反正Hadoop一大特点就是机器在多不在精。JDK必须是1.5以上的，这个切记。7台机器的机器名务必不同，后续会谈到机器名对于MapReduce有很大的影响。</p>
<h2>部署考虑</h2>
<p>正如上面我描述的，对于Hadoop的集群来说，可以分成两大类角色：Master和Slave，前者主要配置NameNode和
JobTracker的角色，负责总管分布式数据和分解任务的执行，后者配置DataNode和TaskTracker的角色，负责分布式数据存储以及任
务的执行。本来我打算看看一台机器是否可以配置成Master，同时也作为Slave使用，不过发现在NameNode初始化的过程中以及
TaskTracker执行过程中机器名配置好像有冲突（NameNode和TaskTracker对于Hosts的配置有些冲突，究竟是把机器名对应
IP放在配置前面还是把Localhost对应IP放在前面有点问题，不过可能也是我自己的问题吧，这个大家可以根据实施情况给我反馈）。最后反正决定一
台Master，六台Slave，后续复杂的应用开发和测试结果的比对会增加机器配置。</p>
<h2>实施步骤</h2>
<ol><li>在所有的机器上都建立相同的目录，也可以就建立相同的用户，以该用户的home路径来做hadoop的安装路径。例如我在所有的机器上都建立了<code>/home/wenchu</code>。</li>
<li>下载Hadoop，先解压到Master上。这里我是下载的0.17.1的版本。此时Hadoop的安装路径就是<code>/home/wenchu/hadoop-0.17.1</code>。</li>
<li>解压后进入conf目录，主要需要修改以下文件：<code>hadoop-env.sh</code>，<code>hadoop-site.xml</code>、<code>masters</code>、<code>slaves</code>。
    <p>Hadoop的基础配置文件是<code>hadoop-default.xml</code>，看Hadoop的代码可以知道，默认建立一个Job的时候会建立Job的Config，Config首先读入<code>hadoop-default.xml</code>的配置，然后再读入<code>hadoop-site.xml</code>的配置（这个文件初始的时候配置为空），<code>hadoop-site.xml</code>中主要配置你需要覆盖的<code>hadoop-default.xml</code>的系统级配置，以及你需要在你的MapReduce过程中使用的自定义配置（具体的一些使用例如final等参考文档）。</p>
    <p>以下是一个简单的<code>hadoop-site.xml</code>的配置：</p>
<pre class="brush:xml;">&lt;?xml version="1.0"?&gt;
&lt;?xml-stylesheet type="text/xsl" href="configuration.xsl"?&gt;
&lt;!-- Put site-specific property overrides in this file. --&gt;
&lt;configuration&gt;
&lt;property&gt;
   &lt;name&gt;fs.default.name&lt;/name&gt;//你的namenode的配置，机器名加端口
   &lt;value&gt;hdfs://10.2.224.46:54310/&lt;/value&gt;
&lt;/property&gt;
&lt;property&gt;
   &lt;name&gt;mapred.job.tracker&lt;/name&gt;//你的JobTracker的配置，机器名加端口
   &lt;value&gt;hdfs://10.2.224.46:54311/&lt;/value&gt;
&lt;/property&gt;
&lt;property&gt;
   &lt;name&gt;dfs.replication&lt;/name&gt;//数据需要备份的数量，默认是三
   &lt;value&gt;1&lt;/value&gt;
&lt;/property&gt;
&lt;property&gt;
    &lt;name&gt;hadoop.tmp.dir&lt;/name&gt;//Hadoop的默认临时路径，这个最好配置，如果在新增节点或者其他情况下莫名其妙的DataNode启动不了，就删除此文件中的tmp目录即可。不过如果删除了NameNode机器的此目录，那么就需要重新执行NameNode格式化的命令。
    &lt;value&gt;/home/wenchu/hadoop/tmp/&lt;/value&gt;
&lt;/property&gt;
&lt;property&gt;
   &lt;name&gt;mapred.child.java.opts&lt;/name&gt;//java虚拟机的一些参数可以参照配置
   &lt;value&gt;-Xmx512m&lt;/value&gt;
&lt;/property&gt;
&lt;property&gt;
  &lt;name&gt;dfs.block.size&lt;/name&gt;//block的大小，单位字节，后面会提到用处，必须是512的倍数，因为采用crc作文件完整性校验，默认配置512是checksum的最小单元。
  &lt;value&gt;5120000&lt;/value&gt;
  &lt;description&gt;The default block size for new files.&lt;/description&gt;
&lt;/property&gt;
&lt;/configuration&gt;</pre><code>hadoop-env.sh</code>文件只需要修改一个参数：<br />
<pre class="brush:bash;"># The java implementation to use.  Required.
 export JAVA_HOME=/usr/ali/jdk1.5.0_10</pre><p></p>
<p>配置你的Java路径，记住一定要1.5版本以上，免得莫名其妙出现问题。</p>
    <p>Masters中配置Masters的IP或者机器名，如果是机器名那么需要在<code>/etc/hosts</code>中有所设置。Slaves中配置的是Slaves的IP或者机器名，同样如果是机器名需要在<code>/etc/hosts</code>中有所设置。范例如下，我这里配置的都是IP：</p>
<pre class="brush:bash;">Masters:
 10.2.224.46

  Slaves:
 10.2.226.40
 10.2.226.39
 10.2.226.38
 10.2.226.37
 10.2.226.41
 10.2.224.36</pre><p></p>
<p></p>
</li>
<li value="4">建立Master到每一台Slave的SSH受信证书。由于Master将会通过SSH启动所有Slave的Hadoop，所以需要建立单向或者双向证书保证命令执行时不需要再输入密码。在Master和所有的Slave机器上执行：<code>ssh-keygen -t rsa</code>。执行此命令的时候，看到提示只需要回车。然后就会在<code>/root/.ssh/</code>下面产生<code>id_rsa.pub</code>的证书文件，通过scp将Master机器上的这个文件拷贝到Slave上（记得修改名称），例如：<code>scp root@masterIP:/root/.ssh/id_rsa.pub /root/.ssh/46_rsa.pub</code>，然后执行<code>cat /root/.ssh/46_rsa.pub &gt;&gt;/root/.ssh/authorized_keys</code>，建立<code>authorized_keys</code>文
件即可，可以打开这个文件看看，也就是rsa的公钥作为key，user@IP作为value。此时可以试验一下，从master 
ssh到slave已经不需要密码了。由slave反向建立也是同样。为什么要反向呢？其实如果一直都是Master启动和关闭的话那么没有必要建立反
向，只是如果想在Slave也可以关闭Hadoop就需要建立反向。</li>
<li>将Master上的Hadoop通过scp拷贝到每一个Slave相同的目录下，根据每一个Slave的<code>Java_HOME</code>的不同修改其<code>hadoop-env.sh</code>。</li>
<li>修改Master上<code>/etc/profile：</code><br />
 新增以下内容：（具体的内容根据你的安装路径修改，这步只是为了方便使用）<br />
<pre class="brush:bash;">export HADOOP_HOME=/home/wenchu/hadoop-0.17.1
export PATH=$PATH:$HADOOP_HOME/bin</pre>     
    修改完毕后，执行<code>source /etc/profile</code>来使其生效。</li>
<li>在Master上执行<code>Hadoop namenode –format</code>，这是第一需要做的初始化，可以看作格式化吧，以后除了在上面我提到过删除了Master上的<code>hadoop.tmp.dir</code>目录，否则是不需要再次执行的。</li>
<li>然后执行Master上的<code>start-all.sh</code>，这个命令可以直接执行，因为在6中已经添加到了path路径，这个命令是启动hdfs和mapreduce两部分，当然你也可以分开单独启动hdfs和mapreduce，分别是bin目录下的<code>start-dfs.sh</code>和<code>start-mapred.sh</code>。</li>
<li>检查Master的logs目录，看看Namenode日志以及JobTracker日志是否正常启动。</li>
<li>检查Slave的logs目录看看Datanode日志以及TaskTracker日志是否正常。</li>
<li>如果需要关闭，那么就直接执行<code>stop-all.sh</code>即可。</li>
</ol>
<p>以上步骤就可以启动Hadoop的分布式环境，然后在Master的机器进入Master的安装目录，执行<code>hadoop jar hadoop-0.17.1-examples.jar wordcount</code>输入路径和输出路径，就可以看到字数统计的效果了。此处的输入路径和输出路径都指的是HDFS中的路径，因此你可以首先通过拷贝本地文件系统中的目录到HDFS中的方式来建立HDFS中的输入路径：</p>
<p><code>hadoop dfs -copyFromLocal /home/wenchu/test-in test-in。</code>其中<code>/home/wenchu/test-in</code>是本地路径，<code>test-in</code>是将会建立在HDFS中的路径，执行完毕以后可以通过<code>hadoop dfs –ls</code>看到test-in目录已经存在，同时可以通过<code>hadoop dfs –ls test-in</code>查看里面的内容。输出路径要求是在HDFS中不存在的，当执行完那个demo以后，就可以通过<code>hadoop dfs –ls </code>输出路径看到其中的内容，具体文件的内容可以通过<code>hadoop dfs –cat</code>文件名称来查看。</p>
<p>经验总结和注意事项（这部分是我在使用过程中花了一些时间走的弯路）：</p>
<ol><li>Master和Slave上的几个conf配置文件不需要全部同步，如果确定都是通过Master去启动和关闭，那么Slave机器上的配置不需要去维护。但如果希望在任意一台机器都可以启动和关闭Hadoop，那么就需要全部保持一致了。</li>
<li>Master和Slave机器上的<code>/etc/hosts</code>中必须把集群中机器都配置上去，就算在各个配置文件中
使用的是IP。这个吃过不少苦头，原来以为如果配成IP就不需要去配置Host，结果发现在执行Reduce的时候总是卡住，在拷贝的时候就无法继续下
去，不断重试。另外如果集群中如果有两台机器的机器名如果重复也会出现问题。</li>
<li>如果在新增了节点或者删除节点的时候出现了问题，首先就去删除Slave的<code>hadoop.tmp.dir</code>，然后重新启动试试看，如果还是不行那就干脆把Master的<code>hadoop.tmp.dir</code>删除（意味着dfs上的数据也会丢失），如果删除了Master的<code>hadoop.tmp.dir</code>，那么就需要重新<code>namenode –format</code>。</li>
<li>Map任务个数以及Reduce任务个数配置。前面分布式文件系统设计提到一个文件被放入到分布式文件系统中，会被分割成多个block放置到每一个的DataNode上，默认<code>dfs.block.size</code>应该是64M，也就是说如果你放置到HDFS上的数据小于64，那么将只有一个Block，此时会被放置到某一个DataNode中，这个可以通过使用命令：<code>hadoop dfsadmin –report</code>就可以看到各个节点存储的情况。也可以直接去某一个DataNode查看目录：<code>hadoop.tmp.dir/dfs/data/current</code>就
可以看到那些block了。Block的数量将会直接影响到Map的个数。当然可以通过配置来设定Map和Reduce的任务个数。Map的个数通常默认
和HDFS需要处理的blocks相同。也可以通过配置Map的数量或者配置minimum split size来设定，实际的个数为：<code>max(min(block_size,data/#maps),min_split_size)</code>。Reduce可以通过这个公式计算：<code>0.95*num_nodes*mapred.tasktracker.tasks.maximum</code>。</li>
</ol>
<p>总的来说出了问题或者启动的时候最好去看看日志，这样心里有底。</p>
<h2>Hadoop中的命令（Command）总结</h2>
<p>这部分内容其实可以通过命令的Help以及介绍了解，我主要侧重于介绍一下我用的比较多的几个命令。Hadoop dfs 这个命令后面加参数就是对于HDFS的操作，和Linux操作系统的命令很类似，例如：</p>
<ul><li><code>Hadoop dfs –ls</code>就是查看/usr/root目录下的内容，默认如果不填路径这就是当前用户路径；</li>
<li><code>Hadoop dfs –rmr xxx</code>就是删除目录，还有很多命令看看就很容易上手；</li>
<li><code>Hadoop dfsadmin –report</code>这个命令可以全局的查看DataNode的情况；</li>
<li><code>Hadoop job</code>后面增加参数是对于当前运行的Job的操作，例如list,kill等；</li>
<li><code>Hadoop balancer</code>就是前面提到的均衡磁盘负载的命令。</li>
</ul>
<p>其他就不详细介绍了。</p>
<p>作者：岑文初&nbsp;&nbsp; 原文链接：http://www.infoq.com/cn/articles/hadoop-config-tip</p>]]></description>
</item>

<item>
<link><![CDATA[http://www.itivy.com/arch/archive/2011/12/14/hadoop-customize-writable.html]]></link>
<title><![CDATA[Hadoop如何实现自定义的Writable]]></title>
<author><![CDATA[架构点滴]]></author>
<category><![CDATA[]]></category>
<pubDate>Wed, 14 Dec 2011 10:05:12 GMT</pubDate>
<guid><![CDATA[]]></guid>
<description><![CDATA[<p><b>Hadoop</b>自带一系列有用的Writable实现，可以满足绝大多数用途。但有时，我们需要编写自己的自定义实现。通过<b>自定义Writable</b>，我们能够完全控制二进制表示和排序顺序。Writable是MapReduce数据路径的核心，所以调整二进制表示对其性能有显著影响。现有的Hadoop Writable应用已得到很好的优化，但为了对付更复杂的结构，最好创建一个新的Writable类型，而不是使用已有的类型。</p>
<p>为了演示如何创建一个<b>自定义Writable</b>，我们编写了一个表示一对字符串的实现，名为TextPair。</p>
<p></p>
<pre class="brush:java;">import java.io.*;
import org.apache.hadoop.io.*;
public class TextPair implements WritableComparable<textpair> {
private Text first;
private Text second;
public TextPair() {
set(new Text(), new Text());
}
public TextPair(String first, String second) {
set(new Text(first), new Text(second));
}
public TextPair(Text first, Text second) {
set(first, second);
}
public void set(Text first, Text second) {
this.first = first;
this.second = second;
}
public Text getFirst() {
return first;
}
public Text getSecond() {
return second;
}
@Override
public void write(DataOutput out) throws IOException {
first.write(out);
second.write(out);
}
@Override
public void readFields(DataInput in) throws IOException {
first.readFields(in);
second.readFields(in);
}
@Override
public int hashCode() {
return first.hashCode() * 163 + second.hashCode();
}
@Override
public boolean equals(Object o) {
if (o instanceof TextPair) {
TextPair tp = (TextPair) o;
return first.equals(tp.first) &amp;&amp; second.equals(tp.second);
}
return false;
}
@Override
public String toString() {
return first + "\t" + second;
}
@Override
public int compareTo(TextPair tp) {
int cmp = first.compareTo(tp.first);
if (cmp != 0) {
return cmp;
}
return second.compareTo(tp.second);
}
}</textpair></pre>此实现的第一部分直观易懂：有两个Text实例变量(first和second)和相关的构造函数、get方法和set方法。所有的Writable实现都必须有一个默认的构造函数，以便MapReduce框架能够对它们进行实例化，进而调用readFields()方法来填充它们的字段。Writable实例是易变的、经常重用的，所以我们应该尽量避免在write()或readFields()方法中分配对象。<p></p>
<p>通过委托给每个Text对象本身，TextPair的write()方法依次序列化输出流中的每一个Text对象。同样，也通过委托给Text对象本身，readFields()反序列化输人流中的字节。DataOutput和DataInput接口有丰富的整套方法用于序列化和反序列化Java基本类型，所以在一般情况下，我们能够完全控制Writable对象的数据传输格式。</p>
<p>正如为Java写的任意值对象一样，我们会重写java.lang.Object的hashCode()方法，equals()方法和toString()方法。HashPartitioner使用hashCode()方法来选择reduce分区，所以应该确保写一个好的哈希函数来确保reduce函数的分区在大小上是相当的。</p>
<p>TextPair是WritableComparable的实现，所以它提供了compareTo()方法的实现，加入我们希望的顺序：它通过一个一个String逐个排序。请注意，TextPair不同于前面的TextArrayWritable类(除了它可以存储Text对象数之外)，因为TextArrayWritable只是一个Writable，而不是WritableComparable。</p>
<p>实现一个快速的RawComparator</p>
<p>上例中所示代码能够有效工作，但还可以进一步优化。正如前面所述，在MapReduce中，TextPair被用作键时，它必须被反序列化为要调用的compareTo()方法的对象。是否可以通过查看其序列化表示的方式来比较两个TextPair对象。</p>
<p>事实证明，我们可以这样做，因为TextPair由两个Text对象连接而成，二进制Text对象表示是一个可变长度的整型，包含UTF-8表示的字符串中的字节数，后跟UTF-8字节本身。关键在于读取开始的长度。从而得知第一个Text对象的字节表示有多长，然后可以委托Text对象的RawComparator，然后利用第一或者第二个字符串的偏移量来调用它。下面例子给出了具体方法(注意，该代码嵌套在TextPair类中)。</p>
<p></p>
<pre class="brush:cpp;">public static class Comparator extends WritableComparator {
private static final Text.Comparator TEXT_COMPARATOR = new Text.Comparator();
public Comparator() {
super(TextPair.class);
}
@Override
public int compare(byte[] b1, int s1, int l1,
byte[] b2, int s2, int l2) {
try {
int firstL1 = WritableUtils.decodeVIntSize(b1[s1]) + readVInt(b1, s1);
int firstL2 = WritableUtils.decodeVIntSize(b2[s2]) + readVInt(b2, s2);
int cmp = TEXT_COMPARATOR.compare(b1, s1, firstL1, b2, s2, firstL2);
if (cmp != 0) {
return cmp;
}
return TEXT_COMPARATOR.compare(b1, s1 + firstL1, l1 - firstL1,
b2, s2 + firstL2, l2 - firstL2);
} catch (IOException e) {
throw new IllegalArgumentException(e);
}
}
}
static {
WritableComparator.define(TextPair.class, new Comparator());
}</pre>事实上，我们一般都是继承WritableComparator，而不是直接实现RawComparator，因为它提供了一些便利的方法和默认实现。这段代码的精妙之处在于计算firstL1和firstL2,每个字节流中第一个Text字段的长度。每个都由可变长度的整型(由WritableUtils的decodeVIntSize()返回)和它的编码值(由readVInt()返问)组成。<p></p>
<p>静态代码块注册原始的comparator以便MapReduce每次看到TextPair类，就知道使用原始comparator作为其默认comparator。</p>
<p>自定义comparator</p>
<p>从TextPair可知，编写原始的cornparator比较费力，因为必须处理字节级别的细节。如果需要编写自己的实现，org.apache.hadoop.io包中Writable的某些前瞻性实现值得研究研究。WritableUtils的有效方法也比较非常方便。</p>
<p>如果可能，还应把自定义comparator写为RawComparators。这些comparator实现的排序顺序不同于默认comparator定义的自然排序顺序。下面的例子显示了TextPair的comparator，称为First Comparator。只考虑了一对Text对象中的第一个字符串。请注意，我们重写了compare()方法使其使用对象进行比较，所以两个compare()方法的语义是相同的。</p>
<p></p>
<pre class="brush:cpp;">public static class FirstComparator extends WritableComparator {
private static final Text.Comparator TEXT_COMPARATOR = new Text.Comparator();
public FirstComparator() {
super(TextPair.class);
}
@Override
public int compare(byte[] b1, int s1, int l1,
byte[] b2, int s2, int l2) {
try {
int firstL1 = WritableUtils.decodeVIntSize(b1[s1]) + readVInt(b1, s1);
int firstL2 = WritableUtils.decodeVIntSize(b2[s2]) + readVInt(b2, s2);
return TEXT_COMPARATOR.compare(b1, s1, firstL1, b2, s2, firstL2);
} catch (IOException e) {
throw new IllegalArgumentException(e);
}
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
if (a instanceof TextPair &amp;&amp; b instanceof TextPair) {
return ((TextPair) a).first.compareTo(((TextPair) b).first);
}
return super.compare(a, b);
}
}</pre>好了，Hadoop实现<b>自定义Writable</b>就介绍到这里，比较晚了，改天再研究。<br />
<p></p>]]></description>
</item>

<item>
<link><![CDATA[http://www.itivy.com/arch/archive/2011/12/12/hadoop-writable-interface-introduction.html]]></link>
<title><![CDATA[Hadoop序列化中的Writable接口概述]]></title>
<author><![CDATA[架构点滴]]></author>
<category><![CDATA[]]></category>
<pubDate>Mon, 12 Dec 2011 09:58:47 GMT</pubDate>
<guid><![CDATA[]]></guid>
<description><![CDATA[<p>在<b>Hadoop</b>中，<b>Writable</b>接口定义了两个方法：一个用于将其状态写入二进制格式的DataOutput流，另一个用于从二进制格式的DataInput流读取其态。</p>
<p></p>
<pre class="brush:java;">package org.apache.hadoop.io;
import java.io.DataOutput;
import java.io.DataInput;
import java.io.IOException;
public interface Writable {
    void write(DataOutput out) throws IOException;
    void readFields(DataInput in) throws IOException;
}</pre>让我们来看一个特别的<b>Writable</b>，看看可以对它进行哪些操作。我们要使用IntWritable，这是一个Java的int对象的封装。可以使用set()函数来创建和设置它的值：<br />
<pre class="brush:java;">IntWritable writable = new IntWritable();
writable.set(163);</pre>类似地，我们也可以使用构造函数:<br />
<pre class="brush:java;">IntWritable writable = new IntWritable(163);</pre>为了检查IntWritable的序列化形式，我们写一个小的辅助方法，它把一个java.io.ByteArrayOutputStream封装到java.io.DataOutputStream中（java.io.DataOutput的一个实现），以此来捕获序列化的数据流中的字节:<br />
<pre class="brush:java;">public static byte[] serialize(Writable writable) throws IOException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    DataOutputStream dataOut = new DataOutputStream(out);
    writable.write(dataOut);
    dataOut.close();
    return out.toByteArray();
}</pre>整数用四个字节写入(我们使用JUnit 4断言):<br />
<pre class="brush:java;">byte[] bytes = serialize(writable);
assertThat(bytes.length, is(4));</pre>字节使用大端顺序写入(所以，最重要的字节写在数据流的开始处，这是由java.io.DataOutput接口规定的)，我们可以使用Hadoop的StringUtils方法看到它们的十六进制表示:<br />
<pre class="brush:java;">assertThat(StringUtils.byteToHexString(bytes), is("000000a3"));</pre>让我们再来试试反序列化。我们创建一个帮助方法来从一个字节数组读取一个Writable对象:<br />
<pre class="brush:java;">public static byte[] deserialize(Writable writable, byte[] bytes)
throws IOException {
    ByteArrayInputStream in = new ByteArrayInputStream(bytes);
    DataInputStream dataIn = new DataInputStream(in);
    writable.readFields(dataIn);
    dataIn.close();
    return bytes;
}</pre>我们构造一个新的、缺值的IntWritable，然后调用deserialize()方法来读取刚写入的输出流。然后发现它的值(使用get方法检索得到)还是原来的值163：<br />
<pre class="brush:java;">IntWritable newWritable = new IntWritable();
deserialize(newWritable, bytes);
assertThat(newWritable.get(), is(163));</pre><span style="font-weight:bold;">WritableComparable 和comparators</span><p></p>
<p>IntWritable实现了WritableComparable接口，后者是Writable和java.lang.Comparable接口的子接口。</p>
<pre class="brush:java;">package org.apache.hadoop.io;
public interface WritableComparable<t> extends Writable, Comparable<t> {
}</t></t></pre>类型的比较对MapReduce而言至关重要的，键和键之间的比较是在排序阶段完成。Hadoop提供的一个优化方法是从Java Comparator的RawComparator扩展：<br />
<pre class="brush:java;">package org.apache.hadoop.io;
import java.util.Comparator;
public interface RawComparator<t> extends Comparator<t> {
     public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2);
}</t></t></pre>这个接口允许执行者比较从流中读取的未被反序列化为对象的记录，从而省去了创建对象的所有开销。例如，IntWritables的comparator使用原始的compare()方法从每个字节数组的指定开始位置（S1和S2）和长度（L1和L2）读取整数b1和b2然后直接进行比较。<p></p>
<p>WritableComparator是RawComparator对WritableComparable类的一个通用实现。它提供两个主要功能。首先，它提供了一个默认的对原始compare()函数的调用，对从数据流对要比较的对象进行反序列化，然后调用对象的compare()方法。其次，它充当的是RawComparator实例的一个工厂方法（Writable方法已经注册)。例如，为获得IntWritable的comparator，我们只需使用:</p>
<pre class="brush:java;">RawComparator<intwritable> comparator = WritableComparator.get(IntWritable.class);</intwritable></pre>comparator可以用来比较两个IntWritable：<br />
<pre class="brush:java;">IntWritable w1 = new IntWritable(163);
IntWritable w2 = new IntWritable(67);
assertThat(comparator.compare(w1, w2), greaterThan(0));</pre>或者它们的序列化描述：<br />
<pre class="brush:bash;">byte[] b1 = serialize(w1);
byte[] b2 = serialize(w2);
assertThat(comparator.compare(b1, 0, b1.length, b2, 0, b2.length), greaterThan(0));</pre>OK，到这里我们就对<b>Hadoop序列化</b>中的Writable接口介绍完了，比较基础，下次我们来讲讲如何在<b>Hadoop</b>中实现自定义的<b>Writable</b>，下回见！<p></p>]]></description>
</item>

<item>
<link><![CDATA[http://www.itivy.com/arch/archive/2011/12/10/hadoop-mapreduce-compression.html]]></link>
<title><![CDATA[Hadoop如何在MapReduce中使用压缩]]></title>
<author><![CDATA[架构点滴]]></author>
<category><![CDATA[]]></category>
<pubDate>Sat, 10 Dec 2011 11:38:18 GMT</pubDate>
<guid><![CDATA[]]></guid>
<description><![CDATA[<p>在考虑如何压缩那些将由<b>MapReduce</b>处理的数据时，考虑压缩格式是否支持分割是很重要的。考虑存储在HDFS中的未<b>压缩</b>的文件，其大小为1GB，HDFS的块大小为64MB，所以该文件将被存储为16块，将此文件用作输入的<b>MapReduce</b>作业会创建1个输人分片（split ,也称为“分块”。对于block，我们统一称为“块”。）每个分片都被作为一个独立map任务的输入单独进行处理。</p>
<p>现在假设。该.文件是一个grip格式的压缩文件，压缩后的大小为1GB。和前面一样，<b>HDFS</b>将此文件存储为16块。然而，针对每一块创建一个分块是没有用的，因为不可能从gzip数据流中的任意点开始读取，map任务也不可能独立于其他分块只读取一个分块中的数据。gzip格式使用DEFLATE来存储压缩过的数据，DEFLATE将数据作为一系列压缩过的块进行存储。问题是，每块的开始没有指定用户在数据流中任意点定位到下一个块的起始位置，而是其自身与数据流同步。因此，gzip不支持分割(块)机制。</p>
<p>在这种情况下，MapReduce不分割gzip格式的文件，因为它知道输入是gzip压缩格式的(通过文件扩展名得知)，而gzip压缩机制不支持分割机制。这样是以牺牲本地化为代价:一个map任务将处理16个HDFS块。大都不是map的本地数据。与此同时，因为map任务少，所以作业分割的粒度不够细，从而导致运行时间变长。</p>
<p>在我们假设的例子中，如果是一个LZO格式的文件，我们会碰到同样的问题，因为基本压缩格式不为reader提供方法使其与流同步。但是，bzip2格式的压缩文件确实提供了块与块之间的同步标记(一个48位的PI近似值)，因此它支持分割机制。（<a target="_blank" href="http://www.itivy.com/arch/archive/2011/12/10/hadoop-codec-usage.html">上一篇</a>中我们列出了每种压缩格式是否支持分割。）</p>
<p>对于文件的收集，这些问题会稍有不同。ZIP是存档格式，因此它可以将多个文件合并为一个ZIP文件。每个文件单独压缩，所有文档的存储位置存储在ZIP文件的尾部。这个属性表明ZIP文件支持文件边界处分割，每个分片中包括ZIP压缩文件中的一个或多个文件。</p>
<p style="font-weight:bold;"><span style="color:#e53333;">在MapReduce我们应该使用哪种压缩格式？</span></p>
<p>根据应用的具体情况来决定应该使用哪种压缩格式。就个人而言，更趋向于使用最快的速度压缩，还是使用最优的空间压缩？一般来说，应该尝试不同的策略，并用具有代表性的数据集进行测试，从而找到最佳方法。对于那些大型的、没有边界的文件，如日志文件，有以下选项。</p>
<ul><li>存储未压缩的文件。</li>
<li>使用支持分割机制的压缩格式，如bzip2。</li>
<li>在应用中将文件分割成几个大的数据块，然后使用任何一种支持的压缩格式单独压缩每个数据块(可不用考虑压缩格式是否支持分割)。在这里，需要选择数据块的大小使压缩后的数据块在大小上相当于HDFS的块。</li>
<li>使用支持压缩和分割的Sequence File(序列文件)。</li>
</ul>
<p>对于大型文件，不要对整个文件使用不支持分割的压缩格式，因为这样会损失本地性优势，从而使降低MapReduce应用的性能。</p>
<p style="font-weight:bold;"><span style="color:#e53333;">那么我们如何在MapReduce中使用压缩呢？</span></p>
<p>如前所述，如果输入的文件是压缩过的，那么在被MapReduce读取时，它们会被自动解压，根据文件扩展名来决定应该使用哪一个压缩解码器。</p>
<p>如果要压缩MapReduce作业的输出，请在作业配置文件中将mapred.output.compress属性设置为true。将mapred.output.compression.codec属性设置为自己打算使用的压缩编码/解码器的类名，如下例所示，此应用程序运行最高气温作业从而产生压缩的输出结果：</p>
<pre class="brush:java;">public class MaxTemperatureWithCompression {
     public static void main(String[] args) throws IOException {
         if (args.length != 2) {
            System.err.println("Usage: MaxTemperatureWithCompression &lt;input path&gt; " + "&lt;output path&gt;");
            System.exit(-1);
         }
         JobConf conf = new JobConf(MaxTemperatureWithCompression.class);
         conf.setJobName("Max temperature with output compression");
         FileInputFormat.addInputPath(conf, new Path(args[0]));
         FileOutputFormat.setOutputPath(conf, new Path(args[1]));
         conf.setOutputKeyClass(Text.class);
         conf.setOutputValueClass(IntWritable.class);
         conf.setBoolean("mapred.output.compress", true);
         conf.setClass("mapred.output.compression.codec", GzipCodec.class,
         CompressionCodec.class);
         conf.setMapperClass(MaxTemperatureMapper.class);
         conf.setCombinerClass(MaxTemperatureReducer.class);
         conf.setReducerClass(MaxTemperatureReducer.class);
         JobClient.runJob(conf);
     }
}</pre>我们使用压缩过的输入来运行此应用程序(其实不必像它一样使用和输人相同的格式压缩输出)，如下所示:<p></p>
<p>% hadoop MaxTemperatureWithCompression input/ncdc/sample.txt.gz output</p>
<p>最终输出的每部分都是压缩过的。在本例中。只有一部分:</p>
<p>% gunzip -c output/part-00000.gz<br />
1949 111<br />
1950 22</p>
<p>如果为输出使用了一系列文件，可以设置mapred.output.compression.type属性来控制压缩类型，默认为RECORD，它压缩单独的记录。将它改为BLOCK，则可以压缩一组记录。由于它有更好的压缩比，所以推荐使用。</p>
<p style="font-weight:bold;">map作业输出结果的压缩</p>
<p>即使<b>MapReduce</b>应用使用非压缩的数据来读取和写入，我们也可以受益于压缩map阶段的中间输出。因为map作业的输出会被写入磁盘并通过网络传输到reducer节点，所以如果使用LZO之类的快速压缩，能得到更好的性能，因为传输的数据量大大减少了。下表显示了启用rnap输出压缩和设置压缩格式的配置属性。</p>
<p><table style="width:99%;" border="1" bordercolor="#000000" cellpadding="2" cellspacing="0"><tbody><tr><td>&nbsp;<span style="font-weight:bold;">属性名称</span></td>
<td>&nbsp;<span style="font-weight:bold;">类型</span></td>
<td>&nbsp;<span style="font-weight:bold;">默认值</span></td>
<td>&nbsp;<span style="font-weight:bold;">描述</span></td>
</tr>
<tr><td>&nbsp;mapred.compress.map.output</td>
<td>&nbsp;布尔</td>
<td>&nbsp;false</td>
<td>&nbsp;压缩map输出</td>
</tr>
<tr><td>&nbsp;mapred.map.output.compression.codec</td>
<td>&nbsp;类</td>
<td>&nbsp;org.apache.hadoop.io.compress.DefaultCodec</td>
<td>&nbsp;map输出使用的压缩编码/解码器</td>
</tr>
</tbody>
</table>
</p>
<p>下面几行代码用于在map作业中启用gzip格式来<b>压缩</b>输出结果:</p>
<p>conf.setCompressMapOutput(true);<br />
conf.setMapOutputCompressorClass(GzipCodec.class);</p>]]></description>
</item>

<item>
<link><![CDATA[http://www.itivy.com/arch/archive/2011/12/10/hadoop-codec-usage.html]]></link>
<title><![CDATA[Hadoop中的编码器和解码器]]></title>
<author><![CDATA[架构点滴]]></author>
<category><![CDATA[]]></category>
<pubDate>Sat, 10 Dec 2011 01:17:27 GMT</pubDate>
<guid><![CDATA[]]></guid>
<description><![CDATA[<p><b>编码器</b>和<b>解码器</b>用以执行压缩解压算法。在<b>Hadoop</b>里，编码/解码器是通过一个压缩解码器接口实现的。因此，例如，GzipCodec封装了gzip压缩的压缩和解压算法。下表列出了Hadoop可用的编码/解码器。</p>
<p><table style="width:99%;" border="1" bordercolor="#000000" cellpadding="2" cellspacing="0"><tbody><tr><td>&nbsp;<span style="font-weight:bold;">压缩格式</span></td>
<td>&nbsp;<span style="font-weight:bold;">Hadoop压缩编码/解码器</span></td>
</tr>
<tr><td>&nbsp;DEFLATE</td>
<td>&nbsp;org.apache.hadoop.io.compress.DefaultCodec</td>
</tr>
<tr><td>&nbsp;gzip</td>
<td>&nbsp;org.apache.hadoop.io.compress.GzipCodec</td>
</tr>
<tr><td>&nbsp;bzip2</td>
<td>&nbsp;org.apache.hadoop.io.compress.BZip2Codec</td>
</tr>
<tr><td>&nbsp;LZO</td>
<td>&nbsp;com.hadoop.compression.lzo.LzopCodec</td>
</tr>
</tbody>
</table>
</p>
<p>LZO格式是基于GPL许可的，不能通过Apache来分发许可，基于此，它的hadoop}编码/解码器必须单独下载，地址是http://code.google.com/p/hadoop-gpl-compression/。lzop编码/解码器兼容干lzop工具，它其实就是LZO格式，但额外还有头部，它正是我们想要的。还有一个纯LZO格式的编码/解码器LzoCodec，它使用.lzo_deflate作为扩展名(根据DEFLATE类推，是没有头部的gzip格式）。</p>
<p>CompressionCodec对流进行压缩和解压缩</p>
<p>CompressionCodec有两个方法可以用于轻松地压缩或解压缩数据。要想对正在被写入一个输出流的数据进行压缩，我们可以使用createOutputStream(OutputStreamout)方法创建一个CompressionOutputStream（未压缩的数据将被写到此)，将其以压缩格式写入底层的流。相反，要想对从输入流读取而来的数据进行解压缩，则调用createInputStream(InputStreamin)函数，从而获得一个CompressionInputStream,，从而从底层的流读取未压缩的数据。CompressionOutputStream和CompressionInputStream类似干java.util.zip.DeflaterOutputStream和java.util.zip.DeflaterInputStream，前两者还可以提供重置其底层压缩和解压缩功能，当把数据流中的section压缩为单独的块时，这比较重要。比如SequenceFile。</p>
<p>下例中说明了如何使用API来压缩从标谁输入读取的数据及如何将它写到标准输出：</p>
<pre class="brush:java;">public class StreamCompressor {
public static void main(String[] args) throws Exception {
String codecClassname = args[0];
Class&lt;?&gt; codecClass = Class.forName(codecClassname);
Configuration conf = new Configuration();
CompressionCodec codec = (CompressionCodec)
ReflectionUtils.newInstance(codecClass, conf);
CompressionOutputStream out = codec.createOutputStream(System.out);
IOUtils.copyBytes(System.in, out, 4096, false);
out.finish();
}
}</pre>此应用需要压缩CompressionCodec的合法全名来作为命令行的第一个参数。我们使用ReflectionUtils来建立一个新的实例，然后获得一个压缩好的System.out。然后我们调用IOUtils上的公共方法copyBytes()将输入复制到经过CompressionOutputStream压缩的输出。最后，调用CompressionOutputStream.的finish()方法，从而向压缩程序表明结束向压缩流写入数据，但不关闭流。我们可以试试以下命令行，使用StreamCompressor程序与GzipCodec压缩字符串“Text”，然后使用gunzip从标准输入对它进行解压缩操作:<br />
<pre class="brush:bash;">% echo "Text" | hadoop StreamCompressor org.apache.hadoop.io.compress.GzipCodec \
| gunzip -
Text</pre>用CompressionCodecFactory方法来推断CompressionCodecs<p></p>
<p>在阅读一个压缩文件时，我们通常可以从其扩展名来推断出它的编码/解码器。以.gz结尾的文件可以用GzipCodec来阅读，如此类推。每个压缩格式的扩展名均以下表所示：</p>
<p><table style="width:99%;" border="1" bordercolor="#000000" cellpadding="2" cellspacing="0"><tbody><tr><td>&nbsp;<span style="font-weight:bold;">压缩格式</span></td>
<td>&nbsp;<span style="font-weight:bold;">工具</span></td>
<td>&nbsp;<span style="font-weight:bold;">算法</span></td>
<td>&nbsp;<span style="font-weight:bold;">文件扩展名</span></td>
<td>&nbsp;<span style="font-weight:bold;">多文件</span></td>
<td>&nbsp;<span style="font-weight:bold;">可分割性</span></td>
</tr>
<tr><td>&nbsp;DEFLATEa</td>
<td>&nbsp;无</td>
<td>&nbsp;DEFLATE</td>
<td>&nbsp;.deflate</td>
<td>&nbsp;不</td>
<td>&nbsp;不</td>
</tr>
<tr><td>&nbsp;gzip</td>
<td>&nbsp;gzip</td>
<td>&nbsp;DEFLATE</td>
<td>&nbsp;.gz</td>
<td>&nbsp;不</td>
<td>&nbsp;不</td>
</tr>
<tr><td>&nbsp;ZIP</td>
<td>&nbsp;zip</td>
<td>&nbsp;DEFLATE</td>
<td>&nbsp;.zip</td>
<td>&nbsp;是</td>
<td>&nbsp;是，在文件范围内</td>
</tr>
<tr><td>&nbsp;bzip2</td>
<td>&nbsp;bzip2</td>
<td>&nbsp;bzip2</td>
<td>&nbsp;.bz2</td>
<td>&nbsp;不</td>
<td>&nbsp;是</td>
</tr>
<tr><td>&nbsp;LZO</td>
<td>&nbsp;lzop</td>
<td>&nbsp;LZO</td>
<td>&nbsp;.lzo</td>
<td>&nbsp;不</td>
<td>&nbsp;不</td>
</tr>
</tbody>
</table>
</p>
<p>CompressionCodecFactory提供了getCodec()方法，从而将文件扩展名映射到相应的CompressionCodec。此方法接受一个Path对象。下面的例子显示了一个应用程序，此程序便使用这个功能来解压缩文件。</p>
<pre class="brush:java;">public class FileDecompressor {
    public static void main(String[] args) throws Exception {
       String uri = args[0];
       Configuration conf = new Configuration();
       FileSystem fs = FileSystem.get(URI.create(uri), conf);
       Path inputPath = new Path(uri);
       CompressionCodecFactory factory = new CompressionCodecFactory(conf);
       CompressionCodec codec = factory.getCodec(inputPath);
       if (codec == null) {
           System.err.println("No codec found for " + uri);
           System.exit(1);
       }
       String outputUri =
       CompressionCodecFactory.removeSuffix(uri, codec.getDefaultExtension());
       InputStream in = null;
       OutputStream out = null;
       try {
           in = codec.createInputStream(fs.open(inputPath));
           out = fs.create(new Path(outputUri));
           IOUtils.copyBytes(in, out, conf);
       } finally {
           IOUtils.closeStream(in);
           IOUtils.closeStream(out);
       }
    }
}</pre>编码/解码器一旦找到，就会被用来去掉文件名后缀生成输出文件名（通过CompressionCodecFactory的静态方法removeSuffix()来实现）。这样，如下调用程序便把一个名为file.gz的文件解压缩为file文件:<br />
<pre class="brush:bash;">% hadoop FileDecompressor file.gz</pre>CompressionCodecFactory从io.compression.codecs配置属性定义的列表中找到编码/解码器。默认情况下，这个列表列出了Hadoop提供的所有编码/解码器(见表4-3)，如果你有一个希望要注册的编码/解码器(如外部托管的LZO编码/解码器)你可以改变这个列表。每个编码/解码器知道它的默认文件扩展名，从而使CompressionCodecFactory可以通过搜索这个列表来找到一个给定的扩展名相匹配的编码/解码器(如果有的话)。<br />
<table style="width:99%;" border="1" bordercolor="#000000" cellpadding="2" cellspacing="0"><tbody><tr><td>&nbsp;<span style="font-weight:bold;">属性名</span></td>
<td>&nbsp;<span style="font-weight:bold;">类型</span></td>
<td>&nbsp;<span style="font-weight:bold;">默认值</span></td>
<td>&nbsp;<span style="font-weight:bold;">描述</span></td>
</tr>
<tr><td>&nbsp;io.compression.codecs</td>
<td>&nbsp;逗号分隔的类名</td>
<td>&nbsp;org.apache.hadoop.io.<br />
&nbsp;compress.DefaultCodec,<br />
&nbsp;org.apache.hadoop.io.<br />
&nbsp;compress.GzipCodec,<br />
&nbsp;org.apache.hadoop.io.<br />
&nbsp;compress.Bzip2Codec</td>
<td>&nbsp;用于压缩/解压的CompressionCodec列表</td>
</tr>
</tbody>
</table>
<p></p>
<p style="text-align:center;">表4-3</p>
<p>本地库</p>
<p>考虑到性能，最好使用一个本地库（native library）来压缩和解压。例如，在一个测试中，使用本地gzip压缩库减少了解压时间50%，压缩时间大约减少了10%(与内置的Java实现相比较)。表4-4展示了Java和本地提供的每个压缩格式的实现。井不是所有的格式都有本地实现(例如bzip2压缩)，而另一些则仅有本地实现（例如LZO）。</p>
<p><table style="width:99%;" border="1" bordercolor="#000000" cellpadding="2" cellspacing="0"><tbody><tr><td>&nbsp;压缩格式</td>
<td>&nbsp;Java实现</td>
<td>&nbsp;本地实现</td>
</tr>
<tr><td>&nbsp;DEFLATE</td>
<td>&nbsp;是</td>
<td>&nbsp;是</td>
</tr>
<tr><td>&nbsp;gzip</td>
<td>&nbsp;是</td>
<td>&nbsp;是</td>
</tr>
<tr><td>&nbsp;bzip2</td>
<td>&nbsp;是</td>
<td>&nbsp;否</td>
</tr>
<tr><td>&nbsp;LZO</td>
<td>&nbsp;否</td>
<td>&nbsp;是</td>
</tr>
</tbody>
</table>
</p>
<p>Hadoop带有预置的32位和64位Linux的本地压缩库，位于库/本地目录。对于其他平台，需要自己编译库，具体请参见Hadoop的维基百科http://wiki.apache.org/hadoop/NativeHadoop。</p>
<p>本地库通过Java系统属性java.library.path来使用。Hadoop的脚本在bin目录中已经设置好这个属性，但如果不使用该脚本，则需要在应用中设置属性。</p>
<p>默认情况下，Hadoop会在它运行的平台上查找本地库，如果发现就自动加载。这意味着不必更改任何配置设置就可以使用本地库。在某些情况下，可能希望禁用本地库，比如在调试压缩相关问题的时候。为此，将属性hadoop.native.lib设置为false，即可确保内置的Java等同内置实现被使用(如果它们可用的话)。</p>
<p>CodecPool(压缩解码池)</p>
<p>如果要用本地库在应用中大量执行压缩解压任务，可以考虑使用CodecPool，从而重用压缩程序和解压缩程序，节约创建这些对象的开销。</p>
<p>下例所用的API只创建了一个很简单的压缩程序，因此不必使用这个池。此应用程序使用一个压缩池程序来压缩从标准输入读入然后将其写入标准愉出的数据：</p>
<pre class="brush:java;">public class PooledStreamCompressor {
    public static void main(String[] args) throws Exception {
    String codecClassname = args[0];
    Class&lt;?&gt; codecClass = Class.forName(codecClassname);
    Configuration conf = new Configuration();
    CompressionCodec codec = (CompressionCodec)
    ReflectionUtils.newInstance(codecClass, conf);
    Compressor compressor = null;
    try {
        compressor = CodecPool.getCompressor(codec);
        CompressionOutputStream out = codec.createOutputStream(System.out, compressor);
        IOUtils.copyBytes(System.in, out, 4096, false);
        out.finish();
    } finally {
        CodecPool.returnCompressor(compressor);
    }
  }
}</pre>我们从缓冲池中为指定的CompressionCodec检索到一个Compressor实例，codec的重载方法createOutputStream()中使用的便是它。通过使用finally块，我们便可确保此压缩程序会被返回缓冲池，即使在复制数据流之间的字节期间抛出了一个IOException。<p></p>
<p>到此，就介绍完了<b>Hadoop</b>中的<b>编码器</b>和<b>解码器</b>，希望对大家有所帮助，下次我将继续为大家分享<b>Hadoop</b>的相关知识，再见！</p>]]></description>
</item>


</channel>
</rss>

