Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Npgsql.sln.DotSettings
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
<s:Boolean x:Key="/Default/UserDictionary/Words/=bytea/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=citext/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=Conformant/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=containee/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=Datemultirange/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=daterange/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=DDEX/@EntryIndexedValue">True</s:Boolean>
Expand Down
8 changes: 4 additions & 4 deletions src/Npgsql/BackendMessages/RowDescriptionMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ internal RowDescriptionMessage(int numFields = 10)
_insensitiveIndex = new Dictionary<string, int>(source._insensitiveIndex);
}

internal RowDescriptionMessage Load(NpgsqlReadBuffer buf, ConnectorTypeMapper typeMapper)
internal RowDescriptionMessage Load(NpgsqlReadBuffer buf, TypeMapper typeMapper)
{
_nameIndex.Clear();
_insensitiveIndex?.Clear();
Expand Down Expand Up @@ -77,7 +77,7 @@ internal RowDescriptionMessage Load(NpgsqlReadBuffer buf, ConnectorTypeMapper ty
}

internal static RowDescriptionMessage CreateForReplication(
ConnectorTypeMapper typeMapper, uint tableOID, FormatCode formatCode, IReadOnlyList<RelationMessage.Column> columns)
TypeMapper typeMapper, uint tableOID, FormatCode formatCode, IReadOnlyList<RelationMessage.Column> columns)
{
var msg = new RowDescriptionMessage(columns.Count);
var numFields = msg.Count = columns.Count;
Expand Down Expand Up @@ -238,7 +238,7 @@ internal FieldDescription(FieldDescription source)
}

internal void Populate(
ConnectorTypeMapper typeMapper, string name, uint tableOID, short columnAttributeNumber,
TypeMapper typeMapper, string name, uint tableOID, short columnAttributeNumber,
uint oid, short typeSize, int typeModifier, FormatCode formatCode
)
{
Expand Down Expand Up @@ -309,7 +309,7 @@ internal PostgresType PostgresType
internal void ResolveHandler()
=> Handler = IsBinaryFormat ? _typeMapper.ResolveByOID(TypeOID) : _typeMapper.UnrecognizedTypeHandler;

ConnectorTypeMapper _typeMapper;
TypeMapper _typeMapper;

internal bool IsBinaryFormat => FormatCode == FormatCode.Binary;
internal bool IsTextFormat => FormatCode == FormatCode.Text;
Expand Down
2 changes: 1 addition & 1 deletion src/Npgsql/Internal/NpgsqlConnector.Auth.cs
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ async Task AuthenticateGSS(bool async)

async ValueTask<string?> GetPassword(string username, bool async, CancellationToken cancellationToken = default)
{
var password = await _dataSource.GetPassword(async, cancellationToken);
var password = await DataSource.GetPassword(async, cancellationToken);

if (password is not null)
return password;
Expand Down
106 changes: 33 additions & 73 deletions src/Npgsql/Internal/NpgsqlConnector.cs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public sealed partial class NpgsqlConnector : IDisposable
/// </summary>
public NpgsqlDatabaseInfo DatabaseInfo { get; internal set; } = default!;

internal ConnectorTypeMapper TypeMapper { get; set; } = default!;
internal TypeMapper TypeMapper { get; set; } = default!;

/// <summary>
/// The current transaction status for this connector.
Expand Down Expand Up @@ -161,11 +161,6 @@ public sealed partial class NpgsqlConnector : IDisposable
/// </summary>
volatile Exception? _breakReason;

/// <summary>
/// Semaphore, used to synchronize DatabaseInfo between multiple connections, so it wouldn't be loaded in parallel.
/// </summary>
static readonly SemaphoreSlim DatabaseInfoSemaphore = new(1);

/// <summary>
/// <para>
/// Used by the pool to indicate that I/O is currently in progress on this connector, so that another write
Expand Down Expand Up @@ -228,9 +223,9 @@ internal void FlagAsWritableForMultiplexing()
/// Note that in multi-host scenarios, this references the host-specific <see cref="PoolingDataSource"/> rather than the
/// <see cref="NpgsqlMultiHostDataSource"/>.
/// </summary>
readonly NpgsqlDataSource _dataSource;
internal NpgsqlDataSource DataSource { get; }

internal string UserFacingConnectionString => _dataSource.ConnectionString;
internal string UserFacingConnectionString => DataSource.ConnectionString;

/// <summary>
/// Contains the UTC timestamp when this connector was opened, used to implement
Expand Down Expand Up @@ -315,7 +310,7 @@ internal NpgsqlConnector(NpgsqlDataSource dataSource, NpgsqlConnection conn)
}

NpgsqlConnector(NpgsqlConnector connector)
: this(connector._dataSource)
: this(connector.DataSource)
{
ProvideClientCertificatesCallback = connector.ProvideClientCertificatesCallback;
UserCertificateValidationCallback = connector.UserCertificateValidationCallback;
Expand All @@ -326,7 +321,7 @@ internal NpgsqlConnector(NpgsqlDataSource dataSource, NpgsqlConnection conn)
{
Debug.Assert(dataSource.OwnsConnectors);

_dataSource = dataSource;
DataSource = dataSource;

LoggingConfiguration = dataSource.LoggingConfiguration;
ConnectionLogger = LoggingConfiguration.ConnectionLogger;
Expand Down Expand Up @@ -464,7 +459,13 @@ internal async Task Open(NpgsqlTimeout timeout, bool async, CancellationToken ca
{
await OpenCore(this, Settings.SslMode, timeout, async, cancellationToken);

await LoadDatabaseInfo(forceReload: false, timeout, async, cancellationToken);
if (!DataSource.IsBootstrapped)
await DataSource.Bootstrap(this, timeout, async, cancellationToken);

Debug.Assert(DataSource.TypeMapper is not null);
Debug.Assert(DataSource.DatabaseInfo is not null);
TypeMapper = DataSource.TypeMapper;
DatabaseInfo = DataSource.DatabaseInfo;

if (Settings.Pooling && !Settings.Multiplexing && !Settings.NoResetOnClose && DatabaseInfo.SupportsDiscard)
{
Expand Down Expand Up @@ -500,18 +501,18 @@ internal async Task Open(NpgsqlTimeout timeout, bool async, CancellationToken ca
}
}

if (_dataSource.ConnectionInitializerAsync is not null)
if (DataSource.ConnectionInitializerAsync is not null)
{
Debug.Assert(_dataSource.ConnectionInitializer is not null);
Debug.Assert(DataSource.ConnectionInitializer is not null);

var tempConnection = new NpgsqlConnection(_dataSource, this);
var tempConnection = new NpgsqlConnection(DataSource, this);

try
{
if (async)
await _dataSource.ConnectionInitializerAsync(tempConnection);
await DataSource.ConnectionInitializerAsync(tempConnection);
else if (!async)
_dataSource.ConnectionInitializer(tempConnection);
DataSource.ConnectionInitializer(tempConnection);
}
finally
{
Expand Down Expand Up @@ -595,47 +596,6 @@ await OpenCore(
}
}

internal async ValueTask LoadDatabaseInfo(bool forceReload, NpgsqlTimeout timeout, bool async,
CancellationToken cancellationToken = default)
{
// The type loading below will need to send queries to the database, and that depends on a type mapper being set up (even if its
// empty). So we set up here, and then later inject the DatabaseInfo.
// For multiplexing connectors, the type mapper is the shared pool-wide one (since when validating/binding parameters on
// multiplexing there's no connector yet). However, in the very first multiplexing connection (bootstrap phase) we create
// a connector-specific mapper, which will later become shared pool-wide one.
TypeMapper =
Settings.Multiplexing && ((MultiplexingDataSource)_dataSource).MultiplexingTypeMapper is { } multiplexingTypeMapper
? multiplexingTypeMapper
: new ConnectorTypeMapper(this);

var key = new NpgsqlDatabaseInfoCacheKey(Settings);
if (forceReload || !NpgsqlDatabaseInfo.Cache.TryGetValue(key, out var database))
{
var hasSemaphore = async
? await DatabaseInfoSemaphore.WaitAsync(timeout.CheckAndGetTimeLeft(), cancellationToken)
: DatabaseInfoSemaphore.Wait(timeout.CheckAndGetTimeLeft(), cancellationToken);

// We've timed out - calling Check, to throw the correct exception
if (!hasSemaphore)
timeout.Check();

try
{
if (forceReload || !NpgsqlDatabaseInfo.Cache.TryGetValue(key, out database))
{
NpgsqlDatabaseInfo.Cache[key] = database = await NpgsqlDatabaseInfo.Load(this, timeout, async);
}
}
finally
{
DatabaseInfoSemaphore.Release();
}
}

DatabaseInfo = database;
TypeMapper.DatabaseInfo = database;
}

internal async ValueTask<ClusterState> QueryClusterState(
NpgsqlTimeout timeout, bool async, CancellationToken cancellationToken = default)
{
Expand Down Expand Up @@ -1177,7 +1137,7 @@ async Task MultiplexingReadLoop()
SpinWait.SpinUntil(() => MultiplexAsyncWritingLock == 0 || IsBroken);

ResetReadBuffer();
_dataSource.Return(this);
DataSource.Return(this);
}
}

Expand Down Expand Up @@ -1214,7 +1174,7 @@ async Task MultiplexingReadLoop()
}

// "Return" the connector to the pool to for cleanup (e.g. update total connector count)
_dataSource.Return(this);
DataSource.Return(this);

ConnectionLogger.LogError(e, "Exception in multiplexing read loop", Id);
}
Expand Down Expand Up @@ -1930,9 +1890,9 @@ internal void Close()
}

internal bool TryRemovePendingEnlistedConnector(Transaction transaction)
=> _dataSource.TryRemovePendingEnlistedConnector(this, transaction);
=> DataSource.TryRemovePendingEnlistedConnector(this, transaction);

internal void Return() => _dataSource.Return(this);
internal void Return() => DataSource.Return(this);

/// <inheritdoc />
public void Dispose() => Close();
Expand Down Expand Up @@ -1971,7 +1931,7 @@ internal Exception Break(Exception reason)
{
ClusterStateCache.UpdateClusterState(Host, Port, ClusterState.Offline, DateTime.UtcNow,
Settings.HostRecheckSecondsTranslated);
_dataSource.Clear();
DataSource.Clear();
}

LogMessages.BreakingConnection(ConnectionLogger, Id, reason);
Expand All @@ -1991,17 +1951,17 @@ internal Exception Break(Exception reason)
Debug.Assert(closeLockTaken);
if (Settings.ReplicationMode == ReplicationMode.Off)
{
// When a connector is broken, we immediately "return" it to the pool (i.e. update the pool state so reflect the
// connector no longer being open). Upper layers such as EF may check DbConnection.ConnectionState, and only close if
// it's closed; so we can't set the state to Closed and expect the user to still close (in order to return to the pool).
// On the other hand leaving the state Open could indicate to the user that the connection is functional.
// (see https://github.com/npgsql/npgsql/issues/3705#issuecomment-839908772)
Connection = null;
if (connection.ConnectorBindingScope != ConnectorBindingScope.None)
Return();
connection.EnlistedTransaction = null;
connection.Connector = null;
connection.ConnectorBindingScope = ConnectorBindingScope.None;
// When a connector is broken, we immediately "return" it to the pool (i.e. update the pool state so reflect the
Comment thread
roji marked this conversation as resolved.
Outdated
// connector no longer being open). Upper layers such as EF may check DbConnection.ConnectionState, and only close if
// it's closed; so we can't set the state to Closed and expect the user to still close (in order to return to the pool).
// On the other hand leaving the state Open could indicate to the user that the connection is functional.
// (see https://github.com/npgsql/npgsql/issues/3705#issuecomment-839908772)
Connection = null;
if (connection.ConnectorBindingScope != ConnectorBindingScope.None)
Return();
connection.EnlistedTransaction = null;
connection.Connector = null;
connection.ConnectorBindingScope = ConnectorBindingScope.None;
}
connection.FullState = ConnectionState.Broken;
connection.ReleaseCloseLock();
Expand Down
12 changes: 2 additions & 10 deletions src/Npgsql/Internal/NpgsqlDatabaseInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@ public abstract class NpgsqlDatabaseInfo
{
#region Fields

internal static readonly ConcurrentDictionary<NpgsqlDatabaseInfoCacheKey, NpgsqlDatabaseInfo> Cache
= new();

static volatile INpgsqlDatabaseInfoFactory[] Factories = new INpgsqlDatabaseInfoFactory[]
{
new PostgresMinimalDatabaseInfoFactory(),
Expand Down Expand Up @@ -185,7 +182,7 @@ private protected NpgsqlDatabaseInfo(string host, int port, string databaseName,
public PostgresType GetPostgresTypeByName(string pgName)
=> TryGetPostgresTypeByName(pgName, out var pgType)
? pgType
: throw new ArgumentException($"A PostgreSQL type with the name {pgName} was not found in the database");
: throw new ArgumentException($"A PostgreSQL type with the name '{pgName}' was not found in the database");

public bool TryGetPostgresTypeByName(string pgName, [NotNullWhen(true)] out PostgresType? pgType)
{
Expand Down Expand Up @@ -302,8 +299,6 @@ public static void RegisterFactory(INpgsqlDatabaseInfoFactory factory)
factories[0] = factory;
Array.Copy(Factories, 0, factories, 1, Factories.Length);
Factories = factories;

Cache.Clear();
}

internal static async Task<NpgsqlDatabaseInfo> Load(NpgsqlConnector conn, NpgsqlTimeout timeout, bool async)
Expand All @@ -324,14 +319,11 @@ internal static async Task<NpgsqlDatabaseInfo> Load(NpgsqlConnector conn, Npgsql

// For tests
internal static void ResetFactories()
{
Factories = new INpgsqlDatabaseInfoFactory[]
=> Factories = new INpgsqlDatabaseInfoFactory[]
{
new PostgresMinimalDatabaseInfoFactory(),
new PostgresDatabaseInfoFactory()
};
Cache.Clear();
}

#endregion Factory management
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ namespace Npgsql.Internal.TypeHandlers.CompositeHandlers;

sealed partial class CompositeHandler<T> : NpgsqlTypeHandler<T>, ICompositeHandler
{
readonly ConnectorTypeMapper _typeMapper;
readonly TypeMapper _typeMapper;
readonly INpgsqlNameTranslator _nameTranslator;

Func<T>? _constructor;
Expand All @@ -51,7 +51,7 @@ sealed partial class CompositeHandler<T> : NpgsqlTypeHandler<T>, ICompositeHandl

public Type CompositeType => typeof(T);

public CompositeHandler(PostgresCompositeType postgresType, ConnectorTypeMapper typeMapper, INpgsqlNameTranslator nameTranslator)
public CompositeHandler(PostgresCompositeType postgresType, TypeMapper typeMapper, INpgsqlNameTranslator nameTranslator)
: base(postgresType)
{
_typeMapper = typeMapper;
Expand Down Expand Up @@ -146,7 +146,7 @@ void InitializeCore()
}
}

static CompositeConstructorHandler<T>? CreateConstructorHandler(PostgresCompositeType pgType, ConnectorTypeMapper typeMapper, INpgsqlNameTranslator nameTranslator)
static CompositeConstructorHandler<T>? CreateConstructorHandler(PostgresCompositeType pgType, TypeMapper typeMapper, INpgsqlNameTranslator nameTranslator)
{
var pgFields = pgType.Fields;
var clrType = typeof(T);
Expand Down Expand Up @@ -222,7 +222,7 @@ void InitializeCore()
return null;
}

static CompositeMemberHandler<T>[] CreateMemberHandlers(PostgresCompositeType pgType, ConnectorTypeMapper typeMapper, INpgsqlNameTranslator nameTranslator)
static CompositeMemberHandler<T>[] CreateMemberHandlers(PostgresCompositeType pgType, TypeMapper typeMapper, INpgsqlNameTranslator nameTranslator)
{
var pgFields = pgType.Fields;

Expand Down
4 changes: 2 additions & 2 deletions src/Npgsql/Internal/TypeHandlers/RecordHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ namespace Npgsql.Internal.TypeHandlers;
/// </remarks>
sealed partial class RecordHandler : NpgsqlTypeHandler<object[]>
{
readonly ConnectorTypeMapper _typeMapper;
readonly TypeMapper _typeMapper;

public RecordHandler(PostgresType postgresType, ConnectorTypeMapper typeMapper)
public RecordHandler(PostgresType postgresType, TypeMapper typeMapper)
: base(postgresType)
=> _typeMapper = typeMapper;

Expand Down
Loading