forked from trygve-isaacson/code-vault
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathvmessagequeue.cpp
More file actions
112 lines (85 loc) · 3.45 KB
/
vmessagequeue.cpp
File metadata and controls
112 lines (85 loc) · 3.45 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
/*
Copyright c1997-2014 Trygve Isaacson. All rights reserved.
This file is part of the Code Vault version 4.1
http://www.bombaydigital.com/
License: MIT. See LICENSE.md in the Vault top level directory.
*/
#include "vmessagequeue.h"
#include "vmutexlocker.h"
#include "vmessage.h"
#include "vlogger.h"
// VMessageQueue --------------------------------------------------------------
VDuration VMessageQueue::gVMessageQueueLagLoggingThreshold(-1 * VDuration::MILLISECOND()); // -1 means we don't examine the lag time at all
int VMessageQueue::gVMessageQueueLagLoggingLevel(VLoggerLevel::DEBUG);
VMessageQueue::VMessageQueue()
: mQueuedMessages()
, mQueuedMessagesDataSize(0)
, mMessageQueueMutex("VMessageQueue::mMessageQueueMutex")
, mMessageQueueSemaphore()
, mLastMessagePostTime()
{
}
VMessageQueue::~VMessageQueue() {
// 4.0: VMessagePtr means no more need to manually release mQueuedMessages contents.
}
void VMessageQueue::postMessage(VMessagePtr message) {
VMutexLocker locker(&mMessageQueueMutex, "VMessageQueue::postMessage()");
mQueuedMessages.push_back(message);
mLastMessagePostTime.setNow();
if (message != nullptr) {
mQueuedMessagesDataSize += message->getMessageDataLength();
}
locker.unlock(); // otherwise signal() will deadlock
mMessageQueueSemaphore.signal();
}
VMessagePtr VMessageQueue::blockUntilNextMessage() {
// If there is a message on the queue, we can simply return it.
VMessagePtr message = this->getNextMessage();
if (message != nullptr) {
return message;
}
// There is nothing on the queue, so wait until someone posts a message.
VMutex dummy("VMessageQueue::blockUntilNextMessage() dummy");
mMessageQueueSemaphore.wait(&dummy, 5 * VDuration::SECOND());
return this->getNextMessage();
}
VMessagePtr VMessageQueue::getNextMessage() {
VMessagePtr message;
VMutexLocker locker(&mMessageQueueMutex, "VMessageQueue::getNextMessage()");
if (mQueuedMessages.size() > 0) {
message = mQueuedMessages.front();
mQueuedMessages.pop_front();
if (message != nullptr) {
mQueuedMessagesDataSize -= message->getMessageDataLength();
}
}
if ((message != nullptr) && (gVMessageQueueLagLoggingThreshold >= VDuration::ZERO())) {
VInstant now;
VDuration delayInterval = now - mLastMessagePostTime;
if (delayInterval >= gVMessageQueueLagLoggingThreshold) {
VLOGGER_NAMED_LEVEL("vault.messages.VMessageQueue", gVMessageQueueLagLoggingLevel, VSTRING_FORMAT("VMessageQueue saw a delay of %s when getting a message with ID %d.", delayInterval.getDurationString().chars(), message->getMessageID()));
}
}
return message;
}
void VMessageQueue::wakeUp() {
mMessageQueueSemaphore.signal();
}
VSizeType VMessageQueue::getQueueSize() const {
// No need to lock here, nothing bad can happen underneath us.
return mQueuedMessages.size();
}
Vs64 VMessageQueue::getQueueDataSize() const {
// No need to lock here, nothing bad can happen underneath us.
return mQueuedMessagesDataSize;
}
void VMessageQueue::releaseAllMessages() {
VMutexLocker locker(&mMessageQueueMutex, "VMessageQueue::releaseAllMessages()");
while (mQueuedMessages.size() > 0) {
VMessagePtr message = mQueuedMessages.front();
mQueuedMessages.pop_front();
if (message != nullptr) {
mQueuedMessagesDataSize -= message->getMessageDataLength();
}
}
}