本文是大数据解读2015之Spark篇,明略数据的梁堰波为大家解读Spark在2015年的快速发展,后续InfoQ会有更多关于大数据生态技术的总结。 2015年的Spark社区的进展实在是太快了,我发现1月份出版的一本参考书到现在已经有很多内容是过时的了。社区大踏步前行的同时,用户和应用案例也越来越多,应用行业越来越广泛。到年底了我们来梳理下Spark这快速发展的一年。 先从全局有个认识,我尝试用三句话来概括下Spark最主要的变化,然后在接下来的篇幅选取一些重点内容展开。
在传统意义上Spark的核心是RDD和RDD之上的各种transformation和action,也就是各种算子,RDD可以认为是分布式的Java对象的集合。2013年推出了DataFrame,可以看做分布式的Row对象的集合。DataFrame除了提供了比RDD更丰富的算子以外,更重要的特点就是有执行计划的优化器,这样用户只需要指定自己的操作逻辑,DataFrame的优化器会帮助用户选择一条效率最优的执行路径。同时Tungsten优化(下一章重点讲)使得DataFrame的存储和计算效率比RDD高很多。Spark的机器学习项目MLlib的ML pipeline就是完全基于DataFrame的,而且未来Streaming也会以DataFrame为核心。 (图片引自Databricks) 那么为什么DataFrame比RDD在存储和计算上的效率更高呢?这主要得益于Tungsten项目。Tungsten做的优化概括起来说就是由Spark自己来管理内存而不是使用JVM,这样可以避免JVM GC带来的性能损失;内存中的Java对象被存储成Spark自己的二进制格式,更加紧凑,节省内存空间,而且能更好的估计数据量大小和内存使用情况;计算直接发生在二进制格式上,省去了序列化和反序列化时间。 像传统的Hadoop/Hive系统,磁盘IO是一个很大的瓶颈。而对于像Spark这样的计算框架,主要的瓶颈在于CPU和内存。下面看看Tungsten主要做了哪些优化: 1.基于JVM的语言带来的问题:GC问题和Java对象的内存开销。例如一个字符串”abcd”理论上只有4个bytes,但是用Java String类型来存储却需要48个bytes。Spark的改进就是自己管理内存,不用JVM来管理了,使用的工具是sun.misc.Unsafe。DataFrame的每一行就是一个UnsafeRow,这块内存存的啥东西只有Spark自己能读懂。有了这种特有的二进制存储格式后,DataFrame的算子直接操控二进制数据,同时又省去了很多序列化和反序列化的开销。 2.Cache-aware的计算。现在Spark已经是内存计算引擎了,但是能不能更进一步呢,能不能更好的利用CPU的L1/L2/L3缓存的优势呢,因为CPU缓存的访问效率更高。这个优化点也不是意淫出来的,是在profile了很多Spark应用之后得到的结论,发现很多CPU的时间浪费在等待从内存中取数据的过程。所以在Tungsten中就设计和实现了一系列的cache-friendly的算法和数据结构来加速这个过程,例如aggregations, joins和shuffle操作中进行快速排序和hash操作。 以sort为例,Spark已经实现了cache-aware的sort算法,比原来的性能提升至少有3倍。在传统的排序中是通过指针来索引数据的,但是缺点就是CPU cache命中率不够高,因为我们需要随机访问record做比较。实际上quicksort算法是能够非常好的利用cache的,主要是我们的record不是连续存储的。Spark的优化就是存储一个key prefix和指针在一起,那么就可以通过比较key prefix来直接实现排序,这样CPU cache的命中率就会高很多。例如如果我们需要排序的列是一个string类型,那么我们可以拿这个string的UTF-8编码的前8个字节来做key prefix,并进行排序。 关于这个优化可以参见SPARK-9457 和org.apache.spark.shuffle.sort下面的类,最重要的是ShuffleExternalSorter和ShuffleInMemorySorter两个类。 3.运行时代码生成 运行时代码生成能免去昂贵的虚函数调用,同时也省去了对Java基本类型装箱之类的操作了。Spark SQL将运行时代码生成用于表达式的求值,效果显著。 除了这些优化,我认为还有两个很重要的变化: 1.Unified Memory Management 在以前Spark的内存显式的被分为三部分:execution,storage和其他。execution内存用于shuffle, join, sort和aggregation等操作,而storage内存主要用于cache数据。在1.6版本之前是通过spark.shuffle.memoryFraction和spark.storage.memoryFraction两个参数来配置用于execution和storage的内存份额。从1.6开始这两部分内存合在一起统一管理了,也就是说如果现在没有execution的需要,那么所有的内存都可以给storage用,反过来也是一样的。同时execution可以evict storage的部分内存,但是反过来不行。在新的内存管理框架上使用两个参数来控制spark.memory.fraction和spark.memory.storageFraction。 2.Adaptive query execution 这个特性说大了就是所有数据库最核心的一个功能query execution optimization,可以做的东西非常多。我们自己写Spark程序中经常会碰到一个job跑到最后每个分区的数据量很小的情况,这是因为以前的Spark不会估计下游RDD的每个分区的数据量大小,并根据数据量大小来调整分区个数。以前遇到这种问题就需要手工repartition,用户自己要心里有数到哪个阶段的RDD的partition数据变多了还是变少了,需要跟着调整分区的数目,非常不灵活。从1.6版本开始有了部分支持,主要是能够估计在join和aggregate操作中Shuffle之后的分区的数目,动态调整下游task的数目,从而提高执行效率。 Spark从API的角度看,可以分为两大类:
虽然API不同,但是背后解析出来的算子是一样的,DataFrame的各种算子其实就是各种SQL的语法。Spark在SQL语法的支持越来越丰富的同时内置的SQL函数得到了很大的增强,目前已经有超过100个这样的常用函数(string, math, date, time, type conversion, condition),可以说最常见的SQL内置函数都有了。 作为一个类SQL的分析工具,聚合函数是非常核心的。Spark 1.5和1.6在聚合函数上都有很大改进:实现了一个新的聚合函数接口,支持了一些build-in的聚合函数(例如max/min/count/sum/avg/first/corr/stddev/variance/skewness/kurtosis以及一些窗口函数等),同时基于新接口实现了相应的UDAF接口。新的聚合函数接口是AggregateFunction,有两种具体的实现:ImperativeAggregate和DeclarativeAggregate。 ImperativeAggregate类型的聚合操作就是通过用户定义三个动作 initialize/update/merge的逻辑来实现聚合的;而DeclarativeAggregate则是通过指定initialValues/updateExpressions/mergeExpressions这三个表达式然后通过代码生成的方式来做聚合的操作。这两种方式各有利弊,一般来说代码生成效率更高,但是像variance/stddev/skewness/kurtosis这样的多个表达式需要依赖同一个中间表达式的场景下,代码生成的执行路径由于不能共享中间的结果,从而导致其不如ImperativeAggregate效率更高,所以在Spark内部的实现中这几个聚合函数也是通过ImperativeAggregate来实现的。 SQL API上另一个变化是可以直接在文件上进行SQL操作,不需要把这个文件注册成一个table。例如支持”select a, b from json.`path/to/json/files`”这样的语法,这个应该是从Apache Drill借鉴过来的。 另外一个里程碑式的特性就是Dataset API(SPARK-9999)。Dataset可以认为是DataFrame的一个特例,主要区别是Dataset每一个record存储的是一个强类型值而不是一个Row。这个强类型的值是以编码的二进制形式被存储的,这种存储格式可以不用反序列化就直接可以被上面的算子(例如sort,Shuffle等)操作。所以在创建Dataset的时候需要指定用于这个编码工作的Encoder。 这样一些需要强类型的地方就可以使用Dataset API,不失DataFrame的那些优点,同时又可以帮我们做类型检查。所以从某种角度上说这个Dataset API在将来是要替换掉RDD的。 这个feature可以说是建立起Spark生态系统的基础,使得Spark与大数据生态圈的其他组件联系起来了。可以这么理解,你无论数据是在HDFS上,还是在Cassandra里面,抑或关系型数据库里面,我Spark都可以拿过来做分析和处理,或者机器学习,我这边处理完了你让我写到哪去我就可以写出去。这个特性使得Spark成为了大数据处理的核心一环。目前Spark支持的外部数据源有很多种,主流的像Parquet,JSON,JDBC,ORC,AVRO,HBase,Cassandra,AWS S3,AWS Redshift等。 在这些外部数据源中,Parquet是其中最核心的,Spark的Parquet支持也有了很大的改进:修复了越来越多的bug,Parquet的版本升级到1.7;更快的metadata discovery和schema merging;能够读取其他工具或者库生成的非标准的parquet文件;以及更快更鲁棒的动态分区插入;对于flat schema的Parquet格式的数据的读性能提升了大约1倍(SPARK-11787)。 另外在Hive支持方面,越来越多的Hive特有的SQL语法被加入到Spark中,例如DISTRIBUTE BY... SORT等。支持连接Hive 1.2版本的metastore,同时支持metastore partition pruning(通过spark.sql.hive.metastorePartitionPruning=true开启,默认为false)。因为很多公司的Hive集群都升级到了1.2以上,那么这个改进对于需要访问Hive元数据的Spark集群来说非常重要。 Spark在机器学习方面的发展很快,目前已经支持了主流的统计和机器学习算法。虽然和单机的机器学习库相比MLlib还有一定的差距;但是纵观所有基于分布式架构的开源机器学习库,MLlib是我认为的计算效率最高的。下面列出了目前MLlib支持的主要的机器学习算法:
下面简单说下其中一些亮点:
| |||||||||
|
声明:文章版权归原作者所有 部分文章转自互联网 如有侵权请联系
[邮箱地址] 删除
|