diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index de242fea54..05dc1e3eda 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -177,6 +177,7 @@ The services with containerized replacements currently implemented are: - DynamoDB - Redis - Trino +- HBase You can run `make test-python-integration-container` to run tests against the containerized versions of dependencies. diff --git a/sdk/python/feast/cli.py b/sdk/python/feast/cli.py index ec5b6cd1b6..bb920c3537 100644 --- a/sdk/python/feast/cli.py +++ b/sdk/python/feast/cli.py @@ -539,7 +539,8 @@ def materialize_incremental_command(ctx: click.Context, end_ts: str, views: List "--template", "-t", type=click.Choice( - ["local", "gcp", "aws", "snowflake", "spark", "postgres"], case_sensitive=False + ["local", "gcp", "aws", "snowflake", "spark", "postgres", "hbase"], + case_sensitive=False, ), help="Specify a template for the created project", default="local", diff --git a/sdk/python/feast/infra/online_stores/contrib/hbase_online_store/README.md b/sdk/python/feast/infra/online_stores/contrib/hbase_online_store/README.md new file mode 100644 index 0000000000..651e4e90b8 --- /dev/null +++ b/sdk/python/feast/infra/online_stores/contrib/hbase_online_store/README.md @@ -0,0 +1,78 @@ +# HBase Online Store +HBase is not included in the current [Feast](https://github.com/feast-dev/feast) roadmap; this project adds HBase support as an online store. 
+We create a table named `<project>_<feature_view_name>`, which is updated with data on every materialize call. + + +#### Create a feature repository + +```shell +feast init feature_repo +cd feature_repo +``` + +#### Edit `feature_store.yaml` + +Set the `online_store` type to `hbase`: + +```yaml +project: feature_repo +registry: data/registry.db +provider: local +online_store: + type: hbase + host: 127.0.0.1 # hbase thrift endpoint + port: 9090 # hbase thrift api port +``` + +#### Apply the feature definitions in `example.py` + +```shell +feast -c feature_repo apply +``` +##### Output +``` +Registered entity driver_id +Registered feature view driver_hourly_stats_view +Deploying infrastructure for driver_hourly_stats_view +``` + +### Materialize Latest Data to Online Feature Store (HBase) +``` +$ CURRENT_TIME=$(date -u +"%Y-%m-%dT%H:%M:%S") +$ feast -c feature_repo materialize-incremental $CURRENT_TIME +``` +#### Output +``` +Materializing 1 feature views from 2022-04-16 15:30:39+05:30 to 2022-04-19 15:31:04+05:30 into the hbase online store. 
+ +driver_hourly_stats_view from 2022-04-16 15:30:39+05:30 to 2022-04-19 15:31:04+05:30: +100%|████████████████████████████████████████████████████████████████| 5/5 [00:00<00:00, 120.59it/s] +``` + +### Fetch the latest features for some entity id +```python +from pprint import pprint +from feast import FeatureStore + +store = FeatureStore(repo_path=".") +feature_vector = store.get_online_features( + features=[ + "driver_hourly_stats:conv_rate", + "driver_hourly_stats:acc_rate", + "driver_hourly_stats:avg_daily_trips", + ], + entity_rows=[ + {"driver_id": 1004}, + {"driver_id": 1005}, + ], +).to_dict() +pprint(feature_vector) + +``` +#### Output +``` +{'acc_rate': [0.01390857808291912, 0.4063614010810852], + 'avg_daily_trips': [69, 706], + 'conv_rate': [0.6624961495399475, 0.7595928311347961], + 'driver_id': [1004, 1005]} +``` \ No newline at end of file diff --git a/sdk/python/feast/infra/online_stores/contrib/hbase_online_store/__init__.py b/sdk/python/feast/infra/online_stores/contrib/hbase_online_store/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/sdk/python/feast/infra/online_stores/contrib/hbase_online_store/hbase.py b/sdk/python/feast/infra/online_stores/contrib/hbase_online_store/hbase.py new file mode 100644 index 0000000000..0c0945e786 --- /dev/null +++ b/sdk/python/feast/infra/online_stores/contrib/hbase_online_store/hbase.py @@ -0,0 +1,234 @@ +import calendar +import struct +from datetime import datetime +from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple + +from happybase import Connection +from pydantic.typing import Literal + +from feast import Entity +from feast.feature_view import FeatureView +from feast.infra.key_encoding_utils import serialize_entity_key +from feast.infra.online_stores.online_store import OnlineStore +from feast.infra.utils.hbase_utils import HbaseConstants, HbaseUtils +from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto +from 
feast.protos.feast.types.Value_pb2 import Value as ValueProto +from feast.repo_config import FeastConfigBaseModel, RepoConfig + + +class HbaseOnlineStoreConfig(FeastConfigBaseModel): + """Online store config for Hbase store""" + + type: Literal["hbase"] = "hbase" + """Online store type selector""" + + host: str + """Hostname of Hbase Thrift server""" + + port: str + """Port in which Hbase Thrift server is running""" + + +class HbaseConnection: + """ + Hbase connecttion to connect to hbase. + + Attributes: + store_config: Online store config for Hbase store. + """ + + def __init__(self, store_config: HbaseOnlineStoreConfig): + self._store_config = store_config + self._real_conn = Connection( + host=store_config.host, port=int(store_config.port) + ) + + @property + def real_conn(self) -> Connection: + """Stores the real happybase Connection to connect to hbase.""" + return self._real_conn + + def close(self) -> None: + """Close the happybase connection.""" + self.real_conn.close() + + +class HbaseOnlineStore(OnlineStore): + """ + Online feature store for Hbase. + + Attributes: + _conn: Happybase Connection to connect to hbase thrift server. + """ + + _conn: Connection = None + + def _get_conn(self, config: RepoConfig): + """ + Get or Create Hbase Connection from Repoconfig. + + Args: + config: The RepoConfig for the current FeatureStore. + """ + + store_config = config.online_store + assert isinstance(store_config, HbaseOnlineStoreConfig) + + if not self._conn: + self._conn = Connection(host=store_config.host, port=int(store_config.port)) + return self._conn + + def online_write_batch( + self, + config: RepoConfig, + table: FeatureView, + data: List[ + Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]] + ], + progress: Optional[Callable[[int], Any]], + ) -> None: + """ + Write a batch of feature rows to Hbase online store. + + Args: + config: The RepoConfig for the current FeatureStore. + table: Feast FeatureView. 
+ data: a list of quadruplets containing Feature data. Each quadruplet contains an Entity Key, + a dict containing feature values, an event timestamp for the row, and + the created timestamp for the row if it exists. + progress: Optional function to be called once every mini-batch of rows is written to + the online store. Can be used to display progress. + """ + + hbase = HbaseUtils(self._get_conn(config)) + project = config.project + table_name = _table_id(project, table) + + b = hbase.batch(table_name) + for entity_key, values, timestamp, created_ts in data: + row_key = serialize_entity_key(entity_key).hex() + values_dict = {} + for feature_name, val in values.items(): + values_dict[ + HbaseConstants.get_col_from_feature(feature_name) + ] = val.SerializeToString() + if isinstance(timestamp, datetime): + values_dict[HbaseConstants.DEFAULT_EVENT_TS] = struct.pack( + ">L", int(calendar.timegm(timestamp.timetuple())) + ) + else: + values_dict[HbaseConstants.DEFAULT_EVENT_TS] = timestamp + if created_ts is not None: + if isinstance(created_ts, datetime): + values_dict[HbaseConstants.DEFAULT_CREATED_TS] = struct.pack( + ">L", int(calendar.timegm(created_ts.timetuple())) + ) + else: + values_dict[HbaseConstants.DEFAULT_CREATED_TS] = created_ts + b.put(row_key, values_dict) + b.send() + + def online_read( + self, + config: RepoConfig, + table: FeatureView, + entity_keys: List[EntityKeyProto], + requested_features: Optional[List[str]] = None, + ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]: + """ + Retrieve feature values from the Hbase online store. + + Args: + config: The RepoConfig for the current FeatureStore. + table: Feast FeatureView. + entity_keys: a list of entity keys that should be read from the FeatureStore. 
+ """ + hbase = HbaseUtils(self._get_conn(config)) + project = config.project + table_name = _table_id(project, table) + + result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = [] + + row_keys = [ + serialize_entity_key(entity_key).hex() for entity_key in entity_keys + ] + rows = hbase.rows(table_name, row_keys=row_keys) + + for _, row in rows: + res = {} + res_ts = None + for feature_name, feature_value in row.items(): + f_name = HbaseConstants.get_feature_from_col(feature_name) + if requested_features is not None and f_name in requested_features: + v = ValueProto() + v.ParseFromString(feature_value) + res[f_name] = v + if f_name is HbaseConstants.EVENT_TS: + ts = struct.unpack(">L", feature_value)[0] + res_ts = datetime.fromtimestamp(ts) + if not res: + result.append((None, None)) + else: + result.append((res_ts, res)) + return result + + def update( + self, + config: RepoConfig, + tables_to_delete: Sequence[FeatureView], + tables_to_keep: Sequence[FeatureView], + entities_to_delete: Sequence[Entity], + entities_to_keep: Sequence[Entity], + partial: bool, + ): + """ + Update tables from the Hbase Online Store. + + Args: + config: The RepoConfig for the current FeatureStore. + tables_to_delete: Tables to delete from the Hbase Online Store. + tables_to_keep: Tables to keep in the Hbase Online Store. + """ + hbase = HbaseUtils(self._get_conn(config)) + project = config.project + + # We don't create any special state for the entites in this implementation. + for table in tables_to_keep: + table_name = _table_id(project, table) + if not hbase.check_if_table_exist(table_name): + hbase.create_table_with_default_cf(table_name) + + for table in tables_to_delete: + table_name = _table_id(project, table) + hbase.delete_table(table_name) + + def teardown( + self, + config: RepoConfig, + tables: Sequence[FeatureView], + entities: Sequence[Entity], + ): + """ + Delete tables from the Hbase Online Store. 
+ + Args: + config: The RepoConfig for the current FeatureStore. + tables: Tables to delete from the feature repo. + """ + hbase = HbaseUtils(self._get_conn(config)) + project = config.project + + for table in tables: + table_name = _table_id(project, table) + hbase.delete_table(table_name) + + +def _table_id(project: str, table: FeatureView) -> str: + """ + Returns table name given the project_name and the feature_view. + + Args: + project: Name of the feast project. + table: Feast FeatureView. + """ + return f"{project}_{table.name}" diff --git a/sdk/python/feast/infra/online_stores/contrib/hbase_repo_configuration.py b/sdk/python/feast/infra/online_stores/contrib/hbase_repo_configuration.py new file mode 100644 index 0000000000..4e32a654b5 --- /dev/null +++ b/sdk/python/feast/infra/online_stores/contrib/hbase_repo_configuration.py @@ -0,0 +1,10 @@ +from tests.integration.feature_repos.integration_test_repo_config import ( + IntegrationTestRepoConfig, +) +from tests.integration.feature_repos.universal.online_store.hbase import ( + HbaseOnlineStoreCreator, +) + +FULL_REPO_CONFIGS = [ + IntegrationTestRepoConfig(online_store_creator=HbaseOnlineStoreCreator), +] diff --git a/sdk/python/feast/infra/utils/hbase_utils.py b/sdk/python/feast/infra/utils/hbase_utils.py new file mode 100644 index 0000000000..78a39caed8 --- /dev/null +++ b/sdk/python/feast/infra/utils/hbase_utils.py @@ -0,0 +1,189 @@ +from typing import List + +from happybase import Connection + +from feast.infra.key_encoding_utils import serialize_entity_key +from feast.protos.feast.types.EntityKey_pb2 import EntityKey + + +class HbaseConstants: + """Constants to be used by the Hbase Online Store.""" + + DEFAULT_COLUMN_FAMILY = "default" + EVENT_TS = "event_ts" + CREATED_TS = "created_ts" + DEFAULT_EVENT_TS = DEFAULT_COLUMN_FAMILY + ":" + EVENT_TS + DEFAULT_CREATED_TS = DEFAULT_COLUMN_FAMILY + ":" + CREATED_TS + + @staticmethod + def get_feature_from_col(col): + """Given the column name, exclude the column 
family to get the feature name.""" + return col.decode("utf-8").split(":")[1] + + @staticmethod + def get_col_from_feature(feature): + """Given the feature name, add the column family to get the column name.""" + if isinstance(feature, bytes): + feature = feature.decode("utf-8") + return HbaseConstants.DEFAULT_COLUMN_FAMILY + ":" + feature + + +class HbaseUtils: + """ + Utils class to manage different Hbase operations. + + Attributes: + conn: happybase Connection to connect to hbase. + host: hostname of the hbase thrift server. + port: port in which thrift server is running. + timeout: socket timeout in milliseconds. + """ + + def __init__( + self, conn: Connection = None, host: str = None, port: int = None, timeout=None + ): + if conn is None: + self.host = host + self.port = port + self.conn = Connection(host=host, port=port, timeout=timeout) + else: + self.conn = conn + + def create_table(self, table_name: str, colm_family: List[str]): + """ + Create table in hbase online store. + + Arguments: + table_name: Name of the Hbase table. + colm_family: List of names of column families to be created in the hbase table. + """ + cf_dict: dict = {} + for cf in colm_family: + cf_dict[cf] = dict() + return self.conn.create_table(table_name, cf_dict) + + def create_table_with_default_cf(self, table_name: str): + """ + Create table in hbase online store with one column family "default". + + Arguments: + table_name: Name of the Hbase table. + """ + return self.conn.create_table(table_name, {"default": dict()}) + + def check_if_table_exist(self, table_name: str): + """ + Check if table exists in hbase. + + Arguments: + table_name: Name of the Hbase table. + """ + return bytes(table_name, "utf-8") in self.conn.tables() + + def batch(self, table_name: str): + """ + Returns a 'Batch' instance that can be used for mass data manipulation in the hbase table. + + Arguments: + table_name: Name of the Hbase table. 
+ """ + return self.conn.table(table_name).batch() + + def put(self, table_name: str, row_key: str, data: dict): + """ + Store data in the hbase table. + + Arguments: + table_name: Name of the Hbase table. + row_key: Row key of the row to be inserted to hbase table. + data: Mapping of column family name:column name to column values + """ + table = self.conn.table(table_name) + table.put(row_key, data) + + def row( + self, + table_name: str, + row_key, + columns=None, + timestamp=None, + include_timestamp=False, + ): + """ + Fetch a row of data from the hbase table. + + Arguments: + table_name: Name of the Hbase table. + row_key: Row key of the row to be inserted to hbase table. + columns: the name of columns that needs to be fetched. + timestamp: timestamp specifies the maximum version the cells can have. + include_timestamp: specifies if (column, timestamp) to be return instead of only column. + """ + table = self.conn.table(table_name) + return table.row(row_key, columns, timestamp, include_timestamp) + + def rows( + self, + table_name: str, + row_keys, + columns=None, + timestamp=None, + include_timestamp=False, + ): + """ + Fetch multiple rows of data from the hbase table. + + Arguments: + table_name: Name of the Hbase table. + row_keys: List of row key of the row to be inserted to hbase table. + columns: the name of columns that needs to be fetched. + timestamp: timestamp specifies the maximum version the cells can have. + include_timestamp: specifies if (column, timestamp) to be return instead of only column. 
+ """ + table = self.conn.table(table_name) + return table.rows(row_keys, columns, timestamp, include_timestamp) + + def print_table(self, table_name): + """Prints the table scanning all the rows of the hbase table.""" + table = self.conn.table(table_name) + scan_data = table.scan() + for row_key, cols in scan_data: + print(row_key.decode("utf-8"), cols) + + def delete_table(self, table: str): + """Deletes the hbase table given the table name.""" + if self.check_if_table_exist(table): + self.conn.delete_table(table, disable=True) + + def close_conn(self): + """Closes the happybase connection.""" + self.conn.close() + + +def main(): + from feast.protos.feast.types.Value_pb2 import Value + + connection = Connection(host="localhost", port=9090) + table = connection.table("test_hbase_driver_hourly_stats") + row_keys = [ + serialize_entity_key( + EntityKey(join_keys=["driver_id"], entity_values=[Value(int64_val=1004)]) + ).hex(), + serialize_entity_key( + EntityKey(join_keys=["driver_id"], entity_values=[Value(int64_val=1005)]) + ).hex(), + serialize_entity_key( + EntityKey(join_keys=["driver_id"], entity_values=[Value(int64_val=1024)]) + ).hex(), + ] + rows = table.rows(row_keys) + + for row_key, row in rows: + for key, value in row.items(): + col_name = bytes.decode(key, "utf-8").split(":")[1] + print(col_name, value) + print() + + +if __name__ == "__main__": + main() diff --git a/sdk/python/feast/repo_config.py b/sdk/python/feast/repo_config.py index 5ea7a8979f..ef1871f2fe 100644 --- a/sdk/python/feast/repo_config.py +++ b/sdk/python/feast/repo_config.py @@ -36,6 +36,7 @@ "dynamodb": "feast.infra.online_stores.dynamodb.DynamoDBOnlineStore", "snowflake.online": "feast.infra.online_stores.snowflake.SnowflakeOnlineStore", "postgres": "feast.infra.online_stores.contrib.postgres.PostgreSQLOnlineStore", + "hbase": "feast.infra.online_stores.contrib.hbase_online_store.hbase.HbaseOnlineStore", } OFFLINE_STORE_CLASS_FOR_TYPE = { diff --git 
a/sdk/python/feast/templates/hbase/__init__.py b/sdk/python/feast/templates/hbase/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/sdk/python/feast/templates/hbase/bootstrap.py b/sdk/python/feast/templates/hbase/bootstrap.py new file mode 100644 index 0000000000..4013ca5a8d --- /dev/null +++ b/sdk/python/feast/templates/hbase/bootstrap.py @@ -0,0 +1,35 @@ +def bootstrap(): + # Bootstrap() will automatically be called from the init_repo() during `feast init` + + import pathlib + from datetime import datetime, timedelta + + from feast.driver_test_data import create_driver_hourly_stats_df + + repo_path = pathlib.Path(__file__).parent.absolute() + data_path = repo_path / "data" + data_path.mkdir(exist_ok=True) + + end_date = datetime.now().replace(microsecond=0, second=0, minute=0) + start_date = end_date - timedelta(days=15) + + driver_entities = [1001, 1002, 1003, 1004, 1005] + driver_df = create_driver_hourly_stats_df(driver_entities, start_date, end_date) + + driver_stats_path = data_path / "driver_stats.parquet" + driver_df.to_parquet(path=str(driver_stats_path), allow_truncated_timestamps=True) + + example_py_file = repo_path / "example.py" + replace_str_in_file(example_py_file, "%PARQUET_PATH%", str(driver_stats_path)) + + +def replace_str_in_file(file_path, match_str, sub_str): + with open(file_path, "r") as f: + contents = f.read() + contents = contents.replace(match_str, sub_str) + with open(file_path, "wt") as f: + f.write(contents) + + +if __name__ == "__main__": + bootstrap() diff --git a/sdk/python/feast/templates/hbase/example.py b/sdk/python/feast/templates/hbase/example.py new file mode 100644 index 0000000000..1d441e0e99 --- /dev/null +++ b/sdk/python/feast/templates/hbase/example.py @@ -0,0 +1,36 @@ +# This is an example feature definition file + +from datetime import timedelta + +from feast import Entity, FeatureView, Field, FileSource, ValueType +from feast.types import Float32, Int64 + +# Read data from parquet files. 
Parquet is convenient for local development mode. For +# production, you can use your favorite DWH, such as BigQuery. See Feast documentation +# for more info. +driver_hourly_stats = FileSource( + path="%PARQUET_PATH%", + timestamp_field="event_timestamp", + created_timestamp_column="created", +) + +# Define an entity for the driver. You can think of entity as a primary key used to +# fetch features. +driver = Entity(name="driver", join_keys=["driver_id"], value_type=ValueType.INT64,) + +# Our parquet files contain sample data that includes a driver_id column, timestamps and +# three feature column. Here we define a Feature View that will allow us to serve this +# data to our model online. +driver_hourly_stats_view = FeatureView( + name="driver_hourly_stats", + entities=["driver"], + ttl=timedelta(days=1), + schema=[ + Field(name="conv_rate", dtype=Float32), + Field(name="acc_rate", dtype=Float32), + Field(name="avg_daily_trips", dtype=Int64), + ], + online=True, + source=driver_hourly_stats, + tags={}, +) diff --git a/sdk/python/feast/templates/hbase/feature_store.yaml b/sdk/python/feast/templates/hbase/feature_store.yaml new file mode 100644 index 0000000000..83ce237b71 --- /dev/null +++ b/sdk/python/feast/templates/hbase/feature_store.yaml @@ -0,0 +1,7 @@ +project: my_project +registry: data/registry.db +provider: local +online_store: + type: hbase + host: 127.0.0.1 + port: 9090 diff --git a/sdk/python/setup.py b/sdk/python/setup.py index b02993d68c..3d076be776 100644 --- a/sdk/python/setup.py +++ b/sdk/python/setup.py @@ -109,6 +109,10 @@ "psycopg2-binary>=2.8.3", ] +HBASE_REQUIRED = [ + "happybase>=1.2.0", +] + GE_REQUIRED = [ "great_expectations>=0.14.0,<0.15.0" ] @@ -168,6 +172,7 @@ + POSTGRES_REQUIRED + TRINO_REQUIRED + GE_REQUIRED + + HBASE_REQUIRED ) DEV_REQUIRED = ["mypy-protobuf==3.1", "grpcio-testing==1.*"] + CI_REQUIRED @@ -444,6 +449,7 @@ def copy_extensions_to_source(self): "trino": TRINO_REQUIRED, "postgres": POSTGRES_REQUIRED, "ge": GE_REQUIRED, + 
"hbase": HBASE_REQUIRED, "go": GO_REQUIRED, }, include_package_data=True, diff --git a/sdk/python/tests/integration/feature_repos/universal/online_store/hbase.py b/sdk/python/tests/integration/feature_repos/universal/online_store/hbase.py new file mode 100644 index 0000000000..e53435bc0f --- /dev/null +++ b/sdk/python/tests/integration/feature_repos/universal/online_store/hbase.py @@ -0,0 +1,28 @@ +from typing import Dict + +from testcontainers.core.container import DockerContainer +from testcontainers.core.waiting_utils import wait_for_logs + +from tests.integration.feature_repos.universal.online_store_creator import ( + OnlineStoreCreator, +) + + +class HbaseOnlineStoreCreator(OnlineStoreCreator): + def __init__(self, project_name: str): + super().__init__(project_name) + self.container = DockerContainer("harisekhon/hbase").with_exposed_ports("9090") + + def create_online_store(self) -> Dict[str, str]: + self.container.start() + log_string_to_wait_for = ( + "Initializing Hbase Local with the following configuration:" + ) + wait_for_logs( + container=self.container, predicate=log_string_to_wait_for, timeout=5 + ) + exposed_port = self.container.get_exposed_port("9090") + return {"type": "hbase", "host": "127.0.0.1", "port": exposed_port} + + def teardown(self): + self.container.stop()