From 3db90878c441de6ba8ef974190e1bf5c4ccb9dc7 Mon Sep 17 00:00:00 2001 From: Nikita Kazmin Date: Fri, 18 Sep 2020 00:06:37 +0300 Subject: [PATCH 1/5] Added timeouts for the copy operations --- src/Npgsql/NpgsqlBinaryExporter.cs | 13 +++++++++++++ src/Npgsql/NpgsqlBinaryImporter.cs | 13 +++++++++++++ src/Npgsql/NpgsqlRawCopyStream.cs | 13 +++++++++++++ test/Npgsql.Tests/BugTests.cs | 3 ++- test/Npgsql.Tests/CopyTests.cs | 26 +++++++++++++------------- 5 files changed, 54 insertions(+), 14 deletions(-) diff --git a/src/Npgsql/NpgsqlBinaryExporter.cs b/src/Npgsql/NpgsqlBinaryExporter.cs index c1bd27325d..f12da335ad 100644 --- a/src/Npgsql/NpgsqlBinaryExporter.cs +++ b/src/Npgsql/NpgsqlBinaryExporter.cs @@ -36,6 +36,19 @@ public sealed class NpgsqlBinaryExporter : ICancelable, IAsyncDisposable readonly NpgsqlTypeHandler?[] _typeHandlerCache; static readonly NpgsqlLogger Log = NpgsqlLogManager.CreateLogger(nameof(NpgsqlBinaryExporter)); + /// + /// Current timeout in ms + /// + public int Timeout + { + set + { + _buf.Timeout = TimeSpan.FromMilliseconds(value); + // While calling Complete(), we're using the connector, which overwrites the buffer's timeout with it's own + _connector.UserTimeout = value; + } + } + #endregion #region Construction / Initialization diff --git a/src/Npgsql/NpgsqlBinaryImporter.cs b/src/Npgsql/NpgsqlBinaryImporter.cs index 6c39942a98..52a59217d7 100644 --- a/src/Npgsql/NpgsqlBinaryImporter.cs +++ b/src/Npgsql/NpgsqlBinaryImporter.cs @@ -42,6 +42,19 @@ public sealed class NpgsqlBinaryImporter : ICancelable, IAsyncDisposable static readonly NpgsqlLogger Log = NpgsqlLogManager.CreateLogger(nameof(NpgsqlBinaryImporter)); + /// + /// Current timeout in ms + /// + public int Timeout + { + set + { + _buf.Timeout = TimeSpan.FromMilliseconds(value); + // While calling Complete(), we're using the connector, which overwrites the buffer's timeout with it's own + _connector.UserTimeout = value; + } + } + #endregion #region Construction / Initialization diff --git a/src/Npgsql/NpgsqlRawCopyStream.cs b/src/Npgsql/NpgsqlRawCopyStream.cs index c28dc2c320..6382fae4c5 100644 --- a/src/Npgsql/NpgsqlRawCopyStream.cs +++ b/src/Npgsql/NpgsqlRawCopyStream.cs @@ -37,6 +37,19 @@ public sealed class NpgsqlRawCopyStream : Stream, ICancelable public override bool CanWrite => _canWrite; public override bool CanRead => _canRead; + public override bool CanTimeout => true; + public override int WriteTimeout { get => (int) _writeBuf.Timeout.TotalMilliseconds; set => _writeBuf.Timeout = TimeSpan.FromMilliseconds(value); } + public override int ReadTimeout + { + get => (int) _readBuf.Timeout.TotalMilliseconds; + set + { + _readBuf.Timeout = TimeSpan.FromMilliseconds(value); + // While calling the connector it will overwrite our read buffer timeout + _connector.UserTimeout = value; + } + } + /// /// The copy binary format header signature /// diff --git a/test/Npgsql.Tests/BugTests.cs b/test/Npgsql.Tests/BugTests.cs index 0dd22ba8f1..a571af1f59 100644 --- a/test/Npgsql.Tests/BugTests.cs +++ b/test/Npgsql.Tests/BugTests.cs @@ -1237,7 +1237,8 @@ public void NullReferenceExceptionInBeginTextExport() } finally { - conn.ExecuteNonQuery("DROP FUNCTION IF EXISTS f_test()"); + using var conn2 = OpenConnection(); + conn2.ExecuteNonQuery("DROP FUNCTION IF EXISTS f_test()"); } } diff --git a/test/Npgsql.Tests/CopyTests.cs b/test/Npgsql.Tests/CopyTests.cs index 512776a72e..9b1983b39a 100644 --- a/test/Npgsql.Tests/CopyTests.cs +++ b/test/Npgsql.Tests/CopyTests.cs @@ -17,12 +17,13 @@ public class CopyTests : MultiplexingTestBase #region issue 2257 [Test, Description("Reproduce #2257")] + [Timeout(10000)] public async Task Issue2257() { if (IsMultiplexing) Assert.Ignore("Multiplexing: fails"); - using (var conn = OpenConnection(new NpgsqlConnectionStringBuilder(ConnectionString) { CommandTimeout = 3 })) + using (var conn = OpenConnection(new NpgsqlConnectionStringBuilder(ConnectionString) { CommandTimeout = 30 })) { await using var _ = await GetTempTableName(conn, out var table1); await using var __ = await GetTempTableName(conn, out var table2); @@ -31,27 +32,26 @@ public async Task Issue2257() using (var cmd = conn.CreateCommand()) { cmd.CommandText = $"CREATE TABLE {table1} AS SELECT * FROM generate_series(1, {rowCount}) id"; - // Creating table can take some time, so we set quite large timeout - cmd.CommandTimeout = 30; await cmd.ExecuteNonQueryAsync(); cmd.CommandText = $"ALTER TABLE {table1} ADD CONSTRAINT {table1}_pk PRIMARY KEY (id)"; await cmd.ExecuteNonQueryAsync(); cmd.CommandText = $"CREATE TABLE {table2} (master_id integer NOT NULL REFERENCES {table1} (id))"; - // We need to fail with timeout while calling writer.Complete() and conn.BeginBinaryImport reuses timeout from previous command - // so we set default timeout here - cmd.CommandTimeout = 3; await cmd.ExecuteNonQueryAsync(); } using (var writer = conn.BeginBinaryImport($"COPY {table2} FROM STDIN BINARY")) { - for (var i = 1; i <= rowCount; ++i) + writer.Timeout = 3; + var e = Assert.Throws(() => { - writer.StartRow(); - writer.Write(i); - } + for (var i = 1; i <= rowCount; ++i) + { + writer.StartRow(); + writer.Write(i); + } - var e = Assert.Throws(() => writer.Complete()); + writer.Complete(); + }); Assert.That(e.InnerException, Is.TypeOf()); } } @@ -650,8 +650,9 @@ public async Task Read_NullAsValue_ThrowsInvalidCastException() public async Task ErrorDuringImport() { using (var conn = await OpenConnectionAsync()) + using (var conn2 = await OpenConnectionAsync()) { - await using var _ = await CreateTempTable(conn, "foo INT, CONSTRAINT uq UNIQUE(foo)", out var table); + await using var _ = await CreateTempTable(conn2, "foo INT, CONSTRAINT uq UNIQUE(foo)", out var table); var writer = conn.BeginBinaryImport($"COPY {table} (foo) FROM STDIN BINARY"); writer.StartRow(); @@ -661,7 +662,6 @@ public async Task ErrorDuringImport() Assert.That(() => writer.Complete(), Throws.Exception .TypeOf() .With.Property(nameof(PostgresException.SqlState)).EqualTo("23505")); - Assert.That(await conn.ExecuteScalarAsync("SELECT 1"), Is.EqualTo(1)); } } From 64f6c1611240252fab509745ea45f7b7d30ba2c4 Mon Sep 17 00:00:00 2001 From: Nikita Kazmin Date: Wed, 28 Oct 2020 22:16:07 +0300 Subject: [PATCH 2/5] Reversed some test changes --- test/Npgsql.Tests/BugTests.cs | 3 +-- test/Npgsql.Tests/CopyTests.cs | 4 ++-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/test/Npgsql.Tests/BugTests.cs b/test/Npgsql.Tests/BugTests.cs index a571af1f59..0dd22ba8f1 100644 --- a/test/Npgsql.Tests/BugTests.cs +++ b/test/Npgsql.Tests/BugTests.cs @@ -1237,8 +1237,7 @@ public void NullReferenceExceptionInBeginTextExport() } finally { - using var conn2 = OpenConnection(); - conn2.ExecuteNonQuery("DROP FUNCTION IF EXISTS f_test()"); + conn.ExecuteNonQuery("DROP FUNCTION IF EXISTS f_test()"); } } diff --git a/test/Npgsql.Tests/CopyTests.cs b/test/Npgsql.Tests/CopyTests.cs index 9b1983b39a..7d7e6da16e 100644 --- a/test/Npgsql.Tests/CopyTests.cs +++ b/test/Npgsql.Tests/CopyTests.cs @@ -650,9 +650,8 @@ public async Task Read_NullAsValue_ThrowsInvalidCastException() public async Task ErrorDuringImport() { using (var conn = await OpenConnectionAsync()) - using (var conn2 = await OpenConnectionAsync()) { - await using var _ = await CreateTempTable(conn2, "foo INT, CONSTRAINT uq UNIQUE(foo)", out var table); + await using var _ = await CreateTempTable(conn, "foo INT, CONSTRAINT uq UNIQUE(foo)", out var table); var writer = conn.BeginBinaryImport($"COPY {table} (foo) FROM STDIN BINARY"); writer.StartRow(); @@ -662,6 +661,7 @@ public async Task ErrorDuringImport() Assert.That(() => writer.Complete(), Throws.Exception .TypeOf() .With.Property(nameof(PostgresException.SqlState)).EqualTo("23505")); + Assert.That(await conn.ExecuteScalarAsync("SELECT 1"), Is.EqualTo(1)); } } From 107e5e8002cc3ca0ddbcb14ed3ad38c5ec68e90a Mon Sep 17 00:00:00 2001 From: Nikita Kazmin Date: Wed, 28 Oct 2020 22:29:51 +0300 Subject: [PATCH 3/5] Added cleanup for the binary importer methods --- src/Npgsql/NpgsqlBinaryImporter.cs | 73 ++++++++++++++++++++---------- 1 file changed, 50 insertions(+), 23 deletions(-) diff --git a/src/Npgsql/NpgsqlBinaryImporter.cs b/src/Npgsql/NpgsqlBinaryImporter.cs index 52a59217d7..f7066c90af 100644 --- a/src/Npgsql/NpgsqlBinaryImporter.cs +++ b/src/Npgsql/NpgsqlBinaryImporter.cs @@ -130,11 +130,20 @@ async Task StartRow(bool async, CancellationToken cancellationToken = default) if (_column != -1 && _column != NumColumns) ThrowHelper.ThrowInvalidOperationException_BinaryImportParametersMismatch(NumColumns, _column); - if (_buf.WriteSpaceLeft < 2) - await _buf.Flush(async, cancellationToken); - _buf.WriteInt16(NumColumns); + try + { + if (_buf.WriteSpaceLeft < 2) + await _buf.Flush(async, cancellationToken); + _buf.WriteInt16(NumColumns); - _column = 0; + _column = 0; + } + catch + { + // An exception here will have already broken the connection etc. + Cleanup(); + throw; + } } /// @@ -298,25 +307,34 @@ async Task Write([AllowNull] T value, NpgsqlParameter param, bool async, Canc return; } - if (typeof(T) == typeof(object)) - { - param.Value = value; - } - else + try { - if (!(param is NpgsqlParameter typedParam)) + if (typeof(T) == typeof(object)) + { + param.Value = value; + } + else { - _params[_column] = typedParam = new NpgsqlParameter(); - typedParam.NpgsqlDbType = param.NpgsqlDbType; + if (!(param is NpgsqlParameter typedParam)) + { + _params[_column] = typedParam = new NpgsqlParameter(); + typedParam.NpgsqlDbType = param.NpgsqlDbType; + } + typedParam.TypedValue = value; } - typedParam.TypedValue = value; + param.ResolveHandler(_connector.TypeMapper); + param.ValidateAndGetLength(); + param.LengthCache?.Rewind(); + await param.WriteWithLength(_buf, async, cancellationToken); + param.LengthCache?.Clear(); + _column++; + } + catch + { + // An exception here will have already broken the connection etc. + Cleanup(); + throw; } - param.ResolveHandler(_connector.TypeMapper); - param.ValidateAndGetLength(); - param.LengthCache?.Rewind(); - await param.WriteWithLength(_buf, async, cancellationToken); - param.LengthCache?.Clear(); - _column++; } /// @@ -341,11 +359,20 @@ async Task WriteNull(bool async, CancellationToken cancellationToken = default) if (_column == -1) throw new InvalidOperationException("A row hasn't been started"); - if (_buf.WriteSpaceLeft < 4) - await _buf.Flush(async, cancellationToken); + try + { + if (_buf.WriteSpaceLeft < 4) + await _buf.Flush(async, cancellationToken); - _buf.WriteInt32(-1); - _column++; + _buf.WriteInt32(-1); + _column++; + } + catch + { + // An exception here will have already broken the connection etc. + Cleanup(); + throw; + } } /// From 3c832f4f103d185de1d75f615f2c9bffcaa8e1bf Mon Sep 17 00:00:00 2001 From: Nikita Kazmin Date: Wed, 28 Oct 2020 22:34:24 +0300 Subject: [PATCH 4/5] Timeout doesn't work with Assert.Ignore --- test/Npgsql.Tests/CopyTests.cs | 1 - 1 file changed, 1 deletion(-) diff --git a/test/Npgsql.Tests/CopyTests.cs b/test/Npgsql.Tests/CopyTests.cs index 7d7e6da16e..e7cf967db0 100644 --- a/test/Npgsql.Tests/CopyTests.cs +++ b/test/Npgsql.Tests/CopyTests.cs @@ -17,7 +17,6 @@ public class CopyTests : MultiplexingTestBase #region issue 2257 [Test, Description("Reproduce #2257")] - [Timeout(10000)] public async Task Issue2257() { if (IsMultiplexing) From 2abb6b888b8e276a9ffcbafe099d799b1c28ef4d Mon Sep 17 00:00:00 2001 From: Nikita Kazmin Date: Tue, 10 Nov 2020 09:04:11 +0300 Subject: [PATCH 5/5] Small nits --- src/Npgsql/NpgsqlBinaryExporter.cs | 8 ++--- src/Npgsql/NpgsqlBinaryImporter.cs | 8 ++--- src/Npgsql/NpgsqlRawCopyStream.cs | 6 +++- test/Npgsql.Tests/CopyTests.cs | 51 +++++++++++++----------------- 4 files changed, 35 insertions(+), 38 deletions(-) diff --git a/src/Npgsql/NpgsqlBinaryExporter.cs b/src/Npgsql/NpgsqlBinaryExporter.cs index f12da335ad..96f690c66b 100644 --- a/src/Npgsql/NpgsqlBinaryExporter.cs +++ b/src/Npgsql/NpgsqlBinaryExporter.cs @@ -37,15 +37,15 @@ public sealed class NpgsqlBinaryExporter : ICancelable, IAsyncDisposable static readonly NpgsqlLogger Log = NpgsqlLogManager.CreateLogger(nameof(NpgsqlBinaryExporter)); /// - /// Current timeout in ms + /// Current timeout /// - public int Timeout + public TimeSpan Timeout { set { - _buf.Timeout = TimeSpan.FromMilliseconds(value); + _buf.Timeout = value; // While calling Complete(), we're using the connector, which overwrites the buffer's timeout with it's own - _connector.UserTimeout = value; + _connector.UserTimeout = (int)value.TotalMilliseconds; } } diff --git a/src/Npgsql/NpgsqlBinaryImporter.cs b/src/Npgsql/NpgsqlBinaryImporter.cs index f7066c90af..0a3b753938 100644 --- a/src/Npgsql/NpgsqlBinaryImporter.cs +++ b/src/Npgsql/NpgsqlBinaryImporter.cs @@ -43,15 +43,15 @@ public sealed class NpgsqlBinaryImporter : ICancelable, IAsyncDisposable static readonly NpgsqlLogger Log = NpgsqlLogManager.CreateLogger(nameof(NpgsqlBinaryImporter)); /// - /// Current timeout in ms + /// Current timeout /// - public int Timeout + public TimeSpan Timeout { set { - _buf.Timeout = TimeSpan.FromMilliseconds(value); + _buf.Timeout = value; // While calling Complete(), we're using the connector, which overwrites the buffer's timeout with it's own - _connector.UserTimeout = value; + _connector.UserTimeout = (int)value.TotalMilliseconds; } } diff --git a/src/Npgsql/NpgsqlRawCopyStream.cs b/src/Npgsql/NpgsqlRawCopyStream.cs index 6382fae4c5..0bb9606b07 100644 --- a/src/Npgsql/NpgsqlRawCopyStream.cs +++ b/src/Npgsql/NpgsqlRawCopyStream.cs @@ -38,7 +38,11 @@ public sealed class NpgsqlRawCopyStream : Stream, ICancelable public override bool CanRead => _canRead; public override bool CanTimeout => true; - public override int WriteTimeout { get => (int) _writeBuf.Timeout.TotalMilliseconds; set => _writeBuf.Timeout = TimeSpan.FromMilliseconds(value); } + public override int WriteTimeout + { + get => (int) _writeBuf.Timeout.TotalMilliseconds; + set => _writeBuf.Timeout = TimeSpan.FromMilliseconds(value); + } public override int ReadTimeout { get => (int) _readBuf.Timeout.TotalMilliseconds; diff --git a/test/Npgsql.Tests/CopyTests.cs b/test/Npgsql.Tests/CopyTests.cs index e7cf967db0..cbd324c9d9 100644 --- a/test/Npgsql.Tests/CopyTests.cs +++ b/test/Npgsql.Tests/CopyTests.cs @@ -19,41 +19,34 @@ public class CopyTests : MultiplexingTestBase [Test, Description("Reproduce #2257")] public async Task Issue2257() { - if (IsMultiplexing) - Assert.Ignore("Multiplexing: fails"); + await using var conn = await OpenConnectionAsync(); + await using var _ = await GetTempTableName(conn, out var table1); + await using var __ = await GetTempTableName(conn, out var table2); - using (var conn = OpenConnection(new NpgsqlConnectionStringBuilder(ConnectionString) { CommandTimeout = 30 })) + const int rowCount = 1000000; + using (var cmd = conn.CreateCommand()) { - await using var _ = await GetTempTableName(conn, out var table1); - await using var __ = await GetTempTableName(conn, out var table2); + cmd.CommandText = $"CREATE TABLE {table1} AS SELECT * FROM generate_series(1, {rowCount}) id"; + await cmd.ExecuteNonQueryAsync(); + cmd.CommandText = $"ALTER TABLE {table1} ADD CONSTRAINT {table1}_pk PRIMARY KEY (id)"; + await cmd.ExecuteNonQueryAsync(); + cmd.CommandText = $"CREATE TABLE {table2} (master_id integer NOT NULL REFERENCES {table1} (id))"; + await cmd.ExecuteNonQueryAsync(); + } - const int rowCount = 1000000; - using (var cmd = conn.CreateCommand()) + await using var writer = conn.BeginBinaryImport($"COPY {table2} FROM STDIN BINARY"); + writer.Timeout = TimeSpan.FromMilliseconds(3); + var e = Assert.Throws(() => + { + for (var i = 1; i <= rowCount; ++i) { - cmd.CommandText = $"CREATE TABLE {table1} AS SELECT * FROM generate_series(1, {rowCount}) id"; - await cmd.ExecuteNonQueryAsync(); - cmd.CommandText = $"ALTER TABLE {table1} ADD CONSTRAINT {table1}_pk PRIMARY KEY (id)"; - await cmd.ExecuteNonQueryAsync(); - cmd.CommandText = $"CREATE TABLE {table2} (master_id integer NOT NULL REFERENCES {table1} (id))"; - await cmd.ExecuteNonQueryAsync(); + writer.StartRow(); + writer.Write(i); } - using (var writer = conn.BeginBinaryImport($"COPY {table2} FROM STDIN BINARY")) - { - writer.Timeout = 3; - var e = Assert.Throws(() => - { - for (var i = 1; i <= rowCount; ++i) - { - writer.StartRow(); - writer.Write(i); - } - - writer.Complete(); - }); - Assert.That(e.InnerException, Is.TypeOf()); - } - } + writer.Complete(); + }); + Assert.That(e.InnerException, Is.TypeOf()); } #endregion