@@ -627,16 +627,32 @@ bool DataProcessingDevice::tryDispatchComputation()
627627 };
628628
629629 // Error handling means printing the error and updating the metric
630- auto errorHandling = [&errorCallback, &monitoringService, &serviceRegistry](std::exception& e, InputRecord& record) {
631- StateMonitoring<DataProcessingStatus>::moveTo (DataProcessingStatus::IN_DPL_ERROR_CALLBACK);
632- LOG (ERROR) << " Exception caught: " << e.what () << std::endl;
633- if (errorCallback) {
630+ std::function<void (std::exception & e, InputRecord & record)> errorHandling = nullptr ;
631+ if (errorCallback != nullptr ) {
632+ errorHandling = [&errorCallback, &monitoringService,
633+ &serviceRegistry](std::exception& e, InputRecord& record) {
634+ StateMonitoring<DataProcessingStatus>::moveTo (DataProcessingStatus::IN_DPL_ERROR_CALLBACK);
635+ LOG (ERROR) << " Exception caught: " << e.what () << std::endl;
634636 monitoringService.send ({1 , " error" });
635637 ErrorContext errorContext{record, serviceRegistry, e};
636638 errorCallback (errorContext);
637- }
638- StateMonitoring<DataProcessingStatus>::moveTo (DataProcessingStatus::IN_DPL_OVERHEAD);
639- };
639+ StateMonitoring<DataProcessingStatus>::moveTo (DataProcessingStatus::IN_DPL_OVERHEAD);
640+ };
641+ } else {
642+ errorHandling = [&monitoringService, &errorPolicy = mErrorPolicy ,
643+ &serviceRegistry](std::exception& e, InputRecord& record) {
644+ StateMonitoring<DataProcessingStatus>::moveTo (DataProcessingStatus::IN_DPL_ERROR_CALLBACK);
645+ LOG (ERROR) << " Exception caught: " << e.what () << std::endl;
646+ monitoringService.send ({1 , " error" });
647+ switch (errorPolicy) {
648+ case TerminationPolicy::QUIT:
649+ throw e;
650+ default :
651+ break ;
652+ }
653+ StateMonitoring<DataProcessingStatus>::moveTo (DataProcessingStatus::IN_DPL_OVERHEAD);
654+ };
655+ }
640656
641657 // I need a preparation step which gets the current timeslice id and
642658 // propagates it to the various contextes (i.e. the actual entities which
@@ -780,64 +796,64 @@ bool DataProcessingDevice::tryDispatchComputation()
780796
781797 if (canDispatchSomeComputation () == false ) {
782798 return false ;
783- }
784-
785- for (auto action : getReadyActions ()) {
786- if (action.op == CompletionPolicy::CompletionOp::Wait) {
787- continue ;
788799 }
789800
790- prepareAllocatorForCurrentTimeSlice (TimesliceSlot{action.slot });
791- InputRecord record = fillInputs (action.slot );
792- if (action.op == CompletionPolicy::CompletionOp::Discard) {
793- if (forwards.empty () == false ) {
794- forwardInputs (action.slot , record);
801+ for (auto action : getReadyActions ()) {
802+ if (action.op == CompletionPolicy::CompletionOp::Wait) {
795803 continue ;
796804 }
797- }
798- auto tStart = std::chrono::high_resolution_clock::now ();
799- for (size_t ai = 0 ; ai != record.size (); ai++) {
800- auto cacheId = action.slot .index * record.size () + ai;
801- auto state = record.isValid (ai) ? 2 : 0 ;
802- mStats .relayerState .resize (std::max (cacheId + 1 , mStats .relayerState .size ()), 0 );
803- mStats .relayerState [cacheId] = state;
804- }
805- try {
806- if (mState .quitRequested == false ) {
807- dispatchProcessing (action.slot , record);
805+
806+ prepareAllocatorForCurrentTimeSlice (TimesliceSlot{action.slot });
807+ InputRecord record = fillInputs (action.slot );
808+ if (action.op == CompletionPolicy::CompletionOp::Discard) {
809+ if (forwards.empty () == false ) {
810+ forwardInputs (action.slot , record);
811+ continue ;
812+ }
808813 }
809- } catch (std::exception& e) {
810- errorHandling (e, record);
811- }
812- for (size_t ai = 0 ; ai != record.size (); ai++) {
813- auto cacheId = action.slot .index * record.size () + ai;
814- auto state = record.isValid (ai) ? 3 : 0 ;
815- mStats .relayerState .resize (std::max (cacheId + 1 , mStats .relayerState .size ()), 0 );
816- mStats .relayerState [cacheId] = state;
817- }
818- auto tEnd = std::chrono::high_resolution_clock::now ();
819- mStats .lastElapsedTimeMs = std::chrono::duration<double , std::milli>(tEnd - tStart).count ();
820- mStats .lastTotalProcessedSize = calculateTotalInputRecordSize (record);
821- mStats .lastLatency = calculateInputRecordLatency (record, tStart);
822- // We forward inputs only when we consume them. If we simply Process them,
823- // we keep them for next message arriving.
824- if (action.op == CompletionPolicy::CompletionOp::Consume) {
825- if (forwards.empty () == false ) {
826- forwardInputs (action.slot , record);
814+ auto tStart = std::chrono::high_resolution_clock::now ();
815+ for (size_t ai = 0 ; ai != record.size (); ai++) {
816+ auto cacheId = action.slot .index * record.size () + ai;
817+ auto state = record.isValid (ai) ? 2 : 0 ;
818+ mStats .relayerState .resize (std::max (cacheId + 1 , mStats .relayerState .size ()), 0 );
819+ mStats .relayerState [cacheId] = state;
820+ }
821+ try {
822+ if (mState .quitRequested == false ) {
823+ dispatchProcessing (action.slot , record);
824+ }
825+ } catch (std::exception& e) {
826+ errorHandling (e, record);
827+ }
828+ for (size_t ai = 0 ; ai != record.size (); ai++) {
829+ auto cacheId = action.slot .index * record.size () + ai;
830+ auto state = record.isValid (ai) ? 3 : 0 ;
831+ mStats .relayerState .resize (std::max (cacheId + 1 , mStats .relayerState .size ()), 0 );
832+ mStats .relayerState [cacheId] = state;
833+ }
834+ auto tEnd = std::chrono::high_resolution_clock::now ();
835+ mStats .lastElapsedTimeMs = std::chrono::duration<double , std::milli>(tEnd - tStart).count ();
836+ mStats .lastTotalProcessedSize = calculateTotalInputRecordSize (record);
837+ mStats .lastLatency = calculateInputRecordLatency (record, tStart);
838+ // We forward inputs only when we consume them. If we simply Process them,
839+ // we keep them for next message arriving.
840+ if (action.op == CompletionPolicy::CompletionOp::Consume) {
841+ if (forwards.empty () == false ) {
842+ forwardInputs (action.slot , record);
843+ }
844+ } else if (action.op == CompletionPolicy::CompletionOp::Process) {
845+ cleanTimers (action.slot , record);
827846 }
828- } else if (action.op == CompletionPolicy::CompletionOp::Process) {
829- cleanTimers (action.slot , record);
830847 }
831- }
832- // We now broadcast the end of stream if it was requested
833- if (mState .streaming == StreamingState::EndOfStreaming) {
834- for (auto & channel : mSpec .outputChannels ) {
835- DataProcessingHelpers::sendEndOfStream (*this , channel);
848+ // We now broadcast the end of stream if it was requested
849+ if (mState .streaming == StreamingState::EndOfStreaming) {
850+ for (auto & channel : mSpec .outputChannels ) {
851+ DataProcessingHelpers::sendEndOfStream (*this , channel);
852+ }
853+ switchState (StreamingState::Idle);
836854 }
837- switchState (StreamingState::Idle);
838- }
839855
840- return true ;
856+ return true ;
841857}
842858
843859void DataProcessingDevice::error (const char * msg)
0 commit comments