Skip to content

Commit 65702dc

Browse files
rakeshkashyap123 and Rakesh Kashyap Hanasoge Padmanabha authored
Fix bug when SWA hdfs and local paths without data.avro.json extensio… (#1130)
* fix bug when SWA hdfs and local paths without data.avro.json extensions are included for evaluation * try * Fix tests * revert test file * Add tests * Add private classifier to variable * fix test * fix test --------- Co-authored-by: Rakesh Kashyap Hanasoge Padmanabha <rkashyap@rkashyap-mn3.linkedin.biz>
1 parent 04b0199 commit 65702dc

File tree

7 files changed

+90
-6
lines changed

7 files changed

+90
-6
lines changed

feathr-impl/src/main/scala/com/linkedin/feathr/offline/source/pathutil/HdfsPathChecker.scala

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,4 +9,13 @@ private[offline] class HdfsPathChecker extends PathChecker {
99
override def isMock(path: String): Boolean = false
1010

1111
override def exists(path: String): Boolean = HdfsUtils.exists(path)
12+
13+
/**
14+
* Check whether the given path has any file content. If it is a directory, ensure it has at least one file; otherwise check
15+
* that the file size is non-zero.
16+
*
17+
* @param path input path
18+
* @return true if the path is non empty.
19+
*/
20+
override def nonEmpty(path: String): Boolean = HdfsUtils.nonEmpty(path)
1221
}

feathr-impl/src/main/scala/com/linkedin/feathr/offline/source/pathutil/LocalPathChecker.scala

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,13 @@ import com.linkedin.feathr.offline.util.{HdfsUtils, LocalFeatureJoinUtils, Sourc
44
import com.linkedin.feathr.offline.source.dataloader.DataLoaderHandler
55
import org.apache.hadoop.conf.Configuration
66

7+
import java.io.File
8+
79
/**
810
* path checker for local test files.
911
* @param hadoopConf hadoop configuration
1012
*/
1113
private[offline] class LocalPathChecker(hadoopConf: Configuration, dataLoaderHandlers: List[DataLoaderHandler]) extends PathChecker {
12-
1314
private val TEST_AVRO_JSON_FILE = "/data.avro.json"
1415

1516
/**
@@ -35,7 +36,7 @@ private[offline] class LocalPathChecker(hadoopConf: Configuration, dataLoaderHan
3536
if (dataLoaderHandler.validatePath(path)) {
3637
isExternalDataSourceFlag = true
3738
break
38-
}
39+
}
3940
}
4041
}
4142
isExternalDataSourceFlag
@@ -53,4 +54,25 @@ private[offline] class LocalPathChecker(hadoopConf: Configuration, dataLoaderHan
5354
if (getClass.getClassLoader.getResource(path + TEST_AVRO_JSON_FILE) != null) return true
5455
false
5556
}
57+
58+
/**
59+
* Checks if the given path is non-empty. If the path is a directory, check whether it has files listed under it; otherwise, just check the
60+
* file length.
61+
* @param path input path
62+
* @return true if the path is non empty.
63+
*/
64+
override def nonEmpty(path: String): Boolean = {
65+
if (!isExternalDataSource(path) && HdfsUtils.nonEmpty(path)) return true
66+
val filePath = if (LocalFeatureJoinUtils.getMockPathIfExist(path, hadoopConf, None).isDefined) {
67+
new File(LocalFeatureJoinUtils.getMockPathIfExist(path, hadoopConf, None).get)
68+
} else {
69+
new File(getClass.getClassLoader.getResource(path).toURI)
70+
}
71+
72+
if (filePath.isDirectory) { // for a directory, check its entries rather than the file length
73+
filePath.listFiles().length > 0
74+
} else {
75+
filePath.length() > 0
76+
}
77+
}
5678
}

feathr-impl/src/main/scala/com/linkedin/feathr/offline/source/pathutil/PathChecker.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,14 @@ private[offline] trait PathChecker {
2020
* @return true if the path exists.
2121
*/
2222
def exists(path: String) : Boolean
23+
24+
/**
25+
* Check whether the given path has any file content. If it is a directory, ensure it has at least one file; otherwise check
26+
* that the file size is non-zero.
27+
* @param path input path
28+
* @return true if the path is non empty.
29+
*/
30+
def nonEmpty(path: String) : Boolean
2331
}
2432

2533
/**

feathr-impl/src/main/scala/com/linkedin/feathr/offline/source/pathutil/TimeBasedHdfsPathGenerator.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import java.time.format.DateTimeFormatter
99
* @param pathChecker the path checker is used to check whether a file path exists.
1010
*/
1111
private[offline] class TimeBasedHdfsPathGenerator(pathChecker: PathChecker) {
12-
1312
/**
1413
* Helper function for generating file names for daily and hourly format data
1514
* Supported path format include:
@@ -35,7 +34,7 @@ private[offline] class TimeBasedHdfsPathGenerator(pathChecker: PathChecker) {
3534
.map(offset => pathInfo.basePath + formatter.format(factDataStartTime.plus(offset, chronUnit)) + postfixPath).distinct
3635

3736
if (ignoreMissingFiles) {
38-
filePaths.filter(pathChecker.exists)
37+
filePaths.filter(filePath => pathChecker.exists(filePath) && pathChecker.nonEmpty(filePath))
3938
} else {
4039
filePaths
4140
}

feathr-impl/src/main/scala/com/linkedin/feathr/offline/util/HdfsUtils.scala

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
11
package com.linkedin.feathr.offline.util
22

3+
import com.linkedin.feathr.offline.util.HdfsUtils.conf
34
import org.apache.hadoop.conf.Configuration
4-
import org.apache.hadoop.fs.{Path,LocatedFileStatus,FileSystem,PathFilter,RemoteIterator}
5+
import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, Path, PathFilter, RemoteIterator}
56
import org.apache.log4j.{Logger, PatternLayout, WriterAppender}
67

78
import java.io.{FileSystem => _, _}
89
import java.time.format.DateTimeFormatter
910
import java.time.temporal.ChronoUnit
1011
import java.time.{LocalDateTime, ZoneId, ZoneOffset}
11-
1212
import scala.annotation.tailrec
1313

1414

@@ -479,6 +479,25 @@ object HdfsUtils {
479479
}
480480
}
481481

482+
/**
483+
* For a given input path, check if it is non-empty. If the path is a directory, check if it has any files within the directory.
484+
* Otherwise, check if the file is non-empty.
485+
*
486+
* @param conf Hadoop Configuration
487+
* @param inputPath input path
488+
* @return true if nonEmpty
489+
*/
490+
def nonEmpty(inputPath: String, conf: Configuration = conf): Boolean = {
491+
val fs = FileSystem.get(conf)
492+
val path = new Path(inputPath)
493+
if (!exists(inputPath)) return false
494+
if (fs.getFileStatus(path).isDirectory) {
495+
fs.listStatus(path).length > 0
496+
} else {
497+
fs.getFileStatus(path).getLen > 0
498+
}
499+
}
500+
482501
/**
483502
* Create directories as needed
484503
*/

feathr-impl/src/test/scala/com/linkedin/feathr/offline/source/pathutil/TestPathChecker.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,9 @@ class TestPathChecker extends TestFeathr with MockitoSugar {
3434
assertEquals(hdfsPathChecker.isMock("anyPath"), false)
3535
assertEquals(hdfsPathChecker.exists("src/test/resources/anchor1-source.csv"), true)
3636
assertEquals(hdfsPathChecker.exists("non_existing_path"), false)
37+
assertEquals(hdfsPathChecker.nonEmpty("src/test/resources/generation/daily/2019/05/19"), true)
38+
assertEquals(hdfsPathChecker.nonEmpty("src/test/resources/generation/daily/2019/05/18"), false)
39+
assertEquals(hdfsPathChecker.nonEmpty("src/test/resources/anchor1-source.csv"), true)
3740
}
3841

3942

@@ -44,6 +47,8 @@ class TestPathChecker extends TestFeathr with MockitoSugar {
4447
assertEquals(localPathChecker.exists("anchor1-source.csv"), true)
4548
assertEquals(localPathChecker.exists("generation/daily/2019/05/19"), true)
4649
assertEquals(localPathChecker.exists("non-existing_path"), false)
50+
assertEquals(localPathChecker.nonEmpty("generation/daily/2019/05/19"), true)
51+
assertEquals(localPathChecker.nonEmpty("anchor1-source.csv"), true)
4752
}
4853

4954
@Test(description = "test isExternalDataSource method for LocalPathChecker")

feathr-impl/src/test/scala/com/linkedin/feathr/offline/source/pathutil/TestTimeBasedHdfsPathGenerator.scala

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,11 +57,33 @@ class TestTimeBasedHdfsPathGenerator extends TestFeathr with MockitoSugar{
5757
val pathInfo = PathInfo("src/test/resources/generation/daily/", DateTimeResolution.DAILY, "yyyy/MM/dd")
5858
when(mockPathChecker.exists("src/test/resources/generation/daily/2019/05/18")).thenReturn(false)
5959
when(mockPathChecker.exists("src/test/resources/generation/daily/2019/05/19")).thenReturn(true)
60+
when(mockPathChecker.nonEmpty("src/test/resources/generation/daily/2019/05/19")).thenReturn(true)
6061
val interval = TestUtils.createDailyInterval("2019-05-18", "2019-05-20")
6162
val pathList = pathGenerator.generate(pathInfo, interval, true)
6263
assertEquals(pathList.toList, List("src/test/resources/generation/daily/2019/05/19"))
6364
verify(mockPathChecker).exists("src/test/resources/generation/daily/2019/05/18")
6465
verify(mockPathChecker).exists("src/test/resources/generation/daily/2019/05/19")
66+
verify(mockPathChecker).nonEmpty("src/test/resources/generation/daily/2019/05/19")
67+
verifyNoMoreInteractions(mockPathChecker)
68+
}
69+
70+
@Test(description = "test ignore missing files and empty folders")
71+
def testGenerateDailyFilesWithEmptyFolders(): Unit = {
72+
val mockPathChecker = mock[PathChecker]
73+
val pathGenerator = new TimeBasedHdfsPathGenerator(mockPathChecker)
74+
75+
val pathInfo = PathInfo("src/test/resources/generation/daily/", DateTimeResolution.DAILY, "yyyy/MM/dd")
76+
when(mockPathChecker.exists("src/test/resources/generation/daily/2019/05/18")).thenReturn(true)
77+
when(mockPathChecker.exists("src/test/resources/generation/daily/2019/05/19")).thenReturn(true)
78+
when(mockPathChecker.nonEmpty("src/test/resources/generation/daily/2019/05/18")).thenReturn(false)
79+
when(mockPathChecker.nonEmpty("src/test/resources/generation/daily/2019/05/19")).thenReturn(true)
80+
val interval = TestUtils.createDailyInterval("2019-05-18", "2019-05-20")
81+
val pathList = pathGenerator.generate(pathInfo, interval, true)
82+
assertEquals(pathList.toList, List("src/test/resources/generation/daily/2019/05/19"))
83+
verify(mockPathChecker).exists("src/test/resources/generation/daily/2019/05/18")
84+
verify(mockPathChecker).exists("src/test/resources/generation/daily/2019/05/19")
85+
verify(mockPathChecker).nonEmpty("src/test/resources/generation/daily/2019/05/19")
86+
verify(mockPathChecker).nonEmpty("src/test/resources/generation/daily/2019/05/18")
6587
verifyNoMoreInteractions(mockPathChecker)
6688
}
6789

0 commit comments

Comments
 (0)