从数据库系统到Spark SQL (四)

微信扫一扫,分享到朋友圈

从数据库系统到Spark SQL (四)

从数据库系统到Spark SQL (二) 中由于篇幅过长,所以与Spark SQL的部分放到这一章来论述。在第二篇中有提到这样一句话: 而对于mongodb而言,设计的目标是一种文档型数据库,所有的数据通过直接通过key索引到value,采取尽可能的低高度减少查询时的磁盘寻道,采用了b-tree

这里为什么可以减少磁盘寻道呢?其实这跟Spark的 Tungsten优化机制 有点像。下面就来介绍一下databricks引进的这一功能

Tungsten项目将是Spark执行引擎的最大变化。它集中于显着提高的效率存储器和CPU用于火花应用,推送性能更接近现代硬件的限制。这项工作包括三个计划:

  • 内存管理和二进制处理:利用应用程序语义来显式管理内存并消除JVM对象模型和垃圾回收的开销
  • 缓存感知计算:利用内存层次结构的算法和数据结构
  • 代码生成:使用代码生成来利用现代编译器和CPU

Spark1.4之前的工作负载越来越受到CPU和内存使用(而不是IO和网络通信)的限制,所以开始着重于CPU效率的优化

为什么CPU是新的瓶颈?这件事情是由很多原因导致的。一是硬件配置提供了越来越大的总IO带宽,例如网络中的10Gbps链接以及用于存储的高带宽SSD或条带化HDD阵列。从软件角度来看,Spark的优化器现在允许许多工作负载通过修剪给定作业不需要的输入数据来避免大量的磁盘IO。在Spark的shuffle子系统中,序列化和哈希(受CPU约束)已被证明是关键瓶颈,而不是底层硬件的原始网络吞吐量。所有这些趋势意味着,当今的Spark通常受CPU效率和内存压力而非IO的限制。

内存管理和二进制处理

JVM上的应用程序通常依赖JVM的垃圾回收器来管理内存。旨在作为许多工作负载的通用运行时。但是,随着Spark应用程序突破性能极限,JVM对象和GC的开销变得不可忽略。

Java对象具有很大的固有内存开销。考虑一个简单的字符串“ abcd”,使用UTF-8编码将花费4个字节来存储。但是,JVM的本机String实现将这种存储方式不同,以便利于更常见的工作负载。它使用UTF-16编码使用2个字节对​​每个字符进行编码,每个String对象还包含12个字节的标头和8个字节的哈希码,如Java Object Layout工具的以下输出所示。

java.lang.String object internals:
OFFSET  SIZE   TYPE DESCRIPTION                    VALUE
0     4        (object header)                ...
4     4        (object header)                ...
8     4        (object header)                ...
12     4 char[] String.value                   []
16     4    int String.hash                    0
20     4    int String.hash32                  0
Instance size: 24 bytes (reported by Instrumentation API)
复制代码

一个简单的4字节字符串在JVM对象模型中总共超过48个字节

JVM对象模型的另一个问题是垃圾回收的开销。从较高的层次上讲,代垃圾收集将对象分为两类:分配/取消分配率高的对象(年轻代)和被保留的对象(老一代)。垃圾收集器利用年轻一代对象的瞬时特性来有效地管理它们。当GC可以可靠地估计对象的生命周期时,此方法效果很好,但如果不进行估计(即某些瞬态对象溢出到老一代对象中),则效果会很差。由于此方法最终基于启发式方法和估计,因此要想获得性能,可能需要进行GC调整的“黑魔法”,它具有数十个参数,可为JVM提供有关对象生命周期的更多信息。

但是,Spark不仅是通用应用程序。Spark了解数据如何在计算的各个阶段以及工作和任务的范围内流动。结果,Spark比JVM垃圾收集器了解更多有关内存块生命周期的信息,因此应该比JVM更有效地管理内存。

为了解决对象开销和GC效率低下的问题,引入了一个显式的内存管理器,可以将大多数Spark操作转换为直接针对二进制数据而非Java对象进行操作。它基于sun.misc.UnsafeJVM提供的高级功能,该功能公开了C样式的内存访问(例如,显式分配,释放,指针算术)。此外,Unsafe方法是固有的,这意味着每个方法调用都由JIT编译为一条机器指令。

在某些地区,Spark已经开始使用显式管理的内存。Spark1.4中支持了一种新的基于Netty的网络传输,该传输使用诸如内存管理器的jemalloc显式管理所有网络缓冲区。这对于扩大Spark的洗牌操作并赢得“排序基准”至关重要。

其中的第一部分将出现在Spark 1.4中,其中包括一个哈希表,该哈希表直接对二进制数据进行操作,内存由Spark明确管理。与标准Java相比HashMap,此新实现的间接开销要少得多,并且对于垃圾收集器是不可见的。

如上所示,比较了使用不同哈希图的聚合操作的吞吐量:一种使用新哈希图的堆模式,一种使用offheap,一种使用java.util.HashMap。新的哈希表在单个线程中每秒支持超过一百万次聚合操作,约为java.util.HashMap吞吐量的2倍。更重要的是,无需调整任何参数,它几乎不会随着内存利用率的提高而降低性能,而JVM默认值最终会由于GC而崩溃。

在Spark 1.4中,此哈希映射将用于DataFrame和SQL的聚合,而在1.5中,将为大多数其他操作(例如排序和联接)准备好数据结构。在许多情况下,这将消除调整GC以实现高性能的需要。

缓存感知计算

Spark是众所周知的内存计算引擎。该术语的真正含义是,Spark可以有效利用群集上的内存资源,以比基于磁盘的解决方案高得多的速率处理数据。但是,Spark还可以处理比可用内存大几个数量级的数据,透明地溢出到磁盘上并执行诸如排序和散列之类的外部操作。

同样,通过更有效地使用L1 / L2 / L3 CPU缓存,可感知缓存的计算可提高数据处理速度,因为它们比主内存快几个数量级。对Spark用户应用程序进行性能分析时,发现有大量的CPU时间用于等待从主内存中获取数据。作为Tungsten项目的一部分,正在设计对缓存友好的算法和数据结构,因此Spark应用程序将花费更少的时间等待从内存中获取数据,而将更多的时间用于有用的工作。

考虑以记录排序为例。标准排序过程将存储指向记录的指针数组,并使用quicksort交换指针,直到对所有记录进行排序。由于顺序扫描访问模式,排序通常具有良好的缓存命中率。但是,对指针列表进行排序的缓存命中率很低,因为每个比较操作都需要取消引用两个指向内存中随机记录的指针。

那么如何改善排序的缓存局部性呢?一种非常简单的方法是将每个记录的排序键与指针并排存储。例如,如果排序键是64位整数,则使用128位(64位指针和64位键)将每个记录存储在指针数组中。这样,每个快速排序比较操作仅以线性方式查找指针-密钥对,并且不需要随机存储器查找。

这如何适用于Spark?大多数分布式数据处理可以归结为一小部分操作,例如聚合,排序和联接。通过提高这些操作的效率,可以整体上提高Spark应用程序的效率。已经构建了一个具有缓存感知功能的排序版本,该版本比以前的版本快3倍。此新排序将用于基于排序的混洗,高基数聚合和排序合并联接运算符。

Spark Tusten具体工作流程代码如下:

这里调用ExecutorMemoryManager进行内存分配,分配得到一个内存页,将其添加到
page table中,以遍内存地址映射
/**
* Allocate a block of memory that will be tracked in the MemoryManager's page table; this is
* intended for allocating large blocks of memory that will be shared between operators.
*/
public MemoryBlock allocatePage(long size) {
if (size > MAXIMUM_PAGE_SIZE_BYTES) {
throw new IllegalArgumentException(
"Cannot allocate a page with more than " + MAXIMUM_PAGE_SIZE_BYTES + " bytes");
}
final int pageNumber;
synchronized (this) {
pageNumber = allocatedPages.nextClearBit(0);
if (pageNumber >= PAGE_TABLE_SIZE) {
throw new IllegalStateException(
"Have already allocated a maximum of " + PAGE_TABLE_SIZE + " pages");
}
allocatedPages.set(pageNumber);
}
final MemoryBlock page = executorMemoryManager.allocate(size);
page.pageNumber = pageNumber;
pageTable[pageNumber] = page;
if (logger.isTraceEnabled()) {
logger.trace("Allocate page number {} ({} bytes)", pageNumber, size);
}
return page;
}
给定分配到的内存页和页内的偏移,生成一个64bits的逻辑地址
/**
* Given a memory page and offset within that page, encode this address into a 64-bit long.
* This address will remain valid as long as the corresponding page has not been freed.
*
* @param page a data page allocated by {@link TaskMemoryManager#allocate(long)}.
* @param offsetInPage an offset in this page which incorporates the base offset. In other words,
*                     this should be the value that you would pass as the base offset into an
*                     UNSAFE call (e.g. page.baseOffset() + something).
* @return an encoded page address.
*/
public long encodePageNumberAndOffset(MemoryBlock page, long offsetInPage) {
if (!inHeap) {
// In off-heap mode, an offset is an absolute address that may require a full 64 bits to
// encode. Due to our page size limitation, though, we can convert this into an offset that's
// relative to the page's base offset; this relative offset will fit in 51 bits.
offsetInPage -= page.getBaseOffset();
}
return encodePageNumberAndOffset(page.pageNumber, offsetInPage);
}
高13bits是page number,低位为页内偏移
@VisibleForTesting
public static long encodePageNumberAndOffset(int pageNumber, long offsetInPage) {
assert (pageNumber != -1) : "encodePageNumberAndOffset called with invalid page";
return (((long) pageNumber) << OFFSET_BITS) | (offsetInPage & MASK_LONG_LOWER_51_BITS);
}
给定逻辑地址,获取page number
@VisibleForTesting
public static int decodePageNumber(long pagePlusOffsetAddress) {
return (int) ((pagePlusOffsetAddress & MASK_LONG_UPPER_13_BITS) >>> OFFSET_BITS);
}
给定逻辑地址,获取页内偏移
private static long decodeOffset(long pagePlusOffsetAddress) {
return (pagePlusOffsetAddress & MASK_LONG_LOWER_51_BITS);
}
给定
/**
* Get the page associated with an address encoded by
* {@link TaskMemoryManager#encodePageNumberAndOffset(MemoryBlock, long)}
*/
public Object getPage(long pagePlusOffsetAddress) {
if (inHeap) {
final int pageNumber = decodePageNumber(pagePlusOffsetAddress);
assert (pageNumber >= 0 && pageNumber < PAGE_TABLE_SIZE);
final MemoryBlock page = pageTable[pageNumber];
assert (page != null);
assert (page.getBaseObject() != null);
return page.getBaseObject();
} else {
return null;
}
}
/**
* Get the offset associated with an address encoded by
* {@link TaskMemoryManager#encodePageNumberAndOffset(MemoryBlock, long)}
*/
public long getOffsetInPage(long pagePlusOffsetAddress) {
final long offsetInPage = decodeOffset(pagePlusOffsetAddress);
if (inHeap) {
return offsetInPage;
} else {
// In off-heap mode, an offset is an absolute address. In encodePageNumberAndOffset, we
// converted the absolute address into a relative address. Here, we invert that operation:
final int pageNumber = decodePageNumber(pagePlusOffsetAddress);
assert (pageNumber >= 0 && pageNumber < PAGE_TABLE_SIZE);
final MemoryBlock page = pageTable[pageNumber];
assert (page != null);
return page.getBaseOffset() + offsetInPage;
}
}
复制代码

代码生成

Spark1.4引入了用于生成SQL和DataFrames中的表达式评估(如从数据库系统到Spark SQL (三)所述流程)的代码。表达式评估是计算age > 35 && age < 40特定记录上表达式(例如“ ”)的值的过程。在运行时,Spark动态生成用于评估这些表达式的字节码,而不是针对每一行逐步执行较慢的解释器。与解释相比,代码生成减少了原始数据类型的装箱,更重要的是,避免了昂贵的多态函数调度。

在较早的博客文章中,证明了代码生成可以使许多TPC-DS查询的速度提高近一个数量级。现在,将代码生成范围扩展到大多数内置表达式。此外,利用JIT的功能来利用现代CPU中更好的指令流水线,从而将代码生成的级别从一次记录的表达式评估提高到矢量化的表达式评估,以便可以一次处理多个记录。

还将代码生成应用于表达式评估之外的领域,以优化内部组件的CPU效率。对应用代码生成感到非常兴奋的一个方面是加快数据从内存中的二进制格式到有线协议的转换以进行随机播放。如前所述,混洗通常是数据序列化而不是底层网络的瓶颈。通过代码生成,可以提高序列化的吞吐量,进而提高洗牌网络的吞吐量。

金蝶云全球用户大会2020干货不断 赵燕锡分享重构数字战斗力经验

上一篇

Kafka集群搭建及必知必会

下一篇

你也可能喜欢

从数据库系统到Spark SQL (四)

长按储存图像,分享给朋友