using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Data.Common;
using System.Diagnostics;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Globalization;
using Npgsql.BackendMessages;
using Npgsql.Logging;
using Npgsql.TypeMapping;
using Npgsql.Util;
using NpgsqlTypes;
using static Npgsql.Util.Statics;
using System.Collections;
using System.Diagnostics.CodeAnalysis;
using Npgsql.Internal;
namespace Npgsql
{
/// <summary>
/// Represents a SQL statement or function (stored procedure) to execute
/// against a PostgreSQL database. This class cannot be inherited.
/// </summary>
// ReSharper disable once RedundantNameQualifier
[System.ComponentModel.DesignerCategory("")]
public sealed class NpgsqlCommand : DbCommand, ICloneable, IComponent
{
#region Fields
// The connection this command is associated with, if any. Written by the constructors and by the
// Connection/DbConnection setters.
NpgsqlConnection? _connection;
// Connector handed to the internal connector-based constructor. IsPrepared falls back to it when the
// connection does not expose a connector.
readonly NpgsqlConnector? _connector;
/// <summary>
/// If this command is (explicitly) prepared, references the connector on which the preparation happened.
/// Used to detect when the connector was changed (i.e. connection open/close), meaning that the command
/// is no longer prepared.
/// </summary>
NpgsqlConnector? _connectorPreparedOn;
// Backing field for CommandText; never null (null is normalized to string.Empty on assignment).
string _commandText;
// Behavior flags consulted when writing the command (e.g. SchemaOnly selects the describe-only path in Write).
CommandBehavior _behavior;
// Explicit per-command timeout in seconds; null means "fall back to the connection's timeout or DefaultTimeout".
int? _timeout;
// Backing collection for the Parameters property.
readonly NpgsqlParameterCollection _parameters;
// The statements this command was split into by query processing.
internal readonly List _statements;
/// <summary>
/// Returns details about each statement that this command has executed.
/// Is only populated when an Execute* method is called.
/// </summary>
public IReadOnlyList Statements => _statements.AsReadOnly();
// Backing field for UpdatedRowSource.
UpdateRowSource _updateRowSource = UpdateRowSource.Both;
// True once Prepare has recorded the connector the command was prepared on.
bool IsExplicitlyPrepared => _connectorPreparedOn != null;
/// <summary>
/// Whether this command is a cached instance, i.e. one created through <see cref="CreateCachedCommand"/>.
/// </summary>
bool _isCached;
// Shared empty parameter list passed to WriteParse when deriving parameters (no parameter types are sent).
static readonly List EmptyParameters = new();
// Synchronization context installed by SyncOverAsyncIfNecessary when a synchronous call switches to
// asynchronous writing for the non-first statements of a batch.
static readonly SingleThreadSynchronizationContext SingleThreadSynchronizationContext = new("NpgsqlRemainingAsyncSendWorker");
static readonly NpgsqlLogger Log = NpgsqlLogManager.CreateLogger(nameof(NpgsqlCommand));
#endregion Fields
#region Constants
/// <summary>
/// Command timeout, in seconds, used by <see cref="CommandTimeout"/> when neither the command nor its
/// connection supplies a value.
/// </summary>
internal const int DefaultTimeout = 30;
#endregion
#region Constructors
/// <summary>
/// Initializes a new instance of the <see cref="NpgsqlCommand"/> class with empty command text,
/// no connection and no transaction.
/// </summary>
public NpgsqlCommand()
    : this(cmdText: null, connection: null, transaction: null)
{
}
/// <summary>
/// Initializes a new instance of the <see cref="NpgsqlCommand"/> class with the text of the query.
/// </summary>
/// <param name="cmdText">The text of the query.</param>
// ReSharper disable once IntroduceOptionalParameters.Global
public NpgsqlCommand(string? cmdText)
    : this(cmdText, connection: null, transaction: null)
{
}
/// <summary>
/// Initializes a new instance of the <see cref="NpgsqlCommand"/> class with the text of the query and a
/// <see cref="NpgsqlConnection"/>.
/// </summary>
/// <param name="cmdText">The text of the query.</param>
/// <param name="connection">A <see cref="NpgsqlConnection"/> that represents the connection to a PostgreSQL server.</param>
// ReSharper disable once IntroduceOptionalParameters.Global
public NpgsqlCommand(string? cmdText, NpgsqlConnection? connection)
    : this(cmdText, connection, transaction: null)
{
}
/// <summary>
/// Initializes a new instance of the <see cref="NpgsqlCommand"/> class with the text of the query, a
/// <see cref="NpgsqlConnection"/>, and the <see cref="NpgsqlTransaction"/>.
/// </summary>
/// <param name="cmdText">The text of the query. A null value is stored as an empty string.</param>
/// <param name="connection">A <see cref="NpgsqlConnection"/> that represents the connection to a PostgreSQL server.</param>
/// <param name="transaction">The <see cref="NpgsqlTransaction"/> in which the <see cref="NpgsqlCommand"/> executes.</param>
public NpgsqlCommand(string? cmdText, NpgsqlConnection? connection, NpgsqlTransaction? transaction)
{
    // The command is excluded from finalization right at construction time.
    GC.SuppressFinalize(this);

    // Capacity 1: the statement list starts sized for a single statement.
    _statements = new(1);
    _parameters = new();
    _commandText = cmdText ?? string.Empty;
    CommandType = CommandType.Text;

    // The connection is assigned before the transaction, as in the original ordering.
    _connection = connection;
    Transaction = transaction;
}
/// <summary>
/// Initializes a command that is bound directly to a connector rather than to a connection.
/// </summary>
/// <param name="cmdText">The text of the query.</param>
/// <param name="connector">The connector stored in the command's connector field.</param>
internal NpgsqlCommand(string? cmdText, NpgsqlConnector connector)
    : this(cmdText)
{
    _connector = connector;
}
/// <summary>
/// Creates a command associated with <paramref name="connection"/> and flags it as cached.
/// </summary>
/// <param name="connection">The connection the new command is associated with.</param>
/// <returns>A new command with no command text whose cached flag is set.</returns>
internal static NpgsqlCommand CreateCachedCommand(NpgsqlConnection connection)
{
    var command = new NpgsqlCommand(null, connection);
    command._isCached = true;
    return command;
}
#endregion Constructors
#region Public properties
/// <summary>
/// Gets or sets the SQL statement or function (stored procedure) to execute at the data source.
/// </summary>
/// <value>The SQL statement or stored procedure to execute. The default is an empty string.</value>
/// <exception cref="InvalidOperationException">The value is set while the command is not idle.</exception>
[AllowNull, DefaultValue("")]
[Category("Data")]
public override string CommandText
{
    get
    {
        return _commandText;
    }
    set
    {
        // The text may only change while the command is idle.
        if (State != CommandState.Idle)
            throw new InvalidOperationException("An open data reader exists for this command.");

        _commandText = value ?? string.Empty;

        // New text invalidates any explicit preparation done for the old text.
        // TODO: Technically should do this also if the parameter list (or type) changes
        ResetExplicitPreparation();
    }
}
/// <summary>
/// Gets or sets the wait time (in seconds) before terminating the attempt to execute a command and generating an error.
/// </summary>
/// <value>
/// The time (in seconds) to wait for the command to execute. When no value has been set on the command, the
/// connection's command timeout is used; without a connection the default of 30 seconds applies.
/// </value>
/// <exception cref="ArgumentOutOfRangeException">The assigned value is negative.</exception>
[DefaultValue(DefaultTimeout)]
public override int CommandTimeout
{
    get
    {
        // Precedence: explicit command value, then the connection's value, then the built-in default.
        return _timeout ?? _connection?.CommandTimeout ?? DefaultTimeout;
    }
    set
    {
        if (value < 0)
            throw new ArgumentOutOfRangeException(nameof(value), value, "CommandTimeout can't be less than zero.");

        _timeout = value;
    }
}
/// <summary>
/// Gets or sets a value indicating how the <see cref="CommandText"/> property is to be interpreted.
/// </summary>
/// <value>
/// One of the <see cref="System.Data.CommandType"/> values. The default is <see cref="System.Data.CommandType.Text"/>.
/// </value>
[DefaultValue(CommandType.Text)]
[Category("Data")]
public override CommandType CommandType { get; set; }
/// <summary>
/// Gets or sets the connection through the base <see cref="DbCommand"/> API.
/// </summary>
/// <remarks>
/// Assignment is routed through <see cref="Connection"/> so that changing the connection via the base
/// class applies the same rules as the strongly-typed property: it is rejected while the command is not
/// idle, and a successful change clears the command's transaction.
/// </remarks>
/// <exception cref="InvalidOperationException">The connection is changed while the command is not idle.</exception>
/// <exception cref="InvalidCastException">The assigned value is not an <see cref="NpgsqlConnection"/>.</exception>
protected override DbConnection? DbConnection
{
    get => _connection;
    set => Connection = (NpgsqlConnection?)value;
}
/// <summary>
/// Gets or sets the <see cref="NpgsqlConnection"/> used by this instance of the <see cref="NpgsqlCommand"/>.
/// </summary>
/// <value>The connection to a data source. The default value is <see langword="null"/>.</value>
/// <exception cref="InvalidOperationException">A different connection is assigned while the command is not idle.</exception>
[DefaultValue(null)]
[Category("Behavior")]
public new NpgsqlConnection? Connection
{
    get
    {
        return _connection;
    }
    set
    {
        // Re-assigning the current connection is a no-op (and keeps the transaction).
        if (_connection == value)
            return;

        if (State != CommandState.Idle)
            throw new InvalidOperationException("An open data reader exists for this command.");

        _connection = value;

        // The transaction is cleared whenever the connection actually changes.
        Transaction = null;
    }
}
/// <summary>
/// Gets or sets the design-time visibility flag required by <see cref="DbCommand"/>.
/// The value is stored as-is; nothing in the visible part of this class reads it.
/// </summary>
public override bool DesignTimeVisible { get; set; }
/// <summary>
/// Gets or sets how command results are applied to the DataRow when used by the
/// DbDataAdapter.Update(DataSet) method.
/// </summary>
/// <value>One of the <see cref="UpdateRowSource"/> values. The default is <see cref="UpdateRowSource.Both"/>.</value>
/// <exception cref="ArgumentOutOfRangeException">The assigned value is not a defined <see cref="UpdateRowSource"/> member.</exception>
[Category("Behavior"), DefaultValue(UpdateRowSource.Both)]
public override UpdateRowSource UpdatedRowSource
{
    get => _updateRowSource;
    set
    {
        switch (value)
        {
        // validate value (required based on base type contract)
        case UpdateRowSource.None:
        case UpdateRowSource.OutputParameters:
        case UpdateRowSource.FirstReturnedRecord:
        case UpdateRowSource.Both:
            _updateRowSource = value;
            break;
        default:
            // Name the parameter and carry the offending value so the failure can be diagnosed.
            throw new ArgumentOutOfRangeException(
                nameof(value), value, $"Invalid {nameof(UpdateRowSource)} value.");
        }
    }
}
/// <summary>
/// Returns whether this query will execute as a prepared (compiled) query.
/// </summary>
/// <value>
/// <see langword="true"/> when the command was prepared on the connector it currently uses, has at least
/// one statement, and every statement's prepared statement reports itself as prepared.
/// </value>
public bool IsPrepared
{
    get
    {
        // The connector currently in use: the connection's connector, or the directly-bound one.
        var currentConnector = _connection?.Connector ?? _connector;
        if (_connectorPreparedOn != currentConnector || _statements.Count == 0)
            return false;

        foreach (var statement in _statements)
        {
            if (statement.PreparedStatement?.IsPrepared != true)
                return false;
        }

        return true;
    }
}
#endregion Public properties
#region Known/unknown Result Types Management
/// <summary>
/// Marks all of the query's result columns as either known or unknown.
/// Unknown result columns are requested from PostgreSQL in text format, and Npgsql makes no
/// attempt to parse them. They will be accessible as strings only.
/// </summary>
/// <remarks>Setting this property discards any value assigned to <see cref="UnknownResultTypeList"/>.</remarks>
public bool AllResultTypesAreUnknown
{
    get
    {
        return _allResultTypesAreUnknown;
    }
    set
    {
        // TODO: Check that this isn't modified after calling prepare
        _allResultTypesAreUnknown = value;
        _unknownResultTypeList = null;
    }
}

// Backing field for AllResultTypesAreUnknown.
bool _allResultTypesAreUnknown;
/// <summary>
/// Marks the query's result columns as known or unknown, on a column-by-column basis.
/// Unknown result columns are requested from PostgreSQL in text format, and Npgsql makes no
/// attempt to parse them. They will be accessible as strings only.
/// </summary>
/// <remarks>
/// If the query includes several queries (e.g. SELECT 1; SELECT 2), this will only apply to the first
/// one. The rest of the queries will be fetched and parsed as usual.
///
/// The array size must correspond exactly to the number of result columns the query returns, or an
/// error will be raised.
///
/// Setting this property resets <see cref="AllResultTypesAreUnknown"/> to <see langword="false"/>.
/// </remarks>
public bool[]? UnknownResultTypeList
{
    get
    {
        return _unknownResultTypeList;
    }
    set
    {
        // TODO: Check that this isn't modified after calling prepare
        _unknownResultTypeList = value;
        _allResultTypesAreUnknown = false;
    }
}

// Backing field for UnknownResultTypeList.
bool[]? _unknownResultTypeList;
#endregion
#region Result Types Management
/// <summary>
/// Marks result types to be used when using GetValue on a data reader, on a column-by-column basis.
/// Used for Entity Framework 5-6 compatibility.
/// Only primitive numerical types and DateTimeOffset are supported.
/// Set the whole array or just a value to null to use default type.
/// </summary>
internal Type[]? ObjectResultTypes { get; set; }
#endregion
#region State management
// Backing storage for State, kept as a volatile int holding the numeric value of a CommandState.
volatile int _state;

/// <summary>
/// The current state of the command.
/// </summary>
internal CommandState State
{
    get
    {
        return (CommandState)_state;
    }
    set
    {
        // Only write when the value actually changes.
        var requested = (int)value;
        if (_state != requested)
            _state = requested;
    }
}
/// <summary>
/// Forgets the connector this command was explicitly prepared on, so the command no longer
/// counts as explicitly prepared.
/// </summary>
void ResetExplicitPreparation()
{
    _connectorPreparedOn = null;
}
#endregion State management
#region Parameters
/// <summary>
/// Creates a new instance of a <see cref="DbParameter"/> object.
/// </summary>
/// <returns>A <see cref="DbParameter"/> object; the instance is produced by <see cref="CreateParameter"/>.</returns>
protected override DbParameter CreateDbParameter()
{
    return CreateParameter();
}
/// <summary>
/// Creates a new instance of a <see cref="NpgsqlParameter"/> object.
/// </summary>
/// <returns>A new, default-constructed <see cref="NpgsqlParameter"/> object.</returns>
public new NpgsqlParameter CreateParameter()
{
    return new NpgsqlParameter();
}
/// <summary>
/// Exposes <see cref="Parameters"/> through the base <see cref="DbCommand"/> API.
/// </summary>
protected override DbParameterCollection DbParameterCollection
{
    get { return Parameters; }
}
/// <summary>
/// Gets the <see cref="NpgsqlParameterCollection"/>.
/// </summary>
/// <value>The parameters of the SQL statement or function (stored procedure). The default is an empty collection.</value>
public new NpgsqlParameterCollection Parameters
{
    get { return _parameters; }
}
#endregion
#region DeriveParameters
// Catalog query used by DeriveParametersForFunction. It takes the function name in the :proname
// parameter (resolved via ::regproc) and returns one row with four columns, read by ordinal:
//   0 = proargnames, 1 = proargtypes, 2 = proallargtypes, 3 = proargmodes.
// When pg_proc does not carry names/all-types/modes itself, the CASE branches synthesize them from the
// attributes of the function's return type (joined through pg_type.typrelid -> pg_attribute).
const string DeriveParametersForFunctionQuery = @"
SELECT
CASE
WHEN pg_proc.proargnames IS NULL THEN array_cat(array_fill(''::name,ARRAY[pg_proc.pronargs]),array_agg(pg_attribute.attname ORDER BY pg_attribute.attnum))
ELSE pg_proc.proargnames
END AS proargnames,
pg_proc.proargtypes,
CASE
WHEN pg_proc.proallargtypes IS NULL AND (array_agg(pg_attribute.atttypid))[1] IS NOT NULL THEN array_cat(string_to_array(pg_proc.proargtypes::text,' ')::oid[],array_agg(pg_attribute.atttypid ORDER BY pg_attribute.attnum))
ELSE pg_proc.proallargtypes
END AS proallargtypes,
CASE
WHEN pg_proc.proargmodes IS NULL AND (array_agg(pg_attribute.atttypid))[1] IS NOT NULL THEN array_cat(array_fill('i'::""char"",ARRAY[pg_proc.pronargs]),array_fill('o'::""char"",ARRAY[array_length(array_agg(pg_attribute.atttypid), 1)]))
ELSE pg_proc.proargmodes
END AS proargmodes
FROM pg_proc
LEFT JOIN pg_type ON pg_proc.prorettype = pg_type.oid
LEFT JOIN pg_attribute ON pg_type.typrelid = pg_attribute.attrelid AND pg_attribute.attnum >= 1 AND NOT pg_attribute.attisdropped
WHERE pg_proc.oid = :proname::regproc
GROUP BY pg_proc.proargnames, pg_proc.proargtypes, pg_proc.proallargtypes, pg_proc.proargmodes, pg_proc.pronargs;
";
/// <summary>
/// Replaces the contents of <see cref="Parameters"/> with parameters derived from the server for the
/// current <see cref="CommandText"/>.
/// </summary>
/// <exception cref="InvalidOperationException"><see cref="CommandText"/> is null or empty.</exception>
/// <exception cref="NpgsqlException">One of the command's statements is explicitly prepared.</exception>
/// <exception cref="NotSupportedException">
/// <see cref="CommandType"/> is neither <see cref="System.Data.CommandType.Text"/> nor
/// <see cref="System.Data.CommandType.StoredProcedure"/>.
/// </exception>
internal void DeriveParameters()
{
    var conn = CheckAndGetConnection();
    Debug.Assert(conn is not null);

    if (string.IsNullOrEmpty(CommandText))
        throw new InvalidOperationException("CommandText property has not been initialized");

    // The temporary binding scope (and the connector it yields) lives until the method returns.
    using var _ = conn.StartTemporaryBindingScope(out var connector);

    foreach (var existing in Statements)
    {
        if (existing.PreparedStatement?.IsExplicit == true)
            throw new NpgsqlException("Deriving parameters isn't supported for commands that are already prepared.");
    }

    // Here we unprepare statements that possibly are auto-prepared
    Unprepare();
    Parameters.Clear();

    var commandType = CommandType;
    if (commandType == CommandType.Text)
        DeriveParametersForQuery(connector);
    else if (commandType == CommandType.StoredProcedure)
        DeriveParametersForFunction();
    else
        throw new NotSupportedException("Cannot derive parameters for CommandType " + commandType);
}
/// <summary>
/// Derives the parameters of the function named by <see cref="CommandText"/> by querying the PostgreSQL
/// catalog (see <see cref="DeriveParametersForFunctionQuery"/>) and adds one parameter per function
/// argument to <see cref="Parameters"/>.
/// </summary>
/// <exception cref="InvalidOperationException">The catalog query returned no row for the function.</exception>
/// <exception cref="NotSupportedException">The function declares a VARIADIC argument.</exception>
/// <exception cref="ArgumentOutOfRangeException">The server reported an argument mode code that is not handled.</exception>
void DeriveParametersForFunction()
{
    using var c = new NpgsqlCommand(DeriveParametersForFunctionQuery, _connection);
    c.Parameters.Add(new NpgsqlParameter("proname", NpgsqlDbType.Text));
    c.Parameters[0].Value = CommandText;

    string[]? names = null;
    uint[]? types = null;
    char[]? modes = null;

    using (var rdr = c.ExecuteReader(CommandBehavior.SingleRow | CommandBehavior.SingleResult))
    {
        if (!rdr.Read())
            throw new InvalidOperationException($"{CommandText} does not exist in pg_proc");

        // Column ordinals: 0 = proargnames, 1 = proargtypes, 2 = proallargtypes, 3 = proargmodes.
        if (!rdr.IsDBNull(0))
            names = rdr.GetFieldValue<string[]>(0);
        if (!rdr.IsDBNull(2))
            types = rdr.GetFieldValue<uint[]>(2);
        if (!rdr.IsDBNull(3))
            modes = rdr.GetFieldValue<char[]>(3);

        if (types == null)
        {
            // No proallargtypes: fall back to proargtypes (column 1).
            if (rdr.IsDBNull(1) || rdr.GetFieldValue<uint[]>(1).Length == 0)
                return; // Parameter-less function
            types = rdr.GetFieldValue<uint[]>(1);
        }
    }

    var typeMapper = c._connection!.Connector!.TypeMapper;

    for (var i = 0; i < types.Length; i++)
    {
        var param = new NpgsqlParameter();

        var (npgsqlDbType, postgresType) = typeMapper.GetTypeInfoByOid(types[i]);

        param.DataTypeName = postgresType.DisplayName;
        param.PostgresType = postgresType;
        if (npgsqlDbType.HasValue)
            param.NpgsqlDbType = npgsqlDbType.Value;

        // Arguments without a reported name get a generated, 1-based one.
        if (names != null && i < names.Length)
            param.ParameterName = names[i];
        else
            param.ParameterName = "parameter" + (i + 1);

        if (modes == null) // All params are IN, or server < 8.1.0 (and only IN is supported)
            param.Direction = ParameterDirection.Input;
        else
        {
            param.Direction = modes[i] switch
            {
                'i' => ParameterDirection.Input,
                'o' => ParameterDirection.Output,
                't' => ParameterDirection.Output,
                'b' => ParameterDirection.InputOutput,
                'v' => throw new NotSupportedException("Cannot derive function parameter of type VARIADIC"),
                // Two-argument overload: the first argument is the name, the second the message.
                // The single-string overload would treat the whole message as a parameter name.
                _ => throw new ArgumentOutOfRangeException(
                    "proargmodes", "Unknown code in proargmodes while deriving: " + modes[i])
            };
        }

        Parameters.Add(param);
    }
}
// Derives parameter types for a text command: the command text is parsed into statements, a
// Parse + Describe(statement) pair is sent for each one (see SendDeriveParameters), and the parameter
// type OIDs reported by the server are applied to the placeholders found by the SQL parser.
// On a count mismatch or a per-parameter failure the remaining responses are skipped up to
// ReadyForQuery and Parameters is cleared before the exception propagates.
void DeriveParametersForQuery(NpgsqlConnector connector)
{
using (connector.StartUserAction())
{
Log.Debug($"Deriving Parameters for query: {CommandText}", connector.Id);
// deriveParameters: true asks the parser to run in parameter-deriving mode.
connector.SqlQueryParser.ParseRawQuery(CommandText, _parameters, _statements, connector.UseConformingStrings, deriveParameters: true);
var sendTask = SendDeriveParameters(connector, false);
// If sending already failed, surface that exception now instead of waiting on responses.
if (sendTask.IsFaulted)
sendTask.GetAwaiter().GetResult();
foreach (var statement in _statements)
{
// First response message for the statement; its content is not used.
Expect(
connector.ReadMessage(async: false).GetAwaiter().GetResult(), connector);
// Second response message carries the parameter type OIDs for the statement.
var paramTypeOIDs = Expect(
connector.ReadMessage(async: false).GetAwaiter().GetResult(), connector).TypeOIDs;
if (statement.InputParameters.Count != paramTypeOIDs.Count)
{
connector.SkipUntil(BackendMessageCode.ReadyForQuery);
Parameters.Clear();
throw new NpgsqlException("There was a mismatch in the number of derived parameters between the Npgsql SQL parser and the PostgreSQL parser. Please report this as bug to the Npgsql developers (https://github.com/npgsql/npgsql/issues).");
}
for (var i = 0; i < paramTypeOIDs.Count; i++)
{
try
{
var param = statement.InputParameters[i];
var paramOid = paramTypeOIDs[i];
var (npgsqlDbType, postgresType) = connector.TypeMapper.GetTypeInfoByOid(paramOid);
// A parameter that already has a type was seen before; a different type now is a conflict.
if (param.NpgsqlDbType != NpgsqlDbType.Unknown && param.NpgsqlDbType != npgsqlDbType)
throw new NpgsqlException("The backend parser inferred different types for parameters with the same name. Please try explicit casting within your SQL statement or batch or use different placeholder names.");
param.DataTypeName = postgresType.DisplayName;
param.PostgresType = postgresType;
if (npgsqlDbType.HasValue)
param.NpgsqlDbType = npgsqlDbType.Value;
}
catch
{
// Leave the connector in a usable state and don't expose partially-derived parameters.
connector.SkipUntil(BackendMessageCode.ReadyForQuery);
Parameters.Clear();
throw;
}
}
// Third response message: either a row description or "no data"; anything else is a protocol error.
var msg = connector.ReadMessage(async: false).GetAwaiter().GetResult();
switch (msg.Code)
{
case BackendMessageCode.RowDescription:
case BackendMessageCode.NoData:
break;
default:
throw connector.UnexpectedMessageReceived(msg.Code);
}
}
// Final response message after all statements (presumably the reply to the Sync written by
// SendDeriveParameters - the expected message type is not visible here).
Expect(connector.ReadMessage(async: false).GetAwaiter().GetResult(), connector);
// Observe the outcome of the send so any late failure is thrown here.
sendTask.GetAwaiter().GetResult();
}
}
#endregion
#region Prepare
/// <summary>
/// Creates a server-side prepared statement on the PostgreSQL server.
/// This will make repeated future executions of this command much faster.
/// </summary>
public override void Prepare()
{
    Prepare(async: false).GetAwaiter().GetResult();
}
/// <summary>
/// Creates a server-side prepared statement on the PostgreSQL server.
/// This will make repeated future executions of this command much faster.
/// </summary>
/// <param name="cancellationToken">
/// An optional token to cancel the asynchronous operation. The default value is <see cref="CancellationToken.None"/>.
/// </param>
#if NETSTANDARD2_0
public Task PrepareAsync(CancellationToken cancellationToken = default)
#else
public override Task PrepareAsync(CancellationToken cancellationToken = default)
#endif
{
    // The scope is disposed once the task has been obtained, not when it completes.
    using var scope = NoSynchronizationContextScope.Enter();
    return Prepare(async: true, cancellationToken);
}
// Shared implementation of Prepare/PrepareAsync.
// Binds every parameter, splits the command text into statements, and asks the connector's prepared
// statement manager for an explicit prepared statement per statement. Only when at least one of those is
// still NotPrepared does it go to the server (PrepareLong); otherwise it completes synchronously.
// Throws NotSupportedException when the connection uses multiplexing.
Task Prepare(bool async, CancellationToken cancellationToken = default)
{
var connection = CheckAndGetConnection();
Debug.Assert(connection is not null);
if (connection.Settings.Multiplexing)
throw new NotSupportedException("Explicit preparation not supported with multiplexing");
var connector = connection.Connector!;
// Parameters are bound before the query is processed.
foreach (var p in Parameters.InternalList)
{
Parameters.CalculatePlaceholderType(p);
p.Bind(connector.TypeMapper);
}
ProcessRawQuery(connector.SqlQueryParser, connector.UseConformingStrings);
if (Log.IsEnabled(NpgsqlLogLevel.Debug))
Log.Debug($"Preparing: {CommandText}", connector.Id);
var needToPrepare = false;
foreach (var statement in _statements)
{
if (statement.IsPrepared)
continue;
statement.PreparedStatement = connector.PreparedStatementManager.GetOrAddExplicit(statement);
// Only statements the manager reports as NotPrepared need a server roundtrip; they are flagged
// so that SendPrepare and PrepareLong know which statements to handle.
if (statement.PreparedStatement?.State == PreparedState.NotPrepared)
{
statement.PreparedStatement.State = PreparedState.BeingPrepared;
statement.IsPreparing = true;
needToPrepare = true;
}
}
// Remember which connector the preparation belongs to (see IsPrepared / IsExplicitlyPrepared).
_connectorPreparedOn = connector;
// It's possible the command was already prepared, or that persistent prepared statements were found for
// all statements. Nothing to do here, move along.
return needToPrepare
? PrepareLong(this, async, connector, cancellationToken)
: Task.CompletedTask;
// Sends the prepare messages and consumes the server's responses for every statement flagged IsPreparing.
// On any failure the flagged statements' bookkeeping is rolled back before the exception is rethrown.
static async Task PrepareLong(NpgsqlCommand command, bool async, NpgsqlConnector connector, CancellationToken cancellationToken)
{
try
{
using (connector.StartUserAction(cancellationToken))
{
var sendTask = command.SendPrepare(connector, async, cancellationToken);
// If sending already failed, surface that exception now instead of waiting on responses.
if (sendTask.IsFaulted)
sendTask.GetAwaiter().GetResult();
// Loop over statements, skipping those that are already prepared (because they were persisted)
var isFirst = true;
for (var i = 0; i < command._statements.Count; i++)
{
var statement = command._statements[i];
if (!statement.IsPreparing)
continue;
var pStatement = statement.PreparedStatement!;
// SendPrepare writes a Close for a statement being replaced, so one extra response is read
// for it before the responses for the new statement.
if (pStatement.StatementBeingReplaced != null)
{
Expect(await connector.ReadMessage(async), connector);
pStatement.StatementBeingReplaced.CompleteUnprepare();
pStatement.StatementBeingReplaced = null;
}
// Two response messages whose content is not used here (presumably the replies to the Parse
// and to the parameter part of Describe - the expected message types are not visible here).
Expect(await connector.ReadMessage(async), connector);
Expect(await connector.ReadMessage(async), connector);
var msg = await connector.ReadMessage(async);
switch (msg.Code)
{
case BackendMessageCode.RowDescription:
// Clone the RowDescription for use with the prepared statement (the one we have is reused
// by the connection)
var description = ((RowDescriptionMessage)msg).Clone();
command.FixupRowDescription(description, isFirst);
statement.Description = description;
break;
case BackendMessageCode.NoData:
statement.Description = null;
break;
default:
throw connector.UnexpectedMessageReceived(msg.Code);
}
statement.IsPreparing = false;
pStatement.CompletePrepare();
isFirst = false;
}
// Final response message after all statements (presumably the reply to the Sync written by
// SendPrepare).
Expect(await connector.ReadMessage(async), connector);
// Observe the outcome of the send so any late failure is thrown here.
if (async)
await sendTask;
else
sendTask.GetAwaiter().GetResult();
}
}
catch
{
// The statements weren't prepared successfully, update the bookkeeping for them
foreach (var statement in command._statements)
{
if (statement.IsPreparing)
{
statement.IsPreparing = false;
statement.PreparedStatement!.CompleteUnprepare();
}
}
throw;
}
}
}
/// <summary>
/// Unprepares a command, closing server-side statements associated with it.
/// Note that this only affects commands explicitly prepared with <see cref="Prepare()"/>, not
/// automatically prepared statements.
/// </summary>
public void Unprepare()
{
    Unprepare(async: false).GetAwaiter().GetResult();
}
/// <summary>
/// Unprepares a command, closing server-side statements associated with it.
/// Note that this only affects commands explicitly prepared with <see cref="Prepare()"/>, not
/// automatically prepared statements.
/// </summary>
/// <param name="cancellationToken">
/// An optional token to cancel the asynchronous operation. The default value is <see cref="CancellationToken.None"/>.
/// </param>
public Task UnprepareAsync(CancellationToken cancellationToken = default)
{
    // The scope is disposed once the task has been obtained, not when it completes.
    using var scope = NoSynchronizationContextScope.Enter();
    return Unprepare(async: true, cancellationToken);
}
// Shared implementation of Unprepare/UnprepareAsync.
// Does nothing when none of the command's statements is prepared. Otherwise sends a Close for each
// prepared statement (see SendClose), reads one response per statement that SendClose moved to
// BeingUnprepared, and detaches the prepared statement from the command's statement.
// Throws NotSupportedException when the connection uses multiplexing.
async Task Unprepare(bool async, CancellationToken cancellationToken = default)
{
var connection = CheckAndGetConnection();
Debug.Assert(connection is not null);
if (connection.Settings.Multiplexing)
throw new NotSupportedException("Explicit preparation not supported with multiplexing");
if (_statements.All(s => !s.IsPrepared))
return;
var connector = connection.Connector!;
Log.Debug("Closing command's prepared statements", connector.Id);
using (connector.StartUserAction(cancellationToken))
{
var sendTask = SendClose(connector, async, cancellationToken);
// If sending already failed, surface that exception now instead of waiting on responses.
if (sendTask.IsFaulted)
sendTask.GetAwaiter().GetResult();
foreach (var statement in _statements)
if (statement.PreparedStatement?.State == PreparedState.BeingUnprepared)
{
Expect(await connector.ReadMessage(async), connector);
statement.PreparedStatement.CompleteUnprepare();
statement.PreparedStatement = null;
}
// Final response message after all statements (presumably the reply to the Sync written by SendClose).
Expect(await connector.ReadMessage(async), connector);
// Observe the outcome of the send so any late failure is thrown here.
if (async)
await sendTask;
else
sendTask.GetAwaiter().GetResult();
}
}
#endregion Prepare
#region Query analysis
/// <summary>
/// Turns <see cref="CommandText"/> into the command's statement list, according to <see cref="CommandType"/>
/// and the placeholder style of the parameters.
/// </summary>
/// <param name="parser">The SQL parser to use for text commands; a new one is created when null.</param>
/// <param name="standardConformingStrings">Passed through to the SQL parser.</param>
/// <exception cref="InvalidOperationException">
/// <see cref="CommandText"/> is null or empty, or <see cref="CommandType"/> has an unexpected value.
/// </exception>
/// <exception cref="NotSupportedException">
/// Named and positional parameters are mixed, or a multi-statement command has output parameters.
/// </exception>
/// <exception cref="NpgsqlException">A statement ends up with more than 65535 parameters.</exception>
internal void ProcessRawQuery(SqlQueryParser? parser, bool standardConformingStrings)
{
    if (string.IsNullOrEmpty(CommandText))
        throw new InvalidOperationException("CommandText property has not been initialized");

    var commandType = CommandType;
    if (commandType == CommandType.Text)
    {
        var placeholderType = Parameters.PlaceholderType;
        if (placeholderType == PlaceholderType.Positional)
        {
            // In positional parameter mode, we don't need to parse/rewrite the CommandText or reorder the parameters - just use
            // them as is. If the SQL contains a semicolon (legacy batching) when positional parameters are in use, we just send
            // that and PostgreSQL will error (this behavior is by-design - use the new batching API).
            var positionalStatement = TruncateStatementsToOne();
            positionalStatement.SQL = CommandText;
            positionalStatement.InputParameters = Parameters.InternalList;
        }
        else if (placeholderType is PlaceholderType.NoParameters or PlaceholderType.Named)
        {
            // Note that queries with no parameters are parsed just like queries with named parameters, since they may contain a
            // semicolon (legacy batching).
            var queryParser = parser ?? new SqlQueryParser();
            queryParser.ParseRawQuery(CommandText, _parameters, _statements, standardConformingStrings);

            if (_statements.Count > 1 && _parameters.HasOutputParameters)
                throw new NotSupportedException("Commands with multiple queries cannot have out parameters");
        }
        else if (placeholderType == PlaceholderType.Mixed)
        {
            throw new NotSupportedException("Mixing named and positional parameters isn't supported");
        }
        else
        {
            throw new ArgumentOutOfRangeException(
                nameof(PlaceholderType), $"Unknown {nameof(PlaceholderType)} value: {placeholderType}");
        }
    }
    else if (commandType == CommandType.TableDirect)
    {
        var tableStatement = TruncateStatementsToOne();
        tableStatement.SQL = "SELECT * FROM " + CommandText;
    }
    else if (commandType == CommandType.StoredProcedure)
    {
        // Only input-direction parameters take part in the generated call.
        var inputs = _parameters.Where(p => p.IsInputDirection).ToList();

        var call = new StringBuilder("SELECT * FROM ").Append(CommandText).Append('(');
        var needsComma = false;

        // Positional arguments are written first; the placeholder number is the parameter's
        // 1-based position within the input list.
        for (var index = 0; index < inputs.Count; index++)
        {
            if (!inputs[index].IsPositional)
                continue;

            if (needsComma)
                call.Append(',');
            call.Append('$').Append(index + 1);
            needsComma = true;
        }

        // Named arguments follow, using the "name" := $n notation with embedded quotes doubled.
        for (var index = 0; index < inputs.Count; index++)
        {
            var named = inputs[index];
            if (named.IsPositional)
                continue;

            if (needsComma)
                call.Append(',');
            call.Append('"')
                .Append(named.TrimmedName.Replace("\"", "\"\""))
                .Append("\" := ")
                .Append('$')
                .Append(index + 1);
            needsComma = true;
        }

        call.Append(')');

        var procedureStatement = TruncateStatementsToOne();
        procedureStatement.SQL = call.ToString();
        procedureStatement.InputParameters.AddRange(inputs);
    }
    else
    {
        throw new InvalidOperationException($"Internal Npgsql bug: unexpected value {commandType} of enum {nameof(CommandType)}. Please file a bug.");
    }

    // A single statement is limited to ushort.MaxValue parameters.
    if (_statements.Any(s => s.InputParameters.Count > ushort.MaxValue))
        throw new NpgsqlException($"A statement cannot have more than {ushort.MaxValue} parameters");
}
#endregion
#region Execute
/// <summary>
/// Walks all parameters, updating the collection's placeholder type, binding and validating every
/// parameter that carries an input value, and recording whether any output parameter exists.
/// </summary>
/// <param name="typeMapper">The type mapper the parameters are bound against.</param>
/// <exception cref="NotSupportedException">An output or input/output parameter is used in positional mode.</exception>
/// <exception cref="ArgumentOutOfRangeException">A parameter has an unhandled direction value.</exception>
void ValidateParameters(ConnectorTypeMapper typeMapper)
{
    var sawOutputParameter = false;

    foreach (var parameter in Parameters.InternalList)
    {
        Parameters.CalculatePlaceholderType(parameter);

        var direction = parameter.Direction;

        // Return-value parameters carry nothing to send.
        if (direction == ParameterDirection.ReturnValue)
            continue;

        if (direction is ParameterDirection.Output or ParameterDirection.InputOutput)
        {
            if (Parameters.PlaceholderType == PlaceholderType.Positional)
                throw new NotSupportedException("Output parameters are not supported in positional mode");

            sawOutputParameter = true;

            // Pure output parameters have no input value to bind.
            if (direction == ParameterDirection.Output)
                continue;
        }
        else if (direction != ParameterDirection.Input)
        {
            throw new ArgumentOutOfRangeException(nameof(ParameterDirection),
                $"Unhandled {nameof(ParameterDirection)} value: {direction}");
        }

        parameter.Bind(typeMapper);
        parameter.LengthCache?.Clear();
        parameter.ValidateAndGetLength();
    }

    Parameters.HasOutputParameters = sawOutputParameter;
}
#endregion
#region Message Creation / Population
/// <summary>
/// Applies this command's <see cref="CommandTimeout"/> (in seconds) to the connector's write buffer.
/// </summary>
/// <param name="connector">The connector whose write buffer timeout is set.</param>
void BeginSend(NpgsqlConnector connector)
{
    var timeout = TimeSpan.FromSeconds(CommandTimeout);
    connector.WriteBuffer.Timeout = timeout;
}
// Writes the protocol messages for executing this command's statements to the connector.
// With CommandBehavior.SchemaOnly only Parse/Describe are written (WriteExecuteSchemaOnly); otherwise the
// full execution sequence is written (WriteExecute). When flush is true the connector is flushed at the end.
internal Task Write(NpgsqlConnector connector, bool async, bool flush, CancellationToken cancellationToken = default)
{
return (_behavior & CommandBehavior.SchemaOnly) == 0
? WriteExecute(connector, async, flush, cancellationToken)
: WriteExecuteSchemaOnly(connector, async, flush, cancellationToken);
// Per statement: [Close replaced statement] + Parse + Bind + Describe(portal) when the statement is
// unprepared or being prepared, or just Bind when it is already prepared; then Execute.
// A single Sync is written after all statements.
async Task WriteExecute(NpgsqlConnector connector, bool async, bool flush, CancellationToken cancellationToken)
{
var syncOverAsync = false;
for (var i = 0; i < _statements.Count; i++)
{
// The following is only for deadlock avoidance when doing sync I/O (so never in multiplexing)
SyncOverAsyncIfNecessary(ref async, ref syncOverAsync, i);
var statement = _statements[i];
var pStatement = statement.PreparedStatement;
if (pStatement == null || statement.IsPreparing)
{
// The statement should either execute unprepared, or is being auto-prepared.
// Send Parse, Bind, Describe
// We may have a prepared statement that replaces an existing statement - close the latter first.
if (pStatement?.StatementBeingReplaced != null)
{
await connector.WriteClose(StatementOrPortal.Statement, pStatement.StatementBeingReplaced.Name!, async, syncOverAsync, cancellationToken)
.ConfigureAwait(syncOverAsync);
}
await connector.WriteParse(statement.SQL, statement.StatementName, statement.InputParameters, async, syncOverAsync, cancellationToken)
.ConfigureAwait(syncOverAsync);
// UnknownResultTypeList only applies to the first statement of the command.
await connector.WriteBind(
statement.InputParameters, string.Empty, statement.StatementName, AllResultTypesAreUnknown,
i == 0 ? UnknownResultTypeList : null,
async,
syncOverAsync,
cancellationToken)
.ConfigureAwait(syncOverAsync);
await connector.WriteDescribe(StatementOrPortal.Portal, string.Empty, async, syncOverAsync, cancellationToken)
.ConfigureAwait(syncOverAsync);
}
else
{
// The statement is already prepared, only a Bind is needed
await connector.WriteBind(
statement.InputParameters, string.Empty, statement.StatementName, AllResultTypesAreUnknown,
i == 0 ? UnknownResultTypeList : null,
async, syncOverAsync, cancellationToken)
.ConfigureAwait(syncOverAsync);
}
await connector.WriteExecute(0, async, syncOverAsync, cancellationToken).ConfigureAwait(syncOverAsync);
// Record the usage time on the prepared statement, if there is one.
if (pStatement != null)
pStatement.LastUsed = DateTime.UtcNow;
}
await connector.WriteSync(async, syncOverAsync, cancellationToken).ConfigureAwait(syncOverAsync);
if (flush)
await connector.Flush(async, syncOverAsync, cancellationToken).ConfigureAwait(syncOverAsync);
}
// Per statement that is not already prepared: Parse + Describe(statement). Nothing at all is written
// (not even Sync) when every statement is already prepared.
async Task WriteExecuteSchemaOnly(NpgsqlConnector connector, bool async, bool flush, CancellationToken cancellationToken)
{
var syncOverAsync = false;
var wroteSomething = false;
for (var i = 0; i < _statements.Count; i++)
{
SyncOverAsyncIfNecessary(ref async, ref syncOverAsync, i);
var statement = _statements[i];
if (statement.PreparedStatement?.State == PreparedState.Prepared)
continue; // Prepared, we already have the RowDescription
Debug.Assert(statement.PreparedStatement == null);
// NOTE(review): Parse uses an empty statement name while Describe uses statement.StatementName;
// presumably StatementName is empty when there is no prepared statement - confirm.
await connector.WriteParse(statement.SQL, string.Empty, statement.InputParameters, async, syncOverAsync, cancellationToken)
.ConfigureAwait(syncOverAsync);
await connector.WriteDescribe(StatementOrPortal.Statement, statement.StatementName, async, syncOverAsync, cancellationToken)
.ConfigureAwait(syncOverAsync);
wroteSomething = true;
}
if (wroteSomething)
{
await connector.WriteSync(async, syncOverAsync, cancellationToken).ConfigureAwait(syncOverAsync);
if (flush)
await connector.Flush(async, syncOverAsync, cancellationToken).ConfigureAwait(syncOverAsync);
}
}
}
// Writes the messages used for deriving parameters: for every statement a Parse (unnamed statement, no
// parameter types) followed by a Describe of that unnamed statement, then a single Sync, then a flush.
// The responses are consumed by DeriveParametersForQuery.
async Task SendDeriveParameters(NpgsqlConnector connector, bool async, CancellationToken cancellationToken = default)
{
var syncOverAsync = false;
BeginSend(connector);
for (var i = 0; i < _statements.Count; i++)
{
// Switches to sync-over-async writing from the second statement on when called synchronously.
SyncOverAsyncIfNecessary(ref async, ref syncOverAsync, i);
var statement = _statements[i];
await connector.WriteParse(statement.SQL, string.Empty, EmptyParameters, async, syncOverAsync, cancellationToken)
.ConfigureAwait(syncOverAsync);
await connector.WriteDescribe(StatementOrPortal.Statement, string.Empty, async, syncOverAsync, cancellationToken)
.ConfigureAwait(syncOverAsync);
}
await connector.WriteSync(async, syncOverAsync, cancellationToken).ConfigureAwait(syncOverAsync);
await connector.Flush(async, syncOverAsync, cancellationToken).ConfigureAwait(syncOverAsync);
}
// Writes the messages for explicit preparation: for every statement flagged IsPreparing an optional Close
// of the statement it replaces, a Parse under the prepared statement's name and a Describe of that
// statement; then a single Sync and a flush. The responses are consumed by Prepare's PrepareLong.
async Task SendPrepare(NpgsqlConnector connector, bool async, CancellationToken cancellationToken = default)
{
var syncOverAsync = false;
BeginSend(connector);
for (var i = 0; i < _statements.Count; i++)
{
// Switches to sync-over-async writing from the second statement on when called synchronously.
SyncOverAsyncIfNecessary(ref async, ref syncOverAsync, i);
var statement = _statements[i];
var pStatement = statement.PreparedStatement;
// A statement may be already prepared, already in preparation (i.e. same statement twice
// in the same command), or we can't prepare (overloaded SQL)
if (!statement.IsPreparing)
continue;
// We may have a prepared statement that replaces an existing statement - close the latter first.
var statementToClose = pStatement!.StatementBeingReplaced;
if (statementToClose != null)
await connector.WriteClose(StatementOrPortal.Statement, statementToClose.Name!, async, syncOverAsync, cancellationToken)
.ConfigureAwait(syncOverAsync);
await connector.WriteParse(statement.SQL, pStatement.Name!, statement.InputParameters, async, syncOverAsync, cancellationToken)
.ConfigureAwait(syncOverAsync);
await connector.WriteDescribe(StatementOrPortal.Statement, pStatement.Name!, async, syncOverAsync, cancellationToken)
.ConfigureAwait(syncOverAsync);
}
await connector.WriteSync(async, syncOverAsync, cancellationToken).ConfigureAwait(syncOverAsync);
await connector.Flush(async, syncOverAsync, cancellationToken).ConfigureAwait(syncOverAsync);
}
/// <summary>
/// When a synchronous caller is about to write a non-first statement of a batch, switches the write to
/// asynchronous mode ("sync over async") and installs the dedicated single-thread synchronization context.
/// </summary>
/// <param name="async">Whether writing is asynchronous; set to true when the switch happens.</param>
/// <param name="syncOverAsync">Set to true when the switch happens.</param>
/// <param name="numberOfStatementInBatch">Zero-based index of the statement about to be written.</param>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
void SyncOverAsyncIfNecessary(ref bool async, ref bool syncOverAsync, int numberOfStatementInBatch)
{
    // Nothing to do for genuinely asynchronous callers, or for the first statement of a batch.
    if (async || numberOfStatementInBatch <= 0)
        return;

    // We're synchronously sending the non-first statement in a batch - switch to async writing.
    // See long comment in Execute() above.
    // TODO: we can simply do all batch writing asynchronously, instead of starting with the 2nd statement.
    // For now, writing the first statement synchronously gives us a better chance of handling and bubbling up errors
    // correctly (see sendTask.IsFaulted in Execute()). Once #1323 is done, that shouldn't be needed any more and
    // entire batches should be written asynchronously.
    async = true;
    syncOverAsync = true;
    SynchronizationContext.SetSynchronizationContext(SingleThreadSynchronizationContext);
}
// Writes a Close for every prepared statement of this command and marks each one BeingUnprepared, then
// writes a single Sync and flushes. The responses are consumed by Unprepare.
async Task SendClose(NpgsqlConnector connector, bool async, CancellationToken cancellationToken = default)
{
var syncOverAsync = false;
BeginSend(connector);
// Counts the statements actually written (not the index in _statements), so that the switch to
// sync-over-async happens from the second Close on.
var i = 0;
// Note: the Where filter is evaluated lazily, i.e. while the loop body below is changing statement state.
foreach (var statement in _statements.Where(s => s.IsPrepared))
{
SyncOverAsyncIfNecessary(ref async, ref syncOverAsync, i);
await connector.WriteClose(StatementOrPortal.Statement, statement.StatementName, async, syncOverAsync, cancellationToken)
.ConfigureAwait(syncOverAsync);
statement.PreparedStatement!.State = PreparedState.BeingUnprepared;
i++;
}
await connector.WriteSync(async, syncOverAsync, cancellationToken).ConfigureAwait(syncOverAsync);
await connector.Flush(async, syncOverAsync, cancellationToken).ConfigureAwait(syncOverAsync);
}
#endregion
#region Execute Non Query
/// <summary>
/// Executes a SQL statement against the connection and returns the number of rows affected.
/// </summary>
/// <returns>The number of rows affected if known; -1 otherwise.</returns>
public override int ExecuteNonQuery()
{
    return ExecuteNonQuery(async: false, CancellationToken.None).GetAwaiter().GetResult();
}
/// <summary>
/// Asynchronous version of <see cref="ExecuteNonQuery()"/>.
/// </summary>
/// <param name="cancellationToken">
/// An optional token to cancel the asynchronous operation. The default value is <see cref="CancellationToken.None"/>.
/// </param>
/// <returns>A task representing the asynchronous operation, with the number of rows affected if known; -1 otherwise.</returns>
public override Task ExecuteNonQueryAsync(CancellationToken cancellationToken)
{
    // The scope is disposed once the task has been obtained, not when it completes.
    using var scope = NoSynchronizationContextScope.Enter();
    return ExecuteNonQuery(async: true, cancellationToken);
}
// Shared implementation of ExecuteNonQuery/ExecuteNonQueryAsync: executes the command through a reader,
// advances past every result set, and returns the reader's RecordsAffected. The reader is always disposed,
// asynchronously when running in async mode.
[MethodImpl(MethodImplOptions.AggressiveInlining)]
async Task ExecuteNonQuery(bool async, CancellationToken cancellationToken)
{
    var reader = await ExecuteReader(CommandBehavior.Default, async, cancellationToken).ConfigureAwait(false);
    try
    {
        // Drain all result sets; their rows are not read.
        if (async)
        {
            while (await reader.NextResultAsync(cancellationToken).ConfigureAwait(false))
            {
            }
        }
        else
        {
            while (reader.NextResult())
            {
            }
        }

        return reader.RecordsAffected;
    }
    finally
    {
        if (async)
            await reader.DisposeAsync().ConfigureAwait(false);
        else
            reader.Dispose();
    }
}
#endregion Execute Non Query
#region Execute Scalar
/// <summary>
/// Executes the query, and returns the first column of the first row
/// in the result set returned by the query. Extra columns or rows are ignored.
/// </summary>
/// <returns>The first column of the first row in the result set,
/// or a null reference if the result set is empty.</returns>
public override object? ExecuteScalar()
{
    return ExecuteScalar(false, CancellationToken.None).GetAwaiter().GetResult();
}
///
/// Asynchronous version of
///
///
/// An optional token to cancel the asynchronous operation. The default value is .
///
/// A task representing the asynchronous operation, with the first column of the
/// first row in the result set, or a null reference if the result set is empty.
public override Task