Skip to content

Commit 8f49bd5

Browse files
authored
Add new multi-level aggregation framework and bucketed count distinct aggregation. (#1159)
The bucketed aggregation works by aggregating data at a lower-level timestamp bucket, e.g. a 5-minute bucket, then leveraging the lower-level bucket's aggregated result to produce higher-level aggregation results such as 1 hour, 1 day, etc. The supported levels are 5 minutes, 1 hour, 1 week, 1 month, and 1 year.
1 parent c239d22 commit 8f49bd5

14 files changed

Lines changed: 896 additions & 20 deletions

feathr-impl/src/main/scala/com/linkedin/feathr/offline/anchored/WindowTimeUnit.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import java.time.Duration
1212
*/
1313
private[offline] object WindowTimeUnit extends Enumeration {
1414
type WindowTimeUnit = Value
15-
val D, H, M, S = Value
15+
val D, H, M, S, W, Y = Value
1616

1717
def parseWindowTime(timeWindowStr: String): Duration = {
1818
try {
@@ -22,6 +22,7 @@ private[offline] object WindowTimeUnit extends Enumeration {
2222
case H => Duration.ofHours(timeWindowStr.dropRight(1).trim.toLong)
2323
case M => Duration.ofMinutes(timeWindowStr.dropRight(1).trim.toLong)
2424
case S => Duration.ofSeconds(timeWindowStr.dropRight(1).trim.toLong)
25+
case _ => Duration.ofSeconds(0)
2526
}
2627
} catch {
2728
case ex: Exception =>

feathr-impl/src/main/scala/com/linkedin/feathr/offline/anchored/anchorExtractor/TimeWindowConfigurableAnchorExtractor.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ private[offline] class TimeWindowConfigurableAnchorExtractor(@JsonProperty("feat
6161
*/
6262
override def aggregateAsColumns(groupedDataFrame: DataFrame): Seq[(String, Column)] = {
6363
val columnPairs = aggFeatures.collect {
64-
case (featureName, featureDef) =>
64+
case (featureName, featureDef) if !featureDef.timeWindowFeatureDefinition.aggregationType.toString.startsWith("BUCKETED_") =>
6565
// for basic sliding window aggregation
6666
// no complex aggregation will be defined
6767
if (featureDef.swaFeature.lateralView.isDefined) {

feathr-impl/src/main/scala/com/linkedin/feathr/offline/config/TimeWindowFeatureDefinition.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ case class TimeWindowFeatureDefinition(
3535
`def`: String,
3636
aggregationType: AggregationType.Value,
3737
window: Duration,
38+
window_str: String,
3839
groupBy: Option[String],
3940
limit: Option[Int],
4041
filter: Option[String],
@@ -83,6 +84,11 @@ class TimeWindowFeatureDefinitionDeserializer extends JsonDeserializer[TimeWindo
8384
case _ =>
8485
throw new FeathrConfigException(ErrorLabel.FEATHR_USER_ERROR, s"'window' field is required in aggregation feature but is not provided $node.")
8586
},
87+
node.get("window") match {
88+
case field: TextNode => field.textValue()
89+
case _ =>
90+
throw new FeathrConfigException(ErrorLabel.FEATHR_USER_ERROR, s"'window' field is required in aggregation feature but is not provided $node.")
91+
},
8692
node.get("groupBy") match {
8793
case field: TextNode => Option(field.textValue())
8894
case _ => None

feathr-impl/src/main/scala/com/linkedin/feathr/offline/job/FeatureTransformation.scala

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -198,15 +198,26 @@ private[offline] object FeatureTransformation {
198198
df: DataFrame,
199199
requestedFeatureRefString: Seq[String],
200200
inputDateInterval: Option[DateTimeInterval],
201-
mvelContext: Option[FeathrExpressionExecutionContext]): TransformedResult = {
201+
mvelContext: Option[FeathrExpressionExecutionContext],
202+
keyColumnExprAndAlias: Seq[(String, String)] = Seq.empty[(String, String)]): TransformedResult = {
202203
val featureNamePrefix = getFeatureNamePrefix(featureAnchorWithSource.featureAnchor.extractor)
203204
val featureNamePrefixPairs = requestedFeatureRefString.map((_, featureNamePrefix))
204205

205206
// return the feature dataframe, the feature column format and the actual(inferred or user provided) feature types
206207
val featureTypeConfigs = featureAnchorWithSource.featureAnchor.featureTypeConfigs
207208
val transformedFeatureData: TransformedResult = featureAnchorWithSource.featureAnchor.extractor match {
208209
case transformer: TimeWindowConfigurableAnchorExtractor =>
209-
WindowAggregationEvaluator.transform(transformer, df, featureNamePrefixPairs, featureAnchorWithSource, inputDateInterval)
210+
val nonBucketedFeatures = transformer.features.map(_._2.aggregationType).filter(agg => agg == AggregationType.BUCKETED_COUNT_DISTINCT)
211+
if (!(nonBucketedFeatures.size != transformer.features || transformer.features.isEmpty)) {
212+
throw new FeathrFeatureTransformationException(
213+
ErrorLabel.FEATHR_USER_ERROR,
214+
s"All features ${transformer.features.keys.mkString(",")} should be either be all bucket or non-bucketed aggregation functions.")
215+
}
216+
if (nonBucketedFeatures.isEmpty) {
217+
WindowAggregationEvaluator.transform(transformer, df, featureNamePrefixPairs, featureAnchorWithSource, inputDateInterval)
218+
} else {
219+
BucketedWindowAggregationEvaluator.transform(transformer, df, featureNamePrefixPairs, featureAnchorWithSource, keyColumnExprAndAlias)
220+
}
210221
case transformer: SimpleAnchorExtractorSpark =>
211222
// transform from avro tensor to FDS format, avro tensor can be shared by online/offline
212223
// so that transformation logic can be written only once
@@ -350,7 +361,7 @@ private[offline] object FeatureTransformation {
350361
(prevTransformedResult, featureAnchorWithSource) => {
351362
val requestedFeatures = featureAnchorWithSource.selectedFeatures
352363
val transformedResultWithoutKey =
353-
transformSingleAnchorDF(featureAnchorWithSource, prevTransformedResult.df, requestedFeatures, inputDateInterval, mvelContext)
364+
transformSingleAnchorDF(featureAnchorWithSource, prevTransformedResult.df, requestedFeatures, inputDateInterval, mvelContext, outputJoinKeyColumnNames.zip(outputJoinKeyColumnNames))
354365
val namePrefixPairs = prevTransformedResult.featureNameAndPrefixPairs ++ transformedResultWithoutKey.featureNameAndPrefixPairs
355366
val columnNameToFeatureNameAndType = prevTransformedResult.inferredFeatureTypes ++ transformedResultWithoutKey.inferredFeatureTypes
356367
val featureColumnFormats = prevTransformedResult.featureColumnFormats ++ transformedResultWithoutKey.featureColumnFormats

feathr-impl/src/main/scala/com/linkedin/feathr/offline/join/DataFrameFeatureJoiner.scala

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package com.linkedin.feathr.offline.join
22

33
import com.linkedin.feathr.common._
44
import com.linkedin.feathr.offline
5+
import com.linkedin.feathr.offline.anchored.keyExtractor.SQLSourceKeyExtractor
56
import com.linkedin.feathr.offline.client.DataFrameColName
67
import com.linkedin.feathr.offline.client.DataFrameColName.getFeatureAlias
78
import com.linkedin.feathr.offline.config.FeatureJoinConfig
@@ -72,7 +73,14 @@ private[offline] class DataFrameFeatureJoiner(logicalPlan: MultiStageJoinPlan, d
7273
(dfWithFeatureNames, featureAnchorWithSourcePair) => {
7374
val featureAnchorWithSource = featureAnchorWithSourcePair._1
7475
val requestedFeatures = featureAnchorWithSourcePair._2.toSeq
75-
val resultWithoutKey = transformSingleAnchorDF(featureAnchorWithSource, dfWithFeatureNames.df, requestedFeatures, None, mvelContext)
76+
val keyColumnNames = featureAnchorWithSourcePair._1.featureAnchor.sourceKeyExtractor.getKeyColumnNames()
77+
val keyColumnExprAndAlias = if (featureAnchorWithSourcePair._1.featureAnchor.sourceKeyExtractor.isInstanceOf[SQLSourceKeyExtractor]) {
78+
val keyExprs = featureAnchorWithSourcePair._1.featureAnchor.sourceKeyExtractor.asInstanceOf[SQLSourceKeyExtractor].keyExprs
79+
keyExprs.zip(keyColumnNames)
80+
} else {
81+
keyColumnNames.zip(keyColumnNames)
82+
}
83+
val resultWithoutKey = transformSingleAnchorDF(featureAnchorWithSource, dfWithFeatureNames.df, requestedFeatures, None, mvelContext, keyColumnExprAndAlias)
7684
val namePrefixPairs = dfWithFeatureNames.featureNameAndPrefixPairs ++ resultWithoutKey.featureNameAndPrefixPairs
7785
val inferredFeatureTypeConfigs = dfWithFeatureNames.inferredFeatureTypes ++ resultWithoutKey.inferredFeatureTypes
7886
val featureColumnFormats = resultWithoutKey.featureColumnFormats ++ dfWithFeatureNames.featureColumnFormats

feathr-impl/src/main/scala/com/linkedin/feathr/offline/logical/MultiStageJoinPlanner.scala

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,14 @@ package com.linkedin.feathr.offline.logical
33
import com.linkedin.feathr.common
44
import com.linkedin.feathr.common.exception.{ErrorLabel, FeathrConfigException, FeathrException}
55
import com.linkedin.feathr.common.{FeatureDependencyGraph, JoiningFeatureParams}
6-
import com.linkedin.feathr.offline.{ErasedEntityTaggedFeature, FeatureName, JoinStage, KeyTagIdTuple}
76
import com.linkedin.feathr.offline.anchored.feature.FeatureAnchorWithSource
87
import com.linkedin.feathr.offline.derived.DerivedFeature
8+
import com.linkedin.feathr.offline.{ErasedEntityTaggedFeature, FeatureName, JoinStage, KeyTagIdTuple}
99
import org.apache.logging.log4j.LogManager
1010

11-
import scala.collection.mutable
1211
import scala.collection.JavaConverters._
1312
import scala.collection.convert.wrapAll._
13+
import scala.collection.mutable
1414

1515
/**
1616
* Multi-stage join planner is an implementation of Logical Planner in Feathr which analyzes the requested features,
@@ -84,7 +84,10 @@ private[offline] class MultiStageJoinPlanner extends LogicalPlanner[MultiStageJo
8484
val allPassthroughFeatures = featureGroups.allPassthroughFeatures
8585
val allDerivedFeatures = featureGroups.allDerivedFeatures
8686

87-
val windowAggFeaturesOrdered = requiredFeatures.filter(taggedFeature => allWindowAggFeatures.contains(taggedFeature.getFeatureName))
87+
val windowAggFeaturesOrdered = requiredFeatures.filter(taggedFeature =>
88+
allWindowAggFeatures.contains(taggedFeature.getFeatureName) &&
89+
!("PASSTHROUGH".equals(allWindowAggFeatures(taggedFeature.getFeatureName).source.path))
90+
)
8891

8992
// All required basic anchored features, basic anchored features are non-SWA features and non-passthrough features
9093
val requiredBasicAnchoredFeatures = requiredFeatures

feathr-impl/src/main/scala/com/linkedin/feathr/offline/swa/SlidingWindowFeatureUtils.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import com.linkedin.feathr.offline.transformation.FeatureColumnFormat.FeatureCol
1414
import com.linkedin.feathr.offline.util.FeaturizedDatasetUtils
1515
import com.linkedin.feathr.offline.util.datetime.{DateTimeInterval, OfflineDateTimeUtils}
1616
import com.linkedin.feathr.swj.{FactData, GroupBySpec, LateralViewParams, SlidingWindowFeature, WindowSpec}
17-
import com.linkedin.feathr.swj.aggregate.{AggregationType, AvgAggregate, AvgPoolingAggregate, CountAggregate, CountDistinctAggregate, LatestAggregate, MaxAggregate, MaxPoolingAggregate, MinAggregate, MinPoolingAggregate, SumAggregate}
17+
import com.linkedin.feathr.swj.aggregate.{AggregationType, AvgAggregate, AvgPoolingAggregate, CountAggregate, CountDistinctAggregate, DummyAggregate, LatestAggregate, MaxAggregate, MaxPoolingAggregate, MinAggregate, MinPoolingAggregate, SumAggregate}
1818
import org.apache.logging.log4j.LogManager
1919
import org.apache.spark.sql.DataFrame
2020
import org.apache.spark.sql.expressions.UserDefinedFunction
@@ -186,6 +186,7 @@ private[offline] object SlidingWindowFeatureUtils {
186186
case AggregationType.MAX_POOLING => new MaxPoolingAggregate(featureDef)
187187
case AggregationType.MIN_POOLING => new MinPoolingAggregate(featureDef)
188188
case AggregationType.AVG_POOLING => new AvgPoolingAggregate(featureDef)
189+
case AggregationType.BUCKETED_COUNT_DISTINCT => new DummyAggregate(featureDef)
189190
}
190191
swj.SlidingWindowFeature(featureName, aggregationSpec, windowSpec, filter, groupBySpec, lateralViewParams)
191192
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
package com.linkedin.feathr.offline.transformation

import com.linkedin.feathr.common.FeatureTypeConfig
import com.linkedin.feathr.offline.anchored.anchorExtractor.TimeWindowConfigurableAnchorExtractor
import com.linkedin.feathr.offline.anchored.feature.FeatureAnchorWithSource
import com.linkedin.feathr.offline.job.TransformedResult
import com.linkedin.feathr.offline.swa.SlidingWindowFeatureUtils
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * Evaluator that transforms bucketed window-aggregation features using
 * [[MultiLevelAggregationTransform]].
 */
private[offline] object BucketedWindowAggregationEvaluator {

  /**
   * Transform the input dataframe, appending one feature column per feature defined on
   * the extractor, using [[MultiLevelAggregationTransform]].
   *
   * @param transformer TimeWindowConfigurableAnchorExtractor holding the (featureName -> featureDef) map
   * @param df input dataframe
   * @param requestedFeatureNameAndPrefix feature name and prefix pairs
   * @param featureAnchorWithSource feature anchor with source that has the transformer;
   *                                supplies the timestamp column and its format
   * @param keyColumnExprAndAlias key column expressions paired with their output column aliases
   * @return transformed result (dataframe with feature columns, feature column formats);
   *         the feature column format can only be FeatureColumnFormat.RAW for now
   */
  def transform(
      transformer: TimeWindowConfigurableAnchorExtractor,
      df: DataFrame,
      requestedFeatureNameAndPrefix: Seq[(String, String)],
      featureAnchorWithSource: FeatureAnchorWithSource,
      keyColumnExprAndAlias: Seq[(String, String)]): TransformedResult = {
    val ss = SparkSession.builder().getOrCreate()
    val evaluator = new MultiLevelAggregationTransform(ss)
    // The timestamp column settings depend only on the source, not on the individual
    // feature, so resolve them once instead of once per feature inside the fold.
    val timeWindowParams = SlidingWindowFeatureUtils.getTimeWindowParam(featureAnchorWithSource.source)
    // Apply each feature's aggregation in turn, threading the dataframe through the fold.
    val resultDf = transformer.features.foldLeft(df) {
      case (inputDf, (featureName, featureDef)) =>
        evaluator.applyAggregate(
          inputDf,
          featureDef.`def`,
          featureName,
          featureDef.window_str,
          keyColumnExprAndAlias,
          timeWindowParams.timestampColumn,
          timeWindowParams.timestampColumnFormat,
          featureDef.aggregationType.toString)
    }
    TransformedResult(
      requestedFeatureNameAndPrefix,
      resultDf,
      requestedFeatureNameAndPrefix.map(c => (c._1, FeatureColumnFormat.RAW)).toMap,
      // Feature types are not inferred here; leave them empty for the caller to resolve.
      Map.empty[String, FeatureTypeConfig])
  }
}

0 commit comments

Comments
 (0)