Skip to content

Commit 70e566e

Browse files
committed
DPL: add error policy
By default one single error terminates the workflow when running in batch mode. Use --error-policy=wait to wait. By default one single error is reported but not fatal when running in GUI mode. use --error-policy=quit to quit the whole workflow and the GUI.
1 parent 74a961d commit 70e566e

5 files changed

Lines changed: 136 additions & 76 deletions

File tree

Framework/Core/include/Framework/DataProcessingDevice.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
#include "Framework/InputRoute.h"
2727
#include "Framework/ForwardRoute.h"
2828
#include "Framework/TimingInfo.h"
29+
#include "Framework/TerminationPolicy.h"
2930

3031
#include <fairmq/FairMQDevice.h>
3132
#include <fairmq/FairMQParts.h>
@@ -51,6 +52,7 @@ class DataProcessingDevice : public FairMQDevice
5152
void Reset() final;
5253
void ResetTask() final;
5354
bool ConditionalRun() final;
55+
void SetErrorPolicy(enum TerminationPolicy policy) { mErrorPolicy = policy; }
5456

5557
protected:
5658
bool handleData(FairMQParts&, InputChannelInfo&);
@@ -86,6 +88,7 @@ class DataProcessingDevice : public FairMQDevice
8688
DataProcessingStats mStats; /// Stats about the actual data processing.
8789
int mCurrentBackoff = 0; /// The current exponential backoff value.
8890
std::vector<FairMQRegionInfo> mPendingRegionInfos; /// A list of the region infos not yet notified.
91+
enum TerminationPolicy mErrorPolicy = TerminationPolicy::WAIT; /// What to do when an error arises
8992
};
9093

9194
} // namespace o2::framework
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
// Copyright CERN and copyright holders of ALICE O2. This software is
2+
// distributed under the terms of the GNU General Public License v3 (GPL
3+
// Version 3), copied verbatim in the file "COPYING".
4+
//
5+
// See http://alice-o2.web.cern.ch/license for full licensing information.
6+
//
7+
// In applying this license CERN does not waive the privileges and immunities
8+
// granted to it by virtue of its status as an Intergovernmental Organization
9+
// or submit itself to any jurisdiction.
10+
#ifndef O2_FRAMEWORK_TERMINATIONPOLICY_H_
11+
#define O2_FRAMEWORK_TERMINATIONPOLICY_H_
12+
13+
namespace o2::framework
14+
{
15+
16+
/// These are the possible actions we can do
17+
/// when a workflow is deemed complete (e.g. when we are done
18+
/// reading from file) or when a data processor has
19+
/// an error.
20+
enum struct TerminationPolicy { QUIT,
21+
WAIT,
22+
RESTART };
23+
24+
} // namespace o2::framework
25+
26+
#endif // O2_FRAMEWORK_TERMINATIONPOLICY_H_

Framework/Core/src/DataProcessingDevice.cxx

Lines changed: 72 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -627,16 +627,32 @@ bool DataProcessingDevice::tryDispatchComputation()
627627
};
628628

629629
// Error handling means printing the error and updating the metric
630-
auto errorHandling = [&errorCallback, &monitoringService, &serviceRegistry](std::exception& e, InputRecord& record) {
631-
StateMonitoring<DataProcessingStatus>::moveTo(DataProcessingStatus::IN_DPL_ERROR_CALLBACK);
632-
LOG(ERROR) << "Exception caught: " << e.what() << std::endl;
633-
if (errorCallback) {
630+
std::function<void(std::exception & e, InputRecord & record)> errorHandling = nullptr;
631+
if (errorCallback != nullptr) {
632+
errorHandling = [&errorCallback, &monitoringService,
633+
&serviceRegistry](std::exception& e, InputRecord& record) {
634+
StateMonitoring<DataProcessingStatus>::moveTo(DataProcessingStatus::IN_DPL_ERROR_CALLBACK);
635+
LOG(ERROR) << "Exception caught: " << e.what() << std::endl;
634636
monitoringService.send({1, "error"});
635637
ErrorContext errorContext{record, serviceRegistry, e};
636638
errorCallback(errorContext);
637-
}
638-
StateMonitoring<DataProcessingStatus>::moveTo(DataProcessingStatus::IN_DPL_OVERHEAD);
639-
};
639+
StateMonitoring<DataProcessingStatus>::moveTo(DataProcessingStatus::IN_DPL_OVERHEAD);
640+
};
641+
} else {
642+
errorHandling = [&monitoringService, &errorPolicy = mErrorPolicy,
643+
&serviceRegistry](std::exception& e, InputRecord& record) {
644+
StateMonitoring<DataProcessingStatus>::moveTo(DataProcessingStatus::IN_DPL_ERROR_CALLBACK);
645+
LOG(ERROR) << "Exception caught: " << e.what() << std::endl;
646+
monitoringService.send({1, "error"});
647+
switch (errorPolicy) {
648+
case TerminationPolicy::QUIT:
649+
throw e;
650+
default:
651+
break;
652+
}
653+
StateMonitoring<DataProcessingStatus>::moveTo(DataProcessingStatus::IN_DPL_OVERHEAD);
654+
};
655+
}
640656

641657
// I need a preparation step which gets the current timeslice id and
642658
// propagates it to the various contextes (i.e. the actual entities which
@@ -780,64 +796,64 @@ bool DataProcessingDevice::tryDispatchComputation()
780796

781797
if (canDispatchSomeComputation() == false) {
782798
return false;
783-
}
784-
785-
for (auto action : getReadyActions()) {
786-
if (action.op == CompletionPolicy::CompletionOp::Wait) {
787-
continue;
788799
}
789800

790-
prepareAllocatorForCurrentTimeSlice(TimesliceSlot{action.slot});
791-
InputRecord record = fillInputs(action.slot);
792-
if (action.op == CompletionPolicy::CompletionOp::Discard) {
793-
if (forwards.empty() == false) {
794-
forwardInputs(action.slot, record);
801+
for (auto action : getReadyActions()) {
802+
if (action.op == CompletionPolicy::CompletionOp::Wait) {
795803
continue;
796804
}
797-
}
798-
auto tStart = std::chrono::high_resolution_clock::now();
799-
for (size_t ai = 0; ai != record.size(); ai++) {
800-
auto cacheId = action.slot.index * record.size() + ai;
801-
auto state = record.isValid(ai) ? 2 : 0;
802-
mStats.relayerState.resize(std::max(cacheId + 1, mStats.relayerState.size()), 0);
803-
mStats.relayerState[cacheId] = state;
804-
}
805-
try {
806-
if (mState.quitRequested == false) {
807-
dispatchProcessing(action.slot, record);
805+
806+
prepareAllocatorForCurrentTimeSlice(TimesliceSlot{action.slot});
807+
InputRecord record = fillInputs(action.slot);
808+
if (action.op == CompletionPolicy::CompletionOp::Discard) {
809+
if (forwards.empty() == false) {
810+
forwardInputs(action.slot, record);
811+
continue;
812+
}
808813
}
809-
} catch (std::exception& e) {
810-
errorHandling(e, record);
811-
}
812-
for (size_t ai = 0; ai != record.size(); ai++) {
813-
auto cacheId = action.slot.index * record.size() + ai;
814-
auto state = record.isValid(ai) ? 3 : 0;
815-
mStats.relayerState.resize(std::max(cacheId + 1, mStats.relayerState.size()), 0);
816-
mStats.relayerState[cacheId] = state;
817-
}
818-
auto tEnd = std::chrono::high_resolution_clock::now();
819-
mStats.lastElapsedTimeMs = std::chrono::duration<double, std::milli>(tEnd - tStart).count();
820-
mStats.lastTotalProcessedSize = calculateTotalInputRecordSize(record);
821-
mStats.lastLatency = calculateInputRecordLatency(record, tStart);
822-
// We forward inputs only when we consume them. If we simply Process them,
823-
// we keep them for next message arriving.
824-
if (action.op == CompletionPolicy::CompletionOp::Consume) {
825-
if (forwards.empty() == false) {
826-
forwardInputs(action.slot, record);
814+
auto tStart = std::chrono::high_resolution_clock::now();
815+
for (size_t ai = 0; ai != record.size(); ai++) {
816+
auto cacheId = action.slot.index * record.size() + ai;
817+
auto state = record.isValid(ai) ? 2 : 0;
818+
mStats.relayerState.resize(std::max(cacheId + 1, mStats.relayerState.size()), 0);
819+
mStats.relayerState[cacheId] = state;
820+
}
821+
try {
822+
if (mState.quitRequested == false) {
823+
dispatchProcessing(action.slot, record);
824+
}
825+
} catch (std::exception& e) {
826+
errorHandling(e, record);
827+
}
828+
for (size_t ai = 0; ai != record.size(); ai++) {
829+
auto cacheId = action.slot.index * record.size() + ai;
830+
auto state = record.isValid(ai) ? 3 : 0;
831+
mStats.relayerState.resize(std::max(cacheId + 1, mStats.relayerState.size()), 0);
832+
mStats.relayerState[cacheId] = state;
833+
}
834+
auto tEnd = std::chrono::high_resolution_clock::now();
835+
mStats.lastElapsedTimeMs = std::chrono::duration<double, std::milli>(tEnd - tStart).count();
836+
mStats.lastTotalProcessedSize = calculateTotalInputRecordSize(record);
837+
mStats.lastLatency = calculateInputRecordLatency(record, tStart);
838+
// We forward inputs only when we consume them. If we simply Process them,
839+
// we keep them for next message arriving.
840+
if (action.op == CompletionPolicy::CompletionOp::Consume) {
841+
if (forwards.empty() == false) {
842+
forwardInputs(action.slot, record);
843+
}
844+
} else if (action.op == CompletionPolicy::CompletionOp::Process) {
845+
cleanTimers(action.slot, record);
827846
}
828-
} else if (action.op == CompletionPolicy::CompletionOp::Process) {
829-
cleanTimers(action.slot, record);
830847
}
831-
}
832-
// We now broadcast the end of stream if it was requested
833-
if (mState.streaming == StreamingState::EndOfStreaming) {
834-
for (auto& channel : mSpec.outputChannels) {
835-
DataProcessingHelpers::sendEndOfStream(*this, channel);
848+
// We now broadcast the end of stream if it was requested
849+
if (mState.streaming == StreamingState::EndOfStreaming) {
850+
for (auto& channel : mSpec.outputChannels) {
851+
DataProcessingHelpers::sendEndOfStream(*this, channel);
852+
}
853+
switchState(StreamingState::Idle);
836854
}
837-
switchState(StreamingState::Idle);
838-
}
839855

840-
return true;
856+
return true;
841857
}
842858

843859
void DataProcessingDevice::error(const char* msg)

Framework/Core/src/DriverInfo.h

Lines changed: 8 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@
88
// granted to it by virtue of its status as an Intergovernmental Organization
99
// or submit itself to any jurisdiction.
1010

11-
#ifndef FRAMEWORK_DRIVER_INFO_H
12-
#define FRAMEWORK_DRIVER_INFO_H
11+
#ifndef O2_FRAMEWORK_DRIVERINFO_H_
12+
#define O2_FRAMEWORK_DRIVERINFO_H_
1313

1414
#include <chrono>
1515
#include <cstddef>
@@ -21,11 +21,10 @@
2121

2222
#include "Framework/ChannelConfigurationPolicy.h"
2323
#include "Framework/ConfigParamSpec.h"
24+
#include "Framework/TerminationPolicy.h"
2425
#include "DataProcessorInfo.h"
2526

26-
namespace o2
27-
{
28-
namespace framework
27+
namespace o2::framework
2928
{
3029

3130
class ConfigContext;
@@ -73,13 +72,6 @@ enum struct DriverState {
7372
LAST
7473
};
7574

76-
/// These are the possible actions we can do
77-
/// when a workflow is deemed complete (e.g. when we are done
78-
/// reading from file).
79-
enum struct TerminationPolicy { QUIT,
80-
WAIT,
81-
RESTART };
82-
8375
/// Information about the driver process (i.e. / the one which calculates the
8476
/// topology and actually spawns the devices )
8577
struct DriverInfo {
@@ -114,6 +106,8 @@ struct DriverInfo {
114106
bool batch;
115107
/// What we should do when the workflow is completed.
116108
enum TerminationPolicy terminationPolicy;
109+
/// What we should do when one device in the workflow has an error
110+
enum TerminationPolicy errorPolicy;
117111
/// The offset at which the process was started.
118112
std::chrono::time_point<std::chrono::steady_clock> startTime;
119113
/// The optional timeout after which the driver will request
@@ -146,7 +140,6 @@ struct DriverInfo {
146140
std::string uniqueWorkflowId = "";
147141
};
148142

149-
} // namespace framework
150-
} // namespace o2
143+
} // namespace o2::framework
151144

152-
#endif
145+
#endif // O2_FRAMEWORK_DRIVERINFO_H_

Framework/Core/src/runDataProcessing.cxx

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -581,14 +581,19 @@ void processChildrenOutput(DriverInfo& driverInfo, DeviceInfos& infos, DeviceSpe
581581
}
582582

583583
// Process all the sigchld which are pending
584-
void processSigChild(DeviceInfos& infos)
584+
// @return wether or not a given child exited with an error condition.
585+
bool processSigChild(DeviceInfos& infos)
585586
{
587+
bool hasError = false;
586588
while (true) {
587589
int status;
588590
pid_t pid = waitpid((pid_t)(-1), &status, WNOHANG);
589591
if (pid > 0) {
590592
int es = WEXITSTATUS(status);
591593

594+
if (es) {
595+
hasError = true;
596+
}
592597
for (auto& info : infos) {
593598
if (info.pid == pid) {
594599
info.active = false;
@@ -600,6 +605,7 @@ void processSigChild(DeviceInfos& infos)
600605
break;
601606
}
602607
}
608+
return hasError;
603609
}
604610

605611
// Creates the sink for FairLogger / InfoLogger integration
@@ -657,7 +663,7 @@ auto createInfoLoggerSinkHelper(std::unique_ptr<InfoLogger>& logger, std::unique
657663
};
658664
};
659665

660-
int doChild(int argc, char** argv, const o2::framework::DeviceSpec& spec)
666+
int doChild(int argc, char** argv, const o2::framework::DeviceSpec& spec, TerminationPolicy errorPolicy)
661667
{
662668
fair::Logger::SetConsoleColor(false);
663669
LOG(INFO) << "Spawing new device " << spec.id << " in process with pid " << getpid();
@@ -707,7 +713,8 @@ int doChild(int argc, char** argv, const o2::framework::DeviceSpec& spec)
707713
&serviceRegistry,
708714
&infoLoggerContext,
709715
&deviceState,
710-
&timesliceIndex](fair::mq::DeviceRunner& r) {
716+
&timesliceIndex,
717+
&errorPolicy](fair::mq::DeviceRunner& r) {
711718
localRootFileService = std::make_unique<LocalRootFileService>();
712719
deviceState = std::make_unique<DeviceState>();
713720
textControlService = std::make_unique<TextControlService>(serviceRegistry, *deviceState.get());
@@ -746,6 +753,7 @@ int doChild(int argc, char** argv, const o2::framework::DeviceSpec& spec)
746753
// FairMQ API (one which uses a shared_ptr, the other one a unique_ptr.
747754
decltype(r.fDevice) device;
748755
device = std::move(make_matching<decltype(device), DataProcessingDevice>(spec, serviceRegistry, *deviceState.get()));
756+
dynamic_cast<DataProcessingDevice*>(device.get())->SetErrorPolicy(errorPolicy);
749757

750758
serviceRegistry.get<RawDeviceService>().setDevice(device.get());
751759
r.fDevice = std::move(device);
@@ -820,6 +828,7 @@ int runStateMachine(DataProcessorSpecs const& workflow,
820828
driverInfo.batch = true;
821829
}
822830
bool guiQuitRequested = false;
831+
bool hasError = false;
823832

824833
auto frameLast = std::chrono::high_resolution_clock::now();
825834
auto inputProcessingLast = frameLast;
@@ -940,7 +949,7 @@ int runStateMachine(DataProcessorSpecs const& workflow,
940949
}
941950
for (auto& spec : deviceSpecs) {
942951
if (spec.id == frameworkId) {
943-
return doChild(driverInfo.argc, driverInfo.argv, spec);
952+
return doChild(driverInfo.argc, driverInfo.argv, spec, driverInfo.errorPolicy);
944953
}
945954
}
946955
{
@@ -1079,7 +1088,7 @@ int runStateMachine(DataProcessorSpecs const& workflow,
10791088
sigchld_requested = false;
10801089
driverInfo.sigchldRequested = false;
10811090
processChildrenOutput(driverInfo, infos, deviceSpecs, controls, metricsInfos);
1082-
processSigChild(infos);
1091+
hasError = processSigChild(infos);
10831092
if (areAllChildrenGone(infos) == true &&
10841093
(guiQuitRequested || (checkIfCanExit(infos) == true) || graceful_exit)) {
10851094
// We move to the exit, regardless of where we were
@@ -1090,6 +1099,11 @@ int runStateMachine(DataProcessorSpecs const& workflow,
10901099
(guiQuitRequested || checkIfCanExit(infos) == true || graceful_exit)) {
10911100
driverInfo.states.push_back(DriverState::HANDLE_CHILDREN);
10921101
driverInfo.states.push_back(DriverState::GUI);
1102+
} else if (hasError && driverInfo.errorPolicy == TerminationPolicy::QUIT &&
1103+
!(guiQuitRequested || checkIfCanExit(infos) == true || graceful_exit)) {
1104+
graceful_exit = 1;
1105+
driverInfo.states.push_back(DriverState::GUI);
1106+
driverInfo.states.push_back(DriverState::QUIT_REQUESTED);
10931107
} else {
10941108
driverInfo.states.push_back(DriverState::GUI);
10951109
}
@@ -1411,6 +1425,7 @@ int doMain(int argc, char** argv, o2::framework::WorkflowSpec const& workflow,
14111425
currentWorkflowOptions};
14121426

14131427
enum TerminationPolicy policy;
1428+
enum TerminationPolicy errorPolicy;
14141429
bpo::options_description executorOptions("Executor options");
14151430
const char* helpDescription = "print help: short, full, executor, or processor name";
14161431
executorOptions.add_options() //
@@ -1425,6 +1440,8 @@ int doMain(int argc, char** argv, o2::framework::WorkflowSpec const& workflow,
14251440
("port-range,pr", bpo::value<unsigned short>()->default_value(1000), "ports in range") //
14261441
("completion-policy,c", bpo::value<TerminationPolicy>(&policy)->default_value(TerminationPolicy::QUIT), //
14271442
"what to do when processing is finished: quit, wait") //
1443+
("error-policy,c", bpo::value<TerminationPolicy>(&errorPolicy)->default_value(TerminationPolicy::QUIT), //
1444+
"what to do when a device has an error: quit, wait") //
14281445
("graphviz,g", bpo::value<bool>()->zero_tokens()->default_value(false), "produce graph output") //
14291446
("timeout,t", bpo::value<double>()->default_value(0), "timeout after which to exit") //
14301447
("dds,D", bpo::value<bool>()->zero_tokens()->default_value(false), "create DDS configuration") //
@@ -1628,6 +1645,11 @@ int doMain(int argc, char** argv, o2::framework::WorkflowSpec const& workflow,
16281645
driverInfo.argv = argv;
16291646
driverInfo.batch = varmap["batch"].as<bool>();
16301647
driverInfo.terminationPolicy = varmap["completion-policy"].as<TerminationPolicy>();
1648+
if (varmap["error-policy"].defaulted() && driverInfo.batch == false) {
1649+
driverInfo.errorPolicy = TerminationPolicy::WAIT;
1650+
} else {
1651+
driverInfo.errorPolicy = varmap["error-policy"].as<TerminationPolicy>();
1652+
}
16311653
driverInfo.startTime = std::chrono::steady_clock::now();
16321654
driverInfo.timeout = varmap["timeout"].as<double>();
16331655
driverInfo.deployHostname = varmap["hostname"].as<std::string>();

0 commit comments

Comments
 (0)