Spark SQL 分桶表在字节跳动的优化

微信扫一扫,分享到朋友圈

Spark SQL 分桶表在字节跳动的优化

本文来自 SPARK + AI SUMMIT 2020 北美会议,分享者来自字节跳动的郭俊。Bucket 在 Hive 和 Spark SQL 中普遍使用,用于消除 Join 或者 group-by-aggregate 场景下的 Shuffle 操作。本文主要介绍字节跳动在 Bucket 方面的优化。

本文主要从以下四个方面介绍:

  • Spark SQL 在字节跳动的应用

  • 什么是分桶

  • Spark 分桶的限制

  • 字节跳动在分桶方面的优化

下面是 Spark SQL 在字节跳动的应用。

  • 2016年主要是小规模的测试阶段

  • 2017年用于处理 Ad-hoc 工作负载

  • 2018年在生产环境下处理少量的 ETL 管道工作;

  • 2019年在生产环境下全面部署;

  • 2020年成为 DW 领域的主要计算引擎。

什么是分桶

上面例子展示了创建分桶表的方法。主要关键字是 clustered by (xxx) sorted by (xxx) into N buckets

如果我们往分桶表里面插入数据,可以如下使用

INSERT INTO order

SELECT order_id, user_id, product, amount

FROM order_staging

可见,这个和正常表的使用并没有什么区别。

如果我们进行一个 ShuffleHashJoin 的时候,首先需要将表的数据按照 on 的条件进行分区,然后才是进行 Join 操作。

但是如果参与 Join 的表已经实现分桶了,那么在执行  ShuffleHashJoin
的时候省去 Shuffle 的操作。比如上面的例子如果 我们
对 order 和 user 表按照 user_id 字段进行分桶,那么在  ShuffleHashJoin
的时候就不需要进行 Exchange 操作了。


对于 SortMergeJoin ,需要对 on 里面的条件字段进行 Exchange 操作,然后再进行 Sort 操作,最后才是执行  SortMergeJoin
(更多关于 Join 的策略可以参见过往记忆大数据的《 每个 Spark 工程师都应该知道的五种 Join 策略
》文章)。

如果参与 Join 的表已经分桶了,那么不需要就行 Exchange 和 Sort 操作了。

Spark 分桶的限制

小文件问题


执行上面的



SQL



,每个 task 最多可能产生



1024



个文件,其中



1024



是分桶的数量。所以如果我们有



M 个 task




,那么最多产生的文件个数为 M * 1024


比如上面的 attempt_20200519145628_0014_m_000014_0 目录下产生了 988 个文件。

解决小文件的问题可以加上 


DISTRIBUTE BY


 ,如下:

INSERT INTO order SELECT order_id, user_id, product, amount

FROM order_staging

DISTRIBUTE BY user_id

如果 1024 是 M 的倍数,那么最多会产生 1024 个文件,其中 M = spark.sql.shuffle.partitions;

如果 M 是 1024 的倍数,那么最多会产生 M 个文件, 其中 M 

spark.sql.shuffle.partitions

Spark 分桶和其他 SQL 引擎不兼容

Spark 的分桶和 Hive 的分桶是不兼容的,同时和 Presto 也是不兼容的;但是 Presto 与 Hive 的分桶是兼容的。


Spark 的分桶和 Hive 不兼容主要原因是以下原因导致的:

  • Hi
    ve 在生成分桶的时候会额外进行
    一个 Reduce 操作,以保证相同分桶的数据都存储在一个文件中。
    而 Spark SQL 在写分桶文件时不需要 Shuffle 操作,这样就会导致每个分桶最多产生 M 个文件,这就导致上面说的小文件问题;

  • Spark 分桶和 Hive 分桶采用不同的 hash 算法。Hive 用的是 HiveHash;而 Spark 用的是 Murmur3,

    所以数据的分布是不一样的。



因为 Spark 和 Hive 分桶不兼容,所以当 Spark 的 分桶
表和 Hive 的分桶表进行 SortMergeJoin 的时候是需要进行 Sort 和 Exchange 操作的。

额外的排序操作


因为 Spark SQL 表中的每个分桶里面最少包含一个文件,所以在进行 Join 之前需要进行额外的排序操作。

分桶数不对齐


如果参与 Join 的表分桶数不一致,那么其中一张表需要进行额外的 Exchange 操作。

参与 Join 的 key 和分桶列不一样需要额外操作


当参与 Join 的 key 和分桶的列不一样时,需要额外的 Exchange 操作。


上面的例子尽管参与 Join 的表都是对 user_id 字段进行分桶,并且分桶数一样,但是还是需要额外的 Exchange 操作。

字节跳动在分桶方面的优化

Spark 分桶和 Hive 分桶对齐


前面介绍了 Spark 和 Hive 分桶不兼容,对于这方面,字节跳动将 Hive 分桶表和 Spark 分桶表进行了对齐,主要包括:


Spark SQL 写 Hive 分桶表的逻辑和 Hive 一致。 重写了  InsertIntoHiveTable#requiredOrdering 和 InsertIntoHiveTable#requiredDistribution,并且也使用了 HiveHash 算法。



对于读方面,重写了 HiveTableScanExec#outputPartitioning 和 HiveTableScanExec#outputOrdering, 使用了 
Hiv
eHash
算法,并且使用了 Hive 的分桶元数据。


上面是 Spark 读取 Hive 分桶表改进前和改进后的区别。可以看到,改进后,outputPartitioning 为 HashPartitioning,并且 outputOrdering 为 SortOrder,满足了 requireChildDistribution 为 HashClusteredDistribution的要求以及requireChildOrdering 为 SortOrder,从而在进行 SortMergeJoin 的时候省去了 Exchange 和 Sort 操作。

One to Mange Bucket Join


另一个改进是 One to Merge Bucket Join,比如下面例子 A 表有三个分桶,B 表有六个分桶。


如果我们在 Spark 对上面两张表进行 Join 操作,B 表需要额外的 Sort 操作,因为上面两张表的分桶数不一样。但是在字节公司,由于对性能的要求,需要避免 Sort 操作。

一种方法是将 A 表的分桶 0 和 B 表的分桶 0 、分桶 3 进行关联;
 A 表的
分桶 1 和
 B 表的分桶 1 、分桶
 4 进行关联

 A 表的
分桶 2 和
 B 表的分桶 2 、分桶
 5 进行关联
。我们只需要将 A 表复制一份,这样 A 表也满足 6 个分桶。将 A 表和 A 表进行 Union 可以产生 到 6 个分桶的新表,但是 Spark 自带的 Union 操作之后  outputPartitioning 和
 

outputOrdering 将被删除,所以字节自己开发出 bucket union,使得  outputPartitioning 和
 
outputOrdering 被保留,这样就可以省去 Sort 和 Exchange 操作。




不过上面的方面在  B left join A 、 B
 
left semi join A
B
 
anti join A

B
 
inner join A
可以正常工作,但是在 
right join A、

full outer join A、
B cross
 join A 的时候结果有重复,因为 A 表的数据被扫描了两次。






为了解决这个问题,在 TableScan 后面加上了 hash(10) % buckets = bucket id 的过滤条件,比如 bucket 0 将会把 3、9、15 过滤掉,通过这种办法将会消除重复数据。


字节的另外一个优化是如果 Join 的 Key 不仅仅是分桶的 Key,原生的 Spark 会产生额外的 Exchange 和 Sort 操作。

通过优化后,Exchange 将消除。

过往记忆大数据微信群,请添加微信:
fangzhen0219,备注【进群】

微信扫一扫,分享到朋友圈

Spark SQL 分桶表在字节跳动的优化

名创优品无法停止狂奔|招股书深度解读

上一篇

第四阶段 day9.24 springboot

下一篇

你也可能喜欢

Spark SQL 分桶表在字节跳动的优化

长按储存图像,分享给朋友