@@ -42,6 +42,19 @@ public sealed class NpgsqlBinaryImporter : ICancelable, IAsyncDisposable
4242
4343 static readonly NpgsqlLogger Log = NpgsqlLogManager . CreateLogger ( nameof ( NpgsqlBinaryImporter ) ) ;
4444
45+ /// <summary>
46+ /// Current timeout
47+ /// </summary>
48+ public TimeSpan Timeout
49+ {
50+ set
51+ {
52+ _buf . Timeout = value ;
53+ // While calling Complete(), we're using the connector, which overwrites the buffer's timeout with it's own
54+ _connector . UserTimeout = ( int ) value . TotalMilliseconds ;
55+ }
56+ }
57+
4558 #endregion
4659
4760 #region Construction / Initialization
@@ -117,11 +130,20 @@ async Task StartRow(bool async, CancellationToken cancellationToken = default)
117130 if ( _column != - 1 && _column != NumColumns )
118131 ThrowHelper . ThrowInvalidOperationException_BinaryImportParametersMismatch ( NumColumns , _column ) ;
119132
120- if ( _buf . WriteSpaceLeft < 2 )
121- await _buf . Flush ( async , cancellationToken ) ;
122- _buf . WriteInt16 ( NumColumns ) ;
133+ try
134+ {
135+ if ( _buf . WriteSpaceLeft < 2 )
136+ await _buf . Flush ( async , cancellationToken ) ;
137+ _buf . WriteInt16 ( NumColumns ) ;
123138
124- _column = 0 ;
139+ _column = 0 ;
140+ }
141+ catch
142+ {
143+ // An exception here will have already broken the connection etc.
144+ Cleanup ( ) ;
145+ throw ;
146+ }
125147 }
126148
127149 /// <summary>
@@ -285,25 +307,34 @@ async Task Write<T>([AllowNull] T value, NpgsqlParameter param, bool async, Canc
285307 return ;
286308 }
287309
288- if ( typeof ( T ) == typeof ( object ) )
289- {
290- param . Value = value ;
291- }
292- else
310+ try
293311 {
294- if ( ! ( param is NpgsqlParameter < T > typedParam ) )
312+ if ( typeof ( T ) == typeof ( object ) )
295313 {
296- _params [ _column ] = typedParam = new NpgsqlParameter < T > ( ) ;
297- typedParam . NpgsqlDbType = param . NpgsqlDbType ;
314+ param . Value = value ;
298315 }
299- typedParam . TypedValue = value ;
316+ else
317+ {
318+ if ( ! ( param is NpgsqlParameter < T > typedParam ) )
319+ {
320+ _params [ _column ] = typedParam = new NpgsqlParameter < T > ( ) ;
321+ typedParam . NpgsqlDbType = param . NpgsqlDbType ;
322+ }
323+ typedParam . TypedValue = value ;
324+ }
325+ param . ResolveHandler ( _connector . TypeMapper ) ;
326+ param . ValidateAndGetLength ( ) ;
327+ param . LengthCache ? . Rewind ( ) ;
328+ await param . WriteWithLength ( _buf , async , cancellationToken ) ;
329+ param . LengthCache ? . Clear ( ) ;
330+ _column ++ ;
331+ }
332+ catch
333+ {
334+ // An exception here will have already broken the connection etc.
335+ Cleanup ( ) ;
336+ throw ;
300337 }
301- param . ResolveHandler ( _connector . TypeMapper ) ;
302- param . ValidateAndGetLength ( ) ;
303- param . LengthCache ? . Rewind ( ) ;
304- await param . WriteWithLength ( _buf , async , cancellationToken ) ;
305- param . LengthCache ? . Clear ( ) ;
306- _column ++ ;
307338 }
308339
309340 /// <summary>
@@ -328,11 +359,20 @@ async Task WriteNull(bool async, CancellationToken cancellationToken = default)
328359 if ( _column == - 1 )
329360 throw new InvalidOperationException ( "A row hasn't been started" ) ;
330361
331- if ( _buf . WriteSpaceLeft < 4 )
332- await _buf . Flush ( async , cancellationToken ) ;
362+ try
363+ {
364+ if ( _buf . WriteSpaceLeft < 4 )
365+ await _buf . Flush ( async , cancellationToken ) ;
333366
334- _buf . WriteInt32 ( - 1 ) ;
335- _column ++ ;
367+ _buf . WriteInt32 ( - 1 ) ;
368+ _column ++ ;
369+ }
370+ catch
371+ {
372+ // An exception here will have already broken the connection etc.
373+ Cleanup ( ) ;
374+ throw ;
375+ }
336376 }
337377
338378 /// <summary>
0 commit comments