8000 Tidy up Bulk Copy unmatched column name work (#3205) · dotnet/SqlClient@ebcc63a · GitHub
[go: up one dir, main page]

Skip to content

Commit ebcc63a

Browse files
authored
Tidy up Bulk Copy unmatched column name work (#3205)
- Simplified the tracking of unmatched columns. - Cleaned up some fragile flags and loops. - Fixed SNI reflection failures when running in NetFX.
1 parent ab504ef commit ebcc63a

File tree

5 files changed

+387
-341
lines changed

5 files changed

+387
-341
lines changed

src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SqlBulkCopy.cs

Lines changed: 167 additions & 155 deletions
Original file line numberDiff line numberDiff line change
@@ -532,207 +532,219 @@ private string AnalyzeTargetAndCreateUpdateBulkCommand(BulkCopySimpleResultSet i
532532

533533
string[] parts = MultipartIdentifier.ParseMultipartIdentifier(DestinationTableName, "[\"", "]\"", Strings.SQL_BulkCopyDestinationTableName, true);
534534
updateBulkCommandText.AppendFormat("insert bulk {0} (", ADP.BuildMultiPartName(parts));
535-
int nmatched = 0; // Number of columns that match and are accepted
536-
int nrejected = 0; // Number of columns that match but were rejected
537-
bool rejectColumn; // True if a column is rejected because of an excluded type
538535

539-
bool isInTransaction;
540-
541-
isInTransaction = _connection.HasLocalTransaction;
542536
// Throw if there is a transaction but no flag is set
543-
if (isInTransaction && _externalTransaction == null && _internalTransaction == null && (_connection.Parser != null && _connection.Parser.CurrentTransaction != null && _connection.Parser.CurrentTransaction.IsLocal))
537+
if (_connection.HasLocalTransaction
538+
&& _externalTransaction == null
539+
&& _internalTransaction == null
540+
&& _connection.Parser != null
541+
&& _connection.Parser.CurrentTransaction != null
542+
&& _connection.Parser.CurrentTransaction.IsLocal)
544543
{
545544
throw SQL.BulkLoadExistingTransaction();
546545
}
547546

548547
HashSet<string> destColumnNames = new HashSet<string>();
549548

550-
Dictionary<string, bool> columnMappingStatusLookup = new Dictionary<string, bool>();
551-
// Loop over the metadata for each column
549+
// Keep track of any result columns that we don't have a local
550+
// mapping for.
551+
HashSet<string> unmatchedColumns = new(_localColumnMappings.Count);
552+
553+
// Start by assuming all locally mapped Destination columns will be
554+
// unmatched.
555+
for (int i = 0; i < _localColumnMappings.Count; ++i)
556+
{
557+
unmatchedColumns.Add(_localColumnMappings[i].DestinationColumn);
558+
}
559+
560+
// Flag to remember whether or not we need to append a comma before
561+
// the next column in the command text.
562+
bool appendComma = false;
563+
564+
// Loop over the metadata for each result column.
552565
_SqlMetaDataSet metaDataSet = internalResults[MetaDataResultId].MetaData;
553566
_sortedColumnMappings = new List<_ColumnMapping>(metaDataSet.Length);
554567
for (int i = 0; i < metaDataSet.Length; i++)
555568
{
556569
_SqlMetaData metadata = metaDataSet[i];
557-
rejectColumn = false;
558570

559-
// Check for excluded types
560-
if ((metadata.type == SqlDbType.Timestamp)
561-
|| ((metadata.IsIdentity) && !IsCopyOption(SqlBulkCopyOptions.KeepIdentity)))
571+
bool matched = false;
572+
bool rejected = false;
573+
574+
// Look for a local match for the remote column.
575+
for (int j = 0; j < _localColumnMappings.Count; ++j)
562576
{
563-
// Remove metadata for excluded columns
564-
metaDataSet[i] = null;
565-
rejectColumn = true;
566-
// We still need to find a matching column association
567-
}
577+
var localColumn = _localColumnMappings[j];
568578

569-
// Find out if this column is associated
570-
int assocId;
571-
for (assocId = 0; assocId < _localColumnMappings.Count; assocId++)
572-
{
573-
if (!columnMappingStatusLookup.ContainsKey(_localColumnMappings[assocId].DestinationColumn))
574-
{
575-
columnMappingStatusLookup.Add(_localColumnMappings[assocId].DestinationColumn, false);
576-
}
579+
// Are we missing a mapping between the result column and
580+
// this local column (by ordinal or name)?
581+
if (localColumn._destinationColumnOrdinal != metadata.ordinal
582+
&& UnquotedName(localColumn._destinationColumnName) != metadata.column)
583+
{
584+
// Yes, so move on to the next local column.
585+
continue;
586+
}
577587

578-
if ((_localColumnMappings[assocId]._destinationColumnOrdinal == metadata.ordinal) ||
579-
(UnquotedName(_localColumnMappings[assocId]._destinationColumnName) == metadata.column))
588+
// Ok, we found a matching local column.
589+
matched = true;
590+
591+
// Remove it from our unmatched set.
592+
unmatchedColumns.Remove(localColumn.DestinationColumn);
593+
594+
// Check for column types that we refuse to bulk load, even
595+
// though we found a match.
596+
//
597+
// We will not process timestamp or identity columns.
598+
//
599+
if (metadata.type == SqlDbType.Timestamp
600+
|| (metadata.IsIdentity && !IsCopyOption(SqlBulkCopyOptions.KeepIdentity)))
580601
{
581-
columnMappingStatusLookup[_localColumnMappings[assocId].DestinationColumn] = true;
602+
rejected = true;
603+
break;
604+
}
582605

583-
if (rejectColumn)
584-
{
585-
nrejected++; // Count matched columns only
586-
break;
587-
}
606+
_sortedColumnMappings.Add(new _ColumnMapping(localColumn._internalSourceColumnOrdinal, metadata));
607+
destColumnNames.Add(metadata.column);
588608

589-
_sortedColumnMappings.Add(new _ColumnMapping(_localColumnMappings[assocId]._internalSourceColumnOrdinal, metadata));
590-
destColumnNames.Add(metadata.column);
591-
nmatched++;
609+
// Append a comma for each subsequent column.
610+
if (appendComma)
611+
{
612+
updateBulkCommandText.Append(", ");
613+
}
614+
else
615+
{
616+
appendComma = true;
617+
}
592618

593-
if (nmatched > 1)
594-
{
595-
updateBulkCommandText.Append(", "); // A leading comma for all but the first one
596-
}
619+
// Some datatypes need special handling ...
620+
if (metadata.type == SqlDbType.Variant)
621+
{
622+
AppendColumnNameAndTypeName(updateBulkCommandText, metadata.column, "sql_variant");
623+
}
624+
else if (metadata.type == SqlDbType.Udt)
625+
{
626+
AppendColumnNameAndTypeName(updateBulkCommandText, metadata.column, "varbinary");
627+
}
628+
else if (metadata.type == SqlDbTypeExtensions.Json)
629+
{
630+
AppendColumnNameAndTypeName(updateBulkCommandText, metadata.column, "json");
631+
}
632+
else
633+
{
634+
AppendColumnNameAndTypeName(updateBulkCommandText, metadata.column, metadata.type.ToString());
635+
}
597636

598-
// Some datatypes need special handling ...
599-
if (metadata.type == SqlDbType.Variant)
600-
{
601-
AppendColumnNameAndTypeName(updateBulkCommandText, metadata.column, "sql_variant");
602-
}
603-
else if (metadata.type == SqlDbType.Udt)
604-
{
605-
AppendColumnNameAndTypeName(updateBulkCommandText, metadata.column, "varbinary");
606-
}
607-
else if (metadata.type == SqlDbTypeExtensions.Json)
608-
{
609-
AppendColumnNameAndTypeName(updateBulkCommandText, metadata.column, "json");
610-
}
611-
else
637+
switch (metadata.metaType.NullableType)
638+
{
639+
case TdsEnums.SQLNUMERICN:
640+
case TdsEnums.SQLDECIMALN:
641+
// Decimal and numeric need to include precision and scale
642+
updateBulkCommandText.AppendFormat((IFormatProvider)null, "({0},{1})", metadata.precision, metadata.scale);
643+
break;
644+
case TdsEnums.SQLUDT:
612645
{
613-
AppendColumnNameAndTypeName(updateBulkCommandText, metadata.column, metadata.type.ToString());
646+
if (metadata.IsLargeUdt)
647+
{
648+
updateBulkCommandText.Append("(max)");
649+
}
650+
else
651+
{
652+
int size = metadata.length;
653+
updateBulkCommandText.AppendFormat((IFormatProvider)null, "({0})", size);
654+
}
655+
break;
614656
}
615-
616-
switch (metadata.metaType.NullableType)
657+
case TdsEnums.SQLTIME:
658+
case TdsEnums.SQLDATETIME2:
659+
case TdsEnums.SQLDATETIMEOFFSET:
660+
// date, dateime2, and datetimeoffset need to include scale
661+
updateBulkCommandText.AppendFormat((IFormatProvider)null, "({0})", metadata.scale);
662+
break;
663+
default:
617664
{
618-
case TdsEnums.SQLNUMERICN:
619-
case TdsEnums.SQLDECIMALN:
620-
// Decimal and numeric need to include precision and scale
621-
updateBulkCommandText.AppendFormat((IFormatProvider)null, "({0},{1})", metadata.precision, metadata.scale);
622-
break;
623-
case TdsEnums.SQLUDT:
624-
{
625-
if (metadata.IsLargeUdt)
626-
{
627-
updateBulkCommandText.Append("(max)");
628-
}
629-
else
630-
{
631-
int size = metadata.length;
632-
updateBulkCommandText.AppendFormat((IFormatProvider)null, "({0})", size);
633-
}
634-
break;
635-
}
636-
case TdsEnums.SQLTIME:
637-
case TdsEnums.SQLDATETIME2:
638-
case TdsEnums.SQLDATETIMEOFFSET:
639-
// date, dateime2, and datetimeoffset need to include scale
640-
updateBulkCommandText.AppendFormat((IFormatProvider)null, "({0})", metadata.scale);
641-
break;
642-
default:
665+
// For non-long non-fixed types we need to add the Size
666+
if (!metadata.metaType.IsFixed && !metadata.metaType.IsLong)
667+
{
668+
int size = metadata.length;
669+
switch (metadata.metaType.NullableType)
643670
{
644-
// For non-long non-fixed types we need to add the Size
645-
if (!metadata.metaType.IsFixed && !metadata.metaType.IsLong)
646-
{
647-
int size = metadata.length;
648-
switch (metadata.metaType.NullableType)
649-
{
650-
case TdsEnums.SQLNCHAR:
651-
case TdsEnums.SQLNVARCHAR:
652-
case TdsEnums.SQLNTEXT:
653-
size /= 2;
654-
break;
655-
default:
656-
break;
657-
}
658-
updateBulkCommandText.AppendFormat((IFormatProvider)null, "({0})", size);
659-
}
660-
else if (metadata.metaType.IsPlp && metadata.metaType.SqlDbType != SqlDbType.Xml && metadata.metaType.SqlDbType != SqlDbTypeExtensions.Json)
661-
{
662-
// Partial length column prefix (max)
663-
updateBulkCommandText.Append("(max)");
664-
}
665-
break;
671+
case TdsEnums.SQLNCHAR:
672+
case TdsEnums.SQLNVARCHAR:
673+
case TdsEnums.SQLNTEXT:
674+
size /= 2;
675+
break;
676+
default:
677+
break;
666678
}
679+
updateBulkCommandText.AppendFormat((IFormatProvider)null, "({0})", size);
680+
}
681+
else if (metadata.metaType.IsPlp && metadata.metaType.SqlDbType != SqlDbType.Xml && metadata.metaType.SqlDbType != SqlDbTypeExtensions.Json)
682+
{
683+
// Partial length column prefix (max)
684+
updateBulkCommandText.Append("(max)");
685+
}
686+
break;
667687
}
688+
}
668689

669-
// Get collation for column i
670-
Result rowset = internalResults[CollationResultId];
671-
object rowvalue = rowset[i][CollationId];
690+
// Get collation for column i
691+
Result rowset = internalResults[CollationResultId];
692+
object rowvalue = rowset[i][CollationId];
672693

673-
bool shouldSendCollation;
674-
switch (metadata.type)
675-
{
676-
case SqlDbType.Char:
677-
case SqlDbType.NChar:
678-
case SqlDbType.VarChar:
679-
case SqlDbType.NVarChar:
680-
case SqlDbType.Text:
681-
case SqlDbType.NText:
682-
shouldSendCollation = true;
683-
break;
684-
685-
default:
686-
shouldSendCollation = false;
687-
break;
688-
}
694+
bool shouldSendCollation;
695+
switch (metadata.type)
696+
{
697+
case SqlDbType.Char:
698+
case SqlDbType.NChar:
699+
case SqlDbType.VarChar:
700+
case SqlDbType.NVarChar:
701+
case SqlDbType.Text:
702+
case SqlDbType.NText:
703+
shouldSendCollation = true;
704+
break;
689705

690-
if (rowvalue != null && shouldSendCollation)
706+
default:
707+
shouldSendCollation = false;
708+
break;
709+
}
710+
711+
if (rowvalue != null && shouldSendCollation)
712+
{
713+
Debug.Assert(rowvalue is SqlString);
714+
SqlString collation_name = (SqlString)rowvalue;
715+
if (!collation_name.IsNull)
691716
{
692-
Debug.Assert(rowvalue is SqlString);
693-
SqlString collation_name = (SqlString)rowvalue;
694-
if (!collation_name.IsNull)
717+
updateBulkCommandText.Append(" COLLATE " + collation_name.Value);
718+
// Compare collations only if the collation value was set on the metadata
719+
if (_sqlDataReaderRowSource != null && metadata.collation != null)
695720
{
696-
updateBulkCommandText.Append(" COLLATE " + collation_name.Value);
697-
// Compare collations only if the collation value was set on the metadata
698-
if (_sqlDataReaderRowSource != null && metadata.collation != null)
721+
// On SqlDataReader we can verify the sourcecolumn collation!
722+
int sourceColumnId = localColumn._internalSourceColumnOrdinal;
723+
int destinationLcid = metadata.collation.LCID;
724+
int sourceLcid = _sqlDataReaderRowSource.GetLocaleId(sourceColumnId);
725+
if (sourceLcid != destinationLcid)
699726
{
700-
// On SqlDataReader we can verify the sourcecolumn collation!
701-
int sourceColumnId = _localColumnMappings[assocId]._internalSourceColumnOrdinal;
702-
int destinationLcid = metadata.collation.LCID;
703-
int sourceLcid = _sqlDataReaderRowSource.GetLocaleId(sourceColumnId);
704-
if (sourceLcid != destinationLcid)
705-
{
706-
throw SQL.BulkLoadLcidMismatch(sourceLcid, _sqlDataReaderRowSource.GetName(sourceColumnId), destinationLcid, metadata.column);
707-
}
727+
throw SQL.BulkLoadLcidMismatch(sourceLcid, _sqlDataReaderRowSource.GetName(sourceColumnId), destinationLcid, metadata.column);
708728
}
709729
}
710730
}
711-
break;
712731
}
732+
733+
// We found a match, so no need to keep looking.
734+
break;
713735
}
714736

715-
if (assocId == _localColumnMappings.Count)
737+
// Remove metadata for unmatched and rejected columns.
738+
if (! matched || rejected)
716739
{
717-
// Remove metadata for unmatched columns
718740
metaDataSet[i] = null;
719741
}
720742
}
721743

722-
// All columnmappings should have matched up
723-
if (nmatched + nrejected != _localColumnMappings.Count)
744+
// Do we have any unmatched columns?
745+
if (unmatchedColumns.Count > 0)
724746
{
725-
List<string> unmatchedColumns = new List<string>();
726-
727-
foreach(KeyValuePair<string, bool> keyValuePair in columnMappingStatusLookup)
728-
{
729-
if (!keyValuePair.Value)
730-
{
731-
unmatchedColumns.Add(keyValuePair.Key);
732-
}
733-
}
734-
735-
throw SQL.BulkLoadNonMatchingColumnName(unmatchedColumns);
747+
throw SQL.BulkLoadNonMatchingColumnNames(unmatchedColumns);
736748
}
737749

738750
updateBulkCommandText.Append(")");

0 commit comments

Comments
 (0)
0