Skip to content

Commit fe448b4

Browse files
committed
Rework row reading
1 parent cf122e5 commit fe448b4

3 files changed

Lines changed: 119 additions & 107 deletions

File tree

src/Npgsql/Internal/NpgsqlConnector.cs

Lines changed: 5 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1399,15 +1399,7 @@ internal ValueTask<IBackendMessage> ReadMessage(
13991399
}
14001400
}
14011401

1402-
internal IBackendMessage? ParseResultSetMessage(NpgsqlReadBuffer buf, BackendMessageCode code, int len, bool handleCallbacks = false)
1403-
=> code switch
1404-
{
1405-
BackendMessageCode.DataRow => _dataRowMessage.Load(len),
1406-
BackendMessageCode.CommandComplete => _commandCompleteMessage.Load(buf, len),
1407-
_ => ParseServerMessage(buf, code, len, false, handleCallbacks)
1408-
};
1409-
1410-
internal IBackendMessage? ParseServerMessage(NpgsqlReadBuffer buf, BackendMessageCode code, int len, bool isPrependedMessage, bool handleCallbacks = true)
1402+
internal IBackendMessage? ParseServerMessage(NpgsqlReadBuffer buf, BackendMessageCode code, int len, bool isPrependedMessage = false)
14111403
{
14121404
switch (code)
14131405
{
@@ -1443,18 +1435,12 @@ internal ValueTask<IBackendMessage> ReadMessage(
14431435
ReadParameterStatus(buf.GetNullTerminatedBytes(), buf.GetNullTerminatedBytes());
14441436
return null;
14451437
case BackendMessageCode.NoticeResponse:
1446-
if (handleCallbacks)
1447-
{
1448-
var notice = PostgresNotice.Load(buf, Settings.IncludeErrorDetail, LoggingConfiguration.ExceptionLogger);
1449-
LogMessages.ReceivedNotice(ConnectionLogger, notice.MessageText, Id);
1450-
Connection?.OnNotice(notice);
1451-
}
1438+
var notice = PostgresNotice.Load(buf, Settings.IncludeErrorDetail, LoggingConfiguration.ExceptionLogger);
1439+
LogMessages.ReceivedNotice(ConnectionLogger, notice.MessageText, Id);
1440+
Connection?.OnNotice(notice);
14521441
return null;
14531442
case BackendMessageCode.NotificationResponse:
1454-
if (handleCallbacks)
1455-
{
1456-
Connection?.OnNotification(new NpgsqlNotificationEventArgs(buf));
1457-
}
1443+
Connection?.OnNotification(new NpgsqlNotificationEventArgs(buf));
14581444
return null;
14591445

14601446
case BackendMessageCode.AuthenticationRequest:

src/Npgsql/Internal/NpgsqlReadBuffer.cs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
using System.IO;
66
using System.Net.Sockets;
77
using System.Runtime.CompilerServices;
8+
using System.Runtime.InteropServices;
89
using System.Text;
910
using System.Threading;
1011
using System.Threading.Tasks;
@@ -13,6 +14,37 @@
1314

1415
namespace Npgsql.Internal;
1516

17+
readonly struct MessageHeader(BackendMessageCode code, int length)
18+
{
19+
public const int ByteCount = sizeof(byte) + sizeof(int);
20+
21+
public BackendMessageCode Code { get; } = code;
22+
public int Length { get; } = length;
23+
24+
[MethodImpl(MethodImplOptions.AggressiveInlining)]
25+
public static bool TryParse(ref ReadOnlySpan<byte> source, out MessageHeader header)
26+
{
27+
const int headerSize = sizeof(byte) + sizeof(int);
28+
29+
if (source.Length < headerSize)
30+
{
31+
header = default;
32+
return false;
33+
}
34+
35+
ref var first = ref MemoryMarshal.GetReference(source);
36+
var code = (BackendMessageCode)first;
37+
var length =
38+
BitConverter.IsLittleEndian
39+
? BinaryPrimitives.ReverseEndianness(Unsafe.ReadUnaligned<int>(ref Unsafe.Add(ref first, 1)))
40+
: Unsafe.ReadUnaligned<int>(ref Unsafe.Add(ref first, 1));
41+
42+
header = new MessageHeader(code, length - sizeof(int)); // Transmitted length includes itself
43+
source = source.Slice(headerSize);
44+
return true;
45+
}
46+
}
47+
1648
/// <summary>
1749
/// A buffer used by Npgsql to read data from the socket efficiently.
1850
/// Provides methods which decode different values types and tracks the current position.

src/Npgsql/NpgsqlDataReader.cs

Lines changed: 82 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using System;
22
using System.Buffers;
3+
using System.Buffers.Binary;
34
using System.Collections;
45
using System.Collections.Generic;
56
using System.Collections.ObjectModel;
@@ -63,6 +64,8 @@ public sealed class NpgsqlDataReader : DbDataReader, IDbColumnSchemaGenerator
6364
/// </summary>
6465
internal int StatementIndex { get; private set; }
6566

67+
bool _expectErrorBarrier;
68+
6669
/// <summary>
6770
/// Records, for each column, its starting offset and length in the current row.
6871
/// Used only in non-sequential mode.
@@ -147,6 +150,7 @@ internal void Init(
147150
State = ReaderState.BetweenResults;
148151
_recordsAffected = null;
149152
_startTimestamp = startTimestamp;
153+
_expectErrorBarrier = false;
150154
}
151155

152156
#region Read
@@ -161,7 +165,7 @@ internal void Init(
161165
public override bool Read()
162166
{
163167
ThrowIfClosedOrDisposed();
164-
return TryRead()?.Result ?? Read(false).GetAwaiter().GetResult();
168+
return Read(async: false).GetAwaiter().GetResult();
165169
}
166170

167171
/// <summary>
@@ -174,125 +178,115 @@ public override bool Read()
174178
public override Task<bool> ReadAsync(CancellationToken cancellationToken)
175179
{
176180
ThrowIfClosedOrDisposed();
177-
return TryRead() ?? Read(async: true, cancellationToken);
181+
return Read(async: true, cancellationToken);
178182
}
179183

180-
// This is an optimized execution path that avoids calling any async methods for the (usual)
181-
// case where the next row (or CommandComplete) is already in memory.
182-
Task<bool>? TryRead()
184+
[MethodImpl(MethodImplOptions.AggressiveInlining)]
185+
Task<bool> Read(bool async, CancellationToken cancellationToken = default)
183186
{
184187
switch (State)
185188
{
189+
case ReaderState.InResult:
190+
{
191+
if (!_isRowBuffered || _behavior.HasFlag(CommandBehavior.SingleRow))
192+
return InResultSlow(async, cancellationToken);
193+
194+
// Consume current row.
195+
var buffer = Buffer;
196+
buffer.PgReader.Commit();
197+
buffer.ReadPosition = _dataMsgEnd;
198+
199+
var span = buffer.Span;
200+
if (!MessageHeader.TryParse(ref span, out var header))
201+
return InResultSlow(async, cancellationToken);
202+
203+
switch (header.Code)
204+
{
205+
// sizeof(short) is for the number of columns.
206+
case BackendMessageCode.DataRow when span.Length >= (_isSequential ? sizeof(short) : header.Length):
207+
Debug.Assert(BinaryPrimitives.ReadInt16BigEndian(span) == ColumnCount);
208+
209+
var msgEnd = _dataMsgEnd = buffer.ReadPosition + MessageHeader.ByteCount + header.Length;
210+
211+
_isRowBuffered = msgEnd <= buffer.FilledBytes;
212+
_column = -1;
213+
214+
if (_columns.Count != 0)
215+
_columns.Clear();
216+
217+
_columnsStartPos = buffer.ReadPosition += MessageHeader.ByteCount + sizeof(short);
218+
return TrueTask;
219+
case BackendMessageCode.CommandComplete or BackendMessageCode.EmptyQueryResponse when !_expectErrorBarrier && span.Length >= header.Length:
220+
buffer.ReadPosition += MessageHeader.ByteCount;
221+
ProcessMessage(Connector.ParseServerMessage(Buffer, BackendMessageCode.CommandComplete, header.Length)!);
222+
return FalseTask;
223+
default:
224+
return InResultSlow(async, cancellationToken);
225+
}
226+
}
186227
case ReaderState.BeforeResult:
187228
// First Read() after NextResult. Data row has already been processed.
188229
State = ReaderState.InResult;
189230
return TrueTask;
190-
case ReaderState.InResult:
191-
break;
192231
default:
232+
Debug.Assert(Enum.IsDefined(State));
193233
return FalseTask;
194234
}
195235

196-
// We have a special case path for SingleRow.
197-
if (_behavior.HasFlag(CommandBehavior.SingleRow) || !_isRowBuffered)
198-
return null;
199-
200-
ConsumeBufferedRow();
201-
202-
const int headerSize = sizeof(byte) + sizeof(int);
203-
var buffer = Buffer;
204-
var readPosition = buffer.ReadPosition;
205-
var bytesLeft = buffer.FilledBytes - readPosition;
206-
if (bytesLeft < headerSize)
207-
return null;
208-
var messageCode = (BackendMessageCode)buffer.ReadByte();
209-
var len = buffer.ReadInt32() - sizeof(int); // Transmitted length includes itself
210-
var isDataRow = messageCode is BackendMessageCode.DataRow;
211-
// sizeof(short) is for the number of columns
212-
var sufficientBytes = isDataRow && _isSequential ? headerSize + sizeof(short) : headerSize + len;
213-
if (bytesLeft < sufficientBytes
214-
|| !isDataRow && (_statements[StatementIndex].AppendErrorBarrier ?? Command.EnableErrorBarriers)
215-
// Could be an error, let main read handle it.
216-
|| Connector.ParseResultSetMessage(buffer, messageCode, len) is not { } msg)
236+
async Task<bool> InResultSlow(bool async, CancellationToken cancellationToken)
217237
{
218-
buffer.ReadPosition = readPosition;
219-
return null;
220-
}
221-
ProcessMessage(msg);
222-
return isDataRow ? TrueTask : FalseTask;
223-
}
224-
225-
async Task<bool> Read(bool async, CancellationToken cancellationToken = default)
226-
{
227-
using var registration = Connector.StartNestedCancellableOperation(cancellationToken);
228-
try
229-
{
230-
switch (State)
238+
Debug.Assert(State is ReaderState.InResult);
239+
using var registration = Connector.StartNestedCancellableOperation(cancellationToken);
240+
try
231241
{
232-
case ReaderState.BeforeResult:
233-
// First Read() after NextResult. Data row has already been processed.
234-
State = ReaderState.InResult;
235-
return true;
236-
237-
case ReaderState.InResult:
238-
await ConsumeRow(async).ConfigureAwait(false);
242+
// No more rows for single row.
239243
if (_behavior.HasFlag(CommandBehavior.SingleRow))
240244
{
241-
// TODO: See optimization proposal in #410
242245
await Consume(async).ConfigureAwait(false);
243246
return false;
244247
}
245-
break;
246248

247-
case ReaderState.BetweenResults:
248-
case ReaderState.Consumed:
249-
case ReaderState.Closed:
250-
case ReaderState.Disposed:
251-
return false;
252-
default:
253-
ThrowHelper.ThrowArgumentOutOfRangeException();
254-
return false;
255-
}
256-
257-
var msg = await ReadMessage(async).ConfigureAwait(false);
249+
await ConsumeRow(async).ConfigureAwait(false);
258250

259-
switch (msg.Code)
260-
{
261-
case BackendMessageCode.DataRow:
262-
ProcessMessage(msg);
263-
return true;
251+
var msg = await ReadMessage(async).ConfigureAwait(false);
264252

265-
case BackendMessageCode.CommandComplete:
266-
case BackendMessageCode.EmptyQueryResponse:
267-
ProcessMessage(msg);
268-
if (_statements[StatementIndex].AppendErrorBarrier ?? Command.EnableErrorBarriers)
269-
Expect<ReadyForQueryMessage>(await Connector.ReadMessage(async).ConfigureAwait(false), Connector);
270-
return false;
253+
switch (msg.Code)
254+
{
255+
case BackendMessageCode.DataRow:
256+
ProcessMessage(msg);
257+
return true;
271258

272-
default:
273-
throw Connector.UnexpectedMessageReceived(msg.Code);
259+
case BackendMessageCode.CommandComplete:
260+
case BackendMessageCode.EmptyQueryResponse:
261+
ProcessMessage(msg);
262+
if (_expectErrorBarrier)
263+
Expect<ReadyForQueryMessage>(await Connector.ReadMessage(async).ConfigureAwait(false), Connector);
264+
return false;
265+
default:
266+
throw Connector.UnexpectedMessageReceived(msg.Code);
267+
}
268+
}
269+
catch
270+
{
271+
// Break may have progressed the reader already.
272+
if (State is not ReaderState.Closed)
273+
State = ReaderState.Consumed;
274+
throw;
274275
}
275-
}
276-
catch
277-
{
278-
// Break may have progressed the reader already.
279-
if (State is not ReaderState.Closed)
280-
State = ReaderState.Consumed;
281-
throw;
282276
}
283277
}
284278

285279
ValueTask<IBackendMessage> ReadMessage(bool async)
286280
{
287-
return _isSequential ? ReadMessageSequential(Connector, async) : Connector.ReadMessage(async);
281+
return _isSequential ? ReadMessageSequential(async, Connector) : Connector.ReadMessage(async);
288282

289-
static async ValueTask<IBackendMessage> ReadMessageSequential(NpgsqlConnector connector, bool async)
283+
static async ValueTask<IBackendMessage> ReadMessageSequential(bool async, NpgsqlConnector connector)
290284
{
291285
var msg = await connector.ReadMessage(async, DataRowLoadingMode.Sequential).ConfigureAwait(false);
292-
if (msg.Code == BackendMessageCode.DataRow)
286+
if (msg.Code is BackendMessageCode.DataRow)
293287
{
294288
// Make sure that the datarow's column count is already buffered
295-
await connector.ReadBuffer.Ensure(2, async).ConfigureAwait(false);
289+
await connector.ReadBuffer.Ensure(sizeof(short), async).ConfigureAwait(false);
296290
return msg;
297291
}
298292
return msg;
@@ -310,8 +304,7 @@ static async ValueTask<IBackendMessage> ReadMessageSequential(NpgsqlConnector co
310304
public override bool NextResult()
311305
{
312306
ThrowIfClosedOrDisposed();
313-
return (_isSchemaOnly ? NextResultSchemaOnly(false) : NextResult(false))
314-
.GetAwaiter().GetResult();
307+
return (_isSchemaOnly ? NextResultSchemaOnly(false) : NextResult(false)).GetAwaiter().GetResult();
315308
}
316309

317310
/// <summary>
@@ -418,6 +411,7 @@ async Task<bool> NextResult(bool async, bool isConsuming = false, CancellationTo
418411

419412
if (RowDescription is not null)
420413
{
414+
_expectErrorBarrier = statement.AppendErrorBarrier ?? Command.EnableErrorBarriers;
421415
if (ColumnInfoCache?.Length >= ColumnCount)
422416
Array.Clear(ColumnInfoCache, 0, ColumnCount);
423417
else
@@ -776,7 +770,7 @@ async Task<bool> NextResultSchemaOnly(bool async, bool isConsuming = false, Canc
776770

777771
#region ProcessMessage
778772

779-
internal void ProcessMessage(IBackendMessage msg)
773+
void ProcessMessage(IBackendMessage msg)
780774
{
781775
if (msg.Code is not BackendMessageCode.DataRow)
782776
{

0 commit comments

Comments
 (0)