forked from tangxuehua/equeue
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathProgram.cs
More file actions
101 lines (95 loc) · 4.04 KB
/
Program.cs
File metadata and controls
101 lines (95 loc) · 4.04 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
using System;
using System.Collections.Generic;
using System.Configuration;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using ECommon.Components;
using ECommon.Configurations;
using ECommon.Utilities;
using EQueue.Broker;
using EQueue.Configurations;
using EQueue.Protocols;
using ECommonConfiguration = ECommon.Configurations.Configuration;
namespace EQueue.MessageStorePerfTests
{
class Program
{
static IMessageStore _messageStore;
static IPerformanceService _performanceService;
static void Main(string[] args)
{
InitializeEQueue();
WriteMessagePerfTest();
Console.ReadLine();
}
static void InitializeEQueue()
{
var configuration = ECommonConfiguration
.Create()
.UseAutofac()
.RegisterCommonComponents()
.UseLog4Net()
.UseJsonNet()
.RegisterUnhandledExceptionHandler()
.RegisterEQueueComponents();
BrokerController.Create(new BrokerSetting(false, ConfigurationManager.AppSettings["fileStoreRootPath"], enableCache: false, syncFlush: bool.Parse(ConfigurationManager.AppSettings["syncFlush"])));
_messageStore = ObjectContainer.Resolve<IMessageStore>();
_performanceService = ObjectContainer.Resolve<IPerformanceService>();
_performanceService.Initialize("StoreMessage").Start();
_messageStore.Load();
_messageStore.Start();
}
static void WriteMessagePerfTest()
{
var threadCount = int.Parse(ConfigurationManager.AppSettings["concurrentThreadCount"]); //并行写消息的线程数
var messageSize = int.Parse(ConfigurationManager.AppSettings["messageSize"]); //消息大小,字节为单位
var messageCount = int.Parse(ConfigurationManager.AppSettings["messageCount"]); //总共要写入的消息数
var batchSize = int.Parse(ConfigurationManager.AppSettings["batchSize"]); //批量持久化大小
var payload = new byte[messageSize];
var messages = new List<Message>();
var topic = "topic1";
var queue = new Queue(topic, 1);
var count = 0L;
for (var i = 0; i < batchSize; i++)
{
messages.Add(new Message(topic, 100, payload));
}
for (var i = 0; i < threadCount; i++)
{
Task.Factory.StartNew(() =>
{
while (true)
{
var current = Interlocked.Increment(ref count);
if (current > messageCount)
{
break;
}
foreach (var message in messages)
{
message.CreatedTime = DateTime.Now;
}
if (batchSize == 1)
{
_messageStore.StoreMessageAsync(queue, messages.First(), (x, y) =>
{
_performanceService.IncrementKeyCount("default", (DateTime.Now - x.CreatedTime).TotalMilliseconds);
}, null, null);
}
else
{
_messageStore.BatchStoreMessageAsync(queue, messages, (x, y) =>
{
foreach (var record in x.Records)
{
_performanceService.IncrementKeyCount("default", (DateTime.Now - record.CreatedTime).TotalMilliseconds);
}
}, null, null);
}
}
});
}
}
}
}