8000 Recycle MySqlDataReader objects. Fixes #1277 · mysql-net/MySqlConnector@6d5189d · GitHub
[go: up one dir, main page]

Skip to content

Commit 6d5189d

Browse files
committed
Recycle MySqlDataReader objects. Fixes #1277
1 parent a8164ae commit 6d5189d

File tree

3 files changed

+64
-44
lines changed

3 files changed

+64
-44
lines changed

src/MySqlConnector/Core/CommandExecutor.cs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,16 +47,18 @@ public static async Task<MySqlDataReader> ExecuteReaderAsync(IReadOnlyList<IMySq
4747
cancellationToken.ThrowIfCancellationRequested();
4848

4949
using var payload = writer.ToPayloadData();
50-
connection.Session.StartQuerying(command.CancellableCommand);
50+
var session = connection.Session;
51+
session.StartQuerying(command.CancellableCommand);
5152
command.SetLastInsertedId(0);
5253
try
5354
{
54-
await connection.Session.SendAsync(payload, ioBehavior, CancellationToken.None).ConfigureAwait(false);
55-
return await MySqlDataReader.CreateAsync(commandListPosition, payloadCreator, cachedProcedures, command, behavior, activity, ioBehavior, cancellationToken).ConfigureAwait(false);
55+
await session.SendAsync(payload, ioBehavior, CancellationToken.None).ConfigureAwait(false);
56+
await session.DataReader.InitAsync(commandListPosition, payloadCreator, cachedProcedures, command, behavior, activity, ioBehavior, cancellationToken).ConfigureAwait(false);
57+
return session.DataReader;
5658
}
5759
catch (MySqlException ex) when (ex.ErrorCode == MySqlErrorCode.QueryInterrupted && cancellationToken.IsCancellationRequested)
5860
{
59-
Log.QueryWasInterrupted(command.Logger, connection.Session.Id);
61+
Log.QueryWasInterrupted(command.Logger, session.Id);
6062
throw new OperationCanceledException(ex.Message, ex, cancellationToken);
6163
}
6264
catch (Exception ex) when (payload.Span.Length > 4_194_304 && (ex is SocketException or IOException or MySqlProtocolException))

src/MySqlConnector/Core/ServerSession.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ public ServerSession(ILogger logger, ConnectionPool? pool, int poolGeneration, i
4242
PoolGeneration = poolGeneration;
4343
HostName = "";
4444
m_activityTags = new ActivityTagsCollection();
45+
DataReader = new();
4546
Log.CreatedNewSession(m_logger, Id);
4647
}
4748

@@ -65,6 +66,7 @@ public ServerSession(ILogger logger, ConnectionPool? pool, int poolGeneration, i
6566
public bool SupportsSessionTrack => m_supportsSessionTrack;
6667
public bool ProcAccessDenied { get; set; }
6768
public ICollection<KeyValuePair<string, object?>> ActivityTags => m_activityTags;
69+
public MySqlDataReader DataReader { get; }
6870

6971
public ValueTask ReturnToPoolAsync(IOBehavior ioBehavior, MySqlConnection? owningConnection)
7072
{

src/MySqlConnector/MySqlDataReader.cs

Lines changed: 56 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -27,19 +27,19 @@ public override bool Read()
2727
{
2828
VerifyNotDisposed();
2929
Command!.CancellableCommand.ResetCommandTimeout();
30-
return m_resultSet!.Read();
30+
return m_resultSet.Read();
3131
}
3232

3333
public override async Task<bool> ReadAsync(CancellationToken cancellationToken)
3434
{
3535
VerifyNotDisposed();
3636
Command!.CancellableCommand.ResetCommandTimeout();
3737
using var registration = Command.CancellableCommand.RegisterCancel(cancellationToken);
38-
return await m_resultSet!.ReadAsync(cancellationToken).ConfigureAwait(false);
38+
return await m_resultSet.ReadAsync(cancellationToken).ConfigureAwait(false);
3939
}
4040

4141
internal Task<bool> ReadAsync(IOBehavior ioBehavior, CancellationToken cancellationToken) =>
42-
m_resultSet!.ReadAsync(ioBehavior, cancellationToken);
42+
m_resultSet.ReadAsync(ioBehavior, cancellationToken);
4343

4444
public override async Task<bool> NextResultAsync(CancellationToken cancellationToken)
4545
{
@@ -58,7 +58,7 @@ internal async Task<bool> NextResultAsync(IOBehavior ioBehavior, CancellationTok
5858
{
5959
while (true)
6060
{
61-
await m_resultSet!.ReadEntireAsync(ioBehavior, cancellationToken).ConfigureAwait(false);
61+
await m_resultSet.ReadEntireAsync(ioBehavior, cancellationToken).ConfigureAwait(false);
6262
await ScanResultSetAsync(ioBehavior, m_resultSet, cancellationToken).ConfigureAwait(false);
6363
if (m_hasMoreResults && m_resultSet.ContainsCommandParameters)
6464
await ReadOutParametersAsync(Command!, m_resultSet, ioBehavior, cancellationToken).ConfigureAwait(false);
@@ -74,7 +74,7 @@ internal async Task<bool> NextResultAsync(IOBehavior ioBehavior, CancellationTok
7474
using (Command.CancellableCommand.RegisterCancel(cancellationToken))
7575
{
7676
var writer = new ByteBufferWriter();
77-
if (!Command.Connection!.Session.IsCancelingQuery && m_payloadCreator.WriteQueryCommand(ref m_commandListPosition, m_cachedProcedures!, writer, false))
77+
if (!Command.Connection!.Session.IsCancelingQuery && m_payloadCreator!.WriteQueryCommand(ref m_commandListPosition, m_cachedProcedures!, writer, false))
7878
{
7979
using var payload = writer.ToPayloadData();
8080
await Command.Connection.Session.SendAsync(payload, ioBehavior, cancellationToken).ConfigureAwait(false);
@@ -99,7 +99,7 @@ internal async Task<bool> NextResultAsync(IOBehavior ioBehavior, CancellationTok
9999
}
100100
catch (MySqlException)
101101
{
102-
m_resultSet!.Reset();
102+
m_resultSet.Reset();
103103
m_hasMoreResults = false;
104104< 10000 /td>
m_schemaTable = null;
105105
throw;
@@ -108,7 +108,7 @@ internal async Task<bool> NextResultAsync(IOBehavior ioBehavior, CancellationTok
108108

109109
private void ActivateResultSet(CancellationToken cancellationToken)
110110
{
111-
if (m_resultSet!.ReadResultSetHeaderException is not null)
111+
if (m_resultSet.ReadResultSetHeaderException is not null)
112112
{
113113
var mySqlException = m_resultSet.ReadResultSetHeaderException.SourceException as MySqlException;
114114

@@ -350,8 +350,8 @@ public override long GetChars(int ordinal, long dataOffset, char[]? buffer, int
350350
/// <returns>A <see cref="System.Collections.ObjectModel.ReadOnlyCollection{DbColumn}"/> containing metadata about the result set.</returns>
351351
public ReadOnlyCollection<DbColumn> GetColumnSchema()
352352
{
353-
var columnDefinitions = m_resultSet?.ColumnDefinitions;
354-
var hasNoSchema = columnDefinitions is null || m_resultSet!.ContainsCommandParameters;
353+
var columnDefinitions = m_resultSet.ColumnDefinitions;
354+
var hasNoSchema = columnDefinitions is null || m_resultSet.ContainsCommandParameters;
355355
return hasNoSchema ? new List<DbColumn>().AsReadOnly() :
356356
columnDefinitions!
357357
.Select((c, n) => (DbColumn) new MySqlDbColumn(n, c, Connection!.AllowZeroDateTime, GetResultSet().ColumnTypes![n]))
@@ -451,30 +451,49 @@ protected override void Dispose(bool disposing)
451451
public Task DisposeAsync() => DisposeAsync(Connection?.AsyncIOBehavior ?? IOBehavior.Asynchronous, CancellationToken.None);
452452
#endif
453453

454-
internal Activity? Activity { get; }
454+
internal Activity? Activity { get; private set; }
455455
internal IMySqlCommand? Command { get; private set; }
456456
internal MySqlConnection? Connection => Command?.Connection;
457457
internal ulong? RealRecordsAffected { get; set; }
458458
internal ServerSession? Session => Command?.Connection!.Session;
459459

460-
internal static async Task<MySqlDataReader> CreateAsync(CommandListPosition commandListPosition, ICommandPayloadCreator payloadCreator, IDictionary<string, CachedProcedure?>? cachedProcedures, IMySqlCommand command, CommandBehavior behavior, Activity? activity, IOBehavior ioBehavior, CancellationToken cancellationToken)
460+
internal async Task InitAsync(CommandListPosition commandListPosition, ICommandPayloadCreator payloadCreator, IDictionary<string, CachedProcedure?>? cachedProcedures, IMySqlCommand command, CommandBehavior behavior, Activity? activity, IOBehavior ioBehavior, CancellationToken cancellationToken)
461461
{
462-
var dataReader = new MySqlDataReader(commandListPosition, payloadCreator, cachedProcedures, command, behavior, activity);
463-
command.Connection!.SetActiveReader(dataReader);
462+
// reset fields from last use of this MySqlDataReader
463+
if (m_hasMoreResults)
464+
throw new InvalidOperationException("Expected m_hasMoreResults to be false");
465+
if (m_resultSet.BufferState != ResultSetState.None || m_resultSet.State != ResultSetState.None)
466+
throw new InvalidOperationException("Expected BufferState and State to be ResultSetState.None.");
467+
if (m_resultSet.ColumnDefinitions is not null)
468+
throw new InvalidOperationException("Expected ColumnDefinitions to be null");
469+
m_closed = false;
470+
m_hasWarnings = false;
471+
m_initializationFailed = false;
472+
RealRecordsAffected = null;
473+
474+
// initialize for new command
475+
m_commandListPosition = commandListPosition;
476+
m_payloadCreator = payloadCreator;
477+
m_cachedProcedures = cachedProcedures;
478+
Command = command;
479+
m_behavior = behavior;
480+
Activity = activity;
481+
482+
command.Connection!.SetActiveReader(this);
464483

465484
try
466485
{
467-
await dataReader.m_resultSet!.ReadResultSetHeaderAsync(ioBehavior).ConfigureAwait(false);
468-
dataReader.ActivateResultSet(cancellationToken);
469-
dataReader.m_hasMoreResults = true;
486+
await m_resultSet.ReadResultSetHeaderAsync(ioBehavior).ConfigureAwait(false);
487+
ActivateResultSet(cancellationToken);
488+
m_hasMoreResults = true;
470489

471-
if (dataReader.m_resultSet.ContainsCommandParameters)
472-
await ReadOutParametersAsync(dataReader.Command!, dataReader.m_resultSet, ioBehavior, cancellationToken).ConfigureAwait(false);
490+
if (m_resultSet.ContainsCommandParameters)
491+
await ReadOutParametersAsync(command, m_resultSet, ioBehavior, cancellationToken).ConfigureAwait(false);
473492

474493
// if the command list has multiple commands, keep reading until a result set is found
475-
while (dataReader.m_resultSet.State == ResultSetState.NoMoreData && commandListPosition.CommandIndex < commandListPosition.Commands.Count)
494+
while (m_resultSet.State == ResultSetState.NoMoreData && commandListPosition.CommandIndex < commandListPosition.Commands.Count)
476495
{
477-
await dataReader.NextResultAsync(ioBehavior, cancellationToken).ConfigureAwait(false);
496+
await NextResultAsync(ioBehavior, cancellationToken).ConfigureAwait(false);
478497
}
479498
}
480499
catch (Exception ex)
@@ -484,18 +503,16 @@ internal static async Task<MySqlDataReader> CreateAsync(CommandListPosition comm
484503
activity.SetException(ex);
485504
activity.Stop();
486505
}
487-
dataReader.m_creationFailed = true;
488-
dataReader.Dispose();
506+
m_initializationFailed = true;
507+
Dispose();
489508
throw;
490509
}
491-
492-
return dataReader;
493510
}
494511

495512
internal DataTable? BuildSchemaTable()
496513
{
497-
var columnDefinitions = m_resultSet?.ColumnDefinitions;
498-
if (columnDefinitions is null || m_resultSet!.ContainsCommandParameters)
514+
var columnDefinitions = m_resultSet.ColumnDefinitions;
515+
if (columnDefinitions is null || m_resultSet.ContainsCommandParameters)
499516
return null;
500517

501518
var schemaTable = new DataTable("SchemaTable") { Locale = CultureInfo.InvariantCulture };
@@ -583,15 +600,9 @@ internal static async Task<MySqlDataReader> CreateAsync(CommandListPosition comm
583600
return schemaTable;
584601
}
585602

586-
private MySqlDataReader(CommandListPosition commandListPosition, ICommandPayloadCreator payloadCreator, IDictionary<string, CachedProcedure?>? cachedProcedures, IMySqlCommand command, CommandBehavior behavior, Activity? activity)
603+
internal MySqlDataReader()
587604
{
588-
m_commandListPosition = commandListPosition;
589-
m_payloadCreator = payloadCreator;
590-
m_cachedProcedures = cachedProcedures;
591-
Command = command;
592-
m_behavior = behavior;
593605
m_resultSet = new(this);
594-
Activity = activity;
595606
}
596607

597608
#if NETCOREAPP3_0_OR_GREATER || NETSTANDARD2_1_OR_GREATER
@@ -619,7 +630,6 @@ internal async Task DisposeAsync(IOBehavior ioBehavior, CancellationToken cancel
619630
if (ex.ErrorCode != MySqlErrorCode.QueryInterrupted)
620631
Log.IgnoringExceptionInDisposeAsync(Command.Logger, ex, Command.Connection.Session.Id, ex.Message, Command.CommandText!);
621632
}
622-
m_resultSet = null;
623633
}
624634

625635
m_hasMoreResults = false;
@@ -628,13 +638,19 @@ internal async Task DisposeAsync(IOBehavior ioBehavior, CancellationToken cancel
628638
Command.CancellableCommand.SetTimeout(Constants.InfiniteTimeout);
629639
connection.FinishQuerying(m_hasWarnings);
630640

631-
if (!m_creationFailed)
641+
if (!m_initializationFailed)
632642
Activity?.SetSuccess();
633643
Activity?.Stop();
644+
Activity = null;
634645

635646
if ((m_behavior & CommandBehavior.CloseConnection) != 0)
636647
await connection.CloseAsync(ioBehavior).ConfigureAwait(false);
648+
649+
// clear fields (so that MySqlConnection can be GCed if the user doesn't hold a reference to it)
637650
Command = null;
651+
m_commandListPosition = default;
652+
m_payloadCreator = null;
653+
m_cachedProcedures = null;
638654
}
639655
}
640656

@@ -683,14 +699,14 @@ private ResultSet GetResultSet()
683699
return m_resultSet;
684700
}
685701

686-
private readonly CommandBehavior m_behavior;
687-
private readonly ICommandPayloadCreator m_payloadCreator;
688-
private readonly IDictionary<string, CachedProcedure?>? m_cachedProcedures;
702+
private readonly ResultSet m_resultSet;
703+
private CommandBehavior m_behavior;
704+
private ICommandPayloadCreator? m_payloadCreator;
705+
private IDictionary<string, CachedProcedure?>? m_cachedProcedures;
689706
private CommandListPosition m_commandListPosition;
690707
private bool m_closed;
691708
private bool m_hasWarnings;
692709
private bool m_hasMoreResults;
693-
private bool m_creationFailed;
694-
private ResultSet? m_resultSet;
710+
private bool m_initializationFailed;
695711
private DataTable? m_schemaTable;
696712
}

0 commit comments

Comments
 (0)
0