|
77 | 77 | from feast.repo_config import RepoConfig, load_repo_config |
78 | 78 | from feast.repo_contents import RepoContents |
79 | 79 | from feast.request_feature_view import RequestFeatureView |
| 80 | +from feast.saved_dataset import SavedDataset, SavedDatasetStorage |
80 | 81 | from feast.type_map import python_values_to_proto_values |
81 | 82 | from feast.usage import log_exceptions, log_exceptions_and_usage, set_usage_attribute |
82 | 83 | from feast.value_type import ValueType |
@@ -764,6 +765,93 @@ def get_historical_features( |
764 | 765 |
|
765 | 766 | return job |
766 | 767 |
|
| 768 | + @log_exceptions_and_usage |
| 769 | + def create_saved_dataset( |
| 770 | + self, |
| 771 | + from_: RetrievalJob, |
| 772 | + name: str, |
| 773 | + storage: SavedDatasetStorage, |
| 774 | + tags: Optional[Dict[str, str]] = None, |
| 775 | + ) -> SavedDataset: |
| 776 | + """ |
| 777 | + Execute provided retrieval job and persist its outcome in given storage. |
| 778 | + Storage type (eg, BigQuery or Redshift) must be the same as globally configured offline store. |
| 779 | + After data successfully persisted saved dataset object with dataset metadata is committed to the registry. |
| 780 | + Name for the saved dataset should be unique within project, since it's possible to overwrite previously stored dataset |
| 781 | + with the same name. |
| 782 | +
|
| 783 | + Returns: |
| 784 | + SavedDataset object with attached RetrievalJob |
| 785 | +
|
| 786 | + Raises: |
| 787 | + ValueError if given retrieval job doesn't have metadata |
| 788 | + """ |
| 789 | + warnings.warn( |
| 790 | + "Saving dataset is an experimental feature. " |
| 791 | + "This API is unstable and it could and most probably will be changed in the future. " |
| 792 | + "We do not guarantee that future changes will maintain backward compatibility.", |
| 793 | + RuntimeWarning, |
| 794 | + ) |
| 795 | + |
| 796 | + if not from_.metadata: |
| 797 | + raise ValueError( |
| 798 | + "RetrievalJob must contains metadata. " |
| 799 | + "Use RetrievalJob produced by get_historical_features" |
| 800 | + ) |
| 801 | + |
| 802 | + dataset = SavedDataset( |
| 803 | + name=name, |
| 804 | + features=from_.metadata.features, |
| 805 | + join_keys=from_.metadata.keys, |
| 806 | + full_feature_names=from_.full_feature_names, |
| 807 | + storage=storage, |
| 808 | + tags=tags, |
| 809 | + ) |
| 810 | + |
| 811 | + dataset.min_event_timestamp = from_.metadata.min_event_timestamp |
| 812 | + dataset.max_event_timestamp = from_.metadata.max_event_timestamp |
| 813 | + |
| 814 | + from_.persist(storage) |
| 815 | + |
| 816 | + self._registry.apply_saved_dataset(dataset, self.project, commit=True) |
| 817 | + |
| 818 | + return dataset.with_retrieval_job( |
| 819 | + self._get_provider().retrieve_saved_dataset( |
| 820 | + config=self.config, dataset=dataset |
| 821 | + ) |
| 822 | + ) |
| 823 | + |
| 824 | + @log_exceptions_and_usage |
| 825 | + def get_saved_dataset(self, name: str) -> SavedDataset: |
| 826 | + """ |
| 827 | + Find a saved dataset in the registry by provided name and |
| 828 | + create a retrieval job to pull whole dataset from storage (offline store). |
| 829 | +
|
| 830 | + If dataset couldn't be found by provided name SavedDatasetNotFound exception will be raised. |
| 831 | +
|
| 832 | + Data will be retrieved from globally configured offline store. |
| 833 | +
|
| 834 | + Returns: |
| 835 | + SavedDataset with RetrievalJob attached |
| 836 | +
|
| 837 | + Raises: |
| 838 | + SavedDatasetNotFound |
| 839 | + """ |
| 840 | + warnings.warn( |
| 841 | + "Retrieving datasets is an experimental feature. " |
| 842 | + "This API is unstable and it could and most probably will be changed in the future. " |
| 843 | + "We do not guarantee that future changes will maintain backward compatibility.", |
| 844 | + RuntimeWarning, |
| 845 | + ) |
| 846 | + |
| 847 | + dataset = self._registry.get_saved_dataset(name, self.project) |
| 848 | + provider = self._get_provider() |
| 849 | + |
| 850 | + retrieval_job = provider.retrieve_saved_dataset( |
| 851 | + config=self.config, dataset=dataset |
| 852 | + ) |
| 853 | + return dataset.with_retrieval_job(retrieval_job) |
| 854 | + |
767 | 855 | @log_exceptions_and_usage |
768 | 856 | def materialize_incremental( |
769 | 857 | self, end_date: datetime, feature_views: Optional[List[str]] = None, |
|
0 commit comments