From 59e3467b25cc249c87f22b3509fb01810d03e2b1 Mon Sep 17 00:00:00 2001 From: Nikita Kazmin Date: Mon, 26 Oct 2020 21:28:09 +0300 Subject: [PATCH 01/10] Removed soft cancellation for the reader's sequential methods --- src/Npgsql/HardCancellationBlock.cs | 33 +++++++++++++++++++++ src/Npgsql/NpgsqlDataReader.cs | 3 ++ src/Npgsql/NpgsqlReadBuffer.cs | 7 ++--- test/Npgsql.Tests/ReaderTests.cs | 36 +++++++++++++++++++++++ test/Npgsql.Tests/Support/PgServerMock.cs | 15 ++++++++++ 5 files changed, 90 insertions(+), 4 deletions(-) create mode 100644 src/Npgsql/HardCancellationBlock.cs diff --git a/src/Npgsql/HardCancellationBlock.cs b/src/Npgsql/HardCancellationBlock.cs new file mode 100644 index 0000000000..3db2745b58 --- /dev/null +++ b/src/Npgsql/HardCancellationBlock.cs @@ -0,0 +1,33 @@ +using System; +using System.Threading; + +namespace Npgsql +{ + internal class HardCancellationBlock : IDisposable + { + bool disposed; + + static readonly AsyncLocal _entered = new AsyncLocal(); + + public static bool Entered => _entered.Value; + + public static IDisposable Enter() + { + var context = new HardCancellationBlock(); + if (Entered) + context.disposed = true; + else + _entered.Value = true; + return context; + } + + public void Dispose() + { + if (!disposed) + { + _entered.Value = false; + disposed = true; + } + } + } +} diff --git a/src/Npgsql/NpgsqlDataReader.cs b/src/Npgsql/NpgsqlDataReader.cs index e14822bd6d..f164e9c9e3 100644 --- a/src/Npgsql/NpgsqlDataReader.cs +++ b/src/Npgsql/NpgsqlDataReader.cs @@ -1476,6 +1476,7 @@ public override T GetFieldValue(int ordinal) async ValueTask GetFieldValueSequential(int column, bool async, CancellationToken cancellationToken = default) { + using var _ = HardCancellationBlock.Enter(); var field = CheckRowAndGetField(column); await SeekToColumnSequential(column, async, cancellationToken); CheckColumnStart(); @@ -1962,6 +1963,8 @@ async Task SeekToColumnSequential(int column, bool async, CancellationToken canc PosInColumn = ColumnLen; } + using var _ = HardCancellationBlock.Enter(); + // Skip to end of column if needed // TODO: Simplify by better initializing _columnLen/_posInColumn var remainingInColumn = ColumnLen == -1 ? 0 : ColumnLen - PosInColumn; diff --git a/src/Npgsql/NpgsqlReadBuffer.cs b/src/Npgsql/NpgsqlReadBuffer.cs index 33fbc48868..63fb8f2834 100644 --- a/src/Npgsql/NpgsqlReadBuffer.cs +++ b/src/Npgsql/NpgsqlReadBuffer.cs @@ -202,10 +202,9 @@ async Task EnsureLong() switch (e) { - // User requested the cancellation (at this moment, it should be only WaitAsync) + // User requested the cancellation (at this moment, it is COPY operations, WaitAsync, Reader's sequential methods, authentication) case OperationCanceledException _ when cancellationToken.IsCancellationRequested: - Debug.Assert(readingNotifications); - throw; + throw readingNotifications ? e : Connector.Break(e); // Read timeout case OperationCanceledException _: @@ -220,7 +219,7 @@ async Task EnsureLong() // Note that if PG cancellation fails, the exception for that is already logged internally by CancelRequest. // We simply continue and throw the timeout one. - if (!wasCancellationRequested && Connector.CancelRequest(requestedByUser: false)) + if (!HardCancellationBlock.Entered && !wasCancellationRequested && Connector.CancelRequest(requestedByUser: false)) { // If the cancellation timeout is negative, we break the connection immediately var cancellationTimeout = Connector.Settings.CancellationTimeout; diff --git a/test/Npgsql.Tests/ReaderTests.cs b/test/Npgsql.Tests/ReaderTests.cs index 403fe183cb..d2d4a9790a 100644 --- a/test/Npgsql.Tests/ReaderTests.cs +++ b/test/Npgsql.Tests/ReaderTests.cs @@ -1724,6 +1724,42 @@ await pgMock Assert.That(conn.FullState, Is.EqualTo(ConnectionState.Broken)); } + [Test, Description("Timeouts ReadAsGetFieldValueAsync")] + [Timeout(15000)] + public async Task GetFieldValue_timeout_hard() + { + if (IsMultiplexing) + return; // Multiplexing, cancellation + + var csb = new NpgsqlConnectionStringBuilder(ConnectionString); + csb.CommandTimeout = 5; + csb.CancellationTimeout = 30000; + + await using var postmasterMock = PgPostmasterMock.Start(csb.ToString()); + using var _ = CreateTempPool(postmasterMock.ConnectionString, out var connectionString); + await using var conn = await OpenConnectionAsync(connectionString); + + // Write responses to the query we're about to send, with a single data row (we'll attempt to read two) + var pgMock = await postmasterMock.WaitForServerConnection(); + await pgMock + .WriteParseComplete() + .WriteBindComplete() + .WriteRowDescription(new FieldDescription(PostgresTypeOIDs.Bytea)) + .WriteDataRowWithFlush(new byte[10000]); + + using var cmd = new NpgsqlCommand("SELECT some_int FROM some_table", conn); + await using var reader = await cmd.ExecuteReaderAsync(CommandBehavior.SequentialAccess); + + await reader.ReadAsync(); + + var task = reader.GetFieldValueAsync(0); + + var exception = Assert.ThrowsAsync(async () => await task); + Assert.That(exception.InnerException, Is.TypeOf()); + + Assert.That(conn.FullState, Is.EqualTo(ConnectionState.Broken)); + } + #endregion Cancellation #region Initialization / setup / teardown diff --git a/test/Npgsql.Tests/Support/PgServerMock.cs b/test/Npgsql.Tests/Support/PgServerMock.cs index 5f3f377e30..b30ce89bd7 100644 --- a/test/Npgsql.Tests/Support/PgServerMock.cs +++ b/test/Npgsql.Tests/Support/PgServerMock.cs @@ -158,6 +158,21 @@ internal PgServerMock WriteDataRow(params byte[][] columnValues) return this; } + internal async Task WriteDataRowWithFlush(params byte[][] columnValues) + { + CheckDisposed(); + + _writeBuffer.WriteByte((byte) BackendMessageCode.DataRow); + _writeBuffer.WriteInt32(4 + 2 + columnValues.Sum(v => 4 + v.Length)); + _writeBuffer.WriteInt16(columnValues.Length); + + foreach (var field in columnValues) + { + _writeBuffer.WriteInt32(field.Length); + await _writeBuffer.WriteBytesRaw(field, true); + } + } + internal PgServerMock WriteCommandComplete(string tag = "") { CheckDisposed(); From 5b047828eb96a4f26cd97b0bd6adf448d67e36b5 Mon Sep 17 00:00:00 2001 From: Nikita Kazmin Date: Tue, 27 Oct 2020 08:13:32 +0300 Subject: [PATCH 02/10] Made block a readonly struct --- src/Npgsql/HardCancellationBlock.cs | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/src/Npgsql/HardCancellationBlock.cs b/src/Npgsql/HardCancellationBlock.cs index 3db2745b58..917e70049d 100644 --- a/src/Npgsql/HardCancellationBlock.cs +++ b/src/Npgsql/HardCancellationBlock.cs @@ -3,31 +3,28 @@ namespace Npgsql { - internal class HardCancellationBlock : IDisposable + readonly struct HardCancellationBlock : IDisposable { - bool disposed; - static readonly AsyncLocal _entered = new AsyncLocal(); public static bool Entered => _entered.Value; + readonly bool _outerBlock; + + public HardCancellationBlock(bool outerBlock) =>_outerBlock = outerBlock; + public static IDisposable Enter() { - var context = new HardCancellationBlock(); - if (Entered) - context.disposed = true; - else + var context = new HardCancellationBlock(!Entered); + if (!Entered) _entered.Value = true; return context; } public void Dispose() { - if (!disposed) - { + if (_outerBlock) _entered.Value = false; - disposed = true; - } } } } From c9fb4a5eb2f14bbae11dc53c7243a16db97ae5f8 Mon Sep 17 00:00:00 2001 From: Nikita Kazmin Date: Tue, 27 Oct 2020 14:02:33 +0300 Subject: [PATCH 03/10] Rolled back previous changes --- src/Npgsql/HardCancellationBlock.cs | 30 ----------------------------- src/Npgsql/NpgsqlDataReader.cs | 3 --- src/Npgsql/NpgsqlReadBuffer.cs | 2 +- 3 files changed, 1 insertion(+), 34 deletions(-) delete mode 100644 src/Npgsql/HardCancellationBlock.cs diff --git a/src/Npgsql/HardCancellationBlock.cs b/src/Npgsql/HardCancellationBlock.cs deleted file mode 100644 index 917e70049d..0000000000 --- a/src/Npgsql/HardCancellationBlock.cs +++ /dev/null @@ -1,30 +0,0 @@ -using System; -using System.Threading; - -namespace Npgsql -{ - readonly struct HardCancellationBlock : IDisposable - { - static readonly AsyncLocal _entered = new AsyncLocal(); - - public static bool Entered => _entered.Value; - - readonly bool _outerBlock; - - public HardCancellationBlock(bool outerBlock) =>_outerBlock = outerBlock; - - public static IDisposable Enter() - { - var context = new HardCancellationBlock(!Entered); - if (!Entered) - _entered.Value = true; - return context; - } - - public void Dispose() - { - if (_outerBlock) - _entered.Value = false; - } - } -} diff --git a/src/Npgsql/NpgsqlDataReader.cs b/src/Npgsql/NpgsqlDataReader.cs index f164e9c9e3..e14822bd6d 100644 --- a/src/Npgsql/NpgsqlDataReader.cs +++ b/src/Npgsql/NpgsqlDataReader.cs @@ -1476,7 +1476,6 @@ public override T GetFieldValue(int ordinal) async ValueTask GetFieldValueSequential(int column, bool async, CancellationToken cancellationToken = default) { - using var _ = HardCancellationBlock.Enter(); var field = CheckRowAndGetField(column); await SeekToColumnSequential(column, async, cancellationToken); CheckColumnStart(); @@ -1963,8 +1962,6 @@ async Task SeekToColumnSequential(int column, bool async, CancellationToken canc PosInColumn = ColumnLen; } - using var _ = HardCancellationBlock.Enter(); - // Skip to end of column if needed // TODO: Simplify by better initializing _columnLen/_posInColumn var remainingInColumn = ColumnLen == -1 ? 0 : ColumnLen - PosInColumn; diff --git a/src/Npgsql/NpgsqlReadBuffer.cs b/src/Npgsql/NpgsqlReadBuffer.cs index 63fb8f2834..ddbd133552 100644 --- a/src/Npgsql/NpgsqlReadBuffer.cs +++ b/src/Npgsql/NpgsqlReadBuffer.cs @@ -219,7 +219,7 @@ async Task EnsureLong() // Note that if PG cancellation fails, the exception for that is already logged internally by CancelRequest. // We simply continue and throw the timeout one. - if (!HardCancellationBlock.Entered && !wasCancellationRequested && Connector.CancelRequest(requestedByUser: false)) + if (!wasCancellationRequested && Connector.CancelRequest(requestedByUser: false)) { // If the cancellation timeout is negative, we break the connection immediately var cancellationTimeout = Connector.Settings.CancellationTimeout; From 59f04b8855c7c76e9ab547c45b5dd47d1a386b04 Mon Sep 17 00:00:00 2001 From: Nikita Kazmin Date: Tue, 27 Oct 2020 15:15:46 +0300 Subject: [PATCH 04/10] Added a variable, to control the flow of the cancellation on a timeout --- src/Npgsql/NpgsqlCommand.cs | 14 +++++++------- src/Npgsql/NpgsqlConnector.cs | 15 ++++++++++++--- src/Npgsql/NpgsqlDataReader.cs | 26 +++++++++++++------------- src/Npgsql/NpgsqlReadBuffer.cs | 10 ++++++---- test/Npgsql.Tests/ReaderTests.cs | 2 +- 5 files changed, 39 insertions(+), 28 deletions(-) diff --git a/src/Npgsql/NpgsqlCommand.cs b/src/Npgsql/NpgsqlCommand.cs index 52175f5444..aababe7484 100644 --- a/src/Npgsql/NpgsqlCommand.cs +++ b/src/Npgsql/NpgsqlCommand.cs @@ -615,14 +615,14 @@ async Task PrepareLong(CancellationToken cancellationToken) { if (pStatement.StatementBeingReplaced != null) { - Expect(await connector.ReadMessage(async, cancellationToken), connector); + Expect(await connector.ReadMessageWithCancellation(async, cancellationToken), connector); pStatement.StatementBeingReplaced.CompleteUnprepare(); pStatement.StatementBeingReplaced = null; } - Expect(await connector.ReadMessage(async, cancellationToken), connector); - Expect(await connector.ReadMessage(async, cancellationToken), connector); - var msg = await connector.ReadMessage(async, cancellationToken); + Expect(await connector.ReadMessageWithCancellation(async, cancellationToken), connector); + Expect(await connector.ReadMessageWithCancellation(async, cancellationToken), connector); + var msg = await connector.ReadMessageWithCancellation(async, cancellationToken); switch (msg.Code) { case BackendMessageCode.RowDescription: @@ -661,7 +661,7 @@ async Task PrepareLong(CancellationToken cancellationToken) } } - Expect(await connector.ReadMessage(async, cancellationToken), connector); + Expect(await connector.ReadMessageWithCancellation(async, cancellationToken), connector); if (async) await sendTask; @@ -712,11 +712,11 @@ async Task Unprepare(bool async, CancellationToken cancellationToken = default) foreach (var statement in _statements) if (statement.PreparedStatement?.State == PreparedState.BeingUnprepared) { - Expect(await connector.ReadMessage(async, cancellationToken), connector); + Expect(await connector.ReadMessageWithCancellation(async, cancellationToken), connector); statement.PreparedStatement.CompleteUnprepare(); statement.PreparedStatement = null; } - Expect(await connector.ReadMessage(async, cancellationToken), connector); + Expect(await connector.ReadMessageWithCancellation(async, cancellationToken), connector); if (async) await sendTask; else diff --git a/src/Npgsql/NpgsqlConnector.cs b/src/Npgsql/NpgsqlConnector.cs index 3db2356d49..5af2d8a9b3 100644 --- a/src/Npgsql/NpgsqlConnector.cs +++ b/src/Npgsql/NpgsqlConnector.cs @@ -1003,9 +1003,15 @@ internal IBackendMessage ReadMessage(DataRowLoadingMode dataRowLoadingMode = Dat internal ValueTask ReadMessage(bool async, CancellationToken cancellationToken = default) => DoReadMessage(async, DataRowLoadingMode.NonSequential, cancellationToken: cancellationToken)!; + internal ValueTask ReadMessageWithCancellation(bool async, CancellationToken cancellationToken = default) + => DoReadMessage(async, DataRowLoadingMode.NonSequential, attemptPostgresCancellation: true, cancellationToken: cancellationToken)!; + internal ValueTask ReadMessage(bool async, DataRowLoadingMode dataRowLoadingMode = DataRowLoadingMode.NonSequential, CancellationToken cancellationToken = default) => DoReadMessage(async, dataRowLoadingMode, cancellationToken: cancellationToken)!; + internal ValueTask ReadMessageWithCancellation(bool async, DataRowLoadingMode dataRowLoadingMode = DataRowLoadingMode.NonSequential, CancellationToken cancellationToken = default) + => DoReadMessage(async, dataRowLoadingMode, attemptPostgresCancellation: true, cancellationToken: cancellationToken)!; + internal ValueTask ReadMessageWithNotifications(bool async, CancellationToken cancellationToken = default) => DoReadMessage(async, DataRowLoadingMode.NonSequential, true, cancellationToken: cancellationToken); @@ -1013,6 +1019,7 @@ internal ValueTask ReadMessage(bool async, DataRowLoadingMode d bool async, DataRowLoadingMode dataRowLoadingMode = DataRowLoadingMode.NonSequential, bool readingNotifications = false, + bool attemptPostgresCancellation = false, CancellationToken cancellationToken = default) { if (_pendingPrependedResponses > 0 || @@ -1020,7 +1027,8 @@ internal ValueTask ReadMessage(bool async, DataRowLoadingMode d readingNotifications || ReadBuffer.ReadBytesLeft < 5) { - return ReadMessageLong(dataRowLoadingMode, readingNotifications, cancellationToken2: cancellationToken); + return ReadMessageLong(dataRowLoadingMode, readingNotifications2: readingNotifications, + attemptPostgresCancellation2: attemptPostgresCancellation, cancellationToken2: cancellationToken); } var messageCode = (BackendMessageCode)ReadBuffer.ReadByte(); @@ -1050,6 +1058,7 @@ internal ValueTask ReadMessage(bool async, DataRowLoadingMode d DataRowLoadingMode dataRowLoadingMode2, bool readingNotifications2, bool isReadingPrependedMessage = false, + bool attemptPostgresCancellation2 = false, CancellationToken cancellationToken2 = default) { // First read the responses of any prepended messages. @@ -1060,7 +1069,7 @@ internal ValueTask ReadMessage(bool async, DataRowLoadingMode d // TODO: There could be room for optimization here, rather than the async call(s) ReadBuffer.Timeout = TimeSpan.FromMilliseconds(InternalCommandTimeout); for (; _pendingPrependedResponses > 0; _pendingPrependedResponses--) - await ReadMessageLong(DataRowLoadingMode.Skip, false, true, cancellationToken2); + await ReadMessageLong(DataRowLoadingMode.Skip, false, true, false, cancellationToken2); } catch (PostgresException e) { @@ -1076,7 +1085,7 @@ internal ValueTask ReadMessage(bool async, DataRowLoadingMode d while (true) { - await ReadBuffer.Ensure(5, async, readingNotifications2, cancellationToken2); + await ReadBuffer.Ensure(5, async, readingNotifications2, attemptPostgresCancellation2, cancellationToken2); messageCode = (BackendMessageCode)ReadBuffer.ReadByte(); PGUtil.ValidateBackendMessageCode(messageCode); len = ReadBuffer.ReadInt32() - 4; // Transmitted length includes itself diff --git a/src/Npgsql/NpgsqlDataReader.cs b/src/Npgsql/NpgsqlDataReader.cs index e14822bd6d..d82c9306dc 100644 --- a/src/Npgsql/NpgsqlDataReader.cs +++ b/src/Npgsql/NpgsqlDataReader.cs @@ -282,7 +282,7 @@ async Task Read(bool async, CancellationToken cancellationToken = default) ValueTask ReadMessage(bool async, CancellationToken cancellationToken = default) { - return _isSequential ? ReadMessageSequential(async) : Connector.ReadMessage(async, cancellationToken); + return _isSequential ? ReadMessageSequential(async) : Connector.ReadMessageWithCancellation(async, cancellationToken); async ValueTask ReadMessageSequential(bool async2) { @@ -348,7 +348,7 @@ async Task NextResult(bool async, bool isConsuming = false, CancellationTo await ConsumeRow(async, CancellationToken.None); while (true) { - var completedMsg = await Connector.ReadMessage(async, DataRowLoadingMode.Skip, CancellationToken.None); + var completedMsg = await Connector.ReadMessageWithCancellation(async, DataRowLoadingMode.Skip, CancellationToken.None); switch (completedMsg.Code) { case BackendMessageCode.CommandComplete: @@ -392,7 +392,7 @@ async Task NextResult(bool async, bool isConsuming = false, CancellationTo if (statement.IsPrepared) { - Expect(await Connector.ReadMessage(async, CancellationToken.None), Connector); + Expect(await Connector.ReadMessageWithCancellation(async, CancellationToken.None), Connector); RowDescription = statement.Description; } else // Non-prepared/preparing flow @@ -403,15 +403,15 @@ async Task NextResult(bool async, bool isConsuming = false, CancellationTo Debug.Assert(!pStatement.IsPrepared); if (pStatement.StatementBeingReplaced != null) { - Expect(await Connector.ReadMessage(async, CancellationToken.None), Connector); + Expect(await Connector.ReadMessageWithCancellation(async, CancellationToken.None), Connector); pStatement.StatementBeingReplaced.CompleteUnprepare(); pStatement.StatementBeingReplaced = null; } } - Expect(await Connector.ReadMessage(async, CancellationToken.None), Connector); - Expect(await Connector.ReadMessage(async, CancellationToken.None), Connector); - msg = await Connector.ReadMessage(async, CancellationToken.None); + Expect(await Connector.ReadMessageWithCancellation(async, CancellationToken.None), Connector); + Expect(await Connector.ReadMessageWithCancellation(async, CancellationToken.None), Connector); + msg = await Connector.ReadMessageWithCancellation(async, CancellationToken.None); RowDescription = statement.Description = msg.Code switch { @@ -457,7 +457,7 @@ async Task NextResult(bool async, bool isConsuming = false, CancellationTo // If output parameters are present and this is the first row of the first resultset, // we must always read it in non-sequential mode because it will be traversed twice (once // here for the parameters, then as a regular row). - msg = await Connector.ReadMessage(async, CancellationToken.None); + msg = await Connector.ReadMessageWithCancellation(async, CancellationToken.None); ProcessMessage(msg); if (msg.Code == BackendMessageCode.DataRow) PopulateOutputParameters(); @@ -481,7 +481,7 @@ async Task NextResult(bool async, bool isConsuming = false, CancellationTo } // There are no more queries, we're done. Read the RFQ. - ProcessMessage(Expect(await Connector.ReadMessage(async, CancellationToken.None), Connector)); + ProcessMessage(Expect(await Connector.ReadMessageWithCancellation(async, CancellationToken.None), Connector)); RowDescription = null; return false; } @@ -605,9 +605,9 @@ async Task NextResultSchemaOnly(bool async, CancellationToken cancellation } else { - Expect(await Connector.ReadMessage(async, CancellationToken.None), Connector); - Expect(await Connector.ReadMessage(async, CancellationToken.None), Connector); - var msg = await Connector.ReadMessage(async, CancellationToken.None); + Expect(await Connector.ReadMessageWithCancellation(async, CancellationToken.None), Connector); + Expect(await Connector.ReadMessageWithCancellation(async, CancellationToken.None), Connector); + var msg = await Connector.ReadMessageWithCancellation(async, CancellationToken.None); switch (msg.Code) { case BackendMessageCode.NoData: @@ -631,7 +631,7 @@ async Task NextResultSchemaOnly(bool async, CancellationToken cancellation // There are no more queries, we're done. Read to the RFQ. if (!_statements.All(s => s.IsPrepared)) { - ProcessMessage(Expect(await Connector.ReadMessage(async, CancellationToken.None), Connector)); + ProcessMessage(Expect(await Connector.ReadMessageWithCancellation(async, CancellationToken.None), Connector)); RowDescription = null; } diff --git a/src/Npgsql/NpgsqlReadBuffer.cs b/src/Npgsql/NpgsqlReadBuffer.cs index ddbd133552..d3e8079a22 100644 --- a/src/Npgsql/NpgsqlReadBuffer.cs +++ b/src/Npgsql/NpgsqlReadBuffer.cs @@ -128,16 +128,17 @@ internal void Ensure(int count) } public Task Ensure(int count, bool async, CancellationToken cancellationToken = default) - => Ensure(count, async, readingNotifications: false, cancellationToken); + => Ensure(count, async, readingNotifications: false, attemptPostgresCancellation: false, cancellationToken); public Task EnsureAsync(int count, CancellationToken cancellationToken = default) - => Ensure(count, async: true, readingNotifications: false, cancellationToken); + => Ensure(count, async: true, readingNotifications: false, attemptPostgresCancellation: false, cancellationToken); /// /// Ensures that bytes are available in the buffer, and if /// not, reads from the socket until enough is available. /// - internal Task Ensure(int count, bool async, bool readingNotifications, CancellationToken cancellationToken = default) + internal Task Ensure(int count, bool async, bool readingNotifications, bool attemptPostgresCancellation, + CancellationToken cancellationToken = default) { return count <= ReadBytesLeft ? Task.CompletedTask : EnsureLong(); @@ -219,7 +220,8 @@ async Task EnsureLong() // Note that if PG cancellation fails, the exception for that is already logged internally by CancelRequest. // We simply continue and throw the timeout one. - if (!wasCancellationRequested && Connector.CancelRequest(requestedByUser: false)) + // TODO: As an optimization, we can still attempt to send a cancellation request, but after that immediately break the connection + if (attemptPostgresCancellation && !wasCancellationRequested && Connector.CancelRequest(requestedByUser: false)) { // If the cancellation timeout is negative, we break the connection immediately var cancellationTimeout = Connector.Settings.CancellationTimeout; diff --git a/test/Npgsql.Tests/ReaderTests.cs b/test/Npgsql.Tests/ReaderTests.cs index d2d4a9790a..e0f672777c 100644 --- a/test/Npgsql.Tests/ReaderTests.cs +++ b/test/Npgsql.Tests/ReaderTests.cs @@ -1747,7 +1747,7 @@ await pgMock .WriteRowDescription(new FieldDescription(PostgresTypeOIDs.Bytea)) .WriteDataRowWithFlush(new byte[10000]); - using var cmd = new NpgsqlCommand("SELECT some_int FROM some_table", conn); + using var cmd = new NpgsqlCommand("SELECT some_bytea FROM some_table", conn); await using var reader = await cmd.ExecuteReaderAsync(CommandBehavior.SequentialAccess); await reader.ReadAsync(); From 67d36f77e4482e93c9e372510776cd61d97e683b Mon Sep 17 00:00:00 2001 From: Nikita Kazmin Date: Tue, 27 Oct 2020 23:55:00 +0300 Subject: [PATCH 05/10] Flipped cancellation flag --- src/Npgsql/NpgsqlBinaryExporter.cs | 12 ++++---- src/Npgsql/NpgsqlBinaryImporter.cs | 8 ++--- src/Npgsql/NpgsqlCommand.cs | 22 +++++++------- src/Npgsql/NpgsqlConnector.Auth.cs | 14 ++++----- src/Npgsql/NpgsqlConnector.cs | 49 +++++++++++++++++------------- src/Npgsql/NpgsqlDataReader.cs | 26 ++++++++-------- src/Npgsql/NpgsqlRawCopyStream.cs | 14 ++++----- 7 files changed, 76 insertions(+), 69 deletions(-) diff --git a/src/Npgsql/NpgsqlBinaryExporter.cs b/src/Npgsql/NpgsqlBinaryExporter.cs index 0472f73dfb..c1bd27325d 100644 --- a/src/Npgsql/NpgsqlBinaryExporter.cs +++ b/src/Npgsql/NpgsqlBinaryExporter.cs @@ -52,7 +52,7 @@ internal NpgsqlBinaryExporter(NpgsqlConnector connector, string copyToCommand) _connector.Flush(); CopyOutResponseMessage copyOutResponse; - var msg = _connector.ReadMessage(); + var msg = _connector.ReadMessageWithoutCancellation(); switch (msg.Code) { case BackendMessageCode.CopyOutResponse: @@ -144,9 +144,9 @@ async ValueTask StartRow(bool async, CancellationToken cancellationToken = if (numColumns == -1) { Debug.Assert(_leftToReadInDataMsg == 0); - Expect(await _connector.ReadMessage(async, cancellationToken), _connector); - Expect(await _connector.ReadMessage(async, cancellationToken), _connector); - Expect(await _connector.ReadMessage(async, cancellationToken), _connector); + Expect(await _connector.ReadMessageWithoutCancellation(async, cancellationToken), _connector); + Expect(await _connector.ReadMessageWithoutCancellation(async, cancellationToken), _connector); + Expect(await _connector.ReadMessageWithoutCancellation(async, cancellationToken), _connector); _column = -1; _isConsumed = true; return -1; @@ -384,8 +384,8 @@ async ValueTask DisposeAsync(bool async) // Read to the end _connector.SkipUntil(BackendMessageCode.CopyDone); // We intentionally do not pass a CancellationToken since we don't want to cancel cleanup - Expect(await _connector.ReadMessage(async, cancellationToken: default), _connector); - Expect(await _connector.ReadMessage(async, cancellationToken: default), _connector); + Expect(await _connector.ReadMessageWithoutCancellation(async, cancellationToken: default), _connector); + Expect(await _connector.ReadMessageWithoutCancellation(async, cancellationToken: default), _connector); } _connector.EndUserAction(); diff --git a/src/Npgsql/NpgsqlBinaryImporter.cs b/src/Npgsql/NpgsqlBinaryImporter.cs index d52b22724a..6c39942a98 100644 --- a/src/Npgsql/NpgsqlBinaryImporter.cs +++ b/src/Npgsql/NpgsqlBinaryImporter.cs @@ -56,7 +56,7 @@ internal NpgsqlBinaryImporter(NpgsqlConnector connector, string copyFromCommand) _connector.Flush(); CopyInResponseMessage copyInResponse; - var msg = _connector.ReadMessage(); + var msg = _connector.ReadMessageWithoutCancellation(); switch (msg.Code) { case BackendMessageCode.CopyInResponse: @@ -408,8 +408,8 @@ async ValueTask Complete(bool async, CancellationToken cancellationToken _buf.EndCopyMode(); await _connector.WriteCopyDone(async, cancellationToken); await _connector.Flush(async, cancellationToken); - var cmdComplete = Expect(await _connector.ReadMessage(async, cancellationToken), _connector); - Expect(await _connector.ReadMessage(async, cancellationToken), _connector); + var cmdComplete = Expect(await _connector.ReadMessageWithoutCancellation(async, cancellationToken), _connector); + Expect(await _connector.ReadMessageWithoutCancellation(async, cancellationToken), _connector); _state = ImporterState.Committed; return cmdComplete.Rows; } @@ -447,7 +447,7 @@ async Task Cancel(bool async, CancellationToken cancellationToken = default) await _connector.Flush(async, cancellationToken); try { - var msg = await _connector.ReadMessage(async, cancellationToken); + var msg = await _connector.ReadMessageWithoutCancellation(async, cancellationToken); // The CopyFail should immediately trigger an exception from the read above. throw _connector.Break( new NpgsqlException("Expected ErrorResponse when cancelling COPY but got: " + msg.Code)); diff --git a/src/Npgsql/NpgsqlCommand.cs b/src/Npgsql/NpgsqlCommand.cs index aababe7484..67717c22f3 100644 --- a/src/Npgsql/NpgsqlCommand.cs +++ b/src/Npgsql/NpgsqlCommand.cs @@ -480,8 +480,8 @@ void DeriveParametersForQuery(NpgsqlConnector connector) foreach (var statement in _statements) { - Expect(connector.ReadMessage(), connector); - var paramTypeOIDs = Expect(connector.ReadMessage(), connector).TypeOIDs; + Expect(connector.ReadMessageWithoutCancellation(), connector); + var paramTypeOIDs = Expect(connector.ReadMessageWithoutCancellation(), connector).TypeOIDs; if (statement.InputParameters.Count != paramTypeOIDs.Count) { @@ -515,7 +515,7 @@ void DeriveParametersForQuery(NpgsqlConnector connector) } } - var msg = connector.ReadMessage(); + var msg = connector.ReadMessageWithoutCancellation(); switch (msg.Code) { case BackendMessageCode.RowDescription: @@ -526,7 +526,7 @@ void DeriveParametersForQuery(NpgsqlConnector connector) } } - Expect(connector.ReadMessage(), connector); + Expect(connector.ReadMessageWithoutCancellation(), connector); sendTask.GetAwaiter().GetResult(); } } @@ -615,14 +615,14 @@ async Task PrepareLong(CancellationToken cancellationToken) { if (pStatement.StatementBeingReplaced != null) { - Expect(await connector.ReadMessageWithCancellation(async, cancellationToken), connector); + Expect(await connector.ReadMessage(async, cancellationToken), connector); pStatement.StatementBeingReplaced.CompleteUnprepare(); pStatement.StatementBeingReplaced = null; } - Expect(await connector.ReadMessageWithCancellation(async, cancellationToken), connector); - Expect(await connector.ReadMessageWithCancellation(async, cancellationToken), connector); - var msg = await connector.ReadMessageWithCancellation(async, cancellationToken); + Expect(await connector.ReadMessage(async, cancellationToken), connector); + Expect(await connector.ReadMessage(async, cancellationToken), connector); + var msg = await connector.ReadMessage(async, cancellationToken); switch (msg.Code) { case BackendMessageCode.RowDescription: @@ -661,7 +661,7 @@ async Task PrepareLong(CancellationToken cancellationToken) } } - Expect(await connector.ReadMessageWithCancellation(async, cancellationToken), connector); + Expect(await connector.ReadMessage(async, cancellationToken), connector); if (async) await sendTask; @@ -712,11 +712,11 @@ async Task Unprepare(bool async, CancellationToken cancellationToken = default) foreach (var statement in _statements) if (statement.PreparedStatement?.State == PreparedState.BeingUnprepared) { - Expect(await connector.ReadMessageWithCancellation(async, cancellationToken), connector); + Expect(await connector.ReadMessage(async, cancellationToken), connector); statement.PreparedStatement.CompleteUnprepare(); statement.PreparedStatement = null; } - Expect(await connector.ReadMessageWithCancellation(async, cancellationToken), connector); + Expect(await connector.ReadMessage(async, cancellationToken), connector); if (async) await sendTask; else diff --git a/src/Npgsql/NpgsqlConnector.Auth.cs b/src/Npgsql/NpgsqlConnector.Auth.cs index dbd0448e58..45288671ba 100644 --- a/src/Npgsql/NpgsqlConnector.Auth.cs +++ b/src/Npgsql/NpgsqlConnector.Auth.cs @@ -22,7 +22,7 @@ async Task Authenticate(string username, NpgsqlTimeout timeout, bool async, Canc Log.Trace("Authenticating...", Id); timeout.CheckAndApply(this); - var msg = Expect(await ReadMessage(async, cancellationToken), this); + var msg = Expect(await ReadMessageWithoutCancellation(async, cancellationToken), this); switch (msg.AuthRequestType) { case AuthenticationRequestType.AuthenticationOk: @@ -64,7 +64,7 @@ async Task AuthenticateCleartext(string username, bool async, CancellationToken await WritePassword(encoded, async, cancellationToken); await Flush(async, cancellationToken); - Expect(await ReadMessage(async, cancellationToken), this); + Expect(await ReadMessageWithoutCancellation(async, cancellationToken), this); } async Task AuthenticateSASL(List mechanisms, string username, bool async, CancellationToken cancellationToken = default) @@ -164,7 +164,7 @@ async Task AuthenticateSASL(List mechanisms, string username, bool async await WriteSASLInitialResponse(mechanism, PGUtil.UTF8Encoding.GetBytes($"{cbindFlag},,n=*,r={clientNonce}"), async, cancellationToken); await Flush(async, cancellationToken); - var saslContinueMsg = Expect(await ReadMessage(async, cancellationToken), this); + var saslContinueMsg = Expect(await ReadMessageWithoutCancellation(async, cancellationToken), this); if (saslContinueMsg.AuthRequestType != AuthenticationRequestType.AuthenticationSASLContinue) throw new NpgsqlException("[SASL] AuthenticationSASLFinal message expected"); var firstServerMsg = AuthenticationSCRAMServerFirstMessage.Load(saslContinueMsg.Payload); @@ -197,7 +197,7 @@ async Task AuthenticateSASL(List mechanisms, string username, bool async await WriteSASLResponse(Encoding.UTF8.GetBytes(messageStr), async, cancellationToken); await Flush(async, cancellationToken); - var saslFinalServerMsg = Expect(await ReadMessage(async, cancellationToken), this); + var saslFinalServerMsg = Expect(await ReadMessageWithoutCancellation(async, cancellationToken), this); if (saslFinalServerMsg.AuthRequestType != AuthenticationRequestType.AuthenticationSASLFinal) throw new NpgsqlException("[SASL] AuthenticationSASLFinal message expected"); @@ -205,7 +205,7 @@ async Task AuthenticateSASL(List mechanisms, string username, bool async if (scramFinalServerMsg.ServerSignature != Convert.ToBase64String(serverSignature)) throw new NpgsqlException("[SCRAM] Unable to verify server signature"); - var okMsg = Expect(await ReadMessage(async, cancellationToken), this); + var okMsg = Expect(await ReadMessageWithoutCancellation(async, cancellationToken), this); if (okMsg.AuthRequestType != AuthenticationRequestType.AuthenticationOk) throw new NpgsqlException("[SASL] Expected AuthenticationOK message"); @@ -297,7 +297,7 @@ async Task AuthenticateMD5(string username, byte[] salt, bool async, Cancellatio await WritePassword(result, async, cancellationToken); await Flush(async, cancellationToken); - Expect(await ReadMessage(async, cancellationToken), this); + Expect(await ReadMessageWithoutCancellation(async, cancellationToken), this); } async Task AuthenticateGSS(bool async) @@ -393,7 +393,7 @@ async Task Read(byte[] buffer, int offset, int count, bool async, Cancellat { if (_leftToRead == 0) { - var response = Expect(await _connector.ReadMessage(async, cancellationToken), _connector); + var response = Expect(await _connector.ReadMessageWithoutCancellation(async, cancellationToken), _connector); if (response.AuthRequestType == AuthenticationRequestType.AuthenticationOk) throw new AuthenticationCompleteException(); var gssMsg = response as AuthenticationGSSContinueMessage; diff --git a/src/Npgsql/NpgsqlConnector.cs b/src/Npgsql/NpgsqlConnector.cs index 5af2d8a9b3..11f48993d9 100644 --- a/src/Npgsql/NpgsqlConnector.cs +++ b/src/Npgsql/NpgsqlConnector.cs @@ -434,13 +434,13 @@ internal async Task Open(NpgsqlTimeout timeout, bool async, CancellationToken ca // We treat BackendKeyData as optional because some PostgreSQL-like database // don't send it (CockroachDB, CrateDB) - var msg = await ReadMessage(async, cancellationToken); + var msg = await ReadMessageWithoutCancellation(async, cancellationToken); if (msg.Code == BackendMessageCode.BackendKeyData) { var keyDataMsg = (BackendKeyDataMessage)msg; BackendProcessId = keyDataMsg.BackendProcessId; _backendSecretKey = keyDataMsg.BackendSecretKey; - msg = await ReadMessage(async, cancellationToken); + msg = await ReadMessageWithoutCancellation(async, cancellationToken); } if (msg.Code != BackendMessageCode.ReadyForQuery) throw new NpgsqlException($"Received backend message {msg.Code} while expecting ReadyForQuery. Please file a bug."); @@ -997,29 +997,33 @@ internal void PrependInternalMessage(byte[] rawMessage, int responseMessageCount #region Backend message processing + internal IBackendMessage ReadMessageWithoutCancellation(DataRowLoadingMode dataRowLoadingMode = DataRowLoadingMode.NonSequential) + => ReadMessageWithoutCancellation(false, dataRowLoadingMode, cancellationToken: default).GetAwaiter().GetResult()!; + + internal ValueTask ReadMessageWithoutCancellation(bool async, DataRowLoadingMode dataRowLoadingMode = DataRowLoadingMode.NonSequential, CancellationToken cancellationToken = default) + => DoReadMessage(async, dataRowLoadingMode, attemptPostgresCancellation: false, cancellationToken: cancellationToken)!; + + internal ValueTask ReadMessageWithoutCancellation(bool async, CancellationToken cancellationToken = default) + => DoReadMessage(async, DataRowLoadingMode.NonSequential, attemptPostgresCancellation: false, cancellationToken: cancellationToken)!; + internal IBackendMessage ReadMessage(DataRowLoadingMode dataRowLoadingMode = DataRowLoadingMode.NonSequential) - => ReadMessage(false, dataRowLoadingMode, cancellationToken: default).GetAwaiter().GetResult()!; + => ReadMessage(false, dataRowLoadingMode, cancellationToken: default).GetAwaiter().GetResult(); internal ValueTask ReadMessage(bool async, CancellationToken cancellationToken = default) => DoReadMessage(async, DataRowLoadingMode.NonSequential, cancellationToken: cancellationToken)!; - internal ValueTask ReadMessageWithCancellation(bool async, CancellationToken cancellationToken = default) - => DoReadMessage(async, DataRowLoadingMode.NonSequential, attemptPostgresCancellation: true, cancellationToken: cancellationToken)!; - internal ValueTask ReadMessage(bool async, DataRowLoadingMode dataRowLoadingMode = DataRowLoadingMode.NonSequential, CancellationToken cancellationToken = default) => DoReadMessage(async, dataRowLoadingMode, cancellationToken: cancellationToken)!; - internal ValueTask ReadMessageWithCancellation(bool async, DataRowLoadingMode dataRowLoadingMode = DataRowLoadingMode.NonSequential, CancellationToken cancellationToken = default) - => DoReadMessage(async, dataRowLoadingMode, attemptPostgresCancellation: true, cancellationToken: cancellationToken)!; - internal ValueTask ReadMessageWithNotifications(bool async, CancellationToken cancellationToken = default) - => DoReadMessage(async, DataRowLoadingMode.NonSequential, true, cancellationToken: cancellationToken); + => DoReadMessage(async, DataRowLoadingMode.NonSequential, readingNotifications: true, attemptPostgresCancellation: false, + cancellationToken: cancellationToken); ValueTask DoReadMessage( bool async, DataRowLoadingMode dataRowLoadingMode = DataRowLoadingMode.NonSequential, bool readingNotifications = false, - bool attemptPostgresCancellation = false, + bool attemptPostgresCancellation = true, CancellationToken cancellationToken = default) { if (_pendingPrependedResponses > 0 || @@ -1039,7 +1043,8 @@ internal ValueTask ReadMessageWithCancellation(bool async, Data case BackendMessageCode.ParameterStatus: case BackendMessageCode.ErrorResponse: ReadBuffer.ReadPosition--; - return ReadMessageLong(dataRowLoadingMode, readingNotifications2: false, cancellationToken2: cancellationToken); + return ReadMessageLong(dataRowLoadingMode, readingNotifications2: false, attemptPostgresCancellation2: false, + cancellationToken2: cancellationToken); case BackendMessageCode.ReadyForQuery: break; } @@ -1049,7 +1054,8 @@ internal ValueTask ReadMessageWithCancellation(bool async, Data if (len > ReadBuffer.ReadBytesLeft) { ReadBuffer.ReadPosition -= 5; - return ReadMessageLong(dataRowLoadingMode, readingNotifications2: false, cancellationToken2: cancellationToken); + return ReadMessageLong(dataRowLoadingMode, readingNotifications2: false, attemptPostgresCancellation2: false, + cancellationToken2: cancellationToken); } return new ValueTask(ParseServerMessage(ReadBuffer, messageCode, len, false)); @@ -1058,7 +1064,7 @@ internal ValueTask ReadMessageWithCancellation(bool async, Data DataRowLoadingMode dataRowLoadingMode2, bool readingNotifications2, bool isReadingPrependedMessage = false, - bool attemptPostgresCancellation2 = false, + bool attemptPostgresCancellation2 = true, CancellationToken cancellationToken2 = default) { // First read the responses of any prepended messages. @@ -1069,7 +1075,8 @@ internal ValueTask ReadMessageWithCancellation(bool async, Data // TODO: There could be room for optimization here, rather than the async call(s) ReadBuffer.Timeout = TimeSpan.FromMilliseconds(InternalCommandTimeout); for (; _pendingPrependedResponses > 0; _pendingPrependedResponses--) - await ReadMessageLong(DataRowLoadingMode.Skip, false, true, false, cancellationToken2); + await ReadMessageLong(DataRowLoadingMode.Skip, readingNotifications2: false, isReadingPrependedMessage: true, + attemptPostgresCancellation2: false, cancellationToken2); } catch (PostgresException e) { @@ -1300,7 +1307,7 @@ internal IBackendMessage SkipUntil(BackendMessageCode stopAt) while (true) { - var msg = ReadMessage(false, DataRowLoadingMode.Skip).GetAwaiter().GetResult()!; + var msg = ReadMessageWithoutCancellation(false, DataRowLoadingMode.Skip).GetAwaiter().GetResult()!; Debug.Assert(!(msg is DataRowMessage)); if (msg.Code == stopAt) return msg; @@ -2025,7 +2032,7 @@ internal async Task Wait(bool async, int timeout, CancellationToken cancel while (true) { - var msg = await ReadMessage(async, cancellationToken); + var msg = await ReadMessageWithoutCancellation(async, cancellationToken); if (msg == null) { receivedNotification = true; @@ -2086,8 +2093,8 @@ internal async Task ExecuteInternalCommand(string query, bool async, Cancellatio await WriteQuery(query, async, cancellationToken); await Flush(async, cancellationToken); - Expect(await ReadMessage(async, cancellationToken), this); - Expect(await ReadMessage(async, cancellationToken), this); + Expect(await ReadMessageWithoutCancellation(async, cancellationToken), this); + Expect(await ReadMessageWithoutCancellation(async, cancellationToken), this); } internal async Task ExecuteInternalCommand(byte[] data, bool async, CancellationToken cancellationToken = default) @@ -2098,8 +2105,8 @@ internal async Task ExecuteInternalCommand(byte[] data, bool async, Cancellation await WritePregenerated(data, async, cancellationToken); await Flush(async, cancellationToken); - Expect(await ReadMessage(async, cancellationToken), this); - Expect(await ReadMessage(async, cancellationToken), this); + Expect(await ReadMessageWithoutCancellation(async, cancellationToken), this); + Expect(await ReadMessageWithoutCancellation(async, cancellationToken), this); } #endregion diff --git a/src/Npgsql/NpgsqlDataReader.cs b/src/Npgsql/NpgsqlDataReader.cs index d82c9306dc..e14822bd6d 100644 --- a/src/Npgsql/NpgsqlDataReader.cs +++ b/src/Npgsql/NpgsqlDataReader.cs @@ -282,7 +282,7 @@ async Task Read(bool async, CancellationToken cancellationToken = default) ValueTask ReadMessage(bool async, CancellationToken cancellationToken = default) { - return _isSequential ? ReadMessageSequential(async) : Connector.ReadMessageWithCancellation(async, cancellationToken); + return _isSequential ? ReadMessageSequential(async) : Connector.ReadMessage(async, cancellationToken); async ValueTask ReadMessageSequential(bool async2) { @@ -348,7 +348,7 @@ async Task NextResult(bool async, bool isConsuming = false, CancellationTo await ConsumeRow(async, CancellationToken.None); while (true) { - var completedMsg = await Connector.ReadMessageWithCancellation(async, DataRowLoadingMode.Skip, CancellationToken.None); + var completedMsg = await Connector.ReadMessage(async, DataRowLoadingMode.Skip, CancellationToken.None); switch (completedMsg.Code) { case BackendMessageCode.CommandComplete: @@ -392,7 +392,7 @@ async Task NextResult(bool async, bool isConsuming = false, CancellationTo if (statement.IsPrepared) { - Expect(await Connector.ReadMessageWithCancellation(async, CancellationToken.None), Connector); + Expect(await Connector.ReadMessage(async, CancellationToken.None), Connector); RowDescription = statement.Description; } else // Non-prepared/preparing flow @@ -403,15 +403,15 @@ async Task NextResult(bool async, bool isConsuming = false, CancellationTo Debug.Assert(!pStatement.IsPrepared); if (pStatement.StatementBeingReplaced != null) { - Expect(await Connector.ReadMessageWithCancellation(async, CancellationToken.None), Connector); + Expect(await Connector.ReadMessage(async, CancellationToken.None), Connector); pStatement.StatementBeingReplaced.CompleteUnprepare(); pStatement.StatementBeingReplaced = null; } } - Expect(await Connector.ReadMessageWithCancellation(async, CancellationToken.None), Connector); - Expect(await Connector.ReadMessageWithCancellation(async, CancellationToken.None), Connector); - msg = await Connector.ReadMessageWithCancellation(async, CancellationToken.None); + Expect(await Connector.ReadMessage(async, CancellationToken.None), Connector); + Expect(await Connector.ReadMessage(async, CancellationToken.None), Connector); + msg = await Connector.ReadMessage(async, CancellationToken.None); RowDescription = statement.Description = msg.Code switch { @@ -457,7 +457,7 @@ async Task NextResult(bool async, bool isConsuming = false, CancellationTo // If output parameters are present and this is the first row of the first resultset, // we must always read it in non-sequential mode because it will be traversed twice (once // here for the parameters, then as a regular row). - msg = await Connector.ReadMessageWithCancellation(async, CancellationToken.None); + msg = await Connector.ReadMessage(async, CancellationToken.None); ProcessMessage(msg); if (msg.Code == BackendMessageCode.DataRow) PopulateOutputParameters(); @@ -481,7 +481,7 @@ async Task NextResult(bool async, bool isConsuming = false, CancellationTo } // There are no more queries, we're done. Read the RFQ. - ProcessMessage(Expect(await Connector.ReadMessageWithCancellation(async, CancellationToken.None), Connector)); + ProcessMessage(Expect(await Connector.ReadMessage(async, CancellationToken.None), Connector)); RowDescription = null; return false; } @@ -605,9 +605,9 @@ async Task NextResultSchemaOnly(bool async, CancellationToken cancellation } else { - Expect(await Connector.ReadMessageWithCancellation(async, CancellationToken.None), Connector); - Expect(await Connector.ReadMessageWithCancellation(async, CancellationToken.None), Connector); - var msg = await Connector.ReadMessageWithCancellation(async, CancellationToken.None); + Expect(await Connector.ReadMessage(async, CancellationToken.None), Connector); + Expect(await Connector.ReadMessage(async, CancellationToken.None), Connector); + var msg = await Connector.ReadMessage(async, CancellationToken.None); switch (msg.Code) { case BackendMessageCode.NoData: @@ -631,7 +631,7 @@ async Task NextResultSchemaOnly(bool async, CancellationToken cancellation // There are no more queries, we're done. Read to the RFQ. if (!_statements.All(s => s.IsPrepared)) { - ProcessMessage(Expect(await Connector.ReadMessageWithCancellation(async, CancellationToken.None), Connector)); + ProcessMessage(Expect(await Connector.ReadMessage(async, CancellationToken.None), Connector)); RowDescription = null; } diff --git a/src/Npgsql/NpgsqlRawCopyStream.cs b/src/Npgsql/NpgsqlRawCopyStream.cs index d392277323..c28dc2c320 100644 --- a/src/Npgsql/NpgsqlRawCopyStream.cs +++ b/src/Npgsql/NpgsqlRawCopyStream.cs @@ -61,7 +61,7 @@ internal NpgsqlRawCopyStream(NpgsqlConnector connector, string copyCommand) _connector.WriteQuery(copyCommand); _connector.Flush(); - var msg = _connector.ReadMessage(); + var msg = _connector.ReadMessageWithoutCancellation(); switch (msg.Code) { case BackendMessageCode.CopyInResponse: @@ -270,7 +270,7 @@ async ValueTask ReadCore(int count, bool async, CancellationToken cancellat { // We've consumed the current DataMessage (or haven't yet received the first), // read the next message - msg = await _connector.ReadMessage(async, cancellationToken); + msg = await _connector.ReadMessageWithoutCancellation(async, cancellationToken); } catch { @@ -284,8 +284,8 @@ async ValueTask ReadCore(int count, bool async, CancellationToken cancellat _leftToReadInDataMsg = ((CopyDataMessage)msg).Length; break; case BackendMessageCode.CopyDone: - Expect(await _connector.ReadMessage(async, cancellationToken), _connector); - Expect(await _connector.ReadMessage(async, cancellationToken), _connector); + Expect(await _connector.ReadMessageWithoutCancellation(async, cancellationToken), _connector); + Expect(await _connector.ReadMessageWithoutCancellation(async, cancellationToken), _connector); _isConsumed = true; return 0; default: @@ -340,7 +340,7 @@ async Task Cancel(bool async) await _connector.Flush(async); try { - var msg = await _connector.ReadMessage(async, cancellationToken: default); + var msg = await _connector.ReadMessageWithoutCancellation(async, cancellationToken: default); // The CopyFail should immediately trigger an exception from the read above. throw _connector.Break( new NpgsqlException("Expected ErrorResponse when cancelling COPY but got: " + msg.Code)); @@ -379,8 +379,8 @@ async ValueTask DisposeAsync(bool disposing, bool async) _writeBuf.EndCopyMode(); await _connector.WriteCopyDone(async); await _connector.Flush(async); - Expect(await _connector.ReadMessage(async, cancellationToken: default), _connector); - Expect(await _connector.ReadMessage(async, cancellationToken : default), _connector); + Expect(await _connector.ReadMessageWithoutCancellation(async, cancellationToken: default), _connector); + Expect(await _connector.ReadMessageWithoutCancellation(async, cancellationToken : default), _connector); } else { From 0df8ecc9dc7bb7cadfa02f15ebaec3a6365ee73c Mon Sep 17 00:00:00 2001 From: Nikita Kazmin Date: Wed, 28 Oct 2020 13:58:30 +0300 Subject: [PATCH 06/10] Added tests for a timeout --- test/Npgsql.Tests/ReaderTests.cs | 117 +++++++++++++++++++++++++++++-- 1 file changed, 110 insertions(+), 7 deletions(-) diff --git a/test/Npgsql.Tests/ReaderTests.cs b/test/Npgsql.Tests/ReaderTests.cs index e0f672777c..e6fb3df48a 100644 --- a/test/Npgsql.Tests/ReaderTests.cs +++ b/test/Npgsql.Tests/ReaderTests.cs @@ -1075,7 +1075,6 @@ public async Task ManyReads() } } - [Test] public async Task NullableScalar() { @@ -1724,16 +1723,84 @@ await pgMock Assert.That(conn.FullState, Is.EqualTo(ConnectionState.Broken)); } - [Test, Description("Timeouts ReadAsGetFieldValueAsync")] - [Timeout(15000)] - public async Task GetFieldValue_timeout_hard() + [Test, Description("Cancels sequential ReadAsGetFieldValueAsync")] + public async Task GetFieldValueAsync_sequential_cancel() + { + if (IsMultiplexing) + return; // Multiplexing, cancellation + + await using var postmasterMock = PgPostmasterMock.Start(ConnectionString); + using var _ = CreateTempPool(postmasterMock.ConnectionString, out var connectionString); + await using var conn = await OpenConnectionAsync(connectionString); + + // Write responses to the query we're about to send, with a single data row (we'll attempt to read two) + var pgMock = await postmasterMock.WaitForServerConnection(); + await pgMock + .WriteParseComplete() + .WriteBindComplete() + .WriteRowDescription(new FieldDescription(PostgresTypeOIDs.Bytea)) + .WriteDataRowWithFlush(new byte[10000]); + + using var cmd = new NpgsqlCommand("SELECT some_bytea FROM some_table", conn); + await using var reader = await cmd.ExecuteReaderAsync(CommandBehavior.SequentialAccess); + + await reader.ReadAsync(); + + using var cts = new CancellationTokenSource(); + var task = reader.GetFieldValueAsync(0, cts.Token); + cts.Cancel(); + + var exception = Assert.ThrowsAsync(async () => await task); + + Assert.That(conn.FullState, Is.EqualTo(ConnectionState.Broken)); + } + + [Test, Description("Cancels sequential ReadAsGetFieldValueAsync")] + public async Task IsDBNullAsync_sequential_cancel() + { + if (IsMultiplexing) + return; // Multiplexing, cancellation + + await using var postmasterMock = PgPostmasterMock.Start(ConnectionString); + using var _ = CreateTempPool(postmasterMock.ConnectionString, out var connectionString); + await using var conn = await OpenConnectionAsync(connectionString); + + // Write responses to the query we're about to send, with a single data row (we'll attempt to read two) + var pgMock = await postmasterMock.WaitForServerConnection(); + await pgMock + .WriteParseComplete() + .WriteBindComplete() + .WriteRowDescription(new FieldDescription(PostgresTypeOIDs.Bytea), new FieldDescription(PostgresTypeOIDs.Int4)) + .WriteDataRowWithFlush(new byte[10000], new byte[4]); + + using var cmd = new NpgsqlCommand("SELECT some_bytea, some_int FROM some_table", conn); + await using var reader = await cmd.ExecuteReaderAsync(CommandBehavior.SequentialAccess); + + await reader.ReadAsync(); + + using var cts = new CancellationTokenSource(); + var task = reader.IsDBNullAsync(1, cts.Token); + cts.Cancel(); + + var exception = Assert.ThrowsAsync(async () => await task); + + Assert.That(conn.FullState, Is.EqualTo(ConnectionState.Broken)); + } + + #endregion Cancellation + + #region Timeout + + [Test, Description("Timeouts sequential ReadAsGetFieldValueAsync")] + [Timeout(10000)] + public async Task GetFieldValueAsync_sequential_timeout() { if (IsMultiplexing) return; // Multiplexing, cancellation var csb = new NpgsqlConnectionStringBuilder(ConnectionString); - csb.CommandTimeout = 5; - csb.CancellationTimeout = 30000; + csb.CommandTimeout = 3; + csb.CancellationTimeout = 15000; await using var postmasterMock = PgPostmasterMock.Start(csb.ToString()); using var _ = CreateTempPool(postmasterMock.ConnectionString, out var connectionString); @@ -1760,7 +1827,43 @@ await pgMock Assert.That(conn.FullState, Is.EqualTo(ConnectionState.Broken)); } - #endregion Cancellation + [Test, Description("Timeouts sequential IsDBNullAsync")] + [Timeout(10000)] + public async Task IsDBNullAsync_sequential_timeout() + { + if (IsMultiplexing) + return; // Multiplexing, cancellation + + var csb = new NpgsqlConnectionStringBuilder(ConnectionString); + csb.CommandTimeout = 3; + csb.CancellationTimeout = 15000; + + await using var postmasterMock = PgPostmasterMock.Start(csb.ToString()); + using var _ = CreateTempPool(postmasterMock.ConnectionString, out var connectionString); + await using var conn = await OpenConnectionAsync(connectionString); + + // Write responses to the query we're about to send, with a single data row (we'll attempt to read two) + var pgMock = await postmasterMock.WaitForServerConnection(); + await pgMock + .WriteParseComplete() + .WriteBindComplete() + .WriteRowDescription(new FieldDescription(PostgresTypeOIDs.Bytea), new FieldDescription(PostgresTypeOIDs.Int4)) + .WriteDataRowWithFlush(new byte[10000], new byte[4]); + + using var cmd = new NpgsqlCommand("SELECT some_bytea, some_int FROM some_table", conn); + await using var reader = await cmd.ExecuteReaderAsync(CommandBehavior.SequentialAccess); + + await reader.ReadAsync(); + + var task = reader.GetFieldValueAsync(0); + + var exception = Assert.ThrowsAsync(async () => await task); + Assert.That(exception.InnerException, Is.TypeOf()); + + Assert.That(conn.FullState, Is.EqualTo(ConnectionState.Broken)); + } + + #endregion #region Initialization / setup / teardown From 771806d82a190bfa57ad6aa635dae20592d630b5 Mon Sep 17 00:00:00 2001 From: Nikita Kazmin Date: Wed, 28 Oct 2020 14:54:56 +0300 Subject: [PATCH 07/10] Added one more test + fixed sequential cancellation --- src/Npgsql/NpgsqlConnector.cs | 8 +++--- src/Npgsql/NpgsqlReadBuffer.cs | 9 +++++++ test/Npgsql.Tests/ReaderTests.cs | 42 +++++++++++++++++++++++++++++++- 3 files changed, 53 insertions(+), 6 deletions(-) diff --git a/src/Npgsql/NpgsqlConnector.cs b/src/Npgsql/NpgsqlConnector.cs index 11f48993d9..9135774b20 100644 --- a/src/Npgsql/NpgsqlConnector.cs +++ b/src/Npgsql/NpgsqlConnector.cs @@ -223,6 +223,9 @@ internal void FlagAsWritableForMultiplexing() volatile bool _cancellationRequested; volatile bool _userCancellationRequested; + + internal bool UserCancellationRequesed => _userCancellationRequested; + internal CancellationToken UserCancellationToken { get; set; } static readonly NpgsqlLogger Log = NpgsqlLogManager.CreateLogger(nameof(NpgsqlConnector)); @@ -1194,11 +1197,6 @@ await ReadMessageLong(DataRowLoadingMode.Skip, readingNotifications2: false, isR throw; } - catch (NpgsqlException e) when (e.InnerException is TimeoutException && _userCancellationRequested) - { - // User requested the cancellation and it timed out - throw new OperationCanceledException("Query was cancelled", e.InnerException, UserCancellationToken); - } catch (NpgsqlException) { // An ErrorResponse isn't followed by ReadyForQuery diff --git a/src/Npgsql/NpgsqlReadBuffer.cs b/src/Npgsql/NpgsqlReadBuffer.cs index d3e8079a22..ad6fef6205 100644 --- a/src/Npgsql/NpgsqlReadBuffer.cs +++ b/src/Npgsql/NpgsqlReadBuffer.cs @@ -239,6 +239,15 @@ async Task EnsureLong() } } + // There is a case, when we might call a cancellable method (NpgsqlDataReader.NextResult) + // but it times out on a sequential read (NpgsqlDataReader.ConsumeRow) + if (Connector.UserCancellationRequesed) + { + // User requested the cancellation and it timed out (or we didn't send it) + throw Connector.Break(new OperationCanceledException("Query was cancelled", e.InnerException, + Connector.UserCancellationToken)); + } + throw Connector.Break(TimeoutException()); } diff --git a/test/Npgsql.Tests/ReaderTests.cs b/test/Npgsql.Tests/ReaderTests.cs index e6fb3df48a..e9c0fe80fa 100644 --- a/test/Npgsql.Tests/ReaderTests.cs +++ b/test/Npgsql.Tests/ReaderTests.cs @@ -1680,7 +1680,7 @@ await pgMock Assert.That(conn.FullState, Is.EqualTo(ConnectionState.Broken)); } - [Test, Description("Cancels ReadAsync via the cancellation token, with unsuccessful PG cancellation (socket break)")] + [Test, Description("Cancels NextResultAsync via the cancellation token, with unsuccessful PG cancellation (socket break)")] public async Task NextResultAsync_cancel_hard() { if (IsMultiplexing) @@ -1723,6 +1723,46 @@ await pgMock Assert.That(conn.FullState, Is.EqualTo(ConnectionState.Broken)); } + [Test, Description("Cancels sequential NextResultAsync via the cancellation token")] + public async Task NextResultAsync_sequential_cancel() + { + if (IsMultiplexing) + return; // Multiplexing, cancellation + + await using var postmasterMock = PgPostmasterMock.Start(ConnectionString); + using var _ = CreateTempPool(postmasterMock.ConnectionString, out var connectionString); + await using var conn = await OpenConnectionAsync(connectionString); + + // Write responses to the query we're about to send, with a single data row (we'll attempt to read two) + var pgMock = await postmasterMock.WaitForServerConnection(); + await pgMock + .WriteParseComplete() + .WriteBindComplete() + .WriteRowDescription(new FieldDescription(PostgresTypeOIDs.Bytea)) + .WriteDataRowWithFlush(new byte[10000]); + + using var cmd = new NpgsqlCommand("SELECT some_bytea FROM some_table", conn); + await using var reader = await cmd.ExecuteReaderAsync(CommandBehavior.SequentialAccess); + + // Successfully read the first resultset + Assert.True(await reader.ReadAsync()); + + // Attempt to read the second row - simulate blocking and cancellation + var cancellationSource = new CancellationTokenSource(); + var task = reader.NextResultAsync(cancellationSource.Token); + cancellationSource.Cancel(); + + var (processId, _) = await postmasterMock.WaitForCancellationRequest(); + Assert.That(processId, Is.EqualTo(conn.ProcessID)); + + // Send no response from server, wait for the cancellation attempt to time out + var exception = Assert.ThrowsAsync(async () => await task); + Assert.That(exception.InnerException, Is.Null); + Assert.That(exception.CancellationToken, Is.EqualTo(cancellationSource.Token)); + + Assert.That(conn.FullState, Is.EqualTo(ConnectionState.Broken)); + } + [Test, Description("Cancels sequential ReadAsGetFieldValueAsync")] public async Task GetFieldValueAsync_sequential_cancel() { From 32049d687ad3e41eb94665471e08be30605576d0 Mon Sep 17 00:00:00 2001 From: Nikita Kazmin Date: Wed, 28 Oct 2020 15:06:36 +0300 Subject: [PATCH 08/10] Small fix --- src/Npgsql/NpgsqlConnector.cs | 2 +- src/Npgsql/NpgsqlReadBuffer.cs | 15 ++++++++------- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/src/Npgsql/NpgsqlConnector.cs b/src/Npgsql/NpgsqlConnector.cs index 9135774b20..8ed1161230 100644 --- a/src/Npgsql/NpgsqlConnector.cs +++ b/src/Npgsql/NpgsqlConnector.cs @@ -224,7 +224,7 @@ internal void FlagAsWritableForMultiplexing() volatile bool _cancellationRequested; volatile bool _userCancellationRequested; - internal bool UserCancellationRequesed => _userCancellationRequested; + internal bool CancellationRequesed => _cancellationRequested; internal CancellationToken UserCancellationToken { get; set; } diff --git a/src/Npgsql/NpgsqlReadBuffer.cs b/src/Npgsql/NpgsqlReadBuffer.cs index ad6fef6205..21016d2419 100644 --- a/src/Npgsql/NpgsqlReadBuffer.cs +++ b/src/Npgsql/NpgsqlReadBuffer.cs @@ -216,7 +216,7 @@ async Task EnsureLong() Debug.Assert(e is OperationCanceledException ? async : !async); if (readingNotifications) - throw TimeoutException(); + throw NpgsqlTimeoutException(); // Note that if PG cancellation fails, the exception for that is already logged internally by CancelRequest. // We simply continue and throw the timeout one. @@ -241,14 +241,14 @@ async Task EnsureLong() // There is a case, when we might call a cancellable method (NpgsqlDataReader.NextResult) // but it times out on a sequential read (NpgsqlDataReader.ConsumeRow) - if (Connector.UserCancellationRequesed) + if (Connector.CancellationRequesed) { - // User requested the cancellation and it timed out (or we didn't send it) - throw Connector.Break(new OperationCanceledException("Query was cancelled", e.InnerException, + // The cancellation was requested and it timed out (or we didn't send it) + throw Connector.Break(new OperationCanceledException("Query was cancelled", TimeoutException(), Connector.UserCancellationToken)); } - throw Connector.Break(TimeoutException()); + throw Connector.Break(NpgsqlTimeoutException()); } default: @@ -260,8 +260,9 @@ async Task EnsureLong() Cts.Stop(); NpgsqlEventSource.Log.BytesRead(totalRead); - static Exception TimeoutException() - => new NpgsqlException("Exception while reading from stream", new TimeoutException("Timeout during reading attempt")); + static Exception NpgsqlTimeoutException() => new NpgsqlException("Exception while reading from stream", TimeoutException()); + + static Exception TimeoutException() => new TimeoutException("Timeout during reading attempt"); } } From 32484fc6d2c17b2577cac214b2f1d1ae7a3635f3 Mon Sep 17 00:00:00 2001 From: Nikita Kazmin Date: Wed, 28 Oct 2020 15:07:59 +0300 Subject: [PATCH 09/10] Another small fix --- src/Npgsql/NpgsqlConnector.cs | 2 +- src/Npgsql/NpgsqlReadBuffer.cs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/Npgsql/NpgsqlConnector.cs b/src/Npgsql/NpgsqlConnector.cs index 8ed1161230..9135774b20 100644 --- a/src/Npgsql/NpgsqlConnector.cs +++ b/src/Npgsql/NpgsqlConnector.cs @@ -224,7 +224,7 @@ internal void FlagAsWritableForMultiplexing() volatile bool _cancellationRequested; volatile bool _userCancellationRequested; - internal bool CancellationRequesed => _cancellationRequested; + internal bool UserCancellationRequesed => _userCancellationRequested; internal CancellationToken UserCancellationToken { get; set; } diff --git a/src/Npgsql/NpgsqlReadBuffer.cs b/src/Npgsql/NpgsqlReadBuffer.cs index 21016d2419..c1f6da5cad 100644 --- a/src/Npgsql/NpgsqlReadBuffer.cs +++ b/src/Npgsql/NpgsqlReadBuffer.cs @@ -241,9 +241,9 @@ async Task EnsureLong() // There is a case, when we might call a cancellable method (NpgsqlDataReader.NextResult) // but it times out on a sequential read (NpgsqlDataReader.ConsumeRow) - if (Connector.CancellationRequesed) + if (Connector.UserCancellationRequesed) { - // The cancellation was requested and it timed out (or we didn't send it) + // User requested the cancellation and it timed out (or we didn't send it) throw Connector.Break(new OperationCanceledException("Query was cancelled", TimeoutException(), Connector.UserCancellationToken)); } From 2424d20df3c74485ae7e3d688fcace247631b6d1 Mon Sep 17 00:00:00 2001 From: Nikita Kazmin Date: Wed, 28 Oct 2020 15:26:03 +0300 Subject: [PATCH 10/10] Final test fixes --- src/Npgsql/NpgsqlConnector.cs | 2 +- src/Npgsql/NpgsqlReadBuffer.cs | 2 +- test/Npgsql.Tests/ReaderTests.cs | 29 +++++++++++++++++++++++------ 3 files changed, 25 insertions(+), 8 deletions(-) diff --git a/src/Npgsql/NpgsqlConnector.cs b/src/Npgsql/NpgsqlConnector.cs index 9135774b20..085287f7ae 100644 --- a/src/Npgsql/NpgsqlConnector.cs +++ b/src/Npgsql/NpgsqlConnector.cs @@ -224,7 +224,7 @@ internal void FlagAsWritableForMultiplexing() volatile bool _cancellationRequested; volatile bool _userCancellationRequested; - internal bool UserCancellationRequesed => _userCancellationRequested; + internal bool UserCancellationRequested => _userCancellationRequested; internal CancellationToken UserCancellationToken { get; set; } diff --git a/src/Npgsql/NpgsqlReadBuffer.cs b/src/Npgsql/NpgsqlReadBuffer.cs index c1f6da5cad..50fc0d75de 100644 --- a/src/Npgsql/NpgsqlReadBuffer.cs +++ b/src/Npgsql/NpgsqlReadBuffer.cs @@ -241,7 +241,7 @@ async Task EnsureLong() // There is a case, when we might call a cancellable method (NpgsqlDataReader.NextResult) // but it times out on a sequential read (NpgsqlDataReader.ConsumeRow) - if (Connector.UserCancellationRequesed) + if (Connector.UserCancellationRequested) { // User requested the cancellation and it timed out (or we didn't send it) throw Connector.Break(new OperationCanceledException("Query was cancelled", TimeoutException(), diff --git a/test/Npgsql.Tests/ReaderTests.cs b/test/Npgsql.Tests/ReaderTests.cs index e9c0fe80fa..a37e798e94 100644 --- a/test/Npgsql.Tests/ReaderTests.cs +++ b/test/Npgsql.Tests/ReaderTests.cs @@ -1729,6 +1729,9 @@ public async Task NextResultAsync_sequential_cancel() if (IsMultiplexing) return; // Multiplexing, cancellation + if (!IsSequential) + return; + await using var postmasterMock = PgPostmasterMock.Start(ConnectionString); using var _ = CreateTempPool(postmasterMock.ConnectionString, out var connectionString); await using var conn = await OpenConnectionAsync(connectionString); @@ -1742,7 +1745,7 @@ await pgMock .WriteDataRowWithFlush(new byte[10000]); using var cmd = new NpgsqlCommand("SELECT some_bytea FROM some_table", conn); - await using var reader = await cmd.ExecuteReaderAsync(CommandBehavior.SequentialAccess); + await using var reader = await cmd.ExecuteReaderAsync(Behavior); // Successfully read the first resultset Assert.True(await reader.ReadAsync()); @@ -1757,7 +1760,7 @@ await pgMock // Send no response from server, wait for the cancellation attempt to time out var exception = Assert.ThrowsAsync(async () => await task); - Assert.That(exception.InnerException, Is.Null); + Assert.That(exception.InnerException, Is.TypeOf()); Assert.That(exception.CancellationToken, Is.EqualTo(cancellationSource.Token)); Assert.That(conn.FullState, Is.EqualTo(ConnectionState.Broken)); @@ -1769,6 +1772,9 @@ public async Task GetFieldValueAsync_sequential_cancel() if (IsMultiplexing) return; // Multiplexing, cancellation + if (!IsSequential) + return; + await using var postmasterMock = PgPostmasterMock.Start(ConnectionString); using var _ = CreateTempPool(postmasterMock.ConnectionString, out var connectionString); await using var conn = await OpenConnectionAsync(connectionString); @@ -1782,7 +1788,7 @@ await pgMock .WriteDataRowWithFlush(new byte[10000]); using var cmd = new NpgsqlCommand("SELECT some_bytea FROM some_table", conn); - await using var reader = await cmd.ExecuteReaderAsync(CommandBehavior.SequentialAccess); + await using var reader = await cmd.ExecuteReaderAsync(Behavior); await reader.ReadAsync(); @@ -1791,6 +1797,7 @@ await pgMock cts.Cancel(); var exception = Assert.ThrowsAsync(async () => await task); + Assert.That(exception.InnerException, Is.Null); Assert.That(conn.FullState, Is.EqualTo(ConnectionState.Broken)); } @@ -1801,6 +1808,9 @@ public async Task IsDBNullAsync_sequential_cancel() if (IsMultiplexing) return; // Multiplexing, cancellation + if (!IsSequential) + return; + await using var postmasterMock = PgPostmasterMock.Start(ConnectionString); using var _ = CreateTempPool(postmasterMock.ConnectionString, out var connectionString); await using var conn = await OpenConnectionAsync(connectionString); @@ -1814,7 +1824,7 @@ await pgMock .WriteDataRowWithFlush(new byte[10000], new byte[4]); using var cmd = new NpgsqlCommand("SELECT some_bytea, some_int FROM some_table", conn); - await using var reader = await cmd.ExecuteReaderAsync(CommandBehavior.SequentialAccess); + await using var reader = await cmd.ExecuteReaderAsync(Behavior); await reader.ReadAsync(); @@ -1823,6 +1833,7 @@ await pgMock cts.Cancel(); var exception = Assert.ThrowsAsync(async () => await task); + Assert.That(exception.InnerException, Is.Null); Assert.That(conn.FullState, Is.EqualTo(ConnectionState.Broken)); } @@ -1838,6 +1849,9 @@ public async Task GetFieldValueAsync_sequential_timeout() if (IsMultiplexing) return; // Multiplexing, cancellation + if (!IsSequential) + return; + var csb = new NpgsqlConnectionStringBuilder(ConnectionString); csb.CommandTimeout = 3; csb.CancellationTimeout = 15000; @@ -1855,7 +1869,7 @@ await pgMock .WriteDataRowWithFlush(new byte[10000]); using var cmd = new NpgsqlCommand("SELECT some_bytea FROM some_table", conn); - await using var reader = await cmd.ExecuteReaderAsync(CommandBehavior.SequentialAccess); + await using var reader = await cmd.ExecuteReaderAsync(Behavior); await reader.ReadAsync(); @@ -1874,6 +1888,9 @@ public async Task IsDBNullAsync_sequential_timeout() if (IsMultiplexing) return; // Multiplexing, cancellation + if (!IsSequential) + return; + var csb = new NpgsqlConnectionStringBuilder(ConnectionString); csb.CommandTimeout = 3; csb.CancellationTimeout = 15000; @@ -1891,7 +1908,7 @@ await pgMock .WriteDataRowWithFlush(new byte[10000], new byte[4]); using var cmd = new NpgsqlCommand("SELECT some_bytea, some_int FROM some_table", conn); - await using var reader = await cmd.ExecuteReaderAsync(CommandBehavior.SequentialAccess); + await using var reader = await cmd.ExecuteReaderAsync(Behavior); await reader.ReadAsync();