diff --git a/pom.xml b/pom.xml index 0abd9a5..288ef93 100644 --- a/pom.xml +++ b/pom.xml @@ -20,17 +20,17 @@ io.cdap.plugin multi-table-plugins - 1.4.0-SNAPSHOT + 1.4.2 jar Multiple Table Plugins UTF-8 - 6.8.0-SNAPSHOT + 6.8.0 2.10.2 2.2.4 - 2.11.0-SNAPSHOT + 2.11.0 widgets docs @@ -43,23 +43,12 @@ - - sonatype - https://oss.sonatype.org/content/groups/public - sonatype-snapshots - https://oss.sonatype.org/content/repositories/snapshots + https://central.sonatype.com/repository/maven-snapshots - - - sonatype - https://oss.sonatype.org/content/groups/public - - - io.cdap.cdap @@ -73,7 +62,7 @@ io.cdap.cdap - cdap-data-pipeline2_2.11 + cdap-data-pipeline3_2.12 ${cdap.version} test diff --git a/src/main/java/io/cdap/plugin/MultiTableDBSource.java b/src/main/java/io/cdap/plugin/MultiTableDBSource.java index af64ad7..9f0ac4b 100644 --- a/src/main/java/io/cdap/plugin/MultiTableDBSource.java +++ b/src/main/java/io/cdap/plugin/MultiTableDBSource.java @@ -32,6 +32,8 @@ import io.cdap.cdap.etl.api.batch.BatchSourceContext; import io.cdap.plugin.common.Asset; import io.cdap.plugin.common.LineageRecorder; +import io.cdap.plugin.common.ReferenceBatchSource; +import io.cdap.plugin.common.ReferencePluginConfig; import io.cdap.plugin.common.SourceInputFormatProvider; import io.cdap.plugin.format.DBTableInfo; import io.cdap.plugin.format.MultiSQLStatementInputFormat; @@ -63,7 +65,7 @@ @Description("Reads from multiple tables in a relational database. " + "Outputs one record for each row in each table, with the table name as a record field. " + "Also sets a pipeline argument for each table read, which contains the table schema. ") -public class MultiTableDBSource extends BatchSource { +public class MultiTableDBSource extends ReferenceBatchSource { private static final Logger LOG = LoggerFactory.getLogger(MultiTableDBSource.class); private static final String JDBC_PLUGIN_ID = "jdbc.driver"; @@ -71,11 +73,13 @@ public class MultiTableDBSource extends BatchSource jdbcDriverClass = pipelineConfigurer.usePluginClass("jdbc", conf.getJdbcPluginName(), JDBC_PLUGIN_ID, PluginProperties.builder().build()); diff --git a/src/main/java/io/cdap/plugin/format/DBTableRecordReader.java b/src/main/java/io/cdap/plugin/format/DBTableRecordReader.java index ad3ac28..b612a6e 100644 --- a/src/main/java/io/cdap/plugin/format/DBTableRecordReader.java +++ b/src/main/java/io/cdap/plugin/format/DBTableRecordReader.java @@ -24,6 +24,8 @@ import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.sql.Connection; @@ -38,6 +40,8 @@ * Record reader that reads the entire contents of a database table using JDBC. */ public class DBTableRecordReader extends RecordReader { + private static final Logger LOG = LoggerFactory.getLogger(DBTableRecordReader.class); + private final DBTableName tableName; private final String tableNameField; private final MultiTableConf dbConf; @@ -85,6 +89,10 @@ public boolean nextKeyValue() throws IOException { schema = Schema.recordOf(tableName.getTable(), schemaFields); } if (!results.next()) { + if (pos == 0 && DBTableSplit.DEFAULT_CLAUSE.equals(split.getLowerClause()) + && DBTableSplit.DEFAULT_CLAUSE.equals(split.getUpperClause())) { + LOG.info("Source table '{}' has zero records.", tableName.fullTableName()); + } return false; } diff --git a/src/main/java/io/cdap/plugin/format/DBTableSplit.java b/src/main/java/io/cdap/plugin/format/DBTableSplit.java index 87142cb..02c115c 100644 --- a/src/main/java/io/cdap/plugin/format/DBTableSplit.java +++ b/src/main/java/io/cdap/plugin/format/DBTableSplit.java @@ -26,7 +26,7 @@ * A split representing data in a database table. */ public class DBTableSplit extends DataDrivenDBInputFormat.DataDrivenDBInputSplit { - private static final String DEFAULT_CLAUSE = "1=1"; + public static final String DEFAULT_CLAUSE = "1=1"; private DBTableName tableName; diff --git a/src/main/java/io/cdap/plugin/format/MultiTableDBInputFormat.java b/src/main/java/io/cdap/plugin/format/MultiTableDBInputFormat.java index b985e34..69d7402 100644 --- a/src/main/java/io/cdap/plugin/format/MultiTableDBInputFormat.java +++ b/src/main/java/io/cdap/plugin/format/MultiTableDBInputFormat.java @@ -196,6 +196,9 @@ private List getTableSplits(Connection connection, MultiTableDBConfi columnName, conf.getPluginConf().getWhereClause()))) { results.next(); + if (results.getObject(1) == null && results.getObject(2) == null) { + return Collections.singletonList(new DBTableSplit(info.getDbTableName())); + } // Based on the type of the results, use a different mechanism // for interpolating split points (i.e., numeric splits, text splits, diff --git a/src/main/java/io/cdap/plugin/format/SQLStatementRecordReader.java b/src/main/java/io/cdap/plugin/format/SQLStatementRecordReader.java index 4e534e8..37c4950 100644 --- a/src/main/java/io/cdap/plugin/format/SQLStatementRecordReader.java +++ b/src/main/java/io/cdap/plugin/format/SQLStatementRecordReader.java @@ -89,6 +89,9 @@ public boolean nextKeyValue() throws IOException { schema = Schema.recordOf(tableName, schemaFields); } if (!results.next()) { + if (pos == 0) { + LOG.info("SQL statement '{}' ('{}') has zero records.", split.getId(), split.getSqlStatement()); + } return false; } diff --git a/src/test/java/io/cdap/plugin/MultiTableDBSourceTest.java b/src/test/java/io/cdap/plugin/MultiTableDBSourceTest.java new file mode 100644 index 0000000..2237356 --- /dev/null +++ b/src/test/java/io/cdap/plugin/MultiTableDBSourceTest.java @@ -0,0 +1,56 @@ +/* + * Copyright © 2024 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin; + + +import io.cdap.cdap.api.data.schema.Schema; +import io.cdap.cdap.etl.api.validation.ValidationException; +import io.cdap.cdap.etl.mock.common.MockPipelineConfigurer; +import io.cdap.plugin.format.MultiTableConf; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +/** + * Tests for MultiTableDBSource + */ +public class MultiTableDBSourceTest { + + MockPipelineConfigurer mockPipelineConfigurer = new MockPipelineConfigurer(Schema.of(Schema.Type.STRING)); + MultiTableDBSource multiTableDBSource; + String referenceNameWithSpace = "Hello MultiTable"; + + @Before + public void setUp() { + multiTableDBSource = new MultiTableDBSource(new MultiTableConf(referenceNameWithSpace)); + } + + /** + * Plugin should not allow space in reference name for MultiTableDBSource + */ + @Test + public void testMultiTableReferenceNameWithSpace() { + try { + multiTableDBSource.configurePipeline(mockPipelineConfigurer); + } catch (ValidationException e) { + // expected + } + Assert.assertEquals(1, mockPipelineConfigurer.getStageConfigurer().getFailureCollector().getValidationFailures() + .size()); + } +}