Skip to content

Commit d58efec

Browse files
authored
Add COPY operations dispose on initialization failure (#6220)
Fixes #6219
1 parent 444c77a commit d58efec

4 files changed

Lines changed: 112 additions & 33 deletions

File tree

src/Npgsql/NpgsqlBinaryExporter.cs

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ public sealed class NpgsqlBinaryExporter : ICancelable
2424

2525
NpgsqlConnector _connector;
2626
NpgsqlReadBuffer _buf;
27-
bool _isConsumed, _isDisposed;
27+
ExporterState _state = ExporterState.Uninitialized;
2828
long _endOfMessagePos;
2929

3030
short _column;
@@ -91,6 +91,7 @@ internal async Task Init(string copyToCommand, bool async, CancellationToken can
9191
throw _connector.UnexpectedMessageReceived(msg.Code);
9292
}
9393

94+
_state = ExporterState.Ready;
9495
NumColumns = copyOutResponse.NumColumns;
9596
_columnInfoCache = new PgConverterInfo[NumColumns];
9697
_rowsExported = 0;
@@ -141,7 +142,7 @@ async Task ReadHeader(bool async)
141142
async ValueTask<int> StartRow(bool async, CancellationToken cancellationToken = default)
142143
{
143144
ThrowIfDisposed();
144-
if (_isConsumed)
145+
if (_state == ExporterState.Consumed)
145146
return -1;
146147

147148
using var registration = _connector.StartNestedCancellableOperation(cancellationToken);
@@ -176,7 +177,7 @@ async ValueTask<int> StartRow(bool async, CancellationToken cancellationToken =
176177
Expect<CommandCompleteMessage>(await _connector.ReadMessage(async).ConfigureAwait(false), _connector);
177178
Expect<ReadyForQueryMessage>(await _connector.ReadMessage(async).ConfigureAwait(false), _connector);
178179
_column = BeforeRow;
179-
_isConsumed = true;
180+
_state = ExporterState.Consumed;
180181
return -1;
181182
}
182183

@@ -437,7 +438,7 @@ void ThrowIfNotOnRow()
437438

438439
void ThrowIfDisposed()
439440
{
440-
if (_isDisposed)
441+
if (_state == ExporterState.Disposed)
441442
ThrowHelper.ThrowObjectDisposedException(nameof(NpgsqlBinaryExporter), "The COPY operation has already ended.");
442443
}
443444

@@ -472,10 +473,10 @@ public Task CancelAsync()
472473

473474
async ValueTask DisposeAsync(bool async)
474475
{
475-
if (_isDisposed)
476+
if (_state == ExporterState.Disposed)
476477
return;
477478

478-
if (_isConsumed)
479+
if (_state is ExporterState.Consumed or ExporterState.Uninitialized)
479480
{
480481
LogMessages.BinaryCopyOperationCompleted(_copyLogger, _rowsExported, _connector.Id);
481482
}
@@ -512,7 +513,7 @@ async ValueTask DisposeAsync(bool async)
512513

513514
void Cleanup()
514515
{
515-
Debug.Assert(!_isDisposed);
516+
Debug.Assert(_state != ExporterState.Disposed);
516517
var connector = _connector;
517518

518519
if (!ReferenceEquals(connector, null))
@@ -523,9 +524,21 @@ void Cleanup()
523524
}
524525

525526
_buf = null!;
526-
_isDisposed = true;
527+
_state = ExporterState.Disposed;
527528
}
528529
}
529530

530531
#endregion
532+
533+
#region Enums
534+
535+
enum ExporterState
536+
{
537+
Uninitialized,
538+
Ready,
539+
Consumed,
540+
Disposed
541+
}
542+
543+
#endregion Enums
531544
}

src/Npgsql/NpgsqlBinaryImporter.cs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ public sealed class NpgsqlBinaryImporter : ICancelable
2525
NpgsqlConnector _connector;
2626
NpgsqlWriteBuffer _buf;
2727

28-
ImporterState _state;
28+
ImporterState _state = ImporterState.Uninitialized;
2929

3030
/// <summary>
3131
/// The number of columns in the current (not-yet-written) row.
@@ -99,6 +99,7 @@ internal async Task Init(string copyFromCommand, bool async, CancellationToken c
9999
throw _connector.UnexpectedMessageReceived(msg.Code);
100100
}
101101

102+
_state = ImporterState.Ready;
102103
_params = new NpgsqlParameter[copyInResponse.NumColumns];
103104
_rowsImported = 0;
104105
_buf.StartCopyMode();
@@ -512,6 +513,7 @@ async ValueTask CloseAsync(bool async, CancellationToken cancellationToken = def
512513
case ImporterState.Ready:
513514
await Cancel(async, cancellationToken).ConfigureAwait(false);
514515
break;
516+
case ImporterState.Uninitialized:
515517
case ImporterState.Cancelled:
516518
case ImporterState.Committed:
517519
break;
@@ -553,6 +555,7 @@ void CheckReady()
553555
static void Throw(ImporterState state)
554556
=> throw (state switch
555557
{
558+
ImporterState.Uninitialized => throw new InvalidOperationException("The COPY operation has not been initialized."),
556559
ImporterState.Disposed => new ObjectDisposedException(typeof(NpgsqlBinaryImporter).FullName,
557560
"The COPY operation has already ended."),
558561
ImporterState.Cancelled => new InvalidOperationException("The COPY operation has already been cancelled."),
@@ -567,6 +570,7 @@ static void Throw(ImporterState state)
567570

568571
enum ImporterState
569572
{
573+
Uninitialized,
570574
Ready,
571575
Committed,
572576
Cancelled,

src/Npgsql/NpgsqlConnection.cs

Lines changed: 60 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1160,17 +1160,26 @@ async Task<NpgsqlBinaryImporter> BeginBinaryImport(bool async, string copyFromCo
11601160
LogMessages.StartingBinaryImport(connector.LoggingConfiguration.CopyLogger, connector.Id);
11611161
// no point in passing a cancellationToken here, as we register the cancellation in the Init method
11621162
connector.StartUserAction(ConnectorState.Copy, attemptPgCancellation: false);
1163+
var importer = new NpgsqlBinaryImporter(connector);
11631164
try
11641165
{
1165-
var importer = new NpgsqlBinaryImporter(connector);
11661166
await importer.Init(copyFromCommand, async, cancellationToken).ConfigureAwait(false);
11671167
connector.CurrentCopyOperation = importer;
11681168
return importer;
11691169
}
11701170
catch
11711171
{
1172-
connector.EndUserAction();
1173-
EndBindingScope(ConnectorBindingScope.Copy);
1172+
try
1173+
{
1174+
if (async)
1175+
await importer.DisposeAsync().ConfigureAwait(false);
1176+
else
1177+
importer.Dispose();
1178+
}
1179+
catch
1180+
{
1181+
// ignored
1182+
}
11741183
throw;
11751184
}
11761185
}
@@ -1210,17 +1219,26 @@ async Task<NpgsqlBinaryExporter> BeginBinaryExport(bool async, string copyToComm
12101219
LogMessages.StartingBinaryExport(connector.LoggingConfiguration.CopyLogger, connector.Id);
12111220
// no point in passing a cancellationToken here, as we register the cancellation in the Init method
12121221
connector.StartUserAction(ConnectorState.Copy, attemptPgCancellation: false);
1222+
var exporter = new NpgsqlBinaryExporter(connector);
12131223
try
12141224
{
1215-
var exporter = new NpgsqlBinaryExporter(connector);
12161225
await exporter.Init(copyToCommand, async, cancellationToken).ConfigureAwait(false);
12171226
connector.CurrentCopyOperation = exporter;
12181227
return exporter;
12191228
}
12201229
catch
12211230
{
1222-
connector.EndUserAction();
1223-
EndBindingScope(ConnectorBindingScope.Copy);
1231+
try
1232+
{
1233+
if (async)
1234+
await exporter.DisposeAsync().ConfigureAwait(false);
1235+
else
1236+
exporter.Dispose();
1237+
}
1238+
catch
1239+
{
1240+
// ignored
1241+
}
12241242
throw;
12251243
}
12261244
}
@@ -1266,18 +1284,27 @@ async Task<TextWriter> BeginTextImport(bool async, string copyFromCommand, Cance
12661284
LogMessages.StartingTextImport(connector.LoggingConfiguration.CopyLogger, connector.Id);
12671285
// no point in passing a cancellationToken here, as we register the cancellation in the Init method
12681286
connector.StartUserAction(ConnectorState.Copy, attemptPgCancellation: false);
1287+
var copyStream = new NpgsqlRawCopyStream(connector);
12691288
try
12701289
{
1271-
var copyStream = new NpgsqlRawCopyStream(connector);
12721290
await copyStream.Init(copyFromCommand, async, cancellationToken).ConfigureAwait(false);
12731291
var writer = new NpgsqlCopyTextWriter(connector, copyStream);
12741292
connector.CurrentCopyOperation = writer;
12751293
return writer;
12761294
}
12771295
catch
12781296
{
1279-
connector.EndUserAction();
1280-
EndBindingScope(ConnectorBindingScope.Copy);
1297+
try
1298+
{
1299+
if (async)
1300+
await copyStream.DisposeAsync().ConfigureAwait(false);
1301+
else
1302+
copyStream.Dispose();
1303+
}
1304+
catch
1305+
{
1306+
// ignored
1307+
}
12811308
throw;
12821309
}
12831310
}
@@ -1323,18 +1350,27 @@ async Task<TextReader> BeginTextExport(bool async, string copyToCommand, Cancell
13231350
LogMessages.StartingTextExport(connector.LoggingConfiguration.CopyLogger, connector.Id);
13241351
// no point in passing a cancellationToken here, as we register the cancellation in the Init method
13251352
connector.StartUserAction(ConnectorState.Copy, attemptPgCancellation: false);
1353+
var copyStream = new NpgsqlRawCopyStream(connector);
13261354
try
13271355
{
1328-
var copyStream = new NpgsqlRawCopyStream(connector);
13291356
await copyStream.Init(copyToCommand, async, cancellationToken).ConfigureAwait(false);
13301357
var reader = new NpgsqlCopyTextReader(connector, copyStream);
13311358
connector.CurrentCopyOperation = reader;
13321359
return reader;
13331360
}
13341361
catch
13351362
{
1336-
connector.EndUserAction();
1337-
EndBindingScope(ConnectorBindingScope.Copy);
1363+
try
1364+
{
1365+
if (async)
1366+
await copyStream.DisposeAsync().ConfigureAwait(false);
1367+
else
1368+
copyStream.Dispose();
1369+
}
1370+
catch
1371+
{
1372+
// ignored
1373+
}
13381374
throw;
13391375
}
13401376
}
@@ -1380,9 +1416,9 @@ async Task<NpgsqlRawCopyStream> BeginRawBinaryCopy(bool async, string copyComman
13801416
LogMessages.StartingRawCopy(connector.LoggingConfiguration.CopyLogger, connector.Id);
13811417
// no point in passing a cancellationToken here, as we register the cancellation in the Init method
13821418
connector.StartUserAction(ConnectorState.Copy, attemptPgCancellation: false);
1419+
var stream = new NpgsqlRawCopyStream(connector);
13831420
try
13841421
{
1385-
var stream = new NpgsqlRawCopyStream(connector);
13861422
await stream.Init(copyCommand, async, cancellationToken).ConfigureAwait(false);
13871423
if (!stream.IsBinary)
13881424
{
@@ -1395,8 +1431,17 @@ async Task<NpgsqlRawCopyStream> BeginRawBinaryCopy(bool async, string copyComman
13951431
}
13961432
catch
13971433
{
1398-
connector.EndUserAction();
1399-
EndBindingScope(ConnectorBindingScope.Copy);
1434+
try
1435+
{
1436+
if (async)
1437+
await stream.DisposeAsync().ConfigureAwait(false);
1438+
else
1439+
stream.Dispose();
1440+
}
1441+
catch
1442+
{
1443+
// ignored
1444+
}
14001445
throw;
14011446
}
14021447
}

0 commit comments

Comments
 (0)