Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Initial Template
Signed-off-by: Elliot Scribner <elliot.scribner@couchbase.com>
  • Loading branch information
ejscribner committed Feb 27, 2025
commit fa34d1501e432dcb1c6c194641eabae7f5ef42e3
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import numpy as np
import pandas as pd
import pyarrow
import pyarrow as pa
from couchbase_columnar.cluster import Cluster
from couchbase_columnar.common.result import BlockingQueryResult
Expand Down Expand Up @@ -265,15 +264,6 @@ def pull_all_from_table_or_query(
timestamp_field=timestamp_field,
)

@staticmethod
def offline_write_batch(
config: RepoConfig,
feature_view: FeatureView,
table: pyarrow.Table,
progress: Optional[Callable[[int], Any]],
):
raise NotImplementedError("Couchbase offline_write_batch not implemented yet.")


class CouchbaseColumnarRetrievalJob(RetrievalJob):
def __init__(
Expand Down Expand Up @@ -349,7 +339,7 @@ def persist(
):
assert isinstance(storage, SavedDatasetCouchbaseColumnarStorage)
table_name = f"{storage.couchbase_options._database}.{storage.couchbase_options._scope}.{offline_utils.get_temp_entity_table_name()}"
_df_to_columnar(self.to_df(), table_name, self._config)
df_to_columnar(self.to_df(), table_name, self._config.offline_store)


def _get_columnar_cluster(config: CouchbaseColumnarOfflineStoreConfig) -> Cluster:
Expand All @@ -375,7 +365,7 @@ def _execute_query(
)


def _df_to_columnar(df: pd.DataFrame, table_name: str, config: RepoConfig):
def df_to_columnar(df: pd.DataFrame, table_name: str, offline_store: CouchbaseColumnarOfflineStoreConfig):
df_copy = df.copy()
insert_values = df_copy.apply(
lambda row: {
Expand All @@ -392,15 +382,15 @@ def _df_to_columnar(df: pd.DataFrame, table_name: str, config: RepoConfig):
create_collection_query = f"CREATE COLLECTION {table_name} IF NOT EXISTS PRIMARY KEY(pk: UUID) AUTOGENERATED;"
insert_query = f"INSERT INTO {table_name} ({insert_values});"

_execute_query(config.offline_store, create_collection_query)
_execute_query(config.offline_store, insert_query)
_execute_query(offline_store, create_collection_query)
_execute_query(offline_store, insert_query)


def _upload_entity_df(
config: RepoConfig, entity_df: Union[pd.DataFrame, str], table_name: str
):
if isinstance(entity_df, pd.DataFrame):
_df_to_columnar(entity_df, table_name, config)
df_to_columnar(entity_df, table_name, config.offline_store)
elif isinstance(entity_df, str):
# If the entity_df is a string (SQL query), create a Columnar collection out of it
create_collection_query = f"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,8 +230,10 @@ def get_table_column_names_and_types(
field_types_list = rows[0].get("field_types", [])
for field in field_types_list:
field_name = field.get("field-name", "unknown")
# if field_name == "pk":
# continue
field_type = field.get("field-type", "unknown")
# drop uuid fields to ensure schema matches dataframe
if field_type == "uuid":
continue
field_type = self._infer_composite_type(field)
field_type_pairs.append((field_name, field_type))
return field_type_pairs
Expand Down
106 changes: 106 additions & 0 deletions sdk/python/feast/templates/couchbase/bootstrap.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
import click
from couchbase_columnar.cluster import Cluster
from couchbase_columnar.common.errors import InvalidCredentialError, TimeoutError
from couchbase_columnar.common.options import QueryOptions
from couchbase_columnar.credential import Credential

from feast.file_utils import replace_str_in_file
from feast.infra.offline_stores.contrib.couchbase_offline_store.couchbase import (
CouchbaseColumnarOfflineStoreConfig,
df_to_columnar,
)


def bootstrap():
    """Interactively configure the Couchbase feature-repo template.

    Called automatically by init_repo() during `feast init`. Prompts for
    online-store (Couchbase Server) and offline-store (Couchbase Capella
    Columnar) credentials, substitutes the placeholder tokens in
    feature_repo/feature_store.yaml, and optionally uploads example
    driver-stats data to a Columnar collection.
    """
    # Bootstrap() will automatically be called from the init_repo() during `feast init`

    # Imported lazily so importing this module has no side effects.
    import pathlib
    from datetime import datetime, timedelta

    from feast.driver_test_data import create_driver_hourly_stats_df

    repo_path = pathlib.Path(__file__).parent.absolute() / "feature_repo"
    config_file = repo_path / "feature_store.yaml"

    if click.confirm("Configure Couchbase Online Store?", default=True):
        connection_string = click.prompt(
            "Couchbase Connection String", default="couchbase://localhost"
        )
        user = click.prompt("Couchbase Username", default="Administrator")
        password = click.prompt("Couchbase Password", hide_input=True)
        bucket_name = click.prompt("Couchbase Bucket Name", default="feast")
        kv_port = click.prompt("Couchbase KV Port", default=11210)

        # NOTE(review): credentials (including the password) are written into
        # feature_store.yaml in plain text — fine for a quickstart template,
        # worth confirming for anything beyond local experimentation.
        replace_str_in_file(
            config_file, "COUCHBASE_CONNECTION_STRING", connection_string
        )
        replace_str_in_file(config_file, "COUCHBASE_USER", user)
        replace_str_in_file(config_file, "COUCHBASE_PASSWORD", password)
        replace_str_in_file(config_file, "COUCHBASE_BUCKET_NAME", bucket_name)
        replace_str_in_file(config_file, "COUCHBASE_KV_PORT", str(kv_port))

    if click.confirm(
        "Configure Couchbase Columnar Offline Store? (Note: requires Couchbase Capella Columnar)",
        default=True,
    ):
        # 15 days of hourly example data, aligned to the top of the hour.
        end_date = datetime.now().replace(microsecond=0, second=0, minute=0)
        start_date = end_date - timedelta(days=15)

        driver_entities = [1001, 1002, 1003, 1004, 1005]
        driver_df = create_driver_hourly_stats_df(driver_entities, start_date, end_date)

        columnar_connection_string = click.prompt("Columnar Connection String")
        columnar_user = click.prompt("Columnar Username")
        columnar_password = click.prompt("Columnar Password", hide_input=True)
        columnar_timeout = click.prompt("Couchbase Columnar Timeout", default=480)

        if click.confirm(
            'Should I upload example data to Couchbase Capella Columnar (overwriting "Default.Default.feast_driver_hourly_stats" table)?',
            default=True,
        ):
            cred = Credential.from_username_and_password(
                columnar_user, columnar_password
            )
            cluster = Cluster.create_instance(columnar_connection_string, cred)

            # Drop any pre-existing example collection so the upload starts clean.
            table_name = "Default.Default.feast_driver_hourly_stats"
            try:
                cluster.execute_query(
                    f"DROP COLLECTION {table_name} IF EXISTS",
                    QueryOptions(timeout=timedelta(seconds=500)),
                )
            except TimeoutError:
                # FIXME: temp workaround, timeouts occur in Columnar SDK even when the drop was successful
                pass
            except InvalidCredentialError:
                # Bad credentials: report and abort without touching the config file.
                print("Error: Invalid Cluster Credentials.")
                return

            offline_store = CouchbaseColumnarOfflineStoreConfig(
                type="couchbase",
                connection_string=columnar_connection_string,
                user=columnar_user,
                password=columnar_password,
                timeout=columnar_timeout,
            )

            # Create the collection (if needed) and insert the example rows.
            df_to_columnar(
                df=driver_df, table_name=table_name, offline_store=offline_store
            )

        replace_str_in_file(
            config_file,
            "COUCHBASE_COLUMNAR_CONNECTION_STRING",
            columnar_connection_string,
        )
        replace_str_in_file(config_file, "COUCHBASE_COLUMNAR_USER", columnar_user)
        replace_str_in_file(
            config_file, "COUCHBASE_COLUMNAR_PASSWORD", columnar_password
        )
        replace_str_in_file(
            config_file, "COUCHBASE_COLUMNAR_TIMEOUT", str(columnar_timeout)
        )


# Allow running the bootstrap interactively outside of `feast init`.
if __name__ == "__main__":
    bootstrap()
134 changes: 134 additions & 0 deletions sdk/python/feast/templates/couchbase/feature_repo/example_repo.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
# This is an example feature definition file

from datetime import timedelta

import pandas as pd

from feast import Entity, FeatureService, FeatureView, Field, PushSource, RequestSource
from feast.infra.offline_stores.contrib.couchbase_offline_store.couchbase_source import (
CouchbaseColumnarSource,
)
from feast.on_demand_feature_view import on_demand_feature_view
from feast.types import Float32, Float64, Int64

# Define an entity for the driver. You can think of an entity as a primary key used to
# fetch features.
driver = Entity(name="driver", join_keys=["driver_id"])

# Batch source backed by a Couchbase Capella Columnar collection.
# NOTE(review): foo_database/bar_scope are placeholders; the template's
# bootstrap step uploads example data to Default.Default — confirm these
# names match where the data actually lives on your cluster.
driver_stats_source = CouchbaseColumnarSource(
    name="driver_hourly_stats_source",
    query="SELECT * FROM foo_database.bar_scope.`feast_driver_hourly_stats`",
    database="foo_database",
    scope="bar_scope",
    collection="feast_driver_hourly_stats",
    timestamp_field="event_timestamp",
    created_timestamp_column="created",
)

# The Columnar collection contains sample data that includes a driver_id column,
# timestamps and three feature columns. Here we define a Feature View that will
# allow us to serve this data to our model online.
driver_stats_fv = FeatureView(
    # The unique name of this feature view. Two feature views in a single
    # project cannot have the same name
    name="driver_hourly_stats",
    entities=[driver],
    ttl=timedelta(days=1),
    # The list of features defined below act as a schema to both define features
    # for both materialization of features into a store, and are used as references
    # during retrieval for building a training dataset or serving features
    schema=[
        Field(name="conv_rate", dtype=Float32),
        Field(name="acc_rate", dtype=Float32),
        Field(name="avg_daily_trips", dtype=Int64),
    ],
    online=True,
    source=driver_stats_source,
    # Tags are user defined key/value pairs that are attached to each
    # feature view
    tags={"team": "driver_performance"},
)

# Define a request data source which encodes features / information only
# available at request time (e.g. part of the user initiated HTTP request)
input_request = RequestSource(
    name="vals_to_add",
    schema=[
        Field(name="val_to_add", dtype=Int64),
        Field(name="val_to_add_2", dtype=Int64),
    ],
)


# Define an on demand feature view which can generate new features based on
# existing feature views and RequestSource features
@on_demand_feature_view(
    sources=[driver_stats_fv, input_request],
    schema=[
        Field(name="conv_rate_plus_val1", dtype=Float64),
        Field(name="conv_rate_plus_val2", dtype=Float64),
    ],
)
def transformed_conv_rate(inputs: pd.DataFrame) -> pd.DataFrame:
    """Derive two features by adding the request-time values to conv_rate."""
    base_rate = inputs["conv_rate"]
    return pd.DataFrame(
        {
            "conv_rate_plus_val1": base_rate + inputs["val_to_add"],
            "conv_rate_plus_val2": base_rate + inputs["val_to_add_2"],
        }
    )


# This groups features into a model version
driver_activity_v1 = FeatureService(
    name="driver_activity_v1",
    features=[
        driver_stats_fv[["conv_rate"]],  # Sub-selects a feature from a feature view
        transformed_conv_rate,  # Selects all features from the feature view
    ],
)
# v2 serves every feature of the hourly-stats view plus the on-demand features.
driver_activity_v2 = FeatureService(
    name="driver_activity_v2", features=[driver_stats_fv, transformed_conv_rate]
)

# Defines a way to push data (to be available offline, online or both) into Feast.
driver_stats_push_source = PushSource(
    name="driver_stats_push_source",
    batch_source=driver_stats_source,
)

# Defines a slightly modified version of the feature view from above, where the source
# has been changed to the push source. This allows fresh features to be directly pushed
# to the online store for this feature view.
driver_stats_fresh_fv = FeatureView(
    name="driver_hourly_stats_fresh",
    entities=[driver],
    ttl=timedelta(days=1),
    schema=[
        Field(name="conv_rate", dtype=Float32),
        Field(name="acc_rate", dtype=Float32),
        Field(name="avg_daily_trips", dtype=Int64),
    ],
    online=True,
    source=driver_stats_push_source,  # Changed from above
    tags={"team": "driver_performance"},
)


# Define an on demand feature view which can generate new features based on
# existing feature views and RequestSource features
@on_demand_feature_view(
    sources=[driver_stats_fresh_fv, input_request],  # relies on fresh version of FV
    schema=[
        Field(name="conv_rate_plus_val1", dtype=Float64),
        Field(name="conv_rate_plus_val2", dtype=Float64),
    ],
)
def transformed_conv_rate_fresh(inputs: pd.DataFrame) -> pd.DataFrame:
    """Same derivation as transformed_conv_rate, but fed by the push source."""
    base_rate = inputs["conv_rate"]
    return pd.DataFrame(
        {
            "conv_rate_plus_val1": base_rate + inputs["val_to_add"],
            "conv_rate_plus_val2": base_rate + inputs["val_to_add_2"],
        }
    )


# Groups the fresh (push-sourced) feature view and its on-demand view into a
# third model version.
driver_activity_v3 = FeatureService(
    name="driver_activity_v3",
    features=[driver_stats_fresh_fv, transformed_conv_rate_fresh],
)
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
project: my_project
registry: /path/to/registry.db
registry: data/registry.db
provider: local
online_store:
type: couchbase
Expand All @@ -8,4 +8,10 @@ online_store:
password: COUCHBASE_PASSWORD # Couchbase password from database access credentials
bucket_name: COUCHBASE_BUCKET_NAME # Couchbase bucket name, defaults to feast
kv_port: COUCHBASE_KV_PORT # Couchbase key-value port, defaults to 11210. Required if custom ports are used.
offline_store:
type: couchbase
  connection_string: COUCHBASE_COLUMNAR_CONNECTION_STRING # Copied from 'Connect' page in Capella Columnar console, starts with couchbases://
user: COUCHBASE_COLUMNAR_USER # Couchbase cluster access name from access control
password: COUCHBASE_COLUMNAR_PASSWORD # Couchbase password from access control
timeout: COUCHBASE_COLUMNAR_TIMEOUT # Timeout in seconds for Columnar operations, optional
entity_key_serialization_version: 2
Loading