Skip to content

Commit 9fda64a

Browse files
authored
Added timeouts for the COPY operations
Closes #1328
1 parent 0537fd7 commit 9fda64a

4 files changed

Lines changed: 115 additions & 53 deletions

File tree

src/Npgsql/NpgsqlBinaryExporter.cs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,19 @@ public sealed class NpgsqlBinaryExporter : ICancelable, IAsyncDisposable
3636
readonly NpgsqlTypeHandler?[] _typeHandlerCache;
3737
static readonly NpgsqlLogger Log = NpgsqlLogManager.CreateLogger(nameof(NpgsqlBinaryExporter));
3838

39+
/// <summary>
40+
/// Current timeout
41+
/// </summary>
42+
public TimeSpan Timeout
43+
{
44+
set
45+
{
46+
_buf.Timeout = value;
47+
// While calling Complete(), we're using the connector, which overwrites the buffer's timeout with it's own
48+
_connector.UserTimeout = (int)value.TotalMilliseconds;
49+
}
50+
}
51+
3952
#endregion
4053

4154
#region Construction / Initialization

src/Npgsql/NpgsqlBinaryImporter.cs

Lines changed: 63 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,19 @@ public sealed class NpgsqlBinaryImporter : ICancelable, IAsyncDisposable
4242

4343
static readonly NpgsqlLogger Log = NpgsqlLogManager.CreateLogger(nameof(NpgsqlBinaryImporter));
4444

45+
/// <summary>
46+
/// Current timeout
47+
/// </summary>
48+
public TimeSpan Timeout
49+
{
50+
set
51+
{
52+
_buf.Timeout = value;
53+
// While calling Complete(), we're using the connector, which overwrites the buffer's timeout with it's own
54+
_connector.UserTimeout = (int)value.TotalMilliseconds;
55+
}
56+
}
57+
4558
#endregion
4659

4760
#region Construction / Initialization
@@ -117,11 +130,20 @@ async Task StartRow(bool async, CancellationToken cancellationToken = default)
117130
if (_column != -1 && _column != NumColumns)
118131
ThrowHelper.ThrowInvalidOperationException_BinaryImportParametersMismatch(NumColumns, _column);
119132

120-
if (_buf.WriteSpaceLeft < 2)
121-
await _buf.Flush(async, cancellationToken);
122-
_buf.WriteInt16(NumColumns);
133+
try
134+
{
135+
if (_buf.WriteSpaceLeft < 2)
136+
await _buf.Flush(async, cancellationToken);
137+
_buf.WriteInt16(NumColumns);
123138

124-
_column = 0;
139+
_column = 0;
140+
}
141+
catch
142+
{
143+
// An exception here will have already broken the connection etc.
144+
Cleanup();
145+
throw;
146+
}
125147
}
126148

127149
/// <summary>
@@ -285,25 +307,34 @@ async Task Write<T>([AllowNull] T value, NpgsqlParameter param, bool async, Canc
285307
return;
286308
}
287309

288-
if (typeof(T) == typeof(object))
289-
{
290-
param.Value = value;
291-
}
292-
else
310+
try
293311
{
294-
if (!(param is NpgsqlParameter<T> typedParam))
312+
if (typeof(T) == typeof(object))
295313
{
296-
_params[_column] = typedParam = new NpgsqlParameter<T>();
297-
typedParam.NpgsqlDbType = param.NpgsqlDbType;
314+
param.Value = value;
298315
}
299-
typedParam.TypedValue = value;
316+
else
317+
{
318+
if (!(param is NpgsqlParameter<T> typedParam))
319+
{
320+
_params[_column] = typedParam = new NpgsqlParameter<T>();
321+
typedParam.NpgsqlDbType = param.NpgsqlDbType;
322+
}
323+
typedParam.TypedValue = value;
324+
}
325+
param.ResolveHandler(_connector.TypeMapper);
326+
param.ValidateAndGetLength();
327+
param.LengthCache?.Rewind();
328+
await param.WriteWithLength(_buf, async, cancellationToken);
329+
param.LengthCache?.Clear();
330+
_column++;
331+
}
332+
catch
333+
{
334+
// An exception here will have already broken the connection etc.
335+
Cleanup();
336+
throw;
300337
}
301-
param.ResolveHandler(_connector.TypeMapper);
302-
param.ValidateAndGetLength();
303-
param.LengthCache?.Rewind();
304-
await param.WriteWithLength(_buf, async, cancellationToken);
305-
param.LengthCache?.Clear();
306-
_column++;
307338
}
308339

309340
/// <summary>
@@ -328,11 +359,20 @@ async Task WriteNull(bool async, CancellationToken cancellationToken = default)
328359
if (_column == -1)
329360
throw new InvalidOperationException("A row hasn't been started");
330361

331-
if (_buf.WriteSpaceLeft < 4)
332-
await _buf.Flush(async, cancellationToken);
362+
try
363+
{
364+
if (_buf.WriteSpaceLeft < 4)
365+
await _buf.Flush(async, cancellationToken);
333366

334-
_buf.WriteInt32(-1);
335-
_column++;
367+
_buf.WriteInt32(-1);
368+
_column++;
369+
}
370+
catch
371+
{
372+
// An exception here will have already broken the connection etc.
373+
Cleanup();
374+
throw;
375+
}
336376
}
337377

338378
/// <summary>

src/Npgsql/NpgsqlRawCopyStream.cs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,23 @@ public sealed class NpgsqlRawCopyStream : Stream, ICancelable
3737
public override bool CanWrite => _canWrite;
3838
public override bool CanRead => _canRead;
3939

40+
public override bool CanTimeout => true;
41+
public override int WriteTimeout
42+
{
43+
get => (int) _writeBuf.Timeout.TotalMilliseconds;
44+
set => _writeBuf.Timeout = TimeSpan.FromMilliseconds(value);
45+
}
46+
public override int ReadTimeout
47+
{
48+
get => (int) _readBuf.Timeout.TotalMilliseconds;
49+
set
50+
{
51+
_readBuf.Timeout = TimeSpan.FromMilliseconds(value);
52+
// While calling the connector it will overwrite our read buffer timeout
53+
_connector.UserTimeout = value;
54+
}
55+
}
56+
4057
/// <summary>
4158
/// The copy binary format header signature
4259
/// </summary>

test/Npgsql.Tests/CopyTests.cs

Lines changed: 22 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -19,42 +19,34 @@ public class CopyTests : MultiplexingTestBase
1919
[Test, Description("Reproduce #2257")]
2020
public async Task Issue2257()
2121
{
22-
if (IsMultiplexing)
23-
Assert.Ignore("Multiplexing: fails");
22+
await using var conn = await OpenConnectionAsync();
23+
await using var _ = await GetTempTableName(conn, out var table1);
24+
await using var __ = await GetTempTableName(conn, out var table2);
2425

25-
using (var conn = OpenConnection(new NpgsqlConnectionStringBuilder(ConnectionString) { CommandTimeout = 3 }))
26+
const int rowCount = 1000000;
27+
using (var cmd = conn.CreateCommand())
2628
{
27-
await using var _ = await GetTempTableName(conn, out var table1);
28-
await using var __ = await GetTempTableName(conn, out var table2);
29+
cmd.CommandText = $"CREATE TABLE {table1} AS SELECT * FROM generate_series(1, {rowCount}) id";
30+
await cmd.ExecuteNonQueryAsync();
31+
cmd.CommandText = $"ALTER TABLE {table1} ADD CONSTRAINT {table1}_pk PRIMARY KEY (id)";
32+
await cmd.ExecuteNonQueryAsync();
33+
cmd.CommandText = $"CREATE TABLE {table2} (master_id integer NOT NULL REFERENCES {table1} (id))";
34+
await cmd.ExecuteNonQueryAsync();
35+
}
2936

30-
const int rowCount = 1000000;
31-
using (var cmd = conn.CreateCommand())
37+
await using var writer = conn.BeginBinaryImport($"COPY {table2} FROM STDIN BINARY");
38+
writer.Timeout = TimeSpan.FromMilliseconds(3);
39+
var e = Assert.Throws<NpgsqlException>(() =>
40+
{
41+
for (var i = 1; i <= rowCount; ++i)
3242
{
33-
cmd.CommandText = $"CREATE TABLE {table1} AS SELECT * FROM generate_series(1, {rowCount}) id";
34-
// Creating table can take some time, so we set quite large timeout
35-
cmd.CommandTimeout = 30;
36-
await cmd.ExecuteNonQueryAsync();
37-
cmd.CommandText = $"ALTER TABLE {table1} ADD CONSTRAINT {table1}_pk PRIMARY KEY (id)";
38-
await cmd.ExecuteNonQueryAsync();
39-
cmd.CommandText = $"CREATE TABLE {table2} (master_id integer NOT NULL REFERENCES {table1} (id))";
40-
// We need to fail with timeout while calling writer.Complete() and conn.BeginBinaryImport reuses timeout from previous command
41-
// so we set default timeout here
42-
cmd.CommandTimeout = 3;
43-
await cmd.ExecuteNonQueryAsync();
43+
writer.StartRow();
44+
writer.Write(i);
4445
}
4546

46-
using (var writer = conn.BeginBinaryImport($"COPY {table2} FROM STDIN BINARY"))
47-
{
48-
for (var i = 1; i <= rowCount; ++i)
49-
{
50-
writer.StartRow();
51-
writer.Write(i);
52-
}
53-
54-
var e = Assert.Throws<NpgsqlException>(() => writer.Complete());
55-
Assert.That(e.InnerException, Is.TypeOf<TimeoutException>());
56-
}
57-
}
47+
writer.Complete();
48+
});
49+
Assert.That(e.InnerException, Is.TypeOf<TimeoutException>());
5850
}
5951

6052
#endregion

0 commit comments

Comments
 (0)