#region License
// The PostgreSQL License
//
// Copyright (C) 2018 The Npgsql Development Team
//
// Permission to use, copy, modify, and distribute this software and its
// documentation for any purpose, without fee, and without a written
// agreement is hereby granted, provided that the above copyright notice
// and this paragraph and the following two paragraphs appear in all copies.
//
// IN NO EVENT SHALL THE NPGSQL DEVELOPMENT TEAM BE LIABLE TO ANY PARTY
// FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES,
// INCLUDING LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
// DOCUMENTATION, EVEN IF THE NPGSQL DEVELOPMENT TEAM HAS BEEN ADVISED OF
// THE POSSIBILITY OF SUCH DAMAGE.
//
// THE NPGSQL DEVELOPMENT TEAM SPECIFICALLY DISCLAIMS ANY WARRANTIES,
// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
// AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
// ON AN "AS IS" BASIS, AND THE NPGSQL DEVELOPMENT TEAM HAS NO OBLIGATIONS
// TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
#endregion
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Transactions;
using JetBrains.Annotations;
using Npgsql.Logging;
namespace Npgsql
{
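/// <summary>
/// Volatile resource manager which receives System.Transactions notifications (single-phase commit, prepare,
/// commit, rollback, in-doubt) for a transaction and applies them to the associated Npgsql connector.
/// </summary>
/// <remarks>
/// Note that a connection may be closed before its TransactionScope completes. In this case we close the NpgsqlConnection
/// as usual but put the connector in a special list in the pool; it will be closed only when the scope completes.
/// </remarks>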
class VolatileResourceManager : ISinglePhaseNotification
{
// Connector taken from the connection passed to the constructor; set to null by Dispose().
[CanBeNull] NpgsqlConnector _connector;
// The System.Transactions transaction passed to the constructor; set to null by Dispose().
[CanBeNull] Transaction _transaction;
// Local identifier of the transaction, captured in the constructor and used in log messages.
[CanBeNull] readonly string _txId;
// Local transaction begun on the connection in the constructor; disposed and set to null by Dispose().
[CanBeNull] NpgsqlTransaction _localTx;
// Name used for PREPARE TRANSACTION / COMMIT PREPARED / ROLLBACK PREPARED; assigned in Prepare().
[CanBeNull] string _preparedTxName;
// True once Prepare() has assigned a prepared transaction name.
bool IsPrepared => _preparedTxName != null;
// Set at the end of Dispose(); checked by CheckDisposed().
bool _isDisposed;
static readonly NpgsqlLogger Log = NpgsqlLogManager.GetCurrentClassLogger();
// Upper bound on the rollback retries made by RollbackLocal() while the connection is busy.
const int MaximumRollbackAttempts = 20;
/// <summary>
/// Creates a resource manager for the given connection and transaction, and immediately begins a local
/// transaction on the connection using the isolation level of <paramref name="transaction"/>.
/// </summary>
/// <param name="connection">Connection whose connector will be driven by this resource manager.</param>
/// <param name="transaction">The System.Transactions transaction being handled.</param>
internal VolatileResourceManager(NpgsqlConnection connection, [NotNull] Transaction transaction)
{
    _connector = connection.Connector;
    _transaction = transaction;

    // System.Transactions disposes the transaction at some point, so its local ID is captured
    // up front to keep it available for logging.
    _txId = transaction.TransactionInformation.LocalIdentifier;

    var localIsolationLevel = ConvertIsolationLevel(transaction.IsolationLevel);
    _localTx = connection.BeginTransaction(localIsolationLevel);
}
/// <summary>
/// Commits the local transaction in a single phase and reports the outcome on the enlistment.
/// The resource manager is always cleaned up afterwards.
/// </summary>
/// <param name="singlePhaseEnlistment">Enlistment on which committed / aborted / in-doubt is reported.</param>
/// <exception cref="ObjectDisposedException">The resource manager has already been disposed.</exception>
public void SinglePhaseCommit(SinglePhaseEnlistment singlePhaseEnlistment)
{
    CheckDisposed();
    Debug.Assert(_transaction != null, "No transaction");
    Debug.Assert(_localTx != null, "No local transaction");
    Debug.Assert(_connector != null, "No connector");

    Log.Debug($"Single Phase Commit (localid={_txId})", _connector.Id);

    try
    {
        _localTx.Commit();
        singlePhaseEnlistment.Committed();
    }
    catch (Exception failure)
    {
        // A PostgresException is reported as an abort; any other failure leaves the outcome in doubt.
        if (failure is PostgresException)
        {
            singlePhaseEnlistment.Aborted(failure);
        }
        else
        {
            singlePhaseEnlistment.InDoubt(failure);
        }
    }
    finally
    {
        Dispose();
    }
}
/// <summary>
/// First phase of a two-phase commit: issues PREPARE TRANSACTION on the connector and, on success,
/// clears the connection's enlisted transaction and votes prepared. On any failure the resource
/// manager is cleaned up and a rollback is forced.
/// </summary>
/// <param name="preparingEnlistment">Enlistment on which prepared / force-rollback is reported.</param>
/// <exception cref="ObjectDisposedException">The resource manager has already been disposed.</exception>
public void Prepare(PreparingEnlistment preparingEnlistment)
{
    CheckDisposed();
    Debug.Assert(_transaction != null, "No transaction");
    Debug.Assert(_localTx != null, "No local transaction");
    Debug.Assert(_connector != null, "No connector");

    Log.Debug($"Two-phase transaction prepare (localid={_txId})", _connector.Id);

    // The PostgreSQL prepared transaction name combines the distributed transaction GUID with
    // our connection's backend process ID, for uniqueness.
    var distributedId = _transaction.TransactionInformation.DistributedIdentifier;
    var backendProcessId = _connector.BackendProcessId;
    _preparedTxName = $"{distributedId}/{backendProcessId}";

    try
    {
        using (_connector.StartUserAction())
        {
            _connector.ExecuteInternalCommand($"PREPARE TRANSACTION '{_preparedTxName}'");
        }

        // The MSDTC, which manages escalated distributed transactions, performs the 2nd phase
        // asynchronously, so TransactionScope.Dispose() may return before all resource managers
        // have actually committed. If the same connection enlists in a new TransactionScope right
        // after the old one is disposed, its EnlistedTransaction must already be cleared (otherwise
        // a double enlistment exception is thrown). That has to happen here, in the synchronous
        // 1st phase.
        var owner = _connector.Connection;
        if (owner != null)
        {
            owner.EnlistedTransaction = null;
        }

        preparingEnlistment.Prepared();
    }
    catch (Exception failure)
    {
        Dispose();
        preparingEnlistment.ForceRollback(failure);
    }
}
/// <summary>
/// Second phase of a two-phase commit: issues COMMIT PREPARED for the transaction prepared in
/// <c>Prepare</c>. Failures are logged rather than propagated; the resource manager is always
/// cleaned up and the enlistment is always marked done.
/// </summary>
/// <param name="enlistment">Enlistment on which completion is reported.</param>
/// <exception cref="ObjectDisposedException">The resource manager has already been disposed.</exception>
public void Commit(Enlistment enlistment)
{
    CheckDisposed();
    Debug.Assert(_transaction != null, "No transaction");
    Debug.Assert(_connector != null, "No connector");

    Log.Debug($"Two-phase transaction commit (localid={_txId})", _connector.Id);

    try
    {
        if (_connector.Connection == null)
        {
            // The connection has been closed before the TransactionScope was disposed.
            // The connector is unbound from its connection and is sitting in the pool's
            // pending enlisted connector list. Since there's no risk of the connector being
            // used by anyone, we can execute the 2nd phase on it directly.
            using (_connector.StartUserAction())
                _connector.ExecuteInternalCommand($"COMMIT PREPARED '{_preparedTxName}'");
        }
        else
        {
            // The connection is still open and will potentially be reused by the user.
            // The MSDTC, which manages escalated distributed transactions, performs the 2nd phase
            // asynchronously - this means that TransactionScope.Dispose() will return before all
            // resource managers have actually committed. This can cause a concurrent connection use
            // scenario if the user continues to use their connection after disposing the scope, and
            // the MSDTC requests a commit at that exact time.
            // To avoid this, we open a new connection for performing the 2nd phase.
            using (var conn2 = (NpgsqlConnection)((ICloneable)_connector.Connection).Clone())
            {
                conn2.Open();
                var connector = conn2.Connector;
                Debug.Assert(connector != null);
                using (connector.StartUserAction())
                    connector.ExecuteInternalCommand($"COMMIT PREPARED '{_preparedTxName}'");
            }
        }
    }
    catch (Exception e)
    {
        // Interpolate the captured local ID, like every other log call in this class; the message
        // previously contained a literal, never-substituted placeholder.
        Log.Error($"Exception during two-phase transaction commit (localid={_txId})", e, _connector.Id);
    }
    finally
    {
        Dispose();
        enlistment.Done();
    }
}
/// <summary>
/// Rolls the transaction back, using the two-phase path if a prepared transaction name has been
/// assigned and the local path otherwise. Failures are logged rather than propagated; the resource
/// manager is always cleaned up and the enlistment is always marked done.
/// </summary>
/// <param name="enlistment">Enlistment on which completion is reported.</param>
/// <exception cref="ObjectDisposedException">The resource manager has already been disposed.</exception>
public void Rollback(Enlistment enlistment)
{
    CheckDisposed();
    Debug.Assert(_transaction != null, "No transaction");
    Debug.Assert(_connector != null, "No connector");

    try
    {
        if (!IsPrepared)
        {
            RollbackLocal();
        }
        else
        {
            RollbackTwoPhase();
        }
    }
    catch (Exception failure)
    {
        Log.Error($"Exception during transaction rollback (localid={_txId})", failure, _connector.Id);
    }
    finally
    {
        Dispose();
        enlistment.Done();
    }
}
/// <summary>
/// Handles an in-doubt notification by attempting to roll back the prepared transaction.
/// Failures are logged rather than propagated; the resource manager is always cleaned up and
/// the enlistment is always marked done.
/// </summary>
/// <param name="enlistment">Enlistment on which completion is reported.</param>
/// <exception cref="ObjectDisposedException">The resource manager has already been disposed.</exception>
public void InDoubt(Enlistment enlistment)
{
    // Same guard as the other notification handlers: without it a disposed manager would
    // dereference the already-cleared connector below.
    CheckDisposed();
    Debug.Assert(_transaction != null, "No transaction");
    Debug.Assert(_connector != null, "No connector");

    Log.Warn($"Two-phase transaction in doubt (localid={_txId})", _connector.Id);

    // TODO: Is this the correct behavior?
    try
    {
        RollbackTwoPhase();
    }
    catch (Exception e)
    {
        Log.Error($"Exception during transaction rollback (localid={_txId})", e, _connector.Id);
    }
    finally
    {
        Dispose();
        enlistment.Done();
    }
}
/// <summary>
/// Rolls back the local (non-prepared) transaction. If the connection is busy, the in-progress
/// operation is cancelled and the rollback is retried, up to <c>MaximumRollbackAttempts</c>
/// attempts in total.
/// </summary>
/// <exception cref="Exception">The rollback could not be performed within the allowed number of attempts.</exception>
void RollbackLocal()
{
    Debug.Assert(_connector != null, "No connector");
    Debug.Assert(_localTx != null, "No local transaction");

    Log.Debug($"Single-phase transaction rollback (localid={_txId})", _connector.Id);

    // attempt counts rollback calls made so far, so exactly MaximumRollbackAttempts are made
    // before giving up (matching the number reported in the exception message).
    for (var attempt = 1; ; attempt++)
    {
        try
        {
            _localTx.Rollback();
            return;
        }
        catch (NpgsqlOperationInProgressException)
        {
            // Repeatedly attempts to rollback, to support timeout-triggered rollbacks that occur
            // while the connection is busy.
            // This really shouldn't be necessary, but just in case
            if (attempt >= MaximumRollbackAttempts)
                throw new Exception($"Could not roll back after {MaximumRollbackAttempts} attempts, aborting. Transaction is in an unknown state.");

            Log.Warn($"Connection in use while trying to rollback, will cancel and retry (localid={_txId})", _connector.Id);
            _connector.CancelRequest();

            // Cancellations are asynchronous, give it some time
            Thread.Sleep(500);
        }
    }
}
/// <summary>
/// Issues ROLLBACK PREPARED for the prepared transaction, either directly on the connector (when
/// it is no longer bound to a connection) or on a freshly opened clone of the connection.
/// </summary>
void RollbackTwoPhase()
{
    // This only occurs if we've started a two-phase commit but one of the commits has failed.
    Log.Debug($"Two-phase transaction rollback (localid={_txId})", _connector.Id);

    var rollbackCommand = $"ROLLBACK PREPARED '{_preparedTxName}'";
    var ownerConnection = _connector.Connection;

    if (ownerConnection == null)
    {
        // The connection was closed before the TransactionScope was disposed. The connector is
        // unbound from its connection and sits in the pool's pending enlisted connector list, so
        // nobody else can be using it and the 2nd phase can run on it directly.
        using (_connector.StartUserAction())
        {
            _connector.ExecuteInternalCommand(rollbackCommand);
        }
        return;
    }

    // The connection is still open and may be reused by the user. The MSDTC, which manages
    // escalated distributed transactions, performs the 2nd phase asynchronously, so
    // TransactionScope.Dispose() may return before all resource managers have completed it.
    // If the user keeps using the connection after disposing the scope, the 2nd phase request
    // could arrive while the connection is in use. To avoid that concurrent use, a new
    // connection is opened for performing the 2nd phase.
    using (var secondPhaseConnection = (NpgsqlConnection)((ICloneable)ownerConnection).Clone())
    {
        secondPhaseConnection.Open();
        var secondPhaseConnector = secondPhaseConnection.Connector;
        Debug.Assert(secondPhaseConnector != null);
        using (secondPhaseConnector.StartUserAction())
        {
            secondPhaseConnector.ExecuteInternalCommand(rollbackCommand);
        }
    }
}
#region Dispose/Cleanup
/// <summary>
/// Cleans up the resource manager: disposes the local transaction, detaches from the connection
/// (or hands the orphaned connector back to the pool / closes it), and clears all references.
/// Calling it again after it has completed is a no-op.
/// </summary>
void Dispose()
{
    if (_isDisposed)
        return;

    Debug.Assert(_transaction != null, "No transaction");
    Debug.Assert(_connector != null, "No connector");

    Log.Trace($"Cleaning up resource manager (localid={_txId})", _connector.Id);

    if (_localTx != null)
    {
        _localTx.Dispose();
        _localTx = null;
    }

    if (_connector.Connection != null)
        _connector.Connection.EnlistedTransaction = null;
    else
    {
        // We're here for connections which were closed before their TransactionScope completed.
        // These need to be closed now.
        if (_connector.Settings.Pooling)
        {
            var found = PoolManager.TryGetValue(_connector.ConnectionString, out var pool);
            Debug.Assert(found);
            pool.TryRemovePendingEnlistedConnector(_connector, _transaction);
            pool.Release(_connector);
        }
        else
            _connector.Close();
    }

    _connector = null;
    _transaction = null;
    _isDisposed = true;
}
/// <summary>
/// Throws if this resource manager has already been disposed.
/// </summary>
/// <exception cref="ObjectDisposedException">The resource manager has been disposed.</exception>
void CheckDisposed()
{
    if (!_isDisposed)
    {
        return;
    }

    throw new ObjectDisposedException(nameof(VolatileResourceManager));
}
#endregion
/// <summary>
/// Maps a System.Transactions isolation level to the System.Data isolation level of the same name.
/// </summary>
/// <param name="isolationLevel">The System.Transactions isolation level.</param>
/// <returns>
/// The identically named System.Data isolation level, or <c>Unspecified</c> for
/// <c>Unspecified</c> and for any value not listed.
/// </returns>
static System.Data.IsolationLevel ConvertIsolationLevel(IsolationLevel isolationLevel)
{
    if (isolationLevel == IsolationLevel.Chaos)
        return System.Data.IsolationLevel.Chaos;
    if (isolationLevel == IsolationLevel.ReadCommitted)
        return System.Data.IsolationLevel.ReadCommitted;
    if (isolationLevel == IsolationLevel.ReadUncommitted)
        return System.Data.IsolationLevel.ReadUncommitted;
    if (isolationLevel == IsolationLevel.RepeatableRead)
        return System.Data.IsolationLevel.RepeatableRead;
    if (isolationLevel == IsolationLevel.Serializable)
        return System.Data.IsolationLevel.Serializable;
    if (isolationLevel == IsolationLevel.Snapshot)
        return System.Data.IsolationLevel.Snapshot;

    // Unspecified, and anything unrecognised, maps to Unspecified.
    return System.Data.IsolationLevel.Unspecified;
}
}
}