标签归档:google

Google三大核心技术之一BigTable论文中文版

Google’s BigTable 原理 (翻译)

    题记:google 的成功除了一个个出色的创意外,还因为有 Jeff Dean 这样的软件架构天才。

                                                  —— 编者

官方的 Google Reader blog 中有对BigTable 的解释。这是Google 内部开发的一个用来处理大数据量的系统。这种系统适合处理半结构化的数据比如 RSS 数据源。 以下发言  是 Andrew Hitchcock  在 2005 年10月18号 基于: Google 的工程师 Jeff Dean 在华盛顿大学的一次谈话 (Creative Commons License).

 

 

首先,BigTable 从 2004 年初就开始研发了,到现在为止已经用了将近8个月。(2005年2月)目前大概有100个左右的服务使用BigTable,比如: Print,Search History,Maps和 Orkut。根据Google的一贯做法,内部开发的BigTable是为跑在廉价的PC机上设计的。BigTable 让Google在提供新服务时的运行成本降低,最大限度地利用了计算能力。BigTable 是建立在 GFS ,Scheduler ,Lock Service 和 MapReduce 之上的。

每个Table都是一个多维的稀疏图 sparse map。Table 由行和列组成,并且每个存储单元 cell 都有一个时间戳。在不同的时间对同一个存储单元cell有多份拷贝,这样就可以记录数据的变动情况。在他的例子中,行是URLs ,列可以定义一个名字,比如:contents。Contents 字段就可以存储文件的数据。或者列名是:”language”,可以存储一个“EN”的语言代码字符串。

为了管理巨大的Table,把Table根据行分割,这些分割后的数据统称为:Tablets。每个Tablets大概有 100-200 MB,每个机器存储100个左右的 Tablets。底层的架构是:GFS。由于GFS是一种分布式的文件系统,采用Tablets的机制后,可以获得很好的负载均衡。比如:可以把经常响应的表移动到其他空闲机器上,然后快速重建。

Tablets在系统中的存储方式是不可修改的 immutable 的SSTables,一台机器一个日志文件。当系统的内存满后,系统会压缩一些Tablets。由于Jeff在论述这点的时候说的很快,所以我没有时间把听到的都记录下来,因此下面是一个大概的说明:

压缩分为:主要和次要的两部分。次要的压缩仅仅包括几个Tablets,而主要的压缩时关于整个系统的压缩。主压缩有回收硬盘空间的功能。Tablets的位置实际上是存储在几个特殊的BigTable的存储单元cell中。看起来这是一个三层的系统。

客户端有一个指向METAO的Tablets的指针。如果METAO的Tablets被频繁使用,那个这台机器就会放弃其他的tablets专门支持METAO这个Tablets。METAO tablets 保持着所有的META1的tablets的记录。这些tablets中包含着查找tablets的实际位置。(老实说翻译到这里,我也不太明白。)在这个系统中不存在大的瓶颈,因为被频繁调用的数据已经被提前获得并进行了缓存。

    现在我们返回到对 列的说明:列是类似下面的形式: family:optional_qualifier。在他的例子中,行:www.search-analysis.com  也许有列:”contents:其中包含html页面的代码。 “ anchor:cnn.com/news” 中包含着 相对应的url,”anchor:www.search-analysis.com/” 包含着链接的文字部分。列中包含着类型信息。

    (翻译到这里我要插一句,以前我看过一个关于万能数据库的文章,当时很激动,就联系了作者,现在回想起来,或许google的 bigtable 才是更好的方案,切不说分布式的特性,就是这种建华的表结构就很有用处。)

    注意这里说的是列信息,而不是列类型。列的信息是如下信息,一般是:属性/规则。 比如:保存n份数据的拷贝 或者 保存数据n天长等等。当 tablets 重新建立的时候,就运用上面的规则,剔出不符合条件的记录。由于设计上的原因,列本身的创建是很容易的,但是跟列相关的功能确实非常复杂的,比如上文提到的 类型和规则信息等。为了优化读取速度,列的功能被分割然后以组的方式存储在所建索引的机器上。这些被分割后的组作用于 列 ,然后被分割成不同的 SSTables。这种方式可以提高系统的性能,因为小的,频繁读取的列可以被单独存储,和那些大的不经常访问的列隔离开来。

在一台机器上的所有的 tablets 共享一个log,在一个包含1亿的tablets的集群中,这将会导致非常多的文件被打开和写操作。新的log块经常被创建,一般是64M大小,这个GFS的块大小相等。当一个机器down掉后,控制机器就会重新发布他的log块到其他机器上继续进行处理。这台机器重建tablets然后询问控制机器处理结构的存储位置,然后直接对重建后的数据进行处理。

这个系统中有很多冗余数据,因此在系统中大量使用了压缩技术。

    Dean 对压缩的部分说的很快,我没有完全记下来,所以我还是说个大概吧:压缩前先寻找相似的 行,列,和时间 数据。

    他们使用不同版本的: BMDiff 和 Zippy 技术。

   BMDiff 提供给他们非常快的写速度: 100MB/s  1000MB/s 。Zippy 是和 LZW 类似的。Zippy 并不像 LZW 或者 gzip 那样压缩比高,但是他处理速度非常快。

    Dean 还给了一个关于压缩 web 蜘蛛数据的例子。这个例子的蜘蛛 包含 2.1B 的页面,行按照以下的方式命名:“com.cnn.www/index.html:http”.在未压缩前的web page 页面大小是:45.1 TB ,压缩后的大小是:4.2 TB , 只是原来的 9.2%。Links 数据压缩到原来的 13.9% , 链接文本数据压缩到原来的 12.7%。

Google 还有很多没有添加但是已经考虑的功能。

    1.  数据操作表达式,这样可以把脚本发送到客户端来提供修改数据的功能。
2. 多行数据的事物支持。
3.  提高大数据存储单元的效率。
4. BigTable 作为服务运行。
好像:每个服务比如: maps 和 search history 历史搜索记录都有他们自己的集群运行 BigTable。
他们还考虑运行一个全局的 BigTable 系统,但这需要比较公平的分割资源和计算时间。

原文地址:
http://blog.csdn.net/accesine960/archive/2006/02/09/595628.aspx

http://blog.outer-court.com/archive/2005-10-23-n61.html

Google三大核心技术之一Google File System论文中文版

Google文件系统 (Google File System)

GFS是一个可扩展的分布式文件系统,用于大型的、分布式的、对大量数据进行访问的应用。它运行于廉价的普通硬件上,但可以提供容错功能。它可以给大量的用户提供总体性能较高的服务。
1、设计概览
(1)设计想定
GFS与过去的分布式文件系统有很多相同的目标,但GFS的设计受到了当前及预期的应用方面的工作量及技术环境的驱动,这反映了它与早期的文件系统明显不同的设想。这就需要对传统的选择进行重新检验并进行完全不同的设计观点的探索。
GFS与以往的文件系统的不同的观点如下:
1、部件错误不再被当作异常,而是将其作为常见的情况加以处理。因为文件系统由成百上千个用于存储的机器构成,而这些机器是由廉价的普通部件组成并被大量的客户机访问。部件的数量和质量使得一些机器随时都有可能无法工作并且有一部分还可能无法恢复。所以实时地监控、错误检测、容错、自动恢复对系统来说必不可少。
2、按照传统的标准,文件都非常大。长度达几个GB的文件是很平常的。每个文件通常包含很多应用对象。当经常要处理快速增长的、包含数以万计的对象、长度达TB的数据集时,我们很难管理成千上万的KB规模的文件块,即使底层文件系统提供支持。因此,设计中操作的参数、块的大小必须要重新考虑。对大型的文件的管理一定要能做到高效,对小型的文件也必须支持,但不必优化。
3、大部分文件的更新是通过添加新数据完成的,而不是改变已存在的数据。在一个文件中随机的操作在实践中几乎不存在。一旦写完,文件就只可读,很多数据都有这些特性。一些数据可能组成一个大仓库以供数据分析程序扫描。有些是运行中的程序连续产生的数据流。有些是档案性质的数据,有些是在某个机器上产生、在另外一个机器上处理的中间数据。由于这些对大型文件的访问方式,添加操作成为性能优化和原子性保证的焦点。而在客户机中缓存数据块则失去了吸引力。
4、工作量主要由两种读操作构成:对大量数据的流方式的读操作和对少量数据的随机方式的读操作。在前一种读操作中,可能要读几百KB,通常达 1MB和更多。来自同一个客户的连续操作通常会读文件的一个连续的区域。随机的读操作通常在一个随机的偏移处读几个KB。性能敏感的应用程序通常将对少量数据的读操作进行分类并进行批处理以使得读操作稳定地向前推进,而不要让它来来回回的读。
5、工作量还包含许多对大量数据进行的、连续的、向文件添加数据的写操作。所写的数据的规模和读相似。一旦写完,文件很少改动。在随机位置对少量数据的写操作也支持,但不必非常高效。
6、系统必须高效地实现定义完好的大量客户同时向同一个文件的添加操作的语义。
(2)系统接口
GFS提供了一个相似地文件系统界面,虽然它没有向POSIX那样实现标准的API。文件在目录中按层次组织起来并由路径名标识。
(3)体系结构:
一个GFS集群由一个master和大量的chunkserver构成,并被许多客户(Client)访问。如图1所示。Master和 chunkserver通常是运行用户层服务进程的Linux机器。只要资源和可靠性允许,chunkserver和client可以运行在同一个机器上。
文件被分成固定大小的块。每个块由一个不变的、全局唯一的64位的chunk-handle标识,chunk-handle是在块创建时由 master分配的。ChunkServer将块当作Linux文件存储在本地磁盘并可以读和写由chunk-handle和位区间指定的数据。出于可靠性考虑,每一个块被复制到多个chunkserver上。默认情况下,保存3个副本,但这可以由用户指定。
Master维护文件系统所以的元数据(metadata),包括名字空间、访问控制信息、从文件到块的映射以及块的当前位置。它也控制系统范围的活动,如块租约(lease)管理,孤儿块的垃圾收集,chunkserver间的块迁移。Master定期通过HeartBeat消息与每一个 chunkserver通信,给chunkserver传递指令并收集它的状态。
与每个应用相联的GFS客户代码实现了文件系统的API并与master和chunkserver通信以代表应用程序读和写数据。客户与master的交换只限于对元数据(metadata)的操作,所有数据方面的通信都直接和chunkserver联系。
客户和chunkserver都不缓存文件数据。因为用户缓存的益处微乎其微,这是由于数据太多或工作集太大而无法缓存。不缓存数据简化了客户程序和整个系统,因为不必考虑缓存的一致性问题。但用户缓存元数据(metadata)。Chunkserver也不必缓存文件,因为块时作为本地文件存储的。
(4)单master。
只有一个master也极大的简化了设计并使得master可以根据全局情况作出先进的块放置和复制决定。但是我们必须要将master对读和写的参与减至最少,这样它才不会成为系统的瓶颈。Client从来不会从master读和写文件数据。Client只是询问master它应该和哪个 chunkserver联系。Client在一段限定的时间内将这些信息缓存,在后续的操作中Client直接和chunkserver交互。
以图1解释一下一个简单的读操作的交互。
1、client使用固定的块大小将应用程序指定的文件名和字节偏移转换成文件的一个块索引(chunk index)。
2、给master发送一个包含文件名和块索引的请求。
3、master回应对应的chunk handle和副本的位置(多个副本)。
4、client以文件名和块索引为键缓存这些信息。(handle和副本的位置)。
5、Client 向其中一个副本发送一个请求,很可能是最近的一个副本。请求指定了chunk handle(chunkserver以chunk handle标识chunk)和块内的一个字节区间。
6、除非缓存的信息不再有效(cache for a limited time)或文件被重新打开,否则以后对同一个块的读操作不再需要client和master间的交互。
通常Client可以在一个请求中询问多个chunk的地址,而master也可以很快回应这些请求。
(5)块规模:
块规模是设计中的一个关键参数。我们选择的是64MB,这比一般的文件系统的块规模要大的多。每个块的副本作为一个普通的Linux文件存储,在需要的时候可以扩展。
块规模较大的好处有:
1、减少client和master之间的交互。因为读写同一个块只是要在开始时向master请求块位置信息。对于读写大型文件这种减少尤为重要。即使对于访问少量数据的随机读操作也可以很方便的为一个规模达几个TB的工作集缓缓存块位置信息。
2、Client在一个给定的块上很可能执行多个操作,和一个chunkserver保持较长时间的TCP连接可以减少网络负载。
3、这减少了master上保存的元数据(metadata)的规模,从而使得可以将metadata放在内存中。这又会带来一些别的好处。
不利的一面:
一个小文件可能只包含一个块,如果很多Client访问改文件的话,存储这些块的chunkserver将成为访问的热点。但在实际应用中,应用程序通常顺序地读包含多个块的文件,所以这不是一个主要问题。
(6)元数据(metadata):
master存储了三中类型的metadata:文件的名字空间和块的名字空间,从文件到块的映射,块的副本的位置。所有的metadata都放在内存中。前两种类型的metadata通过向操作日志登记修改而保持不变,操作日志存储在master的本地磁盘并在几个远程机器上留有副本。使用日志使得我们可以很简单地、可靠地更新master的状态,即使在master崩溃的情况下也不会有不一致的问题。相反,mater在每次启动以及当有 chuankserver加入的时候询问每个chunkserver的所拥有的块的情况。
A、内存数据结构:
因为metadata存储在内存中,所以master的操作很快。进一步,master可以轻易而且高效地定期在后台扫描它的整个状态。这种定期地扫描被用于实现块垃圾收集、chunkserver出现故障时的副本复制、为平衡负载和磁盘空间而进行的块迁移。
这种方法的一个潜在的问题就是块的数量也即整个系统的容量是否受限与master的内存。实际上,这并不是一个严重的问题。Master为每个 64MB的块维护的metadata不足64个字节。除了最后一块,文件所有的块都是满的。类似的,每个文件的名字空间数据也不足64个字节,因为文件名是以一种事先确定的压缩方式存储的.如果要支持更大的文件系统,那么增加一些内存的方法对于我们将元数据(metadata)保存在内存种所获得的简单性、可靠性、高性能和灵活性来说,这只是一个很小的代价。
B、块位置:
master并不为chunkserver所拥有的块的副本的保存一个不变的记录。它在启动时通过简单的查询来获得这些信息。Master可以保持这些信息的更新,因为它控制所有块的放置并通过HeartBeat消息来监控chunkserver的状态。
这样做的好处:因为chunkserver可能加入或离开集群、改变路径名、崩溃、重启等,一个集群重有成百个server,这些事件经常发生,这种方法就排除了master与chunkserver之间的同步问题。
另一个原因是:只有chunkserver才能确定它自己到底有哪些块,由于错误,chunkserver中的一些块可能会很自然的消失,这样在master中就没有必要为此保存一个不变的记录。
C、操作日志:
操作日志包含了对metadata所作的修改的历史记录。它作为逻辑时间线定义了并发操作的执行顺序。文件、块以及它们的版本号都由它们被创建时的逻辑时间而唯一地、永久地被标识。
操作日志是如此的重要,我们必须要将它可靠地保存起来,并且只有在metadata的改变固定下来之后才将变化呈现给用户。所以我们将操作日志复制到数个远程的机器上,并且只有在将相应的日志记录写到本地和远程的磁盘上之后才回答用户的请求。
Master可以用操作日志来恢复它的文件系统的状态。为了将启动时间减至最小,日志就必须要比较小。每当日志的长度增长到超过一定的规模后,master就要检查它的状态,它可以从本地磁盘装入最近的检查点来恢复状态。
创建一个检查点比较费时,master的内部状态是以一种在创建一个检查点时并不耽误即将到来的修改操作的方式来组织的。Master切换到一个新的日子文件并在一个单独的线程中创建检查点。这个新的检查点记录了切换前所有的修改。在一个有数十万文件的集群中用一分钟左右就能完成。创建完后,将它写入本地和远程的磁盘。
(7)数据完整性
名字空间的修改必须是原子性的,它们只能有master处理:名字空间锁保证了操作的原子性和正确性,而master的操作日志在全局范围内定义了这些操作的顺序。
文件区间的状态在修改之后依赖于修改的类型,不论操作成功还是失败,也不论是不是并发操作。如果不论从哪个副本上读,所有的客户都看到同样的数据,那么文件的这个区域就是一致的。如果文件的区域是一致的并且用户可以看到修改操作所写的数据,那么它就是已定义的。如果修改是在没有并发写操作的影响下完成的,那么受影响的区域是已定义的,所有的client都能看到写的内容。成功的并发写操作是未定义但却是一致的。失败的修改将使区间处于不一致的状态。
Write操作在应用程序指定的偏移处写入数据,而record append操作使得数据(记录)即使在有并发修改操作的情况下也至少原子性的被加到GFS指定的偏移处,偏移地址被返回给用户。
在一系列成功的修改操作后,最后的修改操作保证文件区域是已定义的。GFS通过对所有的副本执行同样顺序的修改操作并且使用块版本号检测过时的副本(由于chunkserver退出而导致丢失修改)来做到这一点。
因为用户缓存了会位置信息,所以在更新缓存之前有可能从一个过时的副本中读取数据。但这有缓存的截止时间和文件的重新打开而受到限制。
在修改操作成功后,部件故障仍可以是数据受到破坏。GFS通过master和chunkserver间定期的handshake,借助校验和来检测对数据的破坏。一旦检测到,就从一个有效的副本尽快重新存储。只有在GFS检测前,所有的副本都失效,这个块才会丢失。
2、系统交互
(1)租约(lease)和修改顺序:
(2)数据流
我们的目标是充分利用每个机器的网络带宽,避免网络瓶颈和延迟
为了有效的利用网络,我们将数据流和控制流分离。数据是以流水线的方式在选定的chunkerserver链上线性的传递的。每个机器的整个对外带宽都被用作传递数据。为避免瓶颈,每个机器在收到数据后,将它收到数据尽快传递给离它最近的机器。
(3)原子性的record Append:
GFS提供了一个原子性的添加操作:record append。在传统的写操作中,client指定被写数据的偏移位置,向同一个区间的并发的写操作是不连续的:区间有可能包含来自多个client的数据碎片。在record append中, client只是指定数据。GFS在其选定的偏移出将数据至少原子性的加入文件一次,并将偏移返回给client。
在分布式的应用中,不同机器上的许多client可能会同时向一个文件执行添加操作,添加操作被频繁使用。如果用传统的write操作,可能需要额外的、复杂的、开销较大的同步,例如通过分布式锁管理。在我们的工作量中,这些文件通常以多个生产者单个消费者队列的方式或包含从多个不同 client的综合结果。
Record append和前面讲的write操作的控制流差不多,只是在primary上多了一些逻辑判断。首先,client将数据发送到文件最后一块的所有副本上。然后向primary发送请求。Primary检查添加操作是否会导致该块超过最大的规模(64M)。如果这样,它将该块扩充到最大规模,并告诉其它副本做同样的事,同时通知client该操作需要在下一个块上重新尝试。如果记录满足最大规模的要求,primary就会将数据添加到它的副本上,并告诉其它的副本在在同样的偏移处写数据,最后primary向client报告写操作成功。如果在任何一个副本上record append操作失败,client将重新尝试该操作。这时候,同一个块的副本可能包含不同的数据,因为有的可能复制了全部的数据,有的可能只复制了部分。GFS不能保证所有的副本每个字节都是一样的。它只保证每个数据作为一个原子单元被写过至少一次。这个是这样得出的:操作要是成功,数据必须在所有的副本上的同样的偏移处被写过。进一步,从这以后,所有的副本至少和记录一样长,所以后续的记录将被指定到更高的偏移处或者一个不同的块上,即使另一个副本成了primary。根据一致性保证,成功的record append操作的区间是已定义的。而受到干扰的区间是不一致的。
(4)快照(snapshot)
快照操作几乎在瞬间构造一个文件和目录树的副本,同时将正在进行的其他修改操作对它的影响减至最小。
我们使用copy-on-write技术来实现snapshot。当master受到一个snapshot请求时,它首先将要snapshot的文件上块上的lease。这使得任何一个向这些块写数据的操作都必须和master交互以找到拥有lease的副本。这就给master一个创建这个块的副本的机会。
副本被撤销或终止后,master在磁盘上登记执行的操作,然后复制源文件或目录树的metadata以对它的内存状态实施登记的操作。这个新创建的snapshot文件和源文件(其metadata)指向相同的块(chunk)。
Snapshot之后,客户第一次向chunk c写的时候,它发一个请求给master以找到拥有lease的副本。Master注意到chunk c的引用记数比1大,它延迟对用户的响应,选择一个chunk handle C’,然后要求每一有chunk c的副本的chunkserver创建一个块C’。每个chunkserver在本地创建chunk C’避免了网络开销。从这以后和对别的块的操作没有什么区别。
3、MASTER操作
MASTER执行所有名字空间的操作,除此之外,他还在系统范围管理数据块的复制:决定数据块的放置方案,产生新数据块并将其备份,和其他系统范围的操作协同来确保数据备份的完整性,在所有的数据块服务器之间平衡负载并收回没有使用的存储空间。
3.1 名字空间管理和加锁
与传统文件系统不同的是,GFS没有与每个目录相关的能列出其所有文件的数据结构,它也不支持别名(unix中的硬连接或符号连接),不管是对文件或是目录。GFS的名字空间逻辑上是从文件元数据到路径名映射的一个查用表。
MASTER在执行某个操作前都要获得一系列锁,例如,它要对/d1/d2…/dn/leaf执行操作,则它必须获得/d1,/d1/d2,…, /d1/d2/…/dn的读锁,/d1/d2…/dn/leaf的读锁或写锁(其中leaf可以使文件也可以是目录)。MASTER操作的并行性和数据的一致性就是通过这些锁来实现的。
3.2 备份存储放置策略
一个GFS集群文件系统可能是多层分布的。一般情况下是成千上万个文件块服务器分布于不同的机架上,而这些文件块服务器又被分布于不同机架上的客户来访问。因此,不同机架上的两台机器之间的通信可能通过一个或多个交换机。数据块冗余配置策略要达到连个目的:最大的数据可靠性和可用性,最大的网络带宽利用率。因此,如果仅仅把数据的拷贝置于不同的机器上很难满足这两个要求,必须在不同的机架上进行数据备份。这样即使整个机架被毁或是掉线,也能确保数据的正常使用。这也使数据传输,尤其是读数据,可以充分利用带宽,访问到多个机架,而写操作,则不得不涉及到更多的机架。
3.3 产生、重复制、重平衡数据块
当MASTER产生新的数据块时,如何放置新数据块,要考虑如下几个因素:(1)尽量放置在磁盘利用率低的数据块服务器上,这样,慢慢地各服务器的磁盘利用率就会达到平衡。(2)尽量控制在一个服务器上的“新创建”的次数。(3)由于上一小节讨论的原因,我们需要把数据块放置于不同的机架上。
MASTER在可用的数据块备份低于用户设定的数目时需要进行重复制。这种情况源于多种原因:服务器不可用,数据被破坏,磁盘被破坏,或者备份数目被修改。每个被需要重复制的数据块的优先级根据以下几项确定:第一是现在的数目距目标的距离,对于能阻塞用户程序的数据块,我们也提高它的优先级。最后, MASTER按照产生数据块的原则复制数据块,并把它们放到不同的机架内的服务器上。
MASTER周期性的平衡各服务器上的负载:它检查chunk分布和负载平衡,通过这种方式来填充一个新的服务器而不是把其他的内容统统放置到它上面带来大量的写数据。数据块放置的原则与上面讨论的相同,此外,MASTER还决定那些数据块要被移除,原则上他会清除那些空闲空间低于平均值的那些服务器。
3.4 垃圾收集
在一个文件被删除之后,GFS并不立即收回磁盘空间,而是等到垃圾收集程序在文件和数据块级的的检查中收回。
当一个文件被应用程序删除之后,MASTER会立即记录下这些变化,但文件所占用的资源却不会被立即收回,而是重新给文件命了一个隐藏的名字,并附上了删除的时间戳。在MASTER定期检查名字空间时,它删除超过三天(可以设定)的隐藏的文件。在此之前,可以以一个新的名字来读文件,还可以以前的名字恢复。当隐藏的文件在名字空间中被删除以后,它在内存中的元数据即被擦除,这就有效地切断了他和所有数据块的联系。
在一个相似的定期的名字空间检查中,MASTER确认孤儿数据块(不属于任何文件)并擦除他的元数据,在和MASTER的心跳信息交换中,每个服务器报告他所拥有的数据块,MASTER返回元数据不在内存的数据块,服务器即可以删除这些数据块。
3.5 过时数据的探测
在数据更新时如果服务器停机了,那么他所保存的数据备份就会过时。对每个数据块,MASTER设置了一个版本号来区别更新过的数据块和过时的数据块。
当MASTER授权一个新的lease时,他会增加数据块的版本号并会通知更新数据备份。MASTER和备份都会记录下当前的版本号,如果一个备份当时不可用,那么他的版本号不可能提高,当ChunkServer重新启动并向MASTER报告他的数据块集时,MASTER就会发现过时的数据。
MASTER在定期的垃圾收集程序中清除过时的备份,在此以前,处于效率考虑,在各客户及英大使,他会认为根本不存在过时的数据。作为另一个安全措施, MASTER在给客户及关于数据块的应答或是另外一个读取数据的服务器数据是都会带上版本信息,在操作前客户机和服务器会验证版本信息以确保得到的是最新的数据。
4、容错和诊断
4.1 高可靠性
4.1.1 快速恢复
不管如何终止服务,MASTER和数据块服务器都会在几秒钟内恢复状态和运行。实际上,我们不对正常终止和不正常终止进行区分,服务器进程都会被切断而终止。客户机和其他的服务器会经历一个小小的中断,然后它们的特定请求超时,重新连接重启的服务器,重新请求。
4.1.2 数据块备份
如上文所讨论的,每个数据块都会被备份到放到不同机架上的不同服务器上。对不同的名字空间,用户可以设置不同的备份级别。在数据块服务器掉线或是数据被破坏时,MASTER会按照需要来复制数据块。
4.1.3 MASTER备份
为确保可靠性,MASTER的状态、操作记录和检查点都在多台机器上进行了备份。一个操作只有在数据块服务器硬盘上刷新并被记录在MASTER和其备份的上之后才算是成功的。如果MASTER或是硬盘失败,系统监视器会发现并通过改变域名启动它的一个备份机,而客户机则仅仅是使用规范的名称来访问,并不会发现MASTER的改变。
4.2 数据完整性
每个数据块服务器都利用校验和来检验存储数据的完整性。原因:每个服务器随时都有发生崩溃的可能性,并且在两个服务器间比较数据块也是不现实的,同时,在两台服务器间拷贝数据并不能保证数据的一致性。
每个Chunk按64kB的大小分成块,每个块有32位的校验和,校验和和日志存储在一起,和用户数据分开。
在读数据时,服务器首先检查与被读内容相关部分的校验和,因此,服务器不会传播错误的数据。如果所检查的内容和校验和不符,服务器就会给数据请求者返回一个错误的信息,并把这个情况报告给MASTER。客户机就会读其他的服务器来获取数据,而MASTER则会从其他的拷贝来复制数据,等到一个新的拷贝完成时,MASTER就会通知报告错误的服务器删除出错的数据块。
附加写数据时的校验和计算优化了,因为这是主要的写操作。我们只是更新增加部分的校验和,即使末尾部分的校验和数据已被损坏而我们没有检查出来,新的校验和与数据会不相符,这种冲突在下次使用时将会被检查出来。
相反,如果是覆盖现有数据的写,在写以前,我们必须检查第一和最后一个数据块,然后才能执行写操作,最后计算和记录校验和。如果我们在覆盖以前不先检查首位数据块,计算出的校验和则会因为没被覆盖的数据而产生错误。
在空闲时间,服务器会检查不活跃的数据块的校验和,这样可以检查出不经常读的数据的错误。一旦错误被检查出来,服务器会拷贝一个正确的数据块来代替错误的。
4.3 诊断工具
广泛而细致的诊断日志以微小的代价换取了在问题隔离、诊断、性能分析方面起到了重大的作用。GFS服务器用日志来记录显著的事件(例如服务器停机和启动)和远程的应答。远程日志记录机器之间的请求和应答,通过收集不同机器上的日志记录,并对它们进行分析恢复,我们可以完整地重现活动的场景,并用此来进行错误分析。
6 测量
6.1 测试环境
一台主控机,两台主控机备份,16台数据块服务器,16台客户机。
每台机器:2块PIII1.4G处理器,2G内存,2块80G5400rpm的硬盘,1块100Mbps全双工网卡
19台服务器连接到一个HP2524交换机上,16台客户机俩接到领外一台交换机上,两台交换机通过1G的链路相连。

Google三大核心技术之一mapreduce论文中文版

MapReduce:超大机群上的简单数据处理
 
摘要
MapReduce是一个编程模型,和处理,产生大数据集的相关实现.用户指定一个map函数处理一个key/value对,从而产生中间的key/value对集.然后再指定一个reduce函数合并所有的具有相同中间key的中间value.下面将列举许多可以用这个模型来表示的现实世界的工作.
以这种方式写的程序能自动的在大规模的普通机器上实现并行化.这个运行时系统关心这些细节:分割输入数据,在机群上的调度,机器的错误处理,管理机器之间必要的通信.这样就可以让那些没有并行分布式处理系统经验的程序员利用大量分布式系统的资源.
我们的MapReduce实现运行在规模可以灵活调整的由普通机器组成的机群上,一个典型的MapReduce计算处理几千台机器上的以TB计算的数据.程序员发现这个系统非常好用:已经实现了数以百计的MapReduce程序,每天在Google的机群上都有1000多个MapReduce程序在执行.
1.介绍
在过去的5年里,作者和Google的许多人已经实现了数以百计的为专门目的而写的计算来处理大量的原始数据,比如,爬行的文档,Web请求日志,等等.为了计算各种类型的派生数据,比如,倒排索引,Web文档的图结构的各种表示,每个主机上爬行的页面数量的概要,每天被请求数量最多的集合,等等.很多这样的计算在概念上很容易理解.然而,输入的数据量很大,并且只有计算被分布在成百上千的机器上才能在可以接受的时间内完成.怎样并行计算,分发数据,处理错误,所有这些问题综合在一起,使得原本很简介的计算,因为要大量的复杂代码来处理这些问题,而变得让人难以处理.
作为对这个复杂性的回应,我们设计一个新的抽象模型,它让我们表示我们将要执行的简单计算,而隐藏并行化,容错,数据分布,负载均衡的那些杂乱的细节,在一个库里.我们的抽象模型的灵感来自Lisp和许多其他函数语言的map和reduce的原始表示.我们认识到我们的许多计算都包含这样的操作:在我们输入数据的逻辑记录上应用map操作,来计算出一个中间key/value对集,在所有具有相同key的value上应用reduce操作,来适当的合并派生的数据.功能模型的使用,再结合用户指定的map和reduce操作,让我们可以非常容易的实现大规模并行化计算,和使用再次执行作为初级机制来实现容错.
这个工作的主要贡献是通过简单有力的接口来实现自动的并行化和大规模分布式计算,结合这个接口的实现来在大量普通的PC机上实现高性能计算.
第二部分描述基本的编程模型,并且给一些例子.第三部分描述符合我们的基于集群的计算环境的MapReduce的接口的实现.第四部分描述我们觉得编程模型中一些有用的技巧.第五部分对于各种不同的任务,测量我们实现的性能.第六部分探究在Google内部使用MapReduce作为基础来重写我们的索引系统产品.第七部分讨论相关的,和未来的工作.
2.编程模型
计算利用一个输入key/value对集,来产生一个输出key/value对集.MapReduce库的用户用两个函数表达这个计算:map和reduce.
用户自定义的map函数,接受一个输入对,然后产生一个中间key/value对集.MapReduce库把所有具有相同中间key I的中间value聚合在一起,然后把它们传递给reduce函数.
用户自定义的reduce函数,接受一个中间key I和相关的一个value集.它合并这些value,形成一个比较小的value集.一般的,每次reduce调用只产生0或1个输出value.通过一个迭代器把中间value提供给用户自定义的reduce函数.这样可以使我们根据内存来控制value列表的大小.
2.1 实例
考虑这个问题:计算在一个大的文档集合中每个词出现的次数.用户将写和下面类似的伪代码:
map(String key,String value):
 //key:文档的名字
 //value:文档的内容
 for each word w in value:
    EmitIntermediate(w,”1″);
reduce(String key,Iterator values):
//key:一个词
//values:一个计数列表
 int result=0;
 for each v in values:
   result+=ParseInt(v);
 Emit(AsString(resut));
map函数产生每个词和这个词的出现次数(在这个简单的例子里就是1).reduce函数把产生的每一个特定的词的计数加在一起.
另外,用户用输入输出文件的名字和可选的调节参数来填充一个mapreduce规范对象.用户然后调用MapReduce函数,并把规范对象传递给它.用户的代码和MapReduce库链接在一起(用C++实现).附录A包含这个实例的全部文本.
2.2类型
即使前面的伪代码写成了字符串输入和输出的term格式,但是概念上用户写的map和reduce函数有关联的类型:
 map(k1,v1) ->list(k2,v2)
 reduce(k2,list(v2)) ->list(v2)
例如,输入的key,value和输出的key,value的域不同.此外,中间key,value和输出key,values的域相同.
我们的C++实现传递字符串来和用户自定义的函数交互,并把它留给用户的代码,来在字符串和适当的类型间进行转换.
2.3更多实例
这里有一些让人感兴趣的简单程序,可以容易的用MapReduce计算来表示.
分布式的Grep(UNIX工具程序, 可做文件内的字符串查找):如果输入行匹配给定的样式,map函数就输出这一行.reduce函数就是把中间数据复制到输出.
计算URL访问频率:map函数处理web页面请求的记录,输出(URL,1).reduce函数把相同URL的value都加起来,产生一个(URL,记录总数)的对.
倒转网络链接图:map函数为每个链接输出(目标,源)对,一个URL叫做目标,包含这个URL的页面叫做源.reduce函数根据给定的相关目标URLs连接所有的源URLs形成一个列表,产生(目标,源列表)对.
每个主机的术语向量:一个术语向量用一个(词,频率)列表来概述出现在一个文档或一个文档集中的最重要的一些词.map函数为每一个输入文档产生一个(主机名,术语向量)对(主机名来自文档的URL).reduce函数接收给定主机的所有文档的术语向量.它把这些术语向量加在一起,丢弃低频的术语,然后产生一个最终的(主机名,术语向量)对.
倒排索引:map函数分析每个文档,然后产生一个(词,文档号)对的序列.reduce函数接受一个给定词的所有对,排序相应的文档IDs,并且产生一个(词,文档ID列表)对.所有的输出对集形成一个简单的倒排索引.它可以简单的增加跟踪词位置的计算.
分布式排序:map函数从每个记录提取key,并且产生一个(key,record)对.reduce函数不改变任何的对.这个计算依赖分割工具(在4.1描述)和排序属性(在4.2描述).
3实现
MapReduce接口可能有许多不同的实现.根据环境进行正确的选择.例如,一个实现对一个共享内存较小的机器是合适的,另外的适合一个大NUMA的多处理器的机器,而有的适合一个更大的网络机器的集合.
这部分描述一个在Google广泛使用的计算环境的实现:用交换机连接的普通PC机的大机群.我们的环境是:
1.Linux操作系统,双处理器,2-4GB内存的机器.
2.普通的网络硬件,每个机器的带宽或者是百兆或者千兆,但是平均小于全部带宽的一半.
3.因为一个机群包含成百上千的机器,所有机器会经常出现问题.
4.存储用直接连到每个机器上的廉价IDE硬盘.一个从内部文件系统发展起来的分布式文件系统被用来管理存储在这些磁盘上的数据.文件系统用复制的方式在不可靠的硬件上来保证可靠性和有效性.
5.用户提交工作给调度系统.每个工作包含一个任务集,每个工作被调度者映射到机群中一个可用的机器集上.
3.1执行预览
通过自动分割输入数据成一个有M个split的集,map调用被分布到多台机器上.输入的split能够在不同的机器上被并行处理.通过用分割函数分割中间key,来形成R个片(例如,hash(key) mod R),reduce调用被分布到多台机器上.分割数量(R)和分割函数由用户来指定.
图1显示了我们实现的MapReduce操作的全部流程.当用户的程序调用MapReduce的函数的时候,将发生下面的一系列动作(下面的数字和图1中的数字标签相对应):
    1.在用户程序里的MapReduce库首先分割输入文件成M个片,每个片的大小一般从 16到64MB(用户可以通过可选的参数来控制).然后在机群中开始大量的拷贝程序.
      2.这些程序拷贝中的一个是master,其他的都是由master分配任务的worker.有M 个map任务和R个reduce任务将被分配.管理者分配一个map任务或reduce任务给一个空闲的worker.
3.一个被分配了map任务的worker读取相关输入split的内容.它从输入数据中分析出key/value对,然后把key/value对传递给用户自定义的map函数.由map函数产生的中间key/value对被缓存在内存中.
4.缓存在内存中的key/value对被周期性的写入到本地磁盘上,通过分割函数把它们写入R个区域.在本地磁盘上的缓存对的位置被传送给master,master负责把这些位置传送给reduce worker.
5.当一个reduce worker得到master的位置通知的时候,它使用远程过程调用来从map worker的磁盘上读取缓存的数据.当reduce worker读取了所有的中间数据后,它通过排序使具有相同key的内容聚合在一起.因为许多不同的key映射到相同的reduce任务,所以排序是必须的.如果中间数据比内存还大,那么还需要一个外部排序.
      6.reduce worker迭代排过序的中间数据,对于遇到的每一个唯一的中间key,它把key和相关的中间value集传递给用户自定义的reduce函数.reduce函数的输出被添加到这个reduce分割的最终的输出文件中.
7.当所有的map和reduce任务都完成了,管理者唤醒用户程序.在这个时候,在用户程序里的MapReduce调用返回到用户代码.
在成功完成之后,mapreduce执行的输出存放在R个输出文件中(每一个reduce任务产生一个由用户指定名字的文件).一般,用户不需要合并这R个输出文件成一个文件–他们经常把这些文件当作一个输入传递给其他的MapReduce调用,或者在可以处理多个分割文件的分布式应用中使用他们.
3.2master数据结构
master保持一些数据结构.它为每一个map和reduce任务存储它们的状态(空闲,工作中,完成),和worker机器(非空闲任务的机器)的标识.
master就像一个管道,通过它,中间文件区域的位置从map任务传递到reduce任务.因此,对于每个完成的map任务,master存储由map任务产生的R个中间文件区域的大小和位置.当map任务完成的时候,位置和大小的更新信息被接受.这些信息被逐步增加的传递给那些正在工作的reduce任务.
3.3容错
因为MapReduce库被设计用来使用成百上千的机器来帮助处理非常大规模的数据,所以这个库必须要能很好的处理机器故障.
worker故障
master周期性的ping每个worker.如果master在一个确定的时间段内没有收到worker返回的信息,那么它将把这个worker标记成失效.因为每一个由这个失效的worker完成的map任务被重新设置成它初始的空闲状态,所以它可以被安排给其他的worker.同样的,每一个在失败的worker上正在运行的map或reduce任务,也被重新设置成空闲状态,并且将被重新调度.
在一个失败机器上已经完成的map任务将被再次执行,因为它的输出存储在它的磁盘上,所以不可访问.已经完成的reduce任务将不会再次执行,因为它的输出存储在全局文件系统中.
当一个map任务首先被worker A执行之后,又被B执行了(因为A失效了),重新执行这个情况被通知给所有执行reduce任务的worker.任何还没有从A读数据的reduce任务将从worker B读取数据.
MapReduce可以处理大规模worker失败的情况.例如,在一个MapReduce操作期间,在正在运行的机群上进行网络维护引起80台机器在几分钟内不可访问了,MapReduce master只是简单的再次执行已经被不可访问的worker完成的工作,继续执行,最终完成这个MapReduce操作.
master失败
可以很容易的让管理者周期的写入上面描述的数据结构的checkpoints.如果这个master任务失效了,可以从上次最后一个checkpoint开始启动另一个master进程.然而,因为只有一个master,所以它的失败是比较麻烦的,因此我们现在的实现是,如果master失败,就中止MapReduce计算.客户可以检查这个状态,并且可以根据需要重新执行MapReduce操作.
在错误面前的处理机制
当用户提供的map和reduce操作对它的输出值是确定的函数时,我们的分布式实现产生,和全部程序没有错误的顺序执行一样,相同的输出.
我们依赖对map和reduce任务的输出进行原子提交来完成这个性质.每个工作中的任务把它的输出写到私有临时文件中.一个reduce任务产生一个这样的文件,而一个map任务产生R个这样的文件(一个reduce任务对应一个文件).当一个map任务完成的时候,worker发送一个消息给master,在这个消息中包含这R个临时文件的名字.如果master从一个已经完成的map任务再次收到一个完成的消息,它将忽略这个消息.否则,它在master的数据结构里记录这R个文件的名字.
当一个reduce任务完成的时候,这个reduce worker原子的把临时文件重命名成最终的输出文件.如果相同的reduce任务在多个机器上执行,多个重命名调用将被执行,并产生相同的输出文件.我们依赖由底层文件系统提供的原子重命名操作来保证,最终的文件系统状态仅仅包含一个reduce任务产生的数据.
我们的map和reduce操作大部分都是确定的,并且我们的处理机制等价于一个顺序的执行的这个事实,使得程序员可以很容易的理解程序的行为.当map或/和reduce操作是不确定的时候,我们提供虽然比较弱但是合理的处理机制.当在一个非确定操作的前面,一个reduce任务R1的输出等价于一个非确定顺序程序执行产生的输出.然而,一个不同的reduce任务R2的输出也许符合一个不同的非确定顺序程序执行产生的输出.
考虑map任务M和reduce任务R1,R2的情况.我们设定e(Ri)为已经提交的Ri的执行(有且仅有一个这样的执行).这个比较弱的语义出现,因为e(R1)也许已经读取了由M的执行产生的输出,而e(R2)也许已经读取了由M的不同执行产生的输出.
3.4存储位置
在我们的计算机环境里,网络带宽是一个相当缺乏的资源.我们利用把输入数据(由GFS管理)存储在机器的本地磁盘上来保存网络带宽.GFS把每个文件分成64MB的一些块,然后每个块的几个拷贝存储在不同的机器上(一般是3个拷贝).MapReduce的master考虑输入文件的位置信息,并且努力在一个包含相关输入数据的机器上安排一个map任务.如果这样做失败了,它尝试在那个任务的输入数据的附近安排一个map任务(例如,分配到一个和包含输入数据块在一个switch里的worker机器上执行).当运行巨大的MapReduce操作在一个机群中的一部分机器上的时候,大部分输入数据在本地被读取,从而不消耗网络带宽.
3.5任务粒度
象上面描述的那样,我们细分map阶段成M个片,reduce阶段成R个片.M和R应当比worker机器的数量大许多.每个worker执行许多不同的工作来提高动态负载均衡,也可以加速从一个worker失效中的恢复,这个机器上的许多已经完成的map任务可以被分配到所有其他的worker机器上.
在我们的实现里,M和R的范围是有大小限制的,因为master必须做O(M+R)次调度,并且保存O(M*R)个状态在内存中.(这个因素使用的内存是很少的,在O(M*R)个状态片里,大约每个map任务/reduce任务对使用一个字节的数据).
此外,R经常被用户限制,因为每一个reduce任务最终都是一个独立的输出文件.实际上,我们倾向于选择M,以便每一个单独的任务大概都是16到64MB的输入数据(以便上面描述的位置优化是最有效的),我们把R设置成我们希望使用的worker机器数量的小倍数.我们经常执行MapReduce计算,在M=200000,R=5000,使用2000台工作者机器的情况下.
3.6备用任务
一个落后者是延长MapReduce操作时间的原因之一:一个机器花费一个异乎寻常地的长时间来完成最后的一些map或reduce任务中的一个.有很多原因可能产生落后者.例如,一个有坏磁盘的机器经常发生可以纠正的错误,这样就使读性能从30MB/s降低到3MB/s.机群调度系统也许已经安排其他的任务在这个机器上,由于计算要使用CPU,内存,本地磁盘,网络带宽的原因,引起它执行MapReduce代码很慢.我们最近遇到的一个问题是,一个在机器初始化时的Bug引起处理器缓存的失效:在一个被影响的机器上的计算性能有上百倍的影响.
我们有一个一般的机制来减轻这个落后者的问题.当一个MapReduce操作将要完成的时候,master调度备用进程来执行那些剩下的还在执行的任务.无论是原来的还是备用的执行完成了,工作都被标记成完成.我们已经调整了这个机制,通常只会占用多几个百分点的机器资源.我们发现这可以显著的减少完成大规模MapReduce操作的时间.作为一个例子,将要在5.3描述的排序程序,在关闭掉备用任务的情况下,要比有备用任务的情况下多花44%的时间.
4技巧
尽管简单的map和reduce函数的功能对于大多数需求是足够的了,但是我们开发了一些有用的扩充.这些将在这个部分描述.
4.1分割函数
MapReduce用户指定reduce任务和reduce任务需要的输出文件的数量.在中间key上使用分割函数,使数据分割后通过这些任务.一个缺省的分割函数使用hash方法(例如,hash(key) mod R).这个导致非常平衡的分割.然后,有的时候,使用其他的key分割函数来分割数据有非常有用的.例如,有时候,输出的key是URLs,并且我们希望每个主机的所有条目保持在同一个输出文件中.为了支持像这样的情况,MapReduce库的用户可以提供专门的分割函数.例如,使用”hash(Hostname(urlkey)) mod R”作为分割函数,使所有来自同一个主机的URLs保存在同一个输出文件中.
4.2顺序保证
我们保证在一个给定的分割里面,中间key/value对以key递增的顺序处理.这个顺序保证可以使每个分割产出一个有序的输出文件,当输出文件的格式需要支持有效率的随机访问key的时候,或者对输出数据集再作排序的时候,就很容易.
4.3combiner函数
在某些情况下,允许中间结果key重复会占据相当的比重,并且用户定义的reduce函数
满足结合律和交换律.一个很好的例子就是在2.1部分的词统计程序.因为词频率倾向于一个zipf分布(齐夫分布),每个map任务将产生成百上千个这样的记录<the,1>.所有的这些计数将通过网络被传输到一个单独的reduce任务,然后由reduce函数加在一起产生一个数字.我们允许用户指定一个可选的combiner函数,先在本地进行合并一下,然后再通过网络发送.
在每一个执行map任务的机器上combiner函数被执行.一般的,相同的代码被用在combiner和reduce函数.在combiner和reduce函数之间唯一的区别是MapReduce库怎样控制函数的输出.reduce函数的输出被保存最终输出文件里.combiner函数的输出被写到中间文件里,然后被发送给reduce任务.
部分使用combiner可以显著的提高一些MapReduce操作的速度.附录A包含一个使用combiner函数的例子.
4.4输入输出类型
MapReduce库支持以几种不同的格式读取输入数据.例如,文本模式输入把每一行看作是一个key/value对.key是文件的偏移量,value是那一行的内容.其他普通的支持格式以key的顺序存储key/value对序列.每一个输入类型的实现知道怎样把输入分割成对每个单独的map任务来说是有意义的(例如,文本模式的范围分割确保仅仅在每行的边界进行范围分割).虽然许多用户仅仅使用很少的预定意输入类型的一个,但是用户可以通过提供一个简单的reader接口来支持一个新的输入类型.
一个reader不必要从文件里读数据.例如,我们可以很容易的定义它从数据库里读记录,或从内存中的数据结构读取.
4.5副作用
有的时候,MapReduce的用户发现在map操作或/和reduce操作时产生辅助文件作为一个附加的输出是很方便的.我们依靠应用程序写来使这个副作用成为原子的.一般的,应用程序写一个临时文件,然后一旦这个文件全部产生完,就自动的被重命名.
对于单个任务产生的多个输出文件来说,我们没有提供其上的两阶段提交的原子操作支持.因此,一个产生需要交叉文件连接的多个输出文件的任务,应该使确定性的任务.不过这个限制在实际的工作中并不是一个问题.
4.6跳过错误记录
有的时候因为用户的代码里有bug,导致在某一个记录上map或reduce函数突然crash掉.这样的bug使得MapReduce操作不能完成.虽然一般是修复这个bug,但是有时候这是不现实的;也许这个bug是在源代码不可得到的第三方库里.有的时候也可以忽略一些记录,例如,当在一个大的数据集上进行统计分析.我们提供一个可选的执行模式,在这个模式下,MapReduce库检测那些记录引起的crash,然后跳过那些记录,来继续执行程序.
每个worker程序安装一个信号处理器来获取内存段异常和总线错误.在调用一个用户自定义的map或reduce操作之前,MapReduce库把记录的序列号存储在一个全局变量里.如果用户代码产生一个信号,那个信号处理器就会发送一个包含序号的”last gasp”UDP包给MapReduce的master.当master不止一次看到同一个记录的时候,它就会指出,当相关的map或reduce任务再次执行的时候,这个记录应当被跳过.
4.7本地执行
调试在map或reduce函数中问题是很困难的,因为实际的计算发生在一个分布式的系统中,经常是有一个master动态的分配工作给几千台机器.为了简化调试和测试,我们开发了一个可替换的实现,这个实现在本地执行所有的MapReduce操作.用户可以控制执行,这样计算可以限制到特定的map任务上.用户以一个标志调用他们的程序,然后可以容易的使用他们认为好用的任何调试和测试工具(例如,gdb).
4.8状态信息
master运行一个HTTP服务器,并且可以输出一组状况页来供人们使用.状态页显示计算进度,象多少个任务已经完成,多少个还在运行,输入的字节数,中间数据字节数,输出字节数,处理百分比,等等.这个页也包含到标准错误的链接,和由每个任务产生的标准输出的链接.用户可以根据这些数据预测计算需要花费的时间,和是否需要更多的资源.当计算比预期的要慢很多的时候,这些页面也可以被用来判断是不是这样.
此外,最上面的状态页显示已经有多少个工作者失败了,和当它们失败的时候,那个map和reduce任务正在运行.当试图诊断在用户代码里的bug时,这个信息也是有用的.
4.9计数器
MapReduce库提供一个计数器工具,来计算各种事件的发生次数.例如,用户代码想要计算所有处理的词的个数,或者被索引的德文文档的数量.
为了使用这个工具,用户代码创建一个命名的计数器对象,然后在map或/和reduce函数里适当的增加计数器.例如:
Counter * uppercase;
uppercase=GetCounter(“uppercase”);
map(String name,String contents):
 for each word w in contents:
    if(IsCapitalized(w)):
      uppercase->Increment();
    EmitIntermediate(w,”1″);
来自不同worker机器上的计数器值被周期性的传送给master(在ping回应里).master把来自成功的map和reduce任务的计数器值加起来,在MapReduce操作完成的时候,把它返回给用户代码.当前计数器的值也被显示在master状态页里,以便人们可以查看实际的计算进度.当计算计数器值的时候消除重复执行的影响,避免数据的累加.(在备用任务的使用,和由于出错的重新执行,可以产生重复执行)
有些计数器值被MapReduce库自动的维护,比如,被处理的输入key/value对的数量,和被产生的输出key/value对的数量.
用户发现计数器工具对于检查MapReduce操作的完整性很有用.例如,在一些MapReduce操作中,用户代码也许想要确保输出对的数量完全等于输入对的数量,或者处理过的德文文档的数量是在全部被处理的文档数量中属于合理的范围.
5性能
在本节,我们用在一个大型集群上运行的两个计算来衡量MapReduce的性能.一个计算用来在一个大概1TB的数据中查找特定的匹配串.另一个计算排序大概1TB的数据.
这两个程序代表了MapReduce的用户实现的真实的程序的一个大子集.一类是,把数据从一种表示转化到另一种表示.另一类是,从一个大的数据集中提取少量的关心的数据.
5.1机群配置
所有的程序在包含大概1800台机器的机群上执行.机器的配置是:2个2G的Intel Xeon超线程处理器,4GB内存,两个160GB IDE磁盘,一个千兆网卡.这些机器部署在一个由两层的,树形交换网络中,在根节点上大概有100到2000G的带宽.所有这些机器都有相同的部署(对等部署),因此任意两点之间的来回时间小于1毫秒.
在4GB的内存里,大概有1-1.5GB被用来运行在机群中其他的任务.这个程序是在周末的下午开始执行的,这个时候CPU,磁盘,网络基本上是空闲的.
5.2Grep
这个Grep程序扫描大概10^10个,每个100字节的记录,查找比较少的3字符的查找串(这个查找串出现在92337个记录中).输入数据被分割成大概64MB的片(M=15000),全部 的输出存放在一个文件中(R=1).
图2显示计算过程随时间变化的情况.Y轴表示输入数据被扫描的速度.随着更多的机群被分配给这个MapReduce计算,速度在逐步的提高,当有1764个worker的时候这个速度达到最高的30GB/s.当map任务完成的时候,速度开始下降,在计算开始后80秒,输入的速度降到0.这个计算持续的时间大概是150秒.这包括了前面大概一分钟的启动时间.启动时间用来把程序传播到所有的机器上,等待GFS打开1000个输入文件,得到必要的位置优化信息.
5.3排序
这个sort程序排序10^10个记录,每个记录100个字节(大概1TB的数据).这个程序是模仿TeraSort的.
这个排序程序只包含不到50行的用户代码.其中有3行map函数用来从文本行提取10字节的排序key,并且产生一个由这个key和原始文本行组成的中间key/value对.我们使用一个内置的Identity函数作为reduce操作.这个函数直接把中间key/value对作为输出的key/value对.最终的排序输出写到一个2路复制的GFS文件中(也就是,程序的输出会写2TB的数据).
象以前一样,输入数据被分割成64MB的片(M=15000).我们把排序后的输出写到4000个文件中(R=4000).分区函数使用key的原始字节来把数据分区到R个小片中.
我们以这个基准的分割函数,知道key的分布情况.在一般的排序程序中,我们会增加一个预处理的MapReduce操作,这个操作用于采样key的情况,并且用这个采样的key的分布情况来计算对最终排序处理的分割点。
图3(a)显示这个排序程序的正常执行情况.左上图显示输入数据的读取速度.这个速度最高到达13GB/s,并且在不到200秒所有map任务完成之后迅速滑落到0.注意到这个输入速度小于Grep.这是因为这个排序map任务花费大概一半的时间和带宽,来把中间数据写到本地硬盘中.而Grep相关的中间数据可以忽略不计.
左中图显示数据通过网络从map任务传输给reduce任务的速度.当第一个map任务完成后,这个排序过程就开始了.图示上的第一个高峰是启动了第一批大概1700个reduce任务(整个MapReduce任务被分配到1700台机器上,每个机器一次只执行一个reduce任务).大概开始计算后的300秒,第一批reduce任务中的一些完成了,我们开始执行剩下的reduce任务.全部的排序过程持续了大概600秒的时间.
左下图显示排序后的数据被reduce任务写入最终文件的速度.因为机器忙于排序中间数据,所以在第一个排序阶段的结束和写阶段的开始有一个延迟.写的速度大概是2-4GB/s.大概开始计算后的850秒写过程结束.包括前面的启动过程,全部的计算任务持续的891秒.这个和TeraSort benchmark的最高纪录1057秒差不多.
需要注意的事情是:因此位置优化的原因,很多数据都是从本地磁盘读取的而没有通过我们有限带宽的网络,所以输入速度比排序速度和输出速度都要快.排序速度比输出速度快的原因是输出阶段写两个排序后数据的拷贝(我们写两个副本的原因是为了可靠性和可用性).我们写两份的原因是因为底层文件系统的可靠性和可用性的要求.如果底层文件系统用类似容错编码(erasure coding)的方式,而不采用复制写的方式,在写盘阶段可以降低网络带宽的要求。
5.4备用任务的影响
在图3(b)中,显示我们不用备用任务的排序程序的执行情况.除了它有一个很长的几乎没有写动作发生的尾巴外,执行流程和图3(a)相似.在960秒后,只有5个reduce任务没有完成.然而,就是这最后几个落后者知道300秒后才完成.全部的计算任务执行了1283秒,多花了44%的时间.
5.5机器失效
在图3(c)中,显示我们有意的在排序程序计算过程中停止1746台worker中的200台机器上的程序的情况.底层机群调度者在这些机器上马上重新开始新的worker程序(因为仅仅程序被停止,而机器仍然在正常运行).
因为已经完成的map工作丢失了(由于相关的map worker被杀掉了),需要重新再作,所以worker死掉会导致一个负数的输入速率.相关map任务的重新执行很快就重新执行了.整个计算过程在933秒内完成,包括了前边的启动时间(只比正常执行时间多了5%的时间).
6经验
我们在2003年的2月写了MapReduce库的第一个版本,并且在2003年的8月做了显著的增强,包括位置优化,worker机器间任务执行的动态负载均衡,等等.从那个时候起,我们惊奇的发现MapReduce函数库广泛用于我们日常处理的问题.它现在在Google内部各个领域内广泛应用,包括:
    大规模机器学习问题
Google News和Froogle产品的机器问题.
提取数据产生一个流行查询的报告(例如,Google Zeitgeist).
为新的试验和产品提取网页的属性(例如,从一个web页的大集合中提取位置信息   用在位置查询).
   大规模的图计算.
图4显示了我们主要的源代码管理系统中,随着时间推移,MapReduce程序的显著增加,从2003年早先时候的0个增长到2004年9月份的差不多900个不同的程序.MapReduce之所以这样的成功,是因为他能够在不到半小时时间内写出一个简单的能够应用于上千台机器的大规模并发程序,并且极大的提高了开发和原形设计的周期效率.并且,他可以让一个完全没有分布式和/或并行系统经验的程序员,能够很容易的利用大量的资源.
在每一个任务结束的时候,MapReduce函数库记录使用的计算资源的统计信息.在图1里,我们列出了2004年8月份在Google运行的一些MapReduce的工作的统计信息.
6.1大规模索引
到目前为止,最成功的MapReduce的应用就是重写了Google web 搜索服务所使用到的index系统.索引系统处理爬虫系统抓回来的超大量的文档集,这些文档集保存在GFS文件里.这些文档的原始内容的大小,超过了20TB.索引程序是通过一系列的,大概5到10次MapReduce操作来建立索引.通过利用MapReduce(替换掉上一个版本的特别设计的分布处理的索引程序版本)有这样一些好处:
   索引的代码简单,量少,容易理解,因为容错,分布式,并行处理都隐藏在MapReduce库中了.例如,当使用MapReduce函数库的时候,计算的代码行数从原来的3800行C++代码一下减少到大概700行代码.
   MapReduce的函数库的性能已经非常好,所以我们可以把概念上不相关的计算步骤分开处理,而不是混在一起以期减少在数据上的处理.这使得改变索引过程很容易.例如,我们对老索引系统的一个小更改可能要好几个月的时间,但是在新系统内,只需要花几天时间就可以了.
   索引系统的操作更容易了,这是因为机器的失效,速度慢的机器,以及网络失效都已经由MapReduce自己解决了,而不需要操作人员的交互.另外,我们可以简单的通过对索引系统增加机器的方式提高处理性能.
7相关工作
很多系统都提供了严格的设计模式,并且通过对编程的严格限制来实现自动的并行计算.例如,一个结合函数可以通过N个元素的数组的前缀在N个处理器上使用并行前缀计算在log N的时间内计算完.MapReduce是基于我们的大型现实计算的经验,对这些模型的一个简化和精炼.并且,我们还提供了基于上千台处理器的容错实现.而大部分并发处理系统都只在小规模的尺度上实现,并且机器的容错还是程序员来控制的.
Bulk Synchronous Programming以及一些MPI primitives提供了更高级别的抽象,可以更容易写出并行处理的程序.这些系统和MapReduce系统的不同之处在,MapReduce利用严格的编程模式自动实现用户程序的并发处理,并且提供了透明的容错处理.
我们本地的优化策略是受active disks等技术的启发,在active disks中,计算任务是尽量推送到靠近本地磁盘的处理单元上,这样就减少了通过I/O子系统或网络的数据量.我们在少量磁盘直接连接到普通处理机运行,来代替直接连接到磁盘控制器的处理机上,但是一般的步骤是相似的.
我们的备用任务的机制和在Charlotte系统上的积极调度机制相似.这个简单的积极调度的一个缺陷是,如果一个任务引起了一个重复性的失败,那个整个计算将无法完成.我们通过在故障情况下跳过故障记录的机制,在某种程度上解决了这个问题.
MapReduce实现依赖一个内置的机群管理系统来在一个大规模共享机器组上分布和运行用户任务.虽然这个不是本论文的重点,但是集群管理系统在理念上和Condor等其他系统是一样的.
在MapReduce库中的排序工具在操作上和NOW-Sort相似.源机器(map worker)分割将要被排序的数据,然后把它发送到R个reduce worker中的一个上.每个reduce worker来本地排序它的数据(如果可能,就在内存中).当然,NOW-Sort没有用户自定义的map和reduce函数,使得我们的库可以广泛的应用.
River提供一个编程模型,在这个模型下,处理进程可以靠在分布式的队列上发送数据进行彼此通讯.和MapReduce一样,River系统尝试提供对不同应用有近似平均的性能,即使在不对等的硬件环境下或者在系统颠簸的情况下也能提供近似平均的性.River是通过精心调度硬盘和网络的通讯,来平衡任务的完成时间.MapReduce不和它不同.利用严格编程模型,MapReduce构架来把问题分割成大量的任务.这些任务被自动的在可用的worker上调度,以便速度快的worker可以处理更多的任务.这个严格编程模型也让我们可以在工作快要结束的时候安排冗余的执行,来在非一致处理的情况减少完成时间(比如,在有慢机或者阻塞的worker的时候).
BAD-FS是一个很MapReduce完全不同的编程模型,它的目标是在一个广阔的网络上执行工作.然而,它们有两个基本原理是相同的.(1)这两个系统使用冗余的执行来从由失效引起的数据丢失中恢复.(2)这两个系统使用本地化调度策略,来减少通过拥挤的网络连接发送的数据数量.
TACC是一个被设计用来简化高有效性网络服务结构的系统.和MapReduce一样,它通过再次执行来实现容错.
8结束语
MapReduce编程模型已经在Google成功的用在不同的目的.我们把这个成功归于以下几个原因:第一,这个模型使用简单,甚至对没有并行和分布式经验的程序员也是如此,因为它隐藏了并行化,容错,位置优化和负载均衡的细节.第二,大量不同的问题可以用MapReduce计算来表达.例如,MapReduce被用来,为Google的产品web搜索服务,排序,数据挖掘,机器学习,和其他许多系统,产生数据.第三,我们已经在一个好几千台计算机的大型集群上开发实现了这个MapReduce.这个实现使得对于这些机器资源的利用非常简单,因此也适用于解决Google遇到的其他很多需要大量计算的问题.
从这个工作中我们也学习到了一些东西.首先,严格的编程模型使得并行化和分布式计算简单,并且也易于构造这样的容错计算环境.第二,网络带宽是系统的瓶颈.因此在我们的系统中大量的优化目标是减少通过网络发送的数据量,本地优化使用我们从本地磁盘读取数据,并且把中间数据写到本地磁盘,以保留网络带宽.第三,冗余的执行可以用来减少速度慢的机器的影响,和控制机器失效和数据丢失.
感谢
Josh Levenberg校定和扩展了用户级别的MapReduce API,并且结合他的适用经验和其他人的改进建议,增加了很多新的功能.MapReduce从GFS中读取和写入数据.我们要感谢Mohit Aron,Howard Gobioff,Markus Gutschke,David Krame,Shun-Tak Leung,和Josh Redstone,他们在开发GFS中的工作.我们还感谢Percy Liang Olcan Sercinoglu 在开发用于MapReduce的集群管理系统得工作.Mike Burrows,Wilson Hsieh,Josh Levenberg,Sharon Perl,RobPike,Debby Wallach为本论文提出了宝贵的意见.OSDI的无名审阅者,以及我们的审核者Eric Brewer,在论文应当如何改进方面给出了有益的意见.最后,我们感谢Google的工程部的所有MapReduce的用户,感谢他们提供了有用的反馈,建议,以及错误报告等等.
A单词频率统计
本节包含了一个完整的程序,用于统计在一组命令行指定的输入文件中,每一个不同的单词出现频率.
#include “mapreduce/mapreduce.h”
//用户map函数
class WordCounter : public Mapper {
 public:
    virtual void Map(const MapInput& input) {
      const string& text = input.value();
      const int n = text.size();
      for (int i = 0; i < n; ) {
        //跳过前导空格
        while ((i < n) && isspace(text[i]))
             i++;
         // 查找单词的结束位置
         int start = i;
         while ((i < n) && !isspace(text[i]))
              i++;
         if (start < i)
            Emit(text.substr(start,i-start),”1″);
        }
     }
};
REGISTER_MAPPER(WordCounter);
//用户的reduce函数
class Adder : public Reducer {
    virtual void Reduce(ReduceInput* input) {
             //迭代具有相同key的所有条目,并且累加它们的value
              int64 value = 0;
              while (!input->done()) {
                     value += StringToInt(input->value());
                     input->NextValue();
              }
               //提交这个输入key的综合
              Emit(IntToString(value));
       }
};
REGISTER_REDUCER(Adder);
int main(int argc, char** argv) {
       ParseCommandLineFlags(argc, argv);
       MapReduceSpecification spec;
       // 把输入文件列表存入”spec”
       for (int i = 1; i < argc; i++) {
              MapReduceInput* input = spec.add_input();
              input->set_format(“text”);
              input->set_filepattern(argv[i]);
              input->set_mapper_class(“WordCounter”);
       }
        //指定输出文件:
       // /gfs/test/freq-00000-of-00100
       // /gfs/test/freq-00001-of-00100
      // …
       MapReduceOutput* out = spec.output();
       out->set_filebase(“/gfs/test/freq”);
       out->set_num_tasks(100);
       out->set_format(“text”);
       out->set_reducer_class(“Adder”);
       // 可选操作:在map任务中做部分累加工作,以便节省带宽
       out->set_combiner_class(“Adder”);
       // 调整参数: 使用2000台机器,每个任务100MB内存
       spec.set_machines(2000);
       spec.set_map_megabytes(100);
       spec.set_reduce_megabytes(100);
       // 运行它
       MapReduceResult result;
       if (!MapReduce(spec, &result)) abort();
       // 完成: ‘result’结构包含计数,花费时间,和使用机器的信息
       return 0;
}