Skip to content

Commit c88aa35

Browse files
committed
Implement ClearAsync in data sources
1 parent 0acc808 commit c88aa35

File tree

9 files changed

+83
-23
lines changed

9 files changed

+83
-23
lines changed

src/Npgsql/Internal/NpgsqlConnector.cs

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1884,11 +1884,21 @@ copyOperation is NpgsqlCopyTextWriter ||
18841884
}
18851885
}
18861886

1887-
// TODO in theory this should be async-optional, but the only I/O done here is the Terminate Flush, which is
1888-
// very unlikely to block (plus locking would need to be worked out)
18891887
internal void Close()
18901888
{
1891-
lock (this)
1889+
Close(async: false).GetAwaiter().GetResult();
1890+
}
1891+
1892+
SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1);
1893+
1894+
internal async Task Close(bool async, CancellationToken cancellationToken = default)
1895+
{
1896+
if (async)
1897+
await _semaphore.WaitAsync(cancellationToken);
1898+
else
1899+
_semaphore.Wait(cancellationToken);
1900+
1901+
try
18921902
{
18931903
if (IsReady)
18941904
{
@@ -1900,7 +1910,7 @@ internal void Close()
19001910
// see https://github.com/npgsql/npgsql/issues/3592
19011911
WriteBuffer.Clear();
19021912
WriteTerminate();
1903-
Flush();
1913+
await Flush(async, cancellationToken);
19041914
}
19051915
catch (Exception e)
19061916
{
@@ -1920,6 +1930,10 @@ internal void Close()
19201930
FullCleanup();
19211931
LogMessages.ClosedPhysicalConnection(ConnectionLogger, Host, Port, Database, UserFacingConnectionString, Id);
19221932
}
1933+
finally
1934+
{
1935+
_semaphore.Release();
1936+
}
19231937
}
19241938

19251939
internal bool TryRemovePendingEnlistedConnector(Transaction transaction)
@@ -2006,7 +2020,7 @@ internal Exception Break(Exception reason)
20062020

20072021
void FullCleanup()
20082022
{
2009-
Debug.Assert(Monitor.IsEntered(this));
2023+
Debug.Assert(Monitor.IsEntered(this) || _semaphore.CurrentCount == 0);
20102024

20112025
if (Settings.Multiplexing)
20122026
{

src/Npgsql/MultiHostDataSourceWrapper.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ static NpgsqlConnectionStringBuilder CloneSettingsForTargetSessionAttributes(
2828

2929
internal override (int Total, int Idle, int Busy) Statistics => _wrappedSource.Statistics;
3030

31-
internal override void Clear() => _wrappedSource.Clear();
31+
internal override Task Clear(bool async, CancellationToken cancellationToken = default) => _wrappedSource.Clear(async, cancellationToken);
3232
internal override ValueTask<NpgsqlConnector> Get(NpgsqlConnection conn, NpgsqlTimeout timeout, bool async, CancellationToken cancellationToken)
3333
=> _wrappedSource.Get(conn, timeout, async, cancellationToken);
3434
internal override bool TryGetIdleConnector([NotNullWhen(true)] out NpgsqlConnector? connector)

src/Npgsql/NpgsqlConnection.cs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1861,13 +1861,27 @@ public override void ChangeDatabase(string dbName)
18611861
/// </summary>
18621862
public static void ClearPool(NpgsqlConnection connection) => PoolManager.Clear(connection._connectionString);
18631863

1864+
/// <summary>
1865+
/// Clears the connection pool. All idle physical connections in the pool of the given connection are
1866+
/// immediately closed, and any busy connections which were opened before <see cref="ClearPool"/> was called
1867+
/// will be closed when returned to the pool.
1868+
/// </summary>
1869+
public static Task ClearPoolAsync(NpgsqlConnection connection, CancellationToken cancellationToken = default) => PoolManager.ClearAsync(connection._connectionString, cancellationToken);
1870+
18641871
/// <summary>
18651872
/// Clear all connection pools. All idle physical connections in all pools are immediately closed, and any busy
18661873
/// connections which were opened before <see cref="ClearAllPools"/> was called will be closed when returned
18671874
/// to their pool.
18681875
/// </summary>
18691876
public static void ClearAllPools() => PoolManager.ClearAll();
18701877

1878+
/// <summary>
1879+
/// Clear all connection pools. All idle physical connections in all pools are immediately closed, and any busy
1880+
/// connections which were opened before <see cref="ClearAllPools"/> was called will be closed when returned
1881+
/// to their pool.
1882+
/// </summary>
1883+
public static Task ClearAllPoolsAsync(CancellationToken cancellationToken = default) => PoolManager.ClearAllAsync(cancellationToken);
1884+
18711885
/// <summary>
18721886
/// Unprepares all prepared statements on this connection.
18731887
/// </summary>

src/Npgsql/NpgsqlDataSource.cs

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -312,7 +312,15 @@ internal abstract ValueTask<NpgsqlConnector> Get(
312312

313313
internal abstract void Return(NpgsqlConnector connector);
314314

315-
internal abstract void Clear();
315+
internal void Clear() => Clear(async: false).GetAwaiter().GetResult();
316+
317+
internal Task ClearAsync(CancellationToken cancellationToken = default)
318+
{
319+
using (NoSynchronizationContextScope.Enter())
320+
return Clear(async: true, cancellationToken);
321+
}
322+
323+
internal abstract Task Clear(bool async, CancellationToken cancellationToken = default);
316324

317325
internal abstract bool OwnsConnectors { get; }
318326

@@ -429,7 +437,6 @@ protected sealed override ValueTask DisposeAsyncCore()
429437
return default;
430438
}
431439

432-
#pragma warning disable CS1998
433440
/// <inheritdoc cref="DisposeAsyncCore" />
434441
protected virtual async ValueTask DisposeAsyncBase()
435442
{
@@ -451,10 +458,8 @@ protected virtual async ValueTask DisposeAsyncBase()
451458

452459
_setupMappingsSemaphore.Dispose();
453460

454-
// TODO: async Clear, #4499
455-
Clear();
461+
await ClearAsync();
456462
}
457-
#pragma warning restore CS1998
458463

459464
[MethodImpl(MethodImplOptions.AggressiveInlining)]
460465
private protected void CheckDisposed()

src/Npgsql/NpgsqlMultiHostDataSource.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -351,10 +351,10 @@ internal override bool TryGetIdleConnector([NotNullWhen(true)] out NpgsqlConnect
351351
internal override ValueTask<NpgsqlConnector?> OpenNewConnector(NpgsqlConnection conn, NpgsqlTimeout timeout, bool async, CancellationToken cancellationToken)
352352
=> throw new NpgsqlException("Npgsql bug: trying to open a new connector from " + nameof(NpgsqlMultiHostDataSource));
353353

354-
internal override void Clear()
354+
internal override async Task Clear(bool async, CancellationToken cancellationToken = default)
355355
{
356356
foreach (var pool in _pools)
357-
pool.Clear();
357+
await pool.Clear(async, cancellationToken);
358358
}
359359

360360
/// <summary>

src/Npgsql/PoolManager.cs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
using System.Collections.Concurrent;
33
using System.Diagnostics.CodeAnalysis;
44
using System.Threading;
5+
using System.Threading.Tasks;
56

67
namespace Npgsql;
78

@@ -22,13 +23,27 @@ internal static void Clear(string connString)
2223
pool.Clear();
2324
}
2425

26+
internal static async Task ClearAsync(string connString, CancellationToken cancellationToken = default)
27+
{
28+
// TODO: Actually remove the pools from here, #3387 (but be careful of concurrency)
29+
if (Pools.TryGetValue(connString, out var pool))
30+
await pool.ClearAsync(cancellationToken);
31+
}
32+
2533
internal static void ClearAll()
2634
{
2735
// TODO: Actually remove the pools from here, #3387 (but be careful of concurrency)
2836
foreach (var pool in Pools.Values)
2937
pool.Clear();
3038
}
3139

40+
internal static async Task ClearAllAsync(CancellationToken cancellationToken = default)
41+
{
42+
// TODO: Actually remove the pools from here, #3387 (but be careful of concurrency)
43+
foreach (var pool in Pools.Values)
44+
await pool.ClearAsync(cancellationToken);
45+
}
46+
3247
static PoolManager()
3348
{
3449
// When the appdomain gets unloaded (e.g. web app redeployment) attempt to nicely

src/Npgsql/PoolingDataSource.cs

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -147,8 +147,8 @@ async ValueTask<NpgsqlConnector> RentAsync(
147147
if (async)
148148
{
149149
connector = await _idleConnectorReader.ReadAsync(finalToken);
150-
if (CheckIdleConnector(connector))
151-
return connector;
150+
if (await CheckIdleConnector(connector, async, cancellationToken))
151+
return connector!;
152152
}
153153
else
154154
{
@@ -203,8 +203,13 @@ internal sealed override bool TryGetIdleConnector([NotNullWhen(true)] out Npgsql
203203
return false;
204204
}
205205

206-
[MethodImpl(MethodImplOptions.AggressiveInlining)]
207206
bool CheckIdleConnector([NotNullWhen(true)] NpgsqlConnector? connector)
207+
{
208+
return CheckIdleConnector(connector, async: false).GetAwaiter().GetResult();
209+
}
210+
211+
[MethodImpl(MethodImplOptions.AggressiveInlining)]
212+
async Task<bool> CheckIdleConnector(NpgsqlConnector? connector, bool async, CancellationToken cancellationToken = default)
208213
{
209214
if (connector is null)
210215
return false;
@@ -218,14 +223,14 @@ bool CheckIdleConnector([NotNullWhen(true)] NpgsqlConnector? connector)
218223
// if keepalive isn't turned on.
219224
if (connector.IsBroken)
220225
{
221-
CloseConnector(connector);
226+
await CloseConnector(connector, async, cancellationToken);
222227
return false;
223228
}
224229

225230
if (_connectionLifetime != TimeSpan.Zero && DateTime.UtcNow > connector.OpenTimestamp + _connectionLifetime)
226231
{
227232
LogMessages.ConnectionExceededMaximumLifetime(_logger, _connectionLifetime, connector.Id);
228-
CloseConnector(connector);
233+
await CloseConnector(connector, async, cancellationToken);
229234
return false;
230235
}
231236

@@ -320,7 +325,7 @@ internal sealed override void Return(NpgsqlConnector connector)
320325
Debug.Assert(written);
321326
}
322327

323-
internal override void Clear()
328+
internal override async Task Clear(bool async, CancellationToken cancellationToken = default)
324329
{
325330
Interlocked.Increment(ref _clearCounter);
326331

@@ -332,9 +337,9 @@ internal override void Clear()
332337
var count = _idleCount;
333338
while (count > 0 && _idleConnectorReader.TryRead(out var connector))
334339
{
335-
if (CheckIdleConnector(connector))
340+
if (await CheckIdleConnector(connector, async, cancellationToken))
336341
{
337-
CloseConnector(connector);
342+
await CloseConnector(connector!, async, cancellationToken);
338343
count--;
339344
}
340345
}
@@ -346,10 +351,15 @@ internal override void Clear()
346351
}
347352

348353
void CloseConnector(NpgsqlConnector connector)
354+
{
355+
CloseConnector(connector, async: false).GetAwaiter().GetResult();
356+
}
357+
358+
async Task CloseConnector(NpgsqlConnector connector, bool async, CancellationToken cancellationToken = default)
349359
{
350360
try
351361
{
352-
connector.Close();
362+
await connector.Close(async, cancellationToken);
353363
}
354364
catch (Exception exception)
355365
{

src/Npgsql/PublicAPI.Unshipped.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,3 @@
11
#nullable enable
2+
static Npgsql.NpgsqlConnection.ClearAllPoolsAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task!
3+
static Npgsql.NpgsqlConnection.ClearPoolAsync(Npgsql.NpgsqlConnection! connection, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task!

src/Npgsql/UnpooledDataSource.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ internal override void Return(NpgsqlConnector connector)
4747
connector.Close();
4848
}
4949

50-
internal override void Clear() {}
50+
internal override Task Clear(bool async, CancellationToken cancellationToken = default) => Task.CompletedTask;
5151

5252
internal override bool TryRentEnlistedPending(Transaction transaction, NpgsqlConnection connection,
5353
[NotNullWhen(true)] out NpgsqlConnector? connector)

0 commit comments

Comments
 (0)