11using System ;
22using System . Buffers ;
3+ using System . Buffers . Binary ;
34using System . Collections ;
45using System . Collections . Generic ;
56using System . Collections . ObjectModel ;
@@ -63,6 +64,8 @@ public sealed class NpgsqlDataReader : DbDataReader, IDbColumnSchemaGenerator
6364 /// </summary>
6465 internal int StatementIndex { get ; private set ; }
6566
67+ bool _expectErrorBarrier ;
68+
6669 /// <summary>
6770 /// Records, for each column, its starting offset and length in the current row.
6871 /// Used only in non-sequential mode.
@@ -147,6 +150,7 @@ internal void Init(
147150 State = ReaderState . BetweenResults ;
148151 _recordsAffected = null ;
149152 _startTimestamp = startTimestamp ;
153+ _expectErrorBarrier = false ;
150154 }
151155
152156 #region Read
@@ -161,7 +165,7 @@ internal void Init(
161165 public override bool Read ( )
162166 {
163167 ThrowIfClosedOrDisposed ( ) ;
164- return TryRead ( ) ? . Result ?? Read ( false ) . GetAwaiter ( ) . GetResult ( ) ;
168+ return Read ( async : false ) . GetAwaiter ( ) . GetResult ( ) ;
165169 }
166170
167171 /// <summary>
@@ -174,125 +178,115 @@ public override bool Read()
174178 public override Task < bool > ReadAsync ( CancellationToken cancellationToken )
175179 {
176180 ThrowIfClosedOrDisposed ( ) ;
177- return TryRead ( ) ?? Read ( async: true , cancellationToken ) ;
181+ return Read ( async: true , cancellationToken ) ;
178182 }
179183
180- // This is an optimized execution path that avoids calling any async methods for the (usual)
181- // case where the next row (or CommandComplete) is already in memory.
182- Task < bool > ? TryRead ( )
184+ [ MethodImpl ( MethodImplOptions . AggressiveInlining ) ]
185+ Task < bool > Read ( bool async , CancellationToken cancellationToken = default )
183186 {
184187 switch ( State )
185188 {
189+ case ReaderState . InResult :
190+ {
191+ if ( ! _isRowBuffered || _behavior . HasFlag ( CommandBehavior . SingleRow ) )
192+ return InResultSlow ( async , cancellationToken ) ;
193+
194+ // Consume current row.
195+ var buffer = Buffer ;
196+ buffer . PgReader . Commit ( ) ;
197+ buffer . ReadPosition = _dataMsgEnd ;
198+
199+ var span = buffer . Span ;
200+ if ( ! MessageHeader . TryParse ( ref span , out var header ) )
201+ return InResultSlow ( async , cancellationToken ) ;
202+
203+ switch ( header . Code )
204+ {
205+ // sizeof(short) is for the number of columns.
206+ case BackendMessageCode . DataRow when span . Length >= ( _isSequential ? sizeof ( short ) : header . Length ) :
207+ Debug . Assert ( BinaryPrimitives . ReadInt16BigEndian ( span ) == ColumnCount ) ;
208+
209+ var msgEnd = _dataMsgEnd = buffer . ReadPosition + MessageHeader . ByteCount + header . Length ;
210+
211+ _isRowBuffered = msgEnd <= buffer . FilledBytes ;
212+ _column = - 1 ;
213+
214+ if ( _columns . Count != 0 )
215+ _columns . Clear ( ) ;
216+
217+ _columnsStartPos = buffer . ReadPosition += MessageHeader . ByteCount + sizeof ( short ) ;
218+ return TrueTask ;
219+ case BackendMessageCode . CommandComplete or BackendMessageCode . EmptyQueryResponse when ! _expectErrorBarrier && span . Length >= header . Length :
220+ buffer . ReadPosition += MessageHeader . ByteCount ;
221+ ProcessMessage ( Connector . ParseServerMessage ( Buffer , BackendMessageCode . CommandComplete , header . Length ) ! ) ;
222+ return FalseTask ;
223+ default :
224+ return InResultSlow ( async , cancellationToken ) ;
225+ }
226+ }
186227 case ReaderState . BeforeResult :
187228 // First Read() after NextResult. Data row has already been processed.
188229 State = ReaderState . InResult ;
189230 return TrueTask ;
190- case ReaderState . InResult :
191- break ;
192231 default :
232+ Debug . Assert ( Enum . IsDefined ( State ) ) ;
193233 return FalseTask ;
194234 }
195235
196- // We have a special case path for SingleRow.
197- if ( _behavior . HasFlag ( CommandBehavior . SingleRow ) || ! _isRowBuffered )
198- return null ;
199-
200- ConsumeBufferedRow ( ) ;
201-
202- const int headerSize = sizeof ( byte ) + sizeof ( int ) ;
203- var buffer = Buffer ;
204- var readPosition = buffer . ReadPosition ;
205- var bytesLeft = buffer . FilledBytes - readPosition ;
206- if ( bytesLeft < headerSize )
207- return null ;
208- var messageCode = ( BackendMessageCode ) buffer . ReadByte ( ) ;
209- var len = buffer . ReadInt32 ( ) - sizeof ( int ) ; // Transmitted length includes itself
210- var isDataRow = messageCode is BackendMessageCode . DataRow ;
211- // sizeof(short) is for the number of columns
212- var sufficientBytes = isDataRow && _isSequential ? headerSize + sizeof ( short ) : headerSize + len ;
213- if ( bytesLeft < sufficientBytes
214- || ! isDataRow && ( _statements [ StatementIndex ] . AppendErrorBarrier ?? Command . EnableErrorBarriers )
215- // Could be an error, let main read handle it.
216- || Connector . ParseResultSetMessage ( buffer , messageCode , len ) is not { } msg )
236+ async Task < bool > InResultSlow ( bool async , CancellationToken cancellationToken )
217237 {
218- buffer . ReadPosition = readPosition ;
219- return null ;
220- }
221- ProcessMessage ( msg ) ;
222- return isDataRow ? TrueTask : FalseTask ;
223- }
224-
225- async Task < bool > Read ( bool async , CancellationToken cancellationToken = default )
226- {
227- using var registration = Connector . StartNestedCancellableOperation ( cancellationToken ) ;
228- try
229- {
230- switch ( State )
238+ Debug . Assert ( State is ReaderState . InResult ) ;
239+ using var registration = Connector . StartNestedCancellableOperation ( cancellationToken ) ;
240+ try
231241 {
232- case ReaderState . BeforeResult :
233- // First Read() after NextResult. Data row has already been processed.
234- State = ReaderState . InResult ;
235- return true ;
236-
237- case ReaderState . InResult :
238- await ConsumeRow ( async ) . ConfigureAwait ( false ) ;
242+ // No more rows for single row.
239243 if ( _behavior . HasFlag ( CommandBehavior . SingleRow ) )
240244 {
241- // TODO: See optimization proposal in #410
242245 await Consume ( async ) . ConfigureAwait ( false ) ;
243246 return false ;
244247 }
245- break ;
246248
247- case ReaderState . BetweenResults :
248- case ReaderState . Consumed :
249- case ReaderState . Closed :
250- case ReaderState . Disposed :
251- return false ;
252- default :
253- ThrowHelper . ThrowArgumentOutOfRangeException ( ) ;
254- return false ;
255- }
256-
257- var msg = await ReadMessage ( async ) . ConfigureAwait( false) ;
249+ await ConsumeRow ( async ) . ConfigureAwait ( false ) ;
258250
259- switch ( msg. Code)
260- {
261- case BackendMessageCode. DataRow:
262- ProcessMessage( msg) ;
263- return true;
251+ var msg = await ReadMessage ( async ) . ConfigureAwait( false) ;
264252
265- case BackendMessageCode. CommandComplete:
266- case BackendMessageCode. EmptyQueryResponse:
267- ProcessMessage( msg) ;
268- if ( _statements [ StatementIndex ] . AppendErrorBarrier ?? Command . EnableErrorBarriers )
269- Expect < ReadyForQueryMessage > ( await Connector . ReadMessage ( async ) . ConfigureAwait( false) , Connector ) ;
270- return false ;
253+ switch ( msg. Code)
254+ {
255+ case BackendMessageCode. DataRow:
256+ ProcessMessage( msg) ;
257+ return true;
271258
272- default :
273- throw Connector . UnexpectedMessageReceived ( msg . Code ) ;
259+ case BackendMessageCode. CommandComplete:
260+ case BackendMessageCode. EmptyQueryResponse:
261+ ProcessMessage( msg) ;
262+ if ( _expectErrorBarrier)
263+ Expect< ReadyForQueryMessage> ( await Connector. ReadMessage( async) . ConfigureAwait( false) , Connector) ;
264+ return false;
265+ default :
266+ throw Connector . UnexpectedMessageReceived ( msg . Code ) ;
267+ }
268+ }
269+ catch
270+ {
271+ // Break may have progressed the reader already.
272+ if ( State is not ReaderState . Closed )
273+ State = ReaderState. Consumed ;
274+ throw ;
274275 }
275- }
276- catch
277- {
278- // Break may have progressed the reader already.
279- if ( State is not ReaderState. Closed)
280- State = ReaderState. Consumed;
281- throw ;
282276 }
283277 }
284278
285279 ValueTask< IBackendMessage > ReadMessage ( bool async )
286280 {
287- return _isSequential ? ReadMessageSequential ( Connector , async ) : Connector. ReadMessage( async ) ;
281+ return _isSequential ? ReadMessageSequential( async , Connector ) : Connector . ReadMessage ( async ) ;
288282
289- static async ValueTask < IBackendMessage > ReadMessageSequential ( NpgsqlConnector connector , bool async )
283+ static async ValueTask < IBackendMessage > ReadMessageSequential ( bool async , NpgsqlConnector connector )
290284 {
291285 var msg = await connector . ReadMessage ( async , DataRowLoadingMode . Sequential ) . ConfigureAwait ( false ) ;
292- if ( msg . Code = = BackendMessageCode . DataRow )
286+ if ( msg . Code is BackendMessageCode. DataRow)
293287 {
294288 // Make sure that the datarow's column count is already buffered
295- await connector . ReadBuffer . Ensure ( 2 , async ) . ConfigureAwait ( false ) ;
289+ await connector. ReadBuffer. Ensure( sizeof ( short ) , async ) . ConfigureAwait ( false ) ;
296290 return msg;
297291 }
298292 return msg;
@@ -310,8 +304,7 @@ static async ValueTask<IBackendMessage> ReadMessageSequential(NpgsqlConnector co
310304 public override bool NextResult ( )
311305 {
312306 ThrowIfClosedOrDisposed( ) ;
313- return ( _isSchemaOnly ? NextResultSchemaOnly ( false ) : NextResult ( false ) )
314- . GetAwaiter ( ) . GetResult ( ) ;
307+ return ( _isSchemaOnly ? NextResultSchemaOnly ( false ) : NextResult( false ) ) . GetAwaiter( ) . GetResult( ) ;
315308 }
316309
317310 /// <summary>
@@ -418,6 +411,7 @@ async Task<bool> NextResult(bool async, bool isConsuming = false, CancellationTo
418411
419412 if ( RowDescription is not null )
420413 {
414+ _expectErrorBarrier = statement. AppendErrorBarrier ?? Command. EnableErrorBarriers;
421415 if ( ColumnInfoCache ? . Length >= ColumnCount )
422416 Array . Clear ( ColumnInfoCache , 0 , ColumnCount ) ;
423417 else
@@ -776,7 +770,7 @@ async Task<bool> NextResultSchemaOnly(bool async, bool isConsuming = false, Canc
776770
777771 #region ProcessMessage
778772
779- internal void ProcessMessage( IBackendMessage msg)
773+ void ProcessMessage( IBackendMessage msg)
780774 {
781775 if ( msg. Code is not BackendMessageCode. DataRow)
782776 {
0 commit comments