CarbonData+Spark SQL的应用实践和调优经验分享

摘要: Apache CarbonData+Spark 主题技术交流会上,四位专家就 CarbonData+Spark 的重要特性和使用介绍做了全面而细致的分享,本文简单整理了其中的部分精彩内容,正文更有所有讲师的演讲 PPT+ 演讲视频奉上。

09-08 23:03 首页 大数据杂谈

大数据时代,中大型企业数据的爆发式增长,几乎每天都能产生约 100GB 到 10TB 的数据。而企业数据分系统构建与扩张,导致不同应用场景下大数据冗余严重。行业亟需一个高效、统一的融合数仓,从海量数据中快速获取有效信息,从而洞察机遇、规避风险。

在这样的现状下,CarbonData 诞生了,作为首个由中国贡献给 Apache 社区的顶级开源项目,CarbonData 提供了一种新的融合数据存储方案,以一份数据同时支持多种大数据应用场景,并通过丰富的索引技术、字典编码、列存等特性提升了 IO 扫描和计算性能,实现了 PB 数据级的秒级响应。

为了帮助开发者深入了解并学习这项大数据开源技术,华为 CarbonData PMC 陈亮牵头,携手技术社区的核心开发者及合作伙伴,举办了一场 Apache CarbonData+Spark 主题的技术交流会,就 CarbonData+Spark 的重要特性和使用介绍,做了全面而细致的分享,本文简单整理了其中的部分精彩内容,同时,作为本次活动的承办方,InfoQ 整理上传了所有讲师的演讲 PPT+ 演讲视频,感兴趣的同学可以关注公众号并发送【CarbonData】,免费获取现场完整资料 。

Spark SQL 的发展史概述  

来自美国 Databricks 公司的范文臣首先讲述了 Spark SQL 的发展史,范文臣同时也是 Apache Spark PMC member,主导 Spark SQL 一些主要功能的设计和研发,定期审计项目代码质量等。现场,他将 Spark SQL 过去的发展分为四个阶段:


  • 2009 年,著名的 Spark 框架诞生。 它是一个围绕速度、易用性和复杂分析构建的大数据处理框架,由伯克 AMP 实验室创建。相比于当时流行的 Hadoop,Spark 提供了更高效的 MapReduce 模型,减少数据落地,也降低了编程难度。

  • 2011 年,Spark 团队将 Hive 的底层物理执行模块从 Hadoop 切换成 Shark,启动了 Shark 项目。然而,由于 Hive 自身的代码复杂性以及和 Hadoop MapReduce 的耦合,Shark 的开发举步维艰,进展缓慢。

  • 2014 年,Spark 团队舍弃 Shark,重新建立了一套完整的查询框架 Catalyst。Catalyst 利用了函数式风格的不可变特性,使 Query Plan 不可变,优化器通过遍历优化策略生成新的 Query Plan。这样优化规则之间的影响更容易理解,提升了代码的可读性和可维护性,也方便了新特性的开发。下图为 Spark SQL 控制框架:

     

  • 2015 年,Spark 团队提出了钨丝计划,通过建立 Tungsten 格式、后端优化、代码生成等手段,将 Spark 的查询性能和执行速度提升到了一个新的台阶。

  • 2017 年,持续探索中……

那么,沿着查询性能这条路,Spark 的未来还会有哪些优化方向?范文臣在最后的演讲中总结到:Spark 的愿景是管理各种不同性质数据集和数据源的大数据处理的需求。Spark 这样一个角色,只关注于计算层,快速查询处理是 Spark 唯一的衡量标准,也是未来不变的发展方向。也因此,在之后的 Spark2.3 里面,在计算框架下如何更快的和储存系统桥接、Spark 代码生成都是未来着重关注的方向。

CarbonData 应用实践 +2.0 新技术规划介绍  

CarbonData 诞生之初是希望以一份数据去满足企业各种各样的场景需求,包括详单过滤和海量数仓以及数据集式操作等。那么,开发者该如何正确使用 CarbonData 技术?华为 CarbonData 总设计师李昆结合实际案例,详细讲解了 CarbonData 应用实践 +2.0 新技术规划。

CarbonData 大数据生态  

Carbondata 在数据查询方面选择和 Spark 结合,据李昆现场介绍,Carbondata+Spark 可以打造一个相对于传统系统来说,更好的交互分析体验,目前 Carbondata 和 Spark1.5、1.6、2.1,Hive,Presto 都做了集成,未来还将对 Spark2.2 做支持;在接口方面,Carbondata 提供 SQL 接口,也支持 Spark DataFrame API;在操作方面,支持查询、数据管理如批量入库、更新、删除等操作。

随后,李昆就 CarbonData 索引建立、CarbonData 表格与物理存储、SQL 引擎对接、数据管理过程等 技术内容做了详细介绍。由于篇幅限制,本文不在此介绍,感兴趣的读者可以下载讲师 PPT 对 CarbonData 的存储原理进行深入了解。

成功案例介绍  

随后,李昆通过电信详单分析场景的举例介绍,详细说明 CarbonData 如何以一份数据支持多种应用场景的。李昆表示,在电信跟金融领域经常需要明细数据分析,优化之前,老的系统需要用 Impala 和 Hbase 两个系统,建立 4 个二级索引才可以完成业务需要的性能。这其中,Impala 用来做报表输出,Hbase 做关键维度查询。这两个系统有各自存在不足:Impala 没有办法很好的扩展,HBase 要做很多二级索引,无法使用 yarn 统一资源管理,只能是一个个集群单独维护。

用 Carbondata+Spark 数据优化后,可以解决既要点查又要处理报表的情况。下图是一个从 2000 亿到 1 万亿的性能测试数据,Q1 是过滤查询,Q2 也是过滤查询,Q1 跟 Q2 数据查询因为用了 Carbondata 索引,需要扫描的数据不会增长很多,数据量增长 5 倍,查询时间增长不到 1 倍。第三个查询是 full scan 查询,主要考察的是 spark 和 carbon 的可扩展性,测试过程中发现扩展性是非常线性的,scalability 很好。

CarbonData2.0 未来规划  

现在,Carbondata 的主要特性是对多场景的支持,不过在大数据时代,更多的场景正扑面而来。包括 SQL 分析、时间序列分析、位置轨迹、文本检索、图查询和机器学习等。这就需要 Carbondata2.0 在各领域的应用上有更多的准备。包括:

  • 入库方面,需要考虑实时事件的流式入库、历史事件的批量入库等;

  • 存储方面分三层,一层是界面,每一个领域有自己的术语,会针对领域常见操作做些 SQL 上的扩展;二是数据组织层,对不同领域做不同的分区、索引和预处理等,以便于它更高效地存储领域数据;三是存储格式层,Carbondata 目前是列存,为了支撑更多查询和分析,数据格式本身也需要具有扩展能力,比如行存、时序、面向 AI 的格式等;

Spark 2.2 核心特性 CBO 介绍(王振华老师)  

在 Spark SQL 的 Catalyst 优化器中,许多基于规则的优化技术已经实现,但优化器本身仍然有很大的改进空间。Spark 2.2 在 Spark SQL 引擎内添加了一个基于成本的优化器框架,此框架通过可靠的统计和精确的估算,能够在以下领域做出好的判定:选择散列连接操作的正确构建端,选择正确的连接算法,调整连接的顺序等等,这个基于成本的优化器就是 CBO。据华为研究工程师王振华介绍,CBO 的目标是希望优化器能够自动为用户选择最优的执行计划,要达到这件事情,需要以下三个步骤:

第一步收集、推断和传播关于源 / 中间数据的表 / 列统计信息。 用户运行 ANALYZE TABLE 命令会收集表格信息比如表的行数、大小,列的统计信息比如最大值、最小值、不同值个数等,并将这些信息存储到 metastore 里面。

第二步 Cardinality Estimation,根据收集到的信息,计算每个操作符的成本,包括输出行数、输出大小等。 如做 filter 时写一个过滤条件,给定的条件会基于条件里面涉及列的统计信息,估算过滤条件执行完了以后,Operator 有多少数据。

如下图,为一个 A 小于等于某数字的估算,如果 A 的 value 比 A 的最小值更小,或者是比 A 的最大值更大,那么过滤率肯定是 0 或者 100%,当落在定义域中间的时候,假设是均匀分布,概率则是 A.min 到 B 的区间所占 A 的定义域的百分比,这个是 Filter 条件最终的 selectivity,有了 selectivity,即可再相应的更新 filter 以后的统计信息。

第三步根据成本计算,选择最优的查询执行计划。 通过建造方选择(Build Side Selection)、散列连接实现:广播与洗牌(Hash Join Implementation: Broadcast vs. Shuffle)、多路连接重新排序(Multi-way Join Reorder)、连接成本计算公式(Join Cost Formula)四个方面阐述了最优计划的选择过程。

其中,在多路连接重新排序方法上,采用了动态规划算法。以四表连接为例,首先,将所有项 (基本连接节点) 放到 0 级;然后,从第 0 级的计划中构建所有的两表连接;第三,从以前的层级 (单节点和两表连接) 中构建出可能的三表连接;最后,构建所有的 4 路连接,并在其中选出最优的计划。而在构建 m- 路径连接时,只需保留同一组 m 项的最佳计划 (最优子解决方案)。如,对于 A、B、C 的三表连接顺序,只保留三个候选计划:(A J B)J C,(A J C)J B 和 (B J C)J A 当中最优的计划。

Join cost 计算方式如下,首先 Cost 一般来说传统的数据库里是基于 CPU 和 IO,这两个 Cost 是线性加合。在 Spark 中,用 Cardinality 模拟 CPU 的开销,用 size 模拟 IO 的开销。

王振华最后介绍到,华为在 2016 年 7 月份开始将 CBO 贡献给 Spark 社区,并建立了 umbrella ticket - SPARK-16026。截至目前为止,创建了超过 40 个 sub-tasks、提交了 50 余个 pull requests 并被合入,同时吸引了十余个社区贡献者的参与。

CBO 的第一个版本已经在 Spark 2.2 中发布,感兴趣的开发者和使用者,如要使用 CBO,可以在收集统计信息之后,打开 spark.sql.cbo.enable 来使用 CBO。

Partition 功能详解 + 上汽实践分享(曹鲁老师)  

CarbonData 的 partition 特性将在 Apache CarbonData 1.2.0 版本里正式发布,此特性将显著提升大数据查询性能。 上汽集团大数据将 CarbonData 作为平台基础组件,以应对迅猛增长的数据量,那么上汽集团在使用 CarbonData 过程中遇到了哪些问题?上汽集团大数据平台开发经理曹鲁就 CarbonData 的 partition 特性以及上汽集团在 CarbonData 项目的实践和测试数据做了分享。

曹鲁首先介绍了文件结构,索引生成过程,初次性能测试等主题内容,引出 Partition 特性带来改变,主要包括两点:1、数据将基于 Partition 列更为集中存储,查询时可过滤掉大量 block,减少 spark task 数量;2、可以使其他列在排序中更靠前,提升查询性能。

Partition Table 的数据加载及查询过程详解  

随后,曹鲁详细介绍了 CarbonData Partition 相关的 DDL 语法,如 Create Partition Table、Show Partition 等,以及 CarbonData Partition Table 的数据加载以及查询过程。下图可以很清晰的看到 CarbonData Partition 的整个数据加载过程。

关于 CarbonData Partition Table 查询过程,大概分为两个部分:

  • 根据 SQL 中的过滤条件 =, <=, <, >, >=, in, not in 以及表达式右值确定命中的 partitionId

  • 如果有其他在排过序的维度列有过滤条件,则在 driver 端根据 B-tree 索引获取 blocklet  所在的文件名,如没有则获取全部,再根据文件名中的 partitionId,筛选得到需要读取的文件,最后再下发 spark task 进行读取;

之后,曹鲁就 Partition 的新增 (add)、拆分 (split) 及删除 (drop) 功能的语法和实现过程展开了分析,其中重点区分了 Drop Partition 但保留数据 RangePartition/ListPartition 两种 Drop Partition 类型的不同语法与实现,感兴趣的读者可以下载讲师 PPT 深入了解。

上汽在 CarbonData 项目的实践分享  

在案例分享环节,曹鲁以上汽的数据作为测试数据,分析了 CarbonData Partition table 和非 Partition table 条件下的加载性能和查询性能对比。并给出了 CarbonData Partition 的性能调优建议。本文为大家展示其中的无排序维度列作为过滤条件,有 partition 列上的范围过滤条件的聚合查询情况的对比结果,如图不难看出,原始查询方式的耗时是添加 partition 性能查询方式耗时的 25 倍。

曹鲁给出的 CarbonData Partition 的性能调优建议:1、 选择最合适的 Partition 列;2、尽可能的使用 Partition 列作为过滤条件,例如 Partition 列为 A,开发者根据业务需求在 Column B 上有筛选条件,但注意到 A 与 B 列之间存在某种固定的 mapping 关系,这时就可以根据 B 列的过滤条件再新增一个 partition 列的过滤条件,以提高查询效率。

现场精彩问答整理  
   Q: 客户在使用 Spark 时不愿意编写代码,更喜欢给他一个页面能能够直接生成 SQL,Spark 后面会不会更多的偏向于业务人员做一些更易应用的东西出来,比如可以直接出来一个页面?

A:Spark 本身不会往这方面走,因为 Spark 只专注于做计算这层,这个模式一般是另外一个项目,比如有项目 zpplin 是专门做供应 GIU 的,可以在 zpplin 上面调 Spark 的一些接口,这些会单独立项,而不是在 Spark 里面做。

   Q:刚才提到 carbon 有一个目标,能够尽量多的支持各种场景,目前我们也做过一些测试,某些特定情况下,不同的场景可能在响应速度和并发性上有比较大的差距,这一点后面有没有改善?

A:这方面需要跟 Spark 一起联合做优化,因为 Spark 是端到端的,从元数据查询到 SQL 优化到 DAG 调度执行,有很多中间过程处理会耗时,建议你做一下打点分析,看主要瓶颈是哪一块,同时 carbon 和 spark 我们也可以做一些联合优化,相信基于社区的努力后面会有改善。

   Q:如果有新的数据添加进来,CarbonData 统计信息如何更新?

A:有两种方式,一种是比较简单的,每次数据表更新重新计算增量,这样比较精确但是会比较慢,另外一种方式是增量的更新统计信息,这种方式较前一种可能会稍微复杂一些。

   Q:在用 Spark 写 Carbondata Partition 的时候,并行比较高,导致每个分区下出现很多小文件,这样有什么好的解决办法?

A:在 CarbonData 中每一个 Block 的大小是可以设置的,Blocklet 也可以设置的,在 load 数据的时候,写满一个 block 的默认大小就会重新再写一个文件,所以可以设置 Block 大小来解决这个问题。另外定期使用 CarbonData 的 compaction 功能也可以合并一些小文件,当然后面我们也会考虑开发 merge partition 的功能来给用户提供更多选择。



首页 - 大数据杂谈 的更多文章: