微信扫描登录
或者
请输入您的邮件地址来登录或者创建帐号
提 交取 消
GITBOOK.CN需要您的浏览器打开cookies设置以支持登录功能

大数据平台架构和关键系统实现

本篇文章整理自曹国梁5月5日在『ITA1024大数据技术精英群』里的分享实录:算法之下-大数据平台架构和关键系统实现 。

这次本来想和大家分享三个系统的研发和调优经验,但由于我准备时间有限和分享篇幅的限制,这次先和大家分享第一个系统的研发经验。

0Agenda

  • preface:简要介绍品友互动、DSP、RTB
  • 品友大数据平台演进和现状
  • 满足RTB:超低负载 vs. 超低延迟 vs. 超大存储,之LogAggregator
  • TODO 满足RTB:超低负载 vs. 超低延迟 vs. 超大存储,之Kafka2HDFS
  • TODO 多维实时查询之Apache HAWQ

1Preface

品友互动作为国内最大的DSP(市场份额59.8%),这几年发展迅速,随之带来的变化之一就是数据量的爆发式增长。拿RTB程序中的unbid数据来举例,目前每个小时新产生的数据在700G左右。整个数据平台,每天新产生的数据在20T以上。

我想先宏观地说一下DSP是什么,以及与其密切相关的RTB又是什么。

下面的定义,来自Wikipedia。

Where a DSP fits in an online advertising system

enter image description here

https://en.wikipedia.org/wiki/Demand-side_platform A demand-side platform (DSP) is a system that allows buyers of digital advertising inventory to manage multiple ad exchange and data exchange accounts through one interface. Using DSPs allows users to optimize based on set Key Performance Indicators such as effective Cost per Click (eCPC), and effective Cost per Action (eCPA). DSPs are unique because they incorporate many of the facets previously offered by advertising networks, such as wide access to inventory and vertical and lateral targeting, with the ability to serve ads, real-time bid on ads, track the ads, and optimize.

从上面得宏观的定义可以看出,一个最基本的DSP,要能够同时与Ad Exchange对接以及与Advertiser对接,这就是DSP最基本的业务逻辑。

RTB的简单定义: https://en.wikipedia.org/wiki/Real-time_bidding Real-time bidding (RTB) is a means by which advertising inventory is bought and sold on a per-impression basis, via programmatic instantaneous auction, similar to financial markets.

RTB的定义,有两个意思最突出,一是dynamic,二是programmatic。

我还要专门再提的一点是,DSP并不等于RTB。

比如从业务逻辑和数据的角度,DSP至少还包括Impression(曝光)、Click(点击)、Conversion(转化)、Advertiser(广告主)等相关业务逻辑和数据。现在,高端DSP(比如 品友 :-))也在DMP(Data Management Platform)、PDB(Programmatic Direct Buy)等方面深度发力。

到目前为止,我一直在说得都是品友这个公司和所在的这个行业的业务逻辑,看起来和技术都没有什么关系。之所以这样,是因为我认为,没有通用的架构和技术实现,它们必然与其支撑的业务密切相连。所以,所有彻底通用的架构都是“骗子”。

架构、实现都是人来做的,同时也是人来用的。

架构     做     人
 |   <--------> |
实现     用     人

刚才说了业务,现在来简单说说人。 在品友内部,技术团队分为两个大的部门,一个是业务研发部,一个是数据部。整个大数据平台同时服务于业务研发部和数据部。

数据平台的目标之一就是为算法团队和挖掘分析团队提供一个足够快足够准确的平台。

2品友大数据平台演进和现状

说完业务和人,现在开始说大数据平台。

我在数据平台/数据流的进化过程中,找了两个有代表性的阶段,希望能让大家从整体上先感受一下。

enter image description here

这是早期的数据流。处理具体业务(RTB、PDB、etc.)的前端机,实时得在机器本地产生各种日志(bid、unbid、impression、click、convert、advertiser、cookie mapping、statistics、DMP、etc.)。这些日志数据是后续数据挖掘分析团队和算法团队处理的基础,及其重要。

enter image description here

一台前端业务机上,主要跑了实现业务逻辑(比如RTB)的程序。除了业务逻辑的实现,这些程序还需要写本地日志,以及与MQ集群通信,即使用MQ client API,把日志数据发送到MQ集群。

为了支撑上面的数据流,数据平台是这样的。

enter image description here

有一组专门的机器,通过一些rsync-based程序,按照一定时间(比如每小时)把前端业务机上的日志,同步到HDFS集群。

这样的数据流和数据平台,有五个易发现的问题:

一、业务程序必须在内部实现写MQ的功能(以及处理各种相关cases及其failover),不能专心实现业务逻辑,增加业务程序工程师的负担;

二、基于rsync的程序,latency过大,不能满足下游的数据处理的需求。也不能灵活调整latency和throughput;

三、各种non-clustered程序过多,导致scalability受限制、成本大,也不能充分利用各个机器的资源,同时,管理麻烦,给运维造成负担;

四、HDFS的data format只有最原始的text和lzo压缩,导致下游的程序访问数据成本高(text vs. column-based formats),同时,HDFS空间利用率不高;

五、Facebook的Presto,不能受Hadoop YARN管理;

其实还有一点,像HDFS NameNode、YARN ResourceManager、Presto等等,都是运行于JVM,我们在JVM上花费了很多时间和精力。这些时间和精力,性价比很低。

如我开头提到的,随着业务的发展和数据量的爆炸,我们从整体考虑,同时兼顾已有的问题,逐步改进我们的平台。

新的数据流是这样的。

enter image description here

所有前端机上的日志,通过LogAggregator,统一收集到Kafka brokers集群,同时,Kafka集群上的数据,实时地/准实时地(latency和throughput可调)写入HDFS集群。所有的clustered apps,都可以自由的访问Kafka集群和HDFS集群获取需要的数据。 enter image description here

前端机上的程序,也简化了,它们本身不必与MQ交互,只需要专心实现业务逻辑。同时,每个前端机上部署一个LogAggregator。

随之,新平台是这样的。

enter image description here

新的平台有以下显而易见的好处:

一、LogAggregator使业务程序更加纯粹了,去除了写MQ的部分,同时极大的提高了消息发送的可靠性,并自动处理各种edge cases;

二、Kafka2HDFS取代各种基于rsync的程序,能够通过配置灵活调整latency和throughput。生产中,相比基于rsync的程序,数据写入HDFS的latency从70分钟以上减小到10~20分钟。同时,机器数量从原来的26台(基于rsync的程序锁使用的)减小到4台;

三、各种应用程序都集群化;

四、HDFS的data format,从以text为主,升级为以Parquet为主,节省了存储空间,减小了处理复杂度,缩短了处理时间;

五、HAWQ的libyarn使HAWQ这个SQL-on-Hadoop引擎可以被Hadoop YARN管理,而且在表现上大大优于Impala的LLAMa。同时,HAWQ对count-distinct类问题的精确计算,基本可以达到HyperLogLog的速度;

六、平台本身更加简单,脉络更加清晰,更易于扩展;

在新的平台里,有三个关键子系统:LogAggregator、Kafka2HDFS、HAWQ。下面的部分,主要聚焦在这三个子系统。

3满足RTB:超低负载 vs. 超低延迟 vs. 超大存储

LogAggregator和Kafka2HDFS是我们自己研发的两个系统,其中,LogAggregator收集大量独立的前端机上的数据到Kafka brokers集群,Kafka2HDFS把Kafka brokers集群的数据转移到HDFS。

LogAggregator

LogAggregator乍一看类似于Flume、Scribe之类,但它们太“通用”以致解决不了我们的问题。

下面结合代码,来看看LogAggregator解决了什么问题以及怎么解决的。同时,LogAggregator还存在未解决的问题,所以也非常想听听大家的意见。

在大量前端机上都跑着RTB程序,RTB对时间的要求极为苛刻,所有流程都必须在100ms以内完成,这是理论值,实际生产中比这还要短。这100ms包括了广域网I/O的时间。

所以,部署在每个前端机上的agent(比如Flume agent)都只能占用极少的系统资源,丝毫不能影响到RTB程序对系统资源的使用。其实仅仅这一条,就把Flume、Scribe等否了。

其次,前端机上的日志目录结构较为复杂,动态性大,Flume、Scribe无法收集这样目录。

下面是LogAggregator所处理的最常见的五种日志类型:

enter image description here

冒号(“:”)左边是topic(Kafka的一个概念,表示一类消息的名称)名称,右边是路径。有的路径是动态的,包含了一些具有特定规则的通配符,比如:[Ymd]、{8}。

style 1有一层动态目录,即年月日,目录名称共8个字符,例如20160504。每天变化一次,变化的时间不固定,只有当目录下有文件生成时,相应的目录才会生成,不会提前生成。

style 2有两层动态目录,除了年月日,还有一层动态的数字目录,例如0。数字目录有最大值,比如最大值是16,那么动态数字目录可能是[0,15]中的任何一个,但是否生成、何时生成都不确定。

style 3是人为对动态目录进行划分,目的是使数据量偏大的日志,以分布式的方式来处理(distributed nodes or distributed processess),把负载分摊开来,因为前面提到,单个LogAggregator只能使用及其有限的资源。

style 4是很普通的一种日志目录,就是目录路径(包括文件名basename)一直不变,但会不定期分割(rolling),所以实际inode是变化的。

style 5也是很普通的一种目录,就是目录路径(不包括文件名basename)一直不变,但目录下的文件名(basename)一直在改变(新增文件)。

LogAggregator功能点之一,怎么处理多种目录类型。

enter image description here

从配置文件(比如log_aggregator.conf,文件内容的格式就如slide_8.png所示)的开头依次读取每一行配置项,根据路径的不同类型,分别放入不同的std::map。slide_8中所示的style 1、2、3属于raw_paths,style 4属于direct_files,style 5属于normal_paths。

这几个map的声明如下。

enter image description here

路径的不同类型,是通过is_direct_path来区分的,它的实现很简单,如果将来需要处理更多类型的路径,可能需要加强。

enter image description here

如果路径中有任何通配符,就是raw_paths,反之,就是normal_paths或者direct_files。

三种路径中,raw_paths处理最复杂,因为需要解析通配符。也因此,这类路径的灵活性和可配置最好。

enter image description here

遍历处理raw_paths中的每个路径,并把处理后的路径(normalized)放入normal_paths。

具体的处理是在normalize函数中进行的。

enter image description here

enter image description here

具体就是根据特定通配符的含义,还原成正常的路径,并存入normal_paths中。所以,对于解析完配置文件后已经在normal_paths中的路径(即style 5这样的路径),不需要再处理。

因为路径本身以及路径下面的子路径和文件都是动态生成的,所以在这里把所有可能需要监听(watch & notify)的路径,都存起来,否则可能丢失文件。

direct_files中的路径(包括文件的basename),不需要做路径本身的处理。但由于这类文件本身是变化的,即create-open-append-close-roll,所以它们在监听时处理方式不同。同时,为了在访问这类文件时避免多余的操作,需要做一些预处理,比如缓存文件指针,这样可以避免频繁的open-close开销。

enter image description here

LogAggregator功能点之二,高效监听(watch & notify)操作。

LogAggregator利用了Linux kernel提供的inotify,它是INOde noTIFY的简写。它的详细说明,可自行man 7 inotify。这里引述一部分: The inotify API provides a mechanism for monitoring filesystem events. Inotify was merged into the 2.6.13 Linux kernel. The required library interfaces were added to glibc in version 2.4. The inotify API is Linux-specific.

先看一下怎么开始使用inotify。

enter image description here

意料之中,符合Linux的传统,inotify的资源被抽象为一个FD(file descriptor)。同样意料之中,我们把这个FD设置为non-blocking。

上面已经讲过,我们解析完所有的目录路径后,分为两类分别放在normal_paths和direct_files中,这两类的区别一是路径是否包含文件的basename,二是文件的行为不同。

从preprocess_initofy函数中可以看到,对normal_paths中的路径(即不包含文件的basename),利用inotify进行监控(watch),监控的events包括IN_MOVED_TO和IN_CREATED。

这样,当所监控的目录下,有文件被mv过去,或者有文件被创建时,我们会得到FileSystem的通知,第一时间(low lantency and low resource-consuming)进行处理(比如读取文件内容)。

而对direct_files中的路径,我们并没有利用inotify进行监控,原因我会在下面讲到。

关于inotify有两点需要注意和解决。

第一点,大家可以看到,每watch一个路径,就生成一个对应的WD(watch descriptor),类似FD,每个WD也占用/代表一定的系统资源。所以,对于不再需要的WD,要释放。

第二点,inotify不支持recursive watching。举个例子,存在目录/A,并watch了/A,当目录B在A下生成时(/A/B),产生事件并得到通知,此时可以watch新产生的路径/A/B。

但是,如果在处理B的过程中(即在完成对B的watch之前),在B目录下生成了文件c.log(/A/B/c.log),由于不支持recursiveness,针对c.log产生的events将丢失,因为c.log的生成发生在目录B完成watch之前。

第一点,通过两个函数(线程)来解决。

enter image description here

enter image description here

zero_update是一个独立的线程。zero_update实际每天只工作一次,发生在00:00:00前后15秒,其他时间都在sleep。

代码的L1394~L1411实现了对时间的控制。接下来,因为到了新的一天,依次更新了normal_paths、wd_path_map、path_offset_table等数据结构,释放旧的WD和其他一些数据。

enter image description here

routine_update也是一个独立的线程,它所做的工作,是zero_update的一个子集,它用来定时地更新在零点时不能处理的数据。

对第二点的处理,有些复杂,并且没有完美的解决方案。 先看一下代码。

enter image description here

L1042~L1060,就是处理在目录/A下生成了目录B(/A/B)这种情况。其中的关键是handle_new_inotify_dir这个函数。

enter image description here

enter image description here

在handle_new_inotify_dir中,首先扫描新产生的目录(比如上面提到的目录B),得到其中可能丢失的目录项(包括子目录和文件),然后根据目录项的类型,再分别处理。对于子目录,递归调用handle_new_inotify_dir函数,因为路径完全是动态的,无法事先知道目录的层级。实际生产中,到目前为止不超过三层。

扫描新产生的目录是由scan_new_inotify_dir函数完成的。

enter image description here

enter image description here

刚才我提到,对于第二点,没有完美的解决方案,因为没有一个量化的标准可以遵循。

大家注意scan_new_inotify_dir函数中的几个常量和变量:SCAN_DIRENT_MAX、count、n,它们决定了最多可以找回多少个丢失的子目录和文件(比如上面提到的目录B和文件c.log)。不过幸运的是,在我们的测试和生产环境中,到目前为止没有出过问题。

LogAggregator功能点之三,create-open-append-close-roll类文件的处理。

上面讲的是对normal_paths中路径(不包含文件的basename)的处理。下面开始讲对direct_files中路径(包含文件的basename)的处理。

前面提到,没有对direct_files中的文件使用inotify来监控。因为这些文件是随时在改变的,行为链是create-open-append-close-roll,在打开期间一直在append。如果用inotify,将导致FileSystem events产生的过于频繁,失去了event-based notification的意义,也使inotify本身消耗过多系统资源。

enter image description here

enter image description here

实际处理是,循环遍历direct_files中的各个文件,读取每个文件的数据(按行),积攒到一定行数后,统一发送。这里需要注意的是,当文件发生rolling时,应该怎么处理,也就是代码中的L785~L802。

判断一个文件是否被rolled,依据是文件的真实路径(包含basename)是否与direct_files中保存的路径一致,如果不一致,就是发生了rolling。这里的关键是,根据文件指针(FILE *)得到文件真实路径,由get_path_from_fp函数实现。

enter image description here

在get_path_from_fp函数中,根据文件指针(FILE *)得到FD(file descriptor),然后利用/proc目录下的对应此FD的symbolic link作为参数,调用readlink syscall,获取真实路径。

LogAggregator功能点之四,发送数据。

简单说一下主发送逻辑,它是非常典型的lock+queue模型。这也是广义的client端程序(client/server模型)如何实现异步,如何平衡latency与throughput的最常用的方式。

在看操作函数之前,先看一下队列的声明和锁的声明。

enter image description here

再看如何操作队列。

enter image description here

enter image description here

L1120~L1145是入队时锁和信号量的操作。

enter image description here

L1068是具体的入队(enqueue/push)操作。

enter image description here

L652~L675是出队时锁和信号量的操作。

上面介绍了LogAggregator的四个主要功能点以及其中的关键实现。LogAggregator通过灵活的设计和精细的实现,基本满足了RTB类业务对环境的要求,在使用有限系统资源的前提下,做到了实时准确收集数据供下游系统使用。 再做个小广告,Kafka2HDFS的设计和实现,我个人认为比LogAggregator要精彩。希望能尽快再次与大家分享和交流。

今天的分享就到这里,谢谢大家。

问:相比flume,各方面的对比参数如何,包括内存等等?

曹国梁:在内存和cpu方面,比flume小非常多,不是一个量级的。 enter image description here