11using System ;
22using System . Buffers ;
3+ using System . Buffers . Binary ;
34using System . Collections ;
45using System . Collections . Generic ;
56using System . Collections . ObjectModel ;
@@ -63,6 +64,8 @@ public sealed class NpgsqlDataReader : DbDataReader, IDbColumnSchemaGenerator
6364 /// </summary>
6465 internal int StatementIndex { get ; private set ; }
6566
67+ bool _expectErrorBarrier ;
68+
6669 /// <summary>
6770 /// Records, for each column, its starting offset and length in the current row.
6871 /// Used only in non-sequential mode.
@@ -147,6 +150,7 @@ internal void Init(
147150 State = ReaderState . BetweenResults ;
148151 _recordsAffected = null ;
149152 _startTimestamp = startTimestamp ;
153+ _expectErrorBarrier = false ;
150154 }
151155
152156 #region Read
@@ -161,7 +165,7 @@ internal void Init(
161165 public override bool Read ( )
162166 {
163167 ThrowIfClosedOrDisposed ( ) ;
164- return TryRead ( ) ? . Result ?? Read ( false ) . GetAwaiter ( ) . GetResult ( ) ;
168+ return Read ( false ) . GetAwaiter ( ) . GetResult ( ) ;
165169 }
166170
167171 /// <summary>
@@ -174,111 +178,104 @@ public override bool Read()
174178 public override Task < bool > ReadAsync ( CancellationToken cancellationToken )
175179 {
176180 ThrowIfClosedOrDisposed ( ) ;
177- return TryRead ( ) ?? Read ( async: true , cancellationToken ) ;
181+ return Read ( async: true , cancellationToken ) ;
178182 }
179183
180- // This is an optimized execution path that avoids calling any async methods for the (usual)
181- // case where the next row (or CommandComplete) is already in memory.
182- Task < bool > ? TryRead ( )
184+ [ MethodImpl ( MethodImplOptions . AggressiveInlining ) ]
185+ Task < bool > Read ( bool async , CancellationToken cancellationToken = default )
183186 {
184- switch ( State )
187+ var state = State ;
188+ if ( state is ReaderState . InResult )
185189 {
186- case ReaderState . BeforeResult :
187- // First Read() after NextResult. Data row has already been processed.
188- State = ReaderState . InResult ;
189- return TrueTask ;
190- case ReaderState . InResult :
191- break ;
192- default :
193- return FalseTask ;
194- }
190+ if ( ! _isRowBuffered || _behavior . HasFlag ( CommandBehavior . SingleRow ) )
191+ goto slow ;
195192
196- // We have a special case path for SingleRow.
197- if ( _behavior . HasFlag ( CommandBehavior . SingleRow ) || ! _isRowBuffered )
198- return null ;
193+ // Consume current row.
194+ var buffer = Buffer ;
195+ buffer . PgReader . Commit ( ) ;
196+ buffer . ReadPosition = _dataMsgEnd ;
199197
200- ConsumeBufferedRow ( ) ;
198+ var span = buffer . Span ;
199+ if ( ! MessageHeader . TryParse ( ref span , out var header ) )
200+ goto slow ;
201201
202- const int headerSize = sizeof ( byte ) + sizeof ( int ) ;
203- var buffer = Buffer ;
204- var readPosition = buffer . ReadPosition ;
205- var bytesLeft = buffer . FilledBytes - readPosition ;
206- if ( bytesLeft < headerSize )
207- return null ;
208- var messageCode = ( BackendMessageCode ) buffer . ReadByte ( ) ;
209- var len = buffer . ReadInt32 ( ) - sizeof ( int ) ; // Transmitted length includes itself
210- var isDataRow = messageCode is BackendMessageCode . DataRow ;
211- // sizeof(short) is for the number of columns
212- var sufficientBytes = isDataRow && _isSequential ? headerSize + sizeof ( short ) : headerSize + len ;
213- if ( bytesLeft < sufficientBytes
214- || ! isDataRow && ( _statements [ StatementIndex ] . AppendErrorBarrier ?? Command . EnableErrorBarriers )
215- // Could be an error, let main read handle it.
216- || Connector . ParseResultSetMessage ( buffer , messageCode , len ) is not { } msg )
202+ switch ( header . Code )
203+ {
204+ // sizeof(short) is for the number of columns.
205+ case BackendMessageCode . DataRow when span . Length >= ( _isSequential ? sizeof ( short ) : header . Length ) :
206+ Debug . Assert ( BinaryPrimitives . ReadInt16BigEndian ( span ) == ColumnCount ) ;
207+
208+ var msgEnd = _dataMsgEnd = buffer . ReadPosition + MessageHeader . ByteCount + header . Length ;
209+
210+ _isRowBuffered = msgEnd <= buffer . FilledBytes ;
211+ _column = - 1 ;
212+
213+ if ( _columns . Count != 0 )
214+ _columns . Clear ( ) ;
215+
216+ _columnsStartPos = buffer . ReadPosition += MessageHeader . ByteCount + sizeof ( short ) ;
217+ return TrueTask ;
218+ case BackendMessageCode . CommandComplete or BackendMessageCode . EmptyQueryResponse when ! _expectErrorBarrier && span . Length >= header . Length :
219+ buffer . ReadPosition += MessageHeader . ByteCount ;
220+ ProcessMessage ( Connector . ParseServerMessage ( Buffer , BackendMessageCode . CommandComplete , header . Length ) ! ) ;
221+ return FalseTask ;
222+ default :
223+ goto slow ;
224+ }
225+ slow :
226+ return InResultSlow ( async , cancellationToken ) ;
227+ }
228+
229+ if ( state is ReaderState . BeforeResult )
217230 {
218- buffer . ReadPosition = readPosition ;
219- return null ;
231+ // First Read() after NextResult. Data row has already been processed.
232+ State = ReaderState . InResult ;
233+ return TrueTask ;
220234 }
221- ProcessMessage ( msg ) ;
222- return isDataRow ? TrueTask : FalseTask ;
223- }
224235
225- async Task < bool > Read ( bool async , CancellationToken cancellationToken = default )
226- {
227- using var registration = Connector . StartNestedCancellableOperation ( cancellationToken ) ;
228- try
236+ Debug . Assert ( Enum . IsDefined ( state ) ) ;
237+ return FalseTask ;
238+
239+ async Task < bool > InResultSlow ( bool async , CancellationToken cancellationToken )
229240 {
230- switch ( State )
241+ Debug . Assert ( State is ReaderState . InResult ) ;
242+ using var registration = Connector . StartNestedCancellableOperation ( cancellationToken ) ;
243+ try
231244 {
232- case ReaderState . BeforeResult :
233- // First Read() after NextResult. Data row has already been processed.
234- State = ReaderState . InResult ;
235- return true ;
236-
237- case ReaderState . InResult :
238- await ConsumeRow ( async ) . ConfigureAwait ( false ) ;
245+ // No more rows for single row.
239246 if ( _behavior . HasFlag ( CommandBehavior . SingleRow ) )
240247 {
241- // TODO: See optimization proposal in #410
242248 await Consume ( async ) . ConfigureAwait ( false ) ;
243249 return false ;
244250 }
245- break ;
246251
247- case ReaderState . BetweenResults :
248- case ReaderState . Consumed :
249- case ReaderState . Closed :
250- case ReaderState . Disposed :
251- return false ;
252- default :
253- ThrowHelper . ThrowArgumentOutOfRangeException ( ) ;
254- return false ;
255- }
256-
257- var msg = await ReadMessage ( async ) . ConfigureAwait( false) ;
252+ await ConsumeRow ( async ) . ConfigureAwait ( false ) ;
258253
259- switch ( msg. Code)
260- {
261- case BackendMessageCode. DataRow:
262- ProcessMessage( msg) ;
263- return true;
254+ var msg = await ReadMessage ( async ) . ConfigureAwait( false) ;
264255
265- case BackendMessageCode. CommandComplete:
266- case BackendMessageCode. EmptyQueryResponse:
267- ProcessMessage( msg) ;
268- if ( _statements [ StatementIndex ] . AppendErrorBarrier ?? Command . EnableErrorBarriers )
269- Expect < ReadyForQueryMessage > ( await Connector . ReadMessage ( async ) . ConfigureAwait( false) , Connector ) ;
270- return false ;
256+ switch ( msg. Code)
257+ {
258+ case BackendMessageCode. DataRow:
259+ ProcessMessage( msg) ;
260+ return true;
271261
272- default :
273- throw Connector . UnexpectedMessageReceived ( msg . Code ) ;
262+ case BackendMessageCode. CommandComplete:
263+ case BackendMessageCode. EmptyQueryResponse:
264+ ProcessMessage( msg) ;
265+ if ( _expectErrorBarrier)
266+ Expect< ReadyForQueryMessage> ( await Connector. ReadMessage( async) . ConfigureAwait( false) , Connector) ;
267+ return false;
268+ default :
269+ throw Connector . UnexpectedMessageReceived ( msg . Code ) ;
270+ }
271+ }
272+ catch
273+ {
274+ // Break may have progressed the reader already.
275+ if ( State is not ReaderState . Closed )
276+ State = ReaderState. Consumed ;
277+ throw ;
274278 }
275- }
276- catch
277- {
278- // Break may have progressed the reader already.
279- if ( State is not ReaderState. Closed)
280- State = ReaderState. Consumed;
281- throw ;
282279 }
283280 }
284281
@@ -289,7 +286,7 @@ ValueTask<IBackendMessage> ReadMessage(bool async)
289286 static async ValueTask < IBackendMessage > ReadMessageSequential ( NpgsqlConnector connector , bool async )
290287 {
291288 var msg = await connector . ReadMessage ( async , DataRowLoadingMode . Sequential ) . ConfigureAwait ( false ) ;
292- if ( msg . Code = = BackendMessageCode . DataRow )
289+ if ( msg . Code is BackendMessageCode. DataRow)
293290 {
294291 // Make sure that the datarow's column count is already buffered
295292 await connector. ReadBuffer. Ensure( 2 , async ) . ConfigureAwait ( false ) ;
@@ -343,7 +340,12 @@ async Task<bool> NextResult(bool async, bool isConsuming = false, CancellationTo
343340 using var registration = isConsuming ? default : Connector . StartNestedCancellableOperation ( cancellationToken ) ;
344341 // If we're in the middle of a resultset, consume it
345342 if ( State is ReaderState. BeforeResult or ReaderState. InResult)
346- await ConsumeResultSet ( async ) . ConfigureAwait ( false ) ;
343+ {
344+ // if (!isConsuming && _behavior.HasFlag(CommandBehavior.SingleRow) && State is ReaderState.InResult)
345+ // await Consume(async).ConfigureAwait(false);
346+ // else
347+ await ConsumeResultSet( async) . ConfigureAwait( false) ;
348+ }
347349
348350 Debug. Assert( State is ReaderState. BetweenResults) ;
349351
@@ -418,6 +420,7 @@ async Task<bool> NextResult(bool async, bool isConsuming = false, CancellationTo
418420
419421 if ( RowDescription is not null )
420422 {
423+ _expectErrorBarrier = statement. AppendErrorBarrier ?? Command. EnableErrorBarriers;
421424 if ( ColumnInfoCache ? . Length >= ColumnCount )
422425 Array . Clear ( ColumnInfoCache , 0 , ColumnCount ) ;
423426 else
0 commit comments