11package com .linkedin .feathr .offline .join .workflow
22
33import com .linkedin .feathr .common .exception .{ErrorLabel , FeathrFeatureJoinException }
4- import com .linkedin .feathr .common .{ErasedEntityTaggedFeature , FeatureTypeConfig }
4+ import com .linkedin .feathr .common .{ErasedEntityTaggedFeature , FeatureTypeConfig , FeatureTypes }
55import com .linkedin .feathr .offline
66import com .linkedin .feathr .offline .FeatureDataFrame
77import com .linkedin .feathr .offline .anchored .feature .FeatureAnchorWithSource
@@ -12,15 +12,16 @@ import com.linkedin.feathr.offline.job.KeyedTransformedResult
1212import com .linkedin .feathr .offline .join ._
1313import com .linkedin .feathr .offline .join .algorithms ._
1414import com .linkedin .feathr .offline .join .util .FrequentItemEstimatorFactory
15+ import com .linkedin .feathr .offline .logical .{LogicalPlan , MultiStageJoinPlan }
1516import com .linkedin .feathr .offline .mvel .plugins .FeathrExpressionExecutionContext
1617import com .linkedin .feathr .offline .source .accessor .DataSourceAccessor
1718import com .linkedin .feathr .offline .transformation .DataFrameDefaultValueSubstituter .substituteDefaults
1819import com .linkedin .feathr .offline .transformation .DataFrameExt ._
19- import com .linkedin .feathr .offline .util .{DataFrameUtils , FeathrUtils }
20+ import com .linkedin .feathr .offline .util .{DataFrameUtils , FeathrUtils , FeaturizedDatasetUtils }
2021import com .linkedin .feathr .offline .util .FeathrUtils .shouldCheckPoint
2122import org .apache .logging .log4j .LogManager
2223import org .apache .spark .sql .{DataFrame , SparkSession }
23- import org .apache .spark .sql .functions .lit
24+ import org .apache .spark .sql .functions .{ col , lit }
2425
2526/**
2627 * An abstract class provides default implementation of anchored feature join step
@@ -39,8 +40,64 @@ private[offline] class AnchoredFeatureJoinStep(
3940 extends FeatureJoinStep [AnchorJoinStepInput , DataFrameJoinStepOutput ] {
4041 @ transient lazy val log = LogManager .getLogger(getClass.getName)
4142
/**
 * When the add.default.col.for.missing.data flag is turned on, some anchored features may be skipped
 * because of missing data. For each such feature this method appends a column to the observation
 * dataframe containing the anchor's configured default value (if present in the feature anchor), or a
 * typed null column otherwise.
 *
 * @param sparkSession    spark session
 * @param dataframe       the original observation dataframe
 * @param logicalPlan     logical plan generated using the join config; used to look up the key tags of
 *                        each missing feature
 * @param missingFeatures map of missing feature names to the corresponding featureAnchorWithSource object
 * @return dataframe with the missing feature columns added (plus, per feature, a duplicate column under
 *         the key-tagged feathr column name required by later join stages)
 * @throws FeathrFeatureJoinException if a missing feature cannot be found in any join stage of the plan
 */
def substituteDefaultsForDataMissingFeatures(sparkSession: SparkSession, dataframe: DataFrame, logicalPlan: MultiStageJoinPlan,
    missingFeatures: Map[String, FeatureAnchorWithSource]): DataFrame = {
  // Feature name -> configured default value. A feature without a configured default is absent from this
  // map and only gets the null column added below.
  val defaults = missingFeatures.flatMap(_._2.featureAnchor.defaults)

  // Feature name -> configured feature type, merged across all anchors of the missing features.
  val featureTypes = missingFeatures.values
    .map(_.featureAnchor.featureTypeConfigs)
    .foldLeft(Map.empty[String, FeatureTypeConfig])(_ ++ _)

  // Guess the Spark column data type from the configured feature type. If no feature type is configured,
  // fall back to the default feathr representation of a map column of string to float.
  val obsDfWithDefaultNullColumn = missingFeatures.keys.foldLeft(dataframe) { (observationDF, featureName) =>
    val featureColumnType = featureTypes.get(featureName).map(_.getFeatureType) match {
      case Some(FeatureTypes.NUMERIC) => "float"
      case Some(FeatureTypes.BOOLEAN) => "boolean"
      case Some(FeatureTypes.DENSE_VECTOR) => "array<float>"
      case Some(FeatureTypes.CATEGORICAL) => "string"
      case Some(FeatureTypes.CATEGORICAL_SET) => "array<string>"
      // TERM_VECTOR, UNSPECIFIED, any future type, or no configured type at all
      case _ => "map<string,float>"
    }
    observationDF.withColumn(DataFrameColName.genFeatureColumnName(FEATURE_NAME_PREFIX + featureName), lit(null).cast(featureColumnType))
  }

  val dataframeWithDefaults = substituteDefaults(obsDfWithDefaultNullColumn, missingFeatures.keys.toSeq, defaults, featureTypes,
    sparkSession, (s: String) => s"${FEATURE_NAME_PREFIX}$s")

  // Duplicate each column under the key-tagged feathr column name required for further processing.
  // For example, if the feature name is abc and the corresponding key is x, the column name would be
  // __feathr_feature_abc_x. This column will be dropped after all the joins are done.
  missingFeatures.keys.foldLeft(dataframeWithDefaults) { (df, featureName) =>
    // find + explicit error instead of filter(...).head, which would surface a bare NoSuchElementException
    val keyTags = logicalPlan.joinStages.find(_._2.contains(featureName)).map(_._1)
      .getOrElse(throw new FeathrFeatureJoinException(ErrorLabel.FEATHR_ERROR,
        s"Could not find a join stage for missing feature $featureName in the logical plan."))
    val keyStr = keyTags.map(logicalPlan.keyTagIntsToStrings).toList
    df.withColumn(DataFrameColName.genFeatureColumnName(FEATURE_NAME_PREFIX + featureName, Some(keyStr)),
      col(DataFrameColName.genFeatureColumnName(FEATURE_NAME_PREFIX + featureName)))
  }
}
97+
4298 /**
4399 * Join anchored features to the observation passed as part of the input context.
100+ *
44101 * @param features Non-window aggregation, basic anchored features.
45102 * @param input input context for this step.
46103 * @param ctx environment variable that contains join job execution context.
@@ -49,10 +106,22 @@ private[offline] class AnchoredFeatureJoinStep(
49106 override def joinFeatures (features : Seq [ErasedEntityTaggedFeature ], input : AnchorJoinStepInput )(
50107 implicit ctx : JoinExecutionContext ): FeatureDataFrameOutput = {
51108 val AnchorJoinStepInput (observationDF, anchorDFMap) = input
109+ val shouldAddDefault = FeathrUtils .getFeathrJobParam(ctx.sparkSession.sparkContext.getConf,
110+ FeathrUtils .ADD_DEFAULT_COL_FOR_MISSING_DATA ).toBoolean
111+ val withMissingFeaturesSubstituted = if (shouldAddDefault) {
112+ val missingFeatures = features.map(x => x.getFeatureName).filter(x => {
113+ val containsFeature : Seq [Boolean ] = anchorDFMap.map(y => y._1.selectedFeatures.contains(x)).toSeq
114+ containsFeature.contains(false )
115+ })
116+ val missingAnchoredFeatures = ctx.featureGroups.allAnchoredFeatures.filter(featureName => missingFeatures.contains(featureName._1))
117+ substituteDefaultsForDataMissingFeatures(ctx.sparkSession, observationDF, ctx.logicalPlan,
118+ missingAnchoredFeatures)
119+ }else observationDF
120+
52121 val allAnchoredFeatures : Map [String , FeatureAnchorWithSource ] = ctx.featureGroups.allAnchoredFeatures
53122 val joinStages = ctx.logicalPlan.joinStages
54123 val joinOutput = joinStages
55- .foldLeft(FeatureDataFrame (observationDF , Map .empty[String , FeatureTypeConfig ]))((accFeatureDataFrame, joinStage) => {
124+ .foldLeft(FeatureDataFrame (withMissingFeaturesSubstituted , Map .empty[String , FeatureTypeConfig ]))((accFeatureDataFrame, joinStage) => {
56125 val (keyTags : Seq [Int ], featureNames : Seq [String ]) = joinStage
57126 val FeatureDataFrame (contextDF, inferredFeatureTypeMap) = accFeatureDataFrame
58127 // map feature name to its transformed dataframe and the join key of the dataframe
0 commit comments