This repository was archived by the owner on Dec 24, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 860
Expand file tree
/
Copy pathProgram.cs
More file actions
112 lines (100 loc) · 3.57 KB
/
Program.cs
File metadata and controls
112 lines (100 loc) · 3.57 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
using System;
using System.Threading;
using ServiceStack;
using ServiceStack.Logging;
using ServiceStack.Redis;
using ServiceStack.Redis.Messaging;
using ServiceStack.Text;
namespace TestMqHost
{
public class Incr
{
public int Value { get; set; }
}
class Program
{
static void Main2(string[] args)
{
var sbLogFactory = new StringBuilderLogFactory();
LogManager.LogFactory = sbLogFactory;
var log = LogManager.GetLogger(typeof(Program));
var clientManager = new PooledRedisClientManager(new[] { "localhost" })
{
PoolTimeout = 1000,
};
var mqHost = new RedisMqServer(clientManager, retryCount: 2);
var msgsProcessed = 0;
var sum = 0;
mqHost.RegisterHandler<Incr>(c =>
{
var dto = c.GetBody();
sum += dto.Value;
log.InfoFormat("Received {0}, sum: {1}", dto.Value, sum);
msgsProcessed++;
return null;
});
mqHost.Start();
10.Times(i =>
{
ThreadPool.QueueUserWorkItem(x =>
{
using (var client = mqHost.CreateMessageQueueClient())
{
try
{
log.InfoFormat("Publish: {0}...", i);
client.Publish(new Incr { Value = i });
}
catch (Exception ex)
{
log.InfoFormat("Start Publish exception: {0}", ex.Message);
clientManager.GetClientPoolActiveStates().PrintDump();
clientManager.GetReadOnlyClientPoolActiveStates().PrintDump();
}
Thread.Sleep(10);
}
});
});
ThreadPool.QueueUserWorkItem(_ =>
{
using (var client = (RedisClient)clientManager.GetClient())
{
client.SetConfig("timeout", "1");
var clientAddrs = client.GetClientList().ConvertAll(x => x["addr"]);
log.InfoFormat("Killing clients: {0}...", clientAddrs.Dump());
try
{
clientAddrs.ForEach(client.ClientKill);
}
catch (Exception ex)
{
log.InfoFormat("Client exception: {0}", ex.Message);
}
}
});
20.Times(i =>
{
using (var client = mqHost.CreateMessageQueueClient())
{
try
{
log.InfoFormat("Publish: {0}...", i);
client.Publish(new Incr { Value = i });
}
catch (Exception ex)
{
log.InfoFormat("Publish exception: {0}", ex.Message);
clientManager.GetClientPoolActiveStates().PrintDump();
clientManager.GetReadOnlyClientPoolActiveStates().PrintDump();
}
}
Thread.Sleep(1000);
});
Thread.Sleep(2000);
"Messages processed: {0}".Print(msgsProcessed);
"Logs: ".Print();
sbLogFactory.GetLogs().Print();
Console.ReadKey();
}
}
}