1212#include " Framework/RawDeviceService.h"
1313#include " Framework/Logger.h"
1414#include " Framework/DeviceInfo.h"
15+ #include " Framework/RuntimeError.h"
16+ #include " Framework/DataProcessingStates.h"
1517#include < string>
1618#include < string_view>
1719#include < regex>
@@ -37,18 +39,32 @@ bool ControlServiceHelpers::parseControl(std::string_view const& s, std::match_r
3739}
3840
3941void ControlServiceHelpers::processCommand (std::vector<DeviceInfo>& infos,
42+ std::vector<DataProcessingStates>& allStates,
4043 pid_t pid,
4144 std::string const & command,
4245 std::string const & arg)
4346{
44- auto doToMatchingPid = [](std::vector<DeviceInfo>& infos, pid_t pid, auto lambda) {
45- for (auto & deviceInfo : infos) {
47+ auto doToMatchingPid = [&](std::vector<DeviceInfo>& infos, pid_t pid, auto lambda) {
48+ assert (infos.size () == allStates.size ());
49+ for (size_t i = 0 ; i < infos.size (); ++i) {
50+ auto & deviceInfo = infos[i];
4651 if (deviceInfo.pid == pid) {
4752 return lambda (deviceInfo);
4853 }
4954 }
5055 LOGP (error, " Command received for pid {} which does not exists." , pid);
5156 };
57+ auto doToMatchingStatePid = [&](std::vector<DeviceInfo>& infos, std::vector<DataProcessingStates>& allStates, pid_t pid, auto lambda) {
58+ assert (infos.size () == allStates.size ());
59+ for (size_t i = 0 ; i < infos.size (); ++i) {
60+ auto & deviceInfo = infos[i];
61+ auto & states = allStates[i];
62+ if (deviceInfo.pid == pid) {
63+ return lambda (deviceInfo, states);
64+ }
65+ }
66+ LOGP (error, " Command received for pid {} which does not exists." , pid);
67+ };
5268 LOGP (debug2, " Found control command {} from pid {} with argument {}." , command, pid, arg);
5369 if (command == " QUIT" && arg == " ALL" ) {
5470 for (auto & deviceInfo : infos) {
@@ -68,37 +84,49 @@ void ControlServiceHelpers::processCommand(std::vector<DeviceInfo>& infos,
6884 } else if (command == " NOTIFY_DEVICE_STATE" ) {
6985 doToMatchingPid (infos, pid, [arg](DeviceInfo& info) { info.deviceState = arg; info.providedState ++; });
7086 } else if (command == " PUT" ) {
71- doToMatchingPid (infos, pid, [&arg](DeviceInfo& info) {
72- // Tokenize arg for the first two empty space using strtok_r
73- // and create string_views associated to it.
74- char * brk;
75- char * beginKey = strtok_r ((char *)arg.data (), " " , &brk);
76- char * endKey = strtok_r (nullptr , " " , &brk);
77- char * beginTimestamp = endKey;
78- char * endTimestamp = strtok_r (nullptr , " " , &brk);
79- char * beginValue = endTimestamp;
80- char * endValue = (char *)arg.data () + arg.size ();
81- std::string_view key (beginKey, endKey - beginKey);
82- std::string_view timestamp (beginTimestamp, endTimestamp - beginTimestamp);
83- std::string_view value (beginValue, endValue - beginValue);
84- int timestampInt = std::stoll (timestamp.data ());
87+ doToMatchingStatePid (infos, allStates, pid, [&arg](DeviceInfo& info, DataProcessingStates& states) {
88+ // / Use scanf to parse PUT <key> <timestamp>
89+ // find the first space, that is the beginning of the key.
90+ // Find the position of the fist space in beginKey.
91+ auto beginKey = 0 ;
92+ // If we did not find it complain and return.
93+ if (beginKey == std::string::npos) {
94+ LOGP (error, " Cannot parse key in PUT command with arg {} for device {}" , arg, info.pid );
95+ return ;
96+ }
97+ auto endKey = arg.find (' ' , beginKey + 1 );
98+ if (endKey == std::string::npos) {
99+ LOGP (error, " Cannot parse timestamp in PUT command with arg {}" , arg);
100+ return ;
101+ }
102+ auto beginTimestamp = endKey + 1 ;
103+ auto endTimestamp = arg.find (' ' , beginTimestamp + 1 );
104+ if (endTimestamp == std::string::npos) {
105+ LOGP (error, " Cannot parse value in PUT command with arg {}" , arg);
106+ return ;
107+ }
108+ auto beginValue = endTimestamp + 1 ;
109+ auto endValue = arg.size ();
85110
86- // Find the StateInfo in the dataProcessingStateManager with the same key. Insert a new one if not found.
87- auto & infos = info.dataProcessingStateManager .infos ;
88- auto & states = info.dataProcessingStateManager .states ;
89- auto it = std::lower_bound (infos.begin (), infos.end (), key, [](DataProcessingStateManager::StateInfo const & stateInfo, std::string_view const & key) { return stateInfo.name < key; });
90- if (it == infos.end () || it->name != key) {
91- it = infos.insert (it, DataProcessingStateManager::StateInfo{std::string{key}, timestampInt, (int )states.size ()});
92- states.resize (states.size () + 1 );
93- memcpy (states.back ().data (), value.data (), value.size ());
94- states.back ()[value.size ()] = ' \0 ' ;
95- LOG (debug) << " New state" << key << " with timestamp " << timestamp << " and value " << value;
96- } else {
97- it->lastUpdate = timestampInt;
98- memcpy (states[it->index ].data (), value.data (), value.size ());
99- states[it->index ][value.size ()] = ' \0 ' ;
100- LOG (debug) << " Updated state" << key << " with timestamp " << timestamp << " and value " << value;
111+ std::string_view key (arg.data () + beginKey, endKey - beginKey);
112+ std::string_view timestamp (arg.data () + beginTimestamp, endTimestamp - beginTimestamp);
113+ std::string_view value (arg.data () + beginValue, endValue - beginValue);
114+ // Find the assocaiated StateSpec and get the id.
115+ auto spec = std::find_if (states.stateSpecs .begin (), states.stateSpecs .end (), [&key](auto const & spec) {
116+ return spec.name == key;
117+ });
118+ if (spec == states.stateSpecs .end ()) {
119+ LOGP (warn, " Cannot find state {} in the state specs for pid {}" , key.data ());
120+ return ;
121+ }
122+ if (value.data () == nullptr ) {
123+ LOGP (debug, " State {} value is null skipping" , key.data ());
124+ return ;
101125 }
126+ // / Notice this will remap the actual time to the time we received the command.
127+ // / This should not be a problem, because we have separate states per device.
128+ states.updateState (DataProcessingStates::CommandSpec{.id = spec->stateId , .size = (int )value.size (), .data = value.data ()});
129+ states.processCommandQueue ();
102130 });
103131 } else {
104132 LOGP (error, " Unknown command {} with argument {}" , command, arg);
0 commit comments