Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Refactor flow of loading/compressing data
There was a lot of code duplication, and the general flow of
loading/storing the data in compressed format was hard to navigate.
  • Loading branch information
PGijsbers committed Jan 15, 2021
commit d1d0f91e2a9bac077ba217c78fb85950aed2eaa4
180 changes: 63 additions & 117 deletions openml/datasets/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,10 +217,11 @@ def find_invalid_characters(string, pattern):
self.qualities = None

if data_file is not None:
rval = self._create_pickle_in_cache(data_file)
rval = self._compressed_cache_file_paths(data_file)
self.data_pickle_file = rval[0] # type: Optional[str]
self.data_feather_file = rval[1] # type: Optional[str]
self.feather_attribute_file = rval[2] # type: Optional[str]
self._cache_compressed_file_from_arff(self.data_file)
else:
self.data_pickle_file, self.data_feather_file, self.feather_attribute_file = (
None,
Expand Down Expand Up @@ -278,7 +279,7 @@ def __eq__(self, other):
"upload_date",
"url",
"dataset",
"data_file",
"arff_file",
}

# check that the keys are identical
Expand All @@ -291,7 +292,7 @@ def __eq__(self, other):
return all(self.__dict__[key] == other.__dict__[key] for key in self_keys)

def _download_data(self) -> None:
""" Download ARFF data file to standard cache directory. Set `self.data_file`. """
""" Download ARFF data file to standard cache directory. Set `self.arff_file`. """
# import required here to avoid circular import.
from .functions import _get_dataset_arff

Expand All @@ -300,7 +301,7 @@ def _download_data(self) -> None:
def _get_arff(self, format: str) -> Dict:
"""Read ARFF file and return decoded arff.

Reads the file referenced in self.data_file.
Reads the file referenced in self.arff_file.

Parameters
----------
Expand Down Expand Up @@ -455,152 +456,97 @@ def _parse_data_from_arff(

return X, categorical, attribute_names

def _create_pickle_in_cache(self, data_file: str) -> Tuple[str, str, str]:
""" Parse the arff and pickle the result. Update any old pickle objects. """
def _compressed_cache_file_paths(self, data_file: str) -> Tuple[str, str, str]:
data_pickle_file = data_file.replace(".arff", ".pkl.py3")
data_feather_file = data_file.replace(".arff", ".feather")
feather_attribute_file = data_file.replace(".arff", ".feather.attributes.pkl.py3")
if os.path.exists(data_pickle_file) and self.cache_format == "pickle":
# Load the data to check if the pickle file is outdated (i.e. contains numpy array)
with open(data_pickle_file, "rb") as fh:
try:
data, categorical, attribute_names = pickle.load(fh)
except EOFError:
# The file is likely corrupt, see #780.
# We deal with this when loading the data in `_load_data`.
return data_pickle_file, data_feather_file, feather_attribute_file
except ModuleNotFoundError:
# There was some issue loading the file, see #918
# We deal with this when loading the data in `_load_data`.
return data_pickle_file, data_feather_file, feather_attribute_file
except ValueError as e:
if "unsupported pickle protocol" in e.args[0]:
# There was some issue loading the file, see #898
# We deal with this when loading the data in `_load_data`.
return data_pickle_file, data_feather_file, feather_attribute_file
else:
raise

# Between v0.8 and v0.9 the format of pickled data changed from
# np.ndarray to pd.DataFrame. This breaks some backwards compatibility,
# e.g. for `run_model_on_task`. If a local file still exists with
# np.ndarray data, we reprocess the data file to store a pickled
# pd.DataFrame blob. See also #646.
if isinstance(data, pd.DataFrame) or scipy.sparse.issparse(data):
logger.debug("Data pickle file already exists and is up to date.")
return data_pickle_file, data_feather_file, feather_attribute_file
elif os.path.exists(data_feather_file) and self.cache_format == "feather":
# Load the data to check if the pickle file is outdated (i.e. contains numpy array)
try:
data = pd.read_feather(data_feather_file)
except EOFError:
# The file is likely corrupt, see #780.
# We deal with this when loading the data in `_load_data`.
return data_pickle_file, data_feather_file, feather_attribute_file
except ModuleNotFoundError:
# There was some issue loading the file, see #918
# We deal with this when loading the data in `_load_data`.
return data_pickle_file, data_feather_file, feather_attribute_file
except ValueError as e:
if "unsupported pickle protocol" in e.args[0]:
# There was some issue loading the file, see #898
# We deal with this when loading the data in `_load_data`.
return data_pickle_file, data_feather_file, feather_attribute_file
else:
raise
return data_pickle_file, data_feather_file, feather_attribute_file

logger.debug("Data feather file already exists and is up to date.")
return data_pickle_file, data_feather_file, feather_attribute_file
def _cache_compressed_file_from_arff(
self, arff_file: str
) -> Tuple[Union[pd.DataFrame, scipy.sparse.csr_matrix], List[bool], List[str]]:
""" Store data from the arff file in compressed format. Sets cache_format to 'pickle' if data is sparse. """ # noqa: 501
(
data_pickle_file,
data_feather_file,
feather_attribute_file,
) = self._compressed_cache_file_paths(arff_file)

# At this point either the pickle file does not exist, or it had outdated formatting.
# We parse the data from arff again and populate the cache with a recent pickle file.
X, categorical, attribute_names = self._parse_data_from_arff(data_file)
data, categorical, attribute_names = self._parse_data_from_arff(arff_file)

# Feather format does not work for sparse datasets, so we use pickle for sparse datasets
if scipy.sparse.issparse(data):
self.cache_format = "pickle"

if self.cache_format == "feather" and not scipy.sparse.issparse(X):
logger.info("feather write {}".format(self.name))
X.to_feather(data_feather_file)
logger.info(f"{self.cache_format} write {self.name}")
if self.cache_format == "feather":
data.to_feather(data_feather_file)
with open(feather_attribute_file, "wb") as fh:
pickle.dump((categorical, attribute_names), fh, pickle.HIGHEST_PROTOCOL)
else:
logger.info("pickle write {}".format(self.name))
self.cache_format = "pickle"
with open(data_pickle_file, "wb") as fh:
pickle.dump((X, categorical, attribute_names), fh, pickle.HIGHEST_PROTOCOL)
logger.debug(
"Saved dataset {did}: {name} to file {path}".format(
did=int(self.dataset_id or -1), name=self.name, path=data_pickle_file
)
)
return data_pickle_file, data_feather_file, feather_attribute_file
pickle.dump((data, categorical, attribute_names), fh, pickle.HIGHEST_PROTOCOL)

data_file = data_pickle_file if self.cache_format == "pickle" else data_feather_file
logger.debug(f"Saved dataset {int(self.dataset_id or -1)}: {self.name} to file {data_file}")
return data, categorical, attribute_names

def _load_data(self):
""" Load data from pickle or arff. Download data first if not present on disk. """
if (self.cache_format == "pickle" and self.data_pickle_file is None) or (
self.cache_format == "feather" and self.data_feather_file is None
):
""" Load data from compressed format or arff. Download data if not present on disk. """
need_to_create_pickle = self.cache_format == "pickle" and self.data_pickle_file is None
need_to_create_feather = self.cache_format == "feather" and self.data_feather_file is None

if need_to_create_pickle or need_to_create_feather:
if self.data_file is None:
self._download_data()
(
self.data_pickle_file,
self.data_feather_file,
self.feather_attribute_file,
) = self._create_pickle_in_cache(self.data_file)

res = self._compressed_cache_file_paths(self.data_file)
self.data_pickle_file, self.data_feather_file, self.feather_attribute_file = res
# Since our recently stored data already exists in memory, there is no need to load it from disk
return self._cache_compressed_file_from_arff(self.data_file)

# helper variable to help identify where errors occur
fpath = self.data_feather_file if self.cache_format == "feather" else self.data_pickle_file
logger.info(f"{self.cache_format} load data {self.name}")
try:
if self.cache_format == "feather":
logger.info("feather load data {}".format(self.name))
data = pd.read_feather(self.data_feather_file)

fpath = self.feather_attribute_file
with open(self.feather_attribute_file, "rb") as fh:
categorical, attribute_names = pickle.load(fh)
else:
logger.info("pickle load data {}".format(self.name))
with open(self.data_pickle_file, "rb") as fh:
data, categorical, attribute_names = pickle.load(fh)
except EOFError:
logger.warning(
"Detected a corrupt cache file loading dataset %d: '%s'. "
"We will continue loading data from the arff-file, "
"but this will be much slower for big datasets. "
"Please manually delete the cache file if you want OpenML-Python "
"to attempt to reconstruct it."
"" % (self.dataset_id, self.data_pickle_file)
)
data, categorical, attribute_names = self._parse_data_from_arff(self.data_file)
except FileNotFoundError:
raise ValueError(
"Cannot find a pickle file for dataset {} at "
"location {} ".format(self.name, self.data_pickle_file)
)
except ModuleNotFoundError as e:
raise ValueError(f"Cannot find file for dataset {self.name} at location '{fpath}'.")
except (EOFError, ModuleNotFoundError, ValueError) as e:
error_message = e.message if hasattr(e, "message") else e.args[0]
hint = ""

if isinstance(e, EOFError):
readable_error = "Detected a corrupt cache file"
elif isinstance(e, ModuleNotFoundError):
readable_error = "Detected likely dependency issues"
hint = "This is most likely due to https://github.com/openml/openml-python/issues/918. " # noqa: 501
elif isinstance(e, ValueError) and "unsupported pickle protocol" in e.args[0]:
readable_error = "Encountered unsupported pickle protocol"
else:
raise # an unknown ValueError is raised, should crash and file bug report

logger.warning(
"Encountered error message when loading cached dataset %d: '%s'. "
"Error message was: %s. "
"This is most likely due to https://github.com/openml/openml-python/issues/918. "
f"{readable_error} when loading dataset {self.id} from '{fpath}'. "
f"{hint}"
f"Error message was: {error_message}. "
"We will continue loading data from the arff-file, "
"but this will be much slower for big datasets. "
"Please manually delete the cache file if you want OpenML-Python "
"to attempt to reconstruct it."
"" % (self.dataset_id, self.data_pickle_file, e.args[0]),
)
data, categorical, attribute_names = self._parse_data_from_arff(self.data_file)
except ValueError as e:
if "unsupported pickle protocol" in e.args[0]:
logger.warning(
"Encountered unsupported pickle protocol when loading cached dataset %d: '%s'. "
"Error message was: %s. "
"We will continue loading data from the arff-file, "
"but this will be much slower for big datasets. "
"Please manually delete the cache file if you want OpenML-Python "
"to attempt to reconstruct it."
"" % (self.dataset_id, self.data_pickle_file, e.args[0]),
)
data, categorical, attribute_names = self._parse_data_from_arff(self.data_file)
else:
raise

data_up_to_date = isinstance(data, pd.DataFrame) or scipy.sparse.issparse(data)
if self.cache_format == "pickle" and not data_up_to_date:
logger.info("Updating outdated pickle file.")
return self._cache_compressed_file_from_arff(self.data_file)
return data, categorical, attribute_names

@staticmethod
Expand Down