TinySQL学习笔记之SelectionExec

微信扫一扫,分享到朋友圈

TinySQL学习笔记之SelectionExec

本文以中的代码对应的 tinysql 项目版本号为 df75611

SelectionExec 是tinysql第五章第一节中要补全的执行器组件,希望能够通过本文的总结来说明tinysql执行器中的一些概念。

接口

作为执行器组件, SelectionExec 实现了 Executor 接口, 其定义位于 tinysql/executor/executor.go L136

// Executor is the physical implementation of a algebra operator.
//
// In TiDB, all algebra operators are implemented as iterators, i.e., they
// support a simple Open-Next-Close protocol. See this paper for more details:
//
// "Volcano-An Extensible and Parallel Query Evaluation System"
//
// Different from Volcano's execution model, a "Next" function call in TiDB will
// return a batch of rows, other than a single row in Volcano.
// NOTE: Executors must call "chk.Reset()" before appending their results to it.
type Executor interface {
base() *baseExecutor
Open(context.Context) error
Next(ctx context.Context, req *chunk.Chunk) error
Close() error
Schema() *expression.Schema
}

每一个 Executor 的实现都是从 baseExecutor 扩展出来的,一般都会继承其中的 base / Schema 方法,前者是将 baseExecutor 返回,而后者返回的是表结构,设计了哪些列和键。

在火山模型中,主要需要使用 Open / Next / Close 三个方法。代码上最基本的逻辑是,当上层 ExecutorNext 方法被调用时,被调用的 Executor 通过调用下层 ExecutorNext 方法返回的 Chunk ,经过一定的处理来构建本层的返回。简单的这么说比较宽泛,所以希望通过分享我自己在写proj时阅读代码的个人理解,帮助大家理解 Executor 的串联。

结构体

SelectionExec 的定义位于 tinysql/executor/executor.go L345

// SelectionExec represents a filter executor.
type SelectionExec struct {
baseExecutor                           // 基础结构
batched     bool                       // 是否以批处理的形式返回结果
filters     []expression.Expression    // 过滤器表达式列表
selected    []bool                     // 过滤结果buffer
inputIter   *chunk.Iterator4Chunk      // 迭代器
inputRow    chunk.Row                  // 迭代当前行
childResult *chunk.Chunk               // 下层Executor返回的结果buffer
}

其中 baseExecutor 的定义位于 tinysql/executor/executor.go L55

type baseExecutor struct {
ctx           sessionctx.Context        // 执行上下文
id            fmt.Stringer              // 标识
schema        *expression.Schema        // 表结构
initCap       int                       // Chunk初始容量
maxChunkSize  int                       // 返回Chunk的最大尺寸
children      []Executor                // 下层Executor
retFieldTypes []*types.FieldType        // 返回的列信息
}

方法

下面根据 SelectionExecExecutor 接口的实现,来说明,主要说明的是

base

实现非常简单,就是直接继承 baseExecutorbase方法

// base returns the baseExecutor of an executor, don't override this method!
func (e *baseExecutor) base() *baseExecutor {
return e
}

Schema

这里实现也是直接继承了 baseExecutorSchema方法

// Schema returns the current baseExecutor's schema. If it is nil, then create and return a new one.
func (e *baseExecutor) Schema() *expression.Schema {
if e.schema == nil {
return expression.NewSchema()
}
return e.schema
}

Open

SelectionExec 对Open方法进行了 重写 , 本质上Open方法是进行了初始化操作

// Open implements the Executor Open interface.
func (e *SelectionExec) Open(ctx context.Context) error {
// 调用baseExecutor的初始化,类似super
if err := e.baseExecutor.Open(ctx); err != nil {
return err
}
// 注意这里这newFirstChunk的名字具有一定的迷惑性,实际上根据下层Executor的属性来构建chunk
e.childResult = newFirstChunk(e.children[0])
// 判断是否可以filters是否可以向量化执行
// 其实就是检查是否所有的filter都可以向量化, 只有所有filter都可以向量化,才可以进行批执行
e.batched = expression.Vectorizable(e.filters)
if e.batched {
// 如果可以进行批执行的话,构建一个bool切片作为buffer,来保存过滤器的选择情况
// 在这里初始化好了这块空间,只要之后没有发生切片的resize,那么始终使用的是这块空间
// 减轻内存分配和GC的压力
e.selected = make([]bool, 0, chunk.InitialCapacity)
}
// 注意这里仅仅是完成了iterator和chunk的绑定,此时chunk中没有数据,iterator也没有意义
e.inputIter = chunk.NewIterator4Chunk(e.childResult)
// 这里就是指向了一个空Row
e.inputRow = e.inputIter.End()
return nil
}

baseExecutor实现 如下

// Open initializes children recursively and "childrenResults" according to children's schemas.
func (e *baseExecutor) Open(ctx context.Context) error {
// 本质上就是遍历所有位于下层的Executor调用一遍Open
// 注意一点就是,位于下层的Executor会先于当前Executor被初始化
for _, child := range e.children {
err := child.Open(ctx)
if err != nil {
return err
}
}
return nil
}

Close

SelectionExec 对Close方法进行了 重写 , 本质上Close方法是进行了资源释放的作用

// Close implements plannercore.Plan Close interface.
func (e *SelectionExec) Close() error {
// 清空两个buffer
e.childResult = nil
e.selected = nil
return e.baseExecutor.Close()
}

baseExecutor实现 如下

// Close closes all executors and release all resources.
func (e *baseExecutor) Close() error {
var firstErr error
// 与Open时相似,就是直接调用一遍下层Executor,
for _, src := range e.children {
if err := src.Close(); err != nil && firstErr == nil {
firstErr = err
}
}
return firstErr
}

Next

SelectionExec 的Next方法同样进行了 重写 ,这里也是需要我们进行填充的部分,但是这段代码内外两层循环乍一看上去有一些令人费解。

// Next implements the Executor Next interface.
func (e *SelectionExec) Next(ctx context.Context, req *chunk.Chunk) error {
req.GrowAndReset(e.maxChunkSize)
if !e.batched {
return e.unBatchedNext(ctx, req)
}
/*
Exit the loop when:
1. the `req` chunk` is full.
2. there is no further results from child.
3. meets any error.
*/
for {
// Fill in the `req` util it is full or the `inputIter` is fully processed.
for ; e.inputRow != e.inputIter.End(); e.inputRow = e.inputIter.Next() {
// Your code here.
}
err := Next(ctx, e.children[0], e.childResult)
if err != nil {
return err
}
// no more data.
if e.childResult.NumRows() == 0 {
return nil
}
/* Your code here.
Process and filter the child result using `expression.VectorizedFilter`.
*/
}
}

根据课程文档的建议是要求我们通过阅读 unBatchedNext方法 来学习函数功能,该函数就是单行处理我们要实现的逻辑的单行处理版本,并且该函数与我们要实现的部分的代码结构类似,通过理解这部分代码,也能帮助我们知道批处理时大概的处理流程。

// unBatchedNext filters input rows one by one and returns once an input row is selected.
// For sql with "SETVAR" in filter and "GETVAR" in projection, for example: "SELECT @a FROM t WHERE (@a := 2) > 0",
// we have to set batch size to 1 to do the evaluation of filter and projection.
func (e *SelectionExec) unBatchedNext(ctx context.Context, chk *chunk.Chunk) error {
// 外循环, 本质上是更新下层结果集
for {
// 内循环,本质是不断迭代遍历下层结果集,直到返回一个被选中的行时,插入要返回的chunk,并
//
// 注意第一次调用Next的时,由于`e.inputRow`是一个空Row,`e.inputIter.End()`同样也会返回一个空Row
// 因此第一次调用Next的时候并不会先进入内循环的逻辑
for ; e.inputRow != e.inputIter.End(); e.inputRow = e.inputIter.Next() {
selected, _, err := expression.EvalBool(e.ctx, e.filters, e.inputRow)
if err != nil {
return err
}
if selected {
chk.AppendRow(e.inputRow)
e.inputRow = e.inputIter.Next()
return nil
}
}
/* 这里才是第一次执行时的实际开始位置 */
// Next函数的本质是将调用下层children[0]的Next方法,并将调用结果更新到childResult当中
err := Next(ctx, e.children[0], e.childResult)
if err != nil {
return err
}
// 将inputRow定位到更新后的chunk第一行
e.inputRow = e.inputIter.Begin()
// 如果已经没有数据了,那就直接返回nil
if e.childResult.NumRows() == 0 {
return nil
}
}
}

那么根据这块流程的理解,那么我们就可以大致补充出代码

// Next implements the Executor Next interface.
func (e *SelectionExec) Next(ctx context.Context, req *chunk.Chunk) error {
// 这里由Callee对结果集chunk进行重置
// 在批处理时,会返回maxChunkSize限定大小的结果集
req.GrowAndReset(e.maxChunkSize)
if !e.batched {
return e.unBatchedNext(ctx, req)
}
/*
Exit the loop when:
1. the `req` chunk` is full.
2. there is no further results from child.
3. meets any error.
*/
for {
// Fill in the `req` util it is full or the `inputIter` is fully processed.
for ; e.inputRow != e.inputIter.End(); e.inputRow = e.inputIter.Next() {
/* Your code here. */
// 根据过滤结果buffer中的数据判断当前行是否被选中,如果被选中了则添加到结果集中
if e.selected[e.inputRow.Idx()] {
req.AppendRow(e.inputRow)
if req.IsFull() { // 如果结果集被填满了,那么需要将inputRow未被检索的第一行,并返回
e.inputRow = e.inputIter.Next()
return nil
}
}
}
// 这里是调用volcano模型处在下层的子语句的Next方法, 并赋值到当前的childResult中,更新下层结果集内容
err := Next(ctx, e.children[0], e.childResult)
if err != nil {
return err
}
// no more data.
if e.childResult.NumRows() == 0 {
return nil
}
/* Your code here.
Process and filter the child result using `expression.VectorizedFilter`.
*/
// 这里主要是重复利用selected所申请的空间, 注意一定要赋值e.selected, 进行同步改变
e.inputRow = e.inputIter.Begin()
// selectd保存使用向量filters过滤后的结果
e.selected, err = expression.VectorizedFilter(e.ctx, e.filters, e.inputIter, e.selected)
if err != nil {
return nil
}
}
}

那么大家可能有一个疑问就是为什么需要使用内外两层循环来实现,这里按照我的个人理解做一定的解释

其实这里代码结构设计的非常精彩, 整个结构看起来就像是一个变速器。

外循环的作用是充分利用当前在 e.inputRow / e.inputIter / e.selected / e.childResult 中的缓存, 因为上下层的chunk可能是有一定尺寸差距的

内循环的作用是进行结果集的附加, 会有两种退出内层循环的情况

  • 一旦结果集被填充满, 就退出函数, 完成了当前的处理,当前还没处理的信息仍然保留在 e.inputRow / e.inputIter / e.selected / e.childResult 中, 下一次调用Next的时候就可以接着从 e.inputRow 开始
  • 如果遍历完了下层的结果集, 当前层的结果集仍然还没有被填满, 那么就会从调用 Next 更新下层的结果集, 然后在根据filter进行筛选, 将下层结果集中filter过滤后的结果保存到 e.selected 里面, 然后从头开始

下面完整的描述一下调用Next的实际执行流程, 因为实际的执行流程会和代码自上而下的顺序不一样, 这也是这段代码结构精妙的地方

  1. 首先要说明的是 SelectionExec 在调用 Next 之前是需要调用 Open 进行初始化的, 那么 Open 需要关注的是, Open 中进行的仅仅是状态的初始化, 并没有执行实质的计算, ( e.childResult 使用了 newFirstChunk 的时候只是进行了字段/容量/大小的初始化, 并没有进行内容填充), e.childResult 是空的, e.inputItere.inputRow 应该也是空的, 需要在后续步骤中进行初始化.
  2. 第1次调用SelectionExec.Next

    1. 注意的循环条件 e.inputRow != e.inputIter.End() 此时是不成立的, 二者都是空的Row结构体, 所以第一次调用Next的时候完全不进入内循环
    2. 调用Next将下层数据加载到 e.childResult 当中, 进行一些检查
    3. 更新 e.inputRow 使之对应 e.inputIter 的第一个数据
    4. 使用 expression.VectorizedFilter 根据 e.filters 的条件将下层结果集数据的根据过滤器的过滤结果存放到 e.selected
    5. 回到外循环开头往下执行, 那么我们就直接进入了内循环, 在 e.inputRow != e.inputIter.End() 此时已经成立了, 所以可以进入内循环
    6. 在内循环中, 需要判断结果集是否已经被填满

      • 如果没有被填满, 那么就根据筛选结果, 考虑是否将遍历到的行放到结果集中, 当遍历结束时, 步骤b开始继续往下执行
      • 如果已经被填满, 那么就直接返回. 在下层结果集中遍历的状态保存在 e.inputRow / e.inputIter 中, filter过滤的结果放在 e.selected 中, 等待下一次Next调用的时候再调用
  3. 第n次调用 SelectionExec.Next

    1. 如果上一次调用 Next 时还有下层结果集的数据没有遍历完,那么当时的遍历状态仍然保留在 e.inputRow / e.inputIter / e.selected / e.childResult 中, 那么可以直接从2.5开始,进入内循环
    2. 如果上一次调用 Next 时刚好下层结果集的数据也遍历完了,那么 e.inputRow 就会是一个空Row, 从2.1开始往下执行,也就是重新加载下层数据。

有疑问加站长微信联系

微信扫一扫,分享到朋友圈

TinySQL学习笔记之SelectionExec

微软终于Azure文件支持NFS v4.1协议

上一篇

使用Python预测缺失值

下一篇

你也可能喜欢

TinySQL学习笔记之SelectionExec

长按储存图像,分享给朋友