Skip to content
6 changes: 4 additions & 2 deletions src/Npgsql/BackendMessages/DataRowMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@ sealed class DataRowMessage : IBackendMessage
public BackendMessageCode Code => BackendMessageCode.DataRow;

internal int Length { get; private set; }
internal short ColumnCount { get; private set; }

internal DataRowMessage Load(int len)
internal DataRowMessage Load(int len, short columnCount)
{
Length = len;
ColumnCount = columnCount;
return this;
}
}
}
14 changes: 12 additions & 2 deletions src/Npgsql/Internal/Converters/Primitive/ByteaConverters.cs
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,21 @@ sealed class MemoryByteaConverter : ByteaConverters<Memory<byte>>

sealed class StreamByteaConverter : PgStreamingConverter<Stream>
{
readonly bool _supportsText;

public StreamByteaConverter(bool supportsText) => _supportsText = supportsText;

public override bool CanConvert(DataFormat format, out BufferRequirements bufferRequirements)
{
bufferRequirements = BufferRequirements.None;
return format is DataFormat.Text or DataFormat.Binary;
}

public override Stream Read(PgReader reader)
=> throw new NotSupportedException("Handled by generic stream support in NpgsqlDataReader");
=> reader.GetStream();

public override ValueTask<Stream> ReadAsync(PgReader reader, CancellationToken cancellationToken = default)
=> throw new NotSupportedException("Handled by generic stream support in NpgsqlDataReader");
=> new(reader.GetStream());

public override Size GetSize(SizeContext context, Stream value, ref object? writeState)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ static int ConsumeChars(TextReader reader, int? count)
var toRead = count is null ? maxStackAlloc : Math.Min(maxStackAlloc, count.Value - totalRead);
var read = reader.ReadBlock(tempCharBuf.Slice(0, toRead));
totalRead += read;
if (count is not null && read is 0)
if (read is 0 && count >= totalRead)
throw new EndOfStreamException();

fin = count is null ? read is 0 : totalRead >= count;
Expand Down
20 changes: 20 additions & 0 deletions src/Npgsql/Internal/Converters/ReadStreamConverter.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
using System.IO;
using System.Threading;
using System.Threading.Tasks;

namespace Npgsql.Internal.Converters;

class ReadStreamConverter : PgStreamingConverter<Stream>
{
public override bool CanConvert(DataFormat format, out BufferRequirements bufferRequirements)
{
bufferRequirements = BufferRequirements.None;
return format is DataFormat.Text or DataFormat.Binary;
}

public override Stream Read(PgReader reader) => reader.GetStream();
public override ValueTask<Stream> ReadAsync(PgReader reader, CancellationToken cancellationToken = default) => new(reader.GetStream());
public override Size GetSize(SizeContext context, Stream value, ref object? writeState) => throw new System.NotImplementedException();
public override void Write(PgWriter writer, Stream value) => throw new System.NotImplementedException();
public override ValueTask WriteAsync(PgWriter writer, Stream value, CancellationToken cancellationToken = default) => throw new System.NotImplementedException();
}
31 changes: 10 additions & 21 deletions src/Npgsql/Internal/NpgsqlConnector.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1294,9 +1294,12 @@ internal ValueTask<IBackendMessage> ReadMessage(
{
if (dataRowLoadingMode == DataRowLoadingMode.Skip)
{
await ReadBuffer.Skip(len, async).ConfigureAwait(false);
await ReadBuffer.Skip(async, len).ConfigureAwait(false);
continue;
}

// Make sure that the column count is already buffered.
await ReadBuffer.Ensure(sizeof(short), async).ConfigureAwait(false);
}
else if (len > ReadBuffer.ReadBytesLeft)
{
Expand Down Expand Up @@ -1399,22 +1402,14 @@ internal ValueTask<IBackendMessage> ReadMessage(
}
}

internal IBackendMessage? ParseResultSetMessage(NpgsqlReadBuffer buf, BackendMessageCode code, int len, bool handleCallbacks = false)
=> code switch
{
BackendMessageCode.DataRow => _dataRowMessage.Load(len),
BackendMessageCode.CommandComplete => _commandCompleteMessage.Load(buf, len),
_ => ParseServerMessage(buf, code, len, false, handleCallbacks)
};

internal IBackendMessage? ParseServerMessage(NpgsqlReadBuffer buf, BackendMessageCode code, int len, bool isPrependedMessage, bool handleCallbacks = true)
internal IBackendMessage? ParseServerMessage(NpgsqlReadBuffer buf, BackendMessageCode code, int len, bool isPrependedMessage = false)
{
switch (code)
{
case BackendMessageCode.RowDescription:
return _rowDescriptionMessage.Load(buf, SerializerOptions);
case BackendMessageCode.DataRow:
return _dataRowMessage.Load(len);
return _dataRowMessage.Load(len, buf.ReadInt16());
case BackendMessageCode.CommandComplete:
return _commandCompleteMessage.Load(buf, len);
case BackendMessageCode.ReadyForQuery:
Expand Down Expand Up @@ -1443,18 +1438,12 @@ internal ValueTask<IBackendMessage> ReadMessage(
ReadParameterStatus(buf.GetNullTerminatedBytes(), buf.GetNullTerminatedBytes());
return null;
case BackendMessageCode.NoticeResponse:
if (handleCallbacks)
{
var notice = PostgresNotice.Load(buf, Settings.IncludeErrorDetail, LoggingConfiguration.ExceptionLogger);
LogMessages.ReceivedNotice(ConnectionLogger, notice.MessageText, Id);
Connection?.OnNotice(notice);
}
var notice = PostgresNotice.Load(buf, Settings.IncludeErrorDetail, LoggingConfiguration.ExceptionLogger);
LogMessages.ReceivedNotice(ConnectionLogger, notice.MessageText, Id);
Connection?.OnNotice(notice);
return null;
case BackendMessageCode.NotificationResponse:
if (handleCallbacks)
{
Connection?.OnNotification(new NpgsqlNotificationEventArgs(buf));
}
Connection?.OnNotification(new NpgsqlNotificationEventArgs(buf));
return null;

case BackendMessageCode.AuthenticationRequest:
Expand Down
19 changes: 12 additions & 7 deletions src/Npgsql/Internal/NpgsqlReadBuffer.Stream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ internal sealed class ColumnStream : Stream
int _read;
bool _canSeek;
bool _commandScoped;
bool _consumeOnDispose;
/// Does not throw ODE.
internal int CurrentLength { get; private set; }
internal bool IsDisposed { get; private set; }
Expand All @@ -28,7 +29,7 @@ internal ColumnStream(NpgsqlConnector connector)
IsDisposed = true;
}

internal void Init(int len, bool canSeek, bool commandScoped)
internal void Init(int len, bool canSeek, bool commandScoped, bool consumeOnDispose = true)
{
Debug.Assert(!canSeek || _buf.ReadBytesLeft >= len,
"Seekable stream constructed but not all data is in buffer (sequential)");
Expand All @@ -41,6 +42,7 @@ internal void Init(int len, bool canSeek, bool commandScoped)
_read = 0;

_commandScoped = commandScoped;
_consumeOnDispose = consumeOnDispose;
IsDisposed = false;
}

Expand Down Expand Up @@ -195,22 +197,25 @@ void CheckDisposed()
}

protected override void Dispose(bool disposing)
=> DisposeAsync(disposing, async: false).GetAwaiter().GetResult();
{
if (disposing)
DisposeCore(async: false).GetAwaiter().GetResult();
}

public override ValueTask DisposeAsync()
=> DisposeAsync(disposing: true, async: true);
=> DisposeCore(async: true);

async ValueTask DisposeAsync(bool disposing, bool async)
async ValueTask DisposeCore(bool async)
{
if (IsDisposed || !disposing)
if (IsDisposed)
return;

if (!_connector.IsBroken)
if (_consumeOnDispose && !_connector.IsBroken)
{
var pos = _buf.CumulativeReadPosition - _startPos;
var remaining = checked((int)(CurrentLength - pos));
if (remaining > 0)
await _buf.Skip(remaining, async).ConfigureAwait(false);
await _buf.Skip(async, remaining).ConfigureAwait(false);
}

IsDisposed = true;
Expand Down
61 changes: 57 additions & 4 deletions src/Npgsql/Internal/NpgsqlReadBuffer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.IO;
using System.Net.Sockets;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
Expand All @@ -13,6 +14,37 @@

namespace Npgsql.Internal;

readonly struct MessageHeader(BackendMessageCode code, int length)
{
public const int ByteCount = sizeof(byte) + sizeof(int);

public BackendMessageCode Code { get; } = code;
public int Length { get; } = length;

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static bool TryParse(ref ReadOnlySpan<byte> source, out MessageHeader header)
{
const int headerSize = sizeof(byte) + sizeof(int);

if (source.Length < headerSize)
{
header = default;
return false;
}

ref var first = ref MemoryMarshal.GetReference(source);
var code = (BackendMessageCode)first;
var length =
BitConverter.IsLittleEndian
? BinaryPrimitives.ReverseEndianness(Unsafe.ReadUnaligned<int>(ref Unsafe.Add(ref first, 1)))
: Unsafe.ReadUnaligned<int>(ref Unsafe.Add(ref first, 1));

header = new MessageHeader(code, length - sizeof(int)); // Transmitted length includes itself
source = source.Slice(headerSize);
return true;
}
}

/// <summary>
/// A buffer used by Npgsql to read data from the socket efficiently.
/// Provides methods which decode different values types and tracks the current position.
Expand Down Expand Up @@ -415,8 +447,29 @@ internal NpgsqlReadBuffer AllocateOversize(int count)
}

/// <summary>
/// Does not perform any I/O - assuming that the bytes to be skipped are in the memory buffer.
/// Skip a given number of bytes.
/// </summary>
internal void Skip(int len, bool allowIO)
{
Debug.Assert(len >= 0);

if (allowIO && len > ReadBytesLeft)
{
len -= ReadBytesLeft;
while (len > Size)
{
ResetPosition();
Ensure(Size);
len -= Size;
}
ResetPosition();
Ensure(len);
}

Debug.Assert(ReadBytesLeft >= len);
ReadPosition += len;
}

internal void Skip(int len)
{
Debug.Assert(ReadBytesLeft >= len);
Expand All @@ -426,7 +479,7 @@ internal void Skip(int len)
/// <summary>
/// Skip a given number of bytes.
/// </summary>
public async Task Skip(int len, bool async)
public async Task Skip(bool async, int len)
{
Debug.Assert(len >= 0);

Expand Down Expand Up @@ -673,11 +726,11 @@ static async ValueTask<int> ReadAsyncLong(NpgsqlReadBuffer buffer, bool commandS
}

ColumnStream? _lastStream;
public ColumnStream CreateStream(int len, bool canSeek)
public ColumnStream CreateStream(int len, bool canSeek, bool consumeOnDispose = true)
{
if (_lastStream is not { IsDisposed: true })
_lastStream = new ColumnStream(Connector);
_lastStream.Init(len, canSeek, !Connector.LongRunningConnection);
_lastStream.Init(len, canSeek, !Connector.LongRunningConnection, consumeOnDispose);
return _lastStream;
}

Expand Down
7 changes: 7 additions & 0 deletions src/Npgsql/Internal/PgConverter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System.Buffers;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

Expand Down Expand Up @@ -176,6 +177,12 @@ static class PgConverterExtensions

return size;
}

internal static PgConverter<T> UnsafeDowncast<T>(this PgConverter converter)
{
Debug.Assert(converter is PgConverter<T>);
return Unsafe.As<PgConverter<T>>(converter);
}
}

interface IResumableRead
Expand Down
Loading