Skip to content

Commit 302af43

Browse files
authored
Add Timeout property to text COPY operations (#6294)
Closes #5758
1 parent 70ad294 commit 302af43

File tree

5 files changed

+48
-18
lines changed

5 files changed

+48
-18
lines changed

src/Npgsql/Internal/NpgsqlConnector.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2833,11 +2833,11 @@ UserAction DoStartUserAction(ConnectorState newState, NpgsqlCommand? command,
28332833

28342834
StartCancellableOperation(cancellationToken, attemptPgCancellation);
28352835

2836-
// We reset the ReadBuffer.Timeout for every user action, so it wouldn't leak from the previous query or action
2836+
// We reset the ReadBuffer.Timeout and WriteBuffer.Timeout for every user action, so it wouldn't leak from the previous query or action
28372837
// For example, we might have successfully cancelled the previous query (so the connection is not broken)
28382838
// But the next time, we call the Prepare, which doesn't set its own timeout
28392839
var timeoutSeconds = command?.CommandTimeout ?? Settings.CommandTimeout;
2840-
ReadBuffer.Timeout = timeoutSeconds > 0 ? TimeSpan.FromSeconds(timeoutSeconds) : Timeout.InfiniteTimeSpan;
2840+
ReadBuffer.Timeout = WriteBuffer.Timeout = timeoutSeconds > 0 ? TimeSpan.FromSeconds(timeoutSeconds) : Timeout.InfiniteTimeSpan;
28412841

28422842
return new UserAction(this);
28432843
}

src/Npgsql/NpgsqlCommand.cs

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1041,9 +1041,6 @@ static void ValidateParameterCount(NpgsqlBatchCommand batchCommand)
10411041

10421042
#region Message Creation / Population
10431043

1044-
void BeginSend(NpgsqlConnector connector)
1045-
=> connector.WriteBuffer.Timeout = TimeSpan.FromSeconds(CommandTimeout);
1046-
10471044
internal Task Write(NpgsqlConnector connector, bool async, bool flush, CancellationToken cancellationToken = default)
10481045
{
10491046
return (_behavior & CommandBehavior.SchemaOnly) == 0
@@ -1158,8 +1155,6 @@ await connector.WriteParse(batchCommand.FinalCommandText, batchCommand.Statement
11581155

11591156
async Task SendDeriveParameters(NpgsqlConnector connector, bool async, CancellationToken cancellationToken = default)
11601157
{
1161-
BeginSend(connector);
1162-
11631158
var syncCaller = !async;
11641159
for (var i = 0; i < InternalBatchCommands.Count; i++)
11651160
{
@@ -1178,8 +1173,6 @@ async Task SendDeriveParameters(NpgsqlConnector connector, bool async, Cancellat
11781173

11791174
async Task SendPrepare(NpgsqlConnector connector, bool async, CancellationToken cancellationToken = default)
11801175
{
1181-
BeginSend(connector);
1182-
11831176
var syncCaller = !async;
11841177
for (var i = 0; i < InternalBatchCommands.Count; i++)
11851178
{
@@ -1227,8 +1220,6 @@ bool ShouldSchedule(ref bool async, int indexOfStatementInBatch)
12271220

12281221
async Task SendClose(NpgsqlConnector connector, bool async, CancellationToken cancellationToken = default)
12291222
{
1230-
BeginSend(connector);
1231-
12321223
foreach (var batchCommand in InternalBatchCommands)
12331224
{
12341225
if (!batchCommand.IsPrepared)
@@ -1531,7 +1522,6 @@ internal virtual async ValueTask<NpgsqlDataReader> ExecuteReader(bool async, Com
15311522
// Instead, all sends for non-first statements are performed asynchronously (even if the user requested sync),
15321523
// in a special synchronization context to prevents a dependency on the thread pool (which would also trigger
15331524
// deadlocks).
1534-
BeginSend(connector);
15351525
sendTask = Write(connector, async, flush: true, CancellationToken.None);
15361526

15371527
// The following is a hack. It raises an exception if one was thrown in the first phases

src/Npgsql/NpgsqlConnection.cs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1241,7 +1241,7 @@ async Task<NpgsqlBinaryExporter> BeginBinaryExport(bool async, string copyToComm
12411241
/// <remarks>
12421242
/// See https://www.postgresql.org/docs/current/static/sql-copy.html.
12431243
/// </remarks>
1244-
public TextWriter BeginTextImport(string copyFromCommand)
1244+
public NpgsqlCopyTextWriter BeginTextImport(string copyFromCommand)
12451245
=> BeginTextImport(async: false, copyFromCommand, CancellationToken.None).GetAwaiter().GetResult();
12461246

12471247
/// <summary>
@@ -1256,10 +1256,10 @@ public TextWriter BeginTextImport(string copyFromCommand)
12561256
/// <remarks>
12571257
/// See https://www.postgresql.org/docs/current/static/sql-copy.html.
12581258
/// </remarks>
1259-
public Task<TextWriter> BeginTextImportAsync(string copyFromCommand, CancellationToken cancellationToken = default)
1259+
public Task<NpgsqlCopyTextWriter> BeginTextImportAsync(string copyFromCommand, CancellationToken cancellationToken = default)
12601260
=> BeginTextImport(async: true, copyFromCommand, cancellationToken);
12611261

1262-
async Task<TextWriter> BeginTextImport(bool async, string copyFromCommand, CancellationToken cancellationToken = default)
1262+
async Task<NpgsqlCopyTextWriter> BeginTextImport(bool async, string copyFromCommand, CancellationToken cancellationToken = default)
12631263
{
12641264
ArgumentNullException.ThrowIfNull(copyFromCommand);
12651265
if (!IsValidCopyCommand(copyFromCommand))
@@ -1307,7 +1307,7 @@ async Task<TextWriter> BeginTextImport(bool async, string copyFromCommand, Cance
13071307
/// <remarks>
13081308
/// See https://www.postgresql.org/docs/current/static/sql-copy.html.
13091309
/// </remarks>
1310-
public TextReader BeginTextExport(string copyToCommand)
1310+
public NpgsqlCopyTextReader BeginTextExport(string copyToCommand)
13111311
=> BeginTextExport(async: false, copyToCommand, CancellationToken.None).GetAwaiter().GetResult();
13121312

13131313
/// <summary>
@@ -1322,10 +1322,10 @@ public TextReader BeginTextExport(string copyToCommand)
13221322
/// <remarks>
13231323
/// See https://www.postgresql.org/docs/current/static/sql-copy.html.
13241324
/// </remarks>
1325-
public Task<TextReader> BeginTextExportAsync(string copyToCommand, CancellationToken cancellationToken = default)
1325+
public Task<NpgsqlCopyTextReader> BeginTextExportAsync(string copyToCommand, CancellationToken cancellationToken = default)
13261326
=> BeginTextExport(async: true, copyToCommand, cancellationToken);
13271327

1328-
async Task<TextReader> BeginTextExport(bool async, string copyToCommand, CancellationToken cancellationToken = default)
1328+
async Task<NpgsqlCopyTextReader> BeginTextExport(bool async, string copyToCommand, CancellationToken cancellationToken = default)
13291329
{
13301330
ArgumentNullException.ThrowIfNull(copyToCommand);
13311331
if (!IsValidCopyCommand(copyToCommand))

src/Npgsql/NpgsqlRawCopyStream.cs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -485,6 +485,20 @@ internal NpgsqlCopyTextWriter(NpgsqlConnector connector, NpgsqlRawCopyStream und
485485
throw connector.Break(new Exception("Can't use a binary copy stream for text writing"));
486486
}
487487

488+
/// <summary>
489+
/// Gets or sets a value, in milliseconds, that determines how long the text writer will attempt to write before timing out.
490+
/// </summary>
491+
public int Timeout
492+
{
493+
get => ((NpgsqlRawCopyStream)BaseStream).WriteTimeout;
494+
set
495+
{
496+
var stream = (NpgsqlRawCopyStream)BaseStream;
497+
stream.ReadTimeout = value;
498+
stream.WriteTimeout = value;
499+
}
500+
}
501+
488502
/// <summary>
489503
/// Cancels and terminates an ongoing import. Any data already written will be discarded.
490504
/// </summary>
@@ -511,6 +525,20 @@ internal NpgsqlCopyTextReader(NpgsqlConnector connector, NpgsqlRawCopyStream und
511525
throw connector.Break(new Exception("Can't use a binary copy stream for text reading"));
512526
}
513527

528+
/// <summary>
529+
/// Gets or sets a value, in milliseconds, that determines how long the text reader will attempt to read before timing out.
530+
/// </summary>
531+
public int Timeout
532+
{
533+
get => ((NpgsqlRawCopyStream)BaseStream).ReadTimeout;
534+
set
535+
{
536+
var stream = (NpgsqlRawCopyStream)BaseStream;
537+
stream.ReadTimeout = value;
538+
stream.WriteTimeout = value;
539+
}
540+
}
541+
514542
/// <summary>
515543
/// Cancels and terminates an ongoing export.
516544
/// </summary>

src/Npgsql/PublicAPI.Unshipped.txt

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,10 @@ Npgsql.GssEncryptionMode.Disable = 0 -> Npgsql.GssEncryptionMode
55
Npgsql.GssEncryptionMode.Prefer = 1 -> Npgsql.GssEncryptionMode
66
Npgsql.GssEncryptionMode.Require = 2 -> Npgsql.GssEncryptionMode
77
Npgsql.TypeMapping.INpgsqlTypeMapper.AddDbTypeResolverFactory(Npgsql.Internal.DbTypeResolverFactory! factory) -> void
8+
Npgsql.NpgsqlConnection.BeginTextExport(string! copyToCommand) -> Npgsql.NpgsqlCopyTextReader!
9+
Npgsql.NpgsqlConnection.BeginTextExportAsync(string! copyToCommand, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<Npgsql.NpgsqlCopyTextReader!>!
10+
Npgsql.NpgsqlConnection.BeginTextImport(string! copyFromCommand) -> Npgsql.NpgsqlCopyTextWriter!
11+
Npgsql.NpgsqlConnection.BeginTextImportAsync(string! copyFromCommand, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<Npgsql.NpgsqlCopyTextWriter!>!
812
Npgsql.NpgsqlConnection.CloneWithAsync(string! connectionString, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask<Npgsql.NpgsqlConnection!>
913
Npgsql.NpgsqlConnection.SslClientAuthenticationOptionsCallback.get -> System.Action<System.Net.Security.SslClientAuthenticationOptions!>?
1014
Npgsql.NpgsqlConnection.SslClientAuthenticationOptionsCallback.set -> void
@@ -16,6 +20,10 @@ Npgsql.NpgsqlConnectionStringBuilder.RequireAuth.get -> string?
1620
Npgsql.NpgsqlConnectionStringBuilder.RequireAuth.set -> void
1721
Npgsql.NpgsqlConnectionStringBuilder.SslNegotiation.get -> Npgsql.SslNegotiation
1822
Npgsql.NpgsqlConnectionStringBuilder.SslNegotiation.set -> void
23+
Npgsql.NpgsqlCopyTextReader.Timeout.get -> int
24+
Npgsql.NpgsqlCopyTextReader.Timeout.set -> void
25+
Npgsql.NpgsqlCopyTextWriter.Timeout.get -> int
26+
Npgsql.NpgsqlCopyTextWriter.Timeout.set -> void
1927
Npgsql.NpgsqlDataSourceBuilder.ConfigureTypeLoading(System.Action<Npgsql.NpgsqlTypeLoadingOptionsBuilder!>! configureAction) -> Npgsql.NpgsqlDataSourceBuilder!
2028
Npgsql.NpgsqlDataSourceBuilder.MapComposite(System.Type! clrType, string? pgName = null, Npgsql.INpgsqlNameTranslator? nameTranslator = null) -> Npgsql.NpgsqlDataSourceBuilder!
2129
Npgsql.NpgsqlDataSourceBuilder.MapComposite<T>(string? pgName = null, Npgsql.INpgsqlNameTranslator? nameTranslator = null) -> Npgsql.NpgsqlDataSourceBuilder!
@@ -123,3 +131,7 @@ NpgsqlTypes.NpgsqlLine.Deconstruct(out double a, out double b, out double c) ->
123131
NpgsqlTypes.NpgsqlLSeg.Deconstruct(out NpgsqlTypes.NpgsqlPoint start, out NpgsqlTypes.NpgsqlPoint end) -> void
124132
NpgsqlTypes.NpgsqlPoint.Deconstruct(out double x, out double y) -> void
125133
NpgsqlTypes.NpgsqlTid.Deconstruct(out uint blockNumber, out ushort offsetNumber) -> void
134+
*REMOVED*Npgsql.NpgsqlConnection.BeginTextExport(string! copyToCommand) -> System.IO.TextReader!
135+
*REMOVED*Npgsql.NpgsqlConnection.BeginTextExportAsync(string! copyToCommand, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<System.IO.TextReader!>!
136+
*REMOVED*Npgsql.NpgsqlConnection.BeginTextImport(string! copyFromCommand) -> System.IO.TextWriter!
137+
*REMOVED*Npgsql.NpgsqlConnection.BeginTextImportAsync(string! copyFromCommand, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<System.IO.TextWriter!>!

0 commit comments

Comments
 (0)