Skip to content
Prev Previous commit
Next Next commit
Refactor SeekToColumn to a sync and an async variant
Instead of two separate implementations for buffered and sequential
  • Loading branch information
NinoFloris committed Mar 25, 2024
commit d8c0efaac28cc25bae1cd6d14e0adc9faaef7da7
19 changes: 17 additions & 2 deletions src/Npgsql/Internal/NpgsqlReadBuffer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -415,10 +415,25 @@ internal NpgsqlReadBuffer AllocateOversize(int count)
}

/// <summary>
/// Does not perform any I/O - assuming that the bytes to be skipped are in the memory buffer.
/// Skip a given number of bytes.
/// </summary>
internal void Skip(int len)
internal void Skip(int len, bool allowIO = false)
{
Debug.Assert(len >= 0);

if (allowIO && len > ReadBytesLeft)
{
len -= ReadBytesLeft;
while (len > Size)
{
ResetPosition();
Ensure(Size);
len -= Size;
}
ResetPosition();
Ensure(len);
}

Debug.Assert(ReadBytesLeft >= len);
ReadPosition += len;
}
Expand Down
46 changes: 15 additions & 31 deletions src/Npgsql/Internal/PgReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -424,42 +424,23 @@ internal void AdvanceCharsRead(int charsRead)
internal void InitCharsRead(int dataOffset, ArraySegment<char>? buffer, out int? charsRead)
{
if (!Resumable)
throw new InvalidOperationException("Wasn't initialized as resumed");
ThrowHelper.ThrowInvalidOperationException("Reader was not initialized as resumable");

charsRead = _charsReadReader is null ? null : _charsRead;
_charsReadOffset = dataOffset;
_charsReadBuffer = buffer;
}

internal PgReader Init(int fieldLength, DataFormat format, bool resumable = false)
internal void Init(int fieldSize, DataFormat fieldFormat, bool resumable = false)
{
if (Initialized)
{
if (resumable)
{
if (Resumable)
return this;
_resumable = true;
}
else
{
if (!IsAtStart)
ThrowHelper.ThrowInvalidOperationException("Cannot be initialized to be non-resumable until a commit is issued.");
_resumable = false;
}
}
else
{
_fieldStartPos = _buffer.CumulativeReadPosition;
_fieldConsumed = false;
}
ThrowHelper.ThrowInvalidOperationException("Already initialized");

Debug.Assert(!_requiresCleanup, "Reader wasn't properly committed before next init");

_fieldSize = fieldLength;
_fieldFormat = format;
_fieldStartPos = _buffer.CumulativeReadPosition;
_fieldConsumed = false;
_fieldSize = fieldSize;
_fieldFormat = fieldFormat;
_resumable = resumable;
return this;
}

internal void StartRead(Size bufferRequirement)
Expand Down Expand Up @@ -571,7 +552,6 @@ internal void ThrowIfStreamActive()
ThrowHelper.ThrowInvalidOperationException("A stream is already open for this reader");
}

internal bool CommitHasIO(bool resuming) => Initialized && !resuming && FieldRemaining > 0;
[MethodImpl(MethodImplOptions.NoInlining)]
void Cleanup()
{
Expand Down Expand Up @@ -601,18 +581,21 @@ void ResetCurrent()
_currentSize = -1;
}

internal void Restart(bool resume)
internal void Restart(bool resumable)
{
if (!Initialized)
ThrowHelper.ThrowInvalidOperationException("Cannot restart a non-initialized reader.");

if (resume)
// We resume if the reader was initialized as resumable and we're not explicitly restarting as non-resumable.
// When the field size is -1 we're always restarting as resumable, to allow rereading null values endlessly.
if ((Resumable && resumable) || FieldSize is -1)
{
if (!Resumable)
ThrowHelper.ThrowInvalidOperationException("Cannot resume a non-resumable read.");
_resumable = resumable || FieldSize is -1;
return;
}

// From this point on we're not resuming, we're resetting any remaining state and rewinding our position.

// Shut down any streaming and pooling going on on the column.
if (_requiresCleanup)
Cleanup();
Expand All @@ -621,6 +604,7 @@ internal void Restart(bool resume)
ResetCurrent();

_fieldConsumed = false;
_resumable = resumable;
Seek(0);

Debug.Assert(Initialized);
Expand Down
Loading