Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions src/Npgsql/NpgsqlBinaryExporter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,19 @@ public sealed class NpgsqlBinaryExporter : ICancelable, IAsyncDisposable
readonly NpgsqlTypeHandler?[] _typeHandlerCache;
static readonly NpgsqlLogger Log = NpgsqlLogManager.CreateLogger(nameof(NpgsqlBinaryExporter));

/// <summary>
/// Current timeout
/// </summary>
public TimeSpan Timeout
{
set
{
_buf.Timeout = value;
// While calling Complete(), we're using the connector, which overwrites the buffer's timeout with it's own
_connector.UserTimeout = (int)value.TotalMilliseconds;
}
}

#endregion

#region Construction / Initialization
Expand Down
86 changes: 63 additions & 23 deletions src/Npgsql/NpgsqlBinaryImporter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,19 @@ public sealed class NpgsqlBinaryImporter : ICancelable, IAsyncDisposable

static readonly NpgsqlLogger Log = NpgsqlLogManager.CreateLogger(nameof(NpgsqlBinaryImporter));

/// <summary>
/// Current timeout
/// </summary>
public TimeSpan Timeout
{
set
{
_buf.Timeout = value;
// While calling Complete(), we're using the connector, which overwrites the buffer's timeout with it's own
_connector.UserTimeout = (int)value.TotalMilliseconds;
}
}

#endregion

#region Construction / Initialization
Expand Down Expand Up @@ -117,11 +130,20 @@ async Task StartRow(bool async, CancellationToken cancellationToken = default)
if (_column != -1 && _column != NumColumns)
ThrowHelper.ThrowInvalidOperationException_BinaryImportParametersMismatch(NumColumns, _column);

if (_buf.WriteSpaceLeft < 2)
await _buf.Flush(async, cancellationToken);
_buf.WriteInt16(NumColumns);
try
{
if (_buf.WriteSpaceLeft < 2)
await _buf.Flush(async, cancellationToken);
_buf.WriteInt16(NumColumns);

_column = 0;
_column = 0;
}
catch
{
// An exception here will have already broken the connection etc.
Cleanup();
throw;
}
}

/// <summary>
Expand Down Expand Up @@ -285,25 +307,34 @@ async Task Write<T>([AllowNull] T value, NpgsqlParameter param, bool async, Canc
return;
}

if (typeof(T) == typeof(object))
{
param.Value = value;
}
else
try
{
if (!(param is NpgsqlParameter<T> typedParam))
if (typeof(T) == typeof(object))
{
_params[_column] = typedParam = new NpgsqlParameter<T>();
typedParam.NpgsqlDbType = param.NpgsqlDbType;
param.Value = value;
}
typedParam.TypedValue = value;
else
{
if (!(param is NpgsqlParameter<T> typedParam))
{
_params[_column] = typedParam = new NpgsqlParameter<T>();
typedParam.NpgsqlDbType = param.NpgsqlDbType;
}
typedParam.TypedValue = value;
}
param.ResolveHandler(_connector.TypeMapper);
param.ValidateAndGetLength();
param.LengthCache?.Rewind();
await param.WriteWithLength(_buf, async, cancellationToken);
param.LengthCache?.Clear();
_column++;
}
catch
{
// An exception here will have already broken the connection etc.
Cleanup();
throw;
}
param.ResolveHandler(_connector.TypeMapper);
param.ValidateAndGetLength();
param.LengthCache?.Rewind();
await param.WriteWithLength(_buf, async, cancellationToken);
param.LengthCache?.Clear();
_column++;
}

/// <summary>
Expand All @@ -328,11 +359,20 @@ async Task WriteNull(bool async, CancellationToken cancellationToken = default)
if (_column == -1)
throw new InvalidOperationException("A row hasn't been started");

if (_buf.WriteSpaceLeft < 4)
await _buf.Flush(async, cancellationToken);
try
{
if (_buf.WriteSpaceLeft < 4)
await _buf.Flush(async, cancellationToken);

_buf.WriteInt32(-1);
_column++;
_buf.WriteInt32(-1);
_column++;
}
catch
{
// An exception here will have already broken the connection etc.
Cleanup();
throw;
}
}

/// <summary>
Expand Down
17 changes: 17 additions & 0 deletions src/Npgsql/NpgsqlRawCopyStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,23 @@ public sealed class NpgsqlRawCopyStream : Stream, ICancelable
public override bool CanWrite => _canWrite;
public override bool CanRead => _canRead;

public override bool CanTimeout => true;
public override int WriteTimeout
{
get => (int) _writeBuf.Timeout.TotalMilliseconds;
set => _writeBuf.Timeout = TimeSpan.FromMilliseconds(value);
}
public override int ReadTimeout
{
get => (int) _readBuf.Timeout.TotalMilliseconds;
set
{
_readBuf.Timeout = TimeSpan.FromMilliseconds(value);
// While calling the connector it will overwrite our read buffer timeout
_connector.UserTimeout = value;
}
}

/// <summary>
/// The copy binary format header signature
/// </summary>
Expand Down
52 changes: 22 additions & 30 deletions test/Npgsql.Tests/CopyTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,42 +19,34 @@ public class CopyTests : MultiplexingTestBase
[Test, Description("Reproduce #2257")]
public async Task Issue2257()
{
if (IsMultiplexing)
Assert.Ignore("Multiplexing: fails");
await using var conn = await OpenConnectionAsync();
await using var _ = await GetTempTableName(conn, out var table1);
await using var __ = await GetTempTableName(conn, out var table2);

using (var conn = OpenConnection(new NpgsqlConnectionStringBuilder(ConnectionString) { CommandTimeout = 3 }))
const int rowCount = 1000000;
using (var cmd = conn.CreateCommand())
{
await using var _ = await GetTempTableName(conn, out var table1);
await using var __ = await GetTempTableName(conn, out var table2);
cmd.CommandText = $"CREATE TABLE {table1} AS SELECT * FROM generate_series(1, {rowCount}) id";
await cmd.ExecuteNonQueryAsync();
cmd.CommandText = $"ALTER TABLE {table1} ADD CONSTRAINT {table1}_pk PRIMARY KEY (id)";
await cmd.ExecuteNonQueryAsync();
cmd.CommandText = $"CREATE TABLE {table2} (master_id integer NOT NULL REFERENCES {table1} (id))";
await cmd.ExecuteNonQueryAsync();
}

const int rowCount = 1000000;
using (var cmd = conn.CreateCommand())
await using var writer = conn.BeginBinaryImport($"COPY {table2} FROM STDIN BINARY");
writer.Timeout = TimeSpan.FromMilliseconds(3);
var e = Assert.Throws<NpgsqlException>(() =>
{
for (var i = 1; i <= rowCount; ++i)
{
cmd.CommandText = $"CREATE TABLE {table1} AS SELECT * FROM generate_series(1, {rowCount}) id";
// Creating table can take some time, so we set quite large timeout
cmd.CommandTimeout = 30;
await cmd.ExecuteNonQueryAsync();
cmd.CommandText = $"ALTER TABLE {table1} ADD CONSTRAINT {table1}_pk PRIMARY KEY (id)";
await cmd.ExecuteNonQueryAsync();
cmd.CommandText = $"CREATE TABLE {table2} (master_id integer NOT NULL REFERENCES {table1} (id))";
// We need to fail with timeout while calling writer.Complete() and conn.BeginBinaryImport reuses timeout from previous command
// so we set default timeout here
cmd.CommandTimeout = 3;
await cmd.ExecuteNonQueryAsync();
writer.StartRow();
writer.Write(i);
}

using (var writer = conn.BeginBinaryImport($"COPY {table2} FROM STDIN BINARY"))
{
for (var i = 1; i <= rowCount; ++i)
{
writer.StartRow();
writer.Write(i);
}

var e = Assert.Throws<NpgsqlException>(() => writer.Complete());
Assert.That(e.InnerException, Is.TypeOf<TimeoutException>());
}
}
writer.Complete();
});
Assert.That(e.InnerException, Is.TypeOf<TimeoutException>());
}

#endregion
Expand Down