图中左边是正常扫描查询计划,右边是加上Runtime Filter(Dynamic Filter)之后的扫描计划,可以看到probe端在Join之前(Scan时)提前过滤掉数据。
但是实现层面的困难在于如何将Runtime Filter(Dynamic Filter) Builder端的数据转发给probe端,因为这些算子可能在不同的节点上运行。
一般有如下几种设计方案:
- 协调器(Remote模式)
- Local模式
2.Impala VS Presto
2.1 Impala
Impala采用了这两种方式、Presto采用了Local模式。
Impala Remote模式
Impala local表示生成的RF不需要通过网络传输就可以直接应用,典型的情况时BROADCAST HASH JOIN的时候,JOIN和左表的HDFSTableScan是在一个Fragment中实现的(在一个线程中),由于每一个节点上运行的JOIN都会获取到所有的右表数据,因此都能够build出完整的基于右表数据的RF信息,然后直接将这个信息交给左表的Scan算子,不需要经过任何的网络传输。
RuntimeFilter实现层面是采用Bloom Filter,上图中会每个结点从HDFS读取数据,然后传到JoinNode,最后都到了协调者,统一merge后进行分发处理。
2.2 Presto
presto local模式
Presto 的Dynamic Filter包含 Partition Pruning(分区表) 以及 Row filtering(非分区表),依赖于broadcast join,builder端比probe端的表小得多。在这种情况下,probe端扫描和join算子在同一个进程中运行——因此它们之间的消息传递变得更加简单。
presto将生成的Dynamic Filter作为 TupleDomain 公开给Probe端的 PageSourceProvider。
TPC-DS运行结果显示DF在部分query上有显著受益,cost-based optimizer (CBO) 。
Presto的实现原理:
- DynamicFilter
- DynamicFilterSource
DynamicFilter 代表计划的一部分,一旦过滤器数据准备好,它将在运行时进行实际过滤。
DynamicFilterSource 负责构建运行时谓词数据(例如布隆过滤器)并在准备好时将其传递给 DynamicFilter。除了构建 Bloom 过滤器 DynamicFilterSource 节点外,它还作为传递节点将接收到的数据转发到 Join 节点。
代码实现角度:DynamicFilterSource算子作为一个简单的“pass-through”管道,同时保存输入的页信息。收集的页面值用于创建RunTime Filter约束(用于内部连接中的probe端表扫描)。注意该算子仅支持小表builder端页面(使用“广播”连接时应该是这种情况)。
下图中红色箭头表示发送谓词(例如布隆过滤器)时的通信。这里可以使用标准的 Presto 数据通信方式(Pages over Exchanges)将数据从 DFS 传递到 DF。实现另一个“元数据”协议似乎过于复杂。
如果将此连接公开为直接的父子关系,因为这将导致计划不再是树而是 DAG。这将破坏(或至少使)通过访问者的当前遍历方法。无论如何,当在优化期间围绕计划树移动一个或另一个时,需要保持 DFS 到 DF 的关系。
因此,最终实现手段是提供一个DynamicFilterSource算子作为通信管道。
https://varada.io/blog/presto/dynamic-filtering-for-highly-selective-join-optimization/
https://github.com/prestodb/presto/pull/9453/commits