From b08cfaba0956d4f6a4751c841917cfd3ccef351b Mon Sep 17 00:00:00 2001 From: Rakesh Kashyap Hanasoge Padmanabha Date: Tue, 31 Jan 2023 21:50:30 -0800 Subject: [PATCH 1/9] Skip Anchored Features --- .../feathr/offline/client/FeathrClient.scala | 36 +++++++++++++------ .../feathr/offline/job/FeatureJoinJob.scala | 10 ++++-- .../feathr/offline/util/AclCheckUtils.scala | 34 ++++++++++-------- 3 files changed, 51 insertions(+), 29 deletions(-) diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/client/FeathrClient.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/client/FeathrClient.scala index b289ba3c5..4824f2793 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/client/FeathrClient.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/client/FeathrClient.scala @@ -233,19 +233,33 @@ class FeathrClient private[offline] (sparkSession: SparkSession, featureGroups: * 2. If dateParams are specified for any feature, update the "FeatureAnchorWithSource" object to decorate it with the specified * dateParams. */ - val updatedFeatureGroups = featureGroupsUpdater.updateFeatureGroups(featureGroups, keyTaggedFeatures) - - val logicalPlan = logicalPlanner.getLogicalPlan(updatedFeatureGroups, keyTaggedFeatures) - - if (!sparkSession.sparkContext.isLocal) { + var updatedFeatureGroups = featureGroupsUpdater.updateFeatureGroups(featureGroups, keyTaggedFeatures) + + var logicalPlan = logicalPlanner.getLogicalPlan(updatedFeatureGroups, keyTaggedFeatures) + val shouldSkipFeature = FeathrUtils.getFeathrJobParam(sparkSession.sparkContext.getConf, FeathrUtils.SKIP_MISSING_FEATURE).toBoolean + val featureToPathsMap = (for { + requiredFeature <- logicalPlan.allRequiredFeatures + featureAnchorWithSource <- allAnchoredFeatures.get(requiredFeature.getFeatureName) + } yield (requiredFeature.getFeatureName -> featureAnchorWithSource.source.path)).toMap + if (sparkSession.sparkContext.isLocal) { + val featurePathsTest = AclCheckUtils.checkReadAuthorization(sparkSession, logicalPlan.allRequiredFeatures, allAnchoredFeatures) // Check read authorization for all required features - AclCheckUtils.checkReadAuthorization(sparkSession, logicalPlan.allRequiredFeatures, allAnchoredFeatures) match { + featurePathsTest._1 match { case Failure(exception) => - throw new FeathrInputDataException( - ErrorLabel.FEATHR_USER_ERROR, - "Unable to verify " + - "read authorization on feature data, it can be due to the following reasons: 1) input not exist, 2) no permission.", - exception) + if (shouldSkipFeature) { + val updatedAnchoredFeatures = allAnchoredFeatures.filter(y => { + !featurePathsTest._2.contains(featureToPathsMap(y._1)) + }) + updatedFeatureGroups = FeatureGroups(updatedAnchoredFeatures, updatedFeatureGroups.allDerivedFeatures, + updatedFeatureGroups.allWindowAggFeatures, updatedFeatureGroups.allPassthroughFeatures, updatedFeatureGroups.allSeqJoinFeatures) + logicalPlan = logicalPlanner.getLogicalPlan(updatedFeatureGroups, keyTaggedFeatures) + } else { + throw new FeathrInputDataException( + ErrorLabel.FEATHR_USER_ERROR, + "Unable to verify " + + "read authorization on feature data, it can be due to the following reasons: 1) input not exist, 2) no permission.", + exception) + } case Success(_) => log.debug("Checked read authorization on all feature data") } } diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/job/FeatureJoinJob.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/job/FeatureJoinJob.scala index 3f3f7be05..6e96b96a6 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/job/FeatureJoinJob.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/job/FeatureJoinJob.scala @@ -111,9 +111,13 @@ object FeatureJoinJob { targetDate=None, failOnMissing=failOnMissing, dataLoaderHandlers=dataLoaderHandlers) - AclCheckUtils.checkReadAuthorization(hadoopConf, pathList) match { - case Failure(e) => throw new FeathrInputDataException(ErrorLabel.FEATHR_USER_ERROR, s"No read permission on observation data $pathList.", e) - case Success(_) => log.debug("Checked read authorization on observation data of the following paths:\n" + pathList.mkString("\n")) + val testValidPaths = AclCheckUtils.checkReadAuthorization(hadoopConf, pathList) + if (testValidPaths.isEmpty) { + log.debug("Checked read authorization on observation data of the following paths:\n" + pathList.mkString("\n")) + } else { + val invalidPaths = testValidPaths.map(_._2) + val errorMsgs = testValidPaths.map(_._1) + throw new FeathrInputDataException(ErrorLabel.FEATHR_USER_ERROR, s"No read permission on observation data $invalidPaths with $errorMsgs") } }) } diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/util/AclCheckUtils.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/util/AclCheckUtils.scala index c5c2f3f24..723672339 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/util/AclCheckUtils.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/util/AclCheckUtils.scala @@ -24,19 +24,9 @@ private[offline] object AclCheckUtils { private val log = Logger.getLogger(getClass) // Check read authorization on all paths in a given list - def checkReadAuthorization(conf: Configuration, pathList: Seq[String]): Try[Unit] = { - val failureMessages = (for (path <- pathList) yield (path, checkReadAuthorization(conf, path))) collect { - case (path, Failure(e)) => path + " , " + e.getMessage - } - - if (failureMessages.isEmpty) { - Success(()) - } else { - Failure( - new RuntimeException( - "Can not verify read authorization on the following paths. This can be due to" + - " 1) the user does not have correct ACL, 2) path does not exist, 3) IO exception when reading the data :\n" + - failureMessages.mkString("\n"))) + def checkReadAuthorization(conf: Configuration, pathList: Seq[String]): Seq[(String, String)] = { + (for (path <- pathList) yield (path, checkReadAuthorization(conf, path))) collect { + case (path, Failure(e)) => (path + " , " + e.getMessage, path) } } @@ -69,7 +59,7 @@ private[offline] object AclCheckUtils { def checkReadAuthorization( ss: SparkSession, allRequiredFeatures: Seq[common.ErasedEntityTaggedFeature], - allAnchoredFeatures: Map[String, FeatureAnchorWithSource]): Try[Unit] = { + allAnchoredFeatures: Map[String, FeatureAnchorWithSource]): (Try[Unit], Seq[String]) = { val conf = ss.sparkContext.hadoopConfiguration val allRequiredPaths = for { @@ -77,7 +67,21 @@ private[offline] object AclCheckUtils { featureAnchorWithSource <- allAnchoredFeatures.get(requiredFeature.getFeatureName) } yield featureAnchorWithSource.source.path - AclCheckUtils.checkReadAuthorization(conf, allRequiredPaths.distinct) + val shouldSkipFeature = FeathrUtils.getFeathrJobParam(ss.sparkContext.getConf, FeathrUtils.SKIP_MISSING_FEATURE).toBoolean + val invalidPaths = AclCheckUtils.checkReadAuthorization(conf, allRequiredPaths.distinct) + if (invalidPaths.isEmpty) { + (Success(()), invalidPaths.map(_._2)) + } else { + if (shouldSkipFeature) { + (Failure( + new RuntimeException( + "Can not verify read authorization on the following paths. This can be due to" + + " 1) the user does not have correct ACL, 2) path does not exist, 3) IO exception when reading the data :\n" + + invalidPaths.map(_._1).mkString("\n"))), invalidPaths.map(_._2)) + } else { + (Success(()), invalidPaths.map(_._2)) + } + } } // Check write authorization on a path string, i.e., check write and execute authorization on its parent path From 72d343965e014a811842c089cfd1a9c4ec18c399 Mon Sep 17 00:00:00 2001 From: Rakesh Kashyap Hanasoge Padmanabha Date: Tue, 31 Jan 2023 22:30:31 -0800 Subject: [PATCH 2/9] Added unit test --- .../feathr/offline/client/FeathrClient.scala | 18 +++--- .../config/sources/FeatureGroupsUpdater.scala | 18 +++++- .../feathr/offline/job/FeatureJoinJob.scala | 8 +-- .../sources/TestFeatureGroupsUpdater.scala | 59 ++++++++++++++++++- 4 files changed, 86 insertions(+), 17 deletions(-) diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/client/FeathrClient.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/client/FeathrClient.scala index 4824f2793..cd5d16591 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/client/FeathrClient.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/client/FeathrClient.scala @@ -2,12 +2,13 @@ package com.linkedin.feathr.offline.client import com.linkedin.feathr.common.exception._ import com.linkedin.feathr.common.{FeatureInfo, Header, InternalApi, JoiningFeatureParams, RichConfig, TaggedFeatureName} +import com.linkedin.feathr.offline.anchored.feature.FeatureAnchorWithSource import com.linkedin.feathr.offline.config.sources.FeatureGroupsUpdater import com.linkedin.feathr.offline.config.{FeathrConfig, FeathrConfigLoader, FeatureGroupsGenerator, FeatureJoinConfig} import com.linkedin.feathr.offline.generation.{DataFrameFeatureGenerator, FeatureGenKeyTagAnalyzer, StreamingFeatureGenerator} import com.linkedin.feathr.offline.job._ import com.linkedin.feathr.offline.join.DataFrameFeatureJoiner -import com.linkedin.feathr.offline.logical.{FeatureGroups, MultiStageJoinPlanner} +import com.linkedin.feathr.offline.logical.{FeatureGroups, MultiStageJoinPlan, MultiStageJoinPlanner} import com.linkedin.feathr.offline.mvel.plugins.FeathrExpressionExecutionContext import com.linkedin.feathr.offline.source.DataSource import com.linkedin.feathr.offline.source.accessor.DataPathHandler @@ -233,7 +234,7 @@ class FeathrClient private[offline] (sparkSession: SparkSession, featureGroups: * 2. If dateParams are specified for any feature, update the "FeatureAnchorWithSource" object to decorate it with the specified * dateParams. */ - var updatedFeatureGroups = featureGroupsUpdater.updateFeatureGroups(featureGroups, keyTaggedFeatures) + val updatedFeatureGroups = featureGroupsUpdater.updateFeatureGroups(featureGroups, keyTaggedFeatures) var logicalPlan = logicalPlanner.getLogicalPlan(updatedFeatureGroups, keyTaggedFeatures) val shouldSkipFeature = FeathrUtils.getFeathrJobParam(sparkSession.sparkContext.getConf, FeathrUtils.SKIP_MISSING_FEATURE).toBoolean @@ -242,17 +243,14 @@ class FeathrClient private[offline] (sparkSession: SparkSession, featureGroups: featureAnchorWithSource <- allAnchoredFeatures.get(requiredFeature.getFeatureName) } yield (requiredFeature.getFeatureName -> featureAnchorWithSource.source.path)).toMap if (sparkSession.sparkContext.isLocal) { - val featurePathsTest = AclCheckUtils.checkReadAuthorization(sparkSession, logicalPlan.allRequiredFeatures, allAnchoredFeatures) // Check read authorization for all required features + val featurePathsTest = AclCheckUtils.checkReadAuthorization(sparkSession, logicalPlan.allRequiredFeatures, allAnchoredFeatures) featurePathsTest._1 match { case Failure(exception) => - if (shouldSkipFeature) { - val updatedAnchoredFeatures = allAnchoredFeatures.filter(y => { - !featurePathsTest._2.contains(featureToPathsMap(y._1)) - }) - updatedFeatureGroups = FeatureGroups(updatedAnchoredFeatures, updatedFeatureGroups.allDerivedFeatures, - updatedFeatureGroups.allWindowAggFeatures, updatedFeatureGroups.allPassthroughFeatures, updatedFeatureGroups.allSeqJoinFeatures) - logicalPlan = logicalPlanner.getLogicalPlan(updatedFeatureGroups, keyTaggedFeatures) + if (shouldSkipFeature) { // If skip feature, remove the corresponding anchored feature from the feature group and produce a new logical plan + val featureGroupsWithoutInvalidFeatures = FeatureGroupsUpdater() + .getUpdatedFeatureGroupsWithoutInvalidPaths(featureToPathsMap, updatedFeatureGroups, featurePathsTest._2) + logicalPlanner.getLogicalPlan(featureGroupsWithoutInvalidFeatures, keyTaggedFeatures) } else { throw new FeathrInputDataException( ErrorLabel.FEATHR_USER_ERROR, diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/config/sources/FeatureGroupsUpdater.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/config/sources/FeatureGroupsUpdater.scala index 7a182070f..8c174eda0 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/config/sources/FeatureGroupsUpdater.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/config/sources/FeatureGroupsUpdater.scala @@ -2,7 +2,8 @@ package com.linkedin.feathr.offline.config.sources import com.linkedin.feathr.common.JoiningFeatureParams import com.linkedin.feathr.offline.anchored.anchorExtractor.TimeWindowConfigurableAnchorExtractor -import com.linkedin.feathr.offline.logical.FeatureGroups +import com.linkedin.feathr.offline.anchored.feature.FeatureAnchorWithSource +import com.linkedin.feathr.offline.logical.{FeatureGroups, MultiStageJoinPlan} /** * Feature groups will be generated using only the feature def config using the [[com.linkedin.feathr.offline.config.FeatureGroupsGenerator]] @@ -70,6 +71,21 @@ private[offline] class FeatureGroupsUpdater { featureGroups.allSeqJoinFeatures) } + /** + * + * @param featureToPathsMap + * @param featureGroups + * @param paths + * @return + */ + def getUpdatedFeatureGroupsWithoutInvalidPaths(featureToPathsMap: Map[String, String], featureGroups: FeatureGroups, invalidPaths: Seq[String]): FeatureGroups = { + val updatedAnchoredFeatures = featureGroups.allAnchoredFeatures.filter(featureNameToAnchoredObject => { + !invalidPaths.contains(featureToPathsMap(featureNameToAnchoredObject._1)) + }) + FeatureGroups(updatedAnchoredFeatures, featureGroups.allDerivedFeatures, + featureGroups.allWindowAggFeatures, featureGroups.allPassthroughFeatures, featureGroups.allSeqJoinFeatures) + } + } /** diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/job/FeatureJoinJob.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/job/FeatureJoinJob.scala index 6e96b96a6..009a3cb4a 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/job/FeatureJoinJob.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/job/FeatureJoinJob.scala @@ -111,12 +111,12 @@ object FeatureJoinJob { targetDate=None, failOnMissing=failOnMissing, dataLoaderHandlers=dataLoaderHandlers) - val testValidPaths = AclCheckUtils.checkReadAuthorization(hadoopConf, pathList) - if (testValidPaths.isEmpty) { + val invalidPathsAndErrors = AclCheckUtils.checkReadAuthorization(hadoopConf, pathList) + if (invalidPathsAndErrors.isEmpty) { log.debug("Checked read authorization on observation data of the following paths:\n" + pathList.mkString("\n")) } else { - val invalidPaths = testValidPaths.map(_._2) - val errorMsgs = testValidPaths.map(_._1) + val invalidPaths = invalidPathsAndErrors.map(_._2) + val errorMsgs = invalidPathsAndErrors.map(_._1) throw new FeathrInputDataException(ErrorLabel.FEATHR_USER_ERROR, s"No read permission on observation data $invalidPaths with $errorMsgs") } }) diff --git a/feathr-impl/src/test/scala/com/linkedin/feathr/offline/config/sources/TestFeatureGroupsUpdater.scala b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/config/sources/TestFeatureGroupsUpdater.scala index afe2a5722..efe15a39c 100644 --- a/feathr-impl/src/test/scala/com/linkedin/feathr/offline/config/sources/TestFeatureGroupsUpdater.scala +++ b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/config/sources/TestFeatureGroupsUpdater.scala @@ -2,8 +2,9 @@ package com.linkedin.feathr.offline.config.sources import com.linkedin.feathr.common.{DateParam, JoiningFeatureParams} import com.linkedin.feathr.offline.TestFeathr -import com.linkedin.feathr.offline.config.{FeatureGroupsGenerator, FeathrConfigLoader} -import org.testng.Assert.assertEquals +import com.linkedin.feathr.offline.config.{FeathrConfigLoader, FeatureGroupsGenerator} +import com.linkedin.feathr.offline.logical.FeatureGroups +import org.testng.Assert.{assertEquals, assertTrue} import org.testng.annotations.Test class TestFeatureGroupsUpdater extends TestFeathr { @@ -111,6 +112,60 @@ class TestFeatureGroupsUpdater extends TestFeathr { assertEquals(updatedFeatureGroups.allAnchoredFeatures("sampleTimeBasedFeature").dateParam, Some(DateParam(Some("20200707"), Some("20200708")))) } + /** + * This tests the updation of feature groups when an invalid path is found with the skip feature flag turned on. + */ + @Test + def testUpdateFeaturesWithInvalidPaths(): Unit = { + val featureConfig = + """ + |anchors: { + | sample1: { + | source: source1 + | key: "x" + | features: { + | sampleTimeBasedFeature1: { + | def: count + | type: NUMERIC + | } + | } + | } + | sample2: { + | source: source2 + | key: "x" + | features: { + | sampleTimeBasedFeature2: { + | def: count + | type: NUMERIC + | } + | } + | } + | } + |sources: { + | source1:{ + | type: "HDFS" + | location: { + | path: "/feathr/part_a/daily" + | } + | } + | source2:{ + | type: "HDFS" + | location: { + | path: "/invalid/path" + | } + | } + |} + """.stripMargin + + val featureDefConfig = _feathrConfigLoader.load(featureConfig) + val featureGroups = FeatureGroupsGenerator(Seq(featureDefConfig)).getFeatureGroups() + val featureToPathsMap = Map("sampleTimeBasedFeature1" -> "/feathr/part_a/daily", "sampleTimeBasedFeature2" -> "/invalid/path") + val invalidPaths = Seq("/invalid/path") + val updatedFeatureGroups = FeatureGroupsUpdater().getUpdatedFeatureGroupsWithoutInvalidPaths(featureToPathsMap, featureGroups, invalidPaths) + assertTrue(updatedFeatureGroups.allAnchoredFeatures.size == 1) + assertTrue(updatedFeatureGroups.allAnchoredFeatures.contains("sampleTimeBasedFeature1")) + } + /** * This tests that the feature groups are not updated if only overrideTimeDelay or featureAlias is specified. */ From ad9b58e22ceda19ddbcfd87db88727d2e4055c95 Mon Sep 17 00:00:00 2001 From: Rakesh Kashyap Hanasoge Padmanabha Date: Wed, 1 Feb 2023 10:01:52 -0800 Subject: [PATCH 3/9] Extend it to derived features --- .../feathr/offline/client/FeathrClient.scala | 2 +- .../config/sources/FeatureGroupsUpdater.scala | 20 +++++-- .../sources/TestFeatureGroupsUpdater.scala | 60 +++++++++++++++++++ 3 files changed, 76 insertions(+), 6 deletions(-) diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/client/FeathrClient.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/client/FeathrClient.scala index cd5d16591..c04691930 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/client/FeathrClient.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/client/FeathrClient.scala @@ -242,7 +242,7 @@ class FeathrClient private[offline] (sparkSession: SparkSession, featureGroups: requiredFeature <- logicalPlan.allRequiredFeatures featureAnchorWithSource <- allAnchoredFeatures.get(requiredFeature.getFeatureName) } yield (requiredFeature.getFeatureName -> featureAnchorWithSource.source.path)).toMap - if (sparkSession.sparkContext.isLocal) { + if (!sparkSession.sparkContext.isLocal) { // Check read authorization for all required features val featurePathsTest = AclCheckUtils.checkReadAuthorization(sparkSession, logicalPlan.allRequiredFeatures, allAnchoredFeatures) featurePathsTest._1 match { diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/config/sources/FeatureGroupsUpdater.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/config/sources/FeatureGroupsUpdater.scala index 8c174eda0..985bb5b5f 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/config/sources/FeatureGroupsUpdater.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/config/sources/FeatureGroupsUpdater.scala @@ -72,17 +72,27 @@ private[offline] class FeatureGroupsUpdater { } /** - * - * @param featureToPathsMap - * @param featureGroups - * @param paths + * Exclude anchored and derived features features from the join stage if they do not have a valid path. + * @param featureToPathsMap Map of anchored feature names to their paths + * @param featureGroups All feature groups + * @param invalidPaths List of all invalid paths * @return */ def getUpdatedFeatureGroupsWithoutInvalidPaths(featureToPathsMap: Map[String, String], featureGroups: FeatureGroups, invalidPaths: Seq[String]): FeatureGroups = { val updatedAnchoredFeatures = featureGroups.allAnchoredFeatures.filter(featureNameToAnchoredObject => { !invalidPaths.contains(featureToPathsMap(featureNameToAnchoredObject._1)) }) - FeatureGroups(updatedAnchoredFeatures, featureGroups.allDerivedFeatures, + + // Iterate over the derived features and remove the derived features which contains these anchored features. + val updatedDerivedFeatures = featureGroups.allDerivedFeatures.filter(derivedFeature => { + // Find the constituent anchored features for every derived feature + val allAnchoredFeaturesInDerived = derivedFeature._2.consumedFeatureNames.map(_.getFeatureName) + // Check if any of the features does not have a valid path + val containsFeature: Seq[Boolean] = allAnchoredFeaturesInDerived + .map(featureName => !invalidPaths.contains(featureToPathsMap(featureName))) + !containsFeature.contains(false) + }) + FeatureGroups(updatedAnchoredFeatures, updatedDerivedFeatures, featureGroups.allWindowAggFeatures, featureGroups.allPassthroughFeatures, featureGroups.allSeqJoinFeatures) } diff --git a/feathr-impl/src/test/scala/com/linkedin/feathr/offline/config/sources/TestFeatureGroupsUpdater.scala b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/config/sources/TestFeatureGroupsUpdater.scala index efe15a39c..cec0ed061 100644 --- a/feathr-impl/src/test/scala/com/linkedin/feathr/offline/config/sources/TestFeatureGroupsUpdater.scala +++ b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/config/sources/TestFeatureGroupsUpdater.scala @@ -166,6 +166,66 @@ class TestFeatureGroupsUpdater extends TestFeathr { assertTrue(updatedFeatureGroups.allAnchoredFeatures.contains("sampleTimeBasedFeature1")) } + /** + * This tests the updation of feature groups when an invalid path is found with the skip feature flag turned on. + */ + @Test + def testUpdateFeaturesWithInvalidPathsWithDerived(): Unit = { + val featureConfig = + """ + |anchors: { + | sample1: { + | source: source1 + | key: "x" + | features: { + | sampleTimeBasedFeature1: { + | def: count + | type: NUMERIC + | } + | } + | } + | sample2: { + | source: source2 + | key: "x" + | features: { + | sampleTimeBasedFeature2: { + | def: count + | type: NUMERIC + | } + | } + | } + | } + | + |derivations: { + | d1: "sampleTimeBasedFeature1 + sampleTimeBasedFeature2" + |} + | + |sources: { + | source1:{ + | type: "HDFS" + | location: { + | path: "/feathr/part_a/daily" + | } + | } + | source2:{ + | type: "HDFS" + | location: { + | path: "/invalid/path" + | } + | } + |} + """.stripMargin + + val featureDefConfig = _feathrConfigLoader.load(featureConfig) + val featureGroups = FeatureGroupsGenerator(Seq(featureDefConfig)).getFeatureGroups() + val featureToPathsMap = Map("sampleTimeBasedFeature1" -> "/feathr/part_a/daily", "sampleTimeBasedFeature2" -> "/invalid/path") + val invalidPaths = Seq("/invalid/path") + val updatedFeatureGroups = FeatureGroupsUpdater().getUpdatedFeatureGroupsWithoutInvalidPaths(featureToPathsMap, featureGroups, invalidPaths) + assertTrue(updatedFeatureGroups.allAnchoredFeatures.size == 1) + assertTrue(updatedFeatureGroups.allAnchoredFeatures.contains("sampleTimeBasedFeature1")) + assertTrue(updatedFeatureGroups.allDerivedFeatures.isEmpty) + } + /** * This tests that the feature groups are not updated if only overrideTimeDelay or featureAlias is specified. */ From f5ac2c76e7fd64e128770720fecd2dee2a7862bd Mon Sep 17 00:00:00 2001 From: Rakesh Kashyap Hanasoge Padmanabha Date: Thu, 2 Feb 2023 10:10:01 -0800 Subject: [PATCH 4/9] Skip missing features in feature gen --- .../SequentialJoinAsDerivation.scala | 4 +- .../DataFrameFeatureGenerator.scala | 30 +++- .../StreamingFeatureGenerator.scala | 4 +- .../feathr/offline/job/FeatureGenJob.scala | 9 +- .../feathr/offline/job/FeatureJoinJob.scala | 4 +- .../offline/join/DataFrameFeatureJoiner.scala | 5 +- .../NonTimeBasedDataSourceAccessor.scala | 25 +-- .../AnchorToDataSourceMapper.scala | 59 ++++--- .../feathr/offline/util/FeathrUtils.scala | 2 +- .../feathr/offline/FeatureGenIntegTest.scala | 146 +++++++++++++++++- 10 files changed, 236 insertions(+), 52 deletions(-) diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/derived/strategies/SequentialJoinAsDerivation.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/derived/strategies/SequentialJoinAsDerivation.scala index 3be7b6d61..30995cfa8 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/derived/strategies/SequentialJoinAsDerivation.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/derived/strategies/SequentialJoinAsDerivation.scala @@ -217,9 +217,11 @@ private[offline] class SequentialJoinAsDerivation(ss: SparkSession, val ss = SparkSession.builder().getOrCreate() val failOnMissingPartition = FeathrUtils.getFeathrJobParam(ss, FeathrUtils.FAIL_ON_MISSING_PARTITION).toBoolean val anchorDFMap1 = anchorToDataSourceMapper.getBasicAnchorDFMapForJoin(ss, Seq(featureAnchor), failOnMissingPartition) + val updatedAnchorDFMap = anchorDFMap1.filter(anchorEntry => anchorEntry._2.isDefined) + .map(anchorEntry => anchorEntry._1 -> anchorEntry._2.get) val featureInfo = FeatureTransformation.directCalculate( anchorGroup: AnchorFeatureGroups, - anchorDFMap1(featureAnchor), + updatedAnchorDFMap(featureAnchor), featureAnchor.featureAnchor.sourceKeyExtractor, None, None, diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/generation/DataFrameFeatureGenerator.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/generation/DataFrameFeatureGenerator.scala index 57f4def55..e11b2864b 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/generation/DataFrameFeatureGenerator.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/generation/DataFrameFeatureGenerator.scala @@ -4,12 +4,13 @@ import com.linkedin.feathr.common.exception.{ErrorLabel, FeathrException} import com.linkedin.feathr.common.{Header, JoiningFeatureParams, TaggedFeatureName} import com.linkedin.feathr.offline import com.linkedin.feathr.offline.anchored.feature.FeatureAnchorWithSource.{getDefaultValues, getFeatureTypes} +import com.linkedin.feathr.offline.config.sources.FeatureGroupsUpdater import com.linkedin.feathr.offline.derived.functions.SeqJoinDerivationFunction import com.linkedin.feathr.offline.derived.strategies.{DerivationStrategies, RowBasedDerivation, SequentialJoinDerivationStrategy, SparkUdfDerivation, SqlDerivationSpark} import com.linkedin.feathr.offline.derived.{DerivedFeature, DerivedFeatureEvaluator} import com.linkedin.feathr.offline.evaluator.DerivedFeatureGenStage import com.linkedin.feathr.offline.job.{FeatureGenSpec, FeatureTransformation} -import com.linkedin.feathr.offline.logical.{FeatureGroups, MultiStageJoinPlan} +import com.linkedin.feathr.offline.logical.{FeatureGroups, MultiStageJoinPlan, MultiStageJoinPlanner} import com.linkedin.feathr.offline.mvel.plugins.FeathrExpressionExecutionContext import com.linkedin.feathr.offline.source.accessor.DataPathHandler import com.linkedin.feathr.offline.source.dataloader.DataLoaderHandler @@ -63,6 +64,9 @@ private[offline] class DataFrameFeatureGenerator(logicalPlan: MultiStageJoinPlan val requiredRegularFeatureAnchorsWithTime = allRequiredFeatureAnchorWithSourceAndTime.values.toSeq val anchorDFRDDMap = anchorToDataFrameMapper.getAnchorDFMapForGen(ss, requiredRegularFeatureAnchorsWithTime, Some(incrementalAggContext), failOnMissingPartition) + val updatedAnchorDFRDDMap = anchorDFRDDMap.filter(anchorEntry => anchorEntry._2.isDefined).map(anchorEntry => anchorEntry._1 -> anchorEntry._2.get) + if(updatedAnchorDFRDDMap.isEmpty) return Map() + // 3. Load user specified default values and feature types, if any. val featureToDefaultValueMap = getDefaultValues(allRequiredFeatureAnchorWithSourceAndTime.values.toSeq) val featureToTypeConfigMap = getFeatureTypes(allRequiredFeatureAnchorWithSourceAndTime.values.toSeq) @@ -72,7 +76,7 @@ private[offline] class DataFrameFeatureGenerator(logicalPlan: MultiStageJoinPlan case (_: Seq[Int], featureNames: Seq[String]) => val (anchoredFeatureNamesThisStage, _) = featureNames.partition(featureGroups.allAnchoredFeatures.contains) val anchoredFeaturesThisStage = featureNames.filter(featureGroups.allAnchoredFeatures.contains).map(allRequiredFeatureAnchorWithSourceAndTime).distinct - val anchoredDFThisStage = anchorDFRDDMap.filterKeys(anchoredFeaturesThisStage.toSet) + val anchoredDFThisStage = updatedAnchorDFRDDMap.filterKeys(anchoredFeaturesThisStage.toSet) FeatureTransformation .transformFeatures(anchoredDFThisStage, anchoredFeatureNamesThisStage, None, Some(incrementalAggContext), mvelContext) @@ -80,7 +84,23 @@ private[offline] class DataFrameFeatureGenerator(logicalPlan: MultiStageJoinPlan }.toMap // 5. Group features based on grouping specified in output processors - val groupedAnchoredFeatures = featureGenFeatureGrouper.group(allStageFeatures, featureGenSpec.getOutputProcessorConfigs, featureGroups.allDerivedFeatures) + val updatedAllStageFeatures = allStageFeatures.filter(keyValue => !keyValue._2._1.df.isEmpty) + val updatedAnchoredFeatures = featureGroups.allAnchoredFeatures.filter(featureRow => updatedAllStageFeatures.contains(featureRow._1)) + // Iterate over the derived features and remove the derived features which contains these anchored features. + val updatedDerivedFeatures = featureGroups.allDerivedFeatures.filter(derivedFeature => { + // Find the constituent anchored features for every derived feature + val allAnchoredFeaturesInDerived = derivedFeature._2.consumedFeatureNames.map(_.getFeatureName) + val containsFeature: Seq[Boolean] = allAnchoredFeaturesInDerived.map(feature => updatedAnchoredFeatures.contains(feature)) + !containsFeature.contains(false) + }) + val updatedWindowAggFeatures = featureGroups.allWindowAggFeatures.filter(windowAggFeature => updatedAnchoredFeatures.contains(windowAggFeature._1)) + val updatedFeatureGroups = FeatureGroups(updatedAnchoredFeatures, updatedDerivedFeatures, featureGroups.allWindowAggFeatures, + featureGroups.allPassthroughFeatures, featureGroups.allSeqJoinFeatures) + val updatedKeyTaggedFeatures = keyTaggedFeatures.filter(feature => updatedAnchoredFeatures.contains(feature.featureName) + || updatedDerivedFeatures.contains(feature.featureName) || updatedDerivedFeatures.contains(feature.featureName) + || featureGroups.allPassthroughFeatures.contains(feature.featureName) || featureGroups.allSeqJoinFeatures.contains(feature.featureName)) + val updatedLogicalPlan = MultiStageJoinPlanner().getLogicalPlan(updatedFeatureGroups, updatedKeyTaggedFeatures) + val groupedAnchoredFeatures = featureGenFeatureGrouper.group(updatedAllStageFeatures, featureGenSpec.getOutputProcessorConfigs, updatedDerivedFeatures) // 6. Substitute defaults at this stage since all anchored features are generated and grouped together. // Substitute before generating derived features. @@ -89,9 +109,9 @@ private[offline] class DataFrameFeatureGenerator(logicalPlan: MultiStageJoinPlan // 7. Calculate derived features. val derivedFeatureEvaluator = getDerivedFeatureEvaluatorInstance(ss, featureGroups) - val derivedFeatureGenerator = DerivedFeatureGenStage(featureGroups, logicalPlan, derivedFeatureEvaluator) + val derivedFeatureGenerator = DerivedFeatureGenStage(updatedFeatureGroups, updatedLogicalPlan, derivedFeatureEvaluator) - val derivationsEvaluatedFeatures = (logicalPlan.joinStages ++ logicalPlan.convertErasedEntityTaggedToJoinStage(logicalPlan.postJoinDerivedFeatures)) + val derivationsEvaluatedFeatures = (updatedLogicalPlan.joinStages ++ updatedLogicalPlan.convertErasedEntityTaggedToJoinStage(logicalPlan.postJoinDerivedFeatures)) .foldLeft(defaultSubstitutedFeatures)((accFeatureData, currentStage) => { val (keyTags, featureNames) = currentStage val derivedFeatureNamesThisStage = featureNames.filter(featureGroups.allDerivedFeatures.contains) diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/generation/StreamingFeatureGenerator.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/generation/StreamingFeatureGenerator.scala index 126128323..2289fe9e9 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/generation/StreamingFeatureGenerator.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/generation/StreamingFeatureGenerator.scala @@ -56,7 +56,9 @@ class StreamingFeatureGenerator(dataPathHandlers: List[DataPathHandler]) { } // Load the raw streaming source data val anchorDfRDDMap = anchorToDataFrameMapper.getAnchorDFMapForGen(ss, anchors, None, false, true) - anchorDfRDDMap.par.map { case (anchor, dfAccessor) => { + val updatedAnchorDFRDDMap = anchorDfRDDMap.filter(anchorEntry => anchorEntry._2.isDefined).map(anchorEntry => anchorEntry._1 -> anchorEntry._2.get) + + updatedAnchorDFRDDMap.par.map { case (anchor, dfAccessor) => { val schemaStr = anchor.source.location.asInstanceOf[KafkaEndpoint].schema.avroJson val schemaStruct = SchemaConverters.toSqlType(Schema.parse(schemaStr)).dataType.asInstanceOf[StructType] val rowForRecord = (input: Any) => { diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/job/FeatureGenJob.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/job/FeatureGenJob.scala index b9098a647..8fd950677 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/job/FeatureGenJob.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/job/FeatureGenJob.scala @@ -231,17 +231,16 @@ object FeatureGenJob { sparkSession, allAnchoredFeatures.values.toSeq, failOnMissing) + val updatedAnchorsWithSource = anchorsWithSource.filter(anchorEntry => anchorEntry._2.isDefined).map(anchorEntry => anchorEntry._1 -> anchorEntry._2.get) + if (updatedAnchorsWithSource.isEmpty) return featureNamesInAnchorSet.asScala.map(featureName => featureName -> sparkSession.emptyDataFrame).toMap.asJava // Only load DataFrames for anchors that have preprocessing UDF // So we filter out anchors that doesn't have preprocessing UDFs // We use feature names sorted and merged as the key to find the anchor // For example, f1, f2 belongs to anchor. Then Map("f1,f2"-> anchor) - val dataFrameMapForPreprocessing = anchorsWithSource + updatedAnchorsWithSource .filter(x => featureNamesInAnchorSet.contains(x._1.featureAnchor.features.toSeq.sorted.mkString(","))) - .map(x => (x._1.featureAnchor.features.toSeq.sorted.mkString(","), x._2.get())) - - // Pyspark only understand Java map so we need to convert Scala map back to Java map. - dataFrameMapForPreprocessing.asJava + .map(x => (x._1.featureAnchor.features.toSeq.sorted.mkString(","), x._2.get())).asJava } def prepareSparkSession(args: Array[String]): FeathrGenPreparationInfo = { diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/job/FeatureJoinJob.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/job/FeatureJoinJob.scala index 009a3cb4a..0c4be62c3 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/job/FeatureJoinJob.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/job/FeatureJoinJob.scala @@ -381,12 +381,14 @@ object FeatureJoinJob { sparkSession, allAnchoredFeatures.values.toSeq, failOnMissing) + val updatedAnchorsWithSource = anchorsWithSource.filter(anchorEntry => anchorEntry._2.isDefined) + .map(anchorEntry => anchorEntry._1 -> anchorEntry._2.get) // Only load DataFrames for anchors that have preprocessing UDF // So we filter out anchors that doesn't have preprocessing UDFs // We use feature names sorted and merged as the key to find the anchor // For example, f1, f2 belongs to anchor. Then Map("f1,f2"-> anchor) - val dataFrameMapForPreprocessing = anchorsWithSource + val dataFrameMapForPreprocessing = updatedAnchorsWithSource .filter(x => featureNamesInAnchorSet.contains(x._1.featureAnchor.features.toSeq.sorted.mkString(","))) .map(x => (x._1.featureAnchor.features.toSeq.sorted.mkString(","), x._2.get())) diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/join/DataFrameFeatureJoiner.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/join/DataFrameFeatureJoiner.scala index a03abc83c..82ae6feba 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/join/DataFrameFeatureJoiner.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/join/DataFrameFeatureJoiner.scala @@ -190,6 +190,9 @@ private[offline] class DataFrameFeatureJoiner(logicalPlan: MultiStageJoinPlan, d .map(featureGroups.allAnchoredFeatures), failOnMissingPartition) + val updatedSourceAccessorMap = anchorSourceAccessorMap.filter(anchorEntry => anchorEntry._2.isDefined) + .map(anchorEntry => anchorEntry._1 -> anchorEntry._2.get) + implicit val joinExecutionContext: JoinExecutionContext = JoinExecutionContext(ss, logicalPlan, featureGroups, bloomFilters, Some(saltedJoinFrequentItemDFs)) // 3. Join sliding window aggregation features @@ -210,7 +213,7 @@ private[offline] class DataFrameFeatureJoiner(logicalPlan: MultiStageJoinPlan, d SparkJoinWithJoinCondition(EqualityJoinConditionBuilder), mvelContext) } val FeatureDataFrameOutput(FeatureDataFrame(withAllBasicAnchoredFeatureDF, inferredBasicAnchoredFeatureTypes)) = - anchoredFeatureJoinStep.joinFeatures(requiredRegularFeatureAnchors, AnchorJoinStepInput(withWindowAggFeatureDF, anchorSourceAccessorMap)) + anchoredFeatureJoinStep.joinFeatures(requiredRegularFeatureAnchors, AnchorJoinStepInput(withWindowAggFeatureDF, updatedSourceAccessorMap)) // 5. If useSlickJoin, restore(join back) all observation fields before we evaluate post derived features, sequential join and passthrough // anchored features, as they might require other columns in the original observation data, while the current observation // dataset does not have these fields (were removed in the preProcessObservation) diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/source/accessor/NonTimeBasedDataSourceAccessor.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/source/accessor/NonTimeBasedDataSourceAccessor.scala index 4860f93ba..dfbe50f01 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/source/accessor/NonTimeBasedDataSourceAccessor.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/source/accessor/NonTimeBasedDataSourceAccessor.scala @@ -5,6 +5,7 @@ import com.linkedin.feathr.offline.source.DataSource import com.linkedin.feathr.offline.source.dataloader.{CaseInsensitiveGenericRecordWrapper, DataLoaderFactory} import com.linkedin.feathr.offline.testfwk.TestFwkUtils import com.linkedin.feathr.offline.transformation.DataFrameExt._ +import com.linkedin.feathr.offline.util.FeathrUtils import org.apache.avro.generic.{GenericRecord, IndexedRecord} import org.apache.avro.specific.SpecificRecordBase import org.apache.spark.rdd.RDD @@ -29,16 +30,22 @@ private[offline] class NonTimeBasedDataSourceAccessor( * @return the dataframe */ override def get(): DataFrame = { + val shouldSkipFeature = FeathrUtils.getFeathrJobParam(ss.sparkContext.getConf, FeathrUtils.SKIP_MISSING_FEATURE).toBoolean println(s"NonTimeBasedDataSourceAccessor loading source ${source.location}") - val df = source.location match { - case SimplePath(_) => List(source.path).map(fileLoaderFactory.create(_).loadDataFrame()).reduce((x, y) => x.fuzzyUnion(y)) - case PathList(paths) => paths.map(fileLoaderFactory.create(_).loadDataFrame()).reduce((x, y) => x.fuzzyUnion(y)) - case Jdbc(_, _, _, _, _) => source.location.loadDf(SparkSession.builder().getOrCreate()) - case GenericLocation(_, _) => source.location.loadDf(SparkSession.builder().getOrCreate()) - case SparkSqlLocation(_, _) => source.location.loadDf(SparkSession.builder().getOrCreate()) - case Snowflake(_, _, _, _) => source.location.loadDf(SparkSession.builder().getOrCreate()) - case _ => fileLoaderFactory.createFromLocation(source.location).loadDataFrame() - } + val df = + try { + source.location match { + case SimplePath(_) => List(source.path).map(fileLoaderFactory.create(_).loadDataFrame()).reduce((x, y) => x.fuzzyUnion(y)) + case PathList(paths) => paths.map(fileLoaderFactory.create(_).loadDataFrame()).reduce((x, y) => x.fuzzyUnion(y)) + case Jdbc(_, _, _, _, _) => source.location.loadDf(SparkSession.builder().getOrCreate()) + case GenericLocation(_, _) => source.location.loadDf(SparkSession.builder().getOrCreate()) + case SparkSqlLocation(_, _) => source.location.loadDf(SparkSession.builder().getOrCreate()) + case Snowflake(_, _, _, _) => source.location.loadDf(SparkSession.builder().getOrCreate()) + case _ => fileLoaderFactory.createFromLocation(source.location).loadDataFrame() + } + } catch { + case e: Exception => if (shouldSkipFeature) ss.emptyDataFrame else throw e + } if (TestFwkUtils.IS_DEBUGGER_ENABLED) { println() diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/transformation/AnchorToDataSourceMapper.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/transformation/AnchorToDataSourceMapper.scala index ef714450b..d977d5493 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/transformation/AnchorToDataSourceMapper.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/transformation/AnchorToDataSourceMapper.scala @@ -4,7 +4,7 @@ import java.time.Duration import com.linkedin.feathr.common.{DateParam, DateTimeResolution} import com.linkedin.feathr.offline.source.SourceFormatType._ import com.linkedin.feathr.offline.anchored.feature.FeatureAnchorWithSource -import com.linkedin.feathr.offline.config.location.{PathList, SimplePath} +import com.linkedin.feathr.offline.config.location.{DataLocation, PathList, SimplePath} import com.linkedin.feathr.offline.generation.IncrementalAggContext import com.linkedin.feathr.offline.source.DataSource import com.linkedin.feathr.offline.source.accessor.DataSourceAccessor @@ -34,7 +34,8 @@ private[offline] class AnchorToDataSourceMapper(dataPathHandlers: List[DataPathH def getBasicAnchorDFMapForJoin( ss: SparkSession, requiredFeatureAnchors: Seq[FeatureAnchorWithSource], - failOnMissingPartition: Boolean): Map[FeatureAnchorWithSource, DataSourceAccessor] = { + failOnMissingPartition: Boolean): Map[FeatureAnchorWithSource, Option[DataSourceAccessor]] = { + val shouldSkipFeature = FeathrUtils.getFeathrJobParam(ss.sparkContext.getConf, FeathrUtils.SKIP_MISSING_FEATURE).toBoolean // get a Map from each source to a list of all anchors based on this source val sourceToAnchor = requiredFeatureAnchors .map(anchor => (anchor.source, anchor)) @@ -62,12 +63,16 @@ private[offline] class AnchorToDataSourceMapper(dataPathHandlers: List[DataPathH } } } - val timeSeriesSource = DataSourceAccessor(ss = ss, - source = source, - dateIntervalOpt = dateInterval, - expectDatumType = Some(expectDatumType), - failOnMissingPartition = failOnMissingPartition, - dataPathHandlers = dataPathHandlers) + val timeSeriesSource = try { + Some(DataSourceAccessor(ss = ss, + source = source, + dateIntervalOpt = dateInterval, + expectDatumType = Some(expectDatumType), + failOnMissingPartition = failOnMissingPartition, + dataPathHandlers = dataPathHandlers)) + } catch { + case e: Exception => if (shouldSkipFeature) None else throw e + } anchorsWithDate.map(anchor => (anchor, timeSeriesSource)) }) @@ -96,16 +101,19 @@ private[offline] class AnchorToDataSourceMapper(dataPathHandlers: List[DataPathH val dataLoaderHandlers: List[DataLoaderHandler] = dataPathHandlers.map(_.dataLoaderHandler) // Only file-based source has real "path", others are just single dataset - val adjustedObsTimeRange = if (factDataSource.location.isFileBasedLocation()) { + val (adjustedObsTimeRange, dataSourcePath) = if (factDataSource.location.isFileBasedLocation()) { val pathChecker = PathChecker(ss, dataLoaderHandlers) val pathAnalyzer = new TimeBasedHdfsPathAnalyzer(pathChecker, dataLoaderHandlers) val pathInfo = pathAnalyzer.analyze(factDataSource.path) if (pathInfo.dateTimeResolution == DateTimeResolution.DAILY) { - obsTimeRange.adjustWithDateTimeResolution(DateTimeResolution.DAILY) - } else obsTimeRange + (obsTimeRange.adjustWithDateTimeResolution(DateTimeResolution.DAILY), pathInfo.basePath) + } else (obsTimeRange, pathInfo.basePath) } else { - obsTimeRange + (obsTimeRange, factDataSource.path) } + // Copy the pathInfo's path into the datasource path as it adds the daily/hourly keyword if it is missing from the path + val updatedFactDataSource = DataSource(dataSourcePath, factDataSource.sourceType, factDataSource.timeWindowParams, + factDataSource.timePartitionPattern, factDataSource.postfixPath) val timeInterval = OfflineDateTimeUtils.getFactDataTimeRange(adjustedObsTimeRange, window, timeDelays) val needCreateTimestampColumn = SlidingWindowFeatureUtils.needCreateTimestampColumnFromPartition(factDataSource) @@ -115,7 +123,7 @@ private[offline] class AnchorToDataSourceMapper(dataPathHandlers: List[DataPathH val timeSeriesSource = DataSourceAccessor( ss = ss, - source = factDataSource, + source = updatedFactDataSource, dateIntervalOpt = Some(timeInterval), expectDatumType = None, failOnMissingPartition = failOnMissingPartition, @@ -144,7 +152,8 @@ private[offline] class AnchorToDataSourceMapper(dataPathHandlers: List[DataPathH requiredFeatureAnchors: Seq[FeatureAnchorWithSource], incrementalAggContext: Option[IncrementalAggContext], failOnMissingPartition: Boolean, - isStreaming: Boolean = false): Map[FeatureAnchorWithSource, DataSourceAccessor] = { + isStreaming: Boolean = false): Map[FeatureAnchorWithSource, Option[DataSourceAccessor]] = { + val shouldSkipFeature = FeathrUtils.getFeathrJobParam(ss.sparkContext.getConf, FeathrUtils.SKIP_MISSING_FEATURE).toBoolean // get a Map from each source to a list of all anchors based on this source val sourceToAnchor = requiredFeatureAnchors .map(anchor => (anchor.source, anchor)) @@ -168,15 +177,19 @@ private[offline] class AnchorToDataSourceMapper(dataPathHandlers: List[DataPathH } } val needCreateTimestampColumn = source.timePartitionPattern.nonEmpty && source.timeWindowParams.isEmpty - val timeSeriesSource = DataSourceAccessor( - ss = ss, - source = source, - dateIntervalOpt = dateIntervalOpt, - expectDatumType = Some(expectDatumType), - failOnMissingPartition = failOnMissingPartition, - addTimestampColumn = needCreateTimestampColumn, - isStreaming = isStreaming, - dataPathHandlers = dataPathHandlers) + val timeSeriesSource = try { + Some(DataSourceAccessor( + ss = ss, + source = source, + dateIntervalOpt = dateIntervalOpt, + expectDatumType = Some(expectDatumType), + failOnMissingPartition = failOnMissingPartition, + addTimestampColumn = needCreateTimestampColumn, + isStreaming = isStreaming, + dataPathHandlers = dataPathHandlers)) + } catch { + case e: Exception => if (shouldSkipFeature) None else throw e + } anchors.map(anchor => (anchor, timeSeriesSource)) }) diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/util/FeathrUtils.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/util/FeathrUtils.scala index f4c62a6fc..2c57107a7 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/util/FeathrUtils.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/util/FeathrUtils.scala @@ -49,7 +49,7 @@ private[offline] object FeathrUtils { FAIL_ON_MISSING_PARTITION -> "false", SEQ_JOIN_ARRAY_EXPLODE_ENABLED -> "true", ENABLE_SALTED_JOIN -> "false", - SKIP_MISSING_FEATURE -> "false", + SKIP_MISSING_FEATURE -> "true", // If one key appears more than 0.02% in the dataset, we will salt this join key and split them into multiple partitions // This is an empirical value SALTED_JOIN_FREQ_ITEM_THRESHOLD -> "0.0002", diff --git a/feathr-impl/src/test/scala/com/linkedin/feathr/offline/FeatureGenIntegTest.scala b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/FeatureGenIntegTest.scala index cfc568302..ee8deef42 100644 --- a/feathr-impl/src/test/scala/com/linkedin/feathr/offline/FeatureGenIntegTest.scala +++ b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/FeatureGenIntegTest.scala @@ -9,7 +9,7 @@ import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema import org.apache.spark.sql.functions.col import org.apache.spark.sql.types._ -import org.testng.Assert.{assertEquals, assertNotNull} +import org.testng.Assert.{assertEquals, assertNotNull, assertTrue} import org.testng.annotations.Test import scala.collection.mutable @@ -29,6 +29,18 @@ class FeatureGenIntegTest extends FeathrIntegTest { | } | } """.stripMargin + + private val malformedSource = + """ + | swaSource11: { + | location: { path: "generation/daiy/" } + | timePartitionPattern: "yyyy/MM/dd" + | timeWindowParameters: { + | timestampColumn: "timestamp" + | timestampColumnFormat: "yyyy-MM-dd" + | } + | } + """.stripMargin val aFeaturesGroup = Seq( // feature group 1: // A feature ref string with "-" delimiters @@ -817,6 +829,64 @@ class FeatureGenIntegTest extends FeathrIntegTest { }) } + /** + * test sliding window aggregation feature with malformed source and the skip missing feature flag turned on. + * The feature with the malformed source should be skipped. + */ + @Test(enabled=false) + def testSWAFeatureWithMalformedSource(): Unit = { + val swaApplicationConfig = generateSimpleApplicationConfig(features = "f3, f4, f5, f6", endTime = "2019-05-21") + val swaFeatureDefConfig = + s""" + |sources: { + | ${defaultSwaSource} + | ${malformedSource} + |} + |anchors: { + | swaAnchor1: { + | source: "swaSource11" + | key: x + | features: { + | f3: { + | def: "count" + | aggregation: SUM + | window: 1d + | } + | } + | } + | + | swaAnchor2: { + | source: "swaSource" + | key: x + | features: { + | f4: { + | def: "count" + | aggregation: SUM + | window: 1d + | } + | f5: { + | def: "count" + | aggregation: SUM + | window: 3d + | } + | + | f6: { + | def: "count" + | aggregation: LATEST + | window: 3d + | } + | } + | } + |} + """.stripMargin + val dfs = localFeatureGenerate(swaApplicationConfig, swaFeatureDefConfig) + // group by dataframe + val dfCount = dfs.groupBy(_._2.data).size + assertEquals(dfCount, 2) + // Assert that the feature is skipped. + assertTrue(!dfs.keySet.map(x => x.getFeatureName).contains("f3")) + } + /** * test sliding window aggregation feature with different anchors */ @@ -827,10 +897,11 @@ class FeatureGenIntegTest extends FeathrIntegTest { s""" |sources: { | ${defaultSwaSource} + | ${malformedSource} |} |anchors: { | swaAnchor1: { - | source: "swaSource" + | source: "swaSource11" | key: x | features: { | f3: { @@ -1704,13 +1775,20 @@ class FeatureGenIntegTest extends FeathrIntegTest { val featureDefConf = """ |anchors: { - | -local: { - | source: "anchorAndDerivations/derivations/featureGeneration/Data.avro.json" + | local: { + | source: "anchorAndDerivations/derivations/featureGeneation/Data.avro.json" | key: ["x", "y"] | features: { | a_z: "z" | } | } + | local1: { + | source: "anchorAndDerivations/derivations/featureGeneration/Data.avro.json" + | key: ["x", "y"] + | features: { + | a_z1: "z" + | } + | } |} | |derivations: { @@ -1724,7 +1802,7 @@ class FeatureGenIntegTest extends FeathrIntegTest { |} | """.stripMargin - val features = Seq("a_derived_z") + val features = Seq("a_derived_z", "a_z", "a_z1") val output = localFeatureGenerateForSeqOfFeatures(features, featureDefConf) /** @@ -1765,6 +1843,64 @@ class FeatureGenIntegTest extends FeathrIntegTest { FeathrTestUtils.assertDataFrameApproximatelyEquals(groupedOutput.head._1, expectedDf, cmpFunc) } + /** + * This test validates that derived features with multiple keys are supported + * if and only if all dependent features have the same keys. + */ + @Test + def testDerivedFeatureGenWithSkipFeatureFlagOn(): Unit = { + val featureDefConf = + """ + |anchors: { + | local: { + | source: "anchorAndDerivations/derivations/featureGeneation/Data.avro.json" + | key: ["x", "y"] + | features: { + | a_z: "z" + | } + | } + | local1: { + | source: "anchorAndDerivations/derivations/featureGeneration/Data.avro.json" + | key: ["x", "y"] + | features: { + | a_z1: "z" + | } + | } + |} + | + |derivations: { + | a_derived_z: { + | key: ["x", "Id"] + | inputs: { + | arg1: { key: ["x", "Id"], feature: a_z } + | } + | definition: arg1 + | } + |} + | + """.stripMargin + val features = Seq("a_derived_z", "a_z", "a_z1") + val output = localFeatureGenerateForSeqOfFeatures(features, featureDefConf) + + /** + * Expected Output: + * +----------+--------+---------------------------+ + * |x|x|a_derived_z| + * +----------+--------+---------------------------+ + * | 2| 1| [b -> 1.0]| + * | 5| 2| [e -> 1.0]| + * | 1| 1| [a ->...| + * | 4| 2| [d -> 1.0]| + * | 6| 3| [f -> 1.0]| + * | 3| 2| [c -> 1.0]| + * +----------+--------+---------------------------+ + */ + val groupedOutput = output.groupBy(_._2.data) + assertEquals(groupedOutput.size, 1) // features should be generated on a single DF + assertEquals(groupedOutput.head._2.size, 1) // Only 1 feature was generated + assertEquals(groupedOutput.head._2.keySet.head.getFeatureName, "a_z1") // Only 1 feature was generated + } + /** * Test derived feature generation does not support cross join. */ From e1a9dbff3e1fe36ca12fa3a0014a3c0553fc4309 Mon Sep 17 00:00:00 2001 From: Rakesh Kashyap Hanasoge Padmanabha Date: Thu, 2 Feb 2023 11:21:07 -0800 Subject: [PATCH 5/9] Refactor --- .../DataFrameFeatureGenerator.scala | 49 +++++++++++++------ .../feathr/offline/FeatureGenIntegTest.scala | 11 +---- 2 files changed, 36 insertions(+), 24 deletions(-) diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/generation/DataFrameFeatureGenerator.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/generation/DataFrameFeatureGenerator.scala index e11b2864b..5b0bd9112 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/generation/DataFrameFeatureGenerator.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/generation/DataFrameFeatureGenerator.scala @@ -3,12 +3,14 @@ package com.linkedin.feathr.offline.generation import com.linkedin.feathr.common.exception.{ErrorLabel, FeathrException} import com.linkedin.feathr.common.{Header, JoiningFeatureParams, TaggedFeatureName} import com.linkedin.feathr.offline +import com.linkedin.feathr.offline.{FeatureDataFrame, JoinKeys} import com.linkedin.feathr.offline.anchored.feature.FeatureAnchorWithSource.{getDefaultValues, getFeatureTypes} import com.linkedin.feathr.offline.config.sources.FeatureGroupsUpdater import com.linkedin.feathr.offline.derived.functions.SeqJoinDerivationFunction import com.linkedin.feathr.offline.derived.strategies.{DerivationStrategies, RowBasedDerivation, SequentialJoinDerivationStrategy, SparkUdfDerivation, SqlDerivationSpark} import com.linkedin.feathr.offline.derived.{DerivedFeature, DerivedFeatureEvaluator} import com.linkedin.feathr.offline.evaluator.DerivedFeatureGenStage +import com.linkedin.feathr.offline.job.FeatureJoinJob.FeatureName import com.linkedin.feathr.offline.job.{FeatureGenSpec, FeatureTransformation} import com.linkedin.feathr.offline.logical.{FeatureGroups, MultiStageJoinPlan, MultiStageJoinPlanner} import com.linkedin.feathr.offline.mvel.plugins.FeathrExpressionExecutionContext @@ -31,8 +33,36 @@ private[offline] class DataFrameFeatureGenerator(logicalPlan: MultiStageJoinPlan @transient val featureGenDefaultsSubstituter = FeatureGenDefaultsSubstituter() @transient val postGenPruner = PostGenPruner() + /** + * Update the feature groups based on feature missing features. Few features can be missing if the feature data is not present. + * + * @param featureGroups + * @param allStageFeatures + * @param keyTaggedFeatures + * @return + */ + def getUpdatedFeatureGroups(featureGroups: FeatureGroups, allStageFeatures: Map[FeatureName, (FeatureDataFrame, JoinKeys)], + keyTaggedFeatures: Seq[JoiningFeatureParams]): (FeatureGroups, Seq[JoiningFeatureParams]) = { + val updatedAnchoredFeatures = featureGroups.allAnchoredFeatures.filter(featureRow => allStageFeatures.contains(featureRow._1)) + // Iterate over the derived features and remove the derived features which contains these anchored features. + val updatedDerivedFeatures = featureGroups.allDerivedFeatures.filter(derivedFeature => { + // Find the constituent anchored features for every derived feature + val allAnchoredFeaturesInDerived = derivedFeature._2.consumedFeatureNames.map(_.getFeatureName) + val containsFeature: Seq[Boolean] = allAnchoredFeaturesInDerived.map(feature => updatedAnchoredFeatures.contains(feature)) + !containsFeature.contains(false) + }) + val updatedWindowAggFeatures = featureGroups.allWindowAggFeatures.filter(windowAggFeature => updatedAnchoredFeatures.contains(windowAggFeature._1)) + val updatedFeatureGroups = FeatureGroups(updatedAnchoredFeatures, updatedDerivedFeatures, updatedWindowAggFeatures, + featureGroups.allPassthroughFeatures, featureGroups.allSeqJoinFeatures) + val updatedKeyTaggedFeatures = keyTaggedFeatures.filter(feature => updatedAnchoredFeatures.contains(feature.featureName) + || updatedDerivedFeatures.contains(feature.featureName) || updatedWindowAggFeatures.contains(feature.featureName) + || featureGroups.allPassthroughFeatures.contains(feature.featureName) || featureGroups.allSeqJoinFeatures.contains(feature.featureName)) + (updatedFeatureGroups, updatedKeyTaggedFeatures) + } + /** * Generate anchored and derived features and return the feature DataFrame and feature metadata. + * * @param ss input spark session. * @param featureGenSpec specification for a feature generation job. * @param featureGroups all features in scope grouped under different types. @@ -85,22 +115,11 @@ private[offline] class DataFrameFeatureGenerator(logicalPlan: MultiStageJoinPlan // 5. Group features based on grouping specified in output processors val updatedAllStageFeatures = allStageFeatures.filter(keyValue => !keyValue._2._1.df.isEmpty) - val updatedAnchoredFeatures = featureGroups.allAnchoredFeatures.filter(featureRow => updatedAllStageFeatures.contains(featureRow._1)) - // Iterate over the derived features and remove the derived features which contains these anchored features. - val updatedDerivedFeatures = featureGroups.allDerivedFeatures.filter(derivedFeature => { - // Find the constituent anchored features for every derived feature - val allAnchoredFeaturesInDerived = derivedFeature._2.consumedFeatureNames.map(_.getFeatureName) - val containsFeature: Seq[Boolean] = allAnchoredFeaturesInDerived.map(feature => updatedAnchoredFeatures.contains(feature)) - !containsFeature.contains(false) - }) - val updatedWindowAggFeatures = featureGroups.allWindowAggFeatures.filter(windowAggFeature => updatedAnchoredFeatures.contains(windowAggFeature._1)) - val updatedFeatureGroups = FeatureGroups(updatedAnchoredFeatures, updatedDerivedFeatures, featureGroups.allWindowAggFeatures, - featureGroups.allPassthroughFeatures, featureGroups.allSeqJoinFeatures) - val updatedKeyTaggedFeatures = keyTaggedFeatures.filter(feature => updatedAnchoredFeatures.contains(feature.featureName) - || updatedDerivedFeatures.contains(feature.featureName) || updatedDerivedFeatures.contains(feature.featureName) - || featureGroups.allPassthroughFeatures.contains(feature.featureName) || featureGroups.allSeqJoinFeatures.contains(feature.featureName)) + val (updatedFeatureGroups, updatedKeyTaggedFeatures) = getUpdatedFeatureGroups(featureGroups, updatedAllStageFeatures, keyTaggedFeatures) + val updatedLogicalPlan = MultiStageJoinPlanner().getLogicalPlan(updatedFeatureGroups, updatedKeyTaggedFeatures) - val groupedAnchoredFeatures = featureGenFeatureGrouper.group(updatedAllStageFeatures, featureGenSpec.getOutputProcessorConfigs, updatedDerivedFeatures) + val groupedAnchoredFeatures = featureGenFeatureGrouper.group(updatedAllStageFeatures, featureGenSpec.getOutputProcessorConfigs, + updatedFeatureGroups.allDerivedFeatures) // 6. Substitute defaults at this stage since all anchored features are generated and grouped together. // Substitute before generating derived features. diff --git a/feathr-impl/src/test/scala/com/linkedin/feathr/offline/FeatureGenIntegTest.scala b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/FeatureGenIntegTest.scala index ee8deef42..b9b272d9d 100644 --- a/feathr-impl/src/test/scala/com/linkedin/feathr/offline/FeatureGenIntegTest.scala +++ b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/FeatureGenIntegTest.scala @@ -1776,17 +1776,10 @@ class FeatureGenIntegTest extends FeathrIntegTest { """ |anchors: { | local: { - | source: "anchorAndDerivations/derivations/featureGeneation/Data.avro.json" - | key: ["x", "y"] - | features: { - | a_z: "z" - | } - | } - | local1: { | source: "anchorAndDerivations/derivations/featureGeneration/Data.avro.json" | key: ["x", "y"] | features: { - | a_z1: "z" + | a_z: "z" | } | } |} @@ -1802,7 +1795,7 @@ class FeatureGenIntegTest extends FeathrIntegTest { |} | """.stripMargin - val features = Seq("a_derived_z", "a_z", "a_z1") + val features = Seq("a_derived_z") val output = localFeatureGenerateForSeqOfFeatures(features, featureDefConf) /** From b88ed4599842d89100ab15906169467481d97603 Mon Sep 17 00:00:00 2001 From: Rakesh Kashyap Hanasoge Padmanabha Date: Thu, 2 Feb 2023 14:15:14 -0800 Subject: [PATCH 6/9] Add log msgs --- .../config/sources/FeatureGroupsUpdater.scala | 55 ++++++++++++++++++- .../SequentialJoinAsDerivation.scala | 2 + .../DataFrameFeatureGenerator.scala | 30 +--------- .../StreamingFeatureGenerator.scala | 2 + .../feathr/offline/util/AclCheckUtils.scala | 2 +- .../feathr/offline/util/FeathrUtils.scala | 2 +- .../feathr/offline/FeatureGenIntegTest.scala | 7 ++- 7 files changed, 66 insertions(+), 34 deletions(-) diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/config/sources/FeatureGroupsUpdater.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/config/sources/FeatureGroupsUpdater.scala index 985bb5b5f..c34d53fa4 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/config/sources/FeatureGroupsUpdater.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/config/sources/FeatureGroupsUpdater.scala @@ -1,8 +1,10 @@ package com.linkedin.feathr.offline.config.sources import com.linkedin.feathr.common.JoiningFeatureParams +import com.linkedin.feathr.offline.{FeatureDataFrame, JoinKeys} import com.linkedin.feathr.offline.anchored.anchorExtractor.TimeWindowConfigurableAnchorExtractor import com.linkedin.feathr.offline.anchored.feature.FeatureAnchorWithSource +import com.linkedin.feathr.offline.job.FeatureJoinJob.{FeatureName, log} import com.linkedin.feathr.offline.logical.{FeatureGroups, MultiStageJoinPlan} /** @@ -71,6 +73,43 @@ private[offline] class FeatureGroupsUpdater { featureGroups.allSeqJoinFeatures) } + /** + * Update the feature groups (for Feature gen) based on feature missing features. Few anchored features can be missing if the feature data + * is not present. Remove those anchored features, and also the corresponding derived feature which are dependent on it. + * @param featureGroups + * @param allStageFeatures + * @param keyTaggedFeatures + * @return + */ + def getUpdatedFeatureGroups(featureGroups: FeatureGroups, allStageFeatures: Map[FeatureName, (FeatureDataFrame, JoinKeys)], + keyTaggedFeatures: Seq[JoiningFeatureParams]): (FeatureGroups, Seq[JoiningFeatureParams]) = { + val updatedAnchoredFeatures = featureGroups.allAnchoredFeatures.filter(featureRow => allStageFeatures.contains(featureRow._1)) + // Iterate over the derived features and remove the derived features which contains these anchored features. + val updatedDerivedFeatures = featureGroups.allDerivedFeatures.filter(derivedFeature => { + // Find the constituent anchored features for every derived feature + val allAnchoredFeaturesInDerived = derivedFeature._2.consumedFeatureNames.map(_.getFeatureName) + val containsFeature: Seq[Boolean] = allAnchoredFeaturesInDerived.map(feature => updatedAnchoredFeatures.contains(feature)) + !containsFeature.contains(false) + }) + val updatedSeqJoinFeature = featureGroups.allSeqJoinFeatures.filter(seqJoinFeature => { + // Find the constituent anchored features for every derived feature + val allAnchoredFeaturesInDerived = seqJoinFeature._2.consumedFeatureNames.map(_.getFeatureName) + val containsFeature: Seq[Boolean] = allAnchoredFeaturesInDerived.map(feature => updatedAnchoredFeatures.contains(feature)) + !containsFeature.contains(false) + }) + val updatedWindowAggFeatures = featureGroups.allWindowAggFeatures.filter(windowAggFeature => updatedAnchoredFeatures.contains(windowAggFeature._1)) + + log.warn(s"Removed the following features:- ${featureGroups.allAnchoredFeatures.keySet.diff(updatedAnchoredFeatures.keySet)}," + + s"${featureGroups.allDerivedFeatures.keySet.diff(updatedDerivedFeatures.keySet)}," + + s" ${featureGroups.allSeqJoinFeatures.keySet.diff(updatedSeqJoinFeature.keySet)}") + val updatedFeatureGroups = FeatureGroups(updatedAnchoredFeatures, updatedDerivedFeatures, updatedWindowAggFeatures, + featureGroups.allPassthroughFeatures, updatedSeqJoinFeature) + val updatedKeyTaggedFeatures = keyTaggedFeatures.filter(feature => updatedAnchoredFeatures.contains(feature.featureName) + || updatedDerivedFeatures.contains(feature.featureName) || updatedWindowAggFeatures.contains(feature.featureName) + || featureGroups.allPassthroughFeatures.contains(feature.featureName) || updatedSeqJoinFeature.contains(feature.featureName)) + (updatedFeatureGroups, updatedKeyTaggedFeatures) + } + /** * Exclude anchored and derived features features from the join stage if they do not have a valid path. * @param featureToPathsMap Map of anchored feature names to their paths @@ -92,8 +131,22 @@ private[offline] class FeatureGroupsUpdater { .map(featureName => !invalidPaths.contains(featureToPathsMap(featureName))) !containsFeature.contains(false) }) + + // Iterate over the seq join features and remove the derived features which contains these anchored features. + val updatedSeqJoinFeatures = featureGroups.allSeqJoinFeatures.filter(seqJoinFeature => { + // Find the constituent anchored features for every derived feature + val allAnchoredFeaturesInDerived = seqJoinFeature._2.consumedFeatureNames.map(_.getFeatureName) + // Check if any of the features does not have a valid path + val containsFeature: Seq[Boolean] = allAnchoredFeaturesInDerived + .map(featureName => !invalidPaths.contains(featureToPathsMap(featureName))) + !containsFeature.contains(false) + }) + + log.warn(s"Removed the following features:- ${featureGroups.allAnchoredFeatures.keySet.diff(updatedAnchoredFeatures.keySet)}," + + s"${featureGroups.allDerivedFeatures.keySet.diff(updatedDerivedFeatures.keySet)}," + + s" ${featureGroups.allSeqJoinFeatures.keySet.diff(updatedSeqJoinFeatures.keySet)}") FeatureGroups(updatedAnchoredFeatures, updatedDerivedFeatures, - featureGroups.allWindowAggFeatures, featureGroups.allPassthroughFeatures, featureGroups.allSeqJoinFeatures) + featureGroups.allWindowAggFeatures, featureGroups.allPassthroughFeatures, updatedSeqJoinFeatures) } } diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/derived/strategies/SequentialJoinAsDerivation.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/derived/strategies/SequentialJoinAsDerivation.scala index 30995cfa8..c8eb12c90 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/derived/strategies/SequentialJoinAsDerivation.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/derived/strategies/SequentialJoinAsDerivation.scala @@ -219,6 +219,8 @@ private[offline] class SequentialJoinAsDerivation(ss: SparkSession, val anchorDFMap1 = anchorToDataSourceMapper.getBasicAnchorDFMapForJoin(ss, Seq(featureAnchor), failOnMissingPartition) val updatedAnchorDFMap = anchorDFMap1.filter(anchorEntry => anchorEntry._2.isDefined) .map(anchorEntry => anchorEntry._1 -> anchorEntry._2.get) + // We dont need to check if the anchored feature's dataframes are missing (due to skip missing feature) as such + // seq join features have already been removed in the FeatureGroupsUpdater#getUpdatedFeatureGroupsWithoutInvalidPaths. val featureInfo = FeatureTransformation.directCalculate( anchorGroup: AnchorFeatureGroups, updatedAnchorDFMap(featureAnchor), diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/generation/DataFrameFeatureGenerator.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/generation/DataFrameFeatureGenerator.scala index 5b0bd9112..ed5c48d5c 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/generation/DataFrameFeatureGenerator.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/generation/DataFrameFeatureGenerator.scala @@ -33,33 +33,6 @@ private[offline] class DataFrameFeatureGenerator(logicalPlan: MultiStageJoinPlan @transient val featureGenDefaultsSubstituter = FeatureGenDefaultsSubstituter() @transient val postGenPruner = PostGenPruner() - /** - * Update the feature groups based on feature missing features. Few features can be missing if the feature data is not present. - * - * @param featureGroups - * @param allStageFeatures - * @param keyTaggedFeatures - * @return - */ - def getUpdatedFeatureGroups(featureGroups: FeatureGroups, allStageFeatures: Map[FeatureName, (FeatureDataFrame, JoinKeys)], - keyTaggedFeatures: Seq[JoiningFeatureParams]): (FeatureGroups, Seq[JoiningFeatureParams]) = { - val updatedAnchoredFeatures = featureGroups.allAnchoredFeatures.filter(featureRow => allStageFeatures.contains(featureRow._1)) - // Iterate over the derived features and remove the derived features which contains these anchored features. - val updatedDerivedFeatures = featureGroups.allDerivedFeatures.filter(derivedFeature => { - // Find the constituent anchored features for every derived feature - val allAnchoredFeaturesInDerived = derivedFeature._2.consumedFeatureNames.map(_.getFeatureName) - val containsFeature: Seq[Boolean] = allAnchoredFeaturesInDerived.map(feature => updatedAnchoredFeatures.contains(feature)) - !containsFeature.contains(false) - }) - val updatedWindowAggFeatures = featureGroups.allWindowAggFeatures.filter(windowAggFeature => updatedAnchoredFeatures.contains(windowAggFeature._1)) - val updatedFeatureGroups = FeatureGroups(updatedAnchoredFeatures, updatedDerivedFeatures, updatedWindowAggFeatures, - featureGroups.allPassthroughFeatures, featureGroups.allSeqJoinFeatures) - val updatedKeyTaggedFeatures = keyTaggedFeatures.filter(feature => updatedAnchoredFeatures.contains(feature.featureName) - || updatedDerivedFeatures.contains(feature.featureName) || updatedWindowAggFeatures.contains(feature.featureName) - || featureGroups.allPassthroughFeatures.contains(feature.featureName) || featureGroups.allSeqJoinFeatures.contains(feature.featureName)) - (updatedFeatureGroups, updatedKeyTaggedFeatures) - } - /** * Generate anchored and derived features and return the feature DataFrame and feature metadata. * @@ -115,7 +88,8 @@ private[offline] class DataFrameFeatureGenerator(logicalPlan: MultiStageJoinPlan // 5. Group features based on grouping specified in output processors val updatedAllStageFeatures = allStageFeatures.filter(keyValue => !keyValue._2._1.df.isEmpty) - val (updatedFeatureGroups, updatedKeyTaggedFeatures) = getUpdatedFeatureGroups(featureGroups, updatedAllStageFeatures, keyTaggedFeatures) + val (updatedFeatureGroups, updatedKeyTaggedFeatures) = FeatureGroupsUpdater().getUpdatedFeatureGroups(featureGroups, + updatedAllStageFeatures, keyTaggedFeatures) val updatedLogicalPlan = MultiStageJoinPlanner().getLogicalPlan(updatedFeatureGroups, updatedKeyTaggedFeatures) val groupedAnchoredFeatures = featureGenFeatureGrouper.group(updatedAllStageFeatures, featureGenSpec.getOutputProcessorConfigs, diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/generation/StreamingFeatureGenerator.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/generation/StreamingFeatureGenerator.scala index 2289fe9e9..7d965f0bc 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/generation/StreamingFeatureGenerator.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/generation/StreamingFeatureGenerator.scala @@ -56,6 +56,8 @@ class StreamingFeatureGenerator(dataPathHandlers: List[DataPathHandler]) { } // Load the raw streaming source data val anchorDfRDDMap = anchorToDataFrameMapper.getAnchorDFMapForGen(ss, anchors, None, false, true) + + // Remove entries for which feature dataframe cannot be loaded. val updatedAnchorDFRDDMap = anchorDfRDDMap.filter(anchorEntry => anchorEntry._2.isDefined).map(anchorEntry => anchorEntry._1 -> anchorEntry._2.get) updatedAnchorDFRDDMap.par.map { case (anchor, dfAccessor) => { diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/util/AclCheckUtils.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/util/AclCheckUtils.scala index 723672339..ba7d229b6 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/util/AclCheckUtils.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/util/AclCheckUtils.scala @@ -72,7 +72,7 @@ private[offline] object AclCheckUtils { if (invalidPaths.isEmpty) { (Success(()), invalidPaths.map(_._2)) } else { - if (shouldSkipFeature) { + if (!shouldSkipFeature) { (Failure( new RuntimeException( "Can not verify read authorization on the following paths. This can be due to" + diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/util/FeathrUtils.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/util/FeathrUtils.scala index 2c57107a7..f4c62a6fc 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/util/FeathrUtils.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/util/FeathrUtils.scala @@ -49,7 +49,7 @@ private[offline] object FeathrUtils { FAIL_ON_MISSING_PARTITION -> "false", SEQ_JOIN_ARRAY_EXPLODE_ENABLED -> "true", ENABLE_SALTED_JOIN -> "false", - SKIP_MISSING_FEATURE -> "true", + SKIP_MISSING_FEATURE -> "false", // If one key appears more than 0.02% in the dataset, we will salt this join key and split them into multiple partitions // This is an empirical value SALTED_JOIN_FREQ_ITEM_THRESHOLD -> "0.0002", diff --git a/feathr-impl/src/test/scala/com/linkedin/feathr/offline/FeatureGenIntegTest.scala b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/FeatureGenIntegTest.scala index b9b272d9d..aea1ca10d 100644 --- a/feathr-impl/src/test/scala/com/linkedin/feathr/offline/FeatureGenIntegTest.scala +++ b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/FeatureGenIntegTest.scala @@ -897,11 +897,10 @@ class FeatureGenIntegTest extends FeathrIntegTest { s""" |sources: { | ${defaultSwaSource} - | ${malformedSource} |} |anchors: { | swaAnchor1: { - | source: "swaSource11" + | source: "swaSource" | key: x | features: { | f3: { @@ -1839,8 +1838,10 @@ class FeatureGenIntegTest extends FeathrIntegTest { /** * This test validates that derived features with multiple keys are supported * if and only if all dependent features have the same keys. + * To enable this test, set the value of FeatureUtils.SKIP_MISSING_FEATURE to True. From + * Spark 3.1, SparkContext.updateConf() is not supported. */ - @Test + @Test(enabled=false) def testDerivedFeatureGenWithSkipFeatureFlagOn(): Unit = { val featureDefConf = """ From 25bfbaafc3bd1557a0501d25c09aa1ee8f24b7ee Mon Sep 17 00:00:00 2001 From: Rakesh Kashyap Hanasoge Padmanabha Date: Thu, 2 Feb 2023 16:41:19 -0800 Subject: [PATCH 7/9] Enable tests --- .../linkedin/feathr/offline/client/FeathrClient.scala | 2 +- .../feathr/offline/job/LocalFeatureJoinJob.scala | 6 ++++++ .../accessor/NonTimeBasedDataSourceAccessor.scala | 4 +++- .../transformation/AnchorToDataSourceMapper.scala | 8 +++++--- .../linkedin/feathr/offline/FeatureGenIntegTest.scala | 11 ++++++++--- .../feathr/offline/SlidingWindowAggIntegTest.scala | 7 ++++++- 6 files changed, 29 insertions(+), 9 deletions(-) diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/client/FeathrClient.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/client/FeathrClient.scala index c04691930..bad3d0575 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/client/FeathrClient.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/client/FeathrClient.scala @@ -247,7 +247,7 @@ class FeathrClient private[offline] (sparkSession: SparkSession, featureGroups: val featurePathsTest = AclCheckUtils.checkReadAuthorization(sparkSession, logicalPlan.allRequiredFeatures, allAnchoredFeatures) featurePathsTest._1 match { case Failure(exception) => - if (shouldSkipFeature) { // If skip feature, remove the corresponding anchored feature from the feature group and produce a new logical plan + if (shouldSkipFeature || SQLConf.get.getConf(LocalFeatureJoinJob.SKIP_MISSING_FEATURE)) { // If skip feature, remove the corresponding anchored feature from the feature group and produce a new logical plan val featureGroupsWithoutInvalidFeatures = FeatureGroupsUpdater() .getUpdatedFeatureGroupsWithoutInvalidPaths(featureToPathsMap, updatedFeatureGroups, featurePathsTest._2) logicalPlanner.getLogicalPlan(featureGroupsWithoutInvalidFeatures, keyTaggedFeatures) diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/job/LocalFeatureJoinJob.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/job/LocalFeatureJoinJob.scala index aa92cd546..c5599c5ff 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/job/LocalFeatureJoinJob.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/job/LocalFeatureJoinJob.scala @@ -10,6 +10,7 @@ import com.linkedin.feathr.offline.source.{DataSource, SourceFormatType} import com.linkedin.feathr.offline.util.FeathrTestUtils.createSparkSession import com.linkedin.feathr.offline.util.{FeaturizedDatasetMetadata, SparkFeaturizedDataset} import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.internal.SQLConf /** * This object is used to help user to test/debug their workflow locally with sample data. This class serves as a @@ -20,6 +21,11 @@ object LocalFeatureJoinJob { // for user convenience, create spark session within this function, so user does not need to create one // this also ensure it has same setting as the real feathr join job val ss: SparkSession = createSparkSession(enableHiveSupport = true) + val SKIP_MISSING_FEATURE = SQLConf + .buildConf("spark.feathr.skip.missing.feature") + .doc("Whether to use the V2 implementation, which should have better performance.") + .booleanConf + .createWithDefault(false) /** * local debug API, used in unit test and local debug diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/source/accessor/NonTimeBasedDataSourceAccessor.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/source/accessor/NonTimeBasedDataSourceAccessor.scala index dfbe50f01..00de80b0c 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/source/accessor/NonTimeBasedDataSourceAccessor.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/source/accessor/NonTimeBasedDataSourceAccessor.scala @@ -1,6 +1,7 @@ package com.linkedin.feathr.offline.source.accessor import com.linkedin.feathr.offline.config.location.{GenericLocation, Jdbc, PathList, SimplePath, Snowflake, SparkSqlLocation} +import com.linkedin.feathr.offline.job.LocalFeatureJoinJob import com.linkedin.feathr.offline.source.DataSource import com.linkedin.feathr.offline.source.dataloader.{CaseInsensitiveGenericRecordWrapper, DataLoaderFactory} import com.linkedin.feathr.offline.testfwk.TestFwkUtils @@ -9,6 +10,7 @@ import com.linkedin.feathr.offline.util.FeathrUtils import org.apache.avro.generic.{GenericRecord, IndexedRecord} import org.apache.avro.specific.SpecificRecordBase import org.apache.spark.rdd.RDD +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.{DataFrame, SparkSession} /** * load a dataset from a non-partitioned source. @@ -44,7 +46,7 @@ private[offline] class NonTimeBasedDataSourceAccessor( case _ => fileLoaderFactory.createFromLocation(source.location).loadDataFrame() } } catch { - case e: Exception => if (shouldSkipFeature) ss.emptyDataFrame else throw e + case e: Exception => if (shouldSkipFeature || SQLConf.get.getConf(LocalFeatureJoinJob.SKIP_MISSING_FEATURE)) ss.emptyDataFrame else throw e } if (TestFwkUtils.IS_DEBUGGER_ENABLED) { diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/transformation/AnchorToDataSourceMapper.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/transformation/AnchorToDataSourceMapper.scala index d977d5493..571bba6b8 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/transformation/AnchorToDataSourceMapper.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/transformation/AnchorToDataSourceMapper.scala @@ -6,6 +6,7 @@ import com.linkedin.feathr.offline.source.SourceFormatType._ import com.linkedin.feathr.offline.anchored.feature.FeatureAnchorWithSource import com.linkedin.feathr.offline.config.location.{DataLocation, PathList, SimplePath} import com.linkedin.feathr.offline.generation.IncrementalAggContext +import com.linkedin.feathr.offline.job.LocalFeatureJoinJob import com.linkedin.feathr.offline.source.DataSource import com.linkedin.feathr.offline.source.accessor.DataSourceAccessor import com.linkedin.feathr.offline.source.accessor.DataPathHandler @@ -14,6 +15,7 @@ import com.linkedin.feathr.offline.source.pathutil.{PathChecker, TimeBasedHdfsPa import com.linkedin.feathr.offline.swa.SlidingWindowFeatureUtils import com.linkedin.feathr.offline.util.{FeathrUtils, SourceUtils} import com.linkedin.feathr.offline.util.datetime.{DateTimeInterval, OfflineDateTimeUtils} +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.{DataFrame, SparkSession} /** @@ -71,7 +73,7 @@ private[offline] class AnchorToDataSourceMapper(dataPathHandlers: List[DataPathH failOnMissingPartition = failOnMissingPartition, dataPathHandlers = dataPathHandlers)) } catch { - case e: Exception => if (shouldSkipFeature) None else throw e + case e: Exception => if (shouldSkipFeature || SQLConf.get.getConf(LocalFeatureJoinJob.SKIP_MISSING_FEATURE)) None else throw e } anchorsWithDate.map(anchor => (anchor, timeSeriesSource)) @@ -132,7 +134,7 @@ private[offline] class AnchorToDataSourceMapper(dataPathHandlers: List[DataPathH timeSeriesSource.get() } catch {// todo - Add this functionality to only specific exception types and not for all error types. - case e: Exception => if (shouldSkipFeature) ss.emptyDataFrame else throw e + case e: Exception => if (shouldSkipFeature || SQLConf.get.getConf(LocalFeatureJoinJob.SKIP_MISSING_FEATURE)) ss.emptyDataFrame else throw e } } @@ -188,7 +190,7 @@ private[offline] class AnchorToDataSourceMapper(dataPathHandlers: List[DataPathH isStreaming = isStreaming, dataPathHandlers = dataPathHandlers)) } catch { - case e: Exception => if (shouldSkipFeature) None else throw e + case e: Exception => if (shouldSkipFeature|| SQLConf.get.getConf(LocalFeatureJoinJob.SKIP_MISSING_FEATURE)) None else throw e } anchors.map(anchor => (anchor, timeSeriesSource)) diff --git a/feathr-impl/src/test/scala/com/linkedin/feathr/offline/FeatureGenIntegTest.scala b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/FeatureGenIntegTest.scala index aea1ca10d..6c75e0838 100644 --- a/feathr-impl/src/test/scala/com/linkedin/feathr/offline/FeatureGenIntegTest.scala +++ b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/FeatureGenIntegTest.scala @@ -2,12 +2,13 @@ package com.linkedin.feathr.offline import com.linkedin.feathr.common.exception.FeathrException import com.linkedin.feathr.offline.AssertFeatureUtils._ -import com.linkedin.feathr.offline.job.PreprocessedDataFrameManager +import com.linkedin.feathr.offline.job.{LocalFeatureJoinJob, PreprocessedDataFrameManager} import com.linkedin.feathr.offline.source.dataloader.CsvDataLoader import com.linkedin.feathr.offline.util.{FeathrTestUtils, FeatureGenConstants} import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema import org.apache.spark.sql.functions.col +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.testng.Assert.{assertEquals, assertNotNull, assertTrue} import org.testng.annotations.Test @@ -833,8 +834,9 @@ class FeatureGenIntegTest extends FeathrIntegTest { * test sliding window aggregation feature with malformed source and the skip missing feature flag turned on. * The feature with the malformed source should be skipped. */ - @Test(enabled=false) + @Test def testSWAFeatureWithMalformedSource(): Unit = { + SQLConf.get.setConf(LocalFeatureJoinJob.SKIP_MISSING_FEATURE, true) val swaApplicationConfig = generateSimpleApplicationConfig(features = "f3, f4, f5, f6", endTime = "2019-05-21") val swaFeatureDefConfig = s""" @@ -885,6 +887,7 @@ class FeatureGenIntegTest extends FeathrIntegTest { assertEquals(dfCount, 2) // Assert that the feature is skipped. assertTrue(!dfs.keySet.map(x => x.getFeatureName).contains("f3")) + SQLConf.get.setConf(LocalFeatureJoinJob.SKIP_MISSING_FEATURE, true) } /** @@ -1841,8 +1844,9 @@ class FeatureGenIntegTest extends FeathrIntegTest { * To enable this test, set the value of FeatureUtils.SKIP_MISSING_FEATURE to True. From * Spark 3.1, SparkContext.updateConf() is not supported. */ - @Test(enabled=false) + @Test def testDerivedFeatureGenWithSkipFeatureFlagOn(): Unit = { + SQLConf.get.setConf(LocalFeatureJoinJob.SKIP_MISSING_FEATURE, true) val featureDefConf = """ |anchors: { @@ -1893,6 +1897,7 @@ class FeatureGenIntegTest extends FeathrIntegTest { assertEquals(groupedOutput.size, 1) // features should be generated on a single DF assertEquals(groupedOutput.head._2.size, 1) // Only 1 feature was generated assertEquals(groupedOutput.head._2.keySet.head.getFeatureName, "a_z1") // Only 1 feature was generated + SQLConf.get.setConf(LocalFeatureJoinJob.SKIP_MISSING_FEATURE, false) } /** diff --git a/feathr-impl/src/test/scala/com/linkedin/feathr/offline/SlidingWindowAggIntegTest.scala b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/SlidingWindowAggIntegTest.scala index 9e45ec992..2f3ccf55c 100644 --- a/feathr-impl/src/test/scala/com/linkedin/feathr/offline/SlidingWindowAggIntegTest.scala +++ b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/SlidingWindowAggIntegTest.scala @@ -1,8 +1,11 @@ package com.linkedin.feathr.offline import com.linkedin.feathr.offline.AssertFeatureUtils.{rowApproxEquals, validateRows} +import com.linkedin.feathr.offline.job.LocalFeatureJoinJob +import com.linkedin.feathr.offline.util.FeathrUtils import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{LongType, StructField, StructType} import org.testng.Assert._ import org.testng.annotations._ @@ -319,8 +322,9 @@ class SlidingWindowAggIntegTest extends FeathrIntegTest { * SWA test with missing features. To enable this test, set the value of FeatureUtils.SKIP_MISSING_FEATURE to True. From * Spark 3.1, SparkContext.updateConf() is not supported. */ - @Test(enabled = false) + @Test def testSWAWithMissingFeatureData(): Unit = { + SQLConf.get.setConf(LocalFeatureJoinJob.SKIP_MISSING_FEATURE, true) val joinConfigAsString = """ | settings: { @@ -399,6 +403,7 @@ class SlidingWindowAggIntegTest extends FeathrIntegTest { val df = res.collect()(0) assertEquals(df.getAs[Float]("simplePageViewCount"), 10f) assert(!res.columns.contains("simpleFeature")) + SQLConf.get.setConf(LocalFeatureJoinJob.SKIP_MISSING_FEATURE, false) } /** From 33751c36c85ac475ec909182cbb42cbc9be4002f Mon Sep 17 00:00:00 2001 From: Rakesh Kashyap Hanasoge Padmanabha Date: Fri, 3 Feb 2023 00:52:34 -0800 Subject: [PATCH 8/9] bump rc version --- .../linkedin/feathr/offline/client/FeathrClient.scala | 5 +++-- .../source/accessor/NonTimeBasedDataSourceAccessor.scala | 3 ++- .../transformation/AnchorToDataSourceMapper.scala | 9 ++++++--- .../feathr/offline/SlidingWindowAggIntegTest.scala | 2 +- gradle.properties | 2 +- 5 files changed, 13 insertions(+), 8 deletions(-) diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/client/FeathrClient.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/client/FeathrClient.scala index bad3d0575..852ddaa05 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/client/FeathrClient.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/client/FeathrClient.scala @@ -246,8 +246,9 @@ class FeathrClient private[offline] (sparkSession: SparkSession, featureGroups: // Check read authorization for all required features val featurePathsTest = AclCheckUtils.checkReadAuthorization(sparkSession, logicalPlan.allRequiredFeatures, allAnchoredFeatures) featurePathsTest._1 match { - case Failure(exception) => - if (shouldSkipFeature || SQLConf.get.getConf(LocalFeatureJoinJob.SKIP_MISSING_FEATURE)) { // If skip feature, remove the corresponding anchored feature from the feature group and produce a new logical plan + case Failure(exception) => // If skip feature, remove the corresponding anchored feature from the feature group and produce a new logical plan + if (shouldSkipFeature || (sparkSession.sparkContext.isLocal && + SQLConf.get.getConf(LocalFeatureJoinJob.SKIP_MISSING_FEATURE))) { val featureGroupsWithoutInvalidFeatures = FeatureGroupsUpdater() .getUpdatedFeatureGroupsWithoutInvalidPaths(featureToPathsMap, updatedFeatureGroups, featurePathsTest._2) logicalPlanner.getLogicalPlan(featureGroupsWithoutInvalidFeatures, keyTaggedFeatures) diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/source/accessor/NonTimeBasedDataSourceAccessor.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/source/accessor/NonTimeBasedDataSourceAccessor.scala index 00de80b0c..60c2fd417 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/source/accessor/NonTimeBasedDataSourceAccessor.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/source/accessor/NonTimeBasedDataSourceAccessor.scala @@ -46,7 +46,8 @@ private[offline] class NonTimeBasedDataSourceAccessor( case _ => fileLoaderFactory.createFromLocation(source.location).loadDataFrame() } } catch { - case e: Exception => if (shouldSkipFeature || SQLConf.get.getConf(LocalFeatureJoinJob.SKIP_MISSING_FEATURE)) ss.emptyDataFrame else throw e + case e: Exception => if (shouldSkipFeature || (ss.sparkContext.isLocal && + SQLConf.get.getConf(LocalFeatureJoinJob.SKIP_MISSING_FEATURE))) ss.emptyDataFrame else throw e } if (TestFwkUtils.IS_DEBUGGER_ENABLED) { diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/transformation/AnchorToDataSourceMapper.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/transformation/AnchorToDataSourceMapper.scala index 571bba6b8..2cf5de3e2 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/transformation/AnchorToDataSourceMapper.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/transformation/AnchorToDataSourceMapper.scala @@ -73,7 +73,8 @@ private[offline] class AnchorToDataSourceMapper(dataPathHandlers: List[DataPathH failOnMissingPartition = failOnMissingPartition, dataPathHandlers = dataPathHandlers)) } catch { - case e: Exception => if (shouldSkipFeature || SQLConf.get.getConf(LocalFeatureJoinJob.SKIP_MISSING_FEATURE)) None else throw e + case e: Exception => if (shouldSkipFeature || (ss.sparkContext.isLocal && + SQLConf.get.getConf(LocalFeatureJoinJob.SKIP_MISSING_FEATURE))) None else throw e } anchorsWithDate.map(anchor => (anchor, timeSeriesSource)) @@ -134,7 +135,8 @@ private[offline] class AnchorToDataSourceMapper(dataPathHandlers: List[DataPathH timeSeriesSource.get() } catch {// todo - Add this functionality to only specific exception types and not for all error types. - case e: Exception => if (shouldSkipFeature || SQLConf.get.getConf(LocalFeatureJoinJob.SKIP_MISSING_FEATURE)) ss.emptyDataFrame else throw e + case e: Exception => if (shouldSkipFeature || (ss.sparkContext.isLocal && + SQLConf.get.getConf(LocalFeatureJoinJob.SKIP_MISSING_FEATURE))) ss.emptyDataFrame else throw e } } @@ -190,7 +192,8 @@ private[offline] class AnchorToDataSourceMapper(dataPathHandlers: List[DataPathH isStreaming = isStreaming, dataPathHandlers = dataPathHandlers)) } catch { - case e: Exception => if (shouldSkipFeature|| SQLConf.get.getConf(LocalFeatureJoinJob.SKIP_MISSING_FEATURE)) None else throw e + case e: Exception => if (shouldSkipFeature || (ss.sparkContext.isLocal && + SQLConf.get.getConf(LocalFeatureJoinJob.SKIP_MISSING_FEATURE))) None else throw e } anchors.map(anchor => (anchor, timeSeriesSource)) diff --git a/feathr-impl/src/test/scala/com/linkedin/feathr/offline/SlidingWindowAggIntegTest.scala b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/SlidingWindowAggIntegTest.scala index 2f3ccf55c..bbb93dfdc 100644 --- a/feathr-impl/src/test/scala/com/linkedin/feathr/offline/SlidingWindowAggIntegTest.scala +++ b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/SlidingWindowAggIntegTest.scala @@ -281,7 +281,7 @@ class SlidingWindowAggIntegTest extends FeathrIntegTest { """ |sources: { | swaSource: { - | location: { path: "slidingWindowAgg/localSWADefaultTest/daily" } + | location: { path: "slidingWindowAgg/localSWDefaultTest/daily" } | timePartitionPattern: "yyyy/MM/dd" | timeWindowParameters: { | timestampColumn: "timestamp" diff --git a/gradle.properties b/gradle.properties index ac0755a31..99df84ee6 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,3 +1,3 @@ -version=0.10.4-rc3 +version=0.10.4-rc4 SONATYPE_AUTOMATIC_RELEASE=true POM_ARTIFACT_ID=feathr_2.12 From 1e7fbbb1c041b8afcb0d8e239ffd4f0c1fd5938c Mon Sep 17 00:00:00 2001 From: Rakesh Kashyap Hanasoge Padmanabha Date: Fri, 3 Feb 2023 11:05:33 -0800 Subject: [PATCH 9/9] Remove version bump --- gradle.properties | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gradle.properties b/gradle.properties index 99df84ee6..ac0755a31 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,3 +1,3 @@ -version=0.10.4-rc4 +version=0.10.4-rc3 SONATYPE_AUTOMATIC_RELEASE=true POM_ARTIFACT_ID=feathr_2.12