Skip to content
Merged
14 changes: 7 additions & 7 deletions src/Npgsql/NpgsqlCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -615,14 +615,14 @@ async Task PrepareLong(CancellationToken cancellationToken)
{
if (pStatement.StatementBeingReplaced != null)
{
Expect<CloseCompletedMessage>(await connector.ReadMessage(async, cancellationToken), connector);
Expect<CloseCompletedMessage>(await connector.ReadMessageWithCancellation(async, cancellationToken), connector);
Comment thread
roji marked this conversation as resolved.
Outdated
pStatement.StatementBeingReplaced.CompleteUnprepare();
pStatement.StatementBeingReplaced = null;
}

Expect<ParseCompleteMessage>(await connector.ReadMessage(async, cancellationToken), connector);
Expect<ParameterDescriptionMessage>(await connector.ReadMessage(async, cancellationToken), connector);
var msg = await connector.ReadMessage(async, cancellationToken);
Expect<ParseCompleteMessage>(await connector.ReadMessageWithCancellation(async, cancellationToken), connector);
Expect<ParameterDescriptionMessage>(await connector.ReadMessageWithCancellation(async, cancellationToken), connector);
var msg = await connector.ReadMessageWithCancellation(async, cancellationToken);
switch (msg.Code)
{
case BackendMessageCode.RowDescription:
Expand Down Expand Up @@ -661,7 +661,7 @@ async Task PrepareLong(CancellationToken cancellationToken)
}
}

Expect<ReadyForQueryMessage>(await connector.ReadMessage(async, cancellationToken), connector);
Expect<ReadyForQueryMessage>(await connector.ReadMessageWithCancellation(async, cancellationToken), connector);

if (async)
await sendTask;
Expand Down Expand Up @@ -712,11 +712,11 @@ async Task Unprepare(bool async, CancellationToken cancellationToken = default)
foreach (var statement in _statements)
if (statement.PreparedStatement?.State == PreparedState.BeingUnprepared)
{
Expect<CloseCompletedMessage>(await connector.ReadMessage(async, cancellationToken), connector);
Expect<CloseCompletedMessage>(await connector.ReadMessageWithCancellation(async, cancellationToken), connector);
statement.PreparedStatement.CompleteUnprepare();
statement.PreparedStatement = null;
}
Expect<ReadyForQueryMessage>(await connector.ReadMessage(async, cancellationToken), connector);
Expect<ReadyForQueryMessage>(await connector.ReadMessageWithCancellation(async, cancellationToken), connector);
if (async)
await sendTask;
else
Expand Down
15 changes: 12 additions & 3 deletions src/Npgsql/NpgsqlConnector.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1003,24 +1003,32 @@ internal IBackendMessage ReadMessage(DataRowLoadingMode dataRowLoadingMode = Dat
internal ValueTask<IBackendMessage> ReadMessage(bool async, CancellationToken cancellationToken = default)
=> DoReadMessage(async, DataRowLoadingMode.NonSequential, cancellationToken: cancellationToken)!;

internal ValueTask<IBackendMessage> ReadMessageWithCancellation(bool async, CancellationToken cancellationToken = default)
=> DoReadMessage(async, DataRowLoadingMode.NonSequential, attemptPostgresCancellation: true, cancellationToken: cancellationToken)!;

internal ValueTask<IBackendMessage> ReadMessage(bool async, DataRowLoadingMode dataRowLoadingMode = DataRowLoadingMode.NonSequential, CancellationToken cancellationToken = default)
=> DoReadMessage(async, dataRowLoadingMode, cancellationToken: cancellationToken)!;

internal ValueTask<IBackendMessage> ReadMessageWithCancellation(bool async, DataRowLoadingMode dataRowLoadingMode = DataRowLoadingMode.NonSequential, CancellationToken cancellationToken = default)
=> DoReadMessage(async, dataRowLoadingMode, attemptPostgresCancellation: true, cancellationToken: cancellationToken)!;

internal ValueTask<IBackendMessage?> ReadMessageWithNotifications(bool async, CancellationToken cancellationToken = default)
=> DoReadMessage(async, DataRowLoadingMode.NonSequential, true, cancellationToken: cancellationToken);

ValueTask<IBackendMessage?> DoReadMessage(
bool async,
DataRowLoadingMode dataRowLoadingMode = DataRowLoadingMode.NonSequential,
bool readingNotifications = false,
bool attemptPostgresCancellation = false,
CancellationToken cancellationToken = default)
{
if (_pendingPrependedResponses > 0 ||
dataRowLoadingMode != DataRowLoadingMode.NonSequential ||
readingNotifications ||
ReadBuffer.ReadBytesLeft < 5)
{
return ReadMessageLong(dataRowLoadingMode, readingNotifications, cancellationToken2: cancellationToken);
return ReadMessageLong(dataRowLoadingMode, readingNotifications2: readingNotifications,
attemptPostgresCancellation2: attemptPostgresCancellation, cancellationToken2: cancellationToken);
}

var messageCode = (BackendMessageCode)ReadBuffer.ReadByte();
Expand Down Expand Up @@ -1050,6 +1058,7 @@ internal ValueTask<IBackendMessage> ReadMessage(bool async, DataRowLoadingMode d
DataRowLoadingMode dataRowLoadingMode2,
bool readingNotifications2,
bool isReadingPrependedMessage = false,
bool attemptPostgresCancellation2 = false,
CancellationToken cancellationToken2 = default)
{
// First read the responses of any prepended messages.
Expand All @@ -1060,7 +1069,7 @@ internal ValueTask<IBackendMessage> ReadMessage(bool async, DataRowLoadingMode d
// TODO: There could be room for optimization here, rather than the async call(s)
ReadBuffer.Timeout = TimeSpan.FromMilliseconds(InternalCommandTimeout);
for (; _pendingPrependedResponses > 0; _pendingPrependedResponses--)
await ReadMessageLong(DataRowLoadingMode.Skip, false, true, cancellationToken2);
await ReadMessageLong(DataRowLoadingMode.Skip, false, true, false, cancellationToken2);
Comment thread
vonzshik marked this conversation as resolved.
Outdated
}
catch (PostgresException e)
{
Expand All @@ -1076,7 +1085,7 @@ internal ValueTask<IBackendMessage> ReadMessage(bool async, DataRowLoadingMode d

while (true)
{
await ReadBuffer.Ensure(5, async, readingNotifications2, cancellationToken2);
await ReadBuffer.Ensure(5, async, readingNotifications2, attemptPostgresCancellation2, cancellationToken2);
messageCode = (BackendMessageCode)ReadBuffer.ReadByte();
PGUtil.ValidateBackendMessageCode(messageCode);
len = ReadBuffer.ReadInt32() - 4; // Transmitted length includes itself
Expand Down
26 changes: 13 additions & 13 deletions src/Npgsql/NpgsqlDataReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ async Task<bool> Read(bool async, CancellationToken cancellationToken = default)

ValueTask<IBackendMessage> ReadMessage(bool async, CancellationToken cancellationToken = default)
{
return _isSequential ? ReadMessageSequential(async) : Connector.ReadMessage(async, cancellationToken);
return _isSequential ? ReadMessageSequential(async) : Connector.ReadMessageWithCancellation(async, cancellationToken);

async ValueTask<IBackendMessage> ReadMessageSequential(bool async2)
{
Expand Down Expand Up @@ -348,7 +348,7 @@ async Task<bool> NextResult(bool async, bool isConsuming = false, CancellationTo
await ConsumeRow(async, CancellationToken.None);
while (true)
{
var completedMsg = await Connector.ReadMessage(async, DataRowLoadingMode.Skip, CancellationToken.None);
var completedMsg = await Connector.ReadMessageWithCancellation(async, DataRowLoadingMode.Skip, CancellationToken.None);
switch (completedMsg.Code)
{
case BackendMessageCode.CommandComplete:
Expand Down Expand Up @@ -392,7 +392,7 @@ async Task<bool> NextResult(bool async, bool isConsuming = false, CancellationTo

if (statement.IsPrepared)
{
Expect<BindCompleteMessage>(await Connector.ReadMessage(async, CancellationToken.None), Connector);
Expect<BindCompleteMessage>(await Connector.ReadMessageWithCancellation(async, CancellationToken.None), Connector);
RowDescription = statement.Description;
}
else // Non-prepared/preparing flow
Expand All @@ -403,15 +403,15 @@ async Task<bool> NextResult(bool async, bool isConsuming = false, CancellationTo
Debug.Assert(!pStatement.IsPrepared);
if (pStatement.StatementBeingReplaced != null)
{
Expect<CloseCompletedMessage>(await Connector.ReadMessage(async, CancellationToken.None), Connector);
Expect<CloseCompletedMessage>(await Connector.ReadMessageWithCancellation(async, CancellationToken.None), Connector);
pStatement.StatementBeingReplaced.CompleteUnprepare();
pStatement.StatementBeingReplaced = null;
}
}

Expect<ParseCompleteMessage>(await Connector.ReadMessage(async, CancellationToken.None), Connector);
Expect<BindCompleteMessage>(await Connector.ReadMessage(async, CancellationToken.None), Connector);
msg = await Connector.ReadMessage(async, CancellationToken.None);
Expect<ParseCompleteMessage>(await Connector.ReadMessageWithCancellation(async, CancellationToken.None), Connector);
Expect<BindCompleteMessage>(await Connector.ReadMessageWithCancellation(async, CancellationToken.None), Connector);
msg = await Connector.ReadMessageWithCancellation(async, CancellationToken.None);

RowDescription = statement.Description = msg.Code switch
{
Expand Down Expand Up @@ -457,7 +457,7 @@ async Task<bool> NextResult(bool async, bool isConsuming = false, CancellationTo
// If output parameters are present and this is the first row of the first resultset,
// we must always read it in non-sequential mode because it will be traversed twice (once
// here for the parameters, then as a regular row).
msg = await Connector.ReadMessage(async, CancellationToken.None);
msg = await Connector.ReadMessageWithCancellation(async, CancellationToken.None);
ProcessMessage(msg);
if (msg.Code == BackendMessageCode.DataRow)
PopulateOutputParameters();
Expand All @@ -481,7 +481,7 @@ async Task<bool> NextResult(bool async, bool isConsuming = false, CancellationTo
}

// There are no more queries, we're done. Read the RFQ.
ProcessMessage(Expect<ReadyForQueryMessage>(await Connector.ReadMessage(async, CancellationToken.None), Connector));
ProcessMessage(Expect<ReadyForQueryMessage>(await Connector.ReadMessageWithCancellation(async, CancellationToken.None), Connector));
RowDescription = null;
return false;
}
Expand Down Expand Up @@ -605,9 +605,9 @@ async Task<bool> NextResultSchemaOnly(bool async, CancellationToken cancellation
}
else
{
Expect<ParseCompleteMessage>(await Connector.ReadMessage(async, CancellationToken.None), Connector);
Expect<ParameterDescriptionMessage>(await Connector.ReadMessage(async, CancellationToken.None), Connector);
var msg = await Connector.ReadMessage(async, CancellationToken.None);
Expect<ParseCompleteMessage>(await Connector.ReadMessageWithCancellation(async, CancellationToken.None), Connector);
Expect<ParameterDescriptionMessage>(await Connector.ReadMessageWithCancellation(async, CancellationToken.None), Connector);
var msg = await Connector.ReadMessageWithCancellation(async, CancellationToken.None);
switch (msg.Code)
{
case BackendMessageCode.NoData:
Expand All @@ -631,7 +631,7 @@ async Task<bool> NextResultSchemaOnly(bool async, CancellationToken cancellation
// There are no more queries, we're done. Read to the RFQ.
if (!_statements.All(s => s.IsPrepared))
{
ProcessMessage(Expect<ReadyForQueryMessage>(await Connector.ReadMessage(async, CancellationToken.None), Connector));
ProcessMessage(Expect<ReadyForQueryMessage>(await Connector.ReadMessageWithCancellation(async, CancellationToken.None), Connector));
RowDescription = null;
}

Expand Down
15 changes: 8 additions & 7 deletions src/Npgsql/NpgsqlReadBuffer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -128,16 +128,17 @@ internal void Ensure(int count)
}

public Task Ensure(int count, bool async, CancellationToken cancellationToken = default)
=> Ensure(count, async, readingNotifications: false, cancellationToken);
=> Ensure(count, async, readingNotifications: false, attemptPostgresCancellation: false, cancellationToken);

public Task EnsureAsync(int count, CancellationToken cancellationToken = default)
=> Ensure(count, async: true, readingNotifications: false, cancellationToken);
=> Ensure(count, async: true, readingNotifications: false, attemptPostgresCancellation: false, cancellationToken);

/// <summary>
/// Ensures that <paramref name="count"/> bytes are available in the buffer, and if
/// not, reads from the socket until enough is available.
/// </summary>
internal Task Ensure(int count, bool async, bool readingNotifications, CancellationToken cancellationToken = default)
internal Task Ensure(int count, bool async, bool readingNotifications, bool attemptPostgresCancellation,
CancellationToken cancellationToken = default)
{
return count <= ReadBytesLeft ? Task.CompletedTask : EnsureLong();

Expand Down Expand Up @@ -202,10 +203,9 @@ async Task EnsureLong()

switch (e)
{
// User requested the cancellation (at this moment, it should be only WaitAsync)
// User requested the cancellation (at this moment, it is COPY operations, WaitAsync, Reader's sequential methods, authentication)
case OperationCanceledException _ when cancellationToken.IsCancellationRequested:
Debug.Assert(readingNotifications);
throw;
throw readingNotifications ? e : Connector.Break(e);

// Read timeout
case OperationCanceledException _:
Expand All @@ -220,7 +220,8 @@ async Task EnsureLong()

// Note that if PG cancellation fails, the exception for that is already logged internally by CancelRequest.
// We simply continue and throw the timeout one.
if (!wasCancellationRequested && Connector.CancelRequest(requestedByUser: false))
// TODO: As an optimization, we can still attempt to send a cancellation request, but after that immediately break the connection
if (attemptPostgresCancellation && !wasCancellationRequested && Connector.CancelRequest(requestedByUser: false))
{
// If the cancellation timeout is negative, we break the connection immediately
var cancellationTimeout = Connector.Settings.CancellationTimeout;
Expand Down
36 changes: 36 additions & 0 deletions test/Npgsql.Tests/ReaderTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1724,6 +1724,42 @@ await pgMock
Assert.That(conn.FullState, Is.EqualTo(ConnectionState.Broken));
}

[Test, Description("Timeouts ReadAsGetFieldValueAsync")]
[Timeout(15000)]
public async Task GetFieldValue_timeout_hard()
Comment thread
vonzshik marked this conversation as resolved.
Outdated
{
if (IsMultiplexing)
return; // Multiplexing, cancellation

var csb = new NpgsqlConnectionStringBuilder(ConnectionString);
csb.CommandTimeout = 5;
csb.CancellationTimeout = 30000;

await using var postmasterMock = PgPostmasterMock.Start(csb.ToString());
using var _ = CreateTempPool(postmasterMock.ConnectionString, out var connectionString);
await using var conn = await OpenConnectionAsync(connectionString);

// Write responses to the query we're about to send, with a single data row (we'll attempt to read two)
var pgMock = await postmasterMock.WaitForServerConnection();
await pgMock
.WriteParseComplete()
.WriteBindComplete()
.WriteRowDescription(new FieldDescription(PostgresTypeOIDs.Bytea))
.WriteDataRowWithFlush(new byte[10000]);

using var cmd = new NpgsqlCommand("SELECT some_bytea FROM some_table", conn);
await using var reader = await cmd.ExecuteReaderAsync(CommandBehavior.SequentialAccess);

await reader.ReadAsync();

var task = reader.GetFieldValueAsync<byte[]>(0);

var exception = Assert.ThrowsAsync<NpgsqlException>(async () => await task);
Assert.That(exception.InnerException, Is.TypeOf<TimeoutException>());

Assert.That(conn.FullState, Is.EqualTo(ConnectionState.Broken));
}

#endregion Cancellation

#region Initialization / setup / teardown
Expand Down
15 changes: 15 additions & 0 deletions test/Npgsql.Tests/Support/PgServerMock.cs
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,21 @@ internal PgServerMock WriteDataRow(params byte[][] columnValues)
return this;
}

internal async Task WriteDataRowWithFlush(params byte[][] columnValues)
{
CheckDisposed();

_writeBuffer.WriteByte((byte) BackendMessageCode.DataRow);
_writeBuffer.WriteInt32(4 + 2 + columnValues.Sum(v => 4 + v.Length));
_writeBuffer.WriteInt16(columnValues.Length);

foreach (var field in columnValues)
{
_writeBuffer.WriteInt32(field.Length);
await _writeBuffer.WriteBytesRaw(field, true);
}
}

internal PgServerMock WriteCommandComplete(string tag = "")
{
CheckDisposed();
Expand Down