Skip to content

Commit 6ce855c

Browse files
authored
add bucketed_sum aggregation (#1168)
1 parent 5c02d26 commit 6ce855c

14 files changed

Lines changed: 116 additions & 101 deletions

feathr-impl/src/main/scala/com/linkedin/feathr/offline/anchored/WindowTimeUnit.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ private[offline] object WindowTimeUnit extends Enumeration {
2222
case H => Duration.ofHours(timeWindowStr.dropRight(1).trim.toLong)
2323
case M => Duration.ofMinutes(timeWindowStr.dropRight(1).trim.toLong)
2424
case S => Duration.ofSeconds(timeWindowStr.dropRight(1).trim.toLong)
25+
case Y => Duration.ofDays(365*timeWindowStr.dropRight(1).trim.toLong)
26+
case W => Duration.ofDays(7*timeWindowStr.dropRight(1).trim.toLong)
2527
case _ => Duration.ofSeconds(0)
2628
}
2729
} catch {

feathr-impl/src/main/scala/com/linkedin/feathr/offline/anchored/anchorExtractor/TimeWindowConfigurableAnchorExtractor.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import com.fasterxml.jackson.annotation.JsonProperty
44
import com.linkedin.feathr.common.exception.{ErrorLabel, FeathrConfigException}
55
import com.linkedin.feathr.offline.config.{ComplexAggregationFeature, TimeWindowFeatureDefinition}
66
import com.linkedin.feathr.offline.generation.aggregations._
7-
import com.linkedin.feathr.offline.swa.SlidingWindowFeatureUtils.convertFeathrDefToSwjDef
7+
import com.linkedin.feathr.offline.swa.SlidingWindowFeatureUtils.{convertFeathrDefToSwjDef, isBucketedFunction}
88
import com.linkedin.feathr.sparkcommon.SimpleAnchorExtractorSpark
99
import com.linkedin.feathr.swj.aggregate.AggregationType
1010
import com.typesafe.config.ConfigFactory
@@ -61,7 +61,7 @@ private[offline] class TimeWindowConfigurableAnchorExtractor(@JsonProperty("feat
6161
*/
6262
override def aggregateAsColumns(groupedDataFrame: DataFrame): Seq[(String, Column)] = {
6363
val columnPairs = aggFeatures.collect {
64-
case (featureName, featureDef) if !featureDef.timeWindowFeatureDefinition.aggregationType.toString.startsWith("BUCKETED_") =>
64+
case (featureName, featureDef) if !isBucketedFunction(featureDef.timeWindowFeatureDefinition.aggregationType) =>
6565
// for basic sliding window aggregation
6666
// no complex aggregation will be defined
6767
if (featureDef.swaFeature.lateralView.isDefined) {

feathr-impl/src/main/scala/com/linkedin/feathr/offline/generation/FeatureGenDefaultsSubstituter.scala

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import com.linkedin.feathr.offline.{FeatureDataFrame, FeatureDataWithJoinKeys}
55
import com.linkedin.feathr.offline.client.DataFrameColName
66
import com.linkedin.feathr.offline.job.FeatureTransformation
77
import com.linkedin.feathr.offline.transformation.DataFrameDefaultValueSubstituter
8+
import com.linkedin.feathr.offline.util.FeathrUtils
89
import org.apache.spark.sql.SparkSession
910

1011
/**
@@ -43,7 +44,12 @@ private[offline] class FeatureGenDefaultsSubstituter() {
4344
withDefaultDF,
4445
featuresWithKeys.keys.map(FeatureTransformation.FEATURE_NAME_PREFIX + DataFrameColName.getEncodedFeatureRefStrForColName(_)).toSeq)
4546
// If there're multiple rows with same join key, keep one record for these duplicate records(same behavior as Feature join API)
46-
val withoutDupDF = withNullsDroppedDF.dropDuplicates(joinKeys)
47+
val dropDuplicate = FeathrUtils.getFeathrJobParam(ss, FeathrUtils.DROP_DUPLICATE_ROWS_FOR_KEYS_IN_FEATURE_GENERATION).toBoolean
48+
val withoutDupDF = if (dropDuplicate) {
49+
withNullsDroppedDF.dropDuplicates(joinKeys)
50+
} else {
51+
withNullsDroppedDF
52+
}
4753
// Return features processed in this iteration
4854
featuresWithKeys.map(f => (f._1, (FeatureDataFrame(withoutDupDF, inferredTypeConfig), joinKeys)))
4955
}

feathr-impl/src/main/scala/com/linkedin/feathr/offline/generation/FeatureGenKeyTagAnalyzer.scala

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,10 @@ import com.linkedin.feathr.common.{DateTimeParam, DateTimeUtils, JoiningFeatureP
55
import com.linkedin.feathr.offline.anchored.anchorExtractor.TimeWindowConfigurableAnchorExtractor
66
import com.linkedin.feathr.offline.job.FeatureGenSpec
77
import com.linkedin.feathr.offline.logical.FeatureGroups
8+
import com.linkedin.feathr.offline.util.FeathrUtils
9+
import org.apache.spark.sql.SparkSession
810

11+
import java.time.Duration
912
import scala.annotation.tailrec
1013
import scala.collection.convert.wrapAll._
1114

@@ -100,15 +103,19 @@ private[offline] object FeatureGenKeyTagAnalyzer extends FeatureGenKeyTagAnalyze
100103
featureGenSpec: FeatureGenSpec,
101104
featureGroups: FeatureGroups): Seq[JoiningFeatureParams] = {
102105
val refTime = featureGenSpec.dateTimeParam
106+
val ss = SparkSession.builder().getOrCreate()
107+
val expand_days = FeathrUtils.getFeathrJobParam(ss, FeathrUtils.EXPAND_DAYS_IN_FEATURE_GENERATION_CUTOFF_TIME).toInt
103108
taggedFeature.map(f => {
104109
val featureName = f.getFeatureName
105110
val featureAnchorWithSource = featureGroups.allAnchoredFeatures(featureName)
106111
val dateParam = featureAnchorWithSource.featureAnchor.extractor match {
107112
case extractor: TimeWindowConfigurableAnchorExtractor =>
108113
val aggFeature = extractor.features(featureName)
109-
val dateTimeParam = DateTimeParam.shiftStartTime(refTime, aggFeature.window)
110-
DateTimeUtils.toDateParam(dateTimeParam)
111-
case _ =>
114+
val dateTimeShifted = DateTimeParam.shiftStartTime(refTime, aggFeature.window)
115+
val dateTimeParamExpandStart = DateTimeParam.shiftStartTime(dateTimeShifted, Duration.ofDays(expand_days*2))
116+
val dateTimeParamExpandEnd = DateTimeParam.shiftEndTime(dateTimeParamExpandStart, Duration.ofDays(expand_days).negated())
117+
DateTimeUtils.toDateParam(dateTimeParamExpandEnd)
118+
case _ =>
112119
featureGenSpec.dateParam
113120
}
114121
new JoiningFeatureParams(f.getKeyTag, f.getFeatureName, Option(dateParam))

feathr-impl/src/main/scala/com/linkedin/feathr/offline/job/FeatureTransformation.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import com.linkedin.feathr.offline.join.DataFrameKeyCombiner
1515
import com.linkedin.feathr.offline.mvel.plugins.FeathrExpressionExecutionContext
1616
import com.linkedin.feathr.offline.source.accessor.{DataSourceAccessor, NonTimeBasedDataSourceAccessor, TimeBasedDataSourceAccessor}
1717
import com.linkedin.feathr.offline.swa.SlidingWindowFeatureUtils
18+
import com.linkedin.feathr.offline.swa.SlidingWindowFeatureUtils.isBucketedFunction
1819
import com.linkedin.feathr.offline.transformation.FeatureColumnFormat.FeatureColumnFormat
1920
import com.linkedin.feathr.offline.transformation._
2021
import com.linkedin.feathr.offline.util.FeaturizedDatasetUtils.tensorTypeToDataFrameSchema
@@ -207,7 +208,7 @@ private[offline] object FeatureTransformation {
207208
val featureTypeConfigs = featureAnchorWithSource.featureAnchor.featureTypeConfigs
208209
val transformedFeatureData: TransformedResult = featureAnchorWithSource.featureAnchor.extractor match {
209210
case transformer: TimeWindowConfigurableAnchorExtractor =>
210-
val nonBucketedFeatures = transformer.features.map(_._2.aggregationType).filter(agg => agg == AggregationType.BUCKETED_COUNT_DISTINCT)
211+
val nonBucketedFeatures = transformer.features.map(_._2.aggregationType).filter(agg => isBucketedFunction(agg))
211212
if (!(nonBucketedFeatures.size != transformer.features || transformer.features.isEmpty)) {
212213
throw new FeathrFeatureTransformationException(
213214
ErrorLabel.FEATHR_USER_ERROR,

feathr-impl/src/main/scala/com/linkedin/feathr/offline/source/accessor/PathPartitionedTimeSeriesSourceAccessor.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,9 @@ private[offline] class PathPartitionedTimeSeriesSourceAccessor(
8181
throw new FeathrInputDataException(
8282
ErrorLabel.FEATHR_USER_ERROR,
8383
s"Trying to create TimeSeriesSource but no data " +
84-
s"is found to create source data. Source path: ${source.path}, source type: ${source.sourceType}")
84+
s"is found to create source data. Source path: ${source.path}, source type: ${source.sourceType}." +
85+
s"Try to get dataframe from interval ${timeIntervalOpt}, " +
86+
s"but source dataset time has interval ${datePartitions.map(_.dateInterval.toString).mkString(",")} ")
8587
}
8688
selectedPartitions
8789
}

feathr-impl/src/main/scala/com/linkedin/feathr/offline/source/dataloader/BatchDataLoader.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ private[offline] class BatchDataLoader(ss: SparkSession, location: DataLocation,
110110
} else {
111111
// Throwing exception to avoid dataLoaderHandler hook exception from being suppressed.
112112
throw new FeathrInputDataException(ErrorLabel.FEATHR_USER_ERROR, s"Failed to load ${dataPath} after ${initialNumOfRetries} retries" +
113-
s" and retry time of ${retryWaitTime}ms.")
113+
s" and retry time of ${retryWaitTime}ms. Error message: ${e.getMessage}")
114114
}
115115
}
116116
}

feathr-impl/src/main/scala/com/linkedin/feathr/offline/swa/SlidingWindowFeatureUtils.scala

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,14 @@ import com.linkedin.feathr.offline.transformation.FeatureColumnFormat
1313
import com.linkedin.feathr.offline.transformation.FeatureColumnFormat.FeatureColumnFormat
1414
import com.linkedin.feathr.offline.util.FeaturizedDatasetUtils
1515
import com.linkedin.feathr.offline.util.datetime.{DateTimeInterval, OfflineDateTimeUtils}
16+
import com.linkedin.feathr.swj.aggregate.AggregationType.AggregationType
1617
import com.linkedin.feathr.swj.{FactData, GroupBySpec, LateralViewParams, SlidingWindowFeature, WindowSpec}
1718
import com.linkedin.feathr.swj.aggregate.{AggregationType, AvgAggregate, AvgPoolingAggregate, CountAggregate, CountDistinctAggregate, DummyAggregate, LatestAggregate, MaxAggregate, MaxPoolingAggregate, MinAggregate, MinPoolingAggregate, SumAggregate}
1819
import org.apache.logging.log4j.LogManager
1920
import org.apache.spark.sql.DataFrame
2021
import org.apache.spark.sql.expressions.UserDefinedFunction
2122
import org.apache.spark.sql.functions._
2223
import org.apache.spark.util.sketch.BloomFilter
23-
2424
import java.text.SimpleDateFormat
2525
import java.time._
2626

@@ -41,6 +41,14 @@ private[offline] object SlidingWindowFeatureUtils {
4141
val DEFAULT_TIME_DELAY = "Default-time-delay"
4242
val TIMESTAMP_PARTITION_COLUMN = "__feathr_timestamp_column_from_partition"
4343

44+
/**
45+
* Check if an aggregation function is bucketed
46+
* @param aggregateFunction function type
47+
*/
48+
def isBucketedFunction(aggregateFunction: AggregationType): Boolean = {
49+
aggregateFunction.toString.startsWith("BUCKETED")
50+
}
51+
4452
/**
4553
* Check if an anchor contains window aggregate features.
4654
* Note: if an anchor contains window aggregate features, it will not contain other non-aggregate features.
@@ -187,6 +195,7 @@ private[offline] object SlidingWindowFeatureUtils {
187195
case AggregationType.MIN_POOLING => new MinPoolingAggregate(featureDef)
188196
case AggregationType.AVG_POOLING => new AvgPoolingAggregate(featureDef)
189197
case AggregationType.BUCKETED_COUNT_DISTINCT => new DummyAggregate(featureDef)
198+
case AggregationType.BUCKETED_SUM => new DummyAggregate(featureDef)
190199
}
191200
swj.SlidingWindowFeature(featureName, aggregationSpec, windowSpec, filter, groupBySpec, lateralViewParams)
192201
}

0 commit comments

Comments (0)