Skip to content
Prev Previous commit
Next Next commit
Make sure we don't consume data in the stream during reader restart
  • Loading branch information
NinoFloris committed Mar 25, 2024
commit 57d8264d90cbb8d3e7cb6fb5db5cc2b7c3199122
17 changes: 11 additions & 6 deletions src/Npgsql/Internal/NpgsqlReadBuffer.Stream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ internal sealed class ColumnStream : Stream
int _read;
bool _canSeek;
bool _commandScoped;
bool _consumeOnDispose;
/// Does not throw ODE.
internal int CurrentLength { get; private set; }
internal bool IsDisposed { get; private set; }
Expand All @@ -28,7 +29,7 @@ internal ColumnStream(NpgsqlConnector connector)
IsDisposed = true;
}

internal void Init(int len, bool canSeek, bool commandScoped)
internal void Init(int len, bool canSeek, bool commandScoped, bool consumeOnDispose = true)
{
Debug.Assert(!canSeek || _buf.ReadBytesLeft >= len,
"Seekable stream constructed but not all data is in buffer (sequential)");
Expand All @@ -41,6 +42,7 @@ internal void Init(int len, bool canSeek, bool commandScoped)
_read = 0;

_commandScoped = commandScoped;
_consumeOnDispose = consumeOnDispose;
IsDisposed = false;
}

Expand Down Expand Up @@ -195,17 +197,20 @@ void CheckDisposed()
}

protected override void Dispose(bool disposing)
=> DisposeAsync(disposing, async: false).GetAwaiter().GetResult();
{
if (disposing)
DisposeCore(async: false).GetAwaiter().GetResult();
}

public override ValueTask DisposeAsync()
=> DisposeAsync(disposing: true, async: true);
=> DisposeCore(async: true);

async ValueTask DisposeAsync(bool disposing, bool async)
async ValueTask DisposeCore(bool async)
{
if (IsDisposed || !disposing)
if (IsDisposed)
return;

if (!_connector.IsBroken)
if (_consumeOnDispose && !_connector.IsBroken)
{
var pos = _buf.CumulativeReadPosition - _startPos;
var remaining = checked((int)(CurrentLength - pos));
Expand Down
4 changes: 2 additions & 2 deletions src/Npgsql/Internal/NpgsqlReadBuffer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -688,11 +688,11 @@ static async ValueTask<int> ReadAsyncLong(NpgsqlReadBuffer buffer, bool commandS
}

ColumnStream? _lastStream;
public ColumnStream CreateStream(int len, bool canSeek)
public ColumnStream CreateStream(int len, bool canSeek, bool consumeOnDispose = true)
{
if (_lastStream is not { IsDisposed: true })
_lastStream = new ColumnStream(Connector);
_lastStream.Init(len, canSeek, !Connector.LongRunningConnection);
_lastStream.Init(len, canSeek, !Connector.LongRunningConnection, consumeOnDispose);
return _lastStream;
}

Expand Down
27 changes: 15 additions & 12 deletions src/Npgsql/Internal/PgReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ NpgsqlReadBuffer.ColumnStream GetColumnStream(bool canSeek = false, int? length

length ??= CurrentRemaining;
CheckBounds(length.GetValueOrDefault());
return _userActiveStream = _buffer.CreateStream(length.GetValueOrDefault(), canSeek && length <= _buffer.ReadBytesLeft);
return _userActiveStream = _buffer.CreateStream(length.GetValueOrDefault(), canSeek && length <= _buffer.ReadBytesLeft, consumeOnDispose: false);
}

public TextReader GetTextReader(Encoding encoding)
Expand Down Expand Up @@ -344,15 +344,16 @@ public async ValueTask<ReadOnlySequence<byte>> ReadBytesAsync(int count, Cancell

public void Rewind(int count)
{
// Shut down any streaming going on on the column
DisposeUserActiveStream(async: false).GetAwaiter().GetResult();

if (CurrentOffset < count)
ThrowHelper.ThrowArgumentOutOfRangeException(nameof(count), "Attempt to rewind past the current field start.");

if (_buffer.ReadPosition < count)
ThrowHelper.ThrowArgumentOutOfRangeException(nameof(count), "Attempt to rewind past the buffer start, some of this data is no longer part of the underlying buffer.");

// Shut down any streaming going on on the column
if (StreamActive)
DisposeUserActiveStream(async: false).GetAwaiter().GetResult();

_buffer.ReadPosition -= count;
}

Expand All @@ -363,14 +364,10 @@ public void Rewind(int count)
/// <returns>The stream length, if any</returns>
async ValueTask DisposeUserActiveStream(bool async)
{
if (StreamActive)
{
if (async)
await _userActiveStream.DisposeAsync().ConfigureAwait(false);
else
_userActiveStream.Dispose();
}

if (async)
await (_userActiveStream?.DisposeAsync() ?? new()).ConfigureAwait(false);
else
_userActiveStream?.Dispose();
_userActiveStream = null;
}

Expand Down Expand Up @@ -540,6 +537,9 @@ public void Consume(int? count = null)
if (count > currentRemaining)
ThrowHelper.ThrowArgumentOutOfRangeException(nameof(count), "Attempt to read past the end of the current field size.");

if (StreamActive)
DisposeUserActiveStream(async: false).GetAwaiter().GetResult();

var origOffset = FieldOffset;
// A breaking exception unwind from a nested scope should not try to consume its remaining data.
if (!_buffer.Connector.IsBroken)
Expand All @@ -559,6 +559,9 @@ public async ValueTask ConsumeAsync(int? count = null, CancellationToken cancell
if (count > currentRemaining)
ThrowHelper.ThrowArgumentOutOfRangeException(nameof(count), "Attempt to read past the end of the current field size.");

if (StreamActive)
await DisposeUserActiveStream(async: true).ConfigureAwait(false);

var origOffset = FieldOffset;
// A breaking exception unwind from a nested scope should not try to consume its remaining data.
if (!_buffer.Connector.IsBroken)
Expand Down