Spark:大数据产品的一种测试方法与达成
发布时间:2021-06-05 09:56:13 所属栏目:大数据 来源:互联网
导读:ETL能兼容各种不同的数据(不同的数据规模,数据分布和数据类型) ETL处理数据的正确性 测试数据兼容 ETL是按一定规则针对数据进行清洗,抽取,转换等一系列操作的简写。那么一般来说他要能够处理很多种不同的数据类型。 我们在生产上遇见的bug有很大一部分占
|
ETL能兼容各种不同的数据(不同的数据规模,数据分布和数据类型)
ETL处理数据的正确性
测试数据兼容
ETL是按一定规则针对数据进行清洗,抽取,转换等一系列操作的简写。那么一般来说他要能够处理很多种不同的数据类型。 我们在生产上遇见的bug有很大一部分占比是生产环境遇到了比较极端的数据导致我们的ETL程序无法处理。 比如:
数据拥有大量分片
在分布式计算中,一份数据是由多个散落在HDFS上的文件组成的, 这些文件可能散落在不同的机器上, 只不过HDFS会给使用者一个统一的视图,让使用者以为自己在操作的是一个文件,而不是很多个文件。 这是HDFS这种分布式文件系统的存储方式。 而各种分布式计算框架, 比如hadoop的MapReduce,或者是spark。 就会利用这种特性,直接读取散落在各个机器上文件并保存在那个节点的内存中(理想状态下,如果资源不够可能还是会发生数据在节点间迁移)。
而读取到内存中的数据也是分片的(partition)。 spark默认以128M为单位读取数据,如果数据小于这个值会按一个分片存储,如果大于这个值就继续往上增长分片。 比如一个文件的大小是130M, spark读取它的时候会在内存中分成两个partition(1个128M,1个2M)。 如果这个文件特别小,只有10M,那它也会被当做一个partition存在内存中。 所以如果一份数据存放在HDFS中,这个数据是由10个散落在各个节点的文件组成的。 那么spark在读取的时候,就会至少在内存中有10个partition, 如果每个文件的大小都超过了128M,partition的数量会继续增加。
而在执行计算的时候,这些存储在多个节点内存中的数据会并发的执行数据计算任务。 也就是说我们的数据是存放在多个节点中的内存中的, 我们为每一个partition都执行一个计算任务。 所以我们针对一个特别大的数据的计算任务, 会首先把数据按partition读取到不同节点的不同的内存中, 也就是把数据拆分成很多小的分片放在不同机器的内存中。 然后分别在这些小的分片上执行计算任务。 最后再聚合每个计算任务的结果。 这就是分布式计算的基本原理。
那么这个时候问题就来了, 这种按partition为单位的分布式计算框架。partition的数量决定着并发的数量。 可以理解为,如果数据有100个partition,就会有100个线程针对这份数据做计算任务。所以partition的数量代表着计算的并行程度。 但是不是说partition越多越好,如果明明数据就很小, 我们却拆分了大量的partition的话,反而是比较慢的。 而且所有分片的计算结果最后是要聚合在一个地方的。 这些都会造成网络IO的开销(因为数据是在不同的节点之前传输的)。 尤其是在分布式计算中,我们有shuffle这个性能杀手(不熟悉这个概念的同学请看我之前的文章)。 在大量的分片下执行shuffle将会是一个灾难,因为大量的网络IO会导致集群处于很高的负载甚至瘫痪。 我们曾经碰见过只有500M但是却有7000个分片的数据,那一次的结果是针对这个数据并行执行了多个ETL程序后,整个hadoop集群瘫痪了。 这是在数据预处理的时候忘记做reparation(重新分片)的结果。
数据倾斜
Spark:大数据产品的一种测试方法与实现
在上面的任务处理中出现了shuffle的操作。shuffle也叫洗牌, 在上面讲partition和分布式计算原理的时候,我们知道分布式计算就是把数据划分很多个数据片存放在很多个不同的节点上, 然后在这些数据片上并发执行同样的计算任务来达到分布式计算的目的,这些任务互相是独立的, 比如我们执行一个count操作, 也就是计算这个数据的行数。 实际的操作其实是针对每个数据分片,也就是partition分别执行count的操作。 比如我们有3个分片分别是A,B,C, 那执行count的时候其实是并发3个线程,每个线程去计算一个partition的行数, 他们都计算完毕后,再汇总到driver程序中, 也就是A,B,C这三个计算任务的计算过程是彼此独立互不干扰的,只在计算完成后进行聚合。
但并不是所有的计算任务都可以这样独立的,比如你要执行一个groupby的sql操作。 就像上面的图中,我要先把数据按单词分组,之后才能做其他的统计计算, 比如统计词频或者其他相关操作。 那么首先spark要做的是根据groupby的字段做哈希,相同值的数据传送到一个固定的partition上。 这样就像上图一样,我们把数据中拥有相同key值的数分配到一个partition, 这样从数据分片上就把数据进行分组隔离。
然后我们要统计词频的话,只需要才来一个count操作就可以了。 shuffle的出现是为了计算能够高效的执行下去, 把相似的数据聚合到相同的partition上就可以方便之后的计算任务依然是独立隔离的并且不会触发网络IO。 这是方便后续计算的设计模式,也就是节省了后续一系列计算的开销。 但代价是shuffle本身的开销,而且很多情况下shuffle本身的开销也是很大的。 尤其是shuffle会因为数据倾斜而出现著名的长尾现象。
根据shuffle的理论,相似的数据会聚合到同一个partition上。 但是如果我们的数据分布不均匀会出现什么情况呢? 比如我们要针对职业这个字段做groupby的操作, 但是如果100W行数据中有90W行的数据都是程序员这个职业的话, 会出现什么情况? 你会发现有90W行的数据都跑到了同一个partition上造成一个巨大的partition。这样就违背了分布式计算的初衷, 分布式计算的初衷就是把数据切分成很多的小数据分布在不同的节点内存中,利用多个节点的并行计算能力来加速计算过程。
![]() (编辑:揭阳站长网) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |



