Skip to content

Two anchors cannot use the same source #492

Description

@xiaoyongzhu

Running the following code:


# Minimal reproduction: two FeatureAnchors (`request_anchor` and `agg_anchor`)
# both reference the same `batch_source` object, and `client.build_features`
# rejects them with "Source name should be unique ...".
#
# NOTE(review): the report does not show the script's setup. This snippet
# assumes that the Feathr names used below (TypedKey, ValueType, HdfsSource,
# Feature, FeatureAnchor, WindowAggTransformation, FLOAT, INT32, BOOLEAN) are
# already imported, that the preprocessing UDF `feathr_udf_day_calc` is
# defined, and that a Feathr client is bound to `client`.

# Single entity key shared by every feature below.
location_id = TypedKey(key_column="DOLocationID",
                       key_column_type=ValueType.INT32,
                       description="location id in NYC",
                       full_name="nyc_taxi.location_id")

# The one source that BOTH anchors below reference. It carries a preprocessing
# UDF; the reported error names this UDF rather than "nycTaxiBatchSource1", so
# the preprocessing setting appears to be involved in the false duplicate.
batch_source = HdfsSource(name="nycTaxiBatchSource1",
                          path="wasbs://public@azurefeathrstorage.blob.core.windows.net/sample_data/green_tripdata_2020-04_with_index.csv",
                          event_timestamp_column="lpep_dropoff_datetime",
                          preprocessing=feathr_udf_day_calc,
                          timestamp_format="yyyy-MM-dd HH:mm:ss")

# Row-level (non-aggregated) features computed from raw columns.
f_trip_distance = Feature(name="f_trip_distance",
                          feature_type=FLOAT,
                          key=location_id,
                          transform="trip_distance")
f_trip_time_duration = Feature(name="f_trip_time_duration",
                               feature_type=INT32,
                               key=location_id,
                               transform="(to_unix_timestamp(lpep_dropoff_datetime) - to_unix_timestamp(lpep_pickup_datetime))/60")

features = [
    f_trip_distance,
    f_trip_time_duration,
    Feature(name="f_is_long_trip_distance",
            feature_type=BOOLEAN,
            key=location_id,
            transform="cast_float(trip_distance)>30"),
    Feature(name="f_day_of_week",
            feature_type=INT32,
            key=location_id,
            transform="dayofweek(lpep_dropoff_datetime)"),
]

# First anchor over `batch_source`.
request_anchor = FeatureAnchor(name="request_features",
                               source=batch_source,
                               features=features)


# 7-day window aggregations keyed on location.
# NOTE(review): `fare_amount_cents` is presumably a column added by
# `feathr_udf_day_calc` during preprocessing (the UDF is not shown) — confirm.
agg_features = [Feature(name="f_location_avg_fare",
                        key=location_id,
                        feature_type=FLOAT,
                        transform=WindowAggTransformation(agg_expr="cast_float(fare_amount)",
                                                          agg_func="AVG",
                                                          window="7d")),
                Feature(name="f_location_max_fare",
                        key=location_id,
                        feature_type=FLOAT,
                        transform=WindowAggTransformation(agg_expr="cast_float(fare_amount)",
                                                          agg_func="MAX",
                                                          window="7d")),
                Feature(name="f_location_total_fare_cents",
                        key=location_id,
                        feature_type=FLOAT,
                        transform=WindowAggTransformation(agg_expr="fare_amount_cents",
                                                          agg_func="SUM",
                                                          window="7d")),
                ]

# Second anchor over the SAME `batch_source` object; this is the sharing that
# the report says should be allowed.
agg_anchor = FeatureAnchor(name="aggregationFeatures",
                           source=batch_source,
                           features=agg_features)

# Derived features are disabled in this reproduction.
# f_trip_time_distance = DerivedFeature(name="f_trip_time_distance",
#                                       feature_type=FLOAT,
#                                       input_features=[
#                                           f_trip_distance, f_trip_time_duration],
#                                       transform="f_trip_distance * f_trip_time_duration")

# f_trip_time_rounded = DerivedFeature(name="f_trip_time_rounded",
#                                      feature_type=INT32,
#                                      input_features=[f_trip_time_duration],
#                                      transform="f_trip_time_duration % 10")

# NOTE(review): in the original report this call was cut off after a trailing
# comma, so any further arguments were lost. It is closed here with only the
# anchor list so the snippet parses. Per the report, this call raises
# RuntimeError("Source name should be unique ...").
client.build_features(anchor_list=[agg_anchor, request_anchor])

Running it raises this error:

Traceback (most recent call last):
  File "/Users/xiazhu/Desktop/FeathrDemoHDFSSink.py", line 312, in <module>
    client.build_features(anchor_list=[agg_anchor, request_anchor], 
  File "/Users/xiazhu/Documents/GitHub/feathr/feathr_project/feathr/client.py", line 227, in build_features
    raise RuntimeError(f"Source name should be unique but there are duplicate source names in your source "
RuntimeError: Source name should be unique but there are duplicate source names in your source definitions. Source name of <function feathr_udf_day_calc at 0x7fbff1856f70>

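Two different anchors should be able to use the same source, but there seems to be a small bug in the code that prevents us from doing so. Note that the error message reports the preprocessing UDF (`<function feathr_udf_day_calc ...>`) as the "source name" instead of `nycTaxiBatchSource1`. This suggests that the duplicate check is triggered by the single `batch_source` object being referenced by both anchors, not by two distinct sources that share a name.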

Metadata

Assignees

Labels

No labels

Type

No type

Fields

No fields configured for issues without a type.

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions