Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 11 additions & 9 deletions src/Npgsql/Internal/NpgsqlConnector.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2482,13 +2482,13 @@ UserAction DoStartUserAction(ConnectorState newState, NpgsqlCommand? command,
State = newState;
_currentCommand = command;

StartCancellableOperation(cancellationToken, attemptPgCancellation);

// We reset the ReadBuffer.Timeout for every user action, so it wouldn't leak from the previous query or action
// For example, we might have successfully cancelled the previous query (so the connection is not broken)
// But the next time, we call the Prepare, which doesn't set it's own timeout
// But the next time, we call the Prepare, which doesn't set its own timeout
ReadBuffer.Timeout = TimeSpan.FromSeconds(command?.CommandTimeout ?? Settings.CommandTimeout);

StartCancellableOperation(cancellationToken, attemptPgCancellation);

return new UserAction(this);
}

Expand Down Expand Up @@ -2624,10 +2624,14 @@ internal async Task<bool> Wait(bool async, int timeout, CancellationToken cancel
var keepaliveMs = Settings.KeepAlive * 1000;
while (true)
{
cancellationToken.ThrowIfCancellationRequested();

var timeoutForKeepalive = _isKeepAliveEnabled && (timeout <= 0 || keepaliveMs < timeout);
ReadBuffer.Timeout = TimeSpan.FromMilliseconds(timeoutForKeepalive ? keepaliveMs : timeout);
lock (CancelLock)
{
// We need to make sure a callback registered on CancellationToken doesn't try to concurrently change ReadBuffer.Timeout.
// Having Thread.MemoryBarrier() wouldn't be enough because ReadBuffer.Timeout isn't thread safe.
cancellationToken.ThrowIfCancellationRequested();
ReadBuffer.Timeout = TimeSpan.FromMilliseconds(timeoutForKeepalive ? keepaliveMs : timeout);
}
try
{
var msg = await ReadMessageWithNotifications(async).ConfigureAwait(false);
Expand All @@ -2651,7 +2655,6 @@ internal async Task<bool> Wait(bool async, int timeout, CancellationToken cancel
await Flush(async, cancellationToken).ConfigureAwait(false);

var receivedNotification = false;
var expectedMessageCode = BackendMessageCode.RowDescription;

while (true)
{
Expand All @@ -2676,13 +2679,12 @@ internal async Task<bool> Wait(bool async, int timeout, CancellationToken cancel
}

if (msg.Code != BackendMessageCode.ReadyForQuery)
throw new NpgsqlException($"Received unexpected message of type {msg.Code} while expecting {expectedMessageCode} as part of keepalive");
throw new NpgsqlException($"Received unexpected message of type {msg.Code} while expecting {BackendMessageCode.ReadyForQuery} as part of keepalive");

LogMessages.CompletedKeepalive(ConnectionLogger, Id);

if (receivedNotification)
return true; // Notification was received during the keepalive
cancellationToken.ThrowIfCancellationRequested();
break;
}

Expand Down