Skip to content

Commit ea5a2e1

Browse files
authored
Improve output parameter handling (#5645)
Closes #2252
1 parent 82a849f commit ea5a2e1

5 files changed

Lines changed: 194 additions & 53 deletions

File tree

src/Npgsql/NpgsqlBatchCommand.cs

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
using System;
2+
using System.Buffers;
23
using System.Collections.Generic;
34
using System.Data;
45
using System.Data.Common;
56
using System.Diagnostics;
67
using System.Diagnostics.CodeAnalysis;
78
using System.Runtime.CompilerServices;
9+
using Microsoft.Extensions.Logging;
810
using Npgsql.BackendMessages;
911
using Npgsql.Internal;
1012

@@ -41,6 +43,7 @@ public override string CommandText
4143
/// <inheritdoc cref="DbBatchCommand.Parameters"/>
4244
public new NpgsqlParameterCollection Parameters => _parameters ??= [];
4345

46+
internal bool HasOutputParameters => _parameters?.HasOutputParameters == true;
4447

4548
/// <inheritdoc/>
4649
public override NpgsqlParameter CreateParameter() => new();
@@ -273,6 +276,112 @@ internal void ApplyCommandComplete(CommandCompleteMessage msg)
273276

274277
internal void ResetPreparation() => ConnectorPreparedOn = null;
275278

279+
internal void PopulateOutputParameters(NpgsqlDataReader reader, ILogger logger)
280+
{
281+
Debug.Assert(_parameters is not null);
282+
var parameters = _parameters;
283+
var fieldCount = reader.FieldCount;
284+
switch (parameters.PlaceholderType)
285+
{
286+
case PlaceholderType.Mixed:
287+
case PlaceholderType.Named:
288+
{
289+
// In the case of named and mixed parameters we first try to populate all parameters with a named column match.
290+
// For backwards compat we allow populating named parameters as long as they haven't been filled yet.
291+
// So for every column that we couldn't match by name we fill the first output direction parameter that wasn't filled previously.
292+
// This means a row like {"a" => 1, "some_field" => 2} will populate the following output db params {"a" => 1, "b" => 2}.
293+
// And a row like {"some_field" => 1, "a" => 2} will populate them as follows {"a" => 2, "b" => 1}.
294+
295+
var parameterIndices = new ArraySegment<int>(ArrayPool<int>.Shared.Rent(fieldCount), 0, fieldCount);
296+
var secondPassOrdinal = -1;
297+
for (var ordinal = 0; ordinal < fieldCount; ordinal++)
298+
{
299+
var name = reader.GetName(ordinal);
300+
var i = parameters.IndexOf(name);
301+
if (i is not -1 && parameters[i] is { IsOutputDirection: true } parameter)
302+
{
303+
SetValue(reader, logger, parameter, ordinal, i);
304+
parameterIndices[ordinal] = i;
305+
}
306+
else
307+
{
308+
parameterIndices[ordinal] = -1;
309+
if (secondPassOrdinal is -1)
310+
secondPassOrdinal = ordinal;
311+
}
312+
}
313+
314+
if (secondPassOrdinal is -1)
315+
{
316+
ArrayPool<int>.Shared.Return(parameterIndices.Array!);
317+
break;
318+
}
319+
320+
// This set will also contain -1, but that's not a valid index so we can ignore it is included.
321+
var matchedParameters = new HashSet<int>(parameterIndices);
322+
var parameterList = parameters.InternalList;
323+
for (var i = 0; i < parameterList.Count; i++)
324+
{
325+
// Find an output parameter that wasn't matched by name.
326+
if (parameterList[i] is not { IsOutputDirection: true } parameter || matchedParameters.Contains(i))
327+
continue;
328+
329+
SetValue(reader, logger, parameter, secondPassOrdinal, i);
330+
331+
// And find the next unhandled ordinal.
332+
secondPassOrdinal = NextSecondPassOrdinal(parameterIndices, secondPassOrdinal);
333+
if (secondPassOrdinal is -1)
334+
break;
335+
}
336+
337+
ArrayPool<int>.Shared.Return(parameterIndices.Array!);
338+
break;
339+
340+
static int NextSecondPassOrdinal(ArraySegment<int> indices, int offset)
341+
{
342+
for (var i = offset + 1; i < indices.Count; i++)
343+
{
344+
if (indices[i] is -1)
345+
return i;
346+
}
347+
348+
return -1;
349+
}
350+
}
351+
case PlaceholderType.Positional:
352+
{
353+
var parameterList = parameters.InternalList;
354+
var ordinal = 0;
355+
for (var i = 0; i < parameterList.Count; i++)
356+
{
357+
if (parameterList[i] is not { IsOutputDirection: true } parameter)
358+
continue;
359+
360+
SetValue(reader, logger, parameter, ordinal, i);
361+
362+
ordinal++;
363+
if (ordinal == fieldCount)
364+
break;
365+
}
366+
break;
367+
}
368+
}
369+
370+
static void SetValue(NpgsqlDataReader reader, ILogger logger, NpgsqlParameter p, int ordinal, int index)
371+
{
372+
try
373+
{
374+
p.SetOutputValue(reader, ordinal);
375+
}
376+
catch (Exception ex)
377+
{
378+
logger.LogDebug(ex, "Failed to set value on output parameter instance '{ParameterNameOrIndex}' for output parameter {OutputName}",
379+
p.ParameterName is NpgsqlParameter.PositionalName ? index : p.ParameterName, reader.GetName(ordinal));
380+
throw;
381+
}
382+
}
383+
}
384+
276385
/// <summary>
277386
/// Returns the <see cref="CommandText"/>.
278387
/// </summary>

src/Npgsql/NpgsqlDataReader.cs

Lines changed: 29 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -459,18 +459,44 @@ async Task<bool> NextResult(bool async, bool isConsuming = false, CancellationTo
459459
continue;
460460
}
461461

462-
if ((Command.WrappingBatch is not null || StatementIndex is 0) && Command.InternalBatchCommands[StatementIndex]._parameters?.HasOutputParameters == true)
462+
if ((Command.WrappingBatch is not null || StatementIndex is 0) && Command.InternalBatchCommands[StatementIndex] is { HasOutputParameters: true } command)
463463
{
464464
// If output parameters are present and this is the first row of the resultset,
465465
// we must always read it in non-sequential mode because it will be traversed twice (once
466466
// here for the parameters, then as a regular row).
467-
msg = await Connector.ReadMessage(async).ConfigureAwait(false);
467+
msg = await Connector.ReadMessage(async, dataRowLoadingMode: DataRowLoadingMode.NonSequential).ConfigureAwait(false);
468468
ProcessMessage(msg);
469469
if (msg.Code == BackendMessageCode.DataRow)
470470
{
471+
Debug.Assert(RowDescription != null);
472+
Debug.Assert(State == ReaderState.BeforeResult);
473+
471474
try
472475
{
473-
PopulateOutputParameters(Command.InternalBatchCommands[StatementIndex]._parameters!);
476+
// Temporarily set our state to InResult and non-sequential to allow us to read the values, and in any order.
477+
var isSequential = _isSequential;
478+
var currentPosition = Buffer.ReadPosition;
479+
State = ReaderState.InResult;
480+
_isSequential = false;
481+
try
482+
{
483+
command.PopulateOutputParameters(this, _commandLogger);
484+
485+
// On success we want to revert any row and column state for the user to be able to read the same row again.
486+
if (async)
487+
await PgReader.CommitAsync().ConfigureAwait(false);
488+
else
489+
PgReader.Commit();
490+
491+
State = ReaderState.BeforeResult; // Set the state back
492+
Buffer.ReadPosition = currentPosition; // Restore position
493+
_column = -1;
494+
}
495+
finally
496+
{
497+
// To be on the safe side we always revert this CommandBehavior state change, including on failure.
498+
_isSequential = isSequential;
499+
}
474500
}
475501
catch (Exception e)
476502
{
@@ -612,53 +638,6 @@ async ValueTask ConsumeResultSet(bool async)
612638
}
613639
}
614640

615-
616-
void PopulateOutputParameters(NpgsqlParameterCollection parameters)
617-
{
618-
// The first row in a stored procedure command that has output parameters needs to be traversed twice -
619-
// once for populating the output parameters and once for the actual result set traversal. So in this
620-
// case we can't be sequential.
621-
Debug.Assert(RowDescription != null);
622-
Debug.Assert(State == ReaderState.BeforeResult);
623-
624-
var currentPosition = Buffer.ReadPosition;
625-
626-
// Temporarily set our state to InResult to allow us to read the values
627-
State = ReaderState.InResult;
628-
629-
var pending = new Queue<object>();
630-
var taken = new List<NpgsqlParameter>();
631-
for (var i = 0; i < ColumnCount; i++)
632-
{
633-
if (parameters.TryGetValue(GetName(i), out var p) && p.IsOutputDirection)
634-
{
635-
p.Value = GetValue(i);
636-
taken.Add(p);
637-
}
638-
else
639-
pending.Enqueue(GetValue(i));
640-
}
641-
642-
// Not sure where this odd behavior comes from: all output parameters which did not get matched by
643-
// name now get populated with column values which weren't matched. Keeping this for backwards compat,
644-
// opened #2252 for investigation.
645-
foreach (var p in (IEnumerable<NpgsqlParameter>)parameters)
646-
{
647-
if (!p.IsOutputDirection || taken.Contains(p))
648-
continue;
649-
650-
if (pending.Count == 0)
651-
break;
652-
p.Value = pending.Dequeue();
653-
}
654-
655-
PgReader.Commit();
656-
State = ReaderState.BeforeResult; // Set the state back
657-
Buffer.ReadPosition = currentPosition; // Restore position
658-
659-
_column = -1;
660-
}
661-
662641
/// <summary>
663642
/// Note that in SchemaOnly mode there are no resultsets, and we read nothing from the backend (all
664643
/// RowDescriptions have already been processed and are available)

src/Npgsql/NpgsqlParameter.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -497,6 +497,16 @@ public sealed override string SourceColumn
497497

498498
Type? GetValueType(Type staticValueType) => staticValueType != typeof(object) ? staticValueType : Value?.GetType();
499499

500+
internal void SetOutputValue(NpgsqlDataReader reader, int ordinal)
501+
{
502+
if (GetType() == typeof(NpgsqlParameter))
503+
Value = reader.GetValue(ordinal);
504+
else
505+
SetOutputValueCore(reader, ordinal);
506+
}
507+
508+
private protected virtual void SetOutputValueCore(NpgsqlDataReader reader, int ordinal) {}
509+
500510
internal bool ShouldResetObjectTypeInfo(object? value)
501511
{
502512
var currentType = TypeInfo?.Type;

src/Npgsql/NpgsqlParameter`.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,9 @@ public NpgsqlParameter(string parameterName, DbType dbType)
8181

8282
#endregion Constructors
8383

84+
private protected override void SetOutputValueCore(NpgsqlDataReader reader, int ordinal)
85+
=> TypedValue = reader.GetFieldValue<T>(ordinal);
86+
8487
private protected override PgConverterResolution ResolveConverter(PgTypeInfo typeInfo)
8588
{
8689
if (typeof(T) == typeof(object) || TypeInfo!.IsBoxing)

test/Npgsql.Tests/CommandTests.cs

Lines changed: 43 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -741,14 +741,14 @@ public async Task Cached_command_clears_parameters_placeholder_type()
741741
public async Task Statement_mapped_output_parameters(CommandBehavior behavior)
742742
{
743743
await using var conn = await OpenConnectionAsync();
744-
var command = new NpgsqlCommand("select 3, 4 as param1, 5 as param2, 6;", conn);
744+
var command = new NpgsqlCommand("select 3 as unknown, 4 as param1, 5 as param2, 6;", conn);
745745

746-
var p = new NpgsqlParameter("param2", NpgsqlDbType.Integer);
746+
var p = new NpgsqlParameter("param1", NpgsqlDbType.Integer);
747747
p.Direction = ParameterDirection.Output;
748748
p.Value = -1;
749749
command.Parameters.Add(p);
750750

751-
p = new NpgsqlParameter("param1", NpgsqlDbType.Integer);
751+
p = new NpgsqlParameter("param2", NpgsqlDbType.Integer);
752752
p.Direction = ParameterDirection.Output;
753753
p.Value = -1;
754754
command.Parameters.Add(p);
@@ -760,6 +760,7 @@ public async Task Statement_mapped_output_parameters(CommandBehavior behavior)
760760

761761
await using var reader = await command.ExecuteReaderAsync(behavior);
762762

763+
Assert.That(command.Parameters["p"].Value, Is.EqualTo(3));
763764
Assert.That(command.Parameters["param1"].Value, Is.EqualTo(4));
764765
Assert.That(command.Parameters["param2"].Value, Is.EqualTo(5));
765766

@@ -771,6 +772,45 @@ public async Task Statement_mapped_output_parameters(CommandBehavior behavior)
771772
Assert.That(reader.GetInt32(3), Is.EqualTo(6));
772773
}
773774

775+
776+
[Test]
777+
[TestCase(CommandBehavior.Default)]
778+
[TestCase(CommandBehavior.SequentialAccess)]
779+
public async Task Statement_mapped_generic_output_parameters(CommandBehavior behavior)
780+
{
781+
await using var conn = await OpenConnectionAsync();
782+
var command = new NpgsqlCommand("select '' as unknown, 4 as param1, 5 as param2, 6;", conn);
783+
784+
var p = new NpgsqlParameter<int>("param1", NpgsqlDbType.Integer);
785+
p.Direction = ParameterDirection.Output;
786+
p.Value = -1;
787+
command.Parameters.Add(p);
788+
789+
p = new NpgsqlParameter<int>("param2", NpgsqlDbType.Integer);
790+
p.Direction = ParameterDirection.Output;
791+
p.Value = -1;
792+
command.Parameters.Add(p);
793+
794+
// char[] is one alternative mapping for text.
795+
var textP = new NpgsqlParameter<char[]>("p", NpgsqlDbType.Text);
796+
textP.Direction = ParameterDirection.Output;
797+
textP.Value = "text".ToCharArray();
798+
command.Parameters.Add(textP);
799+
800+
await using var reader = await command.ExecuteReaderAsync(behavior);
801+
802+
Assert.That(command.Parameters["p"].Value, Is.EquivalentTo(Array.Empty<char>()));
803+
Assert.That(command.Parameters["param1"].Value, Is.EqualTo(4));
804+
Assert.That(command.Parameters["param2"].Value, Is.EqualTo(5));
805+
806+
reader.Read();
807+
808+
Assert.That(reader.GetFieldValue<char[]>(0), Is.EquivalentTo(Array.Empty<char>()));
809+
Assert.That(reader.GetInt32(1), Is.EqualTo(4));
810+
Assert.That(reader.GetInt32(2), Is.EqualTo(5));
811+
Assert.That(reader.GetInt32(3), Is.EqualTo(6));
812+
}
813+
774814
[Test]
775815
public async Task Bug1006158_output_parameters()
776816
{

0 commit comments

Comments
 (0)