Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/Npgsql/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,12 @@ Npgsql.NpgsqlTypeLoadingOptionsBuilder
Npgsql.NpgsqlTypeLoadingOptionsBuilder.EnableTableCompositesLoading(bool enable = true) -> Npgsql.NpgsqlTypeLoadingOptionsBuilder!
Npgsql.NpgsqlTypeLoadingOptionsBuilder.EnableTypeLoading(bool enable = true) -> Npgsql.NpgsqlTypeLoadingOptionsBuilder!
Npgsql.NpgsqlTypeLoadingOptionsBuilder.SetTypeLoadingSchemas(params System.Collections.Generic.IEnumerable<string!>? schemas) -> Npgsql.NpgsqlTypeLoadingOptionsBuilder!
Npgsql.Replication.LogicalReplicationConnection.LogicalReplicationConnection(Npgsql.NpgsqlDataSource! dataSource) -> void
Npgsql.Replication.PgOutput.ReplicationValue.GetFieldName() -> string!
Npgsql.Replication.PgOutput.Messages.ParallelStreamAbortMessage
Npgsql.Replication.PgOutput.Messages.ParallelStreamAbortMessage.AbortLsn.get -> NpgsqlTypes.NpgsqlLogSequenceNumber
Npgsql.Replication.PgOutput.Messages.ParallelStreamAbortMessage.AbortTimestamp.get -> System.DateTime
Npgsql.Replication.PhysicalReplicationConnection.PhysicalReplicationConnection(Npgsql.NpgsqlDataSource! dataSource) -> void
Npgsql.Replication.PgOutput.PgOutputProtocolVersion
Npgsql.Replication.PgOutput.PgOutputProtocolVersion.V1 = 1 -> Npgsql.Replication.PgOutput.PgOutputProtocolVersion
Npgsql.Replication.PgOutput.PgOutputProtocolVersion.V2 = 2 -> Npgsql.Replication.PgOutput.PgOutputProtocolVersion
Expand Down
6 changes: 6 additions & 0 deletions src/Npgsql/Replication/LogicalReplicationConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,10 @@ public LogicalReplicationConnection() {}
/// </summary>
/// <param name="connectionString">The connection used to open the PostgreSQL database.</param>
public LogicalReplicationConnection(string? connectionString) : base(connectionString) {}

/// <summary>
/// Initializes a new instance of <see cref="LogicalReplicationConnection"/> with the given data source.
/// </summary>
/// <param name="dataSource">The data source to use for this connection.</param>
public LogicalReplicationConnection(NpgsqlDataSource dataSource) : base(dataSource) {}
}
6 changes: 6 additions & 0 deletions src/Npgsql/Replication/PhysicalReplicationConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ public PhysicalReplicationConnection() {}
/// <param name="connectionString">The connection used to open the PostgreSQL database.</param>
public PhysicalReplicationConnection(string? connectionString) : base(connectionString) {}

/// <summary>
/// Initializes a new instance of <see cref="PhysicalReplicationConnection"/> with the given data source.
/// </summary>
/// <param name="dataSource">The data source to use for this connection.</param>
public PhysicalReplicationConnection(NpgsqlDataSource dataSource) : base(dataSource) {}

/// <summary>
/// Creates a <see cref="PhysicalReplicationSlot"/> that wraps a PostgreSQL physical replication slot and
/// can be used to start physical streaming replication
Expand Down
35 changes: 35 additions & 0 deletions src/Npgsql/Replication/ReplicationConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public abstract class ReplicationConnection : IAsyncDisposable
static readonly Version FirstVersionWithoutDropSlotDoubleCommandCompleteMessage = new(13, 0);
static readonly Version FirstVersionWithTemporarySlotsAndSlotSnapshotInitMode = new(10, 0);
readonly NpgsqlConnection _npgsqlConnection;
readonly NpgsqlDataSource? _internalDataSource;
readonly SemaphoreSlim _feedbackSemaphore = new(1, 1);
string? _userFacingConnectionString;
TimeSpan? _commandTimeout;
Expand Down Expand Up @@ -68,6 +69,25 @@ private protected ReplicationConnection()
private protected ReplicationConnection(string? connectionString) : this()
=> ConnectionString = connectionString;

private protected ReplicationConnection(NpgsqlDataSource dataSource)
{
ArgumentNullException.ThrowIfNull(dataSource);

if (dataSource is NpgsqlMultiHostDataSource)
throw new NotSupportedException("Replication is not supported with multiple hosts");

var settings = dataSource.Settings.Clone();
settings.Pooling = false;
settings.Enlist = false;
settings.KeepAlive = 0;
settings.ReplicationMode = ReplicationMode;

_internalDataSource = new UnpooledDataSource(settings, dataSource.Configuration);
_npgsqlConnection = NpgsqlConnection.FromDataSource(_internalDataSource);
_userFacingConnectionString = dataSource.ConnectionString;
_requestFeedbackInterval = new TimeSpan(_walReceiverTimeout.Ticks / 2);
}

#endregion

#region Properties
Expand All @@ -90,6 +110,9 @@ public string ConnectionString {
get => _userFacingConnectionString ?? string.Empty;
set
{
if (_internalDataSource is not null)
throw new InvalidOperationException("The connection string property can not be changed when the replication connection was created with a data source.");

_userFacingConnectionString = value;
var cs = new NpgsqlConnectionStringBuilder(value)
{
Expand Down Expand Up @@ -284,6 +307,18 @@ public async ValueTask DisposeAsync()
// Dispose
}

if (_internalDataSource is not null)
{
try
{
await _internalDataSource.DisposeAsync().ConfigureAwait(false);
}
catch
{
// Dispose
}
}

_isDisposed = true;
}

Expand Down
17 changes: 17 additions & 0 deletions test/Npgsql.Tests/Replication/CommonReplicationTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,15 @@ public async Task Open()
await using var rc = await OpenReplicationConnectionAsync();
}

[Test, NonParallelizable]
public async Task Open_with_data_source()
{
await using var dataSource = NpgsqlDataSource.Create(ConnectionString);
await using var rc = await OpenReplicationConnectionAsync(dataSource);
var info = await rc.IdentifySystem();
Assert.That(info.Timeline, Is.GreaterThan(0));
}

[Test]
public void Open_with_cancelled_token()
=> Assert.That(async () =>
Expand All @@ -45,6 +54,14 @@ public void Open_on_disposed_connection()
.With.Property(nameof(ObjectDisposedException.ObjectName))
.EqualTo(typeof(TConnection).Name));

[Test]
public async Task ConnectionString_cannot_be_set_when_created_with_data_source()
{
await using var dataSource = NpgsqlDataSource.Create(ConnectionString);
await using var rc = (TConnection)Activator.CreateInstance(typeof(TConnection), dataSource)!;
Assert.Throws<InvalidOperationException>(() => rc.ConnectionString = "Host=localhost");
}

#endregion Open

#region IdentifySystem
Expand Down
9 changes: 9 additions & 0 deletions test/Npgsql.Tests/Replication/SafeReplicationTestBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,15 @@ private protected async Task<TConnection> OpenReplicationConnectionAsync(
return c;
}

private protected async Task<TConnection> OpenReplicationConnectionAsync(
NpgsqlDataSource dataSource,
CancellationToken cancellationToken = default)
{
var c = (TConnection)Activator.CreateInstance(typeof(TConnection), dataSource)!;
await c.Open(cancellationToken);
return c;
}

private protected static async Task AssertReplicationCancellation<T>(IAsyncEnumerator<T> enumerator, bool streamingStarted = true)
{
try
Expand Down
Loading