using System;
using System.Collections.Generic;
using System.Data;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Security;
using System.Net.Sockets;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using System.Runtime.ExceptionServices;
using System.Security.Authentication;
using System.Security.Cryptography.X509Certificates;
using System.Text;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Npgsql.BackendMessages;
using Npgsql.Logging;
using Npgsql.TypeMapping;
using Npgsql.Util;
using static Npgsql.Util.Statics;
namespace Npgsql
{
///
/// Represents a connection to a PostgreSQL backend. Unlike NpgsqlConnection objects, which are
/// exposed to users, connectors are internal to Npgsql and are recycled by the connection pool.
///
sealed partial class NpgsqlConnector : IDisposable
{
#region Fields and Properties
///
/// The physical connection socket to the backend.
///
Socket _socket = default!;
///
/// The physical connection stream to the backend, without anything on top.
///
NetworkStream _baseStream = default!;
///
/// The physical connection stream to the backend, layered with an SSL/TLS stream if in secure mode.
///
Stream _stream = default!;
internal NpgsqlConnectionStringBuilder Settings { get; }
internal string ConnectionString { get; }
ProvideClientCertificatesCallback? ProvideClientCertificatesCallback { get; }
RemoteCertificateValidationCallback? UserCertificateValidationCallback { get; }
ProvidePasswordCallback? ProvidePasswordCallback { get; }
internal Encoding TextEncoding { get; private set; } = default!;
///
/// Same as , except that it does not throw an exception if an invalid char is
/// encountered (exception fallback), but rather replaces it with a question mark character (replacement
/// fallback).
///
internal Encoding RelaxedTextEncoding { get; private set; } = default!;
///
/// Buffer used for reading data.
///
internal NpgsqlReadBuffer ReadBuffer { get; private set; } = default!;
///
/// If we read a data row that's bigger than , we allocate an oversize buffer.
/// The original (smaller) buffer is stored here, and restored when the connection is reset.
///
NpgsqlReadBuffer? _origReadBuffer;
///
/// Buffer used for writing data.
///
internal NpgsqlWriteBuffer WriteBuffer { get; private set; } = default!;
///
/// The secret key of the backend for this connector, used for query cancellation.
///
int _backendSecretKey;
///
/// The process ID of the backend for this connector.
///
internal int BackendProcessId { get; private set; }
bool SupportsPostgresCancellation => BackendProcessId != 0;
///
/// A unique ID identifying this connector, used for logging. Currently mapped to BackendProcessId
///
internal int Id => BackendProcessId;
internal NpgsqlDatabaseInfo DatabaseInfo { get; private set; } = default!;
internal ConnectorTypeMapper TypeMapper { get; set; } = default!;
///
/// The current transaction status for this connector.
///
internal TransactionStatus TransactionStatus { get; set; }
///
/// A transaction object for this connector. Since only one transaction can be in progress at any given time,
/// this instance is recycled. To check whether a transaction is currently in progress on this connector,
/// see .
///
internal NpgsqlTransaction? Transaction { get; set; }
internal NpgsqlTransaction? UnboundTransaction { get; set; }
///
/// The NpgsqlConnection that (currently) owns this connector. Null if the connector isn't
/// owned (i.e. idle in the pool)
///
internal NpgsqlConnection? Connection { get; set; }
///
/// The number of messages that were prepended to the current message chain, but not yet sent.
/// Note that this only tracks messages which produce a ReadyForQuery message
///
internal int PendingPrependedResponses { get; set; }
internal NpgsqlDataReader? CurrentReader;
internal PreparedStatementManager PreparedStatementManager;
///
/// If the connector is currently in COPY mode, holds a reference to the importer/exporter object.
/// Otherwise null.
///
internal ICancelable? CurrentCopyOperation;
///
/// Holds all run-time parameters received from the backend (via ParameterStatus messages)
///
internal readonly Dictionary PostgresParameters;
///
/// Holds all run-time parameters in raw, binary format for efficient handling without allocations.
///
readonly List<(byte[] Name, byte[] Value)> _rawParameters = new();
///
/// If this connector was broken, this contains the exception that caused the break.
///
volatile Exception? _breakReason;
///
/// Semaphore, used to synchronize DatabaseInfo between multiple connections, so it wouldn't be loaded in parallel.
///
static readonly SemaphoreSlim DatabaseInfoSemaphore = new(1);
///
///
/// Used by the pool to indicate that I/O is currently in progress on this connector, so that another write
/// isn't started concurrently. Note that since we have only one write loop, this is only ever usedto
/// protect against an over-capacity writes into a connector that's currently *asynchronously* writing.
///
///
/// It is guaranteed that the currently-executing
/// Specifically, reading may occur - and the connector may even be returned to the pool - before this is
/// released.
///
///
internal volatile int MultiplexAsyncWritingLock;
///
internal void FlagAsNotWritableForMultiplexing()
{
if (Settings.Multiplexing)
{
Debug.Assert(CommandsInFlightCount > 0 || IsBroken || IsClosed,
$"About to mark multiplexing connector as non-writable, but {nameof(CommandsInFlightCount)} is {CommandsInFlightCount}");
Interlocked.Exchange(ref MultiplexAsyncWritingLock, 1);
}
}
///
internal void FlagAsWritableForMultiplexing()
{
if (Settings.Multiplexing && Interlocked.CompareExchange(ref MultiplexAsyncWritingLock, 0, 1) != 1)
throw new Exception("Multiplexing lock was not taken when releasing. Please report a bug.");
}
///
/// The timeout for reading messages that are part of the user's command
/// (i.e. which aren't internal prepended commands).
///
/// Precision is milliseconds
internal int UserTimeout { private get; set; }
///
/// A lock that's taken while a user action is in progress, e.g. a command being executed.
/// Only used when keepalive is enabled, otherwise null.
///
SemaphoreSlim? _userLock;
///
/// A lock that's taken while a cancellation is being delivered; new queries are blocked until the
/// cancellation is delivered. This reduces the chance that a cancellation meant for a previous
/// command will accidentally cancel a later one, see #615.
///
internal object CancelLock { get; }
readonly bool _isKeepAliveEnabled;
readonly Timer? _keepAliveTimer;
///
/// The command currently being executed by the connector, null otherwise.
/// Used only for concurrent use error reporting purposes.
///
NpgsqlCommand? _currentCommand;
bool _sendResetOnClose;
ConnectorPool? _pool;
///
/// Contains the UTC timestamp when this connector was opened, used to implement
/// .
///
internal DateTime OpenTimestamp { get; private set; }
internal int ClearCounter { get; set; }
volatile bool _postgresCancellationPerformed;
internal bool PostgresCancellationPerformed
{
get => _postgresCancellationPerformed;
private set => _postgresCancellationPerformed = value;
}
volatile bool _userCancellationRequested;
CancellationTokenRegistration _cancellationTokenRegistration;
internal bool UserCancellationRequested => _userCancellationRequested;
internal CancellationToken UserCancellationToken { get; set; }
internal bool AttemptPostgresCancellation { get; private set; }
static readonly TimeSpan _cancelImmediatelyTimeout = TimeSpan.FromMilliseconds(-1);
static readonly NpgsqlLogger Log = NpgsqlLogManager.CreateLogger(nameof(NpgsqlConnector));
internal readonly Stopwatch QueryLogStopWatch = new();
#endregion
#region Constants
///
/// The minimum timeout that can be set on internal commands such as COMMIT, ROLLBACK.
///
/// Precision is seconds
internal const int MinimumInternalCommandTimeout = 3;
#endregion
#region Reusable Message Objects
byte[]? _resetWithoutDeallocateMessage;
int _resetWithoutDeallocateResponseCount;
// Backend
readonly CommandCompleteMessage _commandCompleteMessage = new();
readonly ReadyForQueryMessage _readyForQueryMessage = new();
readonly ParameterDescriptionMessage _parameterDescriptionMessage = new();
readonly DataRowMessage _dataRowMessage = new();
readonly RowDescriptionMessage _rowDescriptionMessage = new();
// Since COPY is rarely used, allocate these lazily
CopyInResponseMessage? _copyInResponseMessage;
CopyOutResponseMessage? _copyOutResponseMessage;
CopyDataMessage? _copyDataMessage;
CopyBothResponseMessage? _copyBothResponseMessage;
#endregion
internal NpgsqlDataReader DataReader { get; set; }
internal NpgsqlDataReader? UnboundDataReader { get; set; }
#region Constructors
internal NpgsqlConnector(NpgsqlConnection connection)
: this(connection.Settings, connection.OriginalConnectionString)
{
Connection = connection;
_pool = connection.Pool;
Connection.Connector = this;
ProvideClientCertificatesCallback = Connection.ProvideClientCertificatesCallback;
UserCertificateValidationCallback = Connection.UserCertificateValidationCallback;
ProvidePasswordCallback = Connection.ProvidePasswordCallback;
}
NpgsqlConnector(NpgsqlConnector connector)
: this(connector.Settings, connector.ConnectionString)
{
ProvideClientCertificatesCallback = connector.ProvideClientCertificatesCallback;
UserCertificateValidationCallback = connector.UserCertificateValidationCallback;
ProvidePasswordCallback = connector.ProvidePasswordCallback;
}
///
/// Creates a new connector with the given connection string.
///
/// The parsed connection string.
/// The connection string.
NpgsqlConnector(NpgsqlConnectionStringBuilder settings, string connectionString)
{
State = ConnectorState.Closed;
TransactionStatus = TransactionStatus.Idle;
Settings = settings;
ConnectionString = connectionString;
PostgresParameters = new Dictionary();
CancelLock = new object();
_isKeepAliveEnabled = Settings.KeepAlive > 0;
if (_isKeepAliveEnabled)
{
_userLock = new SemaphoreSlim(1, 1);
_keepAliveTimer = new Timer(PerformKeepAlive, null, Timeout.Infinite, Timeout.Infinite);
}
DataReader = new NpgsqlDataReader(this);
// TODO: Not just for automatic preparation anymore...
PreparedStatementManager = new PreparedStatementManager(this);
if (settings.Multiplexing)
{
// Note: It's OK for this channel to be unbounded: each command enqueued to it is accompanied by sending
// it to PostgreSQL. If we overload it, a TCP zero window will make us block on the networking side
// anyway.
// Note: the in-flight channel can probably be single-writer, but that doesn't actually do anything
// at this point. And we currently rely on being able to complete the channel at any point (from
// Break). We may want to revisit this if an optimized, SingleWriter implementation is introduced.
var commandsInFlightChannel = Channel.CreateUnbounded(
new UnboundedChannelOptions { SingleReader = true });
CommandsInFlightReader = commandsInFlightChannel.Reader;
CommandsInFlightWriter = commandsInFlightChannel.Writer;
// TODO: Properly implement this
if (_isKeepAliveEnabled)
throw new NotImplementedException("Keepalive not yet implemented for multiplexing");
}
}
#endregion
#region Configuration settings
string Host => Settings.Host!;
int Port => Settings.Port;
string Database => Settings.Database!;
string KerberosServiceName => Settings.KerberosServiceName;
SslMode SslMode => Settings.SslMode;
int ConnectionTimeout => Settings.Timeout;
bool IntegratedSecurity => Settings.IntegratedSecurity;
internal bool ConvertInfinityDateTime => Settings.ConvertInfinityDateTime;
///
/// The actual command timeout value that gets set on internal commands.
///
/// Precision is milliseconds
int InternalCommandTimeout
{
get
{
var internalTimeout = Settings.InternalCommandTimeout;
if (internalTimeout == -1)
return Math.Max(Settings.CommandTimeout, MinimumInternalCommandTimeout) * 1000;
// Todo: Decide what we really want here
// This assertion can easily fail if InternalCommandTimeout is set to 1 or 2 in the connection string
// We probably don't want to allow these values but in that case a Debug.Assert is the wrong way to enforce it.
Debug.Assert(internalTimeout == 0 || internalTimeout >= MinimumInternalCommandTimeout);
return internalTimeout * 1000;
}
}
#endregion Configuration settings
#region State management
int _state;
///
/// Gets the current state of the connector
///
internal ConnectorState State
{
get => (ConnectorState)_state;
set
{
var newState = (int)value;
if (newState == _state)
return;
Interlocked.Exchange(ref _state, newState);
}
}
///
/// Returns whether the connector is open, regardless of any task it is currently performing
///
bool IsConnected
=> State switch
{
ConnectorState.Ready => true,
ConnectorState.Executing => true,
ConnectorState.Fetching => true,
ConnectorState.Waiting => true,
ConnectorState.Copy => true,
ConnectorState.Replication => true,
ConnectorState.Closed => false,
ConnectorState.Connecting => false,
ConnectorState.Broken => false,
_ => throw new ArgumentOutOfRangeException("Unknown state: " + State)
};
internal bool IsReady => State == ConnectorState.Ready;
internal bool IsClosed => State == ConnectorState.Closed;
internal bool IsBroken => State == ConnectorState.Broken;
#endregion
#region Open
///
/// Opens the physical connection to the server.
///
/// Usually called by the RequestConnector
/// Method of the connection pool manager.
internal async Task Open(NpgsqlTimeout timeout, bool async, CancellationToken cancellationToken)
{
Debug.Assert(Connection != null && Connection.Connector == this);
Debug.Assert(State == ConnectorState.Closed);
State = ConnectorState.Connecting;
try
{
await RawOpen(timeout, async, cancellationToken);
var username = GetUsername();
if (Settings.Database == null)
Settings.Database = username;
timeout.CheckAndApply(this);
WriteStartupMessage(username);
await Flush(async, cancellationToken);
using (StartCancellableOperation(cancellationToken, attemptPgCancellation: false))
{
await Authenticate(username, timeout, async, cancellationToken);
// We treat BackendKeyData as optional because some PostgreSQL-like database
// don't send it (CockroachDB, CrateDB)
var msg = await ReadMessage(async);
if (msg.Code == BackendMessageCode.BackendKeyData)
{
var keyDataMsg = (BackendKeyDataMessage)msg;
BackendProcessId = keyDataMsg.BackendProcessId;
_backendSecretKey = keyDataMsg.BackendSecretKey;
msg = await ReadMessage(async);
}
if (msg.Code != BackendMessageCode.ReadyForQuery)
throw new NpgsqlException($"Received backend message {msg.Code} while expecting ReadyForQuery. Please file a bug.");
State = ConnectorState.Ready;
}
await LoadDatabaseInfo(forceReload: false, timeout, async, cancellationToken);
if (Settings.Pooling && !Settings.Multiplexing && !Settings.NoResetOnClose && DatabaseInfo.SupportsDiscard)
{
_sendResetOnClose = true;
GenerateResetMessage();
}
OpenTimestamp = DateTime.UtcNow;
Log.Trace($"Opened connection to {Host}:{Port}");
if (Settings.Multiplexing)
{
// Start an infinite async loop, which processes incoming multiplexing traffic.
// It is intentionally not awaited and will run as long as the connector is alive.
// The CommandsInFlightWriter channel is completed in Cleanup, which should cause this task
// to complete.
_ = Task.Run(MultiplexingReadLoop, CancellationToken.None)
.ContinueWith(t =>
{
// Note that we *must* observe the exception if the task is faulted.
Log.Error("Exception bubbled out of multiplexing read loop", t.Exception!, Id);
}, TaskContinuationOptions.OnlyOnFaulted);
}
if (_isKeepAliveEnabled)
{
// Start the keep alive mechanism to work by scheduling the timer.
// Otherwise, it doesn't work for cases when no query executed during
// the connection lifetime in case of a new connector.
lock (this)
{
var keepAlive = Settings.KeepAlive * 1000;
_keepAliveTimer!.Change(keepAlive, keepAlive);
}
}
}
catch (Exception e)
{
Break(e);
throw;
}
}
internal async ValueTask LoadDatabaseInfo(bool forceReload, NpgsqlTimeout timeout, bool async,
CancellationToken cancellationToken = default)
{
// Super hacky stuff...
var prevBindingScope = Connection!.ConnectorBindingScope;
Connection.ConnectorBindingScope = ConnectorBindingScope.PhysicalConnecting;
using var _ = Defer(static (conn, prevScope) => conn.ConnectorBindingScope = prevScope, Connection, prevBindingScope);
// The type loading below will need to send queries to the database, and that depends on a type mapper
// being set up (even if its empty)
TypeMapper = new ConnectorTypeMapper(this);
var key = new NpgsqlDatabaseInfoCacheKey(Settings);
if (forceReload || !NpgsqlDatabaseInfo.Cache.TryGetValue(key, out var database))
{
var hasSemaphore = async
? await DatabaseInfoSemaphore.WaitAsync(timeout.TimeLeft, cancellationToken)
: DatabaseInfoSemaphore.Wait(timeout.TimeLeft, cancellationToken);
// We've timed out - calling Check, to throw the correct exception
if (!hasSemaphore)
timeout.Check();
try
{
if (forceReload || !NpgsqlDatabaseInfo.Cache.TryGetValue(key, out database))
{
NpgsqlDatabaseInfo.Cache[key] = database = await NpgsqlDatabaseInfo.Load(Connection,
timeout, async);
}
}
finally
{
DatabaseInfoSemaphore.Release();
}
}
DatabaseInfo = database!;
TypeMapper.Bind(DatabaseInfo);
}
void WriteStartupMessage(string username)
{
var startupParams = new Dictionary
{
["user"] = username,
["client_encoding"] = Settings.ClientEncoding ??
PostgresEnvironment.ClientEncoding ??
"UTF8",
["database"] = Settings.Database!
};
if (Settings.ApplicationName?.Length > 0)
startupParams["application_name"] = Settings.ApplicationName;
if (Settings.SearchPath?.Length > 0)
startupParams["search_path"] = Settings.SearchPath;
var timezone = Settings.Timezone ?? PostgresEnvironment.TimeZone;
if (timezone != null)
startupParams["TimeZone"] = timezone;
var options = Settings.Options ?? PostgresEnvironment.Options;
if (options?.Length > 0)
startupParams["options"] = options;
switch (Settings.ReplicationMode)
{
case ReplicationMode.Logical:
startupParams["replication"] = "database";
break;
case ReplicationMode.Physical:
startupParams["replication"] = "true";
break;
}
WriteStartup(startupParams);
}
string GetUsername()
{
var username = Settings.Username;
if (username?.Length > 0)
return username;
username = PostgresEnvironment.User;
if (username?.Length > 0)
return username;
if (!PGUtil.IsWindows)
{
username = KerberosUsernameProvider.GetUsername(Settings.IncludeRealm);
if (username?.Length > 0)
return username;
}
username = Environment.UserName;
if (username?.Length > 0)
return username;
throw new NpgsqlException("No username could be found, please specify one explicitly");
}
async Task RawOpen(NpgsqlTimeout timeout, bool async, CancellationToken cancellationToken)
{
var cert = default(X509Certificate2?);
try
{
if (async)
await ConnectAsync(timeout, cancellationToken);
else
Connect(timeout);
_baseStream = new NetworkStream(_socket, true);
_stream = _baseStream;
if (Settings.Encoding == "UTF8")
{
TextEncoding = PGUtil.UTF8Encoding;
RelaxedTextEncoding = PGUtil.RelaxedUTF8Encoding;
}
else
{
TextEncoding = Encoding.GetEncoding(Settings.Encoding, EncoderFallback.ExceptionFallback, DecoderFallback.ExceptionFallback);
RelaxedTextEncoding = Encoding.GetEncoding(Settings.Encoding, EncoderFallback.ReplacementFallback, DecoderFallback.ReplacementFallback);
}
ReadBuffer = new NpgsqlReadBuffer(this, _stream, _socket, Settings.ReadBufferSize, TextEncoding, RelaxedTextEncoding);
WriteBuffer = new NpgsqlWriteBuffer(this, _stream, _socket, Settings.WriteBufferSize, TextEncoding);
timeout.CheckAndApply(this);
if (SslMode == SslMode.Require || SslMode == SslMode.Prefer)
{
WriteSslRequest();
await Flush(async, cancellationToken);
await ReadBuffer.Ensure(1, async);
var response = (char)ReadBuffer.ReadByte();
timeout.CheckAndApply(this);
switch (response)
{
default:
throw new NpgsqlException($"Received unknown response {response} for SSLRequest (expecting S or N)");
case 'N':
if (SslMode == SslMode.Require)
throw new NpgsqlException("SSL connection requested. No SSL enabled connection from this host is configured.");
break;
case 'S':
var clientCertificates = new X509Certificate2Collection();
var certPath = Settings.ClientCertificate ?? PostgresEnvironment.SslCert;
if (certPath is null && PostgresEnvironment.SslCertDefault is string certPathDefault)
certPath = certPathDefault;
if (certPath != null)
{
cert = new X509Certificate2(certPath, Settings.ClientCertificateKey ?? PostgresEnvironment.SslKey);
clientCertificates.Add(cert);
}
ProvideClientCertificatesCallback?.Invoke(clientCertificates);
var certificateValidationCallback = Settings.TrustServerCertificate
? SslTrustServerValidation
: (Settings.RootCertificate ?? PostgresEnvironment.SslCertRoot ?? PostgresEnvironment.SslCertRootDefault) is { } certRootPath
? SslRootValidation(certRootPath)
: UserCertificateValidationCallback is { } userValidation
? userValidation
: SslDefaultValidation;
timeout.CheckAndApply(this);
try
{
var sslStream = new SslStream(_stream, leaveInnerStreamOpen: false, certificateValidationCallback);
if (async)
await sslStream.AuthenticateAsClientAsync(Host, clientCertificates,
SslProtocols.Tls | SslProtocols.Tls11 | SslProtocols.Tls12, Settings.CheckCertificateRevocation);
else
sslStream.AuthenticateAsClient(Host, clientCertificates,
SslProtocols.Tls | SslProtocols.Tls11 | SslProtocols.Tls12, Settings.CheckCertificateRevocation);
_stream = sslStream;
}
catch (Exception e)
{
throw new NpgsqlException("Exception while performing SSL handshake", e);
}
ReadBuffer.Clear(); // Reset to empty after reading single SSL char
ReadBuffer.Underlying = _stream;
WriteBuffer.Underlying = _stream;
IsSecure = true;
Log.Trace("SSL negotiation successful");
break;
}
}
Log.Trace($"Socket connected to {Host}:{Port}");
}
catch
{
cert?.Dispose();
_stream?.Dispose();
_stream = null!;
_baseStream?.Dispose();
_baseStream = null!;
_socket?.Dispose();
_socket = null!;
throw;
}
}
void Connect(NpgsqlTimeout timeout)
{
// Note that there aren't any timeout-able or cancellable DNS methods
var endpoints = Path.IsPathRooted(Host)
? new EndPoint[] { new UnixDomainSocketEndPoint(Path.Combine(Host, $".s.PGSQL.{Port}")) }
: Dns.GetHostAddresses(Host).Select(a => new IPEndPoint(a, Port)).ToArray();
timeout.Check();
// Give each endpoint an equal share of the remaining time
var perEndpointTimeout = -1; // Default to infinity
if (timeout.IsSet)
{
var timeoutTicks = timeout.TimeLeft.Ticks;
if (timeoutTicks <= 0)
throw new TimeoutException();
perEndpointTimeout = (int)(timeoutTicks / endpoints.Length / 10);
}
for (var i = 0; i < endpoints.Length; i++)
{
var endpoint = endpoints[i];
Log.Trace($"Attempting to connect to {endpoint}");
var protocolType =
endpoint.AddressFamily == AddressFamily.InterNetwork ||
endpoint.AddressFamily == AddressFamily.InterNetworkV6
? ProtocolType.Tcp
: ProtocolType.IP;
var socket = new Socket(endpoint.AddressFamily, SocketType.Stream, protocolType)
{
Blocking = false
};
try
{
try
{
socket.Connect(endpoint);
}
catch (SocketException e)
{
if (e.SocketErrorCode != SocketError.WouldBlock)
throw;
}
var write = new List { socket };
var error = new List { socket };
Socket.Select(null, write, error, perEndpointTimeout);
var errorCode = (int) socket.GetSocketOption(SocketOptionLevel.Socket, SocketOptionName.Error)!;
if (errorCode != 0)
throw new SocketException(errorCode);
if (!write.Any())
throw new TimeoutException("Timeout during connection attempt");
socket.Blocking = true;
SetSocketOptions(socket);
_socket = socket;
return;
}
catch (Exception e)
{
try { socket.Dispose(); }
catch
{
// ignored
}
Log.Trace($"Failed to connect to {endpoint}", e);
if (i == endpoints.Length - 1)
throw new NpgsqlException("Exception while connecting", e);
}
}
}
async Task ConnectAsync(NpgsqlTimeout timeout, CancellationToken cancellationToken)
{
// Note that there aren't any timeout-able or cancellable DNS methods
var endpoints = Path.IsPathRooted(Host)
? new EndPoint[] { new UnixDomainSocketEndPoint(Path.Combine(Host, $".s.PGSQL.{Port}")) }
: (await Dns.GetHostAddressesAsync(Host).WithCancellationAndTimeout(timeout, cancellationToken))
.Select(a => new IPEndPoint(a, Port)).ToArray();
// Give each IP an equal share of the remaining time
var perIpTimespan = default(TimeSpan);
var perIpTimeout = timeout;
if (timeout.IsSet)
{
var timeoutTicks = timeout.TimeLeft.Ticks;
if (timeoutTicks <= 0)
throw new TimeoutException();
perIpTimespan = new TimeSpan(timeoutTicks / endpoints.Length);
perIpTimeout = new NpgsqlTimeout(perIpTimespan);
}
for (var i = 0; i < endpoints.Length; i++)
{
var endpoint = endpoints[i];
Log.Trace($"Attempting to connect to {endpoint}");
var protocolType =
endpoint.AddressFamily == AddressFamily.InterNetwork ||
endpoint.AddressFamily == AddressFamily.InterNetworkV6
? ProtocolType.Tcp
: ProtocolType.IP;
var socket = new Socket(endpoint.AddressFamily, SocketType.Stream, protocolType);
CancellationTokenSource? combinedCts = null;
try
{
// .NET 5.0 added cancellation support to ConnectAsync, which allows us to implement real
// cancellation and timeout. On older TFMs, we fake-cancel the operation, i.e. stop waiting
// and raise the exception, but the actual connection task is left running.
#if NETSTANDARD2_0 || NETSTANDARD2_1 || NETCOREAPP3_1
await socket.ConnectAsync(endpoint)
.WithCancellationAndTimeout(perIpTimeout, cancellationToken);
#else
var finalCt = cancellationToken;
if (perIpTimeout.IsSet)
{
combinedCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
combinedCts.CancelAfter((int)perIpTimeout.TimeLeft.TotalMilliseconds);
finalCt = combinedCts.Token;
}
await socket.ConnectAsync(endpoint, finalCt);
#endif
SetSocketOptions(socket);
_socket = socket;
return;
}
catch (Exception e)
{
try
{
socket.Dispose();
}
catch
{
// ignored
}
cancellationToken.ThrowIfCancellationRequested();
if (e is OperationCanceledException)
e = new TimeoutException("Timeout during connection attempt");
Log.Trace($"Failed to connect to {endpoint}", e);
if (i == endpoints.Length - 1)
{
throw new NpgsqlException("Exception while connecting", e);
}
}
finally
{
combinedCts?.Dispose();
}
}
}
void SetSocketOptions(Socket socket)
{
if (socket.AddressFamily == AddressFamily.InterNetwork)
socket.NoDelay = true;
if (Settings.SocketReceiveBufferSize > 0)
socket.ReceiveBufferSize = Settings.SocketReceiveBufferSize;
if (Settings.SocketSendBufferSize > 0)
socket.SendBufferSize = Settings.SocketSendBufferSize;
if (Settings.TcpKeepAlive)
socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, true);
if (Settings.TcpKeepAliveInterval > 0 && Settings.TcpKeepAliveTime == 0)
throw new ArgumentException("If TcpKeepAliveInterval is defined, TcpKeepAliveTime must be defined as well");
if (Settings.TcpKeepAliveTime > 0)
{
var timeSeconds = Settings.TcpKeepAliveTime;
var intervalSeconds = Settings.TcpKeepAliveInterval > 0
? Settings.TcpKeepAliveInterval
: Settings.TcpKeepAliveTime;
#if NETSTANDARD2_0 || NETSTANDARD2_1
var timeMilliseconds = timeSeconds * 1000;
var intervalMilliseconds = intervalSeconds * 1000;
// For the following see https://msdn.microsoft.com/en-us/library/dd877220.aspx
var uintSize = Marshal.SizeOf(typeof(uint));
var inOptionValues = new byte[uintSize * 3];
BitConverter.GetBytes((uint)1).CopyTo(inOptionValues, 0);
BitConverter.GetBytes((uint)timeMilliseconds).CopyTo(inOptionValues, uintSize);
BitConverter.GetBytes((uint)intervalMilliseconds).CopyTo(inOptionValues, uintSize * 2);
var result = 0;
try
{
result = socket.IOControl(IOControlCode.KeepAliveValues, inOptionValues, null);
}
catch (PlatformNotSupportedException)
{
throw new PlatformNotSupportedException("Setting TCP Keepalive Time and TCP Keepalive Interval is supported only on Windows, Mono and .NET Core 3.1+. " +
"TCP keepalives can still be used on other systems but are enabled via the TcpKeepAlive option or configured globally for the machine, see the relevant docs.");
}
if (result != 0)
throw new NpgsqlException($"Got non-zero value when trying to set TCP keepalive: {result}");
#else
socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, true);
socket.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.TcpKeepAliveTime, timeSeconds);
socket.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.TcpKeepAliveInterval, intervalSeconds);
#endif
}
}
#endregion
#region I/O
internal readonly ChannelReader? CommandsInFlightReader;
internal readonly ChannelWriter? CommandsInFlightWriter;
internal volatile int CommandsInFlightCount;
internal ManualResetValueTaskSource