Spark SQL 查询执行流程详解
Spark SQL 执行流程概览
Spark SQL 的查询执行过程可分为六个核心阶段:语法解析、分析绑定、逻辑优化、物理计划生成、可执行计划准备与最终执行。整个流程由 Catalyst 框架驱动,通过一系列规则对抽象语法树(AST)进行变换。
1. 语法解析(Parsing)
用户提交的 SQL 语句首先由 ANTLR4 词法和语法解析器处理。Spark 使用位于 sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 的语法规则文件进行解析,生成初始的未解析逻辑计划(Unresolved Logical Plan)。
val logicalPlan = spark.sql("SELECT ...").queryExecution.logical
// 输出: UnresolvedRelation `table`, Filter, Aggregate 等结构
2. 分析绑定(Analysis)
此阶段通过 Analyzer 将未解析对象(如表名、列名)与元数据目录(Catalog)中的实际信息绑定,生成已解析的逻辑计划(Resolved Logical Plan)。关键步骤包括:
- 识别表与视图来源(
ResolveRelations) - 解析列引用(
ResolveReferences) - 处理函数调用与别名(
ResolveFunctions,ResolveAliases) - 检查语法完整性与类型一致性
例如:`ITEMS.name` 被替换为具体的列引用 name#1,并关联到内存表或外部数据源。
3. 逻辑优化(Optimization)
基于预定义的规则组(Batches),对逻辑计划进行性能优化,主要目标是减少计算开销与 I/O 消耗。常见优化包括:
- 谓词下推(PushDownPredicate):将过滤条件尽可能下推至数据读取层,减少中间结果大小。
- 列裁剪(ColumnPruning):仅保留查询所需字段,避免加载无关列。
- 常量折叠(ConstantFolding):提前计算表达式中可确定的值,如
a + 1替换为11。 - 算子合并(CombineFilters, CombineLimits):合并多个相邻过滤或限制操作,降低树深度。
- 广播连接优化(BroadcastJoin):当小表参与 Join 时,自动选择广播策略以提升效率。
// 原始计划:Filter (id > 1) → Join → Project
// 优化后:Join → Filter (id > 1) → Project
4. 物理计划生成(Physical Planning)
将优化后的逻辑计划转换为可执行的物理执行计划(PhysicalPlan),涉及:
- 确定数据分区方式(HashPartitioning / RangePartitioning)
- 选择合适的算子实现(如 HashAggregate、SortMergeJoin)
- 决定是否使用缓存或广播
- 插入 Shuffle 操作(Exchange)以协调分布式计算
例如:HashAggregate 用于分组聚合,BroadcastHashJoin 用于小表广播。
5. 可执行计划准备(Preparation)
调用 prepareForExecution() 方法,完成以下任务:
- 将物理计划封装为可执行的
SparkPlan实例 - 注册任务依赖关系与资源分配策略
- 初始化代码生成上下文(CodegenContext)
该阶段为后续任务调度和执行做最后准备。
6. 执行与结果输出(Execution)
通过 execute() 触发分布式任务运行,最终生成 RDD 并返回结果集。典型流程如下:
- 将物理计划划分为多个 Stage
- 每个 Stage 生成若干 Task,提交至 Executor 执行
- Task 在集群节点上运行,输出部分结果
- Driver 收集所有结果并展示(如
show())
itemSumDF.show()
// 输出:
// +----------------+--------+---+
// | name| price| c|
// +----------------+--------+---+
// |iphone 11 pro max|10500.0|10 |
// +----------------+--------+---+
调试建议
若需深入理解执行细节,建议:
- 从 Spark 官方源码仓库克隆项目并编译(支持 Scala 2.12/2.13)
- 在
org.apache.spark.examples.sql.SparkSQLExample中设置断点 - 使用 IntelliJ IDEA 启动调试模式,逐行跟踪
spark.sql()调用链 - 观察各阶段输出的
toString()计划日志,对比前后变化
推荐配置:Spark 3.4.x 以上版本,配合 Jupyter Notebook 进行交互式验证。