Skip to content
Merged
Changes from 1 commit
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
b349ad6
Initial implementation of BigTable online store.
chhabrakadabra Aug 25, 2022
f9f45bb
Attempt to run bigtable integration tests.
chhabrakadabra Sep 5, 2022
2a1a863
Got the BigTable tests running in local containers
chhabrakadabra Sep 5, 2022
96814d6
Set serialization version when computing entity ID
chhabrakadabra Sep 6, 2022
6e6233f
Switch to the recommended layout in bigtable.
chhabrakadabra Sep 6, 2022
2a0d09a
Minor bugfixes.
chhabrakadabra Sep 10, 2022
98532a5
Move BigTable online store out of contrib
chhabrakadabra Sep 27, 2022
2a65fef
Attempt to run integration tests in CI.
chhabrakadabra Sep 28, 2022
de795f3
Delete tables for entity-less feature views.
chhabrakadabra Sep 28, 2022
bf798e8
Table names should be smaller than 50 characters
chhabrakadabra Sep 28, 2022
eb3ab91
Optimize bigtable reads.
chhabrakadabra Sep 28, 2022
1383b7e
dynamodb: switch to `mock_dynamodb`
chhabrakadabra Sep 28, 2022
6986fa9
minor: rename `BigTable` to `Bigtable`
chhabrakadabra Sep 29, 2022
3cd76a8
Wrote some Bigtable documentation.
chhabrakadabra Sep 29, 2022
c7449cc
Bugfix: Deal with missing row keys.
chhabrakadabra Sep 30, 2022
f356312
Fix linting issues.
chhabrakadabra Sep 30, 2022
ff62c6b
Generate requirements files.
chhabrakadabra Sep 30, 2022
cce3602
Don't bother materializing created timestamp.
chhabrakadabra Sep 30, 2022
943ee3f
Remove `tensorflow-metadata`.
chhabrakadabra Sep 30, 2022
ab80b42
Minor fix to Bigtable documentation.
chhabrakadabra Oct 5, 2022
4755745
update roadmap docs
adchia Oct 5, 2022
c0f2d8e
Fix roadmap doc
adchia Oct 5, 2022
2d6bdac
Change link to point to roadmap page
adchia Oct 5, 2022
992c318
change order in roadmap
adchia Oct 5, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Optimize bigtable reads.
- Fetch all the rows in one bigtable fetch.
- Get only the columns that are necessary (using a column regex filter).

Signed-off-by: Abhin Chhabra <abhin.chhabra@shopify.com>
  • Loading branch information
chhabrakadabra committed Sep 30, 2022
commit eb3ab910f2d41758282dbfbe3363eb5b7e792d7b
65 changes: 36 additions & 29 deletions sdk/python/feast/infra/online_stores/bigtable.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import google
from google.cloud import bigtable
from google.cloud.bigtable import row_filters
from pydantic import StrictStr
from pydantic.typing import Literal

Expand Down Expand Up @@ -57,6 +58,8 @@ def online_read(
entity_keys: List[EntityKeyProto],
requested_features: Optional[List[str]] = None,
) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
# Potential performance improvement opportunity described in
# https://github.com/feast-dev/feast/issues/3259
feature_view = table
bt_table_name = self._get_table_name(config=config, feature_view=feature_view)

Expand All @@ -72,36 +75,40 @@ def online_read(
for entity_key in entity_keys
]

batch_result: List[
Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]
] = []

# TODO: read all the rows in a single call instead of reading them sequentially
row_set = bigtable.row_set.RowSet()
for row_key in row_keys:
res = {}
# TODO: use filters to reduce the amount of data transfered and skip
# TODO: use filters to reduce the amount of data transferred and skip
row = bt_table.read_row(row_key)

if row is None:
batch_result.append((None, None))
continue

row_values = row.cells[self.feature_column_family]
# TODO: check if we need created_ts anywhere
row_values.pop(b"created_ts")
event_ts = datetime.fromisoformat(
row_values.pop(b"event_ts")[0].value.decode()
)
for feature_name, feature_values in row_values.items():
# We only want to retrieve the latest value for each feature
feature_value = feature_values[0]
val = ValueProto()
val.ParseFromString(feature_value.value)
res[feature_name.decode()] = val
row_set.add_row_key(row_key)
rows = bt_table.read_rows(
row_set=row_set,
filter_=(
row_filters.ColumnQualifierRegexFilter(
f"^({'|'.join(requested_features)}|event_ts)$".encode()
)
if requested_features
else None
),
)

return [self._process_bt_row(row) for row in rows]

def _process_bt_row(
self, row: bigtable.row.PartialRowData
) -> Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]:
res = {}

if row is None:
return (None, None)

row_values = row.cells[self.feature_column_family]
event_ts = datetime.fromisoformat(row_values.pop(b"event_ts")[0].value.decode())
for feature_name, feature_values in row_values.items():
# We only want to retrieve the latest value for each feature
feature_value = feature_values[0]
val = ValueProto()
val.ParseFromString(feature_value.value)
res[feature_name.decode()] = val

batch_result.append((event_ts, res))
return batch_result
return (event_ts, res)

@log_exceptions_and_usage(online_store="bigtable")
def online_write_batch(
Expand Down Expand Up @@ -277,7 +284,7 @@ def _get_table_name(config: RepoConfig, feature_view: FeatureView) -> str:
else DUMMY_ENTITY_NAME
)
BIGTABLE_TABLE_MAX_LENGTH = 50
ENTITIES_PART_MAX_LENGTH = 25
ENTITIES_PART_MAX_LENGTH = 24
# Bigtable limits table names to 50 characters. We'll limit the max size of
# the `entities_part` and if that's not enough, we'll just hash the
# entities_part. The remaining length is dedicated to the project name. This
Expand Down