[SPARK-53598][SQL] Check the existence of numParts before reading large table property · apache/spark@4b93d4c · GitHub


Commit 4b93d4c

pan3793 authored and cloud-fan committed
[SPARK-53598][SQL] Check the existence of numParts before reading large table property
### What changes were proposed in this pull request?

This PR proposes to fix a regression caused by SPARK-33812 (de234ee, 3.2.0). We hit an error when upgrading Spark from 3.1 to 3.3: the table is a Hive SerDe Parquet table whose TBLPROPERTIES appear to be malformed (we are not sure how this happened). The table can be read and written normally with Spark 3.1, but fails on reading with Spark 3.3.

```
-- Hive DDL
CREATE EXTERNAL TABLE `foo`.`bar`(
  `id` bigint,
  ...
)
PARTITIONED BY (`dt` string)
ROW FORMAT SERDE
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION 'hdfs://...'
TBLPROPERTIES (
  ...
  'spark.sql.sources.schema.partCol.0'='dt',
  'transient_lastDdlTime'='1727333678')
```

```
org.apache.spark.sql.AnalysisException: Cannot read table property 'spark.sql.sources.schema' as it's corrupted.
	at org.apache.spark.sql.errors.QueryCompilationErrors$.cannotReadCorruptedTablePropertyError(QueryCompilationErrors.scala:840) ~[spark-catalyst_2.12-3.3.4.104.jar:3.3.4.104]
	at org.apache.spark.sql.catalyst.catalog.CatalogTable$.$anonfun$readLargeTableProp$1(interface.scala:502) ~[spark-catalyst_2.12-3.3.4.104.jar:3.3.4.104]
	at scala.Option.orElse(Option.scala:447) ~[scala-library-2.12.18.jar:?]
	at org.apache.spark.sql.catalyst.catalog.CatalogTable$.readLargeTableProp(interface.scala:497) ~[spark-catalyst_2.12-3.3.4.104.jar:3.3.4.104]
	at org.apache.spark.sql.hive.HiveExternalCatalog.getSchemaFromTableProperties(HiveExternalCatalog.scala:839) ~[spark-hive_2.12-3.3.4.104.jar:3.3.4.104]
	at org.apache.spark.sql.hive.HiveExternalCatalog.restoreHiveSerdeTable(HiveExternalCatalog.scala:809) ~[spark-hive_2.12-3.3.4.104.jar:3.3.4.104]
	at org.apache.spark.sql.hive.HiveExternalCatalog.restoreTableMetadata(HiveExternalCatalog.scala:765) ~[spark-hive_2.12-3.3.4.104.jar:3.3.4.104]
	at org.apache.spark.sql.hive.HiveExternalCatalog.$anonfun$getTable$1(HiveExternalCatalog.scala:734) ~[spark-hive_2.12-3.3.4.104.jar:3.3.4.104]
	at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:101) ~[spark-hive_2.12-3.3.4.104.jar:3.3.4.104]
	at org.apache.spark.sql.hive.HiveExternalCatalog.getTable(HiveExternalCatalog.scala:734) ~[spark-hive_2.12-3.3.4.104.jar:3.3.4.104]
	at org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener.getTable(ExternalCatalogWithListener.scala:138) ~[spark-catalyst_2.12-3.3.4.104.jar:3.3.4.104]
	at org.apache.spark.sql.catalyst.catalog.SessionCatalog.getTableRawMetadata(SessionCatalog.scala:544) ~[spark-catalyst_2.12-3.3.4.104.jar:3.3.4.104]
	at org.apache.spark.sql.catalyst.catalog.SessionCatalog.getTableMetadata(SessionCatalog.scala:529) ~[spark-catalyst_2.12-3.3.4.104.jar:3.3.4.104]
	...
```

Before SPARK-33812, Spark skipped reading the schema from table properties when `spark.sql.sources.schema.numParts` was missing for a Hive SerDe table.

### Why are the changes needed?

Restore the behavior of Spark releases before 3.2.

### Does this PR introduce _any_ user-facing change?

Yes, it restores the behavior of Spark releases before 3.2.

### How was this patch tested?

A unit test is added.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #52355 from pan3793/SPARK-53598.

Authored-by: Cheng Pan <chengpan@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
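For context, Spark persists a data source table's schema in the metastore as a set of table properties: a `spark.sql.sources.schema.numParts` counter plus numbered `spark.sql.sources.schema.part.N` chunks. The malformed table above carries only `spark.sql.sources.schema.partCol.0`, which shares the prefix but is not one of those chunks, so the strict check added in 3.2 demands a `numParts` entry that was never written. The snippet below is a minimal, self-contained sketch of that split/reassemble scheme, not Spark's actual implementation; the 4000-character limit and the helper names are assumptions for illustration.

```scala
// Illustrative sketch of the "large table property" layout this commit deals with.
// Names and the size limit are assumptions, not Spark's real code.
object LargeTablePropSketch {
  private val maxLen = 4000 // assumed per-property size limit

  // Split a long value into numbered part properties plus a numParts counter.
  def write(key: String, value: String): Map[String, String] = {
    val parts = value.grouped(maxLen).toSeq
    Map(s"$key.numParts" -> parts.size.toString) ++
      parts.zipWithIndex.map { case (part, i) => s"$key.part.$i" -> part }
  }

  // Reassemble the value. Returning None when numParts is absent mirrors the
  // lenient pre-3.2 behavior that this commit restores.
  def read(props: Map[String, String], key: String): Option[String] =
    props.get(s"$key.numParts").map { n =>
      (0 until n.toInt).map(i => props(s"$key.part.$i")).mkString
    }
}
```

Treating a missing counter as "no schema stored" rather than an error is exactly the lenient behavior this commit restores for Hive SerDe tables.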
1 parent 1795306 commit 4b93d4c

File tree

2 files changed, +68 -52 lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala

Lines changed: 1 addition & 1 deletion
```diff
@@ -767,7 +767,7 @@ object CatalogTable {
     props.get(key).orElse {
       if (props.exists { case (mapKey, _) => mapKey.startsWith(key) }) {
         props.get(s"$key.numParts") match {
-          case None => throw QueryCompilationErrors.insufficientTablePropertyError(key)
+          case None => None
           case Some(numParts) =>
             val parts = (0 until numParts.toInt).map { index =>
               val keyPart = s"$key.part.$index"
```
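With this one-line change, a property map shaped like the malformed TBLPROPERTIES from the description no longer fails the whole table lookup. Below is a spark-shell-style sketch of the before/after behavior; the props map is made up to mimic that table, while `CatalogTable.readLargeTableProp` and the error condition name are taken from the diff and the old test.

```scala
import org.apache.spark.sql.catalyst.catalog.CatalogTable

// Mimics the malformed TBLPROPERTIES: a key that merely starts with the
// "spark.sql.sources.schema" prefix, with no numParts entry at all.
val props = Map(
  "spark.sql.sources.schema.partCol.0" -> "dt",
  "transient_lastDdlTime" -> "1727333678")

// Before this commit: throws an AnalysisException with condition
// INSUFFICIENT_TABLE_PROPERTY.MISSING_KEY, so the table cannot be read at all.
// After this commit: returns None, and the table's schema can be taken from
// the Hive metastore instead, as before Spark 3.2.
val schemaJson: Option[String] =
  CatalogTable.readLargeTableProp(props, "spark.sql.sources.schema")
assert(schemaJson.isEmpty)
```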

sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala

Lines changed: 67 additions & 51 deletions
```diff
@@ -1378,62 +1378,78 @@ class MetastoreDataSourcesSuite extends QueryTest
   }

   test("read table with corrupted schema") {
-    try {
-      val schema = StructType(StructField("int", IntegerType) :: Nil)
-      val hiveTableWithoutNumPartsProp = CatalogTable(
-        identifier = TableIdentifier("t", Some("default")),
-        tableType = CatalogTableType.MANAGED,
-        schema = HiveExternalCatalog.EMPTY_DATA_SCHEMA,
-        provider = Some("json"),
-        storage = CatalogStorageFormat.empty,
-        properties = Map(
-          DATASOURCE_PROVIDER -> "json",
-          DATASOURCE_SCHEMA_PART_PREFIX + 0 -> schema.json))
-
-      hiveClient.createTable(hiveTableWithoutNumPartsProp, ignoreIfExists = false)
-
-      checkError(
-        exception = intercept[AnalysisException] {
-          sharedState.externalCatalog.getTable("default", "t")
-        },
-        condition = "INSUFFICIENT_TABLE_PROPERTY.MISSING_KEY",
-        parameters = Map("key" -> toSQLConf("spark.sql.sources.schema"))
-      )
-
-      val hiveTableWithNumPartsProp = CatalogTable(
-        identifier = TableIdentifier("t2", Some("default")),
-        tableType = CatalogTableType.MANAGED,
-        schema = HiveExternalCatalog.EMPTY_DATA_SCHEMA,
-        provider = Some("json"),
-        storage = CatalogStorageFormat.empty,
-        properties = Map(
-          DATASOURCE_PROVIDER -> "json",
-          DATASOURCE_SCHEMA_PREFIX + "numParts" -> "3",
-          DATASOURCE_SCHEMA_PART_PREFIX + 0 -> schema.json))
-
-      hiveClient.createTable(hiveTableWithNumPartsProp, ignoreIfExists = false)
+    Seq(true, false).foreach { isHiveTable =>
+      try {
+        val schema = StructType(StructField("int", IntegerType) :: Nil)
+        val hiveTableWithoutNumPartsProp = CatalogTable(
+          identifier = TableIdentifier("t", Some("default")),
+          tableType = CatalogTableType.MANAGED,
+          schema = HiveExternalCatalog.EMPTY_DATA_SCHEMA,
+          provider = if (isHiveTable) None else Some("json"),
+          storage = CatalogStorageFormat.empty,
+          properties = Map(
+            DATASOURCE_SCHEMA_PART_PREFIX + 0 -> schema.json) ++ {
+            if (isHiveTable) {
+              Map.empty
+            } else {
+              Map(DATASOURCE_PROVIDER -> "json")
+            }
+          })

-      checkError(
-        exception = intercept[AnalysisException] {
-          sharedState.externalCatalog.getTable("default", "t2")
-        },
-        condition = "INSUFFICIENT_TABLE_PROPERTY.MISSING_KEY_PART",
-        parameters = Map(
-          "key" -> toSQLConf("spark.sql.sources.schema.part.1"),
-          "totalAmountOfParts" -> "3")
-      )
+        hiveClient.createTable(hiveTableWithoutNumPartsProp, ignoreIfExists = false)

-      withDebugMode {
         val tableMeta = sharedState.externalCatalog.getTable("default", "t")
         assert(tableMeta.identifier == TableIdentifier("t", Some("default")))
-        assert(tableMeta.properties(DATASOURCE_PROVIDER) == "json")
-        val tableMeta2 = sharedState.externalCatalog.getTable("default", "t2")
-        assert(tableMeta2.identifier == TableIdentifier("t2", Some("default")))
-        assert(tableMeta2.properties(DATASOURCE_PROVIDER) == "json")
+        assert(!tableMeta.properties.contains(DATASOURCE_PROVIDER))
+
+        val hiveTableWithNumPartsProp = CatalogTable(
+          identifier = TableIdentifier("t2", Some("default")),
+          tableType = CatalogTableType.MANAGED,
+          schema = HiveExternalCatalog.EMPTY_DATA_SCHEMA,
+          provider = if (isHiveTable) None else Some("json"),
+          storage = CatalogStorageFormat.empty,
+          properties = Map(
+            DATASOURCE_SCHEMA_PREFIX + "numParts" -> "3",
+            DATASOURCE_SCHEMA_PART_PREFIX + 0 -> schema.json) ++ {
+            if (isHiveTable) {
+              Map.empty
+            } else {
+              Map(DATASOURCE_PROVIDER -> "json")
+            }
+          })
+
+        hiveClient.createTable(hiveTableWithNumPartsProp, ignoreIfExists = false)
+
+        checkError(
+          exception = intercept[AnalysisException] {
+            sharedState.externalCatalog.getTable("default", "t2")
+          },
+          condition = "INSUFFICIENT_TABLE_PROPERTY.MISSING_KEY_PART",
+          parameters = Map(
+            "key" -> toSQLConf("spark.sql.sources.schema.part.1"),
+            "totalAmountOfParts" -> "3")
+        )
+
+        withDebugMode {
+          val tableMeta = sharedState.externalCatalog.getTable("default", "t")
+          assert(tableMeta.identifier == TableIdentifier("t", Some("default")))
+          if (isHiveTable) {
+            assert(!tableMeta.properties.contains(DATASOURCE_PROVIDER))
+          } else {
+            assert(tableMeta.properties(DATASOURCE_PROVIDER) == "json")
+          }
+          val tableMeta2 = sharedState.externalCatalog.getTable("default", "t2")
+          assert(tableMeta2.identifier == TableIdentifier("t2", Some("default")))
+          if (isHiveTable) {
+            assert(!tableMeta2.properties.contains(DATASOURCE_PROVIDER))
+          } else {
+            assert(tableMeta2.properties(DATASOURCE_PROVIDER) == "json")
+          }
+        }
+      } finally {
+        hiveClient.dropTable("default", "t", ignoreIfNotExists = true, purge = true)
+        hiveClient.dropTable("default", "t2", ignoreIfNotExists = true, purge = true)
       }
-    } finally {
-      hiveClient.dropTable("default", "t", ignoreIfNotExists = true, purge = true)
-      hiveClient.dropTable("default", "t2", ignoreIfNotExists = true, purge = true)
     }
   }

```

0 commit comments