diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/generation/DataFrameFeatureGenerator.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/generation/DataFrameFeatureGenerator.scala index ed5c48d5c..851f202eb 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/generation/DataFrameFeatureGenerator.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/generation/DataFrameFeatureGenerator.scala @@ -11,13 +11,14 @@ import com.linkedin.feathr.offline.derived.strategies.{DerivationStrategies, Row import com.linkedin.feathr.offline.derived.{DerivedFeature, DerivedFeatureEvaluator} import com.linkedin.feathr.offline.evaluator.DerivedFeatureGenStage import com.linkedin.feathr.offline.job.FeatureJoinJob.FeatureName -import com.linkedin.feathr.offline.job.{FeatureGenSpec, FeatureTransformation} +import com.linkedin.feathr.offline.job.{FeatureGenSpec, FeatureTransformation, LocalFeatureJoinJob} import com.linkedin.feathr.offline.logical.{FeatureGroups, MultiStageJoinPlan, MultiStageJoinPlanner} import com.linkedin.feathr.offline.mvel.plugins.FeathrExpressionExecutionContext import com.linkedin.feathr.offline.source.accessor.DataPathHandler import com.linkedin.feathr.offline.source.dataloader.DataLoaderHandler import com.linkedin.feathr.offline.transformation.AnchorToDataSourceMapper import com.linkedin.feathr.offline.util.{AnchorUtils, FeathrUtils} +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.{DataFrame, SparkSession} /** @@ -68,12 +69,16 @@ private[offline] class DataFrameFeatureGenerator(logicalPlan: MultiStageJoinPlan val anchorDFRDDMap = anchorToDataFrameMapper.getAnchorDFMapForGen(ss, requiredRegularFeatureAnchorsWithTime, Some(incrementalAggContext), failOnMissingPartition) val updatedAnchorDFRDDMap = anchorDFRDDMap.filter(anchorEntry => anchorEntry._2.isDefined).map(anchorEntry => anchorEntry._1 -> anchorEntry._2.get) + + // It could happen that all features are skipped, then return empty result if(updatedAnchorDFRDDMap.isEmpty) return Map() // 3. Load user specified default values and feature types, if any. val featureToDefaultValueMap = getDefaultValues(allRequiredFeatureAnchorWithSourceAndTime.values.toSeq) val featureToTypeConfigMap = getFeatureTypes(allRequiredFeatureAnchorWithSourceAndTime.values.toSeq) + val shouldSkipFeature = FeathrUtils.getFeathrJobParam(ss.sparkContext.getConf, FeathrUtils.SKIP_MISSING_FEATURE).toBoolean + // 4. Calculate anchored features val allStageFeatures = joinStages.flatMap { case (_: Seq[Int], featureNames: Seq[String]) => @@ -86,12 +91,18 @@ private[offline] class DataFrameFeatureGenerator(logicalPlan: MultiStageJoinPlan .map(f => (f._1, (offline.FeatureDataFrame(f._2.transformedResult.df, f._2.transformedResult.inferredFeatureTypes), f._2.joinKey))) }.toMap - // 5. Group features based on grouping specified in output processors - val updatedAllStageFeatures = allStageFeatures.filter(keyValue => !keyValue._2._1.df.isEmpty) + // Update features based on skip missing feature flag and empty dataframe + val updatedAllStageFeatures = if (shouldSkipFeature || (ss.sparkContext.isLocal && + SQLConf.get.getConf(LocalFeatureJoinJob.SKIP_MISSING_FEATURE))) { + allStageFeatures.filter(keyValue => !keyValue._2._1.df.isEmpty) + } else allStageFeatures + val (updatedFeatureGroups, updatedKeyTaggedFeatures) = FeatureGroupsUpdater().getUpdatedFeatureGroups(featureGroups, updatedAllStageFeatures, keyTaggedFeatures) val updatedLogicalPlan = MultiStageJoinPlanner().getLogicalPlan(updatedFeatureGroups, updatedKeyTaggedFeatures) + + // 5. Group features based on grouping specified in output processors val groupedAnchoredFeatures = featureGenFeatureGrouper.group(updatedAllStageFeatures, featureGenSpec.getOutputProcessorConfigs, updatedFeatureGroups.allDerivedFeatures) diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/job/LocalFeatureJoinJob.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/job/LocalFeatureJoinJob.scala index c5599c5ff..ebb9902d8 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/job/LocalFeatureJoinJob.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/job/LocalFeatureJoinJob.scala @@ -23,10 +23,16 @@ object LocalFeatureJoinJob { val ss: SparkSession = createSparkSession(enableHiveSupport = true) val SKIP_MISSING_FEATURE = SQLConf .buildConf("spark.feathr.skip.missing.feature") - .doc("Whether to use the V2 implementation, which should have better performance.") + .doc("Whether to skip features if data is missing.") .booleanConf .createWithDefault(false) + val MAX_DATA_LOAD_RETRY = SQLConf + .buildConf("spark.feathr.max.data.load.retry") + .doc("Number of retries if data is missing.") + .intConf + .createWithDefault(0) + /** * local debug API, used in unit test and local debug * diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/source/dataloader/BatchDataLoader.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/source/dataloader/BatchDataLoader.scala index e3714217b..7dec21679 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/source/dataloader/BatchDataLoader.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/source/dataloader/BatchDataLoader.scala @@ -4,11 +4,15 @@ import com.linkedin.feathr.common.exception.{ErrorLabel, FeathrInputDataExceptio import com.linkedin.feathr.offline.config.location.DataLocation import com.linkedin.feathr.offline.generation.SparkIOUtils import com.linkedin.feathr.offline.job.DataSourceUtils.getSchemaFromAvroDataFile +import com.linkedin.feathr.offline.job.LocalFeatureJoinJob import com.linkedin.feathr.offline.source.dataloader.DataLoaderHandler import com.linkedin.feathr.offline.util.DelimiterUtils.checkDelimiterOption +import com.linkedin.feathr.offline.util.FeathrUtils +import com.linkedin.feathr.offline.util.FeathrUtils.DATA_LOAD_WAIT_IN_MS import org.apache.avro.Schema import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.mapred.JobConf +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.{DataFrame, SparkSession} /** @@ -17,9 +21,14 @@ import org.apache.spark.sql.{DataFrame, SparkSession} * @param path input data path */ private[offline] class BatchDataLoader(ss: SparkSession, location: DataLocation, dataLoaderHandlers: List[DataLoaderHandler]) extends DataLoader { + val retryWaitTime = FeathrUtils.getFeathrJobParam(ss.sparkContext.getConf, FeathrUtils.DATA_LOAD_WAIT_IN_MS).toInt + + val initialNumOfRetries = if (!ss.sparkContext.isLocal) FeathrUtils.getFeathrJobParam(ss.sparkContext.getConf, + FeathrUtils.MAX_DATA_LOAD_RETRY).toInt else SQLConf.get.getConf(LocalFeatureJoinJob.MAX_DATA_LOAD_RETRY) /** * get the schema of the source. It's only used in the deprecated DataSource.getDataSetAndSchema + * * @return an Avro Schema */ override def loadSchema(): Schema = { @@ -45,19 +54,23 @@ private[offline] class BatchDataLoader(ss: SparkSession, location: DataLocation, /** * load the source data as dataframe. + * * @return an dataframe */ override def loadDataFrame(): DataFrame = { - loadDataFrame(Map(), new JobConf(ss.sparkContext.hadoopConfiguration)) + val retry = FeathrUtils.getFeathrJobParam(ss.sparkContext.getConf, FeathrUtils.MAX_DATA_LOAD_RETRY).toInt + val retryCount = if (ss.sparkContext.isLocal) SQLConf.get.getConf(LocalFeatureJoinJob.MAX_DATA_LOAD_RETRY) else retry + loadDataFrameWithRetry(Map(), new JobConf(ss.sparkContext.hadoopConfiguration), retryCount) } /** * load the source data as dataframe. + * * @param dataIOParameters extra parameters - * @param jobConf Hadoop JobConf to be passed + * @param jobConf Hadoop JobConf to be passed * @return an dataframe */ - def loadDataFrame(dataIOParameters: Map[String, String], jobConf: JobConf): DataFrame = { + def loadDataFrameWithRetry(dataIOParameters: Map[String, String], jobConf: JobConf, retry: Int): DataFrame = { val sparkConf = ss.sparkContext.getConf val inputSplitSize = sparkConf.get("spark.feathr.input.split.size", "") val dataIOParametersWithSplitSize = Map(SparkIOUtils.SPLIT_SIZE -> inputSplitSize) ++ dataIOParameters @@ -73,7 +86,7 @@ private[offline] class BatchDataLoader(ss: SparkSession, location: DataLocation, var dfOpt: Option[DataFrame] = None breakable { - for(dataLoaderHandler <- dataLoaderHandlers) { + for (dataLoaderHandler <- dataLoaderHandlers) { println(s"Applying dataLoaderHandler ${dataLoaderHandler}") if (dataLoaderHandler.validatePath(dataPath)) { dfOpt = Some(dataLoaderHandler.createDataFrame(dataPath, dataIOParametersWithSplitSize, jobConf)) @@ -87,12 +100,22 @@ private[offline] class BatchDataLoader(ss: SparkSession, location: DataLocation, } df } catch { - case feathrException: FeathrInputDataException => - println(feathrException.toString) - throw feathrException // Throwing exception to avoid dataLoaderHandler hook exception from being swallowed. - case e: Throwable => //TODO: Analyze all thrown exceptions, instead of swalling them all, and reading as a csv - println(e.toString) - ss.read.format("csv").option("header", "true").option("delimiter", csvDelimiterOption).load(dataPath) + case _: Throwable => + try { + ss.read.format("csv").option("header", "true").option("delimiter", csvDelimiterOption).load(dataPath) + } catch { + case e: Exception => + // If data loading from source failed, retry it automatically, as it might due to data source still being written into. + log.info(s"Loading ${location} failed, retrying for ${retry}-th time..") + if (retry > 0) { + Thread.sleep(retryWaitTime) + loadDataFrameWithRetry(dataIOParameters, jobConf, retry - 1) + } else { + // Throwing exception to avoid dataLoaderHandler hook exception from being swallowed. + throw new FeathrInputDataException(ErrorLabel.FEATHR_USER_ERROR, s"Failed to load ${dataPath} after ${initialNumOfRetries} retries" + + s" and retry time of ${retryWaitTime}ms.") + } + } } } } \ No newline at end of file diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/source/pathutil/TimeBasedHdfsPathAnalyzer.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/source/pathutil/TimeBasedHdfsPathAnalyzer.scala index 96bee09d5..90ceecad4 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/source/pathutil/TimeBasedHdfsPathAnalyzer.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/source/pathutil/TimeBasedHdfsPathAnalyzer.scala @@ -10,6 +10,10 @@ import com.linkedin.feathr.offline.source.dataloader.DataLoaderHandler * @param pathChecker the path checker is used to check whether a file path exists. */ private[offline] class TimeBasedHdfsPathAnalyzer(pathChecker: PathChecker, dataLoaderHandlers: List[DataLoaderHandler]) { + val dailyFolder = "daily/" + val hourlyFolder = "hourly/" + val dailyPattern = "yyyy/MM/dd" + val hourlyPattern = "yyyy/MM/dd/HH" /** * check whether the given path is daily or hourly partitioned. @@ -22,10 +26,6 @@ private[offline] class TimeBasedHdfsPathAnalyzer(pathChecker: PathChecker, dataL * @return a PathInfo object to show how the data source is partitioned. */ def analyze(filePath: String): PathInfo = { - val dailyFolder = "daily/" - val hourlyFolder = "hourly/" - val dailyPattern = "yyyy/MM/dd" - val hourlyPattern = "yyyy/MM/dd/HH" val fileFolder = if (filePath.endsWith("/")) filePath else filePath + "/" var pathInfoOpt: Option[PathInfo] = None // Used to store the pathInfo of any file caught by data loader handlers. @@ -36,7 +36,7 @@ private[offline] class TimeBasedHdfsPathAnalyzer(pathChecker: PathChecker, dataL for(dataLoaderHandler <- dataLoaderHandlers) { if (dataLoaderHandler.validatePath(fileFolder)) { pathInfoOpt = Some(PathInfo(fileFolder, DateTimeResolution.DAILY, dailyPattern)) - } + } } } } @@ -54,7 +54,7 @@ private[offline] class TimeBasedHdfsPathAnalyzer(pathChecker: PathChecker, dataL PathInfo(fileFolder + hourlyFolder, DateTimeResolution.HOURLY, hourlyPattern) } else { // Daily data can be Orc/Hive data following in HomeDir/datepartition=yyyy-MM-dd-00 - PathInfo(fileFolder + "datepartition=", DateTimeResolution.DAILY, "yyyy-MM-dd-00") + PathInfo(fileFolder, DateTimeResolution.DAILY, dailyPattern) } } @@ -70,7 +70,17 @@ private[offline] class TimeBasedHdfsPathAnalyzer(pathChecker: PathChecker, dataL def analyze(filePath: String, timePartitionPattern: String): PathInfo = { val basePath = if (filePath.endsWith("/") || filePath.endsWith("=")) filePath else filePath + "/" val dateTimeResolution = OfflineDateTimeUtils.getDateTimeResolutionFromPattern(timePartitionPattern) - PathInfo(basePath, dateTimeResolution, timePartitionPattern) + if (basePath.endsWith(dailyFolder)) { + PathInfo(basePath, dateTimeResolution, timePartitionPattern) + } else if (basePath.endsWith(hourlyFolder)) { + PathInfo(basePath, dateTimeResolution, timePartitionPattern) + } else if (pathChecker.exists(basePath + dailyFolder)) { + PathInfo(basePath + dailyFolder, dateTimeResolution, timePartitionPattern) + } else if (pathChecker.exists(basePath + hourlyFolder)) { + PathInfo(basePath + hourlyFolder, dateTimeResolution, timePartitionPattern) + } else { + PathInfo(filePath, dateTimeResolution, timePartitionPattern) + } } } diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/transformation/AnchorToDataSourceMapper.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/transformation/AnchorToDataSourceMapper.scala index 2cf5de3e2..d99a3535c 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/transformation/AnchorToDataSourceMapper.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/transformation/AnchorToDataSourceMapper.scala @@ -1,5 +1,7 @@ package com.linkedin.feathr.offline.transformation +import com.linkedin.feathr.common.configObj.configbuilder.FeatureGenConfigBuilder + import java.time.Duration import com.linkedin.feathr.common.{DateParam, DateTimeResolution} import com.linkedin.feathr.offline.source.SourceFormatType._ @@ -15,6 +17,7 @@ import com.linkedin.feathr.offline.source.pathutil.{PathChecker, TimeBasedHdfsPa import com.linkedin.feathr.offline.swa.SlidingWindowFeatureUtils import com.linkedin.feathr.offline.util.{FeathrUtils, SourceUtils} import com.linkedin.feathr.offline.util.datetime.{DateTimeInterval, OfflineDateTimeUtils} +import org.apache.log4j.Logger import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.{DataFrame, SparkSession} @@ -22,6 +25,7 @@ import org.apache.spark.sql.{DataFrame, SparkSession} * The primary responsibility of this class is to Map the anchored features to its DataFrame. */ private[offline] class AnchorToDataSourceMapper(dataPathHandlers: List[DataPathHandler]) { + private val logger = Logger.getLogger(classOf[AnchorToDataSourceMapper]) /** * Get basic anchored feature to datasource mapping for feature join use case. @@ -136,7 +140,11 @@ private[offline] class AnchorToDataSourceMapper(dataPathHandlers: List[DataPathH } catch {// todo - Add this functionality to only specific exception types and not for all error types. case e: Exception => if (shouldSkipFeature || (ss.sparkContext.isLocal && - SQLConf.get.getConf(LocalFeatureJoinJob.SKIP_MISSING_FEATURE))) ss.emptyDataFrame else throw e + SQLConf.get.getConf(LocalFeatureJoinJob.SKIP_MISSING_FEATURE))) { + logger.warn(s"shouldSkipFeature is " + shouldSkipFeature + "spark is session " + ss.sparkContext.isLocal + "local skip feature is " + + SQLConf.get.getConf(LocalFeatureJoinJob.SKIP_MISSING_FEATURE)) + ss.emptyDataFrame + } else throw e } } @@ -193,9 +201,13 @@ private[offline] class AnchorToDataSourceMapper(dataPathHandlers: List[DataPathH dataPathHandlers = dataPathHandlers)) } catch { case e: Exception => if (shouldSkipFeature || (ss.sparkContext.isLocal && - SQLConf.get.getConf(LocalFeatureJoinJob.SKIP_MISSING_FEATURE))) None else throw e + SQLConf.get.getConf(LocalFeatureJoinJob.SKIP_MISSING_FEATURE))) + { + logger.warn(s"shouldSkipFeature is " + shouldSkipFeature + "spark is session " + ss.sparkContext.isLocal + "local skip feature is " + + SQLConf.get.getConf(LocalFeatureJoinJob.SKIP_MISSING_FEATURE)) + None + } else throw e } - anchors.map(anchor => (anchor, timeSeriesSource)) }) } diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/util/FeathrUtils.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/util/FeathrUtils.scala index f4c62a6fc..92c9ccceb 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/util/FeathrUtils.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/util/FeathrUtils.scala @@ -39,6 +39,8 @@ private[offline] object FeathrUtils { val SPARK_JOIN_MAX_PARALLELISM = "max.parallelism" val CHECKPOINT_OUTPUT_PATH = "checkpoint.dir" val SPARK_JOIN_MIN_PARALLELISM = "min.parallelism" + val MAX_DATA_LOAD_RETRY = "max.data.load.retry" + val DATA_LOAD_WAIT_IN_MS = "data.load.wait.in.ms" val defaultParams: Map[String, String] = Map( ENABLE_DEBUG_OUTPUT -> "false", @@ -50,6 +52,8 @@ private[offline] object FeathrUtils { SEQ_JOIN_ARRAY_EXPLODE_ENABLED -> "true", ENABLE_SALTED_JOIN -> "false", SKIP_MISSING_FEATURE -> "false", + MAX_DATA_LOAD_RETRY-> "0", + DATA_LOAD_WAIT_IN_MS-> "1", // If one key appears more than 0.02% in the dataset, we will salt this join key and split them into multiple partitions // This is an empirical value SALTED_JOIN_FREQ_ITEM_THRESHOLD -> "0.0002", diff --git a/feathr-impl/src/test/scala/com/linkedin/feathr/offline/SlidingWindowAggIntegTest.scala b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/SlidingWindowAggIntegTest.scala index 8b43aea41..611215c4e 100644 --- a/feathr-impl/src/test/scala/com/linkedin/feathr/offline/SlidingWindowAggIntegTest.scala +++ b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/SlidingWindowAggIntegTest.scala @@ -281,7 +281,7 @@ class SlidingWindowAggIntegTest extends FeathrIntegTest { """ |sources: { | swaSource: { - | location: { path: "slidingWindowAgg/localSWDefaultTest/daily" } + | location: { path: "slidingWindowAgg/localSWADefaultTest/daily" } | timePartitionPattern: "yyyy/MM/dd" | timeWindowParameters: { | timestampColumn: "timestamp" diff --git a/feathr-impl/src/test/scala/com/linkedin/feathr/offline/job/TestTimeBasedJoin.scala b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/job/TestTimeBasedJoin.scala index 78552ce32..a6d5f6934 100644 --- a/feathr-impl/src/test/scala/com/linkedin/feathr/offline/job/TestTimeBasedJoin.scala +++ b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/job/TestTimeBasedJoin.scala @@ -20,7 +20,7 @@ class TestTimeBasedJoin extends TestFeathr { def testStartEndDatePath(): Unit = { val inputData1 = InputData("/test/path/", SourceFormatType.TIME_PATH, startDate = Some("20170908"), endDate = Some("20170910")) val pathList11 = getPathList(inputData1.sourceType, inputData1.inputPath, ss, inputData1.dateParam, dataLoaderHandlers=List()) - val actualPathList = List("/test/path/datepartition=2017-09-08-00", "/test/path/datepartition=2017-09-09-00", "/test/path/datepartition=2017-09-10-00") + val actualPathList = List("/test/path/2017/09/08", "/test/path/2017/09/09", "/test/path/2017/09/10") assert(pathList11 == actualPathList) } @@ -75,7 +75,7 @@ class TestTimeBasedJoin extends TestFeathr { val featureDataRelative2 = InputData("FEATHR_TEST_ONLY/path/", SourceFormatType.TIME_PATH, startDate = Some("20180227"), endDate = Some("20180227")) - val pathListRelative = + val pathListRelative = getPathList(featureDataRelative.sourceType, featureDataRelative.inputPath, ss, featureDataRelative.dateParam, targetDate=Some(observationStartDate), dataLoaderHandlers=List()) val pathListRelative2 = getPathList(featureDataRelative2.sourceType, featureDataRelative2.inputPath, ss, featureDataRelative2.dateParam,dataLoaderHandlers=List()) @@ -86,7 +86,7 @@ class TestTimeBasedJoin extends TestFeathr { val startDate = "20180225" val endDate = "20180225" val featureDataSpecific = InputData("FEATHR_TEST_ONLY/path/", SourceFormatType.TIME_PATH, startDate = Some(startDate), endDate = Some(endDate)) - val pathListSpecific = + val pathListSpecific = getPathList(featureDataSpecific.sourceType, featureDataSpecific.inputPath, ss, featureDataSpecific.dateParam, targetDate=Some(observationStartDate), dataLoaderHandlers=List()) val pathListSpecific2 = getPathList(featureDataSpecific.sourceType, featureDataSpecific.inputPath, ss, featureDataSpecific.dateParam, dataLoaderHandlers=List()) diff --git a/feathr-impl/src/test/scala/com/linkedin/feathr/offline/source/dataloader/TestBatchDataLoader.scala b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/source/dataloader/TestBatchDataLoader.scala index 83e46c1d0..eef7da317 100644 --- a/feathr-impl/src/test/scala/com/linkedin/feathr/offline/source/dataloader/TestBatchDataLoader.scala +++ b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/source/dataloader/TestBatchDataLoader.scala @@ -1,8 +1,11 @@ package com.linkedin.feathr.offline.source.dataloader +import com.linkedin.feathr.common.exception.{FeathrConfigException, FeathrInputDataException} import com.linkedin.feathr.offline.TestFeathr import com.linkedin.feathr.offline.config.location.SimplePath +import com.linkedin.feathr.offline.job.LocalFeatureJoinJob import org.apache.spark.sql.Row +import org.apache.spark.sql.internal.SQLConf import org.testng.Assert.assertEquals import org.testng.annotations.Test @@ -34,6 +37,20 @@ class TestBatchDataLoader extends TestFeathr { assertEquals(df.collect(), expectedRows) } + /** + * Test the batch loader retries before failing. + */ + @Test(expectedExceptions = Array(classOf[FeathrInputDataException]), + expectedExceptionsMessageRegExp = ".* after 3 retries and retry time of 1ms.*") + def testRetry(): Unit = { + SQLConf.get.setConf(LocalFeatureJoinJob.MAX_DATA_LOAD_RETRY, 3) + val path = "anchor11-source.csv" + val batchDataLoader = new BatchDataLoader(ss, location = SimplePath(path), List()) + val df = batchDataLoader.loadDataFrame() + df.show() + SQLConf.get.setConf(LocalFeatureJoinJob.MAX_DATA_LOAD_RETRY, 0) + } + @Test(description = "test loading dataframe with BatchDataLoader by specifying delimiter") def testBatchDataLoaderWithCsvDelimiterOption() : Unit = { val path = "anchor1-source.tsv" diff --git a/feathr-impl/src/test/scala/com/linkedin/feathr/offline/source/pathutil/TestTimeBasedHdfsPathAnalyzer.scala b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/source/pathutil/TestTimeBasedHdfsPathAnalyzer.scala index ca380eb0c..6412a044d 100644 --- a/feathr-impl/src/test/scala/com/linkedin/feathr/offline/source/pathutil/TestTimeBasedHdfsPathAnalyzer.scala +++ b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/source/pathutil/TestTimeBasedHdfsPathAnalyzer.scala @@ -62,11 +62,8 @@ class TestTimeBasedHdfsPathAnalyzer extends TestFeathr with MockitoSugar { val mockPathChecker = mock[PathChecker] val pathAnalyzer = new TimeBasedHdfsPathAnalyzer(mockPathChecker, List()) assertEquals( - pathAnalyzer.analyze("src/test/resources/datePartitionSource"), - PathInfo("src/test/resources/datePartitionSource/datepartition=", DateTimeResolution.DAILY, "yyyy-MM-dd-00")) - verify(mockPathChecker).exists("src/test/resources/datePartitionSource/daily/") - verify(mockPathChecker).exists("src/test/resources/datePartitionSource/hourly/") - verifyNoMoreInteractions(mockPathChecker) + pathAnalyzer.analyze("dalids://src/test/resources/datePartitionSource"), + PathInfo("dalids://src/test/resources/datePartitionSource/", DateTimeResolution.DAILY, "yyyy/MM/dd")) } @Test(description = "test analyze with time partition pattern") @@ -85,6 +82,5 @@ class TestTimeBasedHdfsPathAnalyzer extends TestFeathr with MockitoSugar { assertEquals( pathAnalyzer.analyze("src/test/resources/generation/datepartition=", "yyyy-MM-dd-00"), PathInfo("src/test/resources/generation/datepartition=", DateTimeResolution.DAILY, "yyyy-MM-dd-00")) - verifyNoMoreInteractions(mockPathChecker) } } diff --git a/gradle.properties b/gradle.properties index ac0755a31..99df84ee6 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,3 +1,3 @@ -version=0.10.4-rc3 +version=0.10.4-rc4 SONATYPE_AUTOMATIC_RELEASE=true POM_ARTIFACT_ID=feathr_2.12