using System; using System.Collections.Generic; using System.Data; using System.Diagnostics; using System.IO; using System.Linq; using System.Reflection; using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; using JetBrains.Annotations; using Npgsql.BackendMessages; using Npgsql.TypeHandling; namespace Npgsql { /// /// The default, non-sequential reader, which buffers entire rows in memory. /// #pragma warning disable CA1010 sealed class NpgsqlDefaultDataReader : NpgsqlDataReader #pragma warning restore CA1010 { /// /// The number of columns in the current row /// int _column; readonly List<(int Offset, int Length)> _columns = new List<(int Offset, int Length)>(); int _dataMsgEnd; internal NpgsqlDefaultDataReader(NpgsqlConnector connector) : base(connector) {} internal override ValueTask ReadMessage(bool async) => Connector.ReadMessage(async); protected override Task NextResult(bool async, bool isConsuming=false) { var task = base.NextResult(async, isConsuming); if (Command.Parameters.HasOutputParameters && StatementIndex == 0) { // Populate the output parameters from the first row of the first resultset return task.ContinueWith((t, o) => { if (HasRows) PopulateOutputParameters(); return t.Result; }, null); } return task; } /// /// The first row in a stored procedure command that has output parameters needs to be traversed twice - /// once for populating the output parameters and once for the actual result set traversal. So in this /// case we can't be sequential. /// void PopulateOutputParameters() { Debug.Assert(Command.Parameters.Any(p => p.IsOutputDirection)); Debug.Assert(StatementIndex == 0); Debug.Assert(RowDescription != null); Debug.Assert(State == ReaderState.BeforeResult); // Temporarily set our state to InResult to allow us to read the values State = ReaderState.InResult; var pending = new Queue(); var taken = new List(); foreach (var p in Command.Parameters.Where(p => p.IsOutputDirection)) { if (RowDescription.TryGetFieldIndex(p.TrimmedName, out var idx)) { // TODO: Provider-specific check? p.Value = GetValue(idx); taken.Add(idx); } else pending.Enqueue(p); } for (var i = 0; pending.Count != 0 && i != RowDescription.NumFields; ++i) { // TODO: Need to get the provider-specific value based on the out param's type if (!taken.Contains(i)) pending.Dequeue().Value = GetValue(i); } State = ReaderState.BeforeResult; // Set the state back } [MethodImpl(MethodImplOptions.AggressiveInlining)] internal override Task ConsumeRow(bool async) { Debug.Assert(State == ReaderState.InResult || State == ReaderState.BeforeResult); if (ColumnStream != null) { ColumnStream.Dispose(); ColumnStream = null; } Buffer.ReadPosition = _dataMsgEnd; return PGUtil.CompletedTask; } internal override void ProcessDataMessage(DataRowMessage dataMsg) { // The connector's buffer can actually change between DataRows: // If a large DataRow exceeding the connector's current read buffer arrives, and we're // reading in non-sequential mode, a new oversize buffer is allocated. We thus have to // recapture the connector's buffer on each new DataRow. Buffer = Connector.ReadBuffer; _dataMsgEnd = Buffer.ReadPosition + dataMsg.Length; // We currently assume that the row's number of columns is identical to the description's #if DEBUG var numColumns = Buffer.ReadInt16(); Debug.Assert(RowDescription.NumFields == numColumns); #else Buffer.ReadPosition += 2; #endif _column = -1; _columns.Clear(); // Initialize our columns array with the offset and length of the first column var len = Buffer.ReadInt32(); _columns.Add((Buffer.ReadPosition, len)); } // We know the entire row is buffered in memory (non-sequential reader), so no I/O will be performed public override Task GetFieldValueAsync(int column, CancellationToken cancellationToken) => Task.FromResult(GetFieldValue(column)); public override T GetFieldValue(int column) { CheckRowAndOrdinal(column); SeekToColumn(column); if (ColumnLen == -1) { if (NullableHandler.Exists) return default; else throw new InvalidCastException("Column is null"); } var fieldDescription = RowDescription[column]; try { if (NullableHandler.Exists) return NullableHandler.Read(Buffer, ColumnLen, fieldDescription); return typeof(T) == typeof(object) ? (T)fieldDescription.Handler.ReadAsObject(Buffer, ColumnLen, fieldDescription) : fieldDescription.Handler.Read(Buffer, ColumnLen, fieldDescription); } catch (NpgsqlSafeReadException e) { throw e.InnerException; } catch { Connector.Break(); throw; } } /// /// Gets the value of the specified column as an instance of . /// /// The zero-based column ordinal. /// The value of the specified column. public override object GetValue(int ordinal) { CheckRowAndOrdinal(ordinal); SeekToColumn(ordinal); if (ColumnLen == -1) return DBNull.Value; var fieldDescription = RowDescription[ordinal]; object result; try { result = fieldDescription.Handler.ReadAsObject(Buffer, ColumnLen, fieldDescription); } catch (NpgsqlSafeReadException e) { throw e.InnerException; } catch { Connector.Break(); throw; } // Used for Entity Framework <= 6 compability if (Command.ObjectResultTypes?[ordinal] != null) { var type = Command.ObjectResultTypes[ordinal]; result = type == typeof(DateTimeOffset) ? new DateTimeOffset((DateTime)result) : Convert.ChangeType(result, type); } return result; } /// /// Gets the value of the specified column as an instance of . /// /// The zero-based column ordinal. /// The value of the specified column. public override object GetProviderSpecificValue(int ordinal) { CheckRowAndOrdinal(ordinal); SeekToColumn(ordinal); if (ColumnLen == -1) return DBNull.Value; var fieldDescription = RowDescription[ordinal]; object result; try { // TODO: Maybe call a non-async method which would allow simple type handlers (and // maybe also text) to read without going through async... result = fieldDescription.Handler.ReadPsvAsObject(Buffer, ColumnLen, fieldDescription); } catch (NpgsqlSafeReadException e) { throw e.InnerException; } catch { Connector.Break(); throw; } return result; } /// /// Gets a value that indicates whether the column contains nonexistent or missing values. /// /// The zero-based column ordinal. /// true if the specified column is equivalent to ; otherwise false. public override bool IsDBNull(int ordinal) { CheckRowAndOrdinal(ordinal); SeekToColumn(ordinal); return ColumnLen == -1; } void SeekToColumn(int column) { // Shut down any streaming going on on the column if (ColumnStream != null) { ColumnStream.Dispose(); ColumnStream = null; } for (var lastColumnRead = _columns.Count; column >= lastColumnRead; lastColumnRead++) { int lastColumnLen; (Buffer.ReadPosition, lastColumnLen) = _columns[lastColumnRead-1]; if (lastColumnLen != -1) Buffer.ReadPosition += lastColumnLen; var len = Buffer.ReadInt32(); _columns.Add((Buffer.ReadPosition, len)); } (Buffer.ReadPosition, ColumnLen) = _columns[column]; _column = column; PosInColumn = 0; } internal override Task SeekToColumn(int column, bool async) { SeekToColumn(column); return PGUtil.CompletedTask; } internal override Task SeekInColumn(int posInColumn, bool async) { if (posInColumn > ColumnLen) posInColumn = ColumnLen; Buffer.ReadPosition = _columns[_column].Offset + posInColumn; PosInColumn = posInColumn; return PGUtil.CompletedTask; } } }