- 1 -
基于 scribe 和 Hadoop 的海量数据分析系统
的设计与应用
刘彤1,2,辛阳1,2**
(1. 北京邮电大学计算机学院,北京 100876; 5
2. 北京安码科技有限公司,北京 100876)
摘要:随着互联网、移动互联网和物联网的发展,各种终端、信息收集器的数量和种类不断
增加,每个人、世间万事万物每时每刻产生的大量数据,都在不断进入信息系统,等待存储、
分析和充分利用。本文介绍了海量数据分析中遇到的问题,分析了 scribe 和 hadoop 系统在
数据分析系统中的可靠性,设计了基于 scribe 和 Hadoop 的数据收集和分析系统,对该系统10
的性能进行了测试、分析和应用。
关键词:计算机应用;Scribe;Hadoop;MapReduce;HDFS
中图分类号:TP317
Design and Application on Mass Data Analysis System 15
Based On Scribe and Hadoop
LIU Tong1,2, XIN Yang1,2
(1. School of Computer Science,Beijing University of Posts and Telecommunications,
Beijing 100876;
2. Beijing Safe-Code Technology Co.,Ltd, Beijing 100876) 20
Abstract: with the development of Internet, mobile Internet and development of Internet of things,
a variety of terminal, information collector and the number of species increases ceaselessly, each
of us, everything in the world always generate large quantities of data, are constantly into
information system, for storage, analysis and make full use of. This paper introduces the problems
encountered in the analysis of mass data, analysis of scribe and Hadoop system in the data analysis 25
system of reliability, based on the design of the scribe and Hadoop data collection and analysis
system, the performance of the system was tested, analysis and Application.
Keywords: Technology of Computer Application; Scribe; Hadoop; MapReduce; HDFS
0 引言 30
随着计算机技术和互联网技术的快速发展,海量数据正在不断生成,对于急需改变自己
传统 IT 架构的企业而言,如何面对海量数据,如何分析并有效利用其价值,同时优化企业
业务已成为现代企业转型过程中不可避免的问题。
海量数据量只是海量数据所遇到的挑战的一个方面,其他两个方面指的是速度和多样
性。速度表示数据的收集、处理和数据查询的速度需求。 35
随着业务量的增长,日志的大小随着使用用户的增长,也在同比例的增长。怎样收集这
些日志文件,甚至怎样实时的收集这些日志文件。怎样去处理这些海量数据,怎样去分析历
史的数据和现在数据的关系,都成为这些 IT 公司面临的问题。
Hadoop[1]是一个基于 Java 的框架,支持分布式应用程序,使用程序能够应用数千个处
理器节点处理 TB 级的数据,甚至 BP 级的数据[2] [3]。针对顺序读取大型文件进行了优化,40
它可以自动管理数据复制和恢复。即特定处理器上发生了故障,数据仍然会被复制,处理也
- 2 -
将继续进行而不会中断或丢失剩余的计算结果,这使该系统具有一定的容错能力,能够快速
的处理 TB 级的数据。除了 Yahoo!和 FaceBook 两大站点在大量使用超过 10,000 个微处理器
核心的Linux计算机集群运行Hadoop,还有、 IBM 、ImageShack、 、Powerset、
纽约时报、百度、淘宝等,在使用 Hadoop 系统进行数据挖掘、实验和对广告系统优化等。 45
由于 Hadoop 的分布式文件系统(NameNode)采用的是 Master/Slave 架构,采用唯一的
NameNode 提供文件的命名空间等服务。一般 NameNode 不能提供服务的情况可分为两种:
正常关闭以及异常关闭。正常关闭的情形主要发生在 NameNode 的系统维护或软硬件升级,
其中软件升级的情况可能较为常见。异常关闭的情形主要发生在:软件的错误配置、网络问
题、以及客户端的错误行为、硬件故障等。雅虎的经验表明,NameNode 故障的原因中,前50
三种占大多数,在雅虎运行的 15 个集群中,三年时间内,只有 3 次 NameNode 的故障与硬
件问题有关。由于 Hadoop 采用的是唯一一个 NameNode 提供服务,这就增加了系统的不稳
定性,增加了由于 NameNode 宕机而引起的服务不可用。为了解决该问题,我们采用利用
Facebook 提出的 AvatarNode 机制。
1 相关知识 55
日志分析系统一般包括日志收集,日志的存储和日志的计算分析。本文应用的日志收集
系统为 scribe,日志存储系统为 Hadoop 的分布式文件系统 HDFS,日志计算分析框架为
Hadoop 的 MapReduce。
Scribe
Scribe[4]是一个开源的日志收集系统,它能够从各种的日志源上收集日志,并存储到存60
储系统(可以是 NFS,分布式文件系统 HDFS 等)上,以便于进行集中统计分析处理。它
为日志的“分布式收集,统一处理”提供了一个可扩展的,高容错的方案。
Scribe 具备可扩展能力,并且网络故障及服务器节点故障,都不会对日志收集造成影响。
大规模日志集群系统中的每个节点上都运行了一个 Scribe 服务,这个 Scribe 服务器可以收
集信息然后将信息发送到一个中央 Scribe 服务器(也可以是多个中央 Scribe 服务器),如果中65
央 Scribe 服务器(或中央服务器组)出现故障不可用时,各个节点的 Scribe 服务器就会将日志
信息写到本地磁盘,待中央 Scribe 服务器恢复正常时再发送。中央 Scribe 服务器会将这些
信息写文件保存到最终的文件系统上,这里的文件系统可以是一般文件系统或者一个分布式
文件系统中,有时也会把这些日志文件传输到其他层的 Scribe 服务器组中。
Application
Application
Application
thrift
thrift
…
thrift
线程
Push
线程
本地
Push
Push
消息队列
日志生成
…
HDFS文件
系统、
Scribe系统
Hdfs,scribe正常运行
…
Hdfs,scribe正常运行
Hdfs,scribe不可用时
70
图 1 scribe 的架构
- 3 -
当 Scribe 的存储系统不可用时(例如,存储的机器宕机等),Scribe 会将数据写到本地
磁盘上,待存储系统恢复正常后,Scribe 将日志重新加载到存储系统中。
scribe 采用了 thrift[5]接口传输日志,由于使用 thrift,所以不论是什么语言的项目都可以
实现基于 Scribe 的日志收集,达到远程或者是本地同步远程的分布式日志收集效果。 75
Hadoop
Hadoop 框架中最核心设计就是:MapReduce 和 HDFS。MapReduce 的思想是由 Google
的一篇论文所提及而被广为流传,MapReduce 就是任务的分解与结果的汇总。HDFS(Hadoop
Distributed File System)是 Hadoop 分布式文件系统,为分布式计算提供了底层的数据存储
支持[6]。 80
HDFS
Hadoop 分布式文件系统(HDFS)被设计成适合运行在通用硬件(commodity hardware)上
的分布式文件系统。它和现有的分布式文件系统有很多共同点。但同时,它和其他的分布式
文件系统的区别也是很明显的。HDFS 是一个高度容错性的系统,适合部署在廉价的机器上。
HDFS 能提供高吞吐量的数据访问,非常适合大规模数据集上的应用。HDFS 放宽了一部分85
POSIX 约束,来实现流式读取文件系统数据的目的。HDFS 在最开始是作为 Apache Nutch
搜索引擎项目的基础架构而开发的。HDFS 是 Apache Hadoop Core 项目的一部分。
HDFS 采用 master/slave 架构。一个 HDFS 集群是由一个 NameNode 和一定数目的
DataNode 组成。NameNode 是一个中心服务器,负责管理文件系统的名字空间(namespace)
以及客户端对文件的访问。集群中的 DataNode 一般是每个节点一个,负责数据的存储。HDFS90
暴露了文件系统的名字空间,用户能够以文件的形式在上面存储数据。从内部看,一个文件
其实被分成一个或多个数据块,这些块存储在一组 DataNode 上。NameNode 执行文件系统
的名字空间操作,比如打开、关闭、重命名文件或目录。它也负责确定数据块到具体 DataNode
节点的映射。DataNode 负责处理文件系统客户端的读写请求。在 NameNode 的统一调度下
进行数据块的创建、删除和复制。 95
图 2 HDFS框架
Namenode 和 Datanode 被设计成可以在普通的商用机器上运行。这些机器一般运行着
GNU/Linux 操作系统(OS)。HDFS 采用 Java 语言开发,因此任何支持 Java 的机器都可以部100
署 Namenode 或 Datanode。由于采用了可移植性极强的 Java 语言,使得 HDFS 可以部署到
多种类型的机器上。
- 4 -
中国科技论文在线
MapReduce
Hadoop 的 MapReduce 是一个使用简易的软件框架,基于它写出来的应用程序能够运行
在由上千个商用机器组成的大型集群上,并以一种可靠容错的方式并行处理上 T 级别的数105
据集。
MapReduce 编程模型由 google 公司 Jeffery Dean 等人[1],用于在分布式并行环境中处理
海量数据的计算。它将一个任务分解成更多份细粒度的子任务,这些子任务在空闲的处理节
点之间被调度和快速处理之后,最终通过特定的规则进行合并生成最终的结果,其处理模型
有点类似于传统编程模型中的分解和归纳方法。 110
MapReduce 模型将分布式运算抽象成 Map 和 Reduce 两个步骤,从而实现高效的分布式
应用。
图 3 MapReduce 的框构
115
MapReduce 是一个可靠计算系统,从编程规范上,只专注处理逻辑,出错几率更少;
从计算运行上,代码可小规模测试验证,可跟踪计算过程;从系统架构上,计算节点出现故
障时,自动针对故障的数据重新计算。其实 Google 最早提出 MapReduce 也就是为了海量数
据分析。同时 HDFS 最早是为了搜索引擎实现而开发的,后来才被用于分布式计算框架中。
海量数据被分割于多个节点,然后由每一个节点并行计算,将得出的结果归并到输出。同时120
第一阶段的输出又可以作为下一阶段计算的输入,因此可以想象到一个树状结构的分布式计
算图,在不同阶段都有不同产出,同时并行和串行结合的计算也可以很好地在分布式集群的
资源下得以高效的处理。
2 海量数据分析系统的设计
目前,在 IT 公司中为了了解自己的网站的访问量(PV)、独立访客(UV),或客户125
端得使用每天使用的人数,产品的各个功能使用情况,甚至潜在的数据挖掘等。一般通过打
点的方式在一些 web 服务器(Apache、Nginx 等)上产品的功能使用信息,然后再对这些
日志进行统计分析。对日志经行统计分析一般包括日志的收集,日志的统计分析,结果的展
示。总体的结构设计如图 4 所示。
- 5 -
中国科技论文在线
130
图 4 日志分析系统设计架构
在该系统中使用了 Hadoop 分布式计算框架,由于 Hadoop 的文件系统采用的单
NameNode 的 master/slave 架构,所以当 NameNode 出现故障时,Hadoop 集群就不可用,为
了解决该问题,该框架在 Hadoop 集群使用 Avatar Node 机制,为 NameNode 提供热备。在135
Hadoop 集群升级(重启 NameNode 的时间随着 HDFS 文件数的增多而增长)、NameNode
出现故障时进行切换。
图 5 AvatarNode 运行机制
140
AvatarNode 完全封装 NameNode。AvatarNode 运行有两种模式 Primary Avatar 或者
Standby Avatar. 如果启动运行在 Primary Avatar 模式,那么它就和当前 NameNode 功能完全
一样,运行 Primary Avatar 模式的 AvatarNode 机器,保存 HDFS 事务日志(editlog)到一个共
享的 NFS。在另外一台机器上,启动 AvatarNode 的另一个实例,运行在 Standby Avatar 模
式。 Standby AvatarNode 封装了 NameNode 和 SecondaryNameNode。Standby AvatarNode 持145
续的从共享的 NFS 中读取 HDFS editlog,并持续的把这些事务推送到 Standby AvatarNode
中 NameNode 实例中。Standby AvatarNode 中的 NameNode 运行在 SafeMode。原因是不能
让它负责 NameNode 的工作,但是必须保持和 NameNode 同步的 NameNode Metadata 信息。
- 6 -
中国科技论文在线
AvatarNode0 以 PrimaryAvatar 方式启动,AvatarNode1 以 StandbyAvatar 方式启动,然
后启动各个 DataNode。当 scribe 向 AvatarNode0 发出读写请求时,即 150
1) 用户向 AvatarNode0(Primary Avatar)发出读、写请求,并且得到相应的相应后,
然后向 DataNode 直接读、写数据。
2) AvatarNode0(Primary Avatar)相应用户的请求后,保存 HDFS 事务日志(editlog)
并且把该事务共享到 NFS 服务器上,为 AvatarNode1 提供更新自己 NameNode 的
元数据信息。 155
3) DataNode 向两个 AvatarNode(即 AvatarNode0 和 AvatarNode1)发送心跳信息,用
来监控 DataNode 的运行状态。
4) AvatarNode1 节点读取 NFS 服务器上的 AvatarNode0 事务日志(editlog),并用
这些事务更新 AvatarNode1 中的 NameNode 的内存中的元数据信息。
当 AvatarNode0 发生故障后,手动切换 AvatarNode1 为 Primary Avatar,同时,选取一160
个备用节点,以 StandbyAvatar 启动,这样,当 AvatarNode1 发生故障时,即可将备用节点
切换到 Primary 状态,始终保持 NameNode 可以对外提供服务。
3 海量数据分析系统性能分析与应用
海量数据分析系统性能分析
海量数据分析系统是由数据收集和数据分析两大部分组成,下面我们将这两部分的性能165
做进一步的分析。
Scribe 日志收集性能分析
为了测试 Scribe 的性能,我们设计了下图的测试模型,对我们设计的系统进行测试。
①
①
④
④
⑤
图 6 测试模型 170
测试模型的数据流流向是:
1) 客户端向 web 服务器(Apache 或 Nginx)发出请求。
- 7 -
中国科技论文在线
2) Web 服务器(Apache 或 Nginx)会记录用户的访问请求的日志信息,访问信息记
录在日志文件中。 175
3) 脚本实现了向 Scribe Local Server 发送日志的功能。具体的是,当日
志文件有新的访问信息, 就会把新的访问信息发送给本地的 Scribe
server。
4) 当向本地的 scribe server 发送信息失败时,scribe 就会把日志文件缓存到本地的磁
盘上,一旦本地的 scribe sever 服务正常后,本地的 scribe server 会把缓存在本地的180
日志信息发送到中心 scribe server。
5) 中心 scribe server 向 HDFS 发送从各个本地的 scribe sever 收集来的日志文件数据,
当中心 scribe server 向 HDFS 发送日志文件失败时,也会把日志文件数据缓存到本
地,待到中心 scribe server 可以向 HDFS 发送数据时,即服务恢复时,再把本地缓
存的文件数据发到 HDFS 上,完场对日志的近似于实时的收集。 185
由于我们的日志服务器一般是有多个域名公用的,而且是按照不同的请求进行切分的。
所以我们的测试用例设计为,收集日志的进程数为多个(即, 的进程数为多个)。
测试的进程数分别是 1,4,16,32,主要考察 scribe Center Server 的 cpu 和带宽使用,收
集日志条数的准确性。测试数据使用 AB 测试,并发链接数为 1000,请求次数为 5000000
(日志机采用的是 16 核的 64 位 centos 系统,所以图 9 的纵坐标的最大值为 1600%。)。190
测试的结果如下图所示:
%
%
%
%
%
%
%
%
%
1 4 16 32
峰值
平均值
图 7 scribe server 的 cpu使用效率
0
10
20
30
40
50
60
70
80
1 4 16 32
M/
s
带宽峰值
带宽平均值
195
图 8 scribe server 的带宽使用率
从以上的测试结果来看,在并发连接数为 1000,请求次数为 5000000,收集日志的进程
- 8 -
中国科技论文在线
数,从 1 到 4 到 16 到 32 的过程中,cpu 和带宽的平均使用率对于 16 核的千兆网卡而言,
符合我们的预期结果。 200
Hadoop 计算性能分析
下面测试的任务是,利用 Hadoop 的 MapReduce 实现类似两个表数据关联(inner join)
的任务[7],考查的性能有运行时间和数据量输入,
图 9 运行时间 205
图 10 运行状态
从图 10 可已看出 MapReduce 的输入文件大约是 12G 的数据(12283129866 Byte),210
MapReduce 的输出结果文件大小为 29M(29224089Byte),该程序从图 9 可以看出运行了
158 秒,与 MySQL 等传统关系型数据库相比性能有了改进(MySql 数据库单表的数据量最
多为 2000 万,若果多余 2000 万,该数据库的效率就会很慢,从图 10 总可以看出 map input
records 为 254914761,这个数已经远远大于 2000 了万)。
海量数据分析系统性能分析与应用 215
目前该系统主要用来收集 web 服务器(Apache 和 Nginx)用户请求的日志,利用 Hadoop
对这些收集的日志经行分析和计算。主要用于实现类似数据库中的聚类函数(count()、sum()、
avg())的功能和数据库中的海量数据关联(数据库中的 left join、right join、inner join),下
面以一个 inner join 的为例。
具体需求是:我们每天有一些客户端产品都会被安装,我们称这部分安装用户为新用户,220
用户安装成功后都会记日志,在日志中会记录这些产品是通过那些渠道安装的。还有每天都
有用户使用该客户端产品,我们称这部分用户为活跃用户。现在是在今天的活跃用户中出现
- 9 -
中国科技论文在线
昨天各个渠道安装的用户数,即通过两部分的数据关联(inner join)可以算出来。
由于关联的两部分数据比较大分别为 11G(11941784574 Byte)和 330M(330859452
Byte),所以我们采用的 MapReduce 中的 Reduce 端的关联,没采用 Map 端得关联是因为225
Map 端要采用 distribute cache 这种方法,需要把两种文件中最小的读入内存,会引起内存
不够用的错误。当然采用 Reduce 端得关联,会使得大量的数据通过 HTTP 协议在 Map 端向
Reduce 端传输数据,会影响到程序的执行效率和网络开销等问题。
4 结论
在海量数据分析中,scribe 和 Hadoop 结合的计算框架的使用,解决了海量数据收集的230
问题,Hadoop 的使用解决了海量数据存储以及海量数据的计算分析问题,AvatarNode 的使
用进一步使得 Hadoop 集群的可靠性(主要是针对 NameNode)增强。在 Hadoop 家族中,
HDFS 和 MapReduce 是最核心的基本组成部分,在HDFS 和 MapReduce 上还可以运行 Pig[8]、
Hive[9] 、Hbase[10]等数据分析和数据挖掘工具,使得该日志分析框架具有扩展性,为以后的
数据挖掘提供底层支持。 235
[参考文献] (References)
[1] Jeffrey Dean and Sanjay Ghemawat jeff ,MapReduce: Simplified Data Processing on Large Clusters.
Communications of the ACM - 50th anniversary issue: 1958 - 2008 CACM Homepage archive Volume 51 Issue 1,
January 2008. 240
[2] Tom White .O'Reilly - Hadoop The Definitive Guide 2nd Edition Publisher: Yahoo Press, Oct 2010.
[3] Tom White. Hadoop 权威指南(2 版)[M].曾大聃.北京:清华大学出版社,2010.
[4] Scribe.
[5] Apache Thrift.
[6] Apache Hadoop. 245
[7] Chuck Lam .Hadoop in Action. Publisher: Manning Publications, 1 edition .December 6, 2010.
[8] Apache Pig.
[9] Apache ://.
[10] Apache ://.
250