Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,14 @@ import com.linkedin.feathr.offline.derived.strategies.{DerivationStrategies, Row
import com.linkedin.feathr.offline.derived.{DerivedFeature, DerivedFeatureEvaluator}
import com.linkedin.feathr.offline.evaluator.DerivedFeatureGenStage
import com.linkedin.feathr.offline.job.FeatureJoinJob.FeatureName
import com.linkedin.feathr.offline.job.{FeatureGenSpec, FeatureTransformation}
import com.linkedin.feathr.offline.job.{FeatureGenSpec, FeatureTransformation, LocalFeatureJoinJob}
import com.linkedin.feathr.offline.logical.{FeatureGroups, MultiStageJoinPlan, MultiStageJoinPlanner}
import com.linkedin.feathr.offline.mvel.plugins.FeathrExpressionExecutionContext
import com.linkedin.feathr.offline.source.accessor.DataPathHandler
import com.linkedin.feathr.offline.source.dataloader.DataLoaderHandler
import com.linkedin.feathr.offline.transformation.AnchorToDataSourceMapper
import com.linkedin.feathr.offline.util.{AnchorUtils, FeathrUtils}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
Expand Down Expand Up @@ -68,12 +69,16 @@ private[offline] class DataFrameFeatureGenerator(logicalPlan: MultiStageJoinPlan
val anchorDFRDDMap = anchorToDataFrameMapper.getAnchorDFMapForGen(ss, requiredRegularFeatureAnchorsWithTime, Some(incrementalAggContext), failOnMissingPartition)

val updatedAnchorDFRDDMap = anchorDFRDDMap.filter(anchorEntry => anchorEntry._2.isDefined).map(anchorEntry => anchorEntry._1 -> anchorEntry._2.get)

// It could happen that all features are skipped, then return empty result
if(updatedAnchorDFRDDMap.isEmpty) return Map()

// 3. Load user specified default values and feature types, if any.
val featureToDefaultValueMap = getDefaultValues(allRequiredFeatureAnchorWithSourceAndTime.values.toSeq)
val featureToTypeConfigMap = getFeatureTypes(allRequiredFeatureAnchorWithSourceAndTime.values.toSeq)

val shouldSkipFeature = FeathrUtils.getFeathrJobParam(ss.sparkContext.getConf, FeathrUtils.SKIP_MISSING_FEATURE).toBoolean

// 4. Calculate anchored features
val allStageFeatures = joinStages.flatMap {
case (_: Seq[Int], featureNames: Seq[String]) =>
Expand All @@ -86,12 +91,18 @@ private[offline] class DataFrameFeatureGenerator(logicalPlan: MultiStageJoinPlan
.map(f => (f._1, (offline.FeatureDataFrame(f._2.transformedResult.df, f._2.transformedResult.inferredFeatureTypes), f._2.joinKey)))
}.toMap

// 5. Group features based on grouping specified in output processors
val updatedAllStageFeatures = allStageFeatures.filter(keyValue => !keyValue._2._1.df.isEmpty)
// Update features based on skip missing feature flag and empty dataframe
val updatedAllStageFeatures = if (shouldSkipFeature || (ss.sparkContext.isLocal &&
SQLConf.get.getConf(LocalFeatureJoinJob.SKIP_MISSING_FEATURE))) {
allStageFeatures.filter(keyValue => !keyValue._2._1.df.isEmpty)
} else allStageFeatures

val (updatedFeatureGroups, updatedKeyTaggedFeatures) = FeatureGroupsUpdater().getUpdatedFeatureGroups(featureGroups,
updatedAllStageFeatures, keyTaggedFeatures)

val updatedLogicalPlan = MultiStageJoinPlanner().getLogicalPlan(updatedFeatureGroups, updatedKeyTaggedFeatures)

// 5. Group features based on grouping specified in output processors
val groupedAnchoredFeatures = featureGenFeatureGrouper.group(updatedAllStageFeatures, featureGenSpec.getOutputProcessorConfigs,
updatedFeatureGroups.allDerivedFeatures)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,16 @@ object LocalFeatureJoinJob {
val ss: SparkSession = createSparkSession(enableHiveSupport = true)
val SKIP_MISSING_FEATURE = SQLConf
.buildConf("spark.feathr.skip.missing.feature")
.doc("Whether to use the V2 implementation, which should have better performance.")
.doc("Whether to skip features if data is missing.")
.booleanConf
.createWithDefault(false)

val MAX_DATA_LOAD_RETRY = SQLConf
.buildConf("spark.feathr.max.data.load.retry")
.doc("Number of retries if data is missing.")
.intConf
.createWithDefault(0)

/**
* local debug API, used in unit test and local debug
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,15 @@ import com.linkedin.feathr.common.exception.{ErrorLabel, FeathrInputDataExceptio
import com.linkedin.feathr.offline.config.location.DataLocation
import com.linkedin.feathr.offline.generation.SparkIOUtils
import com.linkedin.feathr.offline.job.DataSourceUtils.getSchemaFromAvroDataFile
import com.linkedin.feathr.offline.job.LocalFeatureJoinJob
import com.linkedin.feathr.offline.source.dataloader.DataLoaderHandler
import com.linkedin.feathr.offline.util.DelimiterUtils.checkDelimiterOption
import com.linkedin.feathr.offline.util.FeathrUtils
import com.linkedin.feathr.offline.util.FeathrUtils.DATA_LOAD_WAIT_IN_MS
import org.apache.avro.Schema
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
Expand All @@ -17,9 +21,14 @@ import org.apache.spark.sql.{DataFrame, SparkSession}
* @param path input data path
*/
private[offline] class BatchDataLoader(ss: SparkSession, location: DataLocation, dataLoaderHandlers: List[DataLoaderHandler]) extends DataLoader {
val retryWaitTime = FeathrUtils.getFeathrJobParam(ss.sparkContext.getConf, FeathrUtils.DATA_LOAD_WAIT_IN_MS).toInt

val initialNumOfRetries = if (!ss.sparkContext.isLocal) FeathrUtils.getFeathrJobParam(ss.sparkContext.getConf,
FeathrUtils.MAX_DATA_LOAD_RETRY).toInt else SQLConf.get.getConf(LocalFeatureJoinJob.MAX_DATA_LOAD_RETRY)

/**
* get the schema of the source. It's only used in the deprecated DataSource.getDataSetAndSchema
*
* @return an Avro Schema
*/
override def loadSchema(): Schema = {
Expand All @@ -45,19 +54,23 @@ private[offline] class BatchDataLoader(ss: SparkSession, location: DataLocation,

/**
* load the source data as dataframe.
*
* @return an dataframe
*/
override def loadDataFrame(): DataFrame = {
loadDataFrame(Map(), new JobConf(ss.sparkContext.hadoopConfiguration))
val retry = FeathrUtils.getFeathrJobParam(ss.sparkContext.getConf, FeathrUtils.MAX_DATA_LOAD_RETRY).toInt
val retryCount = if (ss.sparkContext.isLocal) SQLConf.get.getConf(LocalFeatureJoinJob.MAX_DATA_LOAD_RETRY) else retry
loadDataFrameWithRetry(Map(), new JobConf(ss.sparkContext.hadoopConfiguration), retryCount)
}

/**
* load the source data as dataframe.
*
* @param dataIOParameters extra parameters
* @param jobConf Hadoop JobConf to be passed
* @param jobConf Hadoop JobConf to be passed
* @return an dataframe
*/
def loadDataFrame(dataIOParameters: Map[String, String], jobConf: JobConf): DataFrame = {
def loadDataFrameWithRetry(dataIOParameters: Map[String, String], jobConf: JobConf, retry: Int): DataFrame = {
val sparkConf = ss.sparkContext.getConf
val inputSplitSize = sparkConf.get("spark.feathr.input.split.size", "")
val dataIOParametersWithSplitSize = Map(SparkIOUtils.SPLIT_SIZE -> inputSplitSize) ++ dataIOParameters
Expand All @@ -73,7 +86,7 @@ private[offline] class BatchDataLoader(ss: SparkSession, location: DataLocation,

var dfOpt: Option[DataFrame] = None
breakable {
for(dataLoaderHandler <- dataLoaderHandlers) {
for (dataLoaderHandler <- dataLoaderHandlers) {
println(s"Applying dataLoaderHandler ${dataLoaderHandler}")
if (dataLoaderHandler.validatePath(dataPath)) {
dfOpt = Some(dataLoaderHandler.createDataFrame(dataPath, dataIOParametersWithSplitSize, jobConf))
Expand All @@ -87,12 +100,22 @@ private[offline] class BatchDataLoader(ss: SparkSession, location: DataLocation,
}
df
} catch {
case feathrException: FeathrInputDataException =>
println(feathrException.toString)
throw feathrException // Throwing exception to avoid dataLoaderHandler hook exception from being swallowed.
case e: Throwable => //TODO: Analyze all thrown exceptions, instead of swalling them all, and reading as a csv
println(e.toString)
ss.read.format("csv").option("header", "true").option("delimiter", csvDelimiterOption).load(dataPath)
case _: Throwable =>
try {
ss.read.format("csv").option("header", "true").option("delimiter", csvDelimiterOption).load(dataPath)
} catch {
case e: Exception =>
// If data loading from source failed, retry it automatically, as it might due to data source still being written into.
log.info(s"Loading ${location} failed, retrying for ${retry}-th time..")
if (retry > 0) {
Thread.sleep(retryWaitTime)
loadDataFrameWithRetry(dataIOParameters, jobConf, retry - 1)
} else {
// Throwing exception to avoid dataLoaderHandler hook exception from being swallowed.
throw new FeathrInputDataException(ErrorLabel.FEATHR_USER_ERROR, s"Failed to load ${dataPath} after ${initialNumOfRetries} retries" +
s" and retry time of ${retryWaitTime}ms.")
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ import com.linkedin.feathr.offline.source.dataloader.DataLoaderHandler
* @param pathChecker the path checker is used to check whether a file path exists.
*/
private[offline] class TimeBasedHdfsPathAnalyzer(pathChecker: PathChecker, dataLoaderHandlers: List[DataLoaderHandler]) {
val dailyFolder = "daily/"
val hourlyFolder = "hourly/"
val dailyPattern = "yyyy/MM/dd"
val hourlyPattern = "yyyy/MM/dd/HH"

/**
* check whether the given path is daily or hourly partitioned.
Expand All @@ -22,10 +26,6 @@ private[offline] class TimeBasedHdfsPathAnalyzer(pathChecker: PathChecker, dataL
* @return a PathInfo object to show how the data source is partitioned.
*/
def analyze(filePath: String): PathInfo = {
val dailyFolder = "daily/"
val hourlyFolder = "hourly/"
val dailyPattern = "yyyy/MM/dd"
val hourlyPattern = "yyyy/MM/dd/HH"
val fileFolder = if (filePath.endsWith("/")) filePath else filePath + "/"

var pathInfoOpt: Option[PathInfo] = None // Used to store the pathInfo of any file caught by data loader handlers.
Expand All @@ -36,7 +36,7 @@ private[offline] class TimeBasedHdfsPathAnalyzer(pathChecker: PathChecker, dataL
for(dataLoaderHandler <- dataLoaderHandlers) {
if (dataLoaderHandler.validatePath(fileFolder)) {
pathInfoOpt = Some(PathInfo(fileFolder, DateTimeResolution.DAILY, dailyPattern))
}
}
}
}
}
Expand All @@ -54,7 +54,7 @@ private[offline] class TimeBasedHdfsPathAnalyzer(pathChecker: PathChecker, dataL
PathInfo(fileFolder + hourlyFolder, DateTimeResolution.HOURLY, hourlyPattern)
} else {
// Daily data can be Orc/Hive data following in HomeDir/datepartition=yyyy-MM-dd-00
PathInfo(fileFolder + "datepartition=", DateTimeResolution.DAILY, "yyyy-MM-dd-00")
PathInfo(fileFolder, DateTimeResolution.DAILY, dailyPattern)
}
}

Expand All @@ -70,7 +70,17 @@ private[offline] class TimeBasedHdfsPathAnalyzer(pathChecker: PathChecker, dataL
def analyze(filePath: String, timePartitionPattern: String): PathInfo = {
val basePath = if (filePath.endsWith("/") || filePath.endsWith("=")) filePath else filePath + "/"
val dateTimeResolution = OfflineDateTimeUtils.getDateTimeResolutionFromPattern(timePartitionPattern)
PathInfo(basePath, dateTimeResolution, timePartitionPattern)
if (basePath.endsWith(dailyFolder)) {
PathInfo(basePath, dateTimeResolution, timePartitionPattern)
} else if (basePath.endsWith(hourlyFolder)) {
PathInfo(basePath, dateTimeResolution, timePartitionPattern)
} else if (pathChecker.exists(basePath + dailyFolder)) {
PathInfo(basePath + dailyFolder, dateTimeResolution, timePartitionPattern)
} else if (pathChecker.exists(basePath + hourlyFolder)) {
PathInfo(basePath + hourlyFolder, dateTimeResolution, timePartitionPattern)
} else {
PathInfo(filePath, dateTimeResolution, timePartitionPattern)
}
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.linkedin.feathr.offline.transformation

import com.linkedin.feathr.common.configObj.configbuilder.FeatureGenConfigBuilder

import java.time.Duration
import com.linkedin.feathr.common.{DateParam, DateTimeResolution}
import com.linkedin.feathr.offline.source.SourceFormatType._
Expand All @@ -15,13 +17,15 @@ import com.linkedin.feathr.offline.source.pathutil.{PathChecker, TimeBasedHdfsPa
import com.linkedin.feathr.offline.swa.SlidingWindowFeatureUtils
import com.linkedin.feathr.offline.util.{FeathrUtils, SourceUtils}
import com.linkedin.feathr.offline.util.datetime.{DateTimeInterval, OfflineDateTimeUtils}
import org.apache.log4j.Logger
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
* The primary responsibility of this class is to Map the anchored features to its DataFrame.
*/
private[offline] class AnchorToDataSourceMapper(dataPathHandlers: List[DataPathHandler]) {
private val logger = Logger.getLogger(classOf[AnchorToDataSourceMapper])

/**
* Get basic anchored feature to datasource mapping for feature join use case.
Expand Down Expand Up @@ -136,7 +140,11 @@ private[offline] class AnchorToDataSourceMapper(dataPathHandlers: List[DataPathH
}
catch {// todo - Add this functionality to only specific exception types and not for all error types.
case e: Exception => if (shouldSkipFeature || (ss.sparkContext.isLocal &&
SQLConf.get.getConf(LocalFeatureJoinJob.SKIP_MISSING_FEATURE))) ss.emptyDataFrame else throw e
SQLConf.get.getConf(LocalFeatureJoinJob.SKIP_MISSING_FEATURE))) {
logger.warn(s"shouldSkipFeature is " + shouldSkipFeature + "spark is session " + ss.sparkContext.isLocal + "local skip feature is "
+ SQLConf.get.getConf(LocalFeatureJoinJob.SKIP_MISSING_FEATURE))
ss.emptyDataFrame
} else throw e
}
}

Expand Down Expand Up @@ -193,9 +201,13 @@ private[offline] class AnchorToDataSourceMapper(dataPathHandlers: List[DataPathH
dataPathHandlers = dataPathHandlers))
} catch {
case e: Exception => if (shouldSkipFeature || (ss.sparkContext.isLocal &&
SQLConf.get.getConf(LocalFeatureJoinJob.SKIP_MISSING_FEATURE))) None else throw e
SQLConf.get.getConf(LocalFeatureJoinJob.SKIP_MISSING_FEATURE)))
{
logger.warn(s"shouldSkipFeature is " + shouldSkipFeature + "spark is session " + ss.sparkContext.isLocal + "local skip feature is "
+ SQLConf.get.getConf(LocalFeatureJoinJob.SKIP_MISSING_FEATURE))
None
} else throw e
}

anchors.map(anchor => (anchor, timeSeriesSource))
})
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ private[offline] object FeathrUtils {
val SPARK_JOIN_MAX_PARALLELISM = "max.parallelism"
val CHECKPOINT_OUTPUT_PATH = "checkpoint.dir"
val SPARK_JOIN_MIN_PARALLELISM = "min.parallelism"
val MAX_DATA_LOAD_RETRY = "max.data.load.retry"
val DATA_LOAD_WAIT_IN_MS = "data.load.wait.in.ms"

val defaultParams: Map[String, String] = Map(
ENABLE_DEBUG_OUTPUT -> "false",
Expand All @@ -50,6 +52,8 @@ private[offline] object FeathrUtils {
SEQ_JOIN_ARRAY_EXPLODE_ENABLED -> "true",
ENABLE_SALTED_JOIN -> "false",
SKIP_MISSING_FEATURE -> "false",
MAX_DATA_LOAD_RETRY-> "0",
DATA_LOAD_WAIT_IN_MS-> "1",
// If one key appears more than 0.02% in the dataset, we will salt this join key and split them into multiple partitions
// This is an empirical value
SALTED_JOIN_FREQ_ITEM_THRESHOLD -> "0.0002",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ class SlidingWindowAggIntegTest extends FeathrIntegTest {
"""
|sources: {
| swaSource: {
| location: { path: "slidingWindowAgg/localSWDefaultTest/daily" }
| location: { path: "slidingWindowAgg/localSWADefaultTest/daily" }
| timePartitionPattern: "yyyy/MM/dd"
| timeWindowParameters: {
| timestampColumn: "timestamp"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class TestTimeBasedJoin extends TestFeathr {
def testStartEndDatePath(): Unit = {
val inputData1 = InputData("/test/path/", SourceFormatType.TIME_PATH, startDate = Some("20170908"), endDate = Some("20170910"))
val pathList11 = getPathList(inputData1.sourceType, inputData1.inputPath, ss, inputData1.dateParam, dataLoaderHandlers=List())
val actualPathList = List("/test/path/datepartition=2017-09-08-00", "/test/path/datepartition=2017-09-09-00", "/test/path/datepartition=2017-09-10-00")
val actualPathList = List("/test/path/2017/09/08", "/test/path/2017/09/09", "/test/path/2017/09/10")
assert(pathList11 == actualPathList)
}

Expand Down Expand Up @@ -75,7 +75,7 @@ class TestTimeBasedJoin extends TestFeathr {

val featureDataRelative2 = InputData("FEATHR_TEST_ONLY/path/", SourceFormatType.TIME_PATH, startDate = Some("20180227"), endDate = Some("20180227"))

val pathListRelative =
val pathListRelative =
getPathList(featureDataRelative.sourceType, featureDataRelative.inputPath, ss, featureDataRelative.dateParam, targetDate=Some(observationStartDate), dataLoaderHandlers=List())
val pathListRelative2 = getPathList(featureDataRelative2.sourceType, featureDataRelative2.inputPath, ss, featureDataRelative2.dateParam,dataLoaderHandlers=List())

Expand All @@ -86,7 +86,7 @@ class TestTimeBasedJoin extends TestFeathr {
val startDate = "20180225"
val endDate = "20180225"
val featureDataSpecific = InputData("FEATHR_TEST_ONLY/path/", SourceFormatType.TIME_PATH, startDate = Some(startDate), endDate = Some(endDate))
val pathListSpecific =
val pathListSpecific =
getPathList(featureDataSpecific.sourceType, featureDataSpecific.inputPath, ss, featureDataSpecific.dateParam, targetDate=Some(observationStartDate), dataLoaderHandlers=List())
val pathListSpecific2 = getPathList(featureDataSpecific.sourceType, featureDataSpecific.inputPath, ss, featureDataSpecific.dateParam, dataLoaderHandlers=List())

Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package com.linkedin.feathr.offline.source.dataloader

import com.linkedin.feathr.common.exception.{FeathrConfigException, FeathrInputDataException}
import com.linkedin.feathr.offline.TestFeathr
import com.linkedin.feathr.offline.config.location.SimplePath
import com.linkedin.feathr.offline.job.LocalFeatureJoinJob
import org.apache.spark.sql.Row
import org.apache.spark.sql.internal.SQLConf
import org.testng.Assert.assertEquals
import org.testng.annotations.Test

Expand Down Expand Up @@ -34,6 +37,20 @@ class TestBatchDataLoader extends TestFeathr {
assertEquals(df.collect(), expectedRows)
}

/**
* Test the batch loader retries before failing.
*/
@Test(expectedExceptions = Array(classOf[FeathrInputDataException]),
expectedExceptionsMessageRegExp = ".* after 3 retries and retry time of 1ms.*")
def testRetry(): Unit = {
SQLConf.get.setConf(LocalFeatureJoinJob.MAX_DATA_LOAD_RETRY, 3)
val path = "anchor11-source.csv"
val batchDataLoader = new BatchDataLoader(ss, location = SimplePath(path), List())
val df = batchDataLoader.loadDataFrame()
df.show()
SQLConf.get.setConf(LocalFeatureJoinJob.MAX_DATA_LOAD_RETRY, 0)
}

@Test(description = "test loading dataframe with BatchDataLoader by specifying delimiter")
def testBatchDataLoaderWithCsvDelimiterOption() : Unit = {
val path = "anchor1-source.tsv"
Expand Down
Loading