Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/how-to-guides/feathr-job-configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,4 @@ Since Feathr uses Spark as the underlying execution engine, there's a way to ove
| ------------------------- | ------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------------- |
| spark.feathr.inputFormat | None | Specify the input format if the file cannot be tell automatically. By default, Feathr will read files by parsing the file extension name; However the file/folder name doesn't have extension name, this configuration can be set to tell Feathr which format it should use to read the data. Currently can only be set for Spark built-in short names, including `json`, `parquet`, `jdbc`, `orc`, `libsvm`, `csv`, `text`. For more details, see ["Manually Specifying Options"](https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html#manually-specifying-options). Additionally, `delta` is also supported if users want to read delta lake. | 0.2.1 |
| spark.feathr.outputFormat | None | Specify the output format. "avro" is the default behavior if this value is not set. Currently can only be set for Spark built-in short names, including `json`, `parquet`, `jdbc`, `orc`, `libsvm`, `csv`, `text`. For more details, see ["Manually Specifying Options"](https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html#manually-specifying-options). Additionally, `delta` is also supported if users want to write delta lake. | 0.2.1 |
| spark.feathr.inputFormat.csvOptions.sep | None | Specify the delimiter. For example, "," for commas or "\t" for tabs. | TODO: Check version |
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,15 @@ private[offline] class BatchDataLoader(ss: SparkSession, location: InputLocation
loadDataFrame(Map(), new JobConf(ss.sparkContext.hadoopConfiguration))
}

/**
* Convert string to special characters
* @return a String
*/
def escape(raw: String): String = {
import scala.reflect.runtime.universe._
Literal(Constant(raw)).toString.replaceAll("\"", "")
}

/**
* load the source data as dataframe.
* @param dataIOParameters extra parameters
Expand All @@ -64,6 +73,14 @@ private[offline] class BatchDataLoader(ss: SparkSession, location: InputLocation
val dataPath = location.getPath

log.info(s"Loading ${location} as DataFrame, using parameters ${dataIOParametersWithSplitSize}")

// Get csvDelimiterOption set with spark.feathr.inputFormat.csvOptions.sep
val sqlContext = ss.sqlContext
// Get rawCsvDelimiterOption from spark.feathr.inputFormat.csvOptions.sep
val rawCsvDelimiterOption = sqlContext.getConf("spark.feathr.inputFormat.csvOptions.sep", ",")
// If rawCsvDelimiterOption is not properly set, defaults to "," as the delimiter else csvDelimiterOption
val csvDelimiterOption = if (escape(rawCsvDelimiterOption).trim.isEmpty) "," else rawCsvDelimiterOption

try {
import scala.util.control.Breaks._

Expand All @@ -88,7 +105,7 @@ private[offline] class BatchDataLoader(ss: SparkSession, location: InputLocation
throw feathrException // Throwing exception to avoid dataLoaderHandler hook exception from being swallowed.
case e: Throwable => //TODO: Analyze all thrown exceptions, instead of swalling them all, and reading as a csv
println(e.toString)
ss.read.format("csv").option("header", "true").load(dataPath)
ss.read.format("csv").option("header", "true").option("delimiter", csvDelimiterOption).load(dataPath)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,22 +27,40 @@ private[offline] class CsvDataLoader(ss: SparkSession, path: String) extends Dat
doLoadCsvDocumentLikeAvro()._2
}

/**
* Convert string to special characters
* @return a String
*/
def escape(raw: String): String = {
import scala.reflect.runtime.universe._
Literal(Constant(raw)).toString.replaceAll("\"", "")
}


/**
* load the source data as dataframe.
* @return an dataframe
*/
override def loadDataFrame(): DataFrame = {

// Get csvDelimiterOption set with spark.feathr.inputFormat.csvOptions.sep
val sqlContext = ss.sqlContext
// Get rawCsvDelimiterOption from spark.feathr.inputFormat.csvOptions.sep
val rawCsvDelimiterOption = sqlContext.getConf("spark.feathr.inputFormat.csvOptions.sep", ",")
// If rawCsvDelimiterOption is not properly set, defaults to "," as the delimiter else csvDelimiterOption
val csvDelimiterOption = if (escape(rawCsvDelimiterOption).trim.isEmpty) "," else rawCsvDelimiterOption

try {
log.debug(s"Loading CSV path :${path}")
val absolutePath = new File(path).getPath
log.debug(s"Got absolute CSV path: ${absolutePath}, loading..")
ss.read.format("csv").option("header", "true").load(absolutePath)
ss.read.format("csv").option("header", "true").option("delimiter", csvDelimiterOption).load(absolutePath)
} catch {
case _: Throwable =>
log.debug(s"Loading CSV failed, retry with class loader..")
val absolutePath = getClass.getClassLoader.getResource(path).getPath
log.debug(s"Got absolution CSV path from class loader: ${absolutePath}, loading.. ")
ss.read.format("csv").option("header", "true").load(absolutePath)
ss.read.format("csv").option("header", "true").option("delimiter", csvDelimiterOption).load(absolutePath)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,15 @@ object FileFormat {

val DATA_FORMAT = "data.format"

/**
* Convert string to special characters
* @return a String
*/
def escape(raw: String): String = {
import scala.reflect.runtime.universe._
Literal(Constant(raw)).toString.replaceAll("\"", "")
}

/**
* To define if the file is JDBC, Single File or Path list (default)
* @param path
Expand All @@ -51,9 +60,17 @@ object FileFormat {

// TODO: Complete a general loadDataFrame and replace current adhoc load data frame code
def loadDataFrame(ss: SparkSession, path: String, format: String = CSV): DataFrame = {

// Get csvDelimiterOption set with spark.feathr.inputFormat.csvOptions.sep
val sqlContext = ss.sqlContext
// Get rawCsvDelimiterOption from spark.feathr.inputFormat.csvOptions.sep
val rawCsvDelimiterOption = sqlContext.getConf("spark.feathr.inputFormat.csvOptions.sep", ",")
// If rawCsvDelimiterOption is not properly set, defaults to "," as the delimiter else csvDelimiterOption
val csvDelimiterOption = if (escape(rawCsvDelimiterOption).trim.isEmpty) "," else rawCsvDelimiterOption

format match {
case AVRO => new AvroJsonDataLoader(ss, path).loadDataFrame()
case CSV => ss.read.format("csv").option("header", "true").load(path)
case CSV => ss.read.format("csv").option("header", "true").option("delimiter", csvDelimiterOption).load(path)
case PARQUET => new ParquetDataLoader(ss, path).loadDataFrame()
case _ => ???
}
Expand All @@ -69,23 +86,32 @@ object FileFormat {
val p = existingHdfsPaths.head.toLowerCase()
p match {
case p if p.endsWith(".csv") => CSV
case p if p.endsWith(".tsv") => CSV
case p if p.endsWith(".parquet") => PARQUET
case p if p.endsWith(".orc") => ORC
case p if p.endsWith(".avro.json") => AVRO_JSON
case p if p.endsWith(".avro") => AVRO
case p if p.startsWith("jdbc:") => JDBC
case _ =>
// if we cannot tell the file format from the file extensions, we should read from `spark.feathr.inputFormat` to get the format that's sepcified by user.
dataIOParameters.getOrElse(DATA_FORMAT, ss.conf.get("spark.feathr.inputFormat", AVRO)).toUpperCase
// if we cannot tell the file format from the file extensions, we should read from `spark.feathr.inputFormat` to get the format that's sepcified by user.
dataIOParameters.getOrElse(DATA_FORMAT, ss.conf.get("spark.feathr.inputFormat", AVRO)).toUpperCase
}


}

def loadHdfsDataFrame(format: String, existingHdfsPaths: Seq[String]): DataFrame = {

// Get csvDelimiterOption set with spark.feathr.inputFormat.csvOptions.sep
val sqlContext = ss.sqlContext
// Get rawCsvDelimiterOption from spark.feathr.inputFormat.csvOptions.sep
val rawCsvDelimiterOption = sqlContext.getConf("spark.feathr.inputFormat.csvOptions.sep", ",")
// If rawCsvDelimiterOption is not properly set, defaults to "," as the delimiter else csvDelimiterOption
val csvDelimiterOption = if (escape(rawCsvDelimiterOption).trim.isEmpty) "," else rawCsvDelimiterOption

val df = format match {
case CSV =>
ss.read.format("csv").option("header", "true").load(existingHdfsPaths: _*)
ss.read.format("csv").option("header", "true").option("delimiter", csvDelimiterOption).load(existingHdfsPaths: _*)
case AVRO =>
ss.read.format(AVRO_DATASOURCE).load(existingHdfsPaths: _*)
case ORC =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,15 @@ private[offline] object SourceUtils {
val FEATURE_MP_DEF_CONFIG_SUFFIX = ".conf"
val firstRecordName = "topLevelRecord"

/**
* Convert string to special characters
* @return a String
*/
def escape(raw: String): String = {
import scala.reflect.runtime.universe._
Literal(Constant(raw)).toString.replaceAll("\"", "")
}

/**
* get AVRO datum type of a dataset we should use to load,
* it is determined by the expect datatype from a set of anchor transformers
Expand Down Expand Up @@ -665,6 +674,14 @@ private[offline] object SourceUtils {
// TODO: Split isLocal case into Test Packages
val format = FileFormat.getType(inputData.inputPath)
log.info(s"loading ${inputData.inputPath} input Path as Format: ${format}")

// Get csvDelimiterOption set with spark.feathr.inputFormat.csvOptions.sep
val sqlContext = ss.sqlContext
// Get rawCsvDelimiterOption from spark.feathr.inputFormat.csvOptions.sep
val rawCsvDelimiterOption = sqlContext.getConf("spark.feathr.inputFormat.csvOptions.sep", ",")
// If rawCsvDelimiterOption is not properly set, defaults to "," as the delimiter else csvDelimiterOption
val csvDelimiterOption = if (escape(rawCsvDelimiterOption).trim.isEmpty) "," else rawCsvDelimiterOption

format match {
case FileFormat.PATHLIST => {
val pathList = getPathList(sourceFormatType=inputData.sourceType,
Expand All @@ -689,7 +706,7 @@ private[offline] object SourceUtils {
JdbcUtils.loadDataFrame(ss, inputData.inputPath)
}
case FileFormat.CSV => {
ss.read.format("csv").option("header", "true").load(inputData.inputPath)
ss.read.format("csv").option("header", "true").option("delimiter", csvDelimiterOption).load(inputData.inputPath)
}
case _ => {
if (ss.sparkContext.isLocal){
Expand Down
8 changes: 8 additions & 0 deletions src/test/resources/anchor1-source.tsv
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
mId alpha beta gamma omega
1 apple 10 10 0.1
2 orange 10 3 0.1
3 banana 10 2 0.9
4 apple 10 1 0.7
5 apple 11 11 1.0
7 banana 2 10 81.27
9 banana 4 4 0.4
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package com.linkedin.feathr.offline.source.dataloader

import com.linkedin.feathr.offline.TestFeathr
import com.linkedin.feathr.offline.config.location.SimplePath
import org.apache.spark.sql.Row
import org.testng.Assert.assertEquals
import org.testng.annotations.Test

/**
* unit tests for [[BatchDataLoader]]
*/
class TestBatchDataLoader extends TestFeathr {

def escape(raw: String): String = {
import scala.reflect.runtime.universe._
Literal(Constant(raw)).toString
}

@Test(description = "test loading dataframe with BatchDataLoader")
def testBatchDataLoader() : Unit = {
val path = "anchor1-source.csv"
val absolutePath = getClass.getClassLoader.getResource(path).getPath
val batchDataLoader = new BatchDataLoader(ss, SimplePath(absolutePath), List())
val df = batchDataLoader.loadDataFrame()
val expectedRows = Array(
Row("1", "apple", "10", "10", "0.1"),
Row("2", "orange", "10", "3", "0.1"),
Row("3", "banana", "10", "2", "0.9"),
Row("4", "apple", "10", "1", "0.7"),
Row("5", "apple", "11", "11", "1.0"),
Row("7", "banana", "2", "10", "81.27"),
Row("9", "banana", "4", "4", "0.4")
)
assertEquals(df.collect(), expectedRows)
}

@Test(description = "test loading dataframe with BatchDataLoader by specifying delimiter")
def testBatchDataLoaderWithCsvDelimiterOption() : Unit = {
val path = "anchor1-source.tsv"
val absolutePath = getClass.getClassLoader.getResource(path).getPath
val sqlContext = ss.sqlContext
sqlContext.setConf("spark.feathr.inputFormat.csvOptions.sep", "\t")
println(s"Postset config testBatchDataLoaderWithCsvDelimiterOption: ${ss.sqlContext.getConf("spark.feathr.inputFormat.csvOptions.sep")}")
val batchDataLoader = new BatchDataLoader(ss, SimplePath(absolutePath), List())
val df = batchDataLoader.loadDataFrame()
val expectedRows = Array(
Row("1", "apple", "10", "10", "0.1"),
Row("2", "orange", "10", "3", "0.1"),
Row("3", "banana", "10", "2", "0.9"),
Row("4", "apple", "10", "1", "0.7"),
Row("5", "apple", "11", "11", "1.0"),
Row("7", "banana", "2", "10", "81.27"),
Row("9", "banana", "4", "4", "0.4")
)
assertEquals(df.collect(), expectedRows)
sqlContext.setConf("spark.feathr.inputFormat.csvOptions.sep", "")
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ class TestCsvDataLoader extends TestFeathr {
assertEquals(df.collect(), expectedRows)
}


@Test(description = "test loading Avro schema with CsvDataLoader")
def testLoadSchema() : Unit = {
val dataLoader = new CsvDataLoader(ss, "anchor1-source.csv")
Expand All @@ -46,4 +45,24 @@ class TestCsvDataLoader extends TestFeathr {
val expectedSchema = Schema.createRecord(expectedFields)
assertEquals(schema.getFields, expectedSchema.getFields)
}

@Test(description = "test loading dataframe with CsvDataLoader by specifying delimiter")
def testLoadDataFrameWithCsvDelimiterOption() : Unit = {
val sqlContext = ss.sqlContext
sqlContext.setConf("spark.feathr.inputFormat.csvOptions.sep", "\t")
val dataLoader = new CsvDataLoader(ss, "anchor1-source.tsv")

val df = dataLoader.loadDataFrame()
val expectedRows = Array(
Row("1", "apple", "10", "10", "0.1"),
Row("2", "orange", "10", "3", "0.1"),
Row("3", "banana", "10", "2", "0.9"),
Row("4", "apple", "10", "1", "0.7"),
Row("5", "apple", "11", "11", "1.0"),
Row("7", "banana", "2", "10", "81.27"),
Row("9", "banana", "4", "4", "0.4")
)
assertEquals(df.collect(), expectedRows)
sqlContext.setConf("spark.feathr.inputFormat.csvOptions.sep", "")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ class TestDataLoaderFactory extends TestFeathr {
val localDataLoaderFactory = new LocalDataLoaderFactory(ss, dataLoaderHandlers=List())
val csvLoader = localDataLoaderFactory.create("anchor1-source.csv")
assertTrue(csvLoader.isInstanceOf[CsvDataLoader])
val sqlContext = ss.sqlContext
sqlContext.setConf("spark.feathr.inputFormat.csvOptions.sep", "\t")
val csvLoaderWithDelimiter = localDataLoaderFactory.create("anchor1-source.tsv")
assertTrue(csvLoaderWithDelimiter.isInstanceOf[BatchDataLoader])
sqlContext.setConf("spark.feathr.inputFormat.csvOptions.sep", "")
val avroJsonLoader = localDataLoaderFactory.create("anchor5-source.avro.json")
assertTrue(avroJsonLoader.isInstanceOf[AvroJsonDataLoader])
val jsonWithSchemaLoader = localDataLoaderFactory.create("simple-obs2") // the mock data folder exists.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package com.linkedin.feathr.offline.source.dataloader.hdfs

import com.linkedin.feathr.offline.TestFeathr
import org.apache.spark.sql.{Row, SparkSession}
import org.testng.Assert.assertEquals
import org.testng.annotations.Test

/**
* unit tests for [[FileFormat]]
*/
class TestFileFormat extends TestFeathr {

@Test(description = "test loading dataframe with FileFormat")
def testLoadDataFrame() : Unit = {
val path = "anchor1-source.csv"
val absolutePath = getClass.getClassLoader.getResource(path).getPath
val df = FileFormat.loadDataFrame(ss, absolutePath, "CSV")

val expectedRows = Array(
Row("1", "apple", "10", "10", "0.1"),
Row("2", "orange", "10", "3", "0.1"),
Row("3", "banana", "10", "2", "0.9"),
Row("4", "apple", "10", "1", "0.7"),
Row("5", "apple", "11", "11", "1.0"),
Row("7", "banana", "2", "10", "81.27"),
Row("9", "banana", "4", "4", "0.4")
)
assertEquals(df.collect(), expectedRows)
}

@Test(description = "test loading dataframe with FileFormat by specifying delimiter")
def testLoadDataFrameWithCsvDelimiterOption() : Unit = {
val path = "anchor1-source.tsv"
val absolutePath = getClass.getClassLoader.getResource(path).getPath
val sqlContext = ss.sqlContext
sqlContext.setConf("spark.feathr.inputFormat.csvOptions.sep", "\t")
val df = FileFormat.loadDataFrame(ss, absolutePath, "CSV")

val expectedRows = Array(
Row("1", "apple", "10", "10", "0.1"),
Row("2", "orange", "10", "3", "0.1"),
Row("3", "banana", "10", "2", "0.9"),
Row("4", "apple", "10", "1", "0.7"),
Row("5", "apple", "11", "11", "1.0"),
Row("7", "banana", "2", "10", "81.27"),
Row("9", "banana", "4", "4", "0.4")
)
assertEquals(df.collect(), expectedRows)
sqlContext.setConf("spark.feathr.inputFormat.csvOptions.sep", "")
}

@Test(description = "test loading dataframe with FileFormat by specifying delimiter for HDFS")
def testLoadDataFrameWithCsvDelimiterOptionHDFS() : Unit = {
val path = "anchor1-source.tsv"
val absolutePath = getClass.getClassLoader.getResource(path).getPath
val sqlContext = ss.sqlContext
sqlContext.setConf("spark.feathr.inputFormat.csvOptions.sep", "\t")
val df = FileFormat.loadHdfsDataFrame("CSV", Seq(absolutePath))

val expectedRows = Array(
Row("1", "apple", "10", "10", "0.1"),
Row("2", "orange", "10", "3", "0.1"),
Row("3", "banana", "10", "2", "0.9"),
Row("4", "apple", "10", "1", "0.7"),
Row("5", "apple", "11", "11", "1.0"),
Row("7", "banana", "2", "10", "81.27"),
Row("9", "banana", "4", "4", "0.4")
)
assertEquals(df.collect(), expectedRows)
sqlContext.setConf("spark.feathr.inputFormat.csvOptions.sep", "")
}

}
Loading