Skip to content

Commit 683f1a9

Browse files
committed
Make sure we flush something to all subscribers to workout which are no longer connected
1 parent f79e10c commit 683f1a9

1 file changed

Lines changed: 63 additions & 12 deletions

File tree

src/ServiceStack/ServerEventsFeature.cs

Lines changed: 63 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -428,13 +428,18 @@ public void Publish(string selector)
428428
}
429429

430430
public void Publish(string selector, string message)
431+
{
432+
var msg = message ?? "";
433+
var frame = "id: " + Interlocked.Increment(ref msgId) + "\n"
434+
+ "data: " + selector + " " + msg + "\n\n";
435+
436+
PublishRaw(frame);
437+
}
438+
439+
public void PublishRaw(string frame)
431440
{
432441
try
433442
{
434-
var msg = message ?? "";
435-
var frame = "id: " + Interlocked.Increment(ref msgId) + "\n"
436-
+ "data: " + selector + " " + msg + "\n\n";
437-
438443
lock (response)
439444
{
440445
WriteEvent(response, frame);
@@ -445,15 +450,18 @@ public void Publish(string selector, string message)
445450
}
446451
catch (Exception ex)
447452
{
448-
Log.Error("Error publishing notification to: " + selector, ex);
453+
Log.Error("Error publishing notification to: " + frame.SafeSubstring(0, 50), ex);
449454

450455
// Mono: If we explicitly close OutputStream after the error socket wont leak (response.Close() doesn't work)
451456
try
452457
{
453458
// This will throw an exception, but on Mono (Linux/OSX) the socket will leak if we not close the OutputStream
454459
response.OutputStream.Close();
455460
}
456-
catch { }
461+
catch(Exception innerEx)
462+
{
463+
Log.Error("OutputStream.Close()", innerEx);
464+
}
457465

458466
Unsubscribe();
459467
}
@@ -510,6 +518,7 @@ public interface IEventSubscription : IMeta, IDisposable
510518
void Unsubscribe();
511519

512520
void Publish(string selector, string message);
521+
void PublishRaw(string frame);
513522
void Pulse();
514523
}
515524

@@ -673,6 +682,34 @@ protected void Notify(ConcurrentDictionary<string, ConcurrentDictionary<IEventSu
673682
}
674683
}
675684

685+
protected void FlushNop(ConcurrentDictionary<string, ConcurrentDictionary<IEventSubscription, bool>> map,
686+
string key, string channel = null)
687+
{
688+
var subs = map.TryGet(key);
689+
if (subs == null)
690+
return;
691+
692+
var expired = new List<IEventSubscription>();
693+
var now = DateTime.UtcNow;
694+
695+
foreach (var sub in subs.KeysWithoutLock())
696+
{
697+
if (sub.HasChannel(channel))
698+
{
699+
if (now - sub.LastPulseAt > IdleTimeout)
700+
{
701+
expired.Add(sub);
702+
}
703+
sub.PublishRaw("\n");
704+
}
705+
}
706+
707+
foreach (var sub in expired)
708+
{
709+
sub.Unsubscribe();
710+
}
711+
}
712+
676713
protected void Notify(ConcurrentDictionary<string, IEventSubscription> map, string key, string selector,
677714
object message, string channel = null)
678715
{
@@ -900,10 +937,12 @@ public void Register(IEventSubscription subscription, Dictionary<string, string>
900937

901938
if (OnSubscribe != null)
902939
OnSubscribe(subscription);
903-
}
904940

905-
if (NotifyChannelOfSubscriptions && subscription.Channels != null && NotifyJoin != null)
906-
NotifyJoin(subscription);
941+
if (NotifyChannelOfSubscriptions && subscription.Channels != null && NotifyJoin != null)
942+
NotifyJoin(subscription);
943+
else
944+
FlushNopToChannels(subscription.Channels);
945+
}
907946
}
908947
catch (Exception ex)
909948
{
@@ -912,6 +951,18 @@ public void Register(IEventSubscription subscription, Dictionary<string, string>
912951
}
913952
}
914953

954+
public void FlushNopToChannels(string[] channels)
955+
{
956+
//For some yet-to-be-determined reason we need to send something to all channels to determine
957+
//which subscriptions are no longer connected so we can dispose of them right then and there.
958+
//Failing to do this for 10 simultaneous requests on Local IIS will hang the entire Website instance
959+
//ref: https://forums.servicestack.net/t/serversentevents-with-notifychannelofsubscriptions-set-to-false-leaks-requests/2552/2
960+
foreach (var channel in channels)
961+
{
962+
FlushNop(ChannelSubcriptions, channel, channel);
963+
}
964+
}
965+
915966
void RegisterSubscription(IEventSubscription subscription, string key,
916967
ConcurrentDictionary<string, IEventSubscription> map)
917968
{
@@ -989,10 +1040,10 @@ void HandleUnsubscription(IEventSubscription subscription)
9891040
OnUnsubscribe(subscription);
9901041

9911042
subscription.Dispose();
992-
}
9931043

994-
if (NotifyChannelOfSubscriptions && subscription.Channels != null && NotifyLeave != null)
995-
NotifyLeave(subscription);
1044+
if (NotifyChannelOfSubscriptions && subscription.Channels != null && NotifyLeave != null)
1045+
NotifyLeave(subscription);
1046+
}
9961047
}
9971048

9981049
public void Dispose()

0 commit comments

Comments
 (0)