using System;
using System.Collections.Generic;
using System.Data;
using System.Diagnostics;
using System.IO;
using System.Runtime.CompilerServices;
using System.Runtime.ExceptionServices;
using System.Threading;
using System.Threading.Tasks;
using JetBrains.Annotations;
using Npgsql.BackendMessages;
using Npgsql.TypeHandling;
namespace Npgsql
{
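/// <summary>
/// A sequential reader, which does not buffer rows in memory, and requires columns to be
/// read in-order only. Returned when <see cref="CommandBehavior.SequentialAccess"/> is passed
/// to <see cref="NpgsqlCommand.ExecuteReader(CommandBehavior)"/>.
/// </summary>
/// <remarks>
/// This reader is suitable in scenarios where a single row is very large, and holding
/// it in memory is undesirable.
/// </remarks>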
#pragma warning disable CA1010
sealed class NpgsqlSequentialDataReader : NpgsqlDataReader
#pragma warning restore CA1010
{
/// <summary>
/// The number of columns in the current row. Read from the buffer (as a 16-bit integer)
/// the first time the row is accessed.
/// </summary>
int _numColumns;
/// <summary>
/// The index of the column that we're on, i.e. that has already been parsed, is
/// in memory and can be retrieved. Reset to -1 at the start of every row.
/// </summary>
int _column;
/// <summary>
/// Creates a sequential reader for the given connector. All construction work is delegated
/// to the base class constructor.
/// </summary>
/// <param name="connector">The connector passed on to the base reader.</param>
internal NpgsqlSequentialDataReader(NpgsqlConnector connector)
: base(connector) {}
/// <summary>
/// Initializes the reader for a command execution: runs the base initialization and then
/// makes the reader use the connector's read buffer.
/// </summary>
/// <param name="command">The command being executed; asserted (debug builds only) to have no output parameters.</param>
/// <param name="behavior">Forwarded unchanged to the base initialization.</param>
/// <param name="statements">Forwarded unchanged to the base initialization.</param>
/// <param name="sendTask">Forwarded unchanged to the base initialization.</param>
internal override void Init(NpgsqlCommand command, CommandBehavior behavior, List statements, Task sendTask)
{
base.Init(command, behavior, statements, sendTask);
// Debug-only sanity check: a command with output parameters is not expected to reach this reader
Debug.Assert(!command.Parameters.HasOutputParameters);
// In sequential reading mode we always use the connector's buffer, unlike in non-sequential
// where an oversize buffer may be allocated for a big DataRow
Buffer = Connector.ReadBuffer;
}
/// <summary>
/// Reads the next message through the connector, requesting that data rows be loaded
/// using <c>DataRowLoadingMode.Sequential</c>.
/// </summary>
/// <param name="async">Forwarded unchanged to the connector's read call.</param>
/// <returns>Whatever the connector's <c>ReadMessage</c> call returns.</returns>
internal override ValueTask ReadMessage(bool async)
{
    return Connector.ReadMessage(async, DataRowLoadingMode.Sequential);
}
/// <summary>
/// Resets the per-row reading state when a new data row message arrives. The message
/// itself is not inspected here; the row's contents are read lazily from the buffer.
/// </summary>
/// <param name="dataMsg">The data row message; unused by this implementation.</param>
internal override void ProcessDataMessage(DataRowMessage dataMsg)
{
// -1 means "no column of this row has been reached yet"; SeekToColumn and ConsumeRow
// use it to decide whether the row's column count still has to be read
_column = -1;
// -1 is also the length used for a null column, so no bytes are considered pending
ColumnLen = -1;
PosInColumn = 0;
}
/// <summary>
/// Asynchronously gets the value of the specified column as an instance of <typeparamref name="T"/>.
/// </summary>
/// <typeparam name="T">The type of the value to be returned.</typeparam>
/// <param name="ordinal">The zero-based column ordinal.</param>
/// <param name="cancellationToken">
/// Checked once on entry (a cancelled token makes this method throw synchronously);
/// it is not passed on to the read itself.
/// </param>
/// <returns>A task producing the value of the specified column.</returns>
public override Task<T> GetFieldValueAsync<T>(int ordinal, CancellationToken cancellationToken)
{
    cancellationToken.ThrowIfCancellationRequested();
    // The read is started inside the scope returned by NoSynchronizationContextScope.Enter()
    using (NoSynchronizationContextScope.Enter())
        return GetFieldValue<T>(ordinal, true).AsTask();
}
/// <summary>
/// Synchronously gets the value of the specified column as an instance of <typeparamref name="T"/>.
/// </summary>
/// <typeparam name="T">The type of the value to be returned.</typeparam>
/// <param name="column">The zero-based column ordinal.</param>
/// <returns>The value of the specified column.</returns>
public override T GetFieldValue<T>(int column)
    // Runs the shared implementation with async=false and unwraps its result
    => GetFieldValue<T>(column, false).GetAwaiter().GetResult();
/// <summary>
/// Shared implementation of the synchronous and asynchronous typed column getters.
/// Seeks to the column, requires that none of it has been consumed yet, and reads it
/// as <typeparamref name="T"/> through the column's type handler.
/// </summary>
/// <typeparam name="T">The type of the value to be returned.</typeparam>
/// <param name="column">The zero-based column ordinal.</param>
/// <param name="async">Forwarded to the seek and to the handler's read when the column is not fully buffered.</param>
/// <returns>The column value; <c>default</c> for a null column when a nullable handler exists for <typeparamref name="T"/>.</returns>
/// <exception cref="InvalidCastException">The column is null and no nullable handler exists for <typeparamref name="T"/>.</exception>
/// <exception cref="InvalidOperationException">Part of the column has already been read.</exception>
async ValueTask<T> GetFieldValue<T>(int column, bool async)
{
    CheckRowAndOrdinal(column);
    await SeekToColumn(column, async);
    CheckColumnStart();

    // A column length of -1 denotes a null value
    if (ColumnLen == -1)
    {
        if (NullableHandler<T>.Exists)
            return default;
        throw new InvalidCastException("Column is null");
    }

    var fieldDescription = RowDescription[column];
    try
    {
        // In each branch: if the entire column is already in the buffer, the overload without
        // the async flag is used; otherwise the overload taking the async flag is awaited.
        if (NullableHandler<T>.Exists)
        {
            return ColumnLen <= Buffer.ReadBytesLeft
                ? NullableHandler<T>.Read(Buffer, ColumnLen, fieldDescription)
                : await NullableHandler<T>.ReadAsync(Buffer, ColumnLen, async, fieldDescription);
        }

        if (typeof(T) == typeof(object))
        {
            return ColumnLen <= Buffer.ReadBytesLeft
                ? (T)fieldDescription.Handler.ReadAsObject(Buffer, ColumnLen, fieldDescription)
                : (T)await fieldDescription.Handler.ReadAsObject(Buffer, ColumnLen, async, fieldDescription);
        }

        return ColumnLen <= Buffer.ReadBytesLeft
            ? fieldDescription.Handler.Read<T>(Buffer, ColumnLen, fieldDescription)
            : await fieldDescription.Handler.Read<T>(Buffer, ColumnLen, async, fieldDescription);
    }
    catch (NpgsqlSafeReadException e)
    {
        // Surface the wrapped exception without breaking the connector (unlike the catch-all
        // below). ExceptionDispatchInfo keeps the inner exception's original stack trace,
        // which a plain "throw e.InnerException" would overwrite.
        ExceptionDispatchInfo.Capture(e.InnerException).Throw();
        throw; // Unreachable; needed only for the compiler's flow analysis
    }
    catch
    {
        Connector.Break();
        throw;
    }
    finally
    {
        // Important in case a NpgsqlSafeReadException was thrown, position must still be updated
        PosInColumn += ColumnLen;
    }
}
/// <summary>
/// Gets the value of the specified column as an instance of <see cref="object"/>.
/// </summary>
/// <param name="column">The zero-based column ordinal.</param>
/// <returns>
/// The value of the specified column, or <see cref="DBNull.Value"/> if the column is null.
/// If the command specifies a result type for this column, the value is converted to it.
/// </returns>
/// <exception cref="InvalidOperationException">Part of the column has already been read.</exception>
public override object GetValue(int column)
{
    CheckRowAndOrdinal(column);
    SeekToColumn(column, false).GetAwaiter().GetResult();
    CheckColumnStart();

    // A column length of -1 denotes a null value
    if (ColumnLen == -1)
        return DBNull.Value;

    var fieldDescription = RowDescription[column];
    object result;
    try
    {
        result = fieldDescription.Handler.ReadAsObject(Buffer, ColumnLen, false, fieldDescription)
            .GetAwaiter().GetResult();
    }
    catch (NpgsqlSafeReadException e)
    {
        // Surface the wrapped exception without breaking the connector (unlike the catch-all
        // below). ExceptionDispatchInfo keeps the inner exception's original stack trace,
        // which a plain "throw e.InnerException" would overwrite.
        ExceptionDispatchInfo.Capture(e.InnerException).Throw();
        throw; // Unreachable; needed only for the compiler's flow analysis
    }
    catch
    {
        Connector.Break();
        throw;
    }
    finally
    {
        // Important in case a NpgsqlSafeReadException was thrown, position must still be updated
        PosInColumn += ColumnLen;
    }

    // Used for Entity Framework <= 6 compatibility
    var type = Command.ObjectResultTypes?[column];
    if (type != null)
    {
        result = type == typeof(DateTimeOffset)
            ? new DateTimeOffset((DateTime)result)
            : Convert.ChangeType(result, type);
    }

    return result;
}
/// <summary>
/// Gets the value of the specified column as an instance of <see cref="object"/>,
/// read through the handler's provider-specific path (<c>ReadPsvAsObject</c>).
/// </summary>
/// <param name="column">The zero-based column ordinal.</param>
/// <returns>The value of the specified column, or <see cref="DBNull.Value"/> if the column is null.</returns>
/// <exception cref="InvalidOperationException">Part of the column has already been read.</exception>
public override object GetProviderSpecificValue(int column)
{
    CheckRowAndOrdinal(column);
    SeekToColumn(column, false).GetAwaiter().GetResult();
    CheckColumnStart();

    // A column length of -1 denotes a null value
    if (ColumnLen == -1)
        return DBNull.Value;

    var fieldDescription = RowDescription[column];
    try
    {
        return fieldDescription.Handler.ReadPsvAsObject(Buffer, ColumnLen, false, fieldDescription)
            .GetAwaiter().GetResult();
    }
    catch (NpgsqlSafeReadException e)
    {
        // Surface the wrapped exception without breaking the connector (unlike the catch-all
        // below). ExceptionDispatchInfo keeps the inner exception's original stack trace,
        // which a plain "throw e.InnerException" would overwrite.
        ExceptionDispatchInfo.Capture(e.InnerException).Throw();
        throw; // Unreachable; needed only for the compiler's flow analysis
    }
    catch
    {
        Connector.Break();
        throw;
    }
    finally
    {
        // Important in case a NpgsqlSafeReadException was thrown, position must still be updated
        PosInColumn += ColumnLen;
    }
}
/// <summary>
/// Seeks forward to the given column. Unless the reader is already positioned on that column,
/// the column's 4-byte length is read and stored in <c>ColumnLen</c> and <c>PosInColumn</c>
/// is reset to 0.
/// </summary>
/// <param name="column">The zero-based ordinal of the column to seek to.</param>
/// <param name="async">Forwarded to the buffer's <c>Ensure</c> and <c>Skip</c> calls.</param>
/// <exception cref="IndexOutOfRangeException">The ordinal is negative or not below the row's column count.</exception>
/// <exception cref="InvalidOperationException">The ordinal lies before the column the reader is currently on.</exception>
internal override async Task SeekToColumn(int column, bool async)
{
    // First access to this row: the row begins with its 16-bit column count.
    // NOTE(review): if the range check below throws while _column is still -1, the count has
    // already been consumed and a later call would read it again. Callers in this file run
    // CheckRowAndOrdinal first - confirm that every other caller validates the ordinal too.
    if (_column == -1)
    {
        await Buffer.Ensure(2, async);
        _numColumns = Buffer.ReadInt16();
    }

    if (column < 0 || column >= _numColumns)
        throw new IndexOutOfRangeException("Column index out of range");

    if (column < _column)
        throw new InvalidOperationException($"Invalid attempt to read from column ordinal '{column}'. With CommandBehavior.SequentialAccess, you may only read from column ordinal '{_column}' or greater.");

    // Already positioned on the requested column: nothing to do
    if (column == _column)
        return;

    // From here on we need to move forward. First shut down any stream open on the current column.
    if (ColumnStream != null)
    {
        ColumnStream.Dispose();
        ColumnStream = null;
        // Disposing the stream leaves us at the end of the column
        PosInColumn = ColumnLen;
    }

    // Skip whatever is left unread in the current column (a length of -1 means nothing is pending)
    // TODO: Simplify by better initializing _columnLen/_posInColumn
    if (ColumnLen != -1)
    {
        var unread = ColumnLen - PosInColumn;
        if (unread > 0)
            await Buffer.Skip(unread, async);
    }

    // Skip entirely over the columns between the current one and the requested one
    while (_column < column - 1)
    {
        await Buffer.Ensure(4, async);
        var skippedLen = Buffer.ReadInt32();
        if (skippedLen != -1)
            await Buffer.Skip(skippedLen, async);
        _column++;
    }

    // Read the requested column's length and position ourselves at its start
    await Buffer.Ensure(4, async);
    ColumnLen = Buffer.ReadInt32();
    PosInColumn = 0;
    _column = column;
}
/// <summary>
/// Moves forward within the current column to the given position by skipping bytes in the buffer.
/// Positions beyond the column's length are clamped to the length.
/// </summary>
/// <param name="posInColumn">The position within the column to move to.</param>
/// <param name="async">Forwarded to the buffer's <c>Skip</c> call.</param>
/// <exception cref="InvalidOperationException">The requested position lies before the current one.</exception>
internal override async Task SeekInColumn(int posInColumn, bool async)
{
    Debug.Assert(_column > -1);

    if (posInColumn < PosInColumn)
        throw new InvalidOperationException("Attempt to read a position in the column which has already been read");

    // Never seek past the end of the column
    var target = Math.Min(posInColumn, ColumnLen);
    if (target <= PosInColumn)
        return;

    await Buffer.Skip(target - PosInColumn, async);
    PosInColumn = target;
}
/// <summary>
/// Skips over everything that remains of the current row: the unread part of the current
/// column (if any) and all columns after it.
/// </summary>
/// <param name="async">Forwarded to the buffer's <c>Ensure</c> and <c>Skip</c> calls.</param>
internal override async Task ConsumeRow(bool async)
{
    // Row not touched yet: the row begins with its 16-bit column count
    if (_column == -1)
    {
        await Buffer.Ensure(2, async);
        _numColumns = Buffer.ReadInt16();
    }

    // Shut down any stream open on the current column
    if (ColumnStream != null)
    {
        ColumnStream.Dispose();
        ColumnStream = null;
        // Disposing the stream leaves us at the end of the column
        PosInColumn = ColumnLen;
    }

    // TODO: Potential for code-sharing with SeekToColumn, which performs the same skipping
    // Skip whatever is left unread in the current column (a length of -1 means nothing is pending)
    if (ColumnLen != -1)
    {
        var unread = ColumnLen - PosInColumn;
        if (unread > 0)
            await Buffer.Skip(unread, async);
    }

    // Skip over the remaining columns in the row
    var lastColumn = _numColumns - 1;
    while (_column < lastColumn)
    {
        await Buffer.Ensure(4, async);
        var skippedLen = Buffer.ReadInt32();
        if (skippedLen != -1)
            await Buffer.Skip(skippedLen, async);
        _column++;
    }
}
#region IsDBNull
/// <summary>
/// Gets a value that indicates whether the column contains nonexistent or missing values.
/// </summary>
/// <param name="ordinal">The zero-based column ordinal.</param>
/// <returns><c>true</c> if the specified column is equivalent to <see cref="DBNull"/>; otherwise <c>false</c>.</returns>
public override bool IsDBNull(int ordinal)
{
    // Run the shared implementation with async=false and unwrap its result
    return IsDBNull(ordinal, false).GetAwaiter().GetResult();
}
/// <summary>
/// An asynchronous version of <see cref="IsDBNull(int)"/>, which gets a value that indicates
/// whether the column contains non-existent or missing values.
/// </summary>
/// <param name="ordinal">The zero-based column to be retrieved.</param>
/// <param name="cancellationToken">
/// Checked once on entry (a cancelled token makes this method throw synchronously);
/// it is not passed on to the seek itself.
/// </param>
/// <returns>A task producing <c>true</c> if the specified column value is equivalent to <see cref="DBNull"/>; otherwise <c>false</c>.</returns>
public override Task<bool> IsDBNullAsync(int ordinal, CancellationToken cancellationToken)
{
    cancellationToken.ThrowIfCancellationRequested();
    // The seek is started inside the scope returned by NoSynchronizationContextScope.Enter()
    using (NoSynchronizationContextScope.Enter())
        return IsDBNull(ordinal, true);
}
/// <summary>
/// Shared implementation of the synchronous and asynchronous null checks: seeks to the
/// column and reports whether its length is -1, which denotes a null value.
/// </summary>
/// <param name="ordinal">The zero-based column ordinal.</param>
/// <param name="async">Forwarded to the seek.</param>
/// <returns><c>true</c> if the column is null; otherwise <c>false</c>.</returns>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
// ReSharper disable once InconsistentNaming
async Task<bool> IsDBNull(int ordinal, bool async)
{
    CheckRowAndOrdinal(ordinal);
    await SeekToColumn(ordinal, async);
    return ColumnLen == -1;
}
/// <summary>
/// Reads a stream of bytes from the specified column, starting at the location indicated by
/// <paramref name="dataOffset"/>, into the buffer, starting at the location indicated by
/// <paramref name="bufferOffset"/>. The read itself is performed by the base implementation;
/// this override additionally advances the position within the column.
/// </summary>
/// <param name="ordinal">The zero-based column ordinal.</param>
/// <param name="dataOffset">The index within the row from which to begin the read operation.</param>
/// <param name="buffer">The buffer into which to copy the data, or <c>null</c> to only query the length.</param>
/// <param name="bufferOffset">The index within the buffer to which the data will be copied.</param>
/// <param name="length">The maximum number of bytes to read.</param>
/// <returns>The value returned by the base implementation (the actual number of bytes read).</returns>
public override long GetBytes(int ordinal, long dataOffset, [CanBeNull] byte[] buffer, int bufferOffset, int length)
{
    var bytesRead = base.GetBytes(ordinal, dataOffset, buffer, bufferOffset, length);

    // If buffer is null we're just getting the length, no change in position
    if (buffer == null)
        return bytesRead;

    PosInColumn += (int)bytesRead;
    return bytesRead;
}
#endregion
/// <summary>
/// Verifies that the reader is positioned at the very start of the current column,
/// i.e. that none of it has been consumed yet.
/// </summary>
/// <exception cref="InvalidOperationException">Part of the column has already been read.</exception>
void CheckColumnStart()
{
    if (PosInColumn == 0)
        return;

    throw new InvalidOperationException("Attempt to read a position in the column which has already been read");
}
}
}