From 2da160d815571ea0d0445e7a0d2ad043f6025d58 Mon Sep 17 00:00:00 2001 From: aurobindoc Date: Mon, 18 Apr 2022 16:35:16 +0530 Subject: [PATCH 1/9] Resolved conflict with latest master Signed-off-by: aurobindoc --- .../infra/online_stores/contrib/__init__.py | 1 + .../contrib/contrib_repo_configuration.py | 8 + .../contrib/hbase_online_store/__init__.py | 1 + .../contrib/hbase_online_store/hbase.py | 146 ++++++++++++++++++ .../contrib/hbase_online_store/hbase_utils.py | 99 ++++++++++++ sdk/python/feast/repo_config.py | 1 + sdk/python/setup.py | 6 + .../universal/online_store/hbase.py | 34 ++++ 8 files changed, 296 insertions(+) create mode 100644 sdk/python/feast/infra/online_stores/contrib/contrib_repo_configuration.py create mode 100644 sdk/python/feast/infra/online_stores/contrib/hbase_online_store/__init__.py create mode 100644 sdk/python/feast/infra/online_stores/contrib/hbase_online_store/hbase.py create mode 100644 sdk/python/feast/infra/online_stores/contrib/hbase_online_store/hbase_utils.py create mode 100644 sdk/python/tests/integration/feature_repos/universal/online_store/hbase.py diff --git a/sdk/python/feast/infra/online_stores/contrib/__init__.py b/sdk/python/feast/infra/online_stores/contrib/__init__.py index e69de29bb2..d1cb8b66e9 100644 --- a/sdk/python/feast/infra/online_stores/contrib/__init__.py +++ b/sdk/python/feast/infra/online_stores/contrib/__init__.py @@ -0,0 +1 @@ +# Created by aurobindo.m on 18/04/22 diff --git a/sdk/python/feast/infra/online_stores/contrib/contrib_repo_configuration.py b/sdk/python/feast/infra/online_stores/contrib/contrib_repo_configuration.py new file mode 100644 index 0000000000..a12cc34be0 --- /dev/null +++ b/sdk/python/feast/infra/online_stores/contrib/contrib_repo_configuration.py @@ -0,0 +1,8 @@ +from tests.integration.feature_repos.integration_test_repo_config import ( + IntegrationTestRepoConfig, +) +from tests.integration.feature_repos.universal.online_store.hbase import HbaseOnlineStoreCreator + 
+FULL_REPO_CONFIGS = [ + IntegrationTestRepoConfig(online_store_creator=HbaseOnlineStoreCreator), +] diff --git a/sdk/python/feast/infra/online_stores/contrib/hbase_online_store/__init__.py b/sdk/python/feast/infra/online_stores/contrib/hbase_online_store/__init__.py new file mode 100644 index 0000000000..d1cb8b66e9 --- /dev/null +++ b/sdk/python/feast/infra/online_stores/contrib/hbase_online_store/__init__.py @@ -0,0 +1 @@ +# Created by aurobindo.m on 18/04/22 diff --git a/sdk/python/feast/infra/online_stores/contrib/hbase_online_store/hbase.py b/sdk/python/feast/infra/online_stores/contrib/hbase_online_store/hbase.py new file mode 100644 index 0000000000..164b9c688d --- /dev/null +++ b/sdk/python/feast/infra/online_stores/contrib/hbase_online_store/hbase.py @@ -0,0 +1,146 @@ +# Created by aurobindo.m on 18/04/22 +import struct +import calendar +from datetime import datetime +from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple + +from feast import Entity +from feast.feature_view import FeatureView +from feast.infra.key_encoding_utils import serialize_entity_key +from feast.infra.online_stores.online_store import OnlineStore +from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto +from feast.protos.feast.types.Value_pb2 import Value as ValueProto +from feast.repo_config import FeastConfigBaseModel +from feast.repo_config import RepoConfig +from happybase import Connection + +from feast.infra.online_stores.contrib.hbase_online_store.hbase_utils import HbaseUtils, HbaseConstants + + +class HbaseOnlineStoreConfig(FeastConfigBaseModel): + type: str + host: str + port: int + + +class HbaseConnection: + def __init__(self, store_config: HbaseOnlineStoreConfig): + self._store_config = store_config + self._real_conn = Connection(host=store_config.host, port=store_config.port) + + @property + def real_conn(self) -> Connection: + return self._real_conn + + def close(self) -> None: + self.real_conn.close() + + +class 
HbaseOnlineStore(OnlineStore): + _conn: Connection = None + + def _get_conn(self, config: RepoConfig): + + store_config = config.online_store + assert isinstance(store_config, HbaseOnlineStoreConfig) + + if not self._conn: + self._conn = Connection(host=store_config.host, port=store_config.port) + return self._conn + + def online_write_batch(self, config: RepoConfig, table: FeatureView, data: List[ + Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]], progress: Optional[Callable[[int], Any]], ) -> None: + + hbase = HbaseUtils(self._get_conn(config)) + project = config.project + table_name = _table_id(project, table) + + b = hbase.batch(table_name) + for entity_key, values, timestamp, created_ts in data: + row_key = serialize_entity_key(entity_key).hex() + values_dict = {} + for feature_name, val in values.items(): + values_dict[HbaseConstants.get_col_from_feature(feature_name)] = val.SerializeToString() + if isinstance(timestamp, datetime): + timestamp = int(calendar.timegm(timestamp.timetuple())) + timestamp = struct.pack('>L', timestamp) + values_dict[HbaseConstants.DEFAULT_EVENT_TS] = timestamp + if created_ts is not None: + if isinstance(created_ts, datetime): + created_ts = int(calendar.timegm(created_ts.timetuple())) + created_ts = struct.pack('>L', created_ts) + values_dict[HbaseConstants.DEFAULT_CREATED_TS] = created_ts + b.put(row_key, values_dict) + b.send() + + def online_read( + self, + config: RepoConfig, + table: FeatureView, + entity_keys: List[EntityKeyProto], + requested_features: Optional[List[str]] = None, + ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]: + hbase = HbaseUtils(self._get_conn(config)) + project = config.project + table_name = _table_id(project, table) + + result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = [] + + for entity_key in entity_keys: + row_key = serialize_entity_key(entity_key).hex() + val = hbase.row(table_name, row_key=row_key) + res = {} + res_ts = 
None + for feature_name, feature_value in val.items(): + f_name = HbaseConstants.get_feature_from_col(feature_name) + if f_name in requested_features: + v = ValueProto() + v.ParseFromString(feature_value) + res[f_name] = v + if f_name is HbaseConstants.EVENT_TS: + ts = struct.unpack('>L', feature_value)[0] + res_ts = datetime.fromtimestamp(ts) + if not res: + result.append((None, None)) + else: + result.append((res_ts, res)) + return result + + def update( + self, + config: RepoConfig, + tables_to_delete: Sequence[FeatureView], + tables_to_keep: Sequence[FeatureView], + entities_to_delete: Sequence[Entity], + entities_to_keep: Sequence[Entity], + partial: bool, + ): + hbase = HbaseUtils(self._get_conn(config)) + project = config.project + + # We don't create any special state for the entites in this implementation. + for table in tables_to_keep: + table_name = _table_id(project, table) + if not hbase.check_if_table_exist(table_name): + hbase.create_table_with_default_cf(table_name) + + for table in tables_to_delete: + table_name = _table_id(project, table) + hbase.delete_table(table_name) + + def teardown( + self, + config: RepoConfig, + tables: Sequence[FeatureView], + entities: Sequence[Entity], + ): + hbase = HbaseUtils(self._get_conn(config)) + project = config.project + + for table in tables: + table_name = _table_id(project, table) + hbase.delete_table(table_name) + + +def _table_id(project: str, table: FeatureView) -> str: + return f"{project}_{table.name}" diff --git a/sdk/python/feast/infra/online_stores/contrib/hbase_online_store/hbase_utils.py b/sdk/python/feast/infra/online_stores/contrib/hbase_online_store/hbase_utils.py new file mode 100644 index 0000000000..b78b290f17 --- /dev/null +++ b/sdk/python/feast/infra/online_stores/contrib/hbase_online_store/hbase_utils.py @@ -0,0 +1,99 @@ +# Created by aurobindo.m on 18/04/22 +import struct +from datetime import datetime +from typing import List + +from happybase import Connection + +from 
feast.infra.key_encoding_utils import serialize_entity_key +from feast.protos.feast.types.EntityKey_pb2 import EntityKey + + +class HbaseConstants: + DEFAULT_COLUMN_FAMILY = "default" + EVENT_TS = "event_ts" + CREATED_TS = "created_ts" + TS_COLUMNS = [b'default:created_ts', b'default:event_ts'] + + DEFAULT_EVENT_TS = DEFAULT_COLUMN_FAMILY + ":" + EVENT_TS + DEFAULT_CREATED_TS = DEFAULT_COLUMN_FAMILY + ":" + CREATED_TS + + @staticmethod + def get_feature_from_col(col): + return col.decode('utf-8').split(':')[1] + + @staticmethod + def get_col_from_feature(feature): + if isinstance(feature, bytes): + feature = feature.decode('utf-8') + return HbaseConstants.DEFAULT_COLUMN_FAMILY + ":" + feature + + +class HbaseUtils: + def __init__(self, conn: Connection = None, host: str = None, port: int = None, timeout=None): + if conn is None: + self.host = host + self.port = port + self.conn = Connection(host=host, port=port, timeout=timeout) + else: + self.conn = conn + + def create_table(self, table_name: str, colm_family: List[str]): + cf_dict = {} + for cf in colm_family: + cf_dict[cf] = dict() + return self.conn.create_table(table_name, cf_dict) + + def create_table_with_default_cf(self, table_name: str): + return self.conn.create_table(table_name, {"default": dict()}) + + def check_if_table_exist(self, table: str): + return bytes(table, 'utf-8') in self.conn.tables() + + def batch(self, table): + return self.conn.table(table).batch() + + def put(self, table: str, row_key, data): + table = self.conn.table(table) + table.put(row_key, data) + + def row(self, table: str, row_key, columns=None, timestamp=None, include_timestamp=False): + table = self.conn.table(table) + return table.row(row_key, columns, timestamp, include_timestamp) + + def rows(self, table: str, row_keys, columns=None, timestamp=None, include_timestamp=False): + table = self.conn.table(table) + return table.rows(row_keys, columns, timestamp, include_timestamp) + + def print_table(self, table_name): + table = 
self.conn.table(table_name) + scan_data = table.scan() + for row_key, cols in scan_data: + print(row_key.decode('utf-8'), cols) + + def delete_table(self, table: str): + if self.check_if_table_exist(table): + self.conn.delete_table(table, disable=True) + + def close_conn(self): + self.conn.close() + + +def main(): + from feast.protos.feast.types.Value_pb2 import Value + + connection = Connection(host='localhost', port=9090) + print(connection.tables()) + # table = connection.table('test_hbase_driver_hourly_stats') + # row = table.row(serialize_entity_key(EntityKey(join_keys=["driver_id"], entity_values=[Value(int64_val=1004)])).hex()) + # + # for key, value in row.items(): + # col_name = bytes.decode(key, "utf-8").split(":")[1] + # if col_name not in [HbaseConstants.EVENT_TS, HbaseConstants.CREATED_TS]: + # print(col_name, Value().ParseFromString(value)) + # else: + # print(col_name, datetime.fromtimestamp(struct.unpack('>L', value)[0])) + + +if __name__ == '__main__': + main() diff --git a/sdk/python/feast/repo_config.py b/sdk/python/feast/repo_config.py index 5ea7a8979f..ef1871f2fe 100644 --- a/sdk/python/feast/repo_config.py +++ b/sdk/python/feast/repo_config.py @@ -36,6 +36,7 @@ "dynamodb": "feast.infra.online_stores.dynamodb.DynamoDBOnlineStore", "snowflake.online": "feast.infra.online_stores.snowflake.SnowflakeOnlineStore", "postgres": "feast.infra.online_stores.contrib.postgres.PostgreSQLOnlineStore", + "hbase": "feast.infra.online_stores.contrib.hbase_online_store.hbase.HbaseOnlineStore", } OFFLINE_STORE_CLASS_FOR_TYPE = { diff --git a/sdk/python/setup.py b/sdk/python/setup.py index b02993d68c..3d076be776 100644 --- a/sdk/python/setup.py +++ b/sdk/python/setup.py @@ -109,6 +109,10 @@ "psycopg2-binary>=2.8.3", ] +HBASE_REQUIRED = [ + "happybase>=1.2.0", +] + GE_REQUIRED = [ "great_expectations>=0.14.0,<0.15.0" ] @@ -168,6 +172,7 @@ + POSTGRES_REQUIRED + TRINO_REQUIRED + GE_REQUIRED + + HBASE_REQUIRED ) DEV_REQUIRED = ["mypy-protobuf==3.1", 
"grpcio-testing==1.*"] + CI_REQUIRED @@ -444,6 +449,7 @@ def copy_extensions_to_source(self): "trino": TRINO_REQUIRED, "postgres": POSTGRES_REQUIRED, "ge": GE_REQUIRED, + "hbase": HBASE_REQUIRED, "go": GO_REQUIRED, }, include_package_data=True, diff --git a/sdk/python/tests/integration/feature_repos/universal/online_store/hbase.py b/sdk/python/tests/integration/feature_repos/universal/online_store/hbase.py new file mode 100644 index 0000000000..3b4b56e3f8 --- /dev/null +++ b/sdk/python/tests/integration/feature_repos/universal/online_store/hbase.py @@ -0,0 +1,34 @@ +from typing import Dict + +from testcontainers.core.container import DockerContainer +from testcontainers.core.waiting_utils import wait_for_logs + +from tests.integration.feature_repos.universal.online_store_creator import ( + OnlineStoreCreator, +) + + +class HbaseOnlineStoreCreator(OnlineStoreCreator): + def __init__(self, project_name: str): + super().__init__(project_name) + self.container = DockerContainer( + "harisekhon/hbase" + ).with_exposed_ports("9090") + + def create_online_store(self) -> Dict[str, str]: + self.container.start() + log_string_to_wait_for = ( + "Initializing Hbase Local with the following configuration:" + ) + wait_for_logs( + container=self.container, predicate=log_string_to_wait_for, timeout=5 + ) + exposed_port = self.container.get_exposed_port("9090") + return { + "type": "hbase", + "host": "127.0.0.1", + "port": int(exposed_port) + } + + def teardown(self): + self.container.stop() \ No newline at end of file From 374fff28cfb083923eca3f62ce35b6d362f75e38 Mon Sep 17 00:00:00 2001 From: aurobindoc Date: Thu, 21 Apr 2022 13:14:28 +0530 Subject: [PATCH 2/9] Fixed lint and format Signed-off-by: aurobindoc --- .../contrib/contrib_repo_configuration.py | 4 +- .../contrib/hbase_online_store/hbase.py | 89 +++++++++++-------- .../contrib/hbase_online_store/hbase_utils.py | 67 ++++++-------- .../universal/online_store/hbase.py | 12 +-- 4 files changed, 88 insertions(+), 84 
deletions(-) diff --git a/sdk/python/feast/infra/online_stores/contrib/contrib_repo_configuration.py b/sdk/python/feast/infra/online_stores/contrib/contrib_repo_configuration.py index a12cc34be0..4e32a654b5 100644 --- a/sdk/python/feast/infra/online_stores/contrib/contrib_repo_configuration.py +++ b/sdk/python/feast/infra/online_stores/contrib/contrib_repo_configuration.py @@ -1,7 +1,9 @@ from tests.integration.feature_repos.integration_test_repo_config import ( IntegrationTestRepoConfig, ) -from tests.integration.feature_repos.universal.online_store.hbase import HbaseOnlineStoreCreator +from tests.integration.feature_repos.universal.online_store.hbase import ( + HbaseOnlineStoreCreator, +) FULL_REPO_CONFIGS = [ IntegrationTestRepoConfig(online_store_creator=HbaseOnlineStoreCreator), diff --git a/sdk/python/feast/infra/online_stores/contrib/hbase_online_store/hbase.py b/sdk/python/feast/infra/online_stores/contrib/hbase_online_store/hbase.py index 164b9c688d..fdf744b3a9 100644 --- a/sdk/python/feast/infra/online_stores/contrib/hbase_online_store/hbase.py +++ b/sdk/python/feast/infra/online_stores/contrib/hbase_online_store/hbase.py @@ -1,32 +1,36 @@ # Created by aurobindo.m on 18/04/22 -import struct import calendar +import struct from datetime import datetime from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple +from happybase import Connection + from feast import Entity from feast.feature_view import FeatureView from feast.infra.key_encoding_utils import serialize_entity_key +from feast.infra.online_stores.contrib.hbase_online_store.hbase_utils import ( + HbaseConstants, + HbaseUtils, +) from feast.infra.online_stores.online_store import OnlineStore from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.protos.feast.types.Value_pb2 import Value as ValueProto -from feast.repo_config import FeastConfigBaseModel -from feast.repo_config import RepoConfig -from happybase import Connection - -from 
feast.infra.online_stores.contrib.hbase_online_store.hbase_utils import HbaseUtils, HbaseConstants +from feast.repo_config import FeastConfigBaseModel, RepoConfig class HbaseOnlineStoreConfig(FeastConfigBaseModel): type: str host: str - port: int + port: str class HbaseConnection: def __init__(self, store_config: HbaseOnlineStoreConfig): self._store_config = store_config - self._real_conn = Connection(host=store_config.host, port=store_config.port) + self._real_conn = Connection( + host=store_config.host, port=int(store_config.port) + ) @property def real_conn(self) -> Connection: @@ -45,11 +49,18 @@ def _get_conn(self, config: RepoConfig): assert isinstance(store_config, HbaseOnlineStoreConfig) if not self._conn: - self._conn = Connection(host=store_config.host, port=store_config.port) + self._conn = Connection(host=store_config.host, port=int(store_config.port)) return self._conn - def online_write_batch(self, config: RepoConfig, table: FeatureView, data: List[ - Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]], progress: Optional[Callable[[int], Any]], ) -> None: + def online_write_batch( + self, + config: RepoConfig, + table: FeatureView, + data: List[ + Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]] + ], + progress: Optional[Callable[[int], Any]], + ) -> None: hbase = HbaseUtils(self._get_conn(config)) project = config.project @@ -60,25 +71,31 @@ def online_write_batch(self, config: RepoConfig, table: FeatureView, data: List[ row_key = serialize_entity_key(entity_key).hex() values_dict = {} for feature_name, val in values.items(): - values_dict[HbaseConstants.get_col_from_feature(feature_name)] = val.SerializeToString() + values_dict[ + HbaseConstants.get_col_from_feature(feature_name) + ] = val.SerializeToString() if isinstance(timestamp, datetime): - timestamp = int(calendar.timegm(timestamp.timetuple())) - timestamp = struct.pack('>L', timestamp) - values_dict[HbaseConstants.DEFAULT_EVENT_TS] = 
timestamp + values_dict[HbaseConstants.DEFAULT_EVENT_TS] = struct.pack( + ">L", int(calendar.timegm(timestamp.timetuple())) + ) + else: + values_dict[HbaseConstants.DEFAULT_EVENT_TS] = timestamp if created_ts is not None: if isinstance(created_ts, datetime): - created_ts = int(calendar.timegm(created_ts.timetuple())) - created_ts = struct.pack('>L', created_ts) - values_dict[HbaseConstants.DEFAULT_CREATED_TS] = created_ts + values_dict[HbaseConstants.DEFAULT_CREATED_TS] = struct.pack( + ">L", int(calendar.timegm(created_ts.timetuple())) + ) + else: + values_dict[HbaseConstants.DEFAULT_CREATED_TS] = created_ts b.put(row_key, values_dict) b.send() def online_read( - self, - config: RepoConfig, - table: FeatureView, - entity_keys: List[EntityKeyProto], - requested_features: Optional[List[str]] = None, + self, + config: RepoConfig, + table: FeatureView, + entity_keys: List[EntityKeyProto], + requested_features: Optional[List[str]] = None, ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]: hbase = HbaseUtils(self._get_conn(config)) project = config.project @@ -93,12 +110,12 @@ def online_read( res_ts = None for feature_name, feature_value in val.items(): f_name = HbaseConstants.get_feature_from_col(feature_name) - if f_name in requested_features: + if requested_features is not None and f_name in requested_features: v = ValueProto() v.ParseFromString(feature_value) res[f_name] = v if f_name is HbaseConstants.EVENT_TS: - ts = struct.unpack('>L', feature_value)[0] + ts = struct.unpack(">L", feature_value)[0] res_ts = datetime.fromtimestamp(ts) if not res: result.append((None, None)) @@ -107,13 +124,13 @@ def online_read( return result def update( - self, - config: RepoConfig, - tables_to_delete: Sequence[FeatureView], - tables_to_keep: Sequence[FeatureView], - entities_to_delete: Sequence[Entity], - entities_to_keep: Sequence[Entity], - partial: bool, + self, + config: RepoConfig, + tables_to_delete: Sequence[FeatureView], + tables_to_keep: 
Sequence[FeatureView], + entities_to_delete: Sequence[Entity], + entities_to_keep: Sequence[Entity], + partial: bool, ): hbase = HbaseUtils(self._get_conn(config)) project = config.project @@ -129,10 +146,10 @@ def update( hbase.delete_table(table_name) def teardown( - self, - config: RepoConfig, - tables: Sequence[FeatureView], - entities: Sequence[Entity], + self, + config: RepoConfig, + tables: Sequence[FeatureView], + entities: Sequence[Entity], ): hbase = HbaseUtils(self._get_conn(config)) project = config.project diff --git a/sdk/python/feast/infra/online_stores/contrib/hbase_online_store/hbase_utils.py b/sdk/python/feast/infra/online_stores/contrib/hbase_online_store/hbase_utils.py index b78b290f17..755c78fecc 100644 --- a/sdk/python/feast/infra/online_stores/contrib/hbase_online_store/hbase_utils.py +++ b/sdk/python/feast/infra/online_stores/contrib/hbase_online_store/hbase_utils.py @@ -1,36 +1,33 @@ # Created by aurobindo.m on 18/04/22 -import struct -from datetime import datetime from typing import List from happybase import Connection -from feast.infra.key_encoding_utils import serialize_entity_key -from feast.protos.feast.types.EntityKey_pb2 import EntityKey - class HbaseConstants: DEFAULT_COLUMN_FAMILY = "default" EVENT_TS = "event_ts" CREATED_TS = "created_ts" - TS_COLUMNS = [b'default:created_ts', b'default:event_ts'] + TS_COLUMNS = [b"default:created_ts", b"default:event_ts"] DEFAULT_EVENT_TS = DEFAULT_COLUMN_FAMILY + ":" + EVENT_TS DEFAULT_CREATED_TS = DEFAULT_COLUMN_FAMILY + ":" + CREATED_TS @staticmethod def get_feature_from_col(col): - return col.decode('utf-8').split(':')[1] + return col.decode("utf-8").split(":")[1] @staticmethod def get_col_from_feature(feature): if isinstance(feature, bytes): - feature = feature.decode('utf-8') + feature = feature.decode("utf-8") return HbaseConstants.DEFAULT_COLUMN_FAMILY + ":" + feature class HbaseUtils: - def __init__(self, conn: Connection = None, host: str = None, port: int = None, timeout=None): + def 
__init__( + self, conn: Connection = None, host: str = None, port: int = None, timeout=None + ): if conn is None: self.host = host self.port = port @@ -39,7 +36,7 @@ def __init__(self, conn: Connection = None, host: str = None, port: int = None, self.conn = conn def create_table(self, table_name: str, colm_family: List[str]): - cf_dict = {} + cf_dict: dict = {} for cf in colm_family: cf_dict[cf] = dict() return self.conn.create_table(table_name, cf_dict) @@ -48,28 +45,42 @@ def create_table_with_default_cf(self, table_name: str): return self.conn.create_table(table_name, {"default": dict()}) def check_if_table_exist(self, table: str): - return bytes(table, 'utf-8') in self.conn.tables() + return bytes(table, "utf-8") in self.conn.tables() def batch(self, table): return self.conn.table(table).batch() - def put(self, table: str, row_key, data): - table = self.conn.table(table) + def put(self, table_name: str, row_key, data): + table = self.conn.table(table_name) table.put(row_key, data) - def row(self, table: str, row_key, columns=None, timestamp=None, include_timestamp=False): - table = self.conn.table(table) + def row( + self, + table_name: str, + row_key, + columns=None, + timestamp=None, + include_timestamp=False, + ): + table = self.conn.table(table_name) return table.row(row_key, columns, timestamp, include_timestamp) - def rows(self, table: str, row_keys, columns=None, timestamp=None, include_timestamp=False): - table = self.conn.table(table) + def rows( + self, + table_name: str, + row_keys, + columns=None, + timestamp=None, + include_timestamp=False, + ): + table = self.conn.table(table_name) return table.rows(row_keys, columns, timestamp, include_timestamp) def print_table(self, table_name): table = self.conn.table(table_name) scan_data = table.scan() for row_key, cols in scan_data: - print(row_key.decode('utf-8'), cols) + print(row_key.decode("utf-8"), cols) def delete_table(self, table: str): if self.check_if_table_exist(table): @@ -77,23 +88,3 @@ def 
delete_table(self, table: str): def close_conn(self): self.conn.close() - - -def main(): - from feast.protos.feast.types.Value_pb2 import Value - - connection = Connection(host='localhost', port=9090) - print(connection.tables()) - # table = connection.table('test_hbase_driver_hourly_stats') - # row = table.row(serialize_entity_key(EntityKey(join_keys=["driver_id"], entity_values=[Value(int64_val=1004)])).hex()) - # - # for key, value in row.items(): - # col_name = bytes.decode(key, "utf-8").split(":")[1] - # if col_name not in [HbaseConstants.EVENT_TS, HbaseConstants.CREATED_TS]: - # print(col_name, Value().ParseFromString(value)) - # else: - # print(col_name, datetime.fromtimestamp(struct.unpack('>L', value)[0])) - - -if __name__ == '__main__': - main() diff --git a/sdk/python/tests/integration/feature_repos/universal/online_store/hbase.py b/sdk/python/tests/integration/feature_repos/universal/online_store/hbase.py index 3b4b56e3f8..e53435bc0f 100644 --- a/sdk/python/tests/integration/feature_repos/universal/online_store/hbase.py +++ b/sdk/python/tests/integration/feature_repos/universal/online_store/hbase.py @@ -11,9 +11,7 @@ class HbaseOnlineStoreCreator(OnlineStoreCreator): def __init__(self, project_name: str): super().__init__(project_name) - self.container = DockerContainer( - "harisekhon/hbase" - ).with_exposed_ports("9090") + self.container = DockerContainer("harisekhon/hbase").with_exposed_ports("9090") def create_online_store(self) -> Dict[str, str]: self.container.start() @@ -24,11 +22,7 @@ def create_online_store(self) -> Dict[str, str]: container=self.container, predicate=log_string_to_wait_for, timeout=5 ) exposed_port = self.container.get_exposed_port("9090") - return { - "type": "hbase", - "host": "127.0.0.1", - "port": int(exposed_port) - } + return {"type": "hbase", "host": "127.0.0.1", "port": exposed_port} def teardown(self): - self.container.stop() \ No newline at end of file + self.container.stop() From 
1ad256b059168d65441b20ca752d2149cf1ee0b0 Mon Sep 17 00:00:00 2001 From: aurobindoc Date: Thu, 21 Apr 2022 13:30:57 +0530 Subject: [PATCH 3/9] Added documentation for hbase online store Signed-off-by: aurobindoc --- .../contrib/hbase_online_store/README.md | 80 +++++++++++++++++++ 1 file changed, 80 insertions(+) create mode 100644 sdk/python/feast/infra/online_stores/contrib/hbase_online_store/README.md diff --git a/sdk/python/feast/infra/online_stores/contrib/hbase_online_store/README.md b/sdk/python/feast/infra/online_stores/contrib/hbase_online_store/README.md new file mode 100644 index 0000000000..5c3fda0e3c --- /dev/null +++ b/sdk/python/feast/infra/online_stores/contrib/hbase_online_store/README.md @@ -0,0 +1,80 @@ +# Hbase Online Store +Hbase is not included in current [Feast](https://github.com/feast-dev/feast) roadmap, this project intends to add Hbase +support for Offline Store. + +We create a table _ which gets updated with data on every materialize call + + +#### Create a feature repository + +```shell +feast init feature_repo +cd feature_repo +``` + +#### Edit `feature_store.yaml` + +set `online_store` type to be `hbase` + +```yaml +project: feature_repo +registry: data/registry.db +provider: local +online_store: + type: hbase + host: 127.0.0.1 # hbase thrift endpoint + port: 9090 # hbase thrift api port +``` + +#### Apply the feature definitions in `example.py` + +```shell +feast -c feature_repo apply +``` +##### Output +``` +Registered entity driver_id +Registered feature view driver_hourly_stats_view +Deploying infrastructure for driver_hourly_stats_view +``` + +### Migrate Latest Data to Online Feature Store (Hbase) +``` +$ CURRENT_TIME=$(date -u +"%Y-%m-%dT%H:%M:%S") +$ feast -c feature_repo materialize-incremental $CURRENT_TIME +``` +#### Output +``` +Materializing 1 feature views from 2022-04-16 15:30:39+05:30 to 2022-04-19 15:31:04+05:30 into the hbase online store. 
+ +driver_hourly_stats_view from 2022-04-16 15:30:39+05:30 to 2022-04-19 15:31:04+05:30: +100%|████████████████████████████████████████████████████████████████| 5/5 [00:00<00:00, 120.59it/s] +``` + +### Fetch the latest features for some entity id +```python +from pprint import pprint +from feast import FeatureStore + +store = FeatureStore(repo_path=".") +feature_vector = store.get_online_features( + features=[ + "driver_hourly_stats:conv_rate", + "driver_hourly_stats:acc_rate", + "driver_hourly_stats:avg_daily_trips", + ], + entity_rows=[ + {"driver_id": 1004}, + {"driver_id": 1005}, + ], +).to_dict() +pprint(feature_vector) + +``` +#### Output +``` +{'acc_rate': [0.01390857808291912, 0.4063614010810852], + 'avg_daily_trips': [69, 706], + 'conv_rate': [0.6624961495399475, 0.7595928311347961], + 'driver_id': [1004, 1005]} +``` \ No newline at end of file From b1554dc22173f1fb0166c992b6191095c1722e75 Mon Sep 17 00:00:00 2001 From: aurobindoc Date: Thu, 21 Apr 2022 13:34:32 +0530 Subject: [PATCH 4/9] Minor update in documentation for hbase online store Signed-off-by: aurobindoc --- .../infra/online_stores/contrib/hbase_online_store/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/feast/infra/online_stores/contrib/hbase_online_store/README.md b/sdk/python/feast/infra/online_stores/contrib/hbase_online_store/README.md index 5c3fda0e3c..ad0154f709 100644 --- a/sdk/python/feast/infra/online_stores/contrib/hbase_online_store/README.md +++ b/sdk/python/feast/infra/online_stores/contrib/hbase_online_store/README.md @@ -1,6 +1,6 @@ # Hbase Online Store Hbase is not included in current [Feast](https://github.com/feast-dev/feast) roadmap, this project intends to add Hbase -support for Offline Store. +support for Online Store. 
We create a table _ which gets updated with data on every materialize call From 4187da88909d8228a7e2ae44b98c0b803d144ba6 Mon Sep 17 00:00:00 2001 From: aurobindoc Date: Thu, 21 Apr 2022 15:27:42 +0530 Subject: [PATCH 5/9] Added docstrings for all the method of hbase online store Signed-off-by: aurobindoc --- .../contrib/hbase_online_store/hbase.py | 75 +++++++++++++++- .../contrib/hbase_online_store/hbase_utils.py | 86 +++++++++++++++++-- 2 files changed, 150 insertions(+), 11 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/contrib/hbase_online_store/hbase.py b/sdk/python/feast/infra/online_stores/contrib/hbase_online_store/hbase.py index fdf744b3a9..6bc84d3198 100644 --- a/sdk/python/feast/infra/online_stores/contrib/hbase_online_store/hbase.py +++ b/sdk/python/feast/infra/online_stores/contrib/hbase_online_store/hbase.py @@ -2,7 +2,7 @@ import calendar import struct from datetime import datetime -from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple +from typing import Any, Callable, Dict, List, Literal, Optional, Sequence, Tuple from happybase import Connection @@ -20,12 +20,26 @@ class HbaseOnlineStoreConfig(FeastConfigBaseModel): - type: str + """Online store config for Hbase store""" + + type: Literal["hbase"] = "hbase" + """Online store type selector""" + host: str + """Hostname of Hbase Thrift server""" + port: str + """Port in which Hbase Thrift server is running""" class HbaseConnection: + """ + Hbase connection to connect to hbase. + + Attributes: + store_config: Online store config for Hbase store. 
+ """ + def __init__(self, store_config: HbaseOnlineStoreConfig): self._store_config = store_config self._real_conn = Connection( @@ -34,16 +48,31 @@ def __init__(self, store_config: HbaseOnlineStoreConfig): @property def real_conn(self) -> Connection: + """Stores the real happybase Connection to connect to hbase.""" return self._real_conn def close(self) -> None: + """Close the happybase connection.""" self.real_conn.close() class HbaseOnlineStore(OnlineStore): + """ + Online feature store for Hbase. + + Attributes: + _conn: Happybase Connection to connect to hbase thrift server. + """ + _conn: Connection = None def _get_conn(self, config: RepoConfig): + """ + Get or Create Hbase Connection from Repoconfig. + + Args: + config: The RepoConfig for the current FeatureStore. + """ store_config = config.online_store assert isinstance(store_config, HbaseOnlineStoreConfig) @@ -61,6 +90,18 @@ def online_write_batch( ], progress: Optional[Callable[[int], Any]], ) -> None: + """ + Write a batch of feature rows to Hbase online store. + + Args: + config: The RepoConfig for the current FeatureStore. + table: Feast FeatureView. + data: a list of quadruplets containing Feature data. Each quadruplet contains an Entity Key, + a dict containing feature values, an event timestamp for the row, and + the created timestamp for the row if it exists. + progress: Optional function to be called once every mini-batch of rows is written to + the online store. Can be used to display progress. + """ hbase = HbaseUtils(self._get_conn(config)) project = config.project @@ -97,6 +138,14 @@ def online_read( entity_keys: List[EntityKeyProto], requested_features: Optional[List[str]] = None, ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]: + """ + Retrieve feature values from the Hbase online store. + + Args: + config: The RepoConfig for the current FeatureStore. + table: Feast FeatureView. + entity_keys: a list of entity keys that should be read from the FeatureStore. 
+ """ hbase = HbaseUtils(self._get_conn(config)) project = config.project table_name = _table_id(project, table) @@ -132,6 +181,14 @@ def update( entities_to_keep: Sequence[Entity], partial: bool, ): + """ + Update tables from the Hbase Online Store. + + Args: + config: The RepoConfig for the current FeatureStore. + tables_to_delete: Tables to delete from the Hbase Online Store. + tables_to_keep: Tables to keep in the Hbase Online Store. + """ hbase = HbaseUtils(self._get_conn(config)) project = config.project @@ -151,6 +208,13 @@ def teardown( tables: Sequence[FeatureView], entities: Sequence[Entity], ): + """ + Delete tables from the Hbase Online Store. + + Args: + config: The RepoConfig for the current FeatureStore. + tables: Tables to delete from the feature repo. + """ hbase = HbaseUtils(self._get_conn(config)) project = config.project @@ -160,4 +224,11 @@ def teardown( def _table_id(project: str, table: FeatureView) -> str: + """ + Returns table name given the project_name and the feature_view. + + Args: + project: Name of the feast project. + table: Feast FeatureView. 
+ """ return f"{project}_{table.name}" diff --git a/sdk/python/feast/infra/online_stores/contrib/hbase_online_store/hbase_utils.py b/sdk/python/feast/infra/online_stores/contrib/hbase_online_store/hbase_utils.py index 755c78fecc..9fc50f5e5c 100644 --- a/sdk/python/feast/infra/online_stores/contrib/hbase_online_store/hbase_utils.py +++ b/sdk/python/feast/infra/online_stores/contrib/hbase_online_store/hbase_utils.py @@ -5,26 +5,38 @@ class HbaseConstants: + """Constants to be used by the Hbase Online Store.""" + DEFAULT_COLUMN_FAMILY = "default" EVENT_TS = "event_ts" CREATED_TS = "created_ts" - TS_COLUMNS = [b"default:created_ts", b"default:event_ts"] - DEFAULT_EVENT_TS = DEFAULT_COLUMN_FAMILY + ":" + EVENT_TS DEFAULT_CREATED_TS = DEFAULT_COLUMN_FAMILY + ":" + CREATED_TS @staticmethod def get_feature_from_col(col): + """Given the column name, exclude the column family to get the feature name.""" return col.decode("utf-8").split(":")[1] @staticmethod def get_col_from_feature(feature): + """Given the feature name, add the column family to get the column name.""" if isinstance(feature, bytes): feature = feature.decode("utf-8") return HbaseConstants.DEFAULT_COLUMN_FAMILY + ":" + feature class HbaseUtils: + """ + Utils class to manage different Hbase operations. + + Attributes: + conn: happybase Connection to connect to hbase. + host: hostname of the hbase thrift server. + port: port in which thrift server is running. + timeout: socket timeout in milliseconds. + """ + def __init__( self, conn: Connection = None, host: str = None, port: int = None, timeout=None ): @@ -36,21 +48,54 @@ def __init__( self.conn = conn def create_table(self, table_name: str, colm_family: List[str]): + """ + Create table in hbase online store. + + Arguments: + table_name: Name of the Hbase table. + colm_family: List of names of column families to be created in the hbase table. 
+ """ cf_dict: dict = {} for cf in colm_family: cf_dict[cf] = dict() return self.conn.create_table(table_name, cf_dict) def create_table_with_default_cf(self, table_name: str): - return self.conn.create_table(table_name, {"default": dict()}) + """ + Create table in hbase online store with one column family "default". - def check_if_table_exist(self, table: str): - return bytes(table, "utf-8") in self.conn.tables() - - def batch(self, table): - return self.conn.table(table).batch() + Arguments: + table_name: Name of the Hbase table. + """ + return self.conn.create_table(table_name, {"default": dict()}) - def put(self, table_name: str, row_key, data): + def check_if_table_exist(self, table_name: str): + """ + Check if table exists in hbase. + + Arguments: + table_name: Name of the Hbase table. + """ + return bytes(table_name, "utf-8") in self.conn.tables() + + def batch(self, table_name: str): + """ + Returns a 'Batch' instance that can be used for mass data manipulation in the habse table. + + Arguments: + table_name: Name of the Hbase table. + """ + return self.conn.table(table_name).batch() + + def put(self, table_name: str, row_key: str, data: dict): + """ + Store data in the hbase table. + + Arguments: + table_name: Name of the Hbase table. + row_key: Row key of the row to be inserted to habse table. + data: Mapping of column family name:column name to column values + """ table = self.conn.table(table_name) table.put(row_key, data) @@ -62,6 +107,16 @@ def row( timestamp=None, include_timestamp=False, ): + """ + Fetch a row of data from the hbase table. + + Arguments: + table_name: Name of the Hbase table. + row_key: Row key of the row to be inserted to habse table. + columns: the name of columns that needs to be fetched. + timestamp: timestamp specifies the maximum version the cells can have. + include_timestamp: specifies if (column, timestamp) to be return instead of only column. 
+ """ table = self.conn.table(table_name) return table.row(row_key, columns, timestamp, include_timestamp) @@ -73,18 +128,31 @@ def rows( timestamp=None, include_timestamp=False, ): + """ + Fetch multiple rows of data from the hbase table. + + Arguments: + table_name: Name of the Hbase table. + row_keys: List of row key of the row to be inserted to habse table. + columns: the name of columns that needs to be fetched. + timestamp: timestamp specifies the maximum version the cells can have. + include_timestamp: specifies if (column, timestamp) to be return instead of only column. + """ table = self.conn.table(table_name) return table.rows(row_keys, columns, timestamp, include_timestamp) def print_table(self, table_name): + """Prints the table scanning all the rows of the hbase table.""" table = self.conn.table(table_name) scan_data = table.scan() for row_key, cols in scan_data: print(row_key.decode("utf-8"), cols) def delete_table(self, table: str): + """Deletes the hbase table given the table name.""" if self.check_if_table_exist(table): self.conn.delete_table(table, disable=True) def close_conn(self): + """Closes the happybase connection.""" self.conn.close() From 984f7b5f59cd3823a84f0c403b116b4397fa336d Mon Sep 17 00:00:00 2001 From: aurobindoc Date: Thu, 21 Apr 2022 15:52:53 +0530 Subject: [PATCH 6/9] Added template for hbase online store Signed-off-by: aurobindoc --- sdk/python/feast/cli.py | 3 +- .../contrib/hbase_online_store/hbase.py | 5 +-- .../hbase_utils.py | 0 sdk/python/feast/templates/hbase/__init__.py | 0 sdk/python/feast/templates/hbase/bootstrap.py | 35 ++++++++++++++++++ sdk/python/feast/templates/hbase/example.py | 36 +++++++++++++++++++ .../feast/templates/hbase/feature_store.yaml | 7 ++++ 7 files changed, 81 insertions(+), 5 deletions(-) rename sdk/python/feast/infra/{online_stores/contrib/hbase_online_store => utils}/hbase_utils.py (100%) create mode 100644 sdk/python/feast/templates/hbase/__init__.py create mode 100644 
sdk/python/feast/templates/hbase/bootstrap.py create mode 100644 sdk/python/feast/templates/hbase/example.py create mode 100644 sdk/python/feast/templates/hbase/feature_store.yaml diff --git a/sdk/python/feast/cli.py b/sdk/python/feast/cli.py index ec5b6cd1b6..bb920c3537 100644 --- a/sdk/python/feast/cli.py +++ b/sdk/python/feast/cli.py @@ -539,7 +539,8 @@ def materialize_incremental_command(ctx: click.Context, end_ts: str, views: List "--template", "-t", type=click.Choice( - ["local", "gcp", "aws", "snowflake", "spark", "postgres"], case_sensitive=False + ["local", "gcp", "aws", "snowflake", "spark", "postgres", "hbase"], + case_sensitive=False, ), help="Specify a template for the created project", default="local", diff --git a/sdk/python/feast/infra/online_stores/contrib/hbase_online_store/hbase.py b/sdk/python/feast/infra/online_stores/contrib/hbase_online_store/hbase.py index 6bc84d3198..7e29cd05d0 100644 --- a/sdk/python/feast/infra/online_stores/contrib/hbase_online_store/hbase.py +++ b/sdk/python/feast/infra/online_stores/contrib/hbase_online_store/hbase.py @@ -9,11 +9,8 @@ from feast import Entity from feast.feature_view import FeatureView from feast.infra.key_encoding_utils import serialize_entity_key -from feast.infra.online_stores.contrib.hbase_online_store.hbase_utils import ( - HbaseConstants, - HbaseUtils, -) from feast.infra.online_stores.online_store import OnlineStore +from feast.infra.utils.hbase_utils import HbaseConstants, HbaseUtils from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.protos.feast.types.Value_pb2 import Value as ValueProto from feast.repo_config import FeastConfigBaseModel, RepoConfig diff --git a/sdk/python/feast/infra/online_stores/contrib/hbase_online_store/hbase_utils.py b/sdk/python/feast/infra/utils/hbase_utils.py similarity index 100% rename from sdk/python/feast/infra/online_stores/contrib/hbase_online_store/hbase_utils.py rename to sdk/python/feast/infra/utils/hbase_utils.py diff 
--git a/sdk/python/feast/templates/hbase/__init__.py b/sdk/python/feast/templates/hbase/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/sdk/python/feast/templates/hbase/bootstrap.py b/sdk/python/feast/templates/hbase/bootstrap.py new file mode 100644 index 0000000000..4013ca5a8d --- /dev/null +++ b/sdk/python/feast/templates/hbase/bootstrap.py @@ -0,0 +1,35 @@ +def bootstrap(): + # Bootstrap() will automatically be called from the init_repo() during `feast init` + + import pathlib + from datetime import datetime, timedelta + + from feast.driver_test_data import create_driver_hourly_stats_df + + repo_path = pathlib.Path(__file__).parent.absolute() + data_path = repo_path / "data" + data_path.mkdir(exist_ok=True) + + end_date = datetime.now().replace(microsecond=0, second=0, minute=0) + start_date = end_date - timedelta(days=15) + + driver_entities = [1001, 1002, 1003, 1004, 1005] + driver_df = create_driver_hourly_stats_df(driver_entities, start_date, end_date) + + driver_stats_path = data_path / "driver_stats.parquet" + driver_df.to_parquet(path=str(driver_stats_path), allow_truncated_timestamps=True) + + example_py_file = repo_path / "example.py" + replace_str_in_file(example_py_file, "%PARQUET_PATH%", str(driver_stats_path)) + + +def replace_str_in_file(file_path, match_str, sub_str): + with open(file_path, "r") as f: + contents = f.read() + contents = contents.replace(match_str, sub_str) + with open(file_path, "wt") as f: + f.write(contents) + + +if __name__ == "__main__": + bootstrap() diff --git a/sdk/python/feast/templates/hbase/example.py b/sdk/python/feast/templates/hbase/example.py new file mode 100644 index 0000000000..1d441e0e99 --- /dev/null +++ b/sdk/python/feast/templates/hbase/example.py @@ -0,0 +1,36 @@ +# This is an example feature definition file + +from datetime import timedelta + +from feast import Entity, FeatureView, Field, FileSource, ValueType +from feast.types import Float32, Int64 + +# Read data from parquet 
files. Parquet is convenient for local development mode. For +# production, you can use your favorite DWH, such as BigQuery. See Feast documentation +# for more info. +driver_hourly_stats = FileSource( + path="%PARQUET_PATH%", + timestamp_field="event_timestamp", + created_timestamp_column="created", +) + +# Define an entity for the driver. You can think of entity as a primary key used to +# fetch features. +driver = Entity(name="driver", join_keys=["driver_id"], value_type=ValueType.INT64,) + +# Our parquet files contain sample data that includes a driver_id column, timestamps and +# three feature column. Here we define a Feature View that will allow us to serve this +# data to our model online. +driver_hourly_stats_view = FeatureView( + name="driver_hourly_stats", + entities=["driver"], + ttl=timedelta(days=1), + schema=[ + Field(name="conv_rate", dtype=Float32), + Field(name="acc_rate", dtype=Float32), + Field(name="avg_daily_trips", dtype=Int64), + ], + online=True, + source=driver_hourly_stats, + tags={}, +) diff --git a/sdk/python/feast/templates/hbase/feature_store.yaml b/sdk/python/feast/templates/hbase/feature_store.yaml new file mode 100644 index 0000000000..c24b1a1459 --- /dev/null +++ b/sdk/python/feast/templates/hbase/feature_store.yaml @@ -0,0 +1,7 @@ +project: my_project +registry: data/registry.db +provider: local +online_store: + type: hbase + host: 127.0.0.1 + port: 9090 \ No newline at end of file From 0a9a0cd864934b41b33e056f6f6ddac3dedd05f0 Mon Sep 17 00:00:00 2001 From: aurobindoc Date: Thu, 21 Apr 2022 16:19:38 +0530 Subject: [PATCH 7/9] fixed docstring tests Signed-off-by: aurobindoc --- .../infra/online_stores/contrib/hbase_online_store/hbase.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sdk/python/feast/infra/online_stores/contrib/hbase_online_store/hbase.py b/sdk/python/feast/infra/online_stores/contrib/hbase_online_store/hbase.py index 7e29cd05d0..cb92a24c15 100644 --- 
a/sdk/python/feast/infra/online_stores/contrib/hbase_online_store/hbase.py +++ b/sdk/python/feast/infra/online_stores/contrib/hbase_online_store/hbase.py @@ -2,9 +2,10 @@ import calendar import struct from datetime import datetime -from typing import Any, Callable, Dict, List, Literal, Optional, Sequence, Tuple +from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple from happybase import Connection +from pydantic.typing import Literal from feast import Entity from feast.feature_view import FeatureView From d87b4410024c91cf5c76b3a351089e31dd420fed Mon Sep 17 00:00:00 2001 From: aurobindoc Date: Fri, 22 Apr 2022 10:59:37 +0530 Subject: [PATCH 8/9] Addressed review comments Signed-off-by: aurobindoc --- .../infra/online_stores/contrib/__init__.py | 1 - .../contrib/hbase_online_store/README.md | 10 ++--- .../contrib/hbase_online_store/__init__.py | 1 - .../contrib/hbase_online_store/hbase.py | 12 +++--- ...uration.py => hbase_repo_configuration.py} | 0 sdk/python/feast/infra/utils/hbase_utils.py | 41 ++++++++++++++++--- .../feast/templates/hbase/feature_store.yaml | 2 +- 7 files changed, 48 insertions(+), 19 deletions(-) rename sdk/python/feast/infra/online_stores/contrib/{contrib_repo_configuration.py => hbase_repo_configuration.py} (100%) diff --git a/sdk/python/feast/infra/online_stores/contrib/__init__.py b/sdk/python/feast/infra/online_stores/contrib/__init__.py index d1cb8b66e9..e69de29bb2 100644 --- a/sdk/python/feast/infra/online_stores/contrib/__init__.py +++ b/sdk/python/feast/infra/online_stores/contrib/__init__.py @@ -1 +0,0 @@ -# Created by aurobindo.m on 18/04/22 diff --git a/sdk/python/feast/infra/online_stores/contrib/hbase_online_store/README.md b/sdk/python/feast/infra/online_stores/contrib/hbase_online_store/README.md index ad0154f709..8b148591be 100644 --- a/sdk/python/feast/infra/online_stores/contrib/hbase_online_store/README.md +++ b/sdk/python/feast/infra/online_stores/contrib/hbase_online_store/README.md @@ -1,7 +1,5 @@ -# 
Hbase Online Store -Hbase is not included in current [Feast](https://github.com/feast-dev/feast) roadmap, this project intends to add Hbase -support for Online Store. - +# HBase Online Store +HBase is not included in current [Feast](https://github.com/feast-dev/feast) roadmap, this project intends to add HBase support for Online Store. We create a table <project_name>_<feature_view_name> which gets updated with data on every materialize call @@ -14,7 +12,7 @@ cd feature_repo #### Edit `feature_store.yaml` -set `online_store` type to be `feast_hbase.hbase.HbaseOnlineStore` +set `online_store` type to be `hbase` ```yaml project: feature_repo @@ -38,7 +36,7 @@ Registered feature view driver_hourly_stats_view Deploying infrastructure for driver_hourly_stats_view ``` -### Migrate Latest Data to Online Feature Store (Hbase) +### Migrate Latest Data to Online Feature Store (HBase) ``` $ CURRENT_TIME=$(date -u +"%Y-%m-%dT%H:%M:%S") $ feast -c feature_repo materialize-incremental $CURRENT_TIME diff --git a/sdk/python/feast/infra/online_stores/contrib/hbase_online_store/__init__.py b/sdk/python/feast/infra/online_stores/contrib/hbase_online_store/__init__.py index d1cb8b66e9..e69de29bb2 100644 --- a/sdk/python/feast/infra/online_stores/contrib/hbase_online_store/__init__.py +++ b/sdk/python/feast/infra/online_stores/contrib/hbase_online_store/__init__.py @@ -1 +0,0 @@ -# Created by aurobindo.m on 18/04/22 diff --git a/sdk/python/feast/infra/online_stores/contrib/hbase_online_store/hbase.py b/sdk/python/feast/infra/online_stores/contrib/hbase_online_store/hbase.py index cb92a24c15..0c0945e786 100644 --- a/sdk/python/feast/infra/online_stores/contrib/hbase_online_store/hbase.py +++ b/sdk/python/feast/infra/online_stores/contrib/hbase_online_store/hbase.py @@ -1,4 +1,3 @@ -# Created by aurobindo.m on 18/04/22 import calendar import struct from datetime import datetime @@ -150,12 +149,15 @@ def online_read( result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = [] - for entity_key in
entity_keys: - row_key = serialize_entity_key(entity_key).hex() - val = hbase.row(table_name, row_key=row_key) + row_keys = [ + serialize_entity_key(entity_key).hex() for entity_key in entity_keys + ] + rows = hbase.rows(table_name, row_keys=row_keys) + + for _, row in rows: res = {} res_ts = None - for feature_name, feature_value in val.items(): + for feature_name, feature_value in row.items(): f_name = HbaseConstants.get_feature_from_col(feature_name) if requested_features is not None and f_name in requested_features: v = ValueProto() diff --git a/sdk/python/feast/infra/online_stores/contrib/contrib_repo_configuration.py b/sdk/python/feast/infra/online_stores/contrib/hbase_repo_configuration.py similarity index 100% rename from sdk/python/feast/infra/online_stores/contrib/contrib_repo_configuration.py rename to sdk/python/feast/infra/online_stores/contrib/hbase_repo_configuration.py diff --git a/sdk/python/feast/infra/utils/hbase_utils.py b/sdk/python/feast/infra/utils/hbase_utils.py index 9fc50f5e5c..78a39caed8 100644 --- a/sdk/python/feast/infra/utils/hbase_utils.py +++ b/sdk/python/feast/infra/utils/hbase_utils.py @@ -1,8 +1,10 @@ -# Created by aurobindo.m on 18/04/22 from typing import List from happybase import Connection +from feast.infra.key_encoding_utils import serialize_entity_key +from feast.protos.feast.types.EntityKey_pb2 import EntityKey + class HbaseConstants: """Constants to be used by the Hbase Online Store.""" @@ -80,7 +82,7 @@ def check_if_table_exist(self, table_name: str): def batch(self, table_name: str): """ - Returns a 'Batch' instance that can be used for mass data manipulation in the habse table. + Returns a 'Batch' instance that can be used for mass data manipulation in the hbase table. Arguments: table_name: Name of the Hbase table. @@ -93,7 +95,7 @@ def put(self, table_name: str, row_key: str, data: dict): Arguments: table_name: Name of the Hbase table. - row_key: Row key of the row to be inserted to habse table. 
+ row_key: Row key of the row to be inserted to hbase table. data: Mapping of column family name:column name to column values """ table = self.conn.table(table_name) @@ -112,7 +114,7 @@ def row( Arguments: table_name: Name of the Hbase table. - row_key: Row key of the row to be inserted to habse table. + row_key: Row key of the row to be inserted to hbase table. columns: the name of columns that needs to be fetched. timestamp: timestamp specifies the maximum version the cells can have. include_timestamp: specifies if (column, timestamp) to be return instead of only column. @@ -133,7 +135,7 @@ def rows( Arguments: table_name: Name of the Hbase table. - row_keys: List of row key of the row to be inserted to habse table. + row_keys: List of row key of the row to be inserted to hbase table. columns: the name of columns that needs to be fetched. timestamp: timestamp specifies the maximum version the cells can have. include_timestamp: specifies if (column, timestamp) to be return instead of only column. 
@@ -156,3 +158,32 @@ def delete_table(self, table: str): def close_conn(self): """Closes the happybase connection.""" self.conn.close() + + +def main(): + from feast.protos.feast.types.Value_pb2 import Value + + connection = Connection(host="localhost", port=9090) + table = connection.table("test_hbase_driver_hourly_stats") + row_keys = [ + serialize_entity_key( + EntityKey(join_keys=["driver_id"], entity_values=[Value(int64_val=1004)]) + ).hex(), + serialize_entity_key( + EntityKey(join_keys=["driver_id"], entity_values=[Value(int64_val=1005)]) + ).hex(), + serialize_entity_key( + EntityKey(join_keys=["driver_id"], entity_values=[Value(int64_val=1024)]) + ).hex(), + ] + rows = table.rows(row_keys) + + for row_key, row in rows: + for key, value in row.items(): + col_name = bytes.decode(key, "utf-8").split(":")[1] + print(col_name, value) + print() + + +if __name__ == "__main__": + main() diff --git a/sdk/python/feast/templates/hbase/feature_store.yaml b/sdk/python/feast/templates/hbase/feature_store.yaml index c24b1a1459..83ce237b71 100644 --- a/sdk/python/feast/templates/hbase/feature_store.yaml +++ b/sdk/python/feast/templates/hbase/feature_store.yaml @@ -4,4 +4,4 @@ provider: local online_store: type: hbase host: 127.0.0.1 - port: 9090 \ No newline at end of file + port: 9090 From 5774d6b5a7c88b497ee6e96d7f7c7c95c5f8270f Mon Sep 17 00:00:00 2001 From: aurobindoc Date: Sun, 24 Apr 2022 09:51:11 +0530 Subject: [PATCH 9/9] resolves a couple of nits Signed-off-by: aurobindoc --- CONTRIBUTING.md | 1 + .../infra/online_stores/contrib/hbase_online_store/README.md | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index de242fea54..05dc1e3eda 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -177,6 +177,7 @@ The services with containerized replacements currently implemented are: - DynamoDB - Redis - Trino +- HBase You can run `make test-python-integration-container` to run tests against the containerized versions of 
dependencies. diff --git a/sdk/python/feast/infra/online_stores/contrib/hbase_online_store/README.md b/sdk/python/feast/infra/online_stores/contrib/hbase_online_store/README.md index 8b148591be..651e4e90b8 100644 --- a/sdk/python/feast/infra/online_stores/contrib/hbase_online_store/README.md +++ b/sdk/python/feast/infra/online_stores/contrib/hbase_online_store/README.md @@ -36,7 +36,7 @@ Registered feature view driver_hourly_stats_view Deploying infrastructure for driver_hourly_stats_view ``` -### Migrate Latest Data to Online Feature Store (HBase) +### Materialize Latest Data to Online Feature Store (HBase) ``` $ CURRENT_TIME=$(date -u +"%Y-%m-%dT%H:%M:%S") $ feast -c feature_repo materialize-incremental $CURRENT_TIME