@@ -428,13 +428,18 @@ public void Publish(string selector)
428428 }
429429
430430 public void Publish ( string selector , string message )
431+ {
432+ var msg = message ?? "" ;
433+ var frame = "id: " + Interlocked . Increment ( ref msgId ) + "\n "
434+ + "data: " + selector + " " + msg + "\n \n " ;
435+
436+ PublishRaw ( frame ) ;
437+ }
438+
439+ public void PublishRaw ( string frame )
431440 {
432441 try
433442 {
434- var msg = message ?? "" ;
435- var frame = "id: " + Interlocked . Increment ( ref msgId ) + "\n "
436- + "data: " + selector + " " + msg + "\n \n " ;
437-
438443 lock ( response )
439444 {
440445 WriteEvent ( response , frame ) ;
@@ -445,15 +450,18 @@ public void Publish(string selector, string message)
445450 }
446451 catch ( Exception ex )
447452 {
448- Log . Error ( "Error publishing notification to: " + selector , ex ) ;
453+ Log . Error ( "Error publishing notification to: " + frame . SafeSubstring ( 0 , 50 ) , ex ) ;
449454
450455 // Mono: If we explicitly close OutputStream after the error socket wont leak (response.Close() doesn't work)
451456 try
452457 {
453458 // This will throw an exception, but on Mono (Linux/OSX) the socket will leak if we not close the OutputStream
454459 response . OutputStream . Close ( ) ;
455460 }
456- catch { }
461+ catch ( Exception innerEx )
462+ {
463+ Log . Error ( "OutputStream.Close()" , innerEx ) ;
464+ }
457465
458466 Unsubscribe ( ) ;
459467 }
@@ -510,6 +518,7 @@ public interface IEventSubscription : IMeta, IDisposable
510518 void Unsubscribe ( ) ;
511519
512520 void Publish ( string selector , string message ) ;
521+ void PublishRaw ( string frame ) ;
513522 void Pulse ( ) ;
514523 }
515524
@@ -673,6 +682,34 @@ protected void Notify(ConcurrentDictionary<string, ConcurrentDictionary<IEventSu
673682 }
674683 }
675684
685+ protected void FlushNop ( ConcurrentDictionary < string , ConcurrentDictionary < IEventSubscription , bool > > map ,
686+ string key , string channel = null )
687+ {
688+ var subs = map . TryGet ( key ) ;
689+ if ( subs == null )
690+ return ;
691+
692+ var expired = new List < IEventSubscription > ( ) ;
693+ var now = DateTime . UtcNow ;
694+
695+ foreach ( var sub in subs . KeysWithoutLock ( ) )
696+ {
697+ if ( sub . HasChannel ( channel ) )
698+ {
699+ if ( now - sub . LastPulseAt > IdleTimeout )
700+ {
701+ expired . Add ( sub ) ;
702+ }
703+ sub . PublishRaw ( "\n " ) ;
704+ }
705+ }
706+
707+ foreach ( var sub in expired )
708+ {
709+ sub . Unsubscribe ( ) ;
710+ }
711+ }
712+
676713 protected void Notify ( ConcurrentDictionary < string , IEventSubscription > map , string key , string selector ,
677714 object message , string channel = null )
678715 {
@@ -900,10 +937,12 @@ public void Register(IEventSubscription subscription, Dictionary<string, string>
900937
901938 if ( OnSubscribe != null )
902939 OnSubscribe ( subscription ) ;
903- }
904940
905- if ( NotifyChannelOfSubscriptions && subscription . Channels != null && NotifyJoin != null )
906- NotifyJoin ( subscription ) ;
941+ if ( NotifyChannelOfSubscriptions && subscription . Channels != null && NotifyJoin != null )
942+ NotifyJoin ( subscription ) ;
943+ else
944+ FlushNopToChannels ( subscription . Channels ) ;
945+ }
907946 }
908947 catch ( Exception ex )
909948 {
@@ -912,6 +951,18 @@ public void Register(IEventSubscription subscription, Dictionary<string, string>
912951 }
913952 }
914953
954+ public void FlushNopToChannels ( string [ ] channels )
955+ {
956+ //For some yet-to-be-determined reason we need to send something to all channels to determine
957+ //which subscriptions are no longer connected so we can dispose of them right then and there.
958+ //Failing to do this for 10 simultaneous requests on Local IIS will hang the entire Website instance
959+ //ref: https://forums.servicestack.net/t/serversentevents-with-notifychannelofsubscriptions-set-to-false-leaks-requests/2552/2
960+ foreach ( var channel in channels )
961+ {
962+ FlushNop ( ChannelSubcriptions , channel , channel ) ;
963+ }
964+ }
965+
915966 void RegisterSubscription ( IEventSubscription subscription , string key ,
916967 ConcurrentDictionary < string , IEventSubscription > map )
917968 {
@@ -989,10 +1040,10 @@ void HandleUnsubscription(IEventSubscription subscription)
9891040 OnUnsubscribe ( subscription ) ;
9901041
9911042 subscription . Dispose ( ) ;
992- }
9931043
994- if ( NotifyChannelOfSubscriptions && subscription . Channels != null && NotifyLeave != null )
995- NotifyLeave ( subscription ) ;
1044+ if ( NotifyChannelOfSubscriptions && subscription . Channels != null && NotifyLeave != null )
1045+ NotifyLeave ( subscription ) ;
1046+ }
9961047 }
9971048
9981049 public void Dispose ( )
0 commit comments