diff --git a/src/System.Management.Automation/engine/remoting/fanin/OutOfProcTransportManager.cs b/src/System.Management.Automation/engine/remoting/fanin/OutOfProcTransportManager.cs index c23b875d5b3..c94f5a74617 100644 --- a/src/System.Management.Automation/engine/remoting/fanin/OutOfProcTransportManager.cs +++ b/src/System.Management.Automation/engine/remoting/fanin/OutOfProcTransportManager.cs @@ -9,6 +9,7 @@ * elevation to support local machine remoting). */ +using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics; using System.Globalization; @@ -463,6 +464,8 @@ internal abstract class OutOfProcessClientSessionTransportManagerBase : BaseClie { #region Data + private readonly BlockingCollection _sessionMessageQueue; + private readonly BlockingCollection _commandMessageQueue; private PrioritySendDataCollection.OnDataAvailableCallback _onDataAvailableToSendCallback; private OutOfProcessUtils.DataProcessingDelegates _dataProcessingCallbacks; private Dictionary _cmdTransportManagers; @@ -507,6 +510,20 @@ internal OutOfProcessClientSessionTransportManagerBase( // timers initialization _closeTimeOutTimer = new Timer(OnCloseTimeOutTimerElapsed, null, Timeout.Infinite, Timeout.Infinite); + // Session message processing + _sessionMessageQueue = new BlockingCollection(); + var sessionThread = new Thread(ProcessMessageProc); + sessionThread.Name = "SessionMessageProcessing"; + sessionThread.IsBackground = true; + sessionThread.Start(_sessionMessageQueue); + + // Command message processing + _commandMessageQueue = new BlockingCollection(); + var commandThread = new Thread(ProcessMessageProc); + commandThread.Name = "CommandMessageProcessing"; + commandThread.IsBackground = true; + commandThread.Start(_commandMessageQueue); + _tracer = PowerShellTraceSourceFactory.GetTraceSource(); } @@ -601,7 +618,7 @@ internal override BaseClientCommandTransportManager CreateClientCommandTransport } /// - /// Kills the server process and disposes other resources. + /// Terminates the server process and disposes other resources. /// /// internal override void Dispose(bool isDisposing) @@ -611,6 +628,8 @@ internal override void Dispose(bool isDisposing) { _cmdTransportManagers.Clear(); _closeTimeOutTimer.Dispose(); + _sessionMessageQueue.Dispose(); + _commandMessageQueue.Dispose(); } } @@ -663,29 +682,88 @@ private void OnCloseSessionCompleted() { // stop timer _closeTimeOutTimer.Change(Timeout.Infinite, Timeout.Infinite); + + // Stop protocol message processing threads. + _sessionMessageQueue.CompleteAdding(); + _commandMessageQueue.CompleteAdding(); + RaiseCloseCompleted(); CleanupConnection(); } protected abstract void CleanupConnection(); + private void ProcessMessageProc(object state) + { + var messageQueue = state as BlockingCollection; + + try + { + while (true) + { + var data = messageQueue.Take(); + try + { + OutOfProcessUtils.ProcessData(data, _dataProcessingCallbacks); + } + catch (Exception exception) + { + PSRemotingTransportException psrte = + new PSRemotingTransportException( + PSRemotingErrorId.IPCErrorProcessingServerData, + RemotingErrorIdStrings.IPCErrorProcessingServerData, + exception.Message); + RaiseErrorHandler(new TransportErrorOccuredEventArgs(psrte, TransportMethodEnum.ReceiveShellOutputEx)); + } + } + } + catch (InvalidOperationException) + { + // Normal session message processing thread end. + } + } + + private const string GUIDTAG = "PSGuid='"; + private const int GUID_STR_LEN = 36; // GUID string: 32 digits plus 4 dashes + + private Guid GetMessageGuid(string data) + { + // Perform quick scan for data packet for a GUID, ignoring any errors. + var iTag = data.IndexOf(GUIDTAG, StringComparison.OrdinalIgnoreCase); + if (iTag > -1) + { + try + { + var psGuidString = data.Substring(iTag + GUIDTAG.Length, GUID_STR_LEN); + return new Guid(psGuidString); + } + catch + { + // Ignore any malformed packet errors here and return an empty Guid. + // Packet errors will be reported later during message processing. + } + } + + return Guid.Empty; + } + #endregion #region Event Handlers protected void HandleOutputDataReceived(string data) { - try + // Route protocol message based on whether it is a session or command message. + // Session messages have empty Guid values. + if (Guid.Equals(GetMessageGuid(data), Guid.Empty)) { - OutOfProcessUtils.ProcessData(data, _dataProcessingCallbacks); + // Session message + _sessionMessageQueue.Add(data); } - catch (Exception exception) + else { - PSRemotingTransportException psrte = - new PSRemotingTransportException(PSRemotingErrorId.IPCErrorProcessingServerData, - RemotingErrorIdStrings.IPCErrorProcessingServerData, - exception.Message); - RaiseErrorHandler(new TransportErrorOccuredEventArgs(psrte, TransportMethodEnum.ReceiveShellOutputEx)); + // Command message + _commandMessageQueue.Add(data); } }