@@ -28,8 +28,12 @@ bool ControlServiceHelpers::parseControl(std::string_view const& s, std::match_r
2828 const static std::regex controlRE1 (" ^READY_TO_(QUIT)_(ME|ALL)" , std::regex::optimize);
2929 const static std::regex controlRE2 (" ^(NOTIFY_STREAMING_STATE) (IDLE|STREAMING|EOS)" , std::regex::optimize);
3030 const static std::regex controlRE3 (" ^(NOTIFY_DEVICE_STATE) ([A-Z ]*)" , std::regex::optimize);
31+ const static std::regex controlRE4 (" ^(PUT) (.*)" , std::regex::optimize);
3132 std::string_view sv = s.substr (pos + strlen (" CONTROL_ACTION: " ));
32- return std::regex_search (sv.begin (), sv.end (), match, controlRE1) || std::regex_search (sv.begin (), sv.end (), match, controlRE2) || std::regex_search (sv.begin (), sv.end (), match, controlRE3);
33+ return std::regex_search (sv.begin (), sv.end (), match, controlRE1) ||
34+ std::regex_search (sv.begin (), sv.end (), match, controlRE2) ||
35+ std::regex_search (sv.begin (), sv.end (), match, controlRE3) ||
36+ std::regex_search (sv.begin (), sv.end (), match, controlRE4);
3337}
3438
3539void ControlServiceHelpers::processCommand (std::vector<DeviceInfo>& infos,
@@ -63,6 +67,39 @@ void ControlServiceHelpers::processCommand(std::vector<DeviceInfo>& infos,
6367 doToMatchingPid (infos, pid, [](DeviceInfo& info) { info.streamingState = StreamingState::EndOfStreaming; });
6468 } else if (command == " NOTIFY_DEVICE_STATE" ) {
6569 doToMatchingPid (infos, pid, [arg](DeviceInfo& info) { info.deviceState = arg; info.providedState ++; });
70+ } else if (command == " PUT" ) {
71+ doToMatchingPid (infos, pid, [&arg](DeviceInfo& info) {
72+ // Tokenize arg for the first two empty space using strtok_r
73+ // and create string_views associated to it.
74+ char * brk;
75+ char * beginKey = strtok_r ((char *)arg.data (), " " , &brk);
76+ char * endKey = strtok_r (nullptr , " " , &brk);
77+ char * beginTimestamp = endKey;
78+ char * endTimestamp = strtok_r (nullptr , " " , &brk);
79+ char * beginValue = endTimestamp;
80+ char * endValue = (char *)arg.data () + arg.size ();
81+ std::string_view key (beginKey, endKey - beginKey);
82+ std::string_view timestamp (beginTimestamp, endTimestamp - beginTimestamp);
83+ std::string_view value (beginValue, endValue - beginValue);
84+ int timestampInt = std::stoll (timestamp.data ());
85+
86+ // Find the StateInfo in the dataProcessingStateManager with the same key. Insert a new one if not found.
87+ auto & infos = info.dataProcessingStateManager .infos ;
88+ auto & states = info.dataProcessingStateManager .states ;
89+ auto it = std::lower_bound (infos.begin (), infos.end (), key, [](DataProcessingStateManager::StateInfo const & stateInfo, std::string_view const & key) { return stateInfo.name < key; });
90+ if (it == infos.end () || it->name != key) {
91+ it = infos.insert (it, DataProcessingStateManager::StateInfo{std::string{key}, timestampInt, (int )states.size ()});
92+ states.resize (states.size () + 1 );
93+ memcpy (states.back ().data (), value.data (), value.size ());
94+ states.back ()[value.size ()] = ' \0 ' ;
95+ LOG (debug) << " New state" << key << " with timestamp " << timestamp << " and value " << value;
96+ } else {
97+ it->lastUpdate = timestampInt;
98+ memcpy (states[it->index ].data (), value.data (), value.size ());
99+ states[it->index ][value.size ()] = ' \0 ' ;
100+ LOG (debug) << " Updated state" << key << " with timestamp " << timestamp << " and value " << value;
101+ }
102+ });
66103 } else {
67104 LOGP (error, " Unknown command {} with argument {}" , command, arg);
68105 }
0 commit comments