Skip to content

Commit 45e44af

Browse files
authored
add unity catalog support (#1230)
* add unity catalog support * fix closing bracket typo
1 parent 6847ef9 commit 45e44af

3 files changed

Lines changed: 14 additions & 3 deletions

File tree

feathr-impl/src/main/scala/com/linkedin/feathr/offline/source/dataloader/BatchDataLoader.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,8 @@ private[offline] class BatchDataLoader(ss: SparkSession, location: DataLocation,
9797
}
9898
df
9999
} catch {
100-
case _: Throwable =>
100+
case e: Throwable =>
101+
e.printStackTrace() // This prints the stack trace of the Throwable
101102
try {
102103
new AvroJsonDataLoader(ss, dataPath + "/data.avro.json").loadDataFrame()
103104
} catch {
@@ -106,6 +107,7 @@ private[offline] class BatchDataLoader(ss: SparkSession, location: DataLocation,
106107
ss.read.format("csv").option("header", "true").option("delimiter", csvDelimiterOption).load(dataPath)
107108
} catch {
108109
case e: Exception =>
110+
e.printStackTrace()
109111
// If data loading from source failed, retry it automatically, as it might due to data source still being written into.
110112
log.info(s"Loading ${location} failed, retrying for ${retry}-th time..")
111113
if (retry > 0) {

feathr-impl/src/main/scala/com/linkedin/feathr/offline/source/dataloader/hdfs/FileFormat.scala

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ object FileFormat {
2626
// Snowflake type
2727
val SNOWFLAKE = "SNOWFLAKE"
2828

29+
val UNITY_CATALOG = "UNITY_CATALOG"
30+
2931
private val AVRO_DATASOURCE = "avro"
3032
// Use Spark native orc reader instead of hive-orc since Spark 2.3
3133
private val ORC_DATASOURCE = "orc"
@@ -47,6 +49,7 @@ object FileFormat {
4749
case p if p.endsWith(".avro") => AVRO
4850
case p if p.startsWith("jdbc:") => JDBC
4951
case p if p.startsWith("snowflake:") => SNOWFLAKE
52+
case p if p.startsWith("unity:") => UNITY_CATALOG
5053
case _ =>
5154
// if we cannot tell the file format from the file extensions, we should read from `spark.feathr.inputFormat` to get the format that's sepcified by user.
5255
if (ss.conf.get("spark.feathr.inputFormat","").nonEmpty) ss.conf.get("spark.feathr.inputFormat") else PATHLIST
@@ -85,6 +88,7 @@ object FileFormat {
8588
case p if p.endsWith(".avro") => AVRO
8689
case p if p.startsWith("jdbc:") => JDBC
8790
case p if p.startsWith("snowflake:") => SNOWFLAKE
91+
case p if p.startsWith("unity:") => UNITY_CATALOG
8892
case _ =>
8993
// if we cannot tell the file format from the file extensions, we should read from `spark.feathr.inputFormat` to get the format that's sepcified by user.
9094
dataIOParameters.getOrElse(DATA_FORMAT, ss.conf.get("spark.feathr.inputFormat", AVRO)).toUpperCase
@@ -96,7 +100,7 @@ object FileFormat {
96100
def loadHdfsDataFrame(format: String, existingHdfsPaths: Seq[String]): DataFrame = {
97101

98102
// Get csvDelimiterOption set with spark.feathr.inputFormat.csvOptions.sep and check if it is set properly (Only for CSV and TSV)
99-
val csvDelimiterOption = checkDelimiterOption(ss.sqlContext.getConf("spark.feathr.inputFormat.csvOptions.sep", ","))
103+
val csvDelimiterOption = checkDelimiterOption(ss.sqlContext.getConf("spark.feathr.inputFormat.csvOptions.sep", ","))
100104

101105
val df = format match {
102106
case CSV =>
@@ -112,6 +116,11 @@ object FileFormat {
112116
JdbcUtils.loadDataFrame(ss, existingHdfsPaths.head)
113117
case SNOWFLAKE =>
114118
SnowflakeUtils.loadDataFrame(ss, existingHdfsPaths.head)
119+
120+
case UNITY_CATALOG =>
121+
val pathHead = existingHdfsPaths.head
122+
val unityCatalogTable = pathHead.replaceFirst("unity:", "")
123+
ss.table(unityCatalogTable)
115124
case _ =>
116125
// Allow dynamic config of the file format if users want to use one
117126
if (ss.conf.getOption("spark.feathr.inputFormat").nonEmpty) ss.read.format(ss.conf.get("spark.feathr.inputFormat")).load(existingHdfsPaths: _*)

feathr-impl/src/main/scala/com/linkedin/feathr/offline/util/AclCheckUtils.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ private[offline] object AclCheckUtils {
3333
// Check read authorization on a path string
3434
def checkReadAuthorization(conf: Configuration, pathName: String): Try[Unit] = {
3535
// no way to check jdbc auth yet
36-
if (pathName.startsWith("jdbc:")) {
36+
if (pathName.startsWith("jdbc:") || pathName.startsWith("unity:")) {
3737
Success(())
3838
} else {
3939
val path = new Path(pathName)

0 commit comments

Comments
 (0)