Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 18 additions & 8 deletions Framework/Core/include/Framework/DataInputDirector.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,13 @@ namespace framework
{
using namespace rapidjson;

struct FileNameHolder {
std::string fileName;
int numberOfTimeFrames = 0;
std::vector<std::string> listOfTimeFrameKeys;
};
FileNameHolder* makeFileNameHolder(std::string fileName);

struct DataInputDescriptor {
/// Holds information concerning the reading of an aod table.
/// The information includes the table specification, treename,
Expand All @@ -44,31 +51,34 @@ struct DataInputDescriptor {
void setFilenamesRegex(std::string fn) { mFilenameRegex = fn; }
void setFilenamesRegex(std::string* fnptr) { mFilenameRegexPtr = fnptr; }

void setDefaultInputfiles(std::vector<std::string>* difnptr) { mdefaultFilenamesPtr = difnptr; }
void setDefaultInputfiles(std::vector<FileNameHolder*>* difnptr) { mdefaultFilenamesPtr = difnptr; }

void addFilename(std::string fn);
void addFileNameHolder(FileNameHolder* fn);
int fillInputfiles();

// getters
std::string getInputfilesFilename();
std::string getFilenamesRegexString();
std::regex getFilenamesRegex();
int getNumberInputfiles() { return mfilenames.size(); }
int getNumberTimeFrames() { return mtotalNumberTimeFrames; }

std::tuple<TFile*, std::string> getFileFolder(int counter);

TFile* getInputFile(int counter);
void closeInputFile();
std::string getInputFilename(int counter);
bool isAlienSupportOn() { return mAlienSupport; }

private:
std::string minputfilesFile = "";
std::string* minputfilesFilePtr = nullptr;
std::string mFilenameRegex = "";
std::string* mFilenameRegexPtr = nullptr;
std::vector<std::string> mfilenames;
std::vector<std::string>* mdefaultFilenamesPtr = nullptr;
std::vector<FileNameHolder*> mfilenames;
std::vector<FileNameHolder*>* mdefaultFilenamesPtr = nullptr;
TFile* mcurrentFile = nullptr;
bool mAlienSupport = false;

int mtotalNumberTimeFrames = 0;
};

struct DataInputDirector {
Expand All @@ -94,7 +104,7 @@ struct DataInputDirector {
// getters
DataInputDescriptor* getDataInputDescriptor(header::DataHeader dh);
std::unique_ptr<TTreeReader> getTreeReader(header::DataHeader dh, int counter, std::string treeName);
std::string getInputFilename(header::DataHeader dh, int counter);
std::tuple<TFile*, std::string> getFileFolder(header::DataHeader dh, int counter);
TTree* getDataTree(header::DataHeader dh, int counter);
int getNumberInputDescriptors() { return mdataInputDescriptors.size(); }

Expand All @@ -104,7 +114,7 @@ struct DataInputDirector {
std::string mFilenameRegex;
std::string* const mFilenameRegexPtr = &mFilenameRegex;
DataInputDescriptor* mdefaultDataInputDescriptor = nullptr;
std::vector<std::string> mdefaultInputFiles;
std::vector<FileNameHolder*> mdefaultInputFiles;
std::vector<DataInputDescriptor*> mdataInputDescriptors;

bool mDebugMode = false;
Expand Down
136 changes: 84 additions & 52 deletions Framework/Core/src/DataInputDirector.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,48 @@ namespace framework
{
using namespace rapidjson;

FileNameHolder* makeFileNameHolder(std::string fileName)
{
auto fileNameHolder = new FileNameHolder();
fileNameHolder->fileName = fileName;

TFile file = TFile(fileName.c_str(), "R");
if (!file.IsOpen()) {
LOGP(ERROR, "\"{}\" can not be opened.", fileName);
return fileNameHolder;
}

// find TimeFrame folders
std::regex TFRegex = std::regex("TF_[0-9]+");
TList* keyList = file.GetListOfKeys();
for (auto key : *keyList) {
if (std::regex_match(((TObjString*)key)->GetString().Data(), TFRegex)) {
fileNameHolder->listOfTimeFrameKeys.emplace_back(std::string(((TObjString*)key)->GetString().Data()));
}
}
fileNameHolder->numberOfTimeFrames = fileNameHolder->listOfTimeFrameKeys.size();

return fileNameHolder;
}

DataInputDescriptor::DataInputDescriptor(bool alienSupport)
{
mAlienSupport = alienSupport;
}

void DataInputDescriptor::printOut()
{
LOGP(INFO, "DataInputDescriptor");
LOGP(INFO, " Table name : {}", tablename);
LOGP(INFO, " Tree name : {}", treename);
LOGP(INFO, " Input files file : {}", getInputfilesFilename());
LOGP(INFO, " File name regex : {}", getFilenamesRegexString());
LOGP(INFO, " Input files : {}", mfilenames.size());
for (auto fn : mfilenames)
LOGP(INFO, " {} {}", fn->fileName, fn->numberOfTimeFrames);
LOGP(INFO, " Total number of TF: {}", getNumberTimeFrames());
}

std::string DataInputDescriptor::getInputfilesFilename()
{
return (minputfilesFile.empty() && minputfilesFilePtr) ? (std::string)*minputfilesFilePtr : minputfilesFile;
Expand All @@ -44,35 +81,52 @@ std::regex DataInputDescriptor::getFilenamesRegex()
return std::regex(getFilenamesRegexString());
}

void DataInputDescriptor::addFilename(std::string fn)
void DataInputDescriptor::addFileNameHolder(FileNameHolder* fn)
{
if (!mAlienSupport && fn.rfind("alien://", 0) == 0) {
if (!mAlienSupport && fn->fileName.rfind("alien://", 0) == 0) {
LOG(debug) << "AliEn file requested. Enabling support.";
TGrid::Connect("alien://");
mAlienSupport = true;
}

mtotalNumberTimeFrames += fn->numberOfTimeFrames;
mfilenames.emplace_back(fn);
}

TFile* DataInputDescriptor::getInputFile(int counter)
std::tuple<TFile*, std::string> DataInputDescriptor::getFileFolder(int counter)
{
if (counter < getNumberInputfiles()) {
std::string filename("");
std::string directoryName("");

int cnt = mfilenames[0]->numberOfTimeFrames;
if (counter >= 0 && counter < getNumberTimeFrames()) {
for (int ii = 0; ii < getNumberInputfiles(); ii++) {
if (counter < cnt) {
filename = mfilenames[ii]->fileName;
cnt -= mfilenames[ii]->numberOfTimeFrames;
directoryName = (mfilenames[ii]->listOfTimeFrameKeys)[counter - cnt];
break;
} else {
cnt += mfilenames[ii + 1]->numberOfTimeFrames;
}
}

if (mcurrentFile) {
if (mcurrentFile->GetName() != mfilenames[counter]) {
if (mcurrentFile->GetName() != filename) {
closeInputFile();
mcurrentFile = TFile::Open(mfilenames[counter].c_str());
mcurrentFile = TFile::Open(filename.c_str());
}
} else {
mcurrentFile = TFile::Open(mfilenames[counter].c_str());
mcurrentFile = TFile::Open(filename.c_str());
}
if (!mcurrentFile) {
throw std::runtime_error(fmt::format("Couldn't open file \"{}\"!", mfilenames[counter]));
throw std::runtime_error(fmt::format("Couldn't open file \"{}\"!", filename));
}
} else {
closeInputFile();
}

return mcurrentFile;
return std::make_tuple(mcurrentFile, directoryName);
}

void DataInputDescriptor::closeInputFile()
Expand All @@ -98,7 +152,7 @@ int DataInputDescriptor::fillInputfiles()
while (std::getline(filelist, fileName)) {
if (getFilenamesRegexString().empty() ||
std::regex_match(fileName, getFilenamesRegex())) {
addFilename(fileName);
addFileNameHolder(makeFileNameHolder(fileName));
}
}
} catch (...) {
Expand All @@ -108,10 +162,10 @@ int DataInputDescriptor::fillInputfiles()
} else {
// 3. getFilenamesRegex() @ mdefaultFilenamesPtr
if (mdefaultFilenamesPtr) {
for (auto fileName : *mdefaultFilenamesPtr) {
for (auto fileNameHolder : *mdefaultFilenamesPtr) {
if (getFilenamesRegexString().empty() ||
std::regex_match(fileName, getFilenamesRegex())) {
addFilename(fileName);
std::regex_match(fileNameHolder->fileName, getFilenamesRegex())) {
addFileNameHolder(fileNameHolder);
}
}
}
Expand All @@ -120,29 +174,6 @@ int DataInputDescriptor::fillInputfiles()
return getNumberInputfiles();
}

std::string DataInputDescriptor::getInputFilename(int counter)
{
std::string filename("");
if (counter >= 0 && counter < getNumberInputfiles()) {
filename = mfilenames[counter];
}

return filename;
}

void DataInputDescriptor::printOut()
{
LOGP(INFO, "DataInputDescriptor");
LOGP(INFO, " Table name : {}", tablename);
LOGP(INFO, " Tree name : {}", treename);
LOGP(INFO, " Input files file : {}", getInputfilesFilename());
LOGP(INFO, " File name regex : {}", getFilenamesRegexString());
LOGP(INFO, " Input files : {}", mfilenames.size());
for (auto fn : mfilenames) {
LOGP(INFO, " {}", fn);
}
}

DataInputDirector::DataInputDirector()
{
createDefaultDataInputDescriptor();
Expand All @@ -154,7 +185,7 @@ DataInputDirector::DataInputDirector(std::string inputFile)
inputFile.erase(0, 1);
setInputfilesFile(inputFile);
} else {
mdefaultInputFiles.emplace_back(inputFile);
mdefaultInputFiles.emplace_back(makeFileNameHolder(inputFile));
}

createDefaultDataInputDescriptor();
Expand All @@ -163,7 +194,7 @@ DataInputDirector::DataInputDirector(std::string inputFile)
DataInputDirector::DataInputDirector(std::vector<std::string> inputFiles)
{
for (auto inputFile : inputFiles) {
mdefaultInputFiles.emplace_back(inputFile);
mdefaultInputFiles.emplace_back(makeFileNameHolder(inputFile));
}

createDefaultDataInputDescriptor();
Expand Down Expand Up @@ -278,13 +309,13 @@ bool DataInputDirector::readJsonDocument(Document* jsonDoc)
setInputfilesFile(fileName);
} else {
setInputfilesFile("");
mdefaultInputFiles.emplace_back(fileName);
mdefaultInputFiles.emplace_back(makeFileNameHolder(fileName));
}
} else if (didirItem[itemName].IsArray()) {
setInputfilesFile("");
auto fns = didirItem[itemName].GetArray();
for (auto& fn : fns) {
mdefaultInputFiles.emplace_back(fn.GetString());
mdefaultInputFiles.emplace_back(makeFileNameHolder(fn.GetString()));
}
} else {
LOGP(ERROR, "Check the JSON document! Item \"{}\" must be a string or an array!", itemName);
Expand Down Expand Up @@ -361,15 +392,15 @@ bool DataInputDirector::readJsonDocument(Document* jsonDoc)
} else {
if (didesc->getFilenamesRegexString().empty() ||
std::regex_match(fileName, didesc->getFilenamesRegex())) {
didesc->addFilename(fileName);
didesc->addFileNameHolder(makeFileNameHolder(fileName));
}
}
} else if (didescItem[itemName].IsArray()) {
auto fns = didescItem[itemName].GetArray();
for (auto& fn : fns) {
if (didesc->getFilenamesRegexString().empty() ||
std::regex_match(fn.GetString(), didesc->getFilenamesRegex())) {
didesc->addFilename(fn.GetString());
didesc->addFileNameHolder(makeFileNameHolder(fn.GetString()));
}
}
} else {
Expand Down Expand Up @@ -434,8 +465,9 @@ std::unique_ptr<TTreeReader> DataInputDirector::getTreeReader(header::DataHeader
didesc = mdefaultDataInputDescriptor;
}

auto file = didesc->getInputFile(counter);
auto [file, directory] = didesc->getFileFolder(counter);
if (file) {
treename = directory + "/" + treename;
reader = std::make_unique<TTreeReader>(treename.c_str(), file);
if (!reader) {
throw std::runtime_error(fmt::format(R"(Couldn't create TTreeReader for tree "{}" in file "{}")", treename, file->GetName()));
Expand All @@ -445,16 +477,16 @@ std::unique_ptr<TTreeReader> DataInputDirector::getTreeReader(header::DataHeader
return reader;
}

std::string DataInputDirector::getInputFilename(header::DataHeader dh, int counter)
std::tuple<TFile*, std::string> DataInputDirector::getFileFolder(header::DataHeader dh, int counter)
{
auto didesc = getDataInputDescriptor(dh);
// if NOT match then use defaultDataInputDescriptor
if (!didesc) {
didesc = mdefaultDataInputDescriptor;
}
auto filename = didesc->getInputFilename(counter);
auto [file, directory] = didesc->getFileFolder(counter);

return filename;
return std::make_tuple(file, directory);
}

TTree* DataInputDirector::getDataTree(header::DataHeader dh, int counter)
Expand All @@ -474,8 +506,9 @@ TTree* DataInputDirector::getDataTree(header::DataHeader dh, int counter)
treename = aod::datamodel::getTreeName(dh);
}

auto file = didesc->getInputFile(counter);
auto [file, directory] = didesc->getFileFolder(counter);
if (file) {
treename = directory + "/" + treename;
tree = (TTree*)file->Get(treename.c_str());
if (!tree) {
throw std::runtime_error(fmt::format(R"(Couldn't get TTree "{}" from "{}")", treename, file->GetName()));
Expand Down Expand Up @@ -506,9 +539,9 @@ bool DataInputDirector::isValid()

bool DataInputDirector::atEnd(int counter)
{
bool status = mdefaultDataInputDescriptor->getNumberInputfiles() <= counter;
bool status = mdefaultDataInputDescriptor->getNumberTimeFrames() <= counter;
for (auto didesc : mdataInputDescriptors) {
status &= (didesc->getNumberInputfiles() <= counter);
status &= (didesc->getNumberTimeFrames() <= counter);
}

return status;
Expand All @@ -520,9 +553,8 @@ void DataInputDirector::printOut()
LOGP(INFO, " Default input files file : {}", minputfilesFile);
LOGP(INFO, " Default file name regex : {}", mFilenameRegex);
LOGP(INFO, " Default file names : {}", mdefaultInputFiles.size());
for (auto const& fn : mdefaultInputFiles) {
LOGP(INFO, " {}", fn);
}
for (auto const& fn : mdefaultInputFiles)
LOGP(INFO, " {} {}", fn->fileName, fn->numberOfTimeFrames);
LOGP(INFO, " Default DataInputDescriptor:");
mdefaultDataInputDescriptor->printOut();
LOGP(INFO, " DataInputDescriptors : {}", getNumberInputDescriptors());
Expand Down
2 changes: 1 addition & 1 deletion Framework/Core/src/WorkflowHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext
for (auto ii = 0u; ii < OutputsInputs.size(); ii++) {
if ((outputTypes[ii] & 2) == 2) {

// temporarily also request to be dangling
// is this dangling ?
if ((outputTypes[ii] & 1) == 1) {
outputsInputsAOD.emplace_back(OutputsInputs[ii]);
isdangling.emplace_back((outputTypes[ii] & 1) == 1);
Expand Down
9 changes: 6 additions & 3 deletions Framework/Core/test/test_DataInputDirector.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,15 @@ BOOST_AUTO_TEST_CASE(TestDatainputDirector)

DataInputDirector didir1;
BOOST_CHECK(didir1.readJson(jsonFile));
//didir1.printOut(); printf("\n\n");
didir1.printOut(); printf("\n\n");

BOOST_CHECK_EQUAL(didir1.getNumberInputDescriptors(), 2);

auto dh = DataHeader(DataDescription{"DUE"},
DataOrigin{"AOD"},
DataHeader::SubSpecificationType{0});
BOOST_CHECK_EQUAL(didir1.getInputFilename(dh, 1), "Bresults_1.root");
auto [file1, directory1] = didir1.getFileFolder(dh, 1);
//BOOST_CHECK_EQUAL(file1->GetName(), "Bresults_1.root");

auto didesc = didir1.getDataInputDescriptor(dh);
BOOST_CHECK(didesc);
Expand Down Expand Up @@ -93,9 +94,11 @@ BOOST_AUTO_TEST_CASE(TestDatainputDirector)
"Bresults_1.root",
"Bresults_2.root"};
DataInputDirector didir2(inputFiles);
didir2.printOut(); printf("\n\n");
BOOST_CHECK(didir2.readJson(jsonFile));

BOOST_CHECK_EQUAL(didir2.getInputFilename(dh, 1), "Bresults_1.root");
auto [file2, directory2] = didir2.getFileFolder(dh, 1);
//BOOST_CHECK_EQUAL(file2->GetName(), "Bresults_1.root");

didesc = didir2.getDataInputDescriptor(dh);
BOOST_CHECK(didesc);
Expand Down