forked from sttp/cppapi
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathSubscriberHandler.cpp
More file actions
131 lines (109 loc) · 4.49 KB
/
SubscriberHandler.cpp
File metadata and controls
131 lines (109 loc) · 4.49 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
//******************************************************************************************************
// SubscriberHandler.cpp - Gbtc
//
// Copyright © 2018, Grid Protection Alliance. All Rights Reserved.
//
// Licensed to the Grid Protection Alliance (GPA) under one or more contributor license agreements. See
// the NOTICE file distributed with this work for additional information regarding copyright ownership.
// The GPA licenses this file to you under the MIT License (MIT), the "License"; you may not use this
// file except in compliance with the License. You may obtain a copy of the License at:
//
// http://opensource.org/licenses/MIT
//
// Unless agreed to in writing, the subject software distributed under the License is distributed on an
// "AS-IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. Refer to the
// License for the specific language governing permissions and limitations.
//
// Code Modification History:
// ----------------------------------------------------------------------------------------------------
// 03/27/2018 - J. Ritchie Carroll
// Generated original version of source code.
//
//******************************************************************************************************
#include "SubscriberHandler.h"
#include "../../lib/Convert.h"
using namespace std;
using namespace sttp;
using namespace sttp::transport;
Mutex SubscriberHandler::s_coutLock{};
SubscriberHandler::SubscriberHandler(string name) :
m_name(std::move(name)),
m_processCount(0ULL),
m_export(nullptr),
m_ready(false)
{
}
void SubscriberHandler::ReceivedNewMeasurements(const vector<MeasurementPtr>& measurements)
{
static const uint64_t interval = 30 * 2;
static const uint64_t exportCount = 500;
const uint64_t measurementCount = measurements.size();
const bool showMessage = (m_processCount + measurementCount >= (m_processCount / interval + 1) * interval);
if (m_processCount >= exportCount)
return;
// Only display messages every few seconds
if (showMessage)
{
stringstream receivedUpdate;
receivedUpdate << GetTotalMeasurementsReceived() << " measurements received so far..." << endl << endl;
StatusMessage(receivedUpdate.str());
}
// Process measurements
for (auto &measurement : measurements)
{
datetime_t timestamp = measurement->GetDateTime();
if (!m_ready)
{
// Start export at top of second
if (DatePart(timestamp, TimeInterval::Millisecond) == 0)
m_ready = true;
else
continue;
StatusMessage("Export started for measurement " + ToString(measurement->SignalID) + " timestamp at " + ToString(measurement->GetDateTime()));
fprintf(m_export, ("Export for measurement " + ToString(measurement->SignalID) + "\n\n").c_str());
fprintf(m_export, "Timestamp,Value,Flags\n");
}
fprintf(m_export, "%s,%.*f,%i\n", ToString(measurement->GetDateTime()).c_str(), 10, measurement->Value, measurement->Flags);
m_processCount++;
if (m_processCount >= exportCount)
{
Thread([this]{ Disconnect(); });
break;
}
}
}
void SubscriberHandler::StatusMessage(const string& message)
{
// Calls can come from multiple threads, so we impose a simple lock before write to console
s_coutLock.lock();
SubscriberInstance::StatusMessage(message);
s_coutLock.unlock();
}
void SubscriberHandler::ErrorMessage(const string& message)
{
// Calls can come from multiple threads, so we impose a simple lock before write to console
s_coutLock.lock();
SubscriberInstance::ErrorMessage(message);
s_coutLock.unlock();
}
void SubscriberHandler::SubscriptionUpdated(const SignalIndexCachePtr& signalIndexCache)
{
StatusMessage("Publisher provided " + ToString(signalIndexCache->Count()) + " measurements in response to subscription.");
}
void SubscriberHandler::ConnectionEstablished()
{
StatusMessage("Connection established.");
stringstream fileNameBuf;
fileNameBuf << m_name << "-cpp.csv";
const string fileName = fileNameBuf.str();
if (fopen_s(&m_export, fileName.c_str(), "w") != 0)
{
ErrorMessage("InteropTest canceled: failed to open export file \"" + ToString(fileName) + "\"");
Thread([this]{ Disconnect(); });
}
}
void SubscriberHandler::ConnectionTerminated()
{
StatusMessage("Connection terminated.");
fclose(m_export);
}