Druid驱动海量实时多维分析

作者/分享人:Chat 实录

本篇文章整理自合一集团资深工程师张海雷6月15日在『ITA1024大数据技术精英群』里的分享实录:Druid驱动海量实时多维分析 。

enter image description here

我今天分享的话题是Druid驱动海量数据实时多维分析。

需求背景

首先我来谈一下海量实时多维分析的需求背景,我们广告系统有DSP睿视系统和AD exchange等,前段时间品友的曹老师分享时提到DSP Ad exchange,如果对DSP不了解的同学,可以在百度百科上查一下“互联网广告DSP”,实时竞价是DSP的核心,广告主或者优化师需要动态调整出价优化使收益最大化。

广告主调整出价策略或者投放策略进行优化,想要尽快得到实时的反馈,比如修改了地域定投,实时分地域的竞得率和转化率,甚至是分钟粒度的。在我们的DSP系统中我们提供12种维度(不算多)的多维分析,用户可以任意组合下钻查询。

最初的时候我并不是采用Druid,而是采用Storm+Kafka+Redis实时数据处理,以及hive+mysql的离线处理的Lambda架构架构见下图。

enter image description here

实时数据经过Storm ETL,主要是将不同维度组合作为key,计算metrics以后为value存在redis中。离线数据定时将昨天的数据在Hive中 经过一系列ETL,按照维度组合计算metrics以后将结果存储在mysql中。

采用redis作为实时数据的存储有两个核心问题需要解决:

redis不支持range scan,我们需要在app层拼好所有的key,然后调用mget获取,如果执行groupby查询的话例如select area,pv from t where cast='a' group byarea的话,需在客户端好所有的cast_area的key。那么问题来了,我并不知道投放a在哪些城市 产生过曝光,如果有地域定向还好,如果没有的话需要拼接所有城市的key取redis中进行mget。哪这样只是城市维度还好,全国也 只有几百个城市,但如果换做其他基数很大的维度,哪性能开销就比较大了。

如果做到真正的多维分析,需要穷举1维,2维知道N维维度的所有组合,例如有A B C和D,那么一维的组合有A BC D,二维的有 AB BC CD DA等组合,依次类推直到组合到N维。同时每种维度又有自己的基数,例如ABCD各自的基数都是100,最差的情况, 今ABCD这种组合就会有1亿个key,但现实数据一般都是稀疏,不会出现这种的情况。总之穷举维度组合作为key的方式存储空间 会非常大,特别是添加维度以后,有可能是指数级的增长,非常不可控。

为了规避这个问题,我们对功能进行了降级,我们提供某些维度组合的查询,这样就做到了简单可控。但需要历史和实时结合查询的场景,问题就又来了,我需要将mysql的数据和redis的数据整合起来。只能在app端进行数据合并。就这样这个系统愉快地高效运转了一段时间。

4月份的时候我参加Qcon和Druid中国用户组的Meetup,听Druid项目发起人Fangjin Yang将Druid的昨天今天和明天,6年前他们在metamarkets创业公司,主要为广告公司提供多维数据分析,尝试过很多方案,例如传统的关系型数据mysql/postgresql,kv存储的 Hbase,Hbase受限于rowkey设计,都不能很好地解决多维分析,随后创建Druid。我相信很多公司都有类似的经历。说了这么多终于引入正题。

Druid简介

Druid是一个实时处理时序数据的Olap数据库,因为它的索引首先按照时间分片,查询的时候也是按照时间线去路由索引。

下面介绍一下它的几个特性:

  • 亚秒响应的交互式查询。支持较高并发,为面向用户的平台提供Olap查询(注意这是相比其他OLAP的强大优势)。
  • 支持实时导入,导入即可被查询。支持高并发导入。
  • 采用分布式shared-nothing的架构,可以扩展到PB级。
  • 支持聚合函数,count和sum,以及使用javascript实现自定义UDF。
  • 支持复杂的Aggregator,近似查询的Aggregator例如HyperLoglog以及Yahoo开源的DataSketches。
  • 支持Groupby,Select,Search查询。
  • 不支持大表之间的Join,但其lookup功能满足和维度表的Join 。

它是如何做到的呢?

列存储,倒排索引,Roll UP,roaring 或者concisebitmap位图索引以及高效的压缩支撑了Druid的高效查询。提到倒排索引,大家都会联想到Lucene,Solr以及ElasticSearch。确实Elastic Search是Druid的强大竞争对手,有兴趣的同学请移步http://druid.io/docs/latest/comparisons/druid-vs-elasticsearch.html,该部分不是本次分享的重点,就不做展开讨论。

Druid是由一组不同角色的节点,每种节点都是一个单独的子系统负责不同的功能,这些节点组成一个系统协同工作,我们来看一下Druid的整体架构见下图:

enter image description here

这张图上有Realtime,Broker,Historical等节点,以及灰色的代表数据导入的流向,红色的代表数据查询的流向。

Segment是Druid的索引。Druid首先按照时间分片,Segment Granularity代表Segment存储了那些时间范围的数据,这里的 Segment Granularity可以是分钟 小时天等时间粒度。

RealTime部分可以是Indexing Service或者Realtime Node,它负责实时导入数据,生成Segment(索引),在Realtime中采用LSM-tree 的模型,他可以采用push或者pull的方式获取流式数据,数据首先添加到内存的增量索引中,内存增量索引采用SkipList,对写友好而且能够高并发处理,查询也相对来说高效。

当达到一定阈值后,这个阈值可以内存增量中最大条数,或者累积到一定时间。采用异步线程将 内存增量索引转成倒排索引持久化到磁盘中,同时生成一个新的内存增量索引接收数据。

周而复始,当达到设定的Segment Granularity时,Druid会Merge那些Segment Granularity内所有持久化的索引成一个Segment,并且推送到Deep Storage。在推送到 Deep Storage之前,Realtime负责对这些Segment的查询。

Historical是Druid的主力,他从"Deep Storage"中下载Segment,采用mmap的方式加载Segment,并负责来自broker对这些 Segment的查询。在Zookeeper中为每个Historical创建了一个Load Queue来决定下载那些Segment,一旦加载成功,Historical在 Zookeeper中宣告这个Segment由它来负责。

Historical采用多线程的方式处理高并发,比如一个请求过来涉及多个Segment的查询,那么会为每个segment分配一个线程,并发地执行,然后汇总结果返回给broker。由于Historical处理Groupby查询时,需要大量的内存存放中间临时结果,那么内存管理就显得至关重要,Druid采用一个简单的内存管理池,每次申请都会返回固定大小的direct buffer,使用完毕以后还回池中。

前面提到Druid采用Mmap的方式加载Segment,如果索引的总大小小于内存大小,并且留有一定量的direct memory大小,那么Druid变成了内存数据库,瓶颈不在IO,而不是在CPU,索引的压缩以及数据的聚合都很耗CPU。Historical采用Shared-Nothing架构,可以无限扩展,提升内存大小以及总的CPU core数从而提升计算能力。

Broker类似分布式搜索引擎中的元搜索引擎,他不负责任何Segment的查询,他只是一个代理,从Zookeeper中获取TimeLine,这个 TimeLine记录了intervals->List(Server)的mapping关系,接收到Client的请求以后,按照时间段在TimeLine查找Segment分布在那些 Server上。然后并发地去查询然后汇总结果。前面提到在每个realtime或historical内部也是类似这样的架构。

为了便于大家理解,下面我用一幅图来展示整个流程。

enter image description here

Coordinator负责协调Segment的均衡加载,Coordinator从元数据存储mysql中获取那些还未被加载的Segment,根据当前所有 Historical的负载能力均衡地分配到其LoadQueue。默认采用2副本加载,也就是说把同一个Segment分配到两个Historical节点上去, 这样设计是为了保障高可用。

Coordinator同时会检测historical的存活,一旦发现某个Historical当掉或者有新的Historical加入,就会进行自动均衡。

好了经过上面对Druid的整体介绍,我相信大家已经有一个初步印象。重点说明的是Druid采用LSM-tree的方式做到实时索引实时查询,下一步我将讲述支撑高效查询的索引以及压缩。

支撑高效查询的索引以及压缩

我们在讨论高效查询时,那么得首先定义一个标准,如何才能称之为快,Druid官方宣称他们已经实现在一万亿行数据集上执行 query,响应时间小于1秒。这里提到的一万亿行数据集我理解是Raw Data,而不是导入Druid以后存储的数据集大小。

因为Druid在导入数据过程中有Rollup处理,这个特性会减少数据集的大小,当然这只是我的理解并没有和作者求证。言归正传,Druid是如何做到这么快的呢。索引是支撑数据库高效查询的基石。以下面的一组数据举例来演示整个索引以及查询过程。

enter image description here

这也是Druid要求的数据,第一列是时间,是必选项,广告和地域都是维度列,在Druid中维度都作为字符串处理,展示数和点击数为Metric,一般是看作数值处理,可以是 Long或者Double,其实也可以复杂的metric例如HyperLogLog来处理UV,DataSketches的功能更加增强可以实现交并集的处理,可以用做访客的留存或者回访统计,此处不详细展开。

Raw Data的话,每个展示或者点击事件,其数量都是1,Druid在数据导入阶段会进行Rollup,将相同维度组合的数据进行聚合处理,例如在2016/05/14T11期间,在广告C1地域P1有14条件曝光事件,3条点击事件,Rollup会将采用其设定的聚合器进行聚合,在这里我们采用sum的方式,得到展示数14,点击数3.

Druid采用倒排索引和列存储的方式,列存储采用字典编码的方式,将某一个维度的值,按照字典顺序编码成整数,以广告维度为例,这个数据集中共有4条记录,不重复的值有3个C1 、C2和C3,这里提出一个基数的概念,基数可以理解为集合中不重复值的数量。那么广告维度的基数是3.我们把它们按照字 典顺序编码得到C1=0,C2=1,C3=2。字典通常采用数组结构表达,元素的下标为字典编码值。

前面我们将了Druid的数据格式,Rollup和列存储以及字典编码方式。

倒排索引和数据布局

倒排索引被应用在搜索引擎上用于文档的检索,随着大数据的发展,倒排索引同样在大数据分析领域绽放出新的光芒,例如Elastic Search,越来越多的用户把它 用在数据分析上。

Druid同样采用倒排索引,但不象Lucene,Druid专注于数据统计分析,而且只支持布尔查询。Druid的索引结构布局 由字典,正排(列存储)以及倒排组成,其中倒排的PostingList采用压缩的BitMap位图索引,BitMap的优势是能支持快速的布尔查询,后续会介绍。

目前支持Consice和Roaring两种BitMap方式,它们在稀疏的Posting List上均能起到很好的压缩效果,更重要的是在进行布尔运算时不用解压缩,直接在压缩的结构上进行布尔运算。下图展示的结构是将广告编制成索引的数据布局。例如C1只出现在第一行,那么它的BitMap是1000,同理C2出现在第二和三行,那么它的Bitmap是0110.这里是介绍Dimension的索引布局,Metric同样也采用列存储的方式按块压缩。

enter image description here

说到索引,不得不提是压缩技术。

前面提到BitMap支持Consice和Roaring两种压缩编码方式,在Column正排部分使用变长整数编码,不同于其他场景下的可能须有一个bit标示分割,在Druid的索引中,字典的大小是已知的。如果字典的大小小于0Xf,那么只有一个 byte存储,同理,如果字典的大小小于0Xff则用两个byte存储,依次类推。如果维度的基数为100,那个只有一个字节存储,压缩率为25%, 非常高效。

在最新的索引版本中在变长整数编码的技术又加上了LZ系列压缩。压缩是一把双刃剑,平衡的是IO和CPU的开销。说到LZ系列压缩,Druid采用按块压缩,默认块大小为64K。

布尔查询,以这个SQL查询为例,selectsum(click) from table where time between 2016-05-24T11 and2016-05-24T12 and广告=C2 and 地域=P1;首先根据时间段定位到Segment,然后根据广告=C2和地域=P1,得到他们各自的字典编码,C2=1,P1=0,然后根据字典编码得到BitMap的Offet,从而得到Bitmap,C2=1 index为1的bitmap为0110,同理得到P1的bitmap为1100,0110和1100进行And与运算是0100,得到的offset是1,说明我们需要在click中读取offset=1的值是4.

再举一个复杂一点的例子。

Group Byquery。Select 地域,sum(click) fromtable where time between 2016-05-24T11 and 2016-05-24T12 and 广告=C2 group by 地域。Group By较 上面的Select查询不同的是它需要按照地域去汇总点击数。这样就得把符合过滤条件“广告=C2”的地域和点击数两列的值都遍历出来再 在内存中做分组聚合。

接下来演示一下整个过程,首先是根据"广告=C2"得到字典编码=1,然后根据index=1得到bitmap是 0110,Offset是1和2,根据Offset分别在地域和点击数中遍历得到,P1,4和P2 20.当然这个例子的数据非常少,现实中海量数据的话 Group by有可能遍历大量的数据,首先在内存中做聚合再得到最终的结果集。

在这一部分,我们讲了Druid高效查询的基石,倒排索引,巧妙的索引结构布局和高效的压缩,BitMap以及布尔查询。

应用案例

我们回到应用背景中提到的案例,使用Druid以后的架构变成下图。

enter image description here

特别要提出的是最初我们采用RealtimeNode,部署简单,但每次Schema的更新都要重启每个实时节点,并且实时节点没有副本 不能保障高可用,Indexing service作为其的升级替代品,是一套独立的分布式系统能保障高可用,作者推荐使用。这个架构中,依然有离线和实时两条线,离线会在T+1的时候修正数据。

目前我们每天的数据在20亿左右,13个维度,20多个指标项,大部分的查询响应时间都在几百毫秒,其导入即可查询的实时性和高效的交互式查询很好地支撑了业务需求。

微信扫描登录