diff --git a/pom.xml b/pom.xml
index 0abd9a5..288ef93 100644
--- a/pom.xml
+++ b/pom.xml
@@ -20,17 +20,17 @@
io.cdap.plugin
multi-table-plugins
- 1.4.0-SNAPSHOT
+ 1.4.2
jar
Multiple Table Plugins
UTF-8
- 6.8.0-SNAPSHOT
+ 6.8.0
2.10.2
2.2.4
- 2.11.0-SNAPSHOT
+ 2.11.0
widgets
docs
@@ -43,23 +43,12 @@
-
- sonatype
- https://oss.sonatype.org/content/groups/public
-
sonatype-snapshots
- https://oss.sonatype.org/content/repositories/snapshots
+ https://central.sonatype.com/repository/maven-snapshots
-
-
- sonatype
- https://oss.sonatype.org/content/groups/public
-
-
-
io.cdap.cdap
@@ -73,7 +62,7 @@
io.cdap.cdap
- cdap-data-pipeline2_2.11
+ cdap-data-pipeline3_2.12
${cdap.version}
test
diff --git a/src/main/java/io/cdap/plugin/MultiTableDBSource.java b/src/main/java/io/cdap/plugin/MultiTableDBSource.java
index af64ad7..9f0ac4b 100644
--- a/src/main/java/io/cdap/plugin/MultiTableDBSource.java
+++ b/src/main/java/io/cdap/plugin/MultiTableDBSource.java
@@ -32,6 +32,8 @@
import io.cdap.cdap.etl.api.batch.BatchSourceContext;
import io.cdap.plugin.common.Asset;
import io.cdap.plugin.common.LineageRecorder;
+import io.cdap.plugin.common.ReferenceBatchSource;
+import io.cdap.plugin.common.ReferencePluginConfig;
import io.cdap.plugin.common.SourceInputFormatProvider;
import io.cdap.plugin.format.DBTableInfo;
import io.cdap.plugin.format.MultiSQLStatementInputFormat;
@@ -63,7 +65,7 @@
@Description("Reads from multiple tables in a relational database. " +
"Outputs one record for each row in each table, with the table name as a record field. " +
"Also sets a pipeline argument for each table read, which contains the table schema. ")
-public class MultiTableDBSource extends BatchSource {
+public class MultiTableDBSource extends ReferenceBatchSource {
private static final Logger LOG = LoggerFactory.getLogger(MultiTableDBSource.class);
private static final String JDBC_PLUGIN_ID = "jdbc.driver";
@@ -71,11 +73,13 @@ public class MultiTableDBSource extends BatchSource jdbcDriverClass = pipelineConfigurer.usePluginClass("jdbc", conf.getJdbcPluginName(),
JDBC_PLUGIN_ID,
PluginProperties.builder().build());
diff --git a/src/main/java/io/cdap/plugin/format/DBTableRecordReader.java b/src/main/java/io/cdap/plugin/format/DBTableRecordReader.java
index ad3ac28..b612a6e 100644
--- a/src/main/java/io/cdap/plugin/format/DBTableRecordReader.java
+++ b/src/main/java/io/cdap/plugin/format/DBTableRecordReader.java
@@ -24,6 +24,8 @@
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.sql.Connection;
@@ -38,6 +40,8 @@
* Record reader that reads the entire contents of a database table using JDBC.
*/
public class DBTableRecordReader extends RecordReader {
+ private static final Logger LOG = LoggerFactory.getLogger(DBTableRecordReader.class);
+
private final DBTableName tableName;
private final String tableNameField;
private final MultiTableConf dbConf;
@@ -85,6 +89,10 @@ public boolean nextKeyValue() throws IOException {
schema = Schema.recordOf(tableName.getTable(), schemaFields);
}
if (!results.next()) {
+ if (pos == 0 && DBTableSplit.DEFAULT_CLAUSE.equals(split.getLowerClause())
+ && DBTableSplit.DEFAULT_CLAUSE.equals(split.getUpperClause())) {
+ LOG.info("Source table '{}' has zero records.", tableName.fullTableName());
+ }
return false;
}
diff --git a/src/main/java/io/cdap/plugin/format/DBTableSplit.java b/src/main/java/io/cdap/plugin/format/DBTableSplit.java
index 87142cb..02c115c 100644
--- a/src/main/java/io/cdap/plugin/format/DBTableSplit.java
+++ b/src/main/java/io/cdap/plugin/format/DBTableSplit.java
@@ -26,7 +26,7 @@
* A split representing data in a database table.
*/
public class DBTableSplit extends DataDrivenDBInputFormat.DataDrivenDBInputSplit {
- private static final String DEFAULT_CLAUSE = "1=1";
+ public static final String DEFAULT_CLAUSE = "1=1";
private DBTableName tableName;
diff --git a/src/main/java/io/cdap/plugin/format/MultiTableDBInputFormat.java b/src/main/java/io/cdap/plugin/format/MultiTableDBInputFormat.java
index b985e34..69d7402 100644
--- a/src/main/java/io/cdap/plugin/format/MultiTableDBInputFormat.java
+++ b/src/main/java/io/cdap/plugin/format/MultiTableDBInputFormat.java
@@ -196,6 +196,9 @@ private List getTableSplits(Connection connection, MultiTableDBConfi
columnName,
conf.getPluginConf().getWhereClause()))) {
results.next();
+ if (results.getObject(1) == null && results.getObject(2) == null) {
+ return Collections.singletonList(new DBTableSplit(info.getDbTableName()));
+ }
// Based on the type of the results, use a different mechanism
// for interpolating split points (i.e., numeric splits, text splits,
diff --git a/src/main/java/io/cdap/plugin/format/SQLStatementRecordReader.java b/src/main/java/io/cdap/plugin/format/SQLStatementRecordReader.java
index 4e534e8..37c4950 100644
--- a/src/main/java/io/cdap/plugin/format/SQLStatementRecordReader.java
+++ b/src/main/java/io/cdap/plugin/format/SQLStatementRecordReader.java
@@ -89,6 +89,9 @@ public boolean nextKeyValue() throws IOException {
schema = Schema.recordOf(tableName, schemaFields);
}
if (!results.next()) {
+ if (pos == 0) {
+ LOG.info("SQL statement '{}' ('{}') has zero records.", split.getId(), split.getSqlStatement());
+ }
return false;
}
diff --git a/src/test/java/io/cdap/plugin/MultiTableDBSourceTest.java b/src/test/java/io/cdap/plugin/MultiTableDBSourceTest.java
new file mode 100644
index 0000000..2237356
--- /dev/null
+++ b/src/test/java/io/cdap/plugin/MultiTableDBSourceTest.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright © 2024 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin;
+
+
+import io.cdap.cdap.api.data.schema.Schema;
+import io.cdap.cdap.etl.api.validation.ValidationException;
+import io.cdap.cdap.etl.mock.common.MockPipelineConfigurer;
+import io.cdap.plugin.format.MultiTableConf;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests for MultiTableDBSource
+ */
+public class MultiTableDBSourceTest {
+
+ MockPipelineConfigurer mockPipelineConfigurer = new MockPipelineConfigurer(Schema.of(Schema.Type.STRING));
+ MultiTableDBSource multiTableDBSource;
+ String referenceNameWithSpace = "Hello MultiTable";
+
+ @Before
+ public void setUp() {
+ multiTableDBSource = new MultiTableDBSource(new MultiTableConf(referenceNameWithSpace));
+ }
+
+ /**
+ * Plugin should not allow space in reference name for MultiTableDBSource
+ */
+ @Test
+ public void testMultiTableReferenceNameWithSpace() {
+ try {
+ multiTableDBSource.configurePipeline(mockPipelineConfigurer);
+ } catch (ValidationException e) {
+ // expected
+ }
+ Assert.assertEquals(1, mockPipelineConfigurer.getStageConfigurer().getFailureCollector().getValidationFailures()
+ .size());
+ }
+}