Skip to content

Commit 7f54f0f

Browse files
authored
Fixed the connection break and connector returning to the pool (#3791)
Fixes #3705
1 parent 98e7af8 commit 7f54f0f

7 files changed

Lines changed: 107 additions & 87 deletions

File tree

src/Npgsql/Internal/NpgsqlConnector.cs

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1875,7 +1875,21 @@ internal Exception Break(Exception reason)
18751875
Cleanup();
18761876

18771877
if (connection is not null)
1878+
{
1879+
var closeLockTaken = connection.TakeCloseLock();
1880+
Debug.Assert(closeLockTaken);
1881+
if (Settings.ReplicationMode == ReplicationMode.Off)
1882+
{
1883+
Connection = null;
1884+
if (connection.ConnectorBindingScope != ConnectorBindingScope.None)
1885+
Return();
1886+
connection.EnlistedTransaction = null;
1887+
connection.Connector = null;
1888+
connection.ConnectorBindingScope = ConnectorBindingScope.None;
1889+
}
18781890
connection.FullState = ConnectionState.Broken;
1891+
connection.ReleaseCloseLock();
1892+
}
18791893
}
18801894

18811895
return reason;
@@ -1922,7 +1936,8 @@ void Cleanup()
19221936
try
19231937
{
19241938
// Will never complete asynchronously (stream is already closed)
1925-
CurrentReader.Close();
1939+
var readerCloseTask = CurrentReader.CloseAsync();
1940+
Debug.Assert(readerCloseTask.IsCompleted);
19261941
}
19271942
catch
19281943
{
@@ -1931,6 +1946,21 @@ void Cleanup()
19311946
CurrentReader = null;
19321947
}
19331948

1949+
if (CurrentCopyOperation != null)
1950+
{
1951+
try
1952+
{
1953+
// Will never complete asynchronously (stream is already closed)
1954+
var copyOperationDisposeTask = CurrentCopyOperation.DisposeAsync();
1955+
Debug.Assert(copyOperationDisposeTask.IsCompleted);
1956+
}
1957+
catch
1958+
{
1959+
// ignored
1960+
}
1961+
CurrentCopyOperation = null;
1962+
}
1963+
19341964
ClearTransaction();
19351965

19361966
#pragma warning disable CS8625

src/Npgsql/NpgsqlBinaryExporter.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -307,7 +307,6 @@ async ValueTask<T> DoRead<T>(NpgsqlTypeHandler handler, bool async, Cancellation
307307
catch (Exception e)
308308
{
309309
_connector.Break(e);
310-
Cleanup();
311310
throw;
312311
}
313312
}
@@ -410,7 +409,7 @@ async ValueTask DisposeAsync(bool async)
410409
if (_isDisposed)
411410
return;
412411

413-
if (!_isConsumed)
412+
if (!_isConsumed && !_connector.IsBroken)
414413
{
415414
try
416415
{
@@ -440,6 +439,7 @@ async ValueTask DisposeAsync(bool async)
440439
#pragma warning disable CS8625
441440
void Cleanup()
442441
{
442+
Debug.Assert(!_isDisposed);
443443
var connector = _connector;
444444
Log.Debug("COPY operation ended", connector?.Id ?? -1);
445445

src/Npgsql/NpgsqlBinaryImporter.cs

Lines changed: 27 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using System;
2+
using System.Diagnostics;
23
using System.Diagnostics.CodeAnalysis;
34
using System.Threading;
45
using System.Threading.Tasks;
@@ -136,20 +137,11 @@ async Task StartRow(bool async, CancellationToken cancellationToken = default)
136137
if (_column != -1 && _column != NumColumns)
137138
ThrowHelper.ThrowInvalidOperationException_BinaryImportParametersMismatch(NumColumns, _column);
138139

139-
try
140-
{
141-
if (_buf.WriteSpaceLeft < 2)
142-
await _buf.Flush(async, cancellationToken);
143-
_buf.WriteInt16(NumColumns);
140+
if (_buf.WriteSpaceLeft < 2)
141+
await _buf.Flush(async, cancellationToken);
142+
_buf.WriteInt16(NumColumns);
144143

145-
_column = 0;
146-
}
147-
catch
148-
{
149-
// An exception here will have already broken the connection etc.
150-
Cleanup();
151-
throw;
152-
}
144+
_column = 0;
153145
}
154146

155147
/// <summary>
@@ -319,35 +311,26 @@ async Task Write<T>([AllowNull] T value, NpgsqlParameter param, bool async, Canc
319311
return;
320312
}
321313

322-
try
314+
if (typeof(T) == typeof(object))
323315
{
324-
if (typeof(T) == typeof(object))
325-
{
326-
param.Value = value;
327-
}
328-
else
329-
{
330-
if (param is not NpgsqlParameter<T> typedParam)
331-
{
332-
_params[_column] = typedParam = new NpgsqlParameter<T>();
333-
typedParam.NpgsqlDbType = param.NpgsqlDbType;
334-
param = typedParam;
335-
}
336-
typedParam.TypedValue = value;
337-
}
338-
param.ResolveHandler(_connector.TypeMapper);
339-
param.ValidateAndGetLength();
340-
param.LengthCache?.Rewind();
341-
await param.WriteWithLength(_buf, async, cancellationToken);
342-
param.LengthCache?.Clear();
343-
_column++;
316+
param.Value = value;
344317
}
345-
catch
318+
else
346319
{
347-
// An exception here will have already broken the connection etc.
348-
Cleanup();
349-
throw;
320+
if (param is not NpgsqlParameter<T> typedParam)
321+
{
322+
_params[_column] = typedParam = new NpgsqlParameter<T>();
323+
typedParam.NpgsqlDbType = param.NpgsqlDbType;
324+
param = typedParam;
325+
}
326+
typedParam.TypedValue = value;
350327
}
328+
param.ResolveHandler(_connector.TypeMapper);
329+
param.ValidateAndGetLength();
330+
param.LengthCache?.Rewind();
331+
await param.WriteWithLength(_buf, async, cancellationToken);
332+
param.LengthCache?.Clear();
333+
_column++;
351334
}
352335

353336
/// <summary>
@@ -372,20 +355,11 @@ async Task WriteNull(bool async, CancellationToken cancellationToken = default)
372355
if (_column == -1)
373356
throw new InvalidOperationException("A row hasn't been started");
374357

375-
try
376-
{
377-
if (_buf.WriteSpaceLeft < 4)
378-
await _buf.Flush(async, cancellationToken);
358+
if (_buf.WriteSpaceLeft < 4)
359+
await _buf.Flush(async, cancellationToken);
379360

380-
_buf.WriteInt32(-1);
381-
_column++;
382-
}
383-
catch
384-
{
385-
// An exception here will have already broken the connection etc.
386-
Cleanup();
387-
throw;
388-
}
361+
_buf.WriteInt32(-1);
362+
_column++;
389363
}
390364

391365
/// <summary>
@@ -472,7 +446,6 @@ async ValueTask<ulong> Complete(bool async, CancellationToken cancellationToken
472446
}
473447
catch
474448
{
475-
// An exception here will have already broken the connection etc.
476449
Cleanup();
477450
throw;
478451
}
@@ -581,6 +554,8 @@ async ValueTask CloseAsync(bool async, CancellationToken cancellationToken = def
581554
#pragma warning disable CS8625
582555
void Cleanup()
583556
{
557+
if (_state == ImporterState.Disposed)
558+
return;
584559
var connector = _connector;
585560
Log.Debug("COPY operation ended", connector?.Id ?? -1);
586561

src/Npgsql/NpgsqlConnection.cs

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -731,7 +731,7 @@ public override void EnlistTransaction(Transaction? transaction)
731731
/// Releases the connection. If the connection is pooled, it will be returned to the pool and made available for re-use.
732732
/// If it is non-pooled, the physical connection will be closed.
733733
/// </summary>
734-
public override void Close() => Close(async: false);
734+
public override void Close() => Close(async: false).GetAwaiter().GetResult();
735735

736736
/// <summary>
737737
/// Releases the connection. If the connection is pooled, it will be returned to the pool and made available for re-use.
@@ -747,28 +747,34 @@ public override Task CloseAsync()
747747
return Close(async: true);
748748
}
749749

750+
internal bool TakeCloseLock() => Interlocked.Exchange(ref _closing, 1) == 0;
751+
752+
internal void ReleaseCloseLock() => Volatile.Write(ref _closing, 0);
753+
750754
internal Task Close(bool async)
751755
{
752756
// Even though NpgsqlConnection isn't thread safe we'll make sure this part is.
753757
// Because we really don't want double returns to the pool.
754-
if (Interlocked.Exchange(ref _closing, 1) == 1)
758+
if (!TakeCloseLock())
755759
return Task.CompletedTask;
756760

757761
switch (FullState)
758762
{
759763
case ConnectionState.Open:
760764
case ConnectionState.Open | ConnectionState.Executing:
761765
case ConnectionState.Open | ConnectionState.Fetching:
762-
case ConnectionState.Broken:
763766
break;
767+
case ConnectionState.Broken:
768+
FullState = ConnectionState.Closed;
769+
goto case ConnectionState.Closed;
764770
case ConnectionState.Closed:
765-
Volatile.Write(ref _closing, 0);
771+
ReleaseCloseLock();
766772
return Task.CompletedTask;
767773
case ConnectionState.Connecting:
768-
Volatile.Write(ref _closing, 0);
774+
ReleaseCloseLock();
769775
throw new InvalidOperationException("Can't close, connection is in state " + FullState);
770776
default:
771-
Volatile.Write(ref _closing, 0);
777+
ReleaseCloseLock();
772778
throw new ArgumentOutOfRangeException("Unknown connection state: " + FullState);
773779
}
774780

@@ -779,7 +785,7 @@ internal Task Close(bool async)
779785
// TODO: Consider falling through to the regular reset logic. This adds some unneeded conditions
780786
// and assignment but actual perf impact should be negligible (measure).
781787
Debug.Assert(Connector == null);
782-
Volatile.Write(ref _closing, 0);
788+
ReleaseCloseLock();
783789

784790
FullState = ConnectionState.Closed;
785791
Log.Debug("Connection closed (multiplexing)");
@@ -820,14 +826,7 @@ async Task CloseAsync(bool async)
820826
Debug.Assert(connector.CurrentReader == null);
821827
Debug.Assert(connector.CurrentCopyOperation == null);
822828

823-
if (connector.IsBroken)
824-
{
825-
connector.Connection = null;
826-
connector.Return();
827-
828-
EnlistedTransaction = null;
829-
}
830-
else if (EnlistedTransaction != null)
829+
if (EnlistedTransaction != null)
831830
{
832831
// A System.Transactions transaction is still in progress
833832

@@ -881,7 +880,7 @@ async Task CloseAsync(bool async)
881880
}
882881
finally
883882
{
884-
Volatile.Write(ref _closing, 0);
883+
ReleaseCloseLock();
885884
}
886885
}
887886

@@ -1768,7 +1767,8 @@ internal T CheckOpenAndRunInTemporaryScope<T>(Func<NpgsqlConnector, T> func)
17681767
/// </remarks>
17691768
internal void EndBindingScope(ConnectorBindingScope scope)
17701769
{
1771-
Debug.Assert(ConnectorBindingScope != ConnectorBindingScope.None, $"Ending binding scope {scope} but connection's scope is null");
1770+
Debug.Assert(ConnectorBindingScope != ConnectorBindingScope.None || FullState == ConnectionState.Broken,
1771+
$"Ending binding scope {scope} but connection's scope is null");
17721772

17731773
if (scope != ConnectorBindingScope)
17741774
return;

src/Npgsql/NpgsqlDataReader.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -899,6 +899,9 @@ internal async Task Close(bool connectionClosing, bool async, bool isDisposing)
899899
return;
900900
}
901901

902+
// Whenever a connector is broken, it also closes the current reader.
903+
Connector.CurrentReader = null;
904+
902905
switch (Connector.State)
903906
{
904907
case ConnectorState.Ready:

src/Npgsql/NpgsqlRawCopyStream.cs

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,6 @@ public override void Write(ReadOnlySpan<byte> buffer)
159159
catch (Exception e)
160160
{
161161
_connector.Break(e);
162-
Cleanup();
163162
throw;
164163
}
165164
}
@@ -205,7 +204,6 @@ async ValueTask WriteAsyncInternal(ReadOnlyMemory<byte> buffer, CancellationToke
205204
catch (Exception e)
206205
{
207206
_connector.Break(e);
208-
Cleanup();
209207
throw;
210208
}
211209
}
@@ -299,7 +297,8 @@ async ValueTask<int> ReadCore(int count, bool async, CancellationToken cancellat
299297
}
300298
catch
301299
{
302-
Cleanup();
300+
if (!_isDisposed)
301+
Cleanup();
303302
throw;
304303
}
305304

@@ -375,9 +374,8 @@ async Task Cancel(bool async)
375374
_connector.EndUserAction();
376375
Cleanup();
377376

378-
if (e.SqlState == PostgresErrorCodes.QueryCanceled)
379-
return;
380-
throw;
377+
if (e.SqlState != PostgresErrorCodes.QueryCanceled)
378+
throw;
381379
}
382380
}
383381
else
@@ -402,10 +400,13 @@ public override ValueTask DisposeAsync()
402400

403401
async ValueTask DisposeAsync(bool disposing, bool async)
404402
{
405-
if (_isDisposed || !disposing) { return; }
403+
if (_isDisposed || !disposing)
404+
return;
406405

407406
try
408407
{
408+
_connector.CurrentCopyOperation = null;
409+
409410
if (CanWrite)
410411
{
411412
await FlushAsync(async);
@@ -448,6 +449,7 @@ async ValueTask DisposeAsync(bool disposing, bool async)
448449
#pragma warning disable CS8625
449450
void Cleanup()
450451
{
452+
Debug.Assert(!_isDisposed);
451453
Log.Debug("COPY operation ended", _connector.Id);
452454
_connector.CurrentCopyOperation = null;
453455
_connector.Connection?.EndBindingScope(ConnectorBindingScope.Copy);
@@ -461,7 +463,7 @@ void Cleanup()
461463
void CheckDisposed()
462464
{
463465
if (_isDisposed) {
464-
throw new ObjectDisposedException(GetType().FullName, "The COPY operation has already ended.");
466+
throw new ObjectDisposedException(nameof(NpgsqlRawCopyStream), "The COPY operation has already ended.");
465467
}
466468
}
467469

0 commit comments

Comments
 (0)