Skip to content

Commit 449f1e0

Browse files
authored
Implement batching error barrier control (#4674)
Closes #4205
1 parent 58e3a4c commit 449f1e0

9 files changed

Lines changed: 425 additions & 41 deletions

File tree

src/Npgsql/Internal/NpgsqlConnector.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1358,7 +1358,8 @@ internal ValueTask<IBackendMessage> ReadMessage(bool async, DataRowLoadingMode d
13581358
// an RFQ. Instead, the server closes the connection immediately
13591359
throw error;
13601360
}
1361-
else if (PostgresErrorCodes.IsCriticalFailure(error, clusterError: false))
1361+
1362+
if (PostgresErrorCodes.IsCriticalFailure(error, clusterError: false))
13621363
{
13631364
// Consider the connection dead
13641365
throw connector.Break(error);

src/Npgsql/NpgsqlBatch.cs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,32 @@ protected override DbTransaction? DbTransaction
5454
set => Transaction = (NpgsqlTransaction?)value;
5555
}
5656

57+
/// <summary>
58+
/// Controls whether to place error barriers between all batch commands within this batch. Default to <see langword="false" />.
59+
/// </summary>
60+
/// <remarks>
61+
/// <para>
62+
/// By default, any exception in a command causes later commands in the batch to be skipped, and earlier commands to be rolled back.
63+
/// Enabling error barriers ensures that errors do not affect other commands in the batch.
64+
/// </para>
65+
/// <para>
66+
/// Note that if the batch is executed within an explicit transaction, the first error places the transaction in a failed state,
67+
/// causing all later commands to fail in any case. As a result, this option is useful mainly when there is no explicit transaction.
68+
/// </para>
69+
/// <para>
70+
/// At the PostgreSQL wire protocol level, this corresponds to inserting a Sync message between each command, rather than grouping
71+
/// all the batch's commands behind a single terminating Sync.
72+
/// </para>
73+
/// <para>
74+
/// To control error barriers on a command-by-command basis, see <see cref="NpgsqlBatchCommand.AppendErrorBarrier" />.
75+
/// </para>
76+
/// </remarks>
77+
public bool EnableErrorBarriers
78+
{
79+
get => Command.EnableErrorBarriers;
80+
set => Command.EnableErrorBarriers = value;
81+
}
82+
5783
/// <summary>
5884
/// Marks all of the batch's result columns as either known or unknown.
5985
/// Unknown results column are requested them from PostgreSQL in text format, and Npgsql makes no

src/Npgsql/NpgsqlBatchCommand.cs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,31 @@ public override string CommandText
3232
/// <inheritdoc cref="DbBatchCommand.Parameters"/>
3333
public new NpgsqlParameterCollection Parameters { get; } = new();
3434

35+
/// <summary>
36+
/// Appends an error barrier after this batch command. Defaults to the value of <see cref="NpgsqlBatch.EnableErrorBarriers" /> on the
37+
/// batch.
38+
/// </summary>
39+
/// <remarks>
40+
/// <para>
41+
/// By default, any exception in a command causes later commands in the batch to be skipped, and earlier commands to be rolled back.
42+
/// Appending an error barrier ensures that errors from this command (or previous ones) won't cause later commands to be skipped,
43+
/// and that errors from later commands won't cause this command (or previous ones) to be rolled back).
44+
/// </para>
45+
/// <para>
46+
/// Note that if the batch is executed within an explicit transaction, the first error places the transaction in a failed state,
47+
/// causing all later commands to fail in any case. As a result, this option is useful mainly when there is no explicit transaction.
48+
/// </para>
49+
/// <para>
50+
/// At the PostgreSQL wire protocol level, this corresponds to inserting a Sync message after this command, rather than grouping
51+
/// all the batch's commands behind a single terminating Sync.
52+
/// </para>
53+
/// <para>
54+
/// Controlling error barriers on a command-by-command basis is an advanced feature, consider enabling error barriers for the entire
55+
/// batch via <see cref="NpgsqlBatch.EnableErrorBarriers" />.
56+
/// </para>
57+
/// </remarks>
58+
public bool? AppendErrorBarrier { get; set; }
59+
3560
/// <summary>
3661
/// The number of rows affected or retrieved.
3762
/// </summary>

src/Npgsql/NpgsqlCommand.cs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,8 @@ public class NpgsqlCommand : DbCommand, ICloneable, IComponent
7676
internal static readonly bool EnableSqlRewriting;
7777
#endif
7878

79+
internal bool EnableErrorBarriers { get; set; }
80+
7981
static readonly List<NpgsqlParameter> EmptyParameters = new();
8082

8183
static readonly SingleThreadSynchronizationContext SingleThreadSynchronizationContext = new("NpgsqlRemainingAsyncSendWorker");
@@ -961,12 +963,14 @@ internal Task Write(NpgsqlConnector connector, bool async, bool flush, Cancellat
961963

962964
async Task WriteExecute(NpgsqlConnector connector, bool async, bool flush, CancellationToken cancellationToken)
963965
{
966+
NpgsqlBatchCommand? batchCommand = null;
967+
964968
for (var i = 0; i < InternalBatchCommands.Count; i++)
965969
{
966970
// The following is only for deadlock avoidance when doing sync I/O (so never in multiplexing)
967971
ForceAsyncIfNecessary(ref async, i);
968972

969-
var batchCommand = InternalBatchCommands[i];
973+
batchCommand = InternalBatchCommands[i];
970974
var pStatement = batchCommand.PreparedStatement;
971975

972976
Debug.Assert(batchCommand.FinalCommandText is not null);
@@ -1000,11 +1004,17 @@ await connector.WriteBind(
10001004

10011005
await connector.WriteExecute(0, async, cancellationToken);
10021006

1007+
if (batchCommand.AppendErrorBarrier ?? EnableErrorBarriers)
1008+
await connector.WriteSync(async, cancellationToken);
1009+
10031010
if (pStatement != null)
10041011
pStatement.LastUsed = DateTime.UtcNow;
10051012
}
10061013

1007-
await connector.WriteSync(async, cancellationToken);
1014+
if (batchCommand is null || !(batchCommand.AppendErrorBarrier ?? EnableErrorBarriers))
1015+
{
1016+
await connector.WriteSync(async, cancellationToken);
1017+
}
10081018

10091019
if (flush)
10101020
await connector.Flush(async, cancellationToken);

src/Npgsql/NpgsqlDataReader.cs

Lines changed: 108 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
using System.IO;
1010
using System.Linq;
1111
using System.Runtime.CompilerServices;
12+
using System.Runtime.ExceptionServices;
1213
using System.Text;
1314
using System.Threading;
1415
using System.Threading.Tasks;
@@ -283,9 +284,24 @@ async Task<bool> Read(bool async, CancellationToken cancellationToken = default)
283284
throw new ArgumentOutOfRangeException();
284285
}
285286

286-
var msg2 = await ReadMessage(async);
287-
ProcessMessage(msg2);
288-
return msg2.Code == BackendMessageCode.DataRow;
287+
var msg = await ReadMessage(async);
288+
289+
switch (msg.Code)
290+
{
291+
case BackendMessageCode.DataRow:
292+
ProcessMessage(msg);
293+
return true;
294+
295+
case BackendMessageCode.CommandComplete:
296+
case BackendMessageCode.EmptyQueryResponse:
297+
ProcessMessage(msg);
298+
if (_statements[StatementIndex].AppendErrorBarrier ?? Command.EnableErrorBarriers)
299+
Expect<ReadyForQueryMessage>(await Connector.ReadMessage(async), Connector);
300+
return false;
301+
302+
default:
303+
throw Connector.UnexpectedMessageReceived(msg.Code);
304+
}
289305
}
290306
catch
291307
{
@@ -335,10 +351,11 @@ public override bool NextResult() => (_isSchemaOnly ? NextResultSchemaOnly(false
335351
/// <returns>A task representing the asynchronous operation.</returns>
336352
public override Task<bool> NextResultAsync(CancellationToken cancellationToken)
337353
{
338-
using (NoSynchronizationContextScope.Enter())
339-
return _isSchemaOnly
340-
? NextResultSchemaOnly(async: true, cancellationToken: cancellationToken)
341-
: NextResult(async: true, cancellationToken: cancellationToken);
354+
using var _ = NoSynchronizationContextScope.Enter();
355+
356+
return _isSchemaOnly
357+
? NextResultSchemaOnly(async: true, cancellationToken: cancellationToken)
358+
: NextResult(async: true, cancellationToken: cancellationToken);
342359
}
343360

344361
/// <summary>
@@ -370,7 +387,12 @@ async Task<bool> NextResult(bool async, bool isConsuming = false, CancellationTo
370387
case BackendMessageCode.CommandComplete:
371388
case BackendMessageCode.EmptyQueryResponse:
372389
ProcessMessage(completedMsg);
390+
391+
if (_statements[StatementIndex].AppendErrorBarrier ?? Command.EnableErrorBarriers)
392+
Expect<ReadyForQueryMessage>(await Connector.ReadMessage(async), Connector);
393+
373394
break;
395+
374396
default:
375397
continue;
376398
}
@@ -472,6 +494,10 @@ async Task<bool> NextResult(bool async, bool isConsuming = false, CancellationTo
472494
}
473495

474496
ProcessMessage(msg);
497+
498+
if (statement.AppendErrorBarrier ?? Command.EnableErrorBarriers)
499+
Expect<ReadyForQueryMessage>(await Connector.ReadMessage(async), Connector);
500+
475501
continue;
476502
}
477503

@@ -494,30 +520,32 @@ async Task<bool> NextResult(bool async, bool isConsuming = false, CancellationTo
494520
switch (msg.Code)
495521
{
496522
case BackendMessageCode.DataRow:
523+
return true;
497524
case BackendMessageCode.CommandComplete:
498-
break;
525+
if (statement.AppendErrorBarrier ?? Command.EnableErrorBarriers)
526+
Expect<ReadyForQueryMessage>(await Connector.ReadMessage(async), Connector);
527+
return true;
499528
default:
500529
throw Connector.UnexpectedMessageReceived(msg.Code);
501530
}
502-
503-
return true;
504531
}
505532

506533
// There are no more queries, we're done. Read the RFQ.
507-
ProcessMessage(Expect<ReadyForQueryMessage>(await Connector.ReadMessage(async), Connector));
534+
if (_statements.Count == 0 || !(_statements[_statements.Count - 1].AppendErrorBarrier ?? Command.EnableErrorBarriers))
535+
Expect<ReadyForQueryMessage>(await Connector.ReadMessage(async), Connector);
536+
537+
State = ReaderState.Consumed;
508538
RowDescription = null;
509539
return false;
510540
}
511541
catch (Exception e)
512542
{
513-
State = ReaderState.Consumed;
514-
515543
// Reference the triggering statement from the exception
516544
if (e is PostgresException postgresException && StatementIndex >= 0 && StatementIndex < _statements.Count)
517545
{
518546
postgresException.BatchCommand = _statements[StatementIndex];
519547

520-
// Prevent the command or batch from by recycled (by the connection) when it's disposed. This is important since
548+
// Prevent the command or batch from being recycled (by the connection) when it's disposed. This is important since
521549
// the exception is very likely to escape the using statement of the command, and by that time some other user may
522550
// already be using the recycled instance.
523551
if (!Command.IsWrappedByBatch)
@@ -526,9 +554,8 @@ async Task<bool> NextResult(bool async, bool isConsuming = false, CancellationTo
526554
}
527555
}
528556

529-
// An error means all subsequent statements were skipped by PostgreSQL.
530-
// If any of them were being prepared, we need to update our bookkeeping to put
531-
// them back in unprepared state.
557+
// For the statement that errored, if it was being prepared we need to update our bookkeeping to put them back in unprepared
558+
// state.
532559
for (; StatementIndex < _statements.Count; StatementIndex++)
533560
{
534561
var statement = _statements[StatementIndex];
@@ -537,8 +564,33 @@ async Task<bool> NextResult(bool async, bool isConsuming = false, CancellationTo
537564
statement.IsPreparing = false;
538565
statement.PreparedStatement!.AbortPrepare();
539566
}
567+
568+
// In normal, non-isolated batching, we've consumed the result set and are done.
569+
// However, if the command has error barrier, we now have to consume results from the commands after it (unless it's the
570+
// last one).
571+
// Note that Consume calls NextResult (this method) recursively, the isConsuming flag tells us we're in this mode.
572+
if ((statement.AppendErrorBarrier ?? Command.EnableErrorBarriers) && StatementIndex < _statements.Count - 1)
573+
{
574+
if (isConsuming)
575+
throw;
576+
switch (State)
577+
{
578+
case ReaderState.Consumed:
579+
case ReaderState.Closed:
580+
case ReaderState.Disposed:
581+
// The exception may have caused the connector to break (e.g. I/O), and so the reader is already closed.
582+
break;
583+
default:
584+
// We provide Consume with the first exception which we've just caught.
585+
// If it encounters other exceptions while consuming the rest of the result set, it will raise an AggregateException,
586+
// otherwise it will rethrow this first exception.
587+
await Consume(async, firstException: e);
588+
break; // Never reached, Consume always throws above
589+
}
590+
}
540591
}
541592

593+
State = ReaderState.Consumed;
542594
throw;
543595
}
544596
}
@@ -672,8 +724,9 @@ async Task<bool> NextResultSchemaOnly(bool async, bool isConsuming = false, Canc
672724
// There are no more queries, we're done. Read to the RFQ.
673725
if (!_statements.All(s => s.IsPrepared))
674726
{
675-
ProcessMessage(Expect<ReadyForQueryMessage>(await Connector.ReadMessage(async), Connector));
727+
Expect<ReadyForQueryMessage>(await Connector.ReadMessage(async), Connector);
676728
RowDescription = null;
729+
State = ReaderState.Consumed;
677730
}
678731

679732
return false;
@@ -748,10 +801,6 @@ internal void ProcessMessage(IBackendMessage msg)
748801
State = ReaderState.BetweenResults;
749802
return;
750803

751-
case BackendMessageCode.ReadyForQuery:
752-
State = ReaderState.Consumed;
753-
return;
754-
755804
default:
756805
throw new Exception("Received unexpected backend message of type " + msg.Code);
757806
}
@@ -901,14 +950,44 @@ public override int FieldCount
901950
/// Consumes all result sets for this reader, leaving the connector ready for sending and processing further
902951
/// queries
903952
/// </summary>
904-
async Task Consume(bool async)
953+
async Task Consume(bool async, Exception? firstException = null)
905954
{
906-
// Skip over the other result sets. Note that this does tally records affected
907-
// from CommandComplete messages, and properly sets state for auto-prepared statements
908-
if (_isSchemaOnly)
909-
while (await NextResultSchemaOnly(async, isConsuming: true)) {}
910-
else
911-
while (await NextResult(async, isConsuming: true)) {}
955+
var exceptions = firstException is null ? null : new List<Exception> { firstException };
956+
957+
// Skip over the other result sets. Note that this does tally records affected from CommandComplete messages, and properly sets
958+
// state for auto-prepared statements
959+
while (true)
960+
{
961+
try
962+
{
963+
if (!(_isSchemaOnly
964+
? await NextResultSchemaOnly(async, isConsuming: true)
965+
: await NextResult(async, isConsuming: true)))
966+
{
967+
break;
968+
}
969+
}
970+
catch (Exception e)
971+
{
972+
exceptions ??= new();
973+
exceptions.Add(e);
974+
}
975+
}
976+
977+
Debug.Assert(exceptions?.Count != 0);
978+
979+
switch (exceptions?.Count)
980+
{
981+
case null:
982+
return;
983+
case 1:
984+
ExceptionDispatchInfo.Capture(exceptions[0]).Throw();
985+
return;
986+
default:
987+
throw new NpgsqlException(
988+
"Multiple exceptions occurred when consuming the result set",
989+
new AggregateException(exceptions));
990+
}
912991
}
913992

914993
/// <summary>

src/Npgsql/PostgresMinimalDatabaseInfo.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ class PostgresMinimalDatabaseInfoFactory : INpgsqlDatabaseInfoFactory
2020

2121
class PostgresMinimalDatabaseInfo : PostgresDatabaseInfo
2222
{
23-
static PostgresType[]? TypesWithMultiranges, TypesWithoutMultiranges;
23+
static PostgresType[]? _typesWithMultiranges, _typesWithoutMultiranges;
2424

2525
static PostgresType[] CreateTypes(bool withMultiranges)
2626
=> typeof(NpgsqlDbType).GetFields()
@@ -50,8 +50,8 @@ static PostgresType[] CreateTypes(bool withMultiranges)
5050

5151
protected override IEnumerable<PostgresType> GetTypes()
5252
=> SupportsMultirangeTypes
53-
? TypesWithMultiranges ??= CreateTypes(withMultiranges: true)
54-
: TypesWithoutMultiranges ??= CreateTypes(withMultiranges: false);
53+
? _typesWithMultiranges ??= CreateTypes(withMultiranges: true)
54+
: _typesWithoutMultiranges ??= CreateTypes(withMultiranges: false);
5555

5656
internal PostgresMinimalDatabaseInfo(NpgsqlConnector conn)
5757
: base(conn)

src/Npgsql/PublicAPI.Unshipped.txt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,8 @@
11
#nullable enable
2+
Npgsql.NpgsqlBatch.EnableErrorBarriers.get -> bool
3+
Npgsql.NpgsqlBatch.EnableErrorBarriers.set -> void
4+
Npgsql.NpgsqlBatchCommand.AppendErrorBarrier.get -> bool?
5+
Npgsql.NpgsqlBatchCommand.AppendErrorBarrier.set -> void
26
Npgsql.NpgsqlLoggingConfiguration
37
static Npgsql.NpgsqlLoggingConfiguration.InitializeLogging(Microsoft.Extensions.Logging.ILoggerFactory! loggerFactory, bool parameterLoggingEnabled = false) -> void
48
*REMOVED*Npgsql.NpgsqlConnection.Settings.get -> Npgsql.NpgsqlConnectionStringBuilder!

0 commit comments

Comments
 (0)