From 065503bff6d8240b7c12a3c32a3e8dc30526cc8b Mon Sep 17 00:00:00 2001 From: Rakesh Kashyap Hanasoge Padmanabha Date: Fri, 3 Feb 2023 14:41:54 -0800 Subject: [PATCH 1/8] Add optional retry mechanism if data loading fails --- .../offline/job/LocalFeatureJoinJob.scala | 8 ++++- .../source/dataloader/BatchDataLoader.scala | 30 ++++++++++++++----- .../feathr/offline/util/FeathrUtils.scala | 4 +++ .../dataloader/TestBatchDataLoader.scala | 17 +++++++++++ gradle.properties | 2 +- 5 files changed, 51 insertions(+), 10 deletions(-) diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/job/LocalFeatureJoinJob.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/job/LocalFeatureJoinJob.scala index c5599c5ff..ebb9902d8 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/job/LocalFeatureJoinJob.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/job/LocalFeatureJoinJob.scala @@ -23,10 +23,16 @@ object LocalFeatureJoinJob { val ss: SparkSession = createSparkSession(enableHiveSupport = true) val SKIP_MISSING_FEATURE = SQLConf .buildConf("spark.feathr.skip.missing.feature") - .doc("Whether to use the V2 implementation, which should have better performance.") + .doc("Whether to skip features if data is missing.") .booleanConf .createWithDefault(false) + val MAX_DATA_LOAD_RETRY = SQLConf + .buildConf("spark.feathr.max.data.load.retry") + .doc("Number of retries if data is missing.") + .intConf + .createWithDefault(0) + /** * local debug API, used in unit test and local debug * diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/source/dataloader/BatchDataLoader.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/source/dataloader/BatchDataLoader.scala index e3714217b..6ee40d729 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/source/dataloader/BatchDataLoader.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/source/dataloader/BatchDataLoader.scala @@ -4,11 +4,15 @@ import com.linkedin.feathr.common.exception.{ErrorLabel, FeathrInputDataExceptio import com.linkedin.feathr.offline.config.location.DataLocation import com.linkedin.feathr.offline.generation.SparkIOUtils import com.linkedin.feathr.offline.job.DataSourceUtils.getSchemaFromAvroDataFile +import com.linkedin.feathr.offline.job.LocalFeatureJoinJob import com.linkedin.feathr.offline.source.dataloader.DataLoaderHandler import com.linkedin.feathr.offline.util.DelimiterUtils.checkDelimiterOption +import com.linkedin.feathr.offline.util.FeathrUtils +import com.linkedin.feathr.offline.util.FeathrUtils.DATA_LOAD_WAIT_IN_MS import org.apache.avro.Schema import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.mapred.JobConf +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.{DataFrame, SparkSession} /** @@ -17,7 +21,10 @@ import org.apache.spark.sql.{DataFrame, SparkSession} * @param path input data path */ private[offline] class BatchDataLoader(ss: SparkSession, location: DataLocation, dataLoaderHandlers: List[DataLoaderHandler]) extends DataLoader { + val retryWaitTime = FeathrUtils.getFeathrJobParam(ss.sparkContext.getConf, FeathrUtils.DATA_LOAD_WAIT_IN_MS).toInt + val initialNumOfRetries = if (!ss.sparkContext.isLocal) FeathrUtils.getFeathrJobParam(ss.sparkContext.getConf, + FeathrUtils.MAX_DATA_LOAD_RETRY).toInt else SQLConf.get.getConf(LocalFeatureJoinJob.MAX_DATA_LOAD_RETRY) /** * get the schema of the source. It's only used in the deprecated DataSource.getDataSetAndSchema * @return an Avro Schema @@ -48,7 +55,9 @@ private[offline] class BatchDataLoader(ss: SparkSession, location: DataLocation, * @return an dataframe */ override def loadDataFrame(): DataFrame = { - loadDataFrame(Map(), new JobConf(ss.sparkContext.hadoopConfiguration)) + val retry = FeathrUtils.getFeathrJobParam(ss.sparkContext.getConf, FeathrUtils.MAX_DATA_LOAD_RETRY).toInt + val retryCount = if (ss.sparkContext.isLocal) SQLConf.get.getConf(LocalFeatureJoinJob.MAX_DATA_LOAD_RETRY) else retry + loadDataFrameWithRetry(Map(), new JobConf(ss.sparkContext.hadoopConfiguration), retryCount) } /** @@ -57,7 +66,7 @@ private[offline] class BatchDataLoader(ss: SparkSession, location: DataLocation, * @param jobConf Hadoop JobConf to be passed * @return an dataframe */ - def loadDataFrame(dataIOParameters: Map[String, String], jobConf: JobConf): DataFrame = { + def loadDataFrameWithRetry(dataIOParameters: Map[String, String], jobConf: JobConf, retry: Int): DataFrame = { val sparkConf = ss.sparkContext.getConf val inputSplitSize = sparkConf.get("spark.feathr.input.split.size", "") val dataIOParametersWithSplitSize = Map(SparkIOUtils.SPLIT_SIZE -> inputSplitSize) ++ dataIOParameters @@ -87,12 +96,17 @@ private[offline] class BatchDataLoader(ss: SparkSession, location: DataLocation, } df } catch { - case feathrException: FeathrInputDataException => - println(feathrException.toString) - throw feathrException // Throwing exception to avoid dataLoaderHandler hook exception from being swallowed. - case e: Throwable => //TODO: Analyze all thrown exceptions, instead of swalling them all, and reading as a csv - println(e.toString) - ss.read.format("csv").option("header", "true").option("delimiter", csvDelimiterOption).load(dataPath) + case _: Throwable => + // If data loading from source failed, retry it automatically, as it might due to data source still being written into. + log.info(s"Loading ${location} failed, retrying for ${retry}-th time..") + if (retry > 0) { + Thread.sleep(retryWaitTime) + loadDataFrameWithRetry(dataIOParameters, jobConf, retry - 1) + } else { + // Throwing exception to avoid dataLoaderHandler hook exception from being swallowed. + throw new FeathrInputDataException(ErrorLabel.FEATHR_USER_ERROR, s"Failed to load ${dataPath} after ${initialNumOfRetries} retries" + + s" and retry time of ${retryWaitTime}ms.") + } } } } \ No newline at end of file diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/util/FeathrUtils.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/util/FeathrUtils.scala index f4c62a6fc..92c9ccceb 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/util/FeathrUtils.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/util/FeathrUtils.scala @@ -39,6 +39,8 @@ private[offline] object FeathrUtils { val SPARK_JOIN_MAX_PARALLELISM = "max.parallelism" val CHECKPOINT_OUTPUT_PATH = "checkpoint.dir" val SPARK_JOIN_MIN_PARALLELISM = "min.parallelism" + val MAX_DATA_LOAD_RETRY = "max.data.load.retry" + val DATA_LOAD_WAIT_IN_MS = "data.load.wait.in.ms" val defaultParams: Map[String, String] = Map( ENABLE_DEBUG_OUTPUT -> "false", @@ -50,6 +52,8 @@ private[offline] object FeathrUtils { SEQ_JOIN_ARRAY_EXPLODE_ENABLED -> "true", ENABLE_SALTED_JOIN -> "false", SKIP_MISSING_FEATURE -> "false", + MAX_DATA_LOAD_RETRY-> "0", + DATA_LOAD_WAIT_IN_MS-> "1", // If one key appears more than 0.02% in the dataset, we will salt this join key and split them into multiple partitions // This is an empirical value SALTED_JOIN_FREQ_ITEM_THRESHOLD -> "0.0002", diff --git a/feathr-impl/src/test/scala/com/linkedin/feathr/offline/source/dataloader/TestBatchDataLoader.scala b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/source/dataloader/TestBatchDataLoader.scala index 83e46c1d0..eef7da317 100644 --- a/feathr-impl/src/test/scala/com/linkedin/feathr/offline/source/dataloader/TestBatchDataLoader.scala +++ b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/source/dataloader/TestBatchDataLoader.scala @@ -1,8 +1,11 @@ package com.linkedin.feathr.offline.source.dataloader +import com.linkedin.feathr.common.exception.{FeathrConfigException, FeathrInputDataException} import com.linkedin.feathr.offline.TestFeathr import com.linkedin.feathr.offline.config.location.SimplePath +import com.linkedin.feathr.offline.job.LocalFeatureJoinJob import org.apache.spark.sql.Row +import org.apache.spark.sql.internal.SQLConf import org.testng.Assert.assertEquals import org.testng.annotations.Test @@ -34,6 +37,20 @@ class TestBatchDataLoader extends TestFeathr { assertEquals(df.collect(), expectedRows) } + /** + * Test the batch loader retries before failing. + */ + @Test(expectedExceptions = Array(classOf[FeathrInputDataException]), + expectedExceptionsMessageRegExp = ".* after 3 retries and retry time of 1ms.*") + def testRetry(): Unit = { + SQLConf.get.setConf(LocalFeatureJoinJob.MAX_DATA_LOAD_RETRY, 3) + val path = "anchor11-source.csv" + val batchDataLoader = new BatchDataLoader(ss, location = SimplePath(path), List()) + val df = batchDataLoader.loadDataFrame() + df.show() + SQLConf.get.setConf(LocalFeatureJoinJob.MAX_DATA_LOAD_RETRY, 0) + } + @Test(description = "test loading dataframe with BatchDataLoader by specifying delimiter") def testBatchDataLoaderWithCsvDelimiterOption() : Unit = { val path = "anchor1-source.tsv" diff --git a/gradle.properties b/gradle.properties index ac0755a31..99df84ee6 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,3 +1,3 @@ -version=0.10.4-rc3 +version=0.10.4-rc4 SONATYPE_AUTOMATIC_RELEASE=true POM_ARTIFACT_ID=feathr_2.12 From c1da5019cfb6ca78cfebc519cdfcea1457fa6070 Mon Sep 17 00:00:00 2001 From: Rakesh Kashyap Hanasoge Padmanabha Date: Sat, 4 Feb 2023 11:49:45 -0800 Subject: [PATCH 2/8] Fix unit tests --- .../source/dataloader/BatchDataLoader.scala | 31 ++++++++++++------- .../offline/SlidingWindowAggIntegTest.scala | 2 +- 2 files changed, 21 insertions(+), 12 deletions(-) diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/source/dataloader/BatchDataLoader.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/source/dataloader/BatchDataLoader.scala index 6ee40d729..7dec21679 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/source/dataloader/BatchDataLoader.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/source/dataloader/BatchDataLoader.scala @@ -25,8 +25,10 @@ private[offline] class BatchDataLoader(ss: SparkSession, location: DataLocation, val initialNumOfRetries = if (!ss.sparkContext.isLocal) FeathrUtils.getFeathrJobParam(ss.sparkContext.getConf, FeathrUtils.MAX_DATA_LOAD_RETRY).toInt else SQLConf.get.getConf(LocalFeatureJoinJob.MAX_DATA_LOAD_RETRY) + /** * get the schema of the source. It's only used in the deprecated DataSource.getDataSetAndSchema + * * @return an Avro Schema */ override def loadSchema(): Schema = { @@ -52,6 +54,7 @@ private[offline] class BatchDataLoader(ss: SparkSession, location: DataLocation, /** * load the source data as dataframe. + * * @return an dataframe */ override def loadDataFrame(): DataFrame = { @@ -62,8 +65,9 @@ private[offline] class BatchDataLoader(ss: SparkSession, location: DataLocation, /** * load the source data as dataframe. + * * @param dataIOParameters extra parameters - * @param jobConf Hadoop JobConf to be passed + * @param jobConf Hadoop JobConf to be passed * @return an dataframe */ def loadDataFrameWithRetry(dataIOParameters: Map[String, String], jobConf: JobConf, retry: Int): DataFrame = { @@ -82,7 +86,7 @@ private[offline] class BatchDataLoader(ss: SparkSession, location: DataLocation, var dfOpt: Option[DataFrame] = None breakable { - for(dataLoaderHandler <- dataLoaderHandlers) { + for (dataLoaderHandler <- dataLoaderHandlers) { println(s"Applying dataLoaderHandler ${dataLoaderHandler}") if (dataLoaderHandler.validatePath(dataPath)) { dfOpt = Some(dataLoaderHandler.createDataFrame(dataPath, dataIOParametersWithSplitSize, jobConf)) @@ -97,15 +101,20 @@ private[offline] class BatchDataLoader(ss: SparkSession, location: DataLocation, df } catch { case _: Throwable => - // If data loading from source failed, retry it automatically, as it might due to data source still being written into. - log.info(s"Loading ${location} failed, retrying for ${retry}-th time..") - if (retry > 0) { - Thread.sleep(retryWaitTime) - loadDataFrameWithRetry(dataIOParameters, jobConf, retry - 1) - } else { - // Throwing exception to avoid dataLoaderHandler hook exception from being swallowed. - throw new FeathrInputDataException(ErrorLabel.FEATHR_USER_ERROR, s"Failed to load ${dataPath} after ${initialNumOfRetries} retries" + - s" and retry time of ${retryWaitTime}ms.") + try { + ss.read.format("csv").option("header", "true").option("delimiter", csvDelimiterOption).load(dataPath) + } catch { + case e: Exception => + // If data loading from source failed, retry it automatically, as it might due to data source still being written into. + log.info(s"Loading ${location} failed, retrying for ${retry}-th time..") + if (retry > 0) { + Thread.sleep(retryWaitTime) + loadDataFrameWithRetry(dataIOParameters, jobConf, retry - 1) + } else { + // Throwing exception to avoid dataLoaderHandler hook exception from being swallowed. + throw new FeathrInputDataException(ErrorLabel.FEATHR_USER_ERROR, s"Failed to load ${dataPath} after ${initialNumOfRetries} retries" + + s" and retry time of ${retryWaitTime}ms.") + } } } } diff --git a/feathr-impl/src/test/scala/com/linkedin/feathr/offline/SlidingWindowAggIntegTest.scala b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/SlidingWindowAggIntegTest.scala index 8b43aea41..611215c4e 100644 --- a/feathr-impl/src/test/scala/com/linkedin/feathr/offline/SlidingWindowAggIntegTest.scala +++ b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/SlidingWindowAggIntegTest.scala @@ -281,7 +281,7 @@ class SlidingWindowAggIntegTest extends FeathrIntegTest { """ |sources: { | swaSource: { - | location: { path: "slidingWindowAgg/localSWDefaultTest/daily" } + | location: { path: "slidingWindowAgg/localSWADefaultTest/daily" } | timePartitionPattern: "yyyy/MM/dd" | timeWindowParameters: { | timestampColumn: "timestamp" From 1ed66c4ee60e2d10b55abd9ccaa5ae12c71f29b1 Mon Sep 17 00:00:00 2001 From: Rakesh Kashyap Hanasoge Padmanabha Date: Sun, 5 Feb 2023 20:00:02 -0800 Subject: [PATCH 3/8] Fix test failures caused by SWA paths --- .../pathutil/TimeBasedHdfsPathAnalyzer.scala | 24 +++++++++++++------ .../offline/job/TestTimeBasedJoin.scala | 6 ++--- .../TestTimeBasedHdfsPathAnalyzer.scala | 8 ++----- 3 files changed, 22 insertions(+), 16 deletions(-) diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/source/pathutil/TimeBasedHdfsPathAnalyzer.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/source/pathutil/TimeBasedHdfsPathAnalyzer.scala index 96bee09d5..90ceecad4 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/source/pathutil/TimeBasedHdfsPathAnalyzer.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/source/pathutil/TimeBasedHdfsPathAnalyzer.scala @@ -10,6 +10,10 @@ import com.linkedin.feathr.offline.source.dataloader.DataLoaderHandler * @param pathChecker the path checker is used to check whether a file path exists. */ private[offline] class TimeBasedHdfsPathAnalyzer(pathChecker: PathChecker, dataLoaderHandlers: List[DataLoaderHandler]) { + val dailyFolder = "daily/" + val hourlyFolder = "hourly/" + val dailyPattern = "yyyy/MM/dd" + val hourlyPattern = "yyyy/MM/dd/HH" /** * check whether the given path is daily or hourly partitioned. @@ -22,10 +26,6 @@ private[offline] class TimeBasedHdfsPathAnalyzer(pathChecker: PathChecker, dataL * @return a PathInfo object to show how the data source is partitioned. */ def analyze(filePath: String): PathInfo = { - val dailyFolder = "daily/" - val hourlyFolder = "hourly/" - val dailyPattern = "yyyy/MM/dd" - val hourlyPattern = "yyyy/MM/dd/HH" val fileFolder = if (filePath.endsWith("/")) filePath else filePath + "/" var pathInfoOpt: Option[PathInfo] = None // Used to store the pathInfo of any file caught by data loader handlers. @@ -36,7 +36,7 @@ private[offline] class TimeBasedHdfsPathAnalyzer(pathChecker: PathChecker, dataL for(dataLoaderHandler <- dataLoaderHandlers) { if (dataLoaderHandler.validatePath(fileFolder)) { pathInfoOpt = Some(PathInfo(fileFolder, DateTimeResolution.DAILY, dailyPattern)) - } + } } } } @@ -54,7 +54,7 @@ private[offline] class TimeBasedHdfsPathAnalyzer(pathChecker: PathChecker, dataL PathInfo(fileFolder + hourlyFolder, DateTimeResolution.HOURLY, hourlyPattern) } else { // Daily data can be Orc/Hive data following in HomeDir/datepartition=yyyy-MM-dd-00 - PathInfo(fileFolder + "datepartition=", DateTimeResolution.DAILY, "yyyy-MM-dd-00") + PathInfo(fileFolder, DateTimeResolution.DAILY, dailyPattern) } } @@ -70,7 +70,17 @@ private[offline] class TimeBasedHdfsPathAnalyzer(pathChecker: PathChecker, dataL def analyze(filePath: String, timePartitionPattern: String): PathInfo = { val basePath = if (filePath.endsWith("/") || filePath.endsWith("=")) filePath else filePath + "/" val dateTimeResolution = OfflineDateTimeUtils.getDateTimeResolutionFromPattern(timePartitionPattern) - PathInfo(basePath, dateTimeResolution, timePartitionPattern) + if (basePath.endsWith(dailyFolder)) { + PathInfo(basePath, dateTimeResolution, timePartitionPattern) + } else if (basePath.endsWith(hourlyFolder)) { + PathInfo(basePath, dateTimeResolution, timePartitionPattern) + } else if (pathChecker.exists(basePath + dailyFolder)) { + PathInfo(basePath + dailyFolder, dateTimeResolution, timePartitionPattern) + } else if (pathChecker.exists(basePath + hourlyFolder)) { + PathInfo(basePath + hourlyFolder, dateTimeResolution, timePartitionPattern) + } else { + PathInfo(filePath, dateTimeResolution, timePartitionPattern) + } } } diff --git a/feathr-impl/src/test/scala/com/linkedin/feathr/offline/job/TestTimeBasedJoin.scala b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/job/TestTimeBasedJoin.scala index 78552ce32..a6d5f6934 100644 --- a/feathr-impl/src/test/scala/com/linkedin/feathr/offline/job/TestTimeBasedJoin.scala +++ b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/job/TestTimeBasedJoin.scala @@ -20,7 +20,7 @@ class TestTimeBasedJoin extends TestFeathr { def testStartEndDatePath(): Unit = { val inputData1 = InputData("/test/path/", SourceFormatType.TIME_PATH, startDate = Some("20170908"), endDate = Some("20170910")) val pathList11 = getPathList(inputData1.sourceType, inputData1.inputPath, ss, inputData1.dateParam, dataLoaderHandlers=List()) - val actualPathList = List("/test/path/datepartition=2017-09-08-00", "/test/path/datepartition=2017-09-09-00", "/test/path/datepartition=2017-09-10-00") + val actualPathList = List("/test/path/2017/09/08", "/test/path/2017/09/09", "/test/path/2017/09/10") assert(pathList11 == actualPathList) } @@ -75,7 +75,7 @@ class TestTimeBasedJoin extends TestFeathr { val featureDataRelative2 = InputData("FEATHR_TEST_ONLY/path/", SourceFormatType.TIME_PATH, startDate = Some("20180227"), endDate = Some("20180227")) - val pathListRelative = + val pathListRelative = getPathList(featureDataRelative.sourceType, featureDataRelative.inputPath, ss, featureDataRelative.dateParam, targetDate=Some(observationStartDate), dataLoaderHandlers=List()) val pathListRelative2 = getPathList(featureDataRelative2.sourceType, featureDataRelative2.inputPath, ss, featureDataRelative2.dateParam,dataLoaderHandlers=List()) @@ -86,7 +86,7 @@ class TestTimeBasedJoin extends TestFeathr { val startDate = "20180225" val endDate = "20180225" val featureDataSpecific = InputData("FEATHR_TEST_ONLY/path/", SourceFormatType.TIME_PATH, startDate = Some(startDate), endDate = Some(endDate)) - val pathListSpecific = + val pathListSpecific = getPathList(featureDataSpecific.sourceType, featureDataSpecific.inputPath, ss, featureDataSpecific.dateParam, targetDate=Some(observationStartDate), dataLoaderHandlers=List()) val pathListSpecific2 = getPathList(featureDataSpecific.sourceType, featureDataSpecific.inputPath, ss, featureDataSpecific.dateParam, dataLoaderHandlers=List()) diff --git a/feathr-impl/src/test/scala/com/linkedin/feathr/offline/source/pathutil/TestTimeBasedHdfsPathAnalyzer.scala b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/source/pathutil/TestTimeBasedHdfsPathAnalyzer.scala index ca380eb0c..6412a044d 100644 --- a/feathr-impl/src/test/scala/com/linkedin/feathr/offline/source/pathutil/TestTimeBasedHdfsPathAnalyzer.scala +++ b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/source/pathutil/TestTimeBasedHdfsPathAnalyzer.scala @@ -62,11 +62,8 @@ class TestTimeBasedHdfsPathAnalyzer extends TestFeathr with MockitoSugar { val mockPathChecker = mock[PathChecker] val pathAnalyzer = new TimeBasedHdfsPathAnalyzer(mockPathChecker, List()) assertEquals( - pathAnalyzer.analyze("src/test/resources/datePartitionSource"), - PathInfo("src/test/resources/datePartitionSource/datepartition=", DateTimeResolution.DAILY, "yyyy-MM-dd-00")) - verify(mockPathChecker).exists("src/test/resources/datePartitionSource/daily/") - verify(mockPathChecker).exists("src/test/resources/datePartitionSource/hourly/") - verifyNoMoreInteractions(mockPathChecker) + pathAnalyzer.analyze("dalids://src/test/resources/datePartitionSource"), + PathInfo("dalids://src/test/resources/datePartitionSource/", DateTimeResolution.DAILY, "yyyy/MM/dd")) } @Test(description = "test analyze with time partition pattern") @@ -85,6 +82,5 @@ class TestTimeBasedHdfsPathAnalyzer extends TestFeathr with MockitoSugar { assertEquals( pathAnalyzer.analyze("src/test/resources/generation/datepartition=", "yyyy-MM-dd-00"), PathInfo("src/test/resources/generation/datepartition=", DateTimeResolution.DAILY, "yyyy-MM-dd-00")) - verifyNoMoreInteractions(mockPathChecker) } } From 3aa9b1e24d33e78ab9074822f035032ab206046b Mon Sep 17 00:00:00 2001 From: Rakesh Kashyap Hanasoge Padmanabha Date: Sun, 5 Feb 2023 21:27:34 -0800 Subject: [PATCH 4/8] test run --- .../feathr/offline/generation/DataFrameFeatureGenerator.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/generation/DataFrameFeatureGenerator.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/generation/DataFrameFeatureGenerator.scala index ed5c48d5c..ed83432c5 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/generation/DataFrameFeatureGenerator.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/generation/DataFrameFeatureGenerator.scala @@ -68,6 +68,8 @@ private[offline] class DataFrameFeatureGenerator(logicalPlan: MultiStageJoinPlan val anchorDFRDDMap = anchorToDataFrameMapper.getAnchorDFMapForGen(ss, requiredRegularFeatureAnchorsWithTime, Some(incrementalAggContext), failOnMissingPartition) val updatedAnchorDFRDDMap = anchorDFRDDMap.filter(anchorEntry => anchorEntry._2.isDefined).map(anchorEntry => anchorEntry._1 -> anchorEntry._2.get) + + // It could happen that all features are skipped, then return empty result if(updatedAnchorDFRDDMap.isEmpty) return Map() // 3. Load user specified default values and feature types, if any. From 78217c4fc4c7ec41e937b575db1b78613e2acffb Mon Sep 17 00:00:00 2001 From: Rakesh Kashyap Hanasoge Padmanabha Date: Sun, 5 Feb 2023 22:32:05 -0800 Subject: [PATCH 5/8] add logs --- .../AnchorToDataSourceMapper.scala | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/transformation/AnchorToDataSourceMapper.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/transformation/AnchorToDataSourceMapper.scala index 2cf5de3e2..61928a381 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/transformation/AnchorToDataSourceMapper.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/transformation/AnchorToDataSourceMapper.scala @@ -1,5 +1,7 @@ package com.linkedin.feathr.offline.transformation +import com.linkedin.feathr.common.configObj.configbuilder.FeatureGenConfigBuilder + import java.time.Duration import com.linkedin.feathr.common.{DateParam, DateTimeResolution} import com.linkedin.feathr.offline.source.SourceFormatType._ @@ -15,6 +17,7 @@ import com.linkedin.feathr.offline.source.pathutil.{PathChecker, TimeBasedHdfsPa import com.linkedin.feathr.offline.swa.SlidingWindowFeatureUtils import com.linkedin.feathr.offline.util.{FeathrUtils, SourceUtils} import com.linkedin.feathr.offline.util.datetime.{DateTimeInterval, OfflineDateTimeUtils} +import org.apache.log4j.Logger import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.{DataFrame, SparkSession} @@ -22,6 +25,7 @@ import org.apache.spark.sql.{DataFrame, SparkSession} * The primary responsibility of this class is to Map the anchored features to its DataFrame. */ private[offline] class AnchorToDataSourceMapper(dataPathHandlers: List[DataPathHandler]) { + private val logger = Logger.getLogger(classOf[AnchorToDataSourceMapper]) /** * Get basic anchored feature to datasource mapping for feature join use case. @@ -136,7 +140,11 @@ private[offline] class AnchorToDataSourceMapper(dataPathHandlers: List[DataPathH } catch {// todo - Add this functionality to only specific exception types and not for all error types. case e: Exception => if (shouldSkipFeature || (ss.sparkContext.isLocal && - SQLConf.get.getConf(LocalFeatureJoinJob.SKIP_MISSING_FEATURE))) ss.emptyDataFrame else throw e + SQLConf.get.getConf(LocalFeatureJoinJob.SKIP_MISSING_FEATURE))) { + logger.warn(s"shouldSkipFeature is " + shouldSkipFeature + "spark is session " + ss.sparkContext.isLocal + "local skip feature is " + + SQLConf.get.getConf(LocalFeatureJoinJob.SKIP_MISSING_FEATURE)) + ss.emptyDataFrame + } else throw e } } @@ -193,7 +201,12 @@ private[offline] class AnchorToDataSourceMapper(dataPathHandlers: List[DataPathH dataPathHandlers = dataPathHandlers)) } catch { case e: Exception => if (shouldSkipFeature || (ss.sparkContext.isLocal && - SQLConf.get.getConf(LocalFeatureJoinJob.SKIP_MISSING_FEATURE))) None else throw e + SQLConf.get.getConf(LocalFeatureJoinJob.SKIP_MISSING_FEATURE))) + { + logger.warn(s"shouldSkipFeature is " + shouldSkipFeature + "spark is session " + ss.sparkContext.isLocal + "local skip feature is " + + SQLConf.get.getConf(LocalFeatureJoinJob.SKIP_MISSING_FEATURE)) + None + } else throw e } anchors.map(anchor => (anchor, timeSeriesSource)) From c9f856a83dd084108abeb29405f0f96f48fe28b5 Mon Sep 17 00:00:00 2001 From: Rakesh Kashyap Hanasoge Padmanabha Date: Sun, 5 Feb 2023 23:33:05 -0800 Subject: [PATCH 6/8] Fix test --- .../generation/DataFrameFeatureGenerator.scala | 15 ++++++++++++--- .../transformation/AnchorToDataSourceMapper.scala | 1 - 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/generation/DataFrameFeatureGenerator.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/generation/DataFrameFeatureGenerator.scala index ed83432c5..851f202eb 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/generation/DataFrameFeatureGenerator.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/generation/DataFrameFeatureGenerator.scala @@ -11,13 +11,14 @@ import com.linkedin.feathr.offline.derived.strategies.{DerivationStrategies, Row import com.linkedin.feathr.offline.derived.{DerivedFeature, DerivedFeatureEvaluator} import com.linkedin.feathr.offline.evaluator.DerivedFeatureGenStage import com.linkedin.feathr.offline.job.FeatureJoinJob.FeatureName -import com.linkedin.feathr.offline.job.{FeatureGenSpec, FeatureTransformation} +import com.linkedin.feathr.offline.job.{FeatureGenSpec, FeatureTransformation, LocalFeatureJoinJob} import com.linkedin.feathr.offline.logical.{FeatureGroups, MultiStageJoinPlan, MultiStageJoinPlanner} import com.linkedin.feathr.offline.mvel.plugins.FeathrExpressionExecutionContext import com.linkedin.feathr.offline.source.accessor.DataPathHandler import com.linkedin.feathr.offline.source.dataloader.DataLoaderHandler import com.linkedin.feathr.offline.transformation.AnchorToDataSourceMapper import com.linkedin.feathr.offline.util.{AnchorUtils, FeathrUtils} +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.{DataFrame, SparkSession} /** @@ -76,6 +77,8 @@ private[offline] class DataFrameFeatureGenerator(logicalPlan: MultiStageJoinPlan val featureToDefaultValueMap = getDefaultValues(allRequiredFeatureAnchorWithSourceAndTime.values.toSeq) val featureToTypeConfigMap = getFeatureTypes(allRequiredFeatureAnchorWithSourceAndTime.values.toSeq) + val shouldSkipFeature = FeathrUtils.getFeathrJobParam(ss.sparkContext.getConf, FeathrUtils.SKIP_MISSING_FEATURE).toBoolean + // 4. Calculate anchored features val allStageFeatures = joinStages.flatMap { case (_: Seq[Int], featureNames: Seq[String]) => @@ -88,12 +91,18 @@ private[offline] class DataFrameFeatureGenerator(logicalPlan: MultiStageJoinPlan .map(f => (f._1, (offline.FeatureDataFrame(f._2.transformedResult.df, f._2.transformedResult.inferredFeatureTypes), f._2.joinKey))) }.toMap - // 5. Group features based on grouping specified in output processors - val updatedAllStageFeatures = allStageFeatures.filter(keyValue => !keyValue._2._1.df.isEmpty) + // Update features based on skip missing feature flag and empty dataframe + val updatedAllStageFeatures = if (shouldSkipFeature || (ss.sparkContext.isLocal && + SQLConf.get.getConf(LocalFeatureJoinJob.SKIP_MISSING_FEATURE))) { + allStageFeatures.filter(keyValue => !keyValue._2._1.df.isEmpty) + } else allStageFeatures + val (updatedFeatureGroups, updatedKeyTaggedFeatures) = FeatureGroupsUpdater().getUpdatedFeatureGroups(featureGroups, updatedAllStageFeatures, keyTaggedFeatures) val updatedLogicalPlan = MultiStageJoinPlanner().getLogicalPlan(updatedFeatureGroups, updatedKeyTaggedFeatures) + + // 5. Group features based on grouping specified in output processors val groupedAnchoredFeatures = featureGenFeatureGrouper.group(updatedAllStageFeatures, featureGenSpec.getOutputProcessorConfigs, updatedFeatureGroups.allDerivedFeatures) diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/transformation/AnchorToDataSourceMapper.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/transformation/AnchorToDataSourceMapper.scala index 61928a381..d99a3535c 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/transformation/AnchorToDataSourceMapper.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/transformation/AnchorToDataSourceMapper.scala @@ -208,7 +208,6 @@ private[offline] class AnchorToDataSourceMapper(dataPathHandlers: List[DataPathH None } else throw e } - anchors.map(anchor => (anchor, timeSeriesSource)) }) } From b6fa905289f9dbf7c55da35f352af89cdfdc0b72 Mon Sep 17 00:00:00 2001 From: Rakesh Kashyap Hanasoge Padmanabha Date: Mon, 6 Feb 2023 08:40:17 -0800 Subject: [PATCH 7/8] Remove debug logs --- .../offline/transformation/AnchorToDataSourceMapper.scala | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/transformation/AnchorToDataSourceMapper.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/transformation/AnchorToDataSourceMapper.scala index d99a3535c..5419093db 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/transformation/AnchorToDataSourceMapper.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/transformation/AnchorToDataSourceMapper.scala @@ -141,8 +141,6 @@ private[offline] class AnchorToDataSourceMapper(dataPathHandlers: List[DataPathH catch {// todo - Add this functionality to only specific exception types and not for all error types. case e: Exception => if (shouldSkipFeature || (ss.sparkContext.isLocal && SQLConf.get.getConf(LocalFeatureJoinJob.SKIP_MISSING_FEATURE))) { - logger.warn(s"shouldSkipFeature is " + shouldSkipFeature + "spark is session " + ss.sparkContext.isLocal + "local skip feature is " - + SQLConf.get.getConf(LocalFeatureJoinJob.SKIP_MISSING_FEATURE)) ss.emptyDataFrame } else throw e } @@ -201,10 +199,7 @@ private[offline] class AnchorToDataSourceMapper(dataPathHandlers: List[DataPathH dataPathHandlers = dataPathHandlers)) } catch { case e: Exception => if (shouldSkipFeature || (ss.sparkContext.isLocal && - SQLConf.get.getConf(LocalFeatureJoinJob.SKIP_MISSING_FEATURE))) - { - logger.warn(s"shouldSkipFeature is " + shouldSkipFeature + "spark is session " + ss.sparkContext.isLocal + "local skip feature is " - + SQLConf.get.getConf(LocalFeatureJoinJob.SKIP_MISSING_FEATURE)) + SQLConf.get.getConf(LocalFeatureJoinJob.SKIP_MISSING_FEATURE))) { None } else throw e } From 0462369eb3afcffdfb3b1bfb620b82ac851b99b2 Mon Sep 17 00:00:00 2001 From: Rakesh Kashyap Hanasoge Padmanabha Date: Mon, 6 Feb 2023 10:17:12 -0800 Subject: [PATCH 8/8] Add log --- .../offline/transformation/AnchorToDataSourceMapper.scala | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/transformation/AnchorToDataSourceMapper.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/transformation/AnchorToDataSourceMapper.scala index 5419093db..d99a3535c 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/transformation/AnchorToDataSourceMapper.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/transformation/AnchorToDataSourceMapper.scala @@ -141,6 +141,8 @@ private[offline] class AnchorToDataSourceMapper(dataPathHandlers: List[DataPathH catch {// todo - Add this functionality to only specific exception types and not for all error types. case e: Exception => if (shouldSkipFeature || (ss.sparkContext.isLocal && SQLConf.get.getConf(LocalFeatureJoinJob.SKIP_MISSING_FEATURE))) { + logger.warn(s"shouldSkipFeature is " + shouldSkipFeature + "spark is session " + ss.sparkContext.isLocal + "local skip feature is " + + SQLConf.get.getConf(LocalFeatureJoinJob.SKIP_MISSING_FEATURE)) ss.emptyDataFrame } else throw e } @@ -199,7 +201,10 @@ private[offline] class AnchorToDataSourceMapper(dataPathHandlers: List[DataPathH dataPathHandlers = dataPathHandlers)) } catch { case e: Exception => if (shouldSkipFeature || (ss.sparkContext.isLocal && - SQLConf.get.getConf(LocalFeatureJoinJob.SKIP_MISSING_FEATURE))) { + SQLConf.get.getConf(LocalFeatureJoinJob.SKIP_MISSING_FEATURE))) + { + logger.warn(s"shouldSkipFeature is " + shouldSkipFeature + "spark is session " + ss.sparkContext.isLocal + "local skip feature is " + + SQLConf.get.getConf(LocalFeatureJoinJob.SKIP_MISSING_FEATURE)) None } else throw e }