[SPARK-53598][SQL] Check the existence of numParts before reading large table property #52355
Conversation
cc @cloud-fan as you are the author of SPARK-33812 (de234ee)
def readLargeTableProp(
    props: Map[String, String],
    key: String,
    failIfNumPartsMissing: Boolean = true): Option[String] = {
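For context, a minimal sketch of what a reader like this does under the `numParts`/`part.N` key layout, with the leniency flag deciding whether a missing `<key>.numParts` is an error or a fallback signal. This is a simplified illustration under those assumptions, not the exact upstream implementation:

```scala
def readLargeTablePropSketch(
    props: Map[String, String],
    key: String,
    failIfNumPartsMissing: Boolean): Option[String] = {
  // Prefer the plain key; otherwise try to reassemble the split property.
  props.get(key).orElse {
    props.get(s"$key.numParts") match {
      case Some(numParts) =>
        // Reassemble "<key>.part.0" ... "<key>.part.N-1" into the full value.
        Some((0 until numParts.toInt).map { i =>
          props.getOrElse(s"$key.part.$i",
            sys.error(s"Cannot read table property '$key' as it's corrupted."))
        }.mkString)
      case None if failIfNumPartsMissing =>
        sys.error(s"Cannot read table property '$key' as it's corrupted.")
      case None =>
        None // lenient: the caller falls back to schema inference / the Hive schema
    }
  }
}
```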
When does it need to be true? I think it's fine to always be more lenient, as the consequence is simply doing a schema inference, which is better than failing.
Before SPARK-33812, it performs `true` for the DS table on schema reading, but `false` for the Hive SerDe table; I'm trying to restore that behavior. SPARK-33812 introduced another case, the ColumnStats histogram: when `numParts` is missing, it fails here and the exception is caught at
spark/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala (lines 1009 to 1032 in f2e9c68):
def fromMap(
    table: String,
    colName: String,
    map: Map[String, String]): Option[CatalogColumnStat] = {
  try {
    Some(CatalogColumnStat(
      distinctCount = map.get(s"${colName}.${KEY_DISTINCT_COUNT}").map(v => BigInt(v.toLong)),
      min = map.get(s"${colName}.${KEY_MIN_VALUE}"),
      max = map.get(s"${colName}.${KEY_MAX_VALUE}"),
      nullCount = map.get(s"${colName}.${KEY_NULL_COUNT}").map(v => BigInt(v.toLong)),
      avgLen = map.get(s"${colName}.${KEY_AVG_LEN}").map(_.toLong),
      maxLen = map.get(s"${colName}.${KEY_MAX_LEN}").map(_.toLong),
      histogram = CatalogTable.readLargeTableProp(map, s"$colName.$KEY_HISTOGRAM")
        .map(HistogramSerializer.deserialize),
      version = map(s"${colName}.${KEY_VERSION}").toInt
    ))
  } catch {
    case NonFatal(e) =>
      logWarning(log"Failed to parse column statistics for column " +
        log"${MDC(COLUMN_NAME, colName)} in table ${MDC(RELATION_NAME, table)}", e)
      None
  }
}
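For the histogram case mentioned above, the serialized histogram is also stored as a split property. A hedged illustration of the keys involved (the key names and values here are assumptions for illustration only):

```scala
// Hypothetical property map for column "id": the serialized histogram is split
// into "<col>.histogram.numParts" plus numbered part entries. If the numParts
// key is absent, the lenient reader returns None for the histogram instead of
// throwing, so fromMap no longer discards the remaining statistics.
val statsProps = Map(
  "id.version" -> "2",
  "id.distinctCount" -> "1000",
  "id.nullCount" -> "0",
  "id.histogram.numParts" -> "2",
  "id.histogram.part.0" -> "<first half of serialized histogram>",
  "id.histogram.part.1" -> "<second half of serialized histogram>")
```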
@cloud-fan I updated the code as you suggested, could you take another look?
Seq(true, false).foreach { isHiveTable =>
  try {
    val schema = StructType(StructField("int", IntegerType) :: Nil)
    val hiveTableWithoutNumPartsProp = CatalogTable(
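The fragment above is truncated mid-construction. A hedged sketch of how such a test table might be built, assuming the usual CatalogTable case-class parameters; the identifier, provider, and property values are assumptions mirroring the DDL in the PR description, not the exact test code:

```scala
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

def tableWithoutNumPartsProp(isHiveTable: Boolean): CatalogTable = {
  val schema = StructType(StructField("int", IntegerType) :: Nil)
  CatalogTable(
    identifier = TableIdentifier("bar", Some("foo")),
    tableType = CatalogTableType.EXTERNAL,
    storage = CatalogStorageFormat.empty,
    schema = schema,
    // Hive SerDe tables and data source tables take different restore paths.
    provider = if (isHiveTable) Some("hive") else Some("parquet"),
    // A partCol property is present but "spark.sql.sources.schema.numParts" is not,
    // mimicking the malformed TBLPROPERTIES from the bug report.
    properties = Map("spark.sql.sources.schema.partCol.0" -> "dt"))
}
```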
Can construction of CatalogTable be refactored into a separate method that takes 3 parameters (table name, table type, and number of parts) to avoid code duplication here and on line 1405?
This requires parameterizing more things; I prefer to keep it as-is unless you strongly disagree.
thanks, merging to master/4.0!
### What changes were proposed in this pull request?

This PR proposes to fix a regression caused by SPARK-33812 (de234ee, 3.2.0).

We hit an error when upgrading Spark from 3.1 to 3.3. The table is a Hive SerDe Parquet table whose TBLPROPERTIES look malformed (not sure how this happened); the table can be read and written normally with Spark 3.1, but fails on reading with Spark 3.3.

```
-- Hive DDL
CREATE EXTERNAL TABLE `foo`.`bar`(
  `id` bigint,
  ...
)
PARTITIONED BY (`dt` string)
ROW FORMAT SERDE
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION 'hdfs://...'
TBLPROPERTIES (
  ...
  'spark.sql.sources.schema.partCol.0'='dt',
  'transient_lastDdlTime'='1727333678')
```

```
org.apache.spark.sql.AnalysisException: Cannot read table property 'spark.sql.sources.schema' as it's corrupted.
  at org.apache.spark.sql.errors.QueryCompilationErrors$.cannotReadCorruptedTablePropertyError(QueryCompilationErrors.scala:840) ~[spark-catalyst_2.12-3.3.4.104.jar:3.3.4.104]
  at org.apache.spark.sql.catalyst.catalog.CatalogTable$.$anonfun$readLargeTableProp$1(interface.scala:502) ~[spark-catalyst_2.12-3.3.4.104.jar:3.3.4.104]
  at scala.Option.orElse(Option.scala:447) ~[scala-library-2.12.18.jar:?]
  at org.apache.spark.sql.catalyst.catalog.CatalogTable$.readLargeTableProp(interface.scala:497) ~[spark-catalyst_2.12-3.3.4.104.jar:3.3.4.104]
  at org.apache.spark.sql.hive.HiveExternalCatalog.getSchemaFromTableProperties(HiveExternalCatalog.scala:839) ~[spark-hive_2.12-3.3.4.104.jar:3.3.4.104]
  at org.apache.spark.sql.hive.HiveExternalCatalog.restoreHiveSerdeTable(HiveExternalCatalog.scala:809) ~[spark-hive_2.12-3.3.4.104.jar:3.3.4.104]
  at org.apache.spark.sql.hive.HiveExternalCatalog.restoreTableMetadata(HiveExternalCatalog.scala:765) ~[spark-hive_2.12-3.3.4.104.jar:3.3.4.104]
  at org.apache.spark.sql.hive.HiveExternalCatalog.$anonfun$getTable$1(HiveExternalCatalog.scala:734) ~[spark-hive_2.12-3.3.4.104.jar:3.3.4.104]
  at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:101) ~[spark-hive_2.12-3.3.4.104.jar:3.3.4.104]
  at org.apache.spark.sql.hive.HiveExternalCatalog.getTable(HiveExternalCatalog.scala:734) ~[spark-hive_2.12-3.3.4.104.jar:3.3.4.104]
  at org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener.getTable(ExternalCatalogWithListener.scala:138) ~[spark-catalyst_2.12-3.3.4.104.jar:3.3.4.104]
  at org.apache.spark.sql.catalyst.catalog.SessionCatalog.getTableRawMetadata(SessionCatalog.scala:544) ~[spark-catalyst_2.12-3.3.4.104.jar:3.3.4.104]
  at org.apache.spark.sql.catalyst.catalog.SessionCatalog.getTableMetadata(SessionCatalog.scala:529) ~[spark-catalyst_2.12-3.3.4.104.jar:3.3.4.104]
  ...
```

Before SPARK-33812, Spark skipped reading the schema from table properties if `spark.sql.sources.schema.numParts` was missing for a Hive SerDe table.

### Why are the changes needed?

Restore the behavior before Spark 3.2.

### Does this PR introduce _any_ user-facing change?

Yes, it restores the behavior before Spark 3.2.

### How was this patch tested?

A UT is added.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #52355 from pan3793/SPARK-53598.

Authored-by: Cheng Pan <chengpan@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 4b93d4c)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
please open new PRs if you want to backport further.
Closes apache#52355 from pan3793/SPARK-53598.

Authored-by: Cheng Pan <chengpan@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
@cloud-fan thanks, I opened #52385 for 3.5
### What changes were proposed in this pull request?

CatalogColumnStat.readLargeTableProp is an O(N) operation. Since a table can have a lot of table properties, this effectively becomes an O(N^2) operation, which can be very slow for tables with many table properties. This PR improves the algorithmic complexity to O(N) by only constructing the large table properties if numParts exists.

### Why are the changes needed?

To fix a performance issue unintentionally introduced earlier.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing unit tests. A previous patch already tested the side effect of this change: #52355.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #52374 from yeshengm/improve-read-large-prop.

Authored-by: Yesheng Ma <yesheng.ma@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
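To illustrate the complexity concern, a hedged sketch of the two approaches under the same `numParts`/`part.N` key layout; this is simplified illustration, not the actual upstream code:

```scala
// O(N) per key: scan every table property looking for "<key>.part.*" entries,
// which becomes O(N^2) when repeated for each of N statistics keys.
def readByScanning(props: Map[String, String], key: String): Option[String] = {
  val parts = props.collect {
    case (k, v) if k.startsWith(s"$key.part.") => (k.stripPrefix(s"$key.part.").toInt, v)
  }
  if (parts.isEmpty) None else Some(parts.toSeq.sortBy(_._1).map(_._2).mkString)
}

// Probe first: only assemble the parts when "<key>.numParts" exists, so each
// key costs a constant number of map lookups plus the size of its own parts.
def readByProbing(props: Map[String, String], key: String): Option[String] = {
  props.get(s"$key.numParts").map { n =>
    (0 until n.toInt).map(i => props(s"$key.part.$i")).mkString
  }
}
```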
Backport #52355 to branch-3.5.

Closes #52385 from pan3793/SPARK-53598-3.5.

Authored-by: Cheng Pan <chengpan@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
What changes were proposed in this pull request?
This PR proposes to fix a regression caused by SPARK-33812 (de234ee 3.2.0).
We hit an error when upgrading Spark from 3.1 to 3.3. The table is a Hive SerDe Parquet table whose TBLPROPERTIES look malformed (not sure how this happened); the table can be read and written normally with Spark 3.1, but fails on reading with Spark 3.3.
Before SPARK-33812, it skipped reading the schema from table properties if `spark.sql.sources.schema.numParts` was missing for a Hive SerDe table.

Why are the changes needed?
Restore behavior before Spark 3.2.
Does this PR introduce any user-facing change?
Yes, restores behavior before Spark 3.2.
How was this patch tested?
UT is added.
Was this patch authored or co-authored using generative AI tooling?
No.
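For reference, a hedged illustration of the table-property layout at the center of this PR. The key names follow the `spark.sql.sources.schema.*` convention visible in the DDL above; the values are made up for illustration:

```scala
// Intact layout written by Spark: the schema JSON is split into numbered parts,
// with numParts recording how many parts to reassemble.
val intactProps = Map(
  "spark.sql.sources.schema.numParts" -> "1",
  "spark.sql.sources.schema.part.0" -> """{"type":"struct","fields":[...]}""",
  "spark.sql.sources.schema.partCol.0" -> "dt")

// Malformed layout from the bug report: only the partCol entry survived.
// With this fix, a Hive SerDe table like this falls back to the Hive-provided
// schema instead of failing with "Cannot read table property ... corrupted".
val malformedProps = Map(
  "spark.sql.sources.schema.partCol.0" -> "dt",
  "transient_lastDdlTime" -> "1727333678")
```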