Skip to content

Commit 254561a

Browse files
committed
Rework row reading
1 parent 5b91717 commit 254561a

File tree

3 files changed

+126
-105
lines changed

3 files changed

+126
-105
lines changed

src/Npgsql/Internal/NpgsqlConnector.cs

Lines changed: 5 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1399,15 +1399,7 @@ internal ValueTask<IBackendMessage> ReadMessage(
13991399
}
14001400
}
14011401

1402-
internal IBackendMessage? ParseResultSetMessage(NpgsqlReadBuffer buf, BackendMessageCode code, int len, bool handleCallbacks = false)
1403-
=> code switch
1404-
{
1405-
BackendMessageCode.DataRow => _dataRowMessage.Load(len),
1406-
BackendMessageCode.CommandComplete => _commandCompleteMessage.Load(buf, len),
1407-
_ => ParseServerMessage(buf, code, len, false, handleCallbacks)
1408-
};
1409-
1410-
internal IBackendMessage? ParseServerMessage(NpgsqlReadBuffer buf, BackendMessageCode code, int len, bool isPrependedMessage, bool handleCallbacks = true)
1402+
internal IBackendMessage? ParseServerMessage(NpgsqlReadBuffer buf, BackendMessageCode code, int len, bool isPrependedMessage = false)
14111403
{
14121404
switch (code)
14131405
{
@@ -1443,18 +1435,12 @@ internal ValueTask<IBackendMessage> ReadMessage(
14431435
ReadParameterStatus(buf.GetNullTerminatedBytes(), buf.GetNullTerminatedBytes());
14441436
return null;
14451437
case BackendMessageCode.NoticeResponse:
1446-
if (handleCallbacks)
1447-
{
1448-
var notice = PostgresNotice.Load(buf, Settings.IncludeErrorDetail, LoggingConfiguration.ExceptionLogger);
1449-
LogMessages.ReceivedNotice(ConnectionLogger, notice.MessageText, Id);
1450-
Connection?.OnNotice(notice);
1451-
}
1438+
var notice = PostgresNotice.Load(buf, Settings.IncludeErrorDetail, LoggingConfiguration.ExceptionLogger);
1439+
LogMessages.ReceivedNotice(ConnectionLogger, notice.MessageText, Id);
1440+
Connection?.OnNotice(notice);
14521441
return null;
14531442
case BackendMessageCode.NotificationResponse:
1454-
if (handleCallbacks)
1455-
{
1456-
Connection?.OnNotification(new NpgsqlNotificationEventArgs(buf));
1457-
}
1443+
Connection?.OnNotification(new NpgsqlNotificationEventArgs(buf));
14581444
return null;
14591445

14601446
case BackendMessageCode.AuthenticationRequest:

src/Npgsql/Internal/NpgsqlReadBuffer.cs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
using System.IO;
66
using System.Net.Sockets;
77
using System.Runtime.CompilerServices;
8+
using System.Runtime.InteropServices;
89
using System.Text;
910
using System.Threading;
1011
using System.Threading.Tasks;
@@ -13,6 +14,37 @@
1314

1415
namespace Npgsql.Internal;
1516

17+
readonly struct MessageHeader(BackendMessageCode code, int length)
18+
{
19+
public const int ByteCount = sizeof(byte) + sizeof(int);
20+
21+
public BackendMessageCode Code { get; } = code;
22+
public int Length { get; } = length;
23+
24+
[MethodImpl(MethodImplOptions.AggressiveInlining)]
25+
public static bool TryParse(ref ReadOnlySpan<byte> source, out MessageHeader header)
26+
{
27+
const int headerSize = sizeof(byte) + sizeof(int);
28+
29+
if (source.Length < headerSize)
30+
{
31+
header = default;
32+
return false;
33+
}
34+
35+
ref var first = ref MemoryMarshal.GetReference(source);
36+
var code = (BackendMessageCode)first;
37+
var length =
38+
BitConverter.IsLittleEndian
39+
? BinaryPrimitives.ReverseEndianness(Unsafe.ReadUnaligned<int>(ref Unsafe.Add(ref first, 1)))
40+
: Unsafe.ReadUnaligned<int>(ref Unsafe.Add(ref first, 1));
41+
42+
header = new MessageHeader(code, length - sizeof(int)); // Transmitted length includes itself
43+
source = source.Slice(headerSize);
44+
return true;
45+
}
46+
}
47+
1648
/// <summary>
1749
/// A buffer used by Npgsql to read data from the socket efficiently.
1850
/// Provides methods which decode different values types and tracks the current position.

src/Npgsql/NpgsqlDataReader.cs

Lines changed: 89 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using System;
22
using System.Buffers;
3+
using System.Buffers.Binary;
34
using System.Collections;
45
using System.Collections.Generic;
56
using System.Collections.ObjectModel;
@@ -63,6 +64,8 @@ public sealed class NpgsqlDataReader : DbDataReader, IDbColumnSchemaGenerator
6364
/// </summary>
6465
internal int StatementIndex { get; private set; }
6566

67+
bool _expectErrorBarrier;
68+
6669
/// <summary>
6770
/// Records, for each column, its starting offset and length in the current row.
6871
/// Used only in non-sequential mode.
@@ -147,6 +150,7 @@ internal void Init(
147150
State = ReaderState.BetweenResults;
148151
_recordsAffected = null;
149152
_startTimestamp = startTimestamp;
153+
_expectErrorBarrier = false;
150154
}
151155

152156
#region Read
@@ -161,7 +165,7 @@ internal void Init(
161165
public override bool Read()
162166
{
163167
ThrowIfClosedOrDisposed();
164-
return TryRead()?.Result ?? Read(false).GetAwaiter().GetResult();
168+
return Read(false).GetAwaiter().GetResult();
165169
}
166170

167171
/// <summary>
@@ -174,111 +178,104 @@ public override bool Read()
174178
public override Task<bool> ReadAsync(CancellationToken cancellationToken)
175179
{
176180
ThrowIfClosedOrDisposed();
177-
return TryRead() ?? Read(async: true, cancellationToken);
181+
return Read(async: true, cancellationToken);
178182
}
179183

180-
// This is an optimized execution path that avoids calling any async methods for the (usual)
181-
// case where the next row (or CommandComplete) is already in memory.
182-
Task<bool>? TryRead()
184+
[MethodImpl(MethodImplOptions.AggressiveInlining)]
185+
Task<bool> Read(bool async, CancellationToken cancellationToken = default)
183186
{
184-
switch (State)
187+
var state = State;
188+
if (state is ReaderState.InResult)
185189
{
186-
case ReaderState.BeforeResult:
187-
// First Read() after NextResult. Data row has already been processed.
188-
State = ReaderState.InResult;
189-
return TrueTask;
190-
case ReaderState.InResult:
191-
break;
192-
default:
193-
return FalseTask;
194-
}
190+
if (!_isRowBuffered || _behavior.HasFlag(CommandBehavior.SingleRow))
191+
goto slow;
195192

196-
// We have a special case path for SingleRow.
197-
if (_behavior.HasFlag(CommandBehavior.SingleRow) || !_isRowBuffered)
198-
return null;
193+
// Consume current row.
194+
var buffer = Buffer;
195+
buffer.PgReader.Commit();
196+
buffer.ReadPosition = _dataMsgEnd;
199197

200-
ConsumeBufferedRow();
198+
var span = buffer.Span;
199+
if (!MessageHeader.TryParse(ref span, out var header))
200+
goto slow;
201201

202-
const int headerSize = sizeof(byte) + sizeof(int);
203-
var buffer = Buffer;
204-
var readPosition = buffer.ReadPosition;
205-
var bytesLeft = buffer.FilledBytes - readPosition;
206-
if (bytesLeft < headerSize)
207-
return null;
208-
var messageCode = (BackendMessageCode)buffer.ReadByte();
209-
var len = buffer.ReadInt32() - sizeof(int); // Transmitted length includes itself
210-
var isDataRow = messageCode is BackendMessageCode.DataRow;
211-
// sizeof(short) is for the number of columns
212-
var sufficientBytes = isDataRow && _isSequential ? headerSize + sizeof(short) : headerSize + len;
213-
if (bytesLeft < sufficientBytes
214-
|| !isDataRow && (_statements[StatementIndex].AppendErrorBarrier ?? Command.EnableErrorBarriers)
215-
// Could be an error, let main read handle it.
216-
|| Connector.ParseResultSetMessage(buffer, messageCode, len) is not { } msg)
202+
switch (header.Code)
203+
{
204+
// sizeof(short) is for the number of columns.
205+
case BackendMessageCode.DataRow when span.Length >= (_isSequential ? sizeof(short) : header.Length):
206+
Debug.Assert(BinaryPrimitives.ReadInt16BigEndian(span) == ColumnCount);
207+
208+
var msgEnd = _dataMsgEnd = buffer.ReadPosition + MessageHeader.ByteCount + header.Length;
209+
210+
_isRowBuffered = msgEnd <= buffer.FilledBytes;
211+
_column = -1;
212+
213+
if (_columns.Count != 0)
214+
_columns.Clear();
215+
216+
_columnsStartPos = buffer.ReadPosition += MessageHeader.ByteCount + sizeof(short);
217+
return TrueTask;
218+
case BackendMessageCode.CommandComplete or BackendMessageCode.EmptyQueryResponse when !_expectErrorBarrier && span.Length >= header.Length:
219+
buffer.ReadPosition += MessageHeader.ByteCount;
220+
ProcessMessage(Connector.ParseServerMessage(Buffer, BackendMessageCode.CommandComplete, header.Length)!);
221+
return FalseTask;
222+
default:
223+
goto slow;
224+
}
225+
slow:
226+
return InResultSlow(async, cancellationToken);
227+
}
228+
229+
if (state is ReaderState.BeforeResult)
217230
{
218-
buffer.ReadPosition = readPosition;
219-
return null;
231+
// First Read() after NextResult. Data row has already been processed.
232+
State = ReaderState.InResult;
233+
return TrueTask;
220234
}
221-
ProcessMessage(msg);
222-
return isDataRow ? TrueTask : FalseTask;
223-
}
224235

225-
async Task<bool> Read(bool async, CancellationToken cancellationToken = default)
226-
{
227-
using var registration = Connector.StartNestedCancellableOperation(cancellationToken);
228-
try
236+
Debug.Assert(Enum.IsDefined(state));
237+
return FalseTask;
238+
239+
async Task<bool> InResultSlow(bool async, CancellationToken cancellationToken)
229240
{
230-
switch (State)
241+
Debug.Assert(State is ReaderState.InResult);
242+
using var registration = Connector.StartNestedCancellableOperation(cancellationToken);
243+
try
231244
{
232-
case ReaderState.BeforeResult:
233-
// First Read() after NextResult. Data row has already been processed.
234-
State = ReaderState.InResult;
235-
return true;
236-
237-
case ReaderState.InResult:
238-
await ConsumeRow(async).ConfigureAwait(false);
245+
// No more rows for single row.
239246
if (_behavior.HasFlag(CommandBehavior.SingleRow))
240247
{
241-
// TODO: See optimization proposal in #410
242248
await Consume(async).ConfigureAwait(false);
243249
return false;
244250
}
245-
break;
246251

247-
case ReaderState.BetweenResults:
248-
case ReaderState.Consumed:
249-
case ReaderState.Closed:
250-
case ReaderState.Disposed:
251-
return false;
252-
default:
253-
ThrowHelper.ThrowArgumentOutOfRangeException();
254-
return false;
255-
}
256-
257-
var msg = await ReadMessage(async).ConfigureAwait(false);
252+
await ConsumeRow(async).ConfigureAwait(false);
258253

259-
switch (msg.Code)
260-
{
261-
case BackendMessageCode.DataRow:
262-
ProcessMessage(msg);
263-
return true;
254+
var msg = await ReadMessage(async).ConfigureAwait(false);
264255

265-
case BackendMessageCode.CommandComplete:
266-
case BackendMessageCode.EmptyQueryResponse:
267-
ProcessMessage(msg);
268-
if (_statements[StatementIndex].AppendErrorBarrier ?? Command.EnableErrorBarriers)
269-
Expect<ReadyForQueryMessage>(await Connector.ReadMessage(async).ConfigureAwait(false), Connector);
270-
return false;
256+
switch (msg.Code)
257+
{
258+
case BackendMessageCode.DataRow:
259+
ProcessMessage(msg);
260+
return true;
271261

272-
default:
273-
throw Connector.UnexpectedMessageReceived(msg.Code);
262+
case BackendMessageCode.CommandComplete:
263+
case BackendMessageCode.EmptyQueryResponse:
264+
ProcessMessage(msg);
265+
if (_expectErrorBarrier)
266+
Expect<ReadyForQueryMessage>(await Connector.ReadMessage(async).ConfigureAwait(false), Connector);
267+
return false;
268+
default:
269+
throw Connector.UnexpectedMessageReceived(msg.Code);
270+
}
271+
}
272+
catch
273+
{
274+
// Break may have progressed the reader already.
275+
if (State is not ReaderState.Closed)
276+
State = ReaderState.Consumed;
277+
throw;
274278
}
275-
}
276-
catch
277-
{
278-
// Break may have progressed the reader already.
279-
if (State is not ReaderState.Closed)
280-
State = ReaderState.Consumed;
281-
throw;
282279
}
283280
}
284281

@@ -289,7 +286,7 @@ ValueTask<IBackendMessage> ReadMessage(bool async)
289286
static async ValueTask<IBackendMessage> ReadMessageSequential(NpgsqlConnector connector, bool async)
290287
{
291288
var msg = await connector.ReadMessage(async, DataRowLoadingMode.Sequential).ConfigureAwait(false);
292-
if (msg.Code == BackendMessageCode.DataRow)
289+
if (msg.Code is BackendMessageCode.DataRow)
293290
{
294291
// Make sure that the datarow's column count is already buffered
295292
await connector.ReadBuffer.Ensure(2, async).ConfigureAwait(false);
@@ -343,7 +340,12 @@ async Task<bool> NextResult(bool async, bool isConsuming = false, CancellationTo
343340
using var registration = isConsuming ? default : Connector.StartNestedCancellableOperation(cancellationToken);
344341
// If we're in the middle of a resultset, consume it
345342
if (State is ReaderState.BeforeResult or ReaderState.InResult)
346-
await ConsumeResultSet(async).ConfigureAwait(false);
343+
{
344+
// if (!isConsuming && _behavior.HasFlag(CommandBehavior.SingleRow) && State is ReaderState.InResult)
345+
// await Consume(async).ConfigureAwait(false);
346+
// else
347+
await ConsumeResultSet(async).ConfigureAwait(false);
348+
}
347349

348350
Debug.Assert(State is ReaderState.BetweenResults);
349351

@@ -418,6 +420,7 @@ async Task<bool> NextResult(bool async, bool isConsuming = false, CancellationTo
418420

419421
if (RowDescription is not null)
420422
{
423+
_expectErrorBarrier = statement.AppendErrorBarrier ?? Command.EnableErrorBarriers;
421424
if (ColumnInfoCache?.Length >= ColumnCount)
422425
Array.Clear(ColumnInfoCache, 0, ColumnCount);
423426
else

0 commit comments

Comments
 (0)