Skip to content

Commit 8040bc2

Browse files
committed
Add PublishToOutqWhitelist/DisablePublishingToOutq to disable publishing .outq messages
1 parent d54482b commit 8040bc2

File tree

6 files changed

+96
-1
lines changed

6 files changed

+96
-1
lines changed

src/ServiceStack.RabbitMq/RabbitMqServer.cs

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,21 @@ public bool DisablePublishingResponses
121121
set => PublishResponsesWhitelist = value ? TypeConstants.EmptyStringArray : null;
122122
}
123123

124+
/// <summary>
125+
/// Opt-in to only publish .outq messages on this white list.
126+
/// Publishes all responses by default.
127+
/// </summary>
128+
public string[] PublishToOutqWhitelist { get; set; }
129+
130+
/// <summary>
131+
/// Don't publish any messages to .outq
132+
/// </summary>
133+
public bool DisablePublishingToOutq
134+
{
135+
set => PublishToOutqWhitelist = value ? TypeConstants.EmptyStringArray : null;
136+
}
137+
138+
124139
private IConnection connection;
125140
private IConnection Connection => connection ?? (connection = ConnectionFactory.CreateConnection());
126141

@@ -186,7 +201,8 @@ protected IMessageHandlerFactory CreateMessageHandlerFactory<T>(Func<IMessage<T>
186201
{
187202
RequestFilter = this.RequestFilter,
188203
ResponseFilter = this.ResponseFilter,
189-
PublishResponsesWhitelist = PublishResponsesWhitelist,
204+
PublishResponsesWhitelist = PublishResponsesWhitelist,
205+
PublishToOutqWhitelist = PublishToOutqWhitelist,
190206
RetryCount = RetryCount,
191207
};
192208
}

src/ServiceStack.Server/Messaging/Redis/RedisMqServer.cs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,11 +98,28 @@ public IMessageQueueClient CreateMessageQueueClient()
9898
/// </summary>
9999
public string[] PublishResponsesWhitelist { get; set; }
100100

101+
/// <summary>
102+
/// Don't publish any response messages
103+
/// </summary>
101104
public bool DisablePublishingResponses
102105
{
103106
set => PublishResponsesWhitelist = value ? TypeConstants.EmptyStringArray : null;
104107
}
105108

109+
/// <summary>
110+
/// Opt-in to only publish .outq messages on this white list.
111+
/// Publishes all responses by default.
112+
/// </summary>
113+
public string[] PublishToOutqWhitelist { get; set; }
114+
115+
/// <summary>
116+
/// Don't publish any messages to .outq
117+
/// </summary>
118+
public bool DisablePublishingToOutq
119+
{
120+
set => PublishToOutqWhitelist = value ? TypeConstants.EmptyStringArray : null;
121+
}
122+
106123
private readonly Dictionary<Type, IMessageHandlerFactory> handlerMap
107124
= new Dictionary<Type, IMessageHandlerFactory>();
108125

@@ -168,6 +185,7 @@ protected IMessageHandlerFactory CreateMessageHandlerFactory<T>(Func<IMessage<T>
168185
RequestFilter = this.RequestFilter,
169186
ResponseFilter = this.ResponseFilter,
170187
PublishResponsesWhitelist = PublishResponsesWhitelist,
188+
PublishToOutqWhitelist = PublishToOutqWhitelist,
171189
RetryCount = RetryCount,
172190
};
173191
}

src/ServiceStack/Messaging/BackgroundMqService.cs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,29 @@ public string[] PriortyQueuesWhitelist
5757
/// Publishes all responses by default.
5858
/// </summary>
5959
public string[] PublishResponsesWhitelist { get; set; }
60+
61+
/// <summary>
62+
/// Don't publish any response messages
63+
/// </summary>
64+
public bool DisablePublishingResponses
65+
{
66+
set => PublishResponsesWhitelist = value ? TypeConstants.EmptyStringArray : null;
67+
}
6068

69+
/// <summary>
70+
/// Opt-in to only publish .outq messages on this white list.
71+
/// Publishes all responses by default.
72+
/// </summary>
73+
public string[] PublishToOutqWhitelist { get; set; }
74+
75+
/// <summary>
76+
/// Don't publish any messages to .outq
77+
/// </summary>
78+
public bool DisablePublishingToOutq
79+
{
80+
set => PublishToOutqWhitelist = value ? TypeConstants.EmptyStringArray : null;
81+
}
82+
6183
/// <summary>
6284
/// Subscribe to messages sent to .outq
6385
/// </summary>
@@ -121,6 +143,7 @@ protected IMessageHandlerFactory CreateMessageHandlerFactory<T>(
121143
RequestFilter = this.RequestFilter,
122144
ResponseFilter = this.ResponseFilter,
123145
PublishResponsesWhitelist = PublishResponsesWhitelist,
146+
PublishToOutqWhitelist = PublishToOutqWhitelist,
124147
RetryCount = RetryCount,
125148
};
126149
}

src/ServiceStack/Messaging/MessageHandler.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ public class MessageHandler<T>
2020
private readonly Action<IMessageHandler, IMessage<T>, Exception> processInExceptionFn;
2121
public Func<string, IOneWayClient> ReplyClientFactory { get; set; }
2222
public string[] PublishResponsesWhitelist { get; set; }
23+
public string[] PublishToOutqWhitelist { get; set; }
24+
2325
private readonly int retryCount;
2426

2527
public int TotalMessagesProcessed { get; private set; }
@@ -170,6 +172,14 @@ public void ProcessMessage(IMessageQueueClient mqClient, IMessage<T> message)
170172
}
171173
else
172174
{
175+
var publishOutqResponses = PublishToOutqWhitelist == null;
176+
if (!publishOutqResponses)
177+
{
178+
var inWhitelist = PublishToOutqWhitelist.Contains(QueueNames<T>.Out);
179+
if (!inWhitelist)
180+
return;
181+
}
182+
173183
var messageOptions = (MessageOption) message.Options;
174184
if (messageOptions.Has(MessageOption.NotifyOneWay))
175185
{

src/ServiceStack/Messaging/MessageHandlerFactory.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ public class MessageHandlerFactory<T>
1111
public Func<IMessage, IMessage> RequestFilter { get; set; }
1212
public Func<object, object> ResponseFilter { get; set; }
1313
public string[] PublishResponsesWhitelist { get; set; }
14+
public string[] PublishToOutqWhitelist { get; set; }
1415

1516
private readonly Func<IMessage<T>, object> processMessageFn;
1617
private readonly Action<IMessageHandler, IMessage<T>, Exception> processExceptionFn;
@@ -38,6 +39,7 @@ public IMessageHandler CreateMessageHandler()
3839
return new MessageHandler<T>(messageService, processMessageFn, processExceptionFn, this.RetryCount)
3940
{
4041
PublishResponsesWhitelist = PublishResponsesWhitelist,
42+
PublishToOutqWhitelist = PublishToOutqWhitelist,
4143
};
4244
}
4345

@@ -56,6 +58,7 @@ public IMessageHandler CreateMessageHandler()
5658
processExceptionFn, this.RetryCount)
5759
{
5860
PublishResponsesWhitelist = PublishResponsesWhitelist,
61+
PublishToOutqWhitelist = PublishToOutqWhitelist,
5962
};
6063
}
6164
}

tests/ServiceStack.Server.Tests/Messaging/RabbitMqServerTests.cs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -394,6 +394,31 @@ public void Messages_with_null_Response_is_published_to_OutMQ()
394394
}
395395
}
396396

397+
[Test]
398+
public void Messages_with_null_responses_are_not_published_when_DisablePublishingToOutq()
399+
{
400+
int msgsReceived = 0;
401+
using (var mqServer = CreateMqServer())
402+
{
403+
mqServer.DisablePublishingToOutq = true;
404+
mqServer.RegisterHandler<HelloNull>(m =>
405+
{
406+
Interlocked.Increment(ref msgsReceived);
407+
return null;
408+
});
409+
410+
mqServer.Start();
411+
412+
using (var mqClient = mqServer.CreateMessageQueueClient())
413+
{
414+
mqClient.Publish(new HelloNull { Name = "Into the Void" });
415+
416+
var msg = mqClient.Get<HelloNull>(QueueNames<HelloNull>.Out, TimeSpan.FromSeconds(2));
417+
Assert.That(msg, Is.Null);
418+
}
419+
}
420+
}
421+
397422
[Test]
398423
public void Messages_with_null_Response_is_published_to_ReplyMQ()
399424
{

0 commit comments

Comments
 (0)