Skip to content

Commit 3592cee

Browse files
authored
Column seeking rework (#5476)
1 parent 447387c commit 3592cee

File tree

12 files changed

+358
-435
lines changed

12 files changed

+358
-435
lines changed

src/Npgsql/Internal/NpgsqlConnector.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1295,7 +1295,7 @@ internal ValueTask<IBackendMessage> ReadMessage(
12951295
{
12961296
if (dataRowLoadingMode == DataRowLoadingMode.Skip)
12971297
{
1298-
await ReadBuffer.Skip(len, async).ConfigureAwait(false);
1298+
await ReadBuffer.Skip(async, len).ConfigureAwait(false);
12991299
continue;
13001300
}
13011301
}

src/Npgsql/Internal/NpgsqlReadBuffer.Stream.cs

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ internal sealed class ColumnStream : Stream
1717
int _read;
1818
bool _canSeek;
1919
bool _commandScoped;
20+
bool _consumeOnDispose;
2021
/// Does not throw ODE.
2122
internal int CurrentLength { get; private set; }
2223
internal bool IsDisposed { get; private set; }
@@ -28,7 +29,7 @@ internal ColumnStream(NpgsqlConnector connector)
2829
IsDisposed = true;
2930
}
3031

31-
internal void Init(int len, bool canSeek, bool commandScoped)
32+
internal void Init(int len, bool canSeek, bool commandScoped, bool consumeOnDispose = true)
3233
{
3334
Debug.Assert(!canSeek || _buf.ReadBytesLeft >= len,
3435
"Seekable stream constructed but not all data is in buffer (sequential)");
@@ -41,6 +42,7 @@ internal void Init(int len, bool canSeek, bool commandScoped)
4142
_read = 0;
4243

4344
_commandScoped = commandScoped;
45+
_consumeOnDispose = consumeOnDispose;
4446
IsDisposed = false;
4547
}
4648

@@ -195,22 +197,25 @@ void CheckDisposed()
195197
}
196198

197199
protected override void Dispose(bool disposing)
198-
=> DisposeAsync(disposing, async: false).GetAwaiter().GetResult();
200+
{
201+
if (disposing)
202+
DisposeCore(async: false).GetAwaiter().GetResult();
203+
}
199204

200205
public override ValueTask DisposeAsync()
201-
=> DisposeAsync(disposing: true, async: true);
206+
=> DisposeCore(async: true);
202207

203-
async ValueTask DisposeAsync(bool disposing, bool async)
208+
async ValueTask DisposeCore(bool async)
204209
{
205-
if (IsDisposed || !disposing)
210+
if (IsDisposed)
206211
return;
207212

208-
if (!_connector.IsBroken)
213+
if (_consumeOnDispose && !_connector.IsBroken)
209214
{
210215
var pos = _buf.CumulativeReadPosition - _startPos;
211216
var remaining = checked((int)(CurrentLength - pos));
212217
if (remaining > 0)
213-
await _buf.Skip(remaining, async).ConfigureAwait(false);
218+
await _buf.Skip(async, remaining).ConfigureAwait(false);
214219
}
215220

216221
IsDisposed = true;

src/Npgsql/Internal/NpgsqlReadBuffer.cs

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -411,8 +411,29 @@ internal NpgsqlReadBuffer AllocateOversize(int count)
411411
}
412412

413413
/// <summary>
414-
/// Does not perform any I/O - assuming that the bytes to be skipped are in the memory buffer.
414+
/// Skip a given number of bytes.
415415
/// </summary>
416+
internal void Skip(int len, bool allowIO)
417+
{
418+
Debug.Assert(len >= 0);
419+
420+
if (allowIO && len > ReadBytesLeft)
421+
{
422+
len -= ReadBytesLeft;
423+
while (len > Size)
424+
{
425+
ResetPosition();
426+
Ensure(Size);
427+
len -= Size;
428+
}
429+
ResetPosition();
430+
Ensure(len);
431+
}
432+
433+
Debug.Assert(ReadBytesLeft >= len);
434+
ReadPosition += len;
435+
}
436+
416437
internal void Skip(int len)
417438
{
418439
Debug.Assert(ReadBytesLeft >= len);
@@ -422,7 +443,7 @@ internal void Skip(int len)
422443
/// <summary>
423444
/// Skip a given number of bytes.
424445
/// </summary>
425-
public async Task Skip(int len, bool async)
446+
public async Task Skip(bool async, int len)
426447
{
427448
Debug.Assert(len >= 0);
428449

@@ -658,11 +679,11 @@ static async ValueTask<int> ReadAsyncLong(NpgsqlReadBuffer buffer, bool commandS
658679
}
659680

660681
ColumnStream? _lastStream;
661-
public ColumnStream CreateStream(int len, bool canSeek)
682+
public ColumnStream CreateStream(int len, bool canSeek, bool consumeOnDispose = true)
662683
{
663684
if (_lastStream is not { IsDisposed: true })
664685
_lastStream = new ColumnStream(Connector);
665-
_lastStream.Init(len, canSeek, !Connector.LongRunningConnection);
686+
_lastStream.Init(len, canSeek, !Connector.LongRunningConnection, consumeOnDispose);
666687
return _lastStream;
667688
}
668689

src/Npgsql/Internal/PgBufferedConverter.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@ public override Size GetSize(SizeContext context, T value, ref object? writeStat
1818

1919
public sealed override T Read(PgReader reader)
2020
{
21-
// We check IsAtStart first to speed up primitive reads.
22-
if (!reader.IsAtStart && reader.ShouldBufferCurrent())
21+
// We check FieldAtStart to speed up simple value reads, as field level buffering was handled by reader.StartRead() already.
22+
if (!reader.FieldAtStart && reader.ShouldBufferCurrent())
2323
ThrowIORequired(reader.CurrentBufferRequirement);
2424

2525
return ReadCore(reader);

0 commit comments

Comments
 (0)