Skip to content
Merged
12 changes: 6 additions & 6 deletions src/Npgsql/NpgsqlBinaryExporter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ internal NpgsqlBinaryExporter(NpgsqlConnector connector, string copyToCommand)
_connector.Flush();

CopyOutResponseMessage copyOutResponse;
var msg = _connector.ReadMessage();
var msg = _connector.ReadMessageWithoutCancellation();
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Didn't we say we want PG cancellation when reading messages when exporting?

Thinking about this again, we can say that we don't think PG cancellation makes sense with COPY, because we don't think PG will hang for any reason (after all it's just pushing out data), so we only hang because of network issues. If this is true (it makes sense to me), we can just do hard cancellation. Interestingly we still allow sending PG cancellation via the Cancel method, but that wouldn't be because of hanging, just because the user decided mid-way to stop the long process.

I'm still not completely sure here.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But we're not reading here a CopyDataMessage. Is there a reason you think, that reading a CopyOutResponseMessage may hang?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, just like RawCopyStream , here we're initializing the stream. Is it really OK to cancel, when we do not even know if the copy operation has really started?

Copy link
Copy Markdown
Member

@roji roji Oct 28, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be honest, I have no idea...

Let's go with your approach, and attempt cancellation only when reading CopyData. We can always adjust this later based on user feedback if we get any.

switch (msg.Code)
{
case BackendMessageCode.CopyOutResponse:
Expand Down Expand Up @@ -144,9 +144,9 @@ async ValueTask<int> StartRow(bool async, CancellationToken cancellationToken =
if (numColumns == -1)
{
Debug.Assert(_leftToReadInDataMsg == 0);
Expect<CopyDoneMessage>(await _connector.ReadMessage(async, cancellationToken), _connector);
Expect<CommandCompleteMessage>(await _connector.ReadMessage(async, cancellationToken), _connector);
Expect<ReadyForQueryMessage>(await _connector.ReadMessage(async, cancellationToken), _connector);
Expect<CopyDoneMessage>(await _connector.ReadMessageWithoutCancellation(async, cancellationToken), _connector);
Expect<CommandCompleteMessage>(await _connector.ReadMessageWithoutCancellation(async, cancellationToken), _connector);
Expect<ReadyForQueryMessage>(await _connector.ReadMessageWithoutCancellation(async, cancellationToken), _connector);
_column = -1;
_isConsumed = true;
return -1;
Expand Down Expand Up @@ -384,8 +384,8 @@ async ValueTask DisposeAsync(bool async)
// Read to the end
_connector.SkipUntil(BackendMessageCode.CopyDone);
// We intentionally do not pass a CancellationToken since we don't want to cancel cleanup
Expect<CommandCompleteMessage>(await _connector.ReadMessage(async, cancellationToken: default), _connector);
Expect<ReadyForQueryMessage>(await _connector.ReadMessage(async, cancellationToken: default), _connector);
Expect<CommandCompleteMessage>(await _connector.ReadMessageWithoutCancellation(async, cancellationToken: default), _connector);
Expect<ReadyForQueryMessage>(await _connector.ReadMessageWithoutCancellation(async, cancellationToken: default), _connector);
}

_connector.EndUserAction();
Expand Down
8 changes: 4 additions & 4 deletions src/Npgsql/NpgsqlBinaryImporter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ internal NpgsqlBinaryImporter(NpgsqlConnector connector, string copyFromCommand)
_connector.Flush();

CopyInResponseMessage copyInResponse;
var msg = _connector.ReadMessage();
var msg = _connector.ReadMessageWithoutCancellation();
switch (msg.Code)
{
case BackendMessageCode.CopyInResponse:
Expand Down Expand Up @@ -408,8 +408,8 @@ async ValueTask<ulong> Complete(bool async, CancellationToken cancellationToken
_buf.EndCopyMode();
await _connector.WriteCopyDone(async, cancellationToken);
await _connector.Flush(async, cancellationToken);
var cmdComplete = Expect<CommandCompleteMessage>(await _connector.ReadMessage(async, cancellationToken), _connector);
Expect<ReadyForQueryMessage>(await _connector.ReadMessage(async, cancellationToken), _connector);
var cmdComplete = Expect<CommandCompleteMessage>(await _connector.ReadMessageWithoutCancellation(async, cancellationToken), _connector);
Expect<ReadyForQueryMessage>(await _connector.ReadMessageWithoutCancellation(async, cancellationToken), _connector);
_state = ImporterState.Committed;
return cmdComplete.Rows;
}
Expand Down Expand Up @@ -447,7 +447,7 @@ async Task Cancel(bool async, CancellationToken cancellationToken = default)
await _connector.Flush(async, cancellationToken);
try
{
var msg = await _connector.ReadMessage(async, cancellationToken);
var msg = await _connector.ReadMessageWithoutCancellation(async, cancellationToken);
// The CopyFail should immediately trigger an exception from the read above.
throw _connector.Break(
new NpgsqlException("Expected ErrorResponse when cancelling COPY but got: " + msg.Code));
Expand Down
8 changes: 4 additions & 4 deletions src/Npgsql/NpgsqlCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -480,8 +480,8 @@ void DeriveParametersForQuery(NpgsqlConnector connector)

foreach (var statement in _statements)
{
Expect<ParseCompleteMessage>(connector.ReadMessage(), connector);
var paramTypeOIDs = Expect<ParameterDescriptionMessage>(connector.ReadMessage(), connector).TypeOIDs;
Expect<ParseCompleteMessage>(connector.ReadMessageWithoutCancellation(), connector);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason this shouldn't do PG cancellation?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one is... actually, I'm not really sure about. Does it really can timeout because it was executing for too long?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably not - we're just asking the server to parse and describe the parameters. I think in my mind I wanted to default to trying cancellation, but like with COPY we can go with this and revisit later (this API specifically is in very low usage.

var paramTypeOIDs = Expect<ParameterDescriptionMessage>(connector.ReadMessageWithoutCancellation(), connector).TypeOIDs;

if (statement.InputParameters.Count != paramTypeOIDs.Count)
{
Expand Down Expand Up @@ -515,7 +515,7 @@ void DeriveParametersForQuery(NpgsqlConnector connector)
}
}

var msg = connector.ReadMessage();
var msg = connector.ReadMessageWithoutCancellation();
switch (msg.Code)
{
case BackendMessageCode.RowDescription:
Expand All @@ -526,7 +526,7 @@ void DeriveParametersForQuery(NpgsqlConnector connector)
}
}

Expect<ReadyForQueryMessage>(connector.ReadMessage(), connector);
Expect<ReadyForQueryMessage>(connector.ReadMessageWithoutCancellation(), connector);
sendTask.GetAwaiter().GetResult();
}
}
Expand Down
14 changes: 7 additions & 7 deletions src/Npgsql/NpgsqlConnector.Auth.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ async Task Authenticate(string username, NpgsqlTimeout timeout, bool async, Canc
Log.Trace("Authenticating...", Id);

timeout.CheckAndApply(this);
var msg = Expect<AuthenticationRequestMessage>(await ReadMessage(async, cancellationToken), this);
var msg = Expect<AuthenticationRequestMessage>(await ReadMessageWithoutCancellation(async, cancellationToken), this);
switch (msg.AuthRequestType)
{
case AuthenticationRequestType.AuthenticationOk:
Expand Down Expand Up @@ -64,7 +64,7 @@ async Task AuthenticateCleartext(string username, bool async, CancellationToken

await WritePassword(encoded, async, cancellationToken);
await Flush(async, cancellationToken);
Expect<AuthenticationRequestMessage>(await ReadMessage(async, cancellationToken), this);
Expect<AuthenticationRequestMessage>(await ReadMessageWithoutCancellation(async, cancellationToken), this);
}

async Task AuthenticateSASL(List<string> mechanisms, string username, bool async, CancellationToken cancellationToken = default)
Expand Down Expand Up @@ -164,7 +164,7 @@ async Task AuthenticateSASL(List<string> mechanisms, string username, bool async
await WriteSASLInitialResponse(mechanism, PGUtil.UTF8Encoding.GetBytes($"{cbindFlag},,n=*,r={clientNonce}"), async, cancellationToken);
await Flush(async, cancellationToken);

var saslContinueMsg = Expect<AuthenticationSASLContinueMessage>(await ReadMessage(async, cancellationToken), this);
var saslContinueMsg = Expect<AuthenticationSASLContinueMessage>(await ReadMessageWithoutCancellation(async, cancellationToken), this);
if (saslContinueMsg.AuthRequestType != AuthenticationRequestType.AuthenticationSASLContinue)
throw new NpgsqlException("[SASL] AuthenticationSASLFinal message expected");
var firstServerMsg = AuthenticationSCRAMServerFirstMessage.Load(saslContinueMsg.Payload);
Expand Down Expand Up @@ -197,15 +197,15 @@ async Task AuthenticateSASL(List<string> mechanisms, string username, bool async
await WriteSASLResponse(Encoding.UTF8.GetBytes(messageStr), async, cancellationToken);
await Flush(async, cancellationToken);

var saslFinalServerMsg = Expect<AuthenticationSASLFinalMessage>(await ReadMessage(async, cancellationToken), this);
var saslFinalServerMsg = Expect<AuthenticationSASLFinalMessage>(await ReadMessageWithoutCancellation(async, cancellationToken), this);
if (saslFinalServerMsg.AuthRequestType != AuthenticationRequestType.AuthenticationSASLFinal)
throw new NpgsqlException("[SASL] AuthenticationSASLFinal message expected");

var scramFinalServerMsg = AuthenticationSCRAMServerFinalMessage.Load(saslFinalServerMsg.Payload);
if (scramFinalServerMsg.ServerSignature != Convert.ToBase64String(serverSignature))
throw new NpgsqlException("[SCRAM] Unable to verify server signature");

var okMsg = Expect<AuthenticationRequestMessage>(await ReadMessage(async, cancellationToken), this);
var okMsg = Expect<AuthenticationRequestMessage>(await ReadMessageWithoutCancellation(async, cancellationToken), this);
if (okMsg.AuthRequestType != AuthenticationRequestType.AuthenticationOk)
throw new NpgsqlException("[SASL] Expected AuthenticationOK message");

Expand Down Expand Up @@ -297,7 +297,7 @@ async Task AuthenticateMD5(string username, byte[] salt, bool async, Cancellatio

await WritePassword(result, async, cancellationToken);
await Flush(async, cancellationToken);
Expect<AuthenticationRequestMessage>(await ReadMessage(async, cancellationToken), this);
Expect<AuthenticationRequestMessage>(await ReadMessageWithoutCancellation(async, cancellationToken), this);
}

async Task AuthenticateGSS(bool async)
Expand Down Expand Up @@ -393,7 +393,7 @@ async Task<int> Read(byte[] buffer, int offset, int count, bool async, Cancellat
{
if (_leftToRead == 0)
{
var response = Expect<AuthenticationRequestMessage>(await _connector.ReadMessage(async, cancellationToken), _connector);
var response = Expect<AuthenticationRequestMessage>(await _connector.ReadMessageWithoutCancellation(async, cancellationToken), _connector);
if (response.AuthRequestType == AuthenticationRequestType.AuthenticationOk)
throw new AuthenticationCompleteException();
var gssMsg = response as AuthenticationGSSContinueMessage;
Expand Down
Loading