8000 temp · apache/spark@1679aed · GitHub
[go: up one dir, main page]

Skip to content

Commit 1679aed

Browse files
committed
temp
1 parent 3159416 commit 1679aed

File tree

2 files changed

+153
-28
lines changed

2 files changed

+153
-28
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ApplyDefaultCollationToStringType.scala

Lines changed: 87 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,13 @@ package org.apache.spark.sql.catalyst.analysis
2020
import org.apache.spark.sql.catalyst.expressions.{Cast, DefaultStringProducingExpression, Expression, Literal, SubqueryExpression}
2121
import org.apache.spark.sql.catalyst.plans.logical.{AddColumns, AlterColumns, AlterColumnSpec, AlterViewAs, ColumnDefinition, CreateTable, CreateTempView, CreateView, LogicalPlan, QualifiedColType, ReplaceColumns, ReplaceTable, TableSpec, V2CreateTablePlan}
2222
import org.apache.spark.sql.catalyst.rules.Rule
23-
import org.apache.spark.sql.connector.catalog.{SupportsNamespaces, TableCatalog}
23+
import org.apache.spark.sql.catalyst.trees.CurrentOrigin
24+
import org.apache.spark.sql.catalyst.util.CharVarcharUtils.CHAR_VARCHAR_TYPE_STRING_METADATA_KEY
25+
import org.apache.spark.sql.connector.catalog.{CatalogV2Util, SupportsNamespaces, Table, TableCatalog}
2426
import org.apache.spark.sql.connector.catalog.SupportsNamespaces.PROP_COLLATION
25-
import org.apache.spark.sql.types.{DataType, StringType}
27+
import org.apache.spark.sql.errors.DataTypeErrors.toSQLId
28+
import org.apache.spark.sql.errors.QueryCompilationErrors
29+
import org.apache.spark.sql.types.{CharType, DataType, StringType, StructField, VarcharType}
2630

2731
/**
2832
* Resolves string types in logical plans by assigning them the appropriate collation. The
@@ -33,12 +37,13 @@ import org.apache.spark.sql.types.{DataType, StringType}
3337
*/
3438
object ApplyDefaultCollationToStringType extends Rule[LogicalPlan] {
3539
def apply(plan: LogicalPlan): LogicalPlan = {
36-
val planWithResolvedDefaultCollation = resolveDefaultCollation(plan)
40+
val preprocessedPlan = Seq(resolveDefaultCollation _, resolveAlterColumnsDataType _)
41+
.foldLeft(plan) { case (currentPlan, resolver) => resolver(currentPlan) }
3742

38-
fetchDefaultCollation(planWithResolvedDefaultCollation) match {
43+
fetchDefaultCollation(preprocessedPlan) match {
3944
case Some(collation) =>
40-
transform(planWithResolvedDefaultCollation, StringType(collation))
41-
case None => planWithResolvedDefaultCollation
45+
transform(preprocessedPlan, StringType(collation))
46+
case None => preprocessedPlan
4247
}
4348
}
4449

@@ -63,10 +68,14 @@ object ApplyDefaultCollationToStringType extends Rule[LogicalPlan] {
6368
case ReplaceTable(_: ResolvedIdentifier, _, _, tableSpec: TableSpec, _) =>
6469
tableSpec.collation
6570

66-
// In `transform` we handle these 3 ALTER TABLE commands.
67-
case cmd: AddColumns => getCollationFromTableProps(cmd.table)
68-
case cmd: ReplaceColumns => getCollationFromTableProps(cmd.table)
69-
case cmd: AlterColumns => getCollationFromTableProps(cmd.table)
71+
case AddColumns(resolvedTable: ResolvedTable, _) =>
72+
Option(resolvedTable.table.properties.get(TableCatalog.PROP_COLLATION))
73+
74+
case ReplaceColumns(resolvedTable: ResolvedTable, _) =>
75+
Option(resolvedTable.table.properties.get(TableCatalog.PROP_COLLATION))
76+
77+
case AlterColumns(resolvedTable: ResolvedTable, _) =>
78+
Option(resolvedTable.table.properties.get(TableCatalog.PROP_COLLATION))
7079

7180
case alterViewAs: AlterViewAs =>
7281
alterViewAs.child match {
@@ -85,15 +94,6 @@ object ApplyDefaultCollationToStringType extends Rule[LogicalPlan] {
8594
}
8695
}
8796

88-
private def getCollationFromTableProps(t: LogicalPlan): Option[String] = {
89-
t match {
90-
case resolvedTbl: ResolvedTable
91-
if resolvedTbl.table.properties.containsKey(TableCatalog.PROP_COLLATION) =>
92-
Some(resolvedTbl.table.properties.get(TableCatalog.PROP_COLLATION))
93-
case _ => None
94-
}
95-
}
96-
9797
/**
9898
* Determines the default collation for an object in the following order:
9999
* 1. Use the object's explicitly defined default collation, if available.
@@ -168,22 +168,86 @@ object ApplyDefaultCollationToStringType extends Rule[LogicalPlan] {
168168
case p if isCreateOrAlterPlan(p) || AnalysisContext.get.collation.isDefined =>
169169
transformPlan(p, newType)
170170

171-
case addCols: AddColumns =>
171+
case addCols@AddColumns(_: ResolvedTable, _) =>
172172
addCols.copy(column 341A sToAdd = replaceColumnTypes(addCols.columnsToAdd, newType))
173173

174-
case replaceCols: ReplaceColumns =>
174+
case replaceCols@ReplaceColumns(_: ResolvedTable, _) =>
175175
replaceCols.copy(columnsToAdd = replaceColumnTypes(replaceCols.columnsToAdd, newType))
176176

177-
case a @ AlterColumns(_, specs: Seq[AlterColumnSpec]) =>
177+
case a @ AlterColumns(ResolvedTable(_, _, table: Table, _), specs: Seq[AlterColumnSpec]) =>
178178
val newSpecs = specs.map {
179-
case spec if spec.newDataType.isDefined && hasDefaultStringType(spec.newDataType.get) =>
179+
case spec if shouldApplyDefaultCollationToAlterColumn(spec, table) =>
180180
spec.copy(newDataType = Some(replaceDefaultStringType(spec.newDataType.get, newType)))
181181
case col => col
182182
}
183183
a.copy(specs = newSpecs)
184184
}
185185
}
186186

187+
/**
188+
* The column type should not be changed if the original column type is [[StringType]] and the new
189+
* type is the default [[StringType]] (i.e., [[StringType]] without an explicit collation).
190+
*
191+
* Query Example:
192+
* {{{
193+
* CREATE TABLE t (c1 STRING COLLATE UNICODE)
194+
* ALTER TABLE t ALTER COLUMN c1 TYPE STRING -- c1 will remain STRING COLLATE UNICODE
195+
* }}}
196+
*/
197+
private def resolveAlterColumnsDataType(plan: LogicalPlan): LogicalPlan = {
198+
plan match {
199+
case alterColumns@AlterColumns(
200+
ResolvedTable(_, _, table: Table, _), specs: Seq[AlterColumnSpec]) =>
201+
val resolvedSpecs = specs.map { spec =>
202+
if (spec.newDataType.isDefined && isStringTypeColumn(spec.column, table) &&
203+
isDefaultStringType(spec.newDataType.get)) {
204+
spec.copy(newDataType = None)
205+
} else {
206+
spec
207+
}
208+
}
209+
val newAlterColumns = CurrentOrigin.withOrigin(alterColumns.origin) {
210+
alterColumns.copy(specs = resolvedSpecs)
211+
}
212+
newAlterColumns.copyTagsFrom(alterColumns)
213+
newAlterColumns
214+
case _ =>
215+
plan
216+
}
217+
}
218+
219+
private def shouldApplyDefaultCollationToAlterColumn(
220+
alterColumnSpec: AlterColumnSpec, table: Table): Boolean = {
221+
alterColumnSpec.newDataType.isDefined &&
222+
// Applies the default collation only if the original column's type is not StringType.
223+
!isStringTypeColumn(alterColumnSpec.column, table) &&
224+
hasDefaultStringType(alterColumnSpec.newDataType.get)
225+
}
226+
227+
/**
228+
* Checks whether the column's [[DataType]] is [[StringType]] in the given table. Throws an error
229+
* if the column is not found.
230+
*/
231+
private def isStringTypeColumn(fieldName: FieldName, table: Table): Boolean = {
232+
CatalogV2Util.v2ColumnsToStructType(table.columns())
233+
.findNestedField(fieldName.name, includeCollections = true, resolver = conf.resolver)
234+
.map {
235+
case (_, StructField(_, _: CharType, _, _)) =>
236+
false
237+
case (_, StructField(_, _: VarcharType, _, _)) =>
238+
false
239+
case (_, StructField(_, _: StringType, _, metadata))
240+
if !metadata.contains(CHAR_VARCHAR_TYPE_STRING_METADATA_KEY) =>
241+
true
242+
case (_, _) =>
243+
false
244+
}
245+
.getOrElse {
246+
throw QueryCompilationErrors.unresolvedColumnError(
247+
toSQLId(fieldName.name), table.columns().map(_.name))
248+
}
249+
}
250+
187251
/**
188252
* Transforms the given plan, by transforming all expressions in its operators to use the given
189253
* new type instead of the default string type.

sql/core/src/test/scala/org/apache/spark/sql/collation/DefaultCollationTestSuite.scala

Lines changed: 66 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -100,8 +100,12 @@ abstract class DefaultCollationTestSuite extends QueryTest with SharedSparkSessi
100100
sql(s"ALTER TABLE $testTable ALTER COLUMN c2 TYPE STRING COLLATE UNICODE")
101101
assertTableColumnCollation(testTable, "c2", "UNICODE")
102102

103+
// When using ALTER TABLE ALTER COLUMN TYPE, the column should inherit the table's collation
104+
// only if it wasn't a string column before. If the column was already a string, and we're
105+
// just changing its type to string (without explicit collation) again, keep the original
106+
// collation.
103107
sql(s"ALTER TABLE $testTable ALTER COLUMN c2 TYPE STRING")
104-
assertTableColumnCollation(testTable, "c2", "UTF8_BINARY")
108+
assertTableColumnCollation(testTable, "c2", "UNICODE")
105109
}
106110
}
107111

@@ -148,9 +152,26 @@ abstract class DefaultCollationTestSuite extends QueryTest with SharedSparkSessi
148152
sql(s"ALTER TABLE $testTable ALTER COLUMN c1 TYPE STRING COLLATE UNICODE_CI")
149153
assertTableColumnCollation(testTable, "c1", "UNICODE_CI")
150154

155+
// When using ALTER TABLE ALTER COLUMN TYPE, the column should inherit the table's collation
156+
// only if it wasn't a string column before. If the column was already a string, and we're
157+
// just changing its type to string (without explicit collation) again, keep the original
158+
// collation.
159+
sql(s"ALTER TABLE $testTable ALTER COLUMN c1 TYPE STRING")
160+
assertTableColumnCollation(testTable, "c1", "UNICODE_CI")
161+
151162
// alter table add columns with explicit collation, check collation for each column
152163
sql(s"ALTER TABLE $testTable ADD COLUMN c7 STRING COLLATE SR_CI_AI")
153164
sql(s"ALTER TABLE $testTable ADD COLUMN c8 STRING COLLATE UTF8_BINARY")
165+
assertTableColumnCollation(testTable, "c7", "SR_CI_AI")
166+
assertTableColumnCollation(testTable, "c8", "UTF8_BINARY")
167+
168+
// When using ALTER TABLE ALTER COLUMN TYPE, the column should inherit the table's collation
169+
// only if it wasn't a string column before. If the column was already a string, and we're
170+
// just changing its type to string (without explicit collation) again, keep the original
171+
// collation.
172+
sql(s"ALTER TABLE $testTable ALTER COLUMN c8 TYPE STRING")
173+
assertTableColumnCollation(testTable, "c8", "UTF8_BINARY")
174+
154175
assertTableColumnCollation(testTable, "c1", "UNICODE_CI")
155176
assertTableColumnCollation(testTable, "c2", "SR")
156177
assertTableColumnCollation(testTable, "c3", "UTF8_BINARY")
@@ -162,6 +183,24 @@ abstract class DefaultCollationTestSuite extends QueryTest with SharedSparkSessi
162183
}
163184
}
164185

186+
test("Alter table alter column type with default collation") {
187+
// When using ALTER TABLE ALTER COLUMN TYPE, the column should inherit the table's collation
188+
// only if it wasn't a string column before. If the column was already a string, and we're
189+
// just changing its type to string (without explicit collation) again, keep the original
190+
// collation.
191+
withTable(testTable) {
192+
sql(s"CREATE TABLE $testTable (c1 STRING, c2 STRING COLLATE UTF8_LCASE, c3 STRING)" +
193+
s" DEFAULT COLLATION UTF8_LCASE")
194+
sql(s"ALTER TABLE $testTable ALTER COLUMN c1 TYPE STRING")
195+
sql(s"ALTER TABLE $testTable ALTER COLUMN c2 TYPE STRING")
196+
sql(s"ALTER TABLE $testTable ALTER COLUMN c3 TYPE STRING COLLATE UNICODE")
197+
198+
assertTableColumnCollation(testTable, "c1", "UTF8_LCASE")
199+
assertTableColumnCollation(testTable, "c2", "UTF8_LCASE")
200+
assertTableColumnCollation(testTable, "c3", "UNICODE")
201+
}
202+
}
203+
165204
schemaAndObjectCollationPairs.foreach {
166205
case (schemaDefaultCollation, tableDefaultCollation) =>
167206
test(
@@ -221,7 +260,7 @@ abstract class DefaultCollationTestSuite extends QueryTest with SharedSparkSessi
221260
sql(s"ALTER SCHEMA $testSchema DEFAULT COLLATION $schemaNewCollation")
222261

223262
// Altering schema default collation should not affect existing objects.
224-
addAndAlterColumns(tableDefaultCollation = tableDefaultCollation)
263+
addAndAlterColumns(c2Collation = "SR_AI", tableDefaultCollation = tableDefaultCollation)
225264
}
226265

227266
withTable(testTable) {
@@ -413,12 +452,12 @@ abstract class DefaultCollationTestSuite extends QueryTest with SharedSparkSessi
413452
sql(s"CREATE TABLE $testTable (c1 STRING, c2 STRING COLLATE SR_AI) " +
414453
s"$tableDefaultCollationClause")
415454

416-
addAndAlterColumns(tableDefaultCollation = resolvedDefaultCollation)
455+
addAndAlterColumns(c2Collation = "SR_AI", tableDefaultCollation = resolvedDefaultCollation)
417456
}
418457
}
419458
}
420459

421-
private def addAndAlterColumns(tableDefaultCollation: String): Unit = {
460+
private def addAndAlterColumns(c2Collation: String, tableDefaultCollation: String): Unit = {
422461
// ADD COLUMN
423462
sql(s"ALTER TABLE $testTable ADD COLUMN c3 STRING")
424463
sql(s"ALTER TABLE $testTable ADD COLUMN c4 STRING COLLATE SR_AI")
@@ -432,7 +471,7 @@ abstract class DefaultCollationTestSuite extends QueryTest with SharedSparkSessi
432471
sql(s"ALTER TABLE $testTable ALTER COLUMN c2 TYPE STRING")
433472
sql(s"ALTER TABLE $testTable ALTER COLUMN c3 TYPE STRING COLLATE UTF8_BINARY")
434473
assertTableColumnCollation(testTable, "c1", "UNICODE")
435-
assertTableColumnCollation(testTable, "c2", tableDefaultCollation)
474+
assertTableColumnCollation(testTable, "c2", c2Collation)
436475
assertTableColumnCollation(testTable, "c3", "UTF8_BINARY")
437476
}
438477
}
@@ -806,6 +845,28 @@ class DefaultCollationTestSuiteV2 extends DefaultCollationTestSuite with Datasou
806845
}
807846
}
808847

848+
test("alter char/varchar column to string type") {
849+
withTable(testTable) {
850+
sql(s"CREATE TABLE $testTable (c1 VARCHAR(10), c2 CHAR(10)) " +
851+
s"DEFAULT COLLATION UTF8_LCASE")
852+
853+
sql(s"ALTER TABLE $testTable ALTER COLUMN c1 TYPE STRING")
854+
sql(s"ALTER TABLE $testTable ALTER COLUMN c2 TYPE STRING")
855+
assertTableColumnCollation(testTable, "c1", "UTF8_LCASE")
856+
assertTableColumnCollation(testTable, "c2", "UTF8_LCASE")
857+
}
858+
859+
withTable(testTable) {
860+
sql(s"CREATE TABLE $testTable (c1 VARCHAR(10), c2 CHAR(10)) " +
861+
s"DEFAULT COLLATION UTF8_LCASE")
862+
863+
sql(s"ALTER TABLE $testTable ALTER COLUMN c1 TYPE STRING COLLATE UNICODE")
864+
sql(s"ALTER TABLE $testTable ALTER COLUMN c2 TYPE STRING COLLATE UNICODE")
865+
assertTableColumnCollation(testTable, "c1", "UNICODE")
866+
assertTableColumnCollation(testTable, "c2", "UNICODE")
867+
}
868+
}
869+
809870
private def testReplaceColumns(
810871
schemaDefaultCollation: String, tableDefaultCollation: Option[String] = None): Unit = {
811872
val (tableDefaultCollationClause, resolvedDefaultCollation) =

0 commit comments

Comments
 (0)
0