登录
首页 >  文章 >  python教程

PySpark 3.5+ 新特性详解

时间:2026-03-31 08:05:11 492浏览 收藏

PySpark 3.5+ 默认启用自适应查询执行(AQE)本意是提升性能,但在小数据量或分区不均的典型场景下,反而因缺乏运行时统计导致重分区误判、策略优化失当,显著增加调度开销与延迟——尤其当执行如 `df1.join(df2, "id").filter(...)` 且 `df2` 极小时,JOIN 性能不升反降;本文深入剖析这一“默认变慢”的根源,并提供可落地的调优建议与配置规避方案。

Python spark 的 pyspark 3.5+ 新特性

PySpark 3.5+ 的 spark.sql.adaptive.enabled 默认开启后,为什么 JOIN 变慢了?

因为自适应查询执行(AQE)在小数据量或非均匀分区场景下反而引入调度开销和重分区判断延迟。它默认启用后,会自动合并小任务、动态优化 Join 策略、调整 shuffle 分区数——但这些决策依赖运行时统计,首次执行无历史信息,容易误判。

  • 若你的作业多为 df1.join(df2, "id").filter(...)df2 很小(hint("broadcast")
  • 检查是否触发了 CoalescePartitions:用 explain(mode="extended") 看物理计划里有没有 AdaptiveSparkPlan 块;若有,再看子节点是否出现意外的 Exchange
  • 临时关闭:设 spark.conf.set("spark.sql.adaptive.enabled", "false") 对比耗时;长期建议保留,但配合 spark.sql.adaptive.coalescePartitions.enabled 等细粒度开关控制

PySpark 3.5+ 中 pandas_udf 被弃用,该用 scalar Pandas function 还是 vectorized UDF

两者本质相同,都是基于 Arrow 的向量化函数,但 API 和语义有关键区别:前者是推荐路径,后者是旧名残留;真正要换的是调用方式和类型声明。

  • 必须改写 @pandas_udf(returnType=StringType())@pandas_function(returnType=StringType()),否则运行时报 AttributeError: module 'pyspark.sql.functions' has no attribute 'pandas_udf'
  • 输入不再是单列 pd.Series,而是整个批次的 pd.DataFrame(即使只有一列),需用 df.iloc[:, 0] 显式取列,否则易出 KeyError
  • 性能上无差异,但新 API 强制要求显式声明 returnType,且不支持 GROUPED_AGG 模式——聚合场景得用 groupby().applyInPandas()

PySpark 3.5+ 读 Parquet 时 mergeSchema 行为变了,字段缺失直接报错?

是的。3.5+ 默认启用 spark.sql.parquet.mergeSchema,但底层改用更严格的 schema 合并逻辑:当某文件缺失非 nullable 字段时,不再静默补 null,而是抛 org.apache.spark.sql.AnalysisException: Cannot resolve column name

  • 常见于增量写入:上游用不同 schema 写了多个目录,比如一批含 user_id,另一批没写,3.5+ 读父目录就会失败
  • 兼容做法:显式关掉合并,用 spark.read.option("mergeSchema", "false").parquet(...),再手动 union 或用 schema 参数指定统一 schema
  • 更健壮的做法是提前用 spark.read.parquet(...).schema 扫描所有子目录推断一次,存为 JSON,后续读取时传入 schema=StructType.fromJson(...)

spark.sql.files.maxPartitionBytes 在 3.5+ 影响比以前更大,为什么?

因为 3.5+ 把这个参数从“仅影响 text/CSV”扩展到所有文件源(包括 Parquet、ORC),且与新的 FileSourceScanExec 执行器深度耦合,直接影响 task 划分粒度和内存压力。

  • 默认值从 128MB 降到 64MB,导致小文件多的作业 task 数暴增,shuffle 压力上升;若集群 executor 内存不足,容易 OOM
  • 调大前先确认:用 df.explain("formatted") 查看 InputPartitions 数量和平均大小;若大量 InputPartition 小于 10MB,说明切太碎
  • 安全调整范围:64MB ~ 256MB;超过 512MB 需同步调高 spark.sql.files.openCostInBytes,否则 Spark 会误判“打开文件代价高”,继续切小

最常被忽略的是:这个参数和 spark.sql.adaptive.coalescePartitions.enabled 是联动的——关了 AQE 的分区合并,又没调 maxPartitionBytes,就等于把小文件问题硬扛在 stage 里了。

以上就是《PySpark 3.5+ 新特性详解》的详细内容,更多关于的资料请关注golang学习网公众号!

资料下载
相关阅读
更多>
最新阅读
更多>
课程推荐
更多>