99using System . IO ;
1010using System . Linq ;
1111using System . Runtime . CompilerServices ;
12+ using System . Runtime . ExceptionServices ;
1213using System . Text ;
1314using System . Threading ;
1415using System . Threading . Tasks ;
@@ -283,9 +284,24 @@ async Task<bool> Read(bool async, CancellationToken cancellationToken = default)
283284 throw new ArgumentOutOfRangeException ( ) ;
284285 }
285286
286- var msg2 = await ReadMessage ( async ) ;
287- ProcessMessage ( msg2 ) ;
288- return msg2 . Code == BackendMessageCode . DataRow ;
287+ var msg = await ReadMessage ( async ) ;
288+
289+ switch ( msg . Code )
290+ {
291+ case BackendMessageCode . DataRow :
292+ ProcessMessage ( msg ) ;
293+ return true ;
294+
295+ case BackendMessageCode . CommandComplete :
296+ case BackendMessageCode . EmptyQueryResponse :
297+ ProcessMessage ( msg ) ;
298+ if ( _statements [ StatementIndex ] . AppendErrorBarrier ?? Command . EnableErrorBarriers )
299+ Expect < ReadyForQueryMessage > ( await Connector . ReadMessage ( async ) , Connector ) ;
300+ return false ;
301+
302+ default :
303+ throw Connector . UnexpectedMessageReceived ( msg . Code ) ;
304+ }
289305 }
290306 catch
291307 {
@@ -335,10 +351,11 @@ public override bool NextResult() => (_isSchemaOnly ? NextResultSchemaOnly(false
335351 /// <returns>A task representing the asynchronous operation.</returns>
336352 public override Task < bool > NextResultAsync ( CancellationToken cancellationToken )
337353 {
338- using ( NoSynchronizationContextScope . Enter ( ) )
339- return _isSchemaOnly
340- ? NextResultSchemaOnly ( async: true , cancellationToken : cancellationToken )
341- : NextResult ( async: true , cancellationToken : cancellationToken ) ;
354+ using var _ = NoSynchronizationContextScope . Enter ( ) ;
355+
356+ return _isSchemaOnly
357+ ? NextResultSchemaOnly ( async: true , cancellationToken : cancellationToken )
358+ : NextResult ( async: true , cancellationToken : cancellationToken ) ;
342359 }
343360
344361 /// <summary>
@@ -370,7 +387,12 @@ async Task<bool> NextResult(bool async, bool isConsuming = false, CancellationTo
370387 case BackendMessageCode . CommandComplete :
371388 case BackendMessageCode . EmptyQueryResponse :
372389 ProcessMessage ( completedMsg ) ;
390+
391+ if ( _statements [ StatementIndex ] . AppendErrorBarrier ?? Command . EnableErrorBarriers )
392+ Expect< ReadyForQueryMessage > ( await Connector . ReadMessage ( async ) , Connector ) ;
393+
373394 break ;
395+
374396 default :
375397 continue ;
376398 }
@@ -472,6 +494,10 @@ async Task<bool> NextResult(bool async, bool isConsuming = false, CancellationTo
472494 }
473495
474496 ProcessMessage( msg ) ;
497+
498+ if ( statement . AppendErrorBarrier ?? Command . EnableErrorBarriers )
499+ Expect< ReadyForQueryMessage > ( await Connector . ReadMessage ( async ) , Connector ) ;
500+
475501 continue ;
476502 }
477503
@@ -494,30 +520,32 @@ async Task<bool> NextResult(bool async, bool isConsuming = false, CancellationTo
494520 switch ( msg . Code )
495521 {
496522 case BackendMessageCode . DataRow :
523+ return true;
497524 case BackendMessageCode . CommandComplete :
498- break ;
525+ if ( statement . AppendErrorBarrier ?? Command . EnableErrorBarriers )
526+ Expect< ReadyForQueryMessage> ( await Connector. ReadMessage( async) , Connector ) ;
527+ return true;
499528 default :
500529 throw Connector. UnexpectedMessageReceived( msg . Code ) ;
501530 }
502-
503- return true;
504531 }
505532
506533 // There are no more queries, we're done. Read the RFQ.
507- ProcessMessage ( Expect < ReadyForQueryMessage > ( await Connector . ReadMessage( async ) , Connector ) ) ;
534+ if ( _statements . Count = = 0 || ! ( _statements [ _statements . Count - 1 ] . AppendErrorBarrier ? ? Command . EnableErrorBarriers ) )
535+ Expect < ReadyForQueryMessage > ( await Connector . ReadMessage( async ) , Connector ) ;
536+
537+ State = ReaderState . Consumed ;
508538 RowDescription = null ;
509539 return false ;
510540 }
511541 catch ( Exception e )
512542 {
513- State = ReaderState . Consumed ;
514-
515543 // Reference the triggering statement from the exception
516544 if ( e is PostgresException postgresException && StatementIndex >= 0 && StatementIndex < _statements . Count )
517545 {
518546 postgresException. BatchCommand = _statements [ StatementIndex ] ;
519547
520- // Prevent the command or batch from by recycled (by the connection) when it's disposed. This is important since
548+ // Prevent the command or batch from being recycled (by the connection) when it's disposed. This is important since
521549 // the exception is very likely to escape the using statement of the command, and by that time some other user may
522550 // already be using the recycled instance.
523551 if ( ! Command . IsWrappedByBatch )
@@ -526,9 +554,8 @@ async Task<bool> NextResult(bool async, bool isConsuming = false, CancellationTo
526554 }
527555 }
528556
529- // An error means all subsequent statements were skipped by PostgreSQL.
530- // If any of them were being prepared, we need to update our bookkeeping to put
531- // them back in unprepared state.
557+ // For the statement that errored, if it was being prepared we need to update our bookkeeping to put them back in unprepared
558+ // state.
532559 for ( ; StatementIndex < _statements . Count ; StatementIndex++ )
533560 {
534561 var statement = _statements [ StatementIndex ] ;
@@ -537,8 +564,33 @@ async Task<bool> NextResult(bool async, bool isConsuming = false, CancellationTo
537564 statement . IsPreparing = false;
538565 statement . PreparedStatement ! . AbortPrepare ( ) ;
539566 }
567+
568+ // In normal, non-isolated batching, we've consumed the result set and are done.
569+ // However, if the command has error barrier, we now have to consume results from the commands after it (unless it's the
570+ // last one).
571+ // Note that Consume calls NextResult (this method) recursively, the isConsuming flag tells us we're in this mode.
572+ if ( ( statement . AppendErrorBarrier ?? Command . EnableErrorBarriers ) && StatementIndex < _statements . Count - 1 )
573+ {
574+ if ( isConsuming )
575+ throw;
576+ switch ( State )
577+ {
578+ case ReaderState. Consumed :
579+ case ReaderState. Closed :
580+ case ReaderState. Disposed :
581+ // The exception may have caused the connector to break (e.g. I/O), and so the reader is already closed.
582+ break ;
583+ default :
584+ // We provide Consume with the first exception which we've just caught.
585+ // If it encounters other exceptions while consuming the rest of the result set, it will raise an AggregateException,
586+ // otherwise it will rethrow this first exception.
587+ await Consume( async, firstException: e) ;
588+ break ; // Never reached, Consume always throws above
589+ }
590+ }
540591 }
541592
593+ State = ReaderState. Consumed ;
542594 throw ;
543595 }
544596 }
@@ -672,8 +724,9 @@ async Task<bool> NextResultSchemaOnly(bool async, bool isConsuming = false, Canc
672724 // There are no more queries, we're done. Read to the RFQ.
673725 if ( ! _statements . All ( s => s . IsPrepared ) )
674726 {
675- ProcessMessage ( Expect< ReadyForQueryMessage> ( await Connector. ReadMessage( async) , Connector ) ) ;
727+ Expect< ReadyForQueryMessage > ( await Connector . ReadMessage ( async ) , Connector ) ;
676728 RowDescription = null ;
729+ State = ReaderState. Consumed;
677730 }
678731
679732 return false;
@@ -748,10 +801,6 @@ internal void ProcessMessage(IBackendMessage msg)
748801 State = ReaderState . BetweenResults ;
749802 return ;
750803
751- case BackendMessageCode. ReadyForQuery:
752- State = ReaderState. Consumed;
753- return ;
754-
755804 default :
756805 throw new Exception ( "Received unexpected backend message of type " + msg . Code ) ;
757806 }
@@ -901,14 +950,44 @@ public override int FieldCount
901950 /// Consumes all result sets for this reader, leaving the connector ready for sending and processing further
902951 /// queries
903952 /// </summary>
904- async Task Consume ( bool async )
953+ async Task Consume ( bool async , Exception ? firstException = null )
905954 {
906- // Skip over the other result sets. Note that this does tally records affected
907- // from CommandComplete messages, and properly sets state for auto-prepared statements
908- if ( _isSchemaOnly )
909- while ( await NextResultSchemaOnly ( async , isConsuming : true) ) { }
910- else
911- while ( await NextResult ( async , isConsuming : true) ) { }
955+ var exceptions = firstException is null ? null : new List < Exception > { firstException } ;
956+
957+ // Skip over the other result sets. Note that this does tally records affected from CommandComplete messages, and properly sets
958+ // state for auto-prepared statements
959+ while ( true )
960+ {
961+ try
962+ {
963+ if ( ! ( _isSchemaOnly
964+ ? await NextResultSchemaOnly ( async , isConsuming : true)
965+ : await NextResult( async , isConsuming : true) ) )
966+ {
967+ break ;
968+ }
969+ }
970+ catch ( Exception e )
971+ {
972+ exceptions ??= new ( ) ;
973+ exceptions . Add ( e ) ;
974+ }
975+ }
976+
977+ Debug . Assert ( exceptions ? . Count != 0 ) ;
978+
979+ switch ( exceptions ? . Count )
980+ {
981+ case null :
982+ return ;
983+ case 1 :
984+ ExceptionDispatchInfo . Capture ( exceptions [ 0 ] ) . Throw ( ) ;
985+ return ;
986+ default :
987+ throw new NpgsqlException(
988+ "Multiple exceptions occurred when consuming the result set" ,
989+ new AggregateException( exceptions ) ) ;
990+ }
912991 }
913992
914993 /// <summary>
0 commit comments