From d4b10c7a8de66b6e5e34a4820551afb90ddce851 Mon Sep 17 00:00:00 2001 From: Dennis Li <23002167+dli357@users.noreply.github.com> Date: Tue, 20 Apr 2021 21:28:40 -0700 Subject: [PATCH 1/8] [CDAP-674] Added default as unsupported column type and added additional details to unsupported error --- .../java/io/cdap/plugin/format/DBTypes.java | 30 +++++++++---------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/src/main/java/io/cdap/plugin/format/DBTypes.java b/src/main/java/io/cdap/plugin/format/DBTypes.java index 9447084..711d51e 100644 --- a/src/main/java/io/cdap/plugin/format/DBTypes.java +++ b/src/main/java/io/cdap/plugin/format/DBTypes.java @@ -66,9 +66,6 @@ public static List getSchemaFields(ResultSet resultSet) throws SQL // given a sql type return schema type private static Schema getSchema(ResultSetMetaData metadata, int column) throws SQLException { int sqlType = metadata.getColumnType(column); - - // Type.STRING covers sql types - VARCHAR,CHAR,CLOB,LONGNVARCHAR,LONGVARCHAR,NCHAR,NCLOB,NVARCHAR - Schema schema = Schema.of(Schema.Type.STRING); switch (sqlType) { case Types.NULL: return Schema.of(Schema.Type.NULL); @@ -104,25 +101,28 @@ private static Schema getSchema(ResultSetMetaData metadata, int column) throws S case Types.TIMESTAMP: return Schema.of(Schema.LogicalType.TIMESTAMP_MICROS); + case Types.VARCHAR: + case Types.CHAR: + case Types.CLOB: + case Types.LONGNVARCHAR: + case Types.LONGVARCHAR: + case Types.NCHAR: + case Types.NCLOB: + case Types.NVARCHAR: + return Schema.of(Schema.Type.STRING); + case Types.BINARY: case Types.VARBINARY: case Types.LONGVARBINARY: case Types.BLOB: return Schema.of(Schema.Type.BYTES); - case Types.ARRAY: - case Types.DATALINK: - case Types.DISTINCT: - case Types.JAVA_OBJECT: - case Types.OTHER: - case Types.REF: - case Types.ROWID: - case Types.SQLXML: - case Types.STRUCT: - throw new SQLException(new UnsupportedTypeException("Unsupported SQL Type: " + sqlType)); + // Unsupported Types: ARRAY, DATALINK, 
DISTINCT, JAVA_OBJECT, OTHER, REF, ROWID, SQLXML, STRUCT + default: + throw new SQLException(new UnsupportedTypeException("Unsupported SQL Type: " + sqlType + " for column '" + + metadata.getColumnName(column) + "' with type '" + + metadata.getColumnTypeName(column) + "'")); } - - return schema; } public static StructuredRecord.Builder setValue(StructuredRecord.Builder record, int sqlColumnType, From 35b685cabffdaaa0b114506ceaa76fd60ebb5f6c Mon Sep 17 00:00:00 2001 From: Dennis Li <23002167+dli357@users.noreply.github.com> Date: Tue, 20 Apr 2021 21:28:40 -0700 Subject: [PATCH 2/8] [CDAP-674] Added default as unsupported column type and added additional details to unsupported error --- .../java/io/cdap/plugin/format/DBTypes.java | 30 +++++++++---------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/src/main/java/io/cdap/plugin/format/DBTypes.java b/src/main/java/io/cdap/plugin/format/DBTypes.java index 9447084..711d51e 100644 --- a/src/main/java/io/cdap/plugin/format/DBTypes.java +++ b/src/main/java/io/cdap/plugin/format/DBTypes.java @@ -66,9 +66,6 @@ public static List getSchemaFields(ResultSet resultSet) throws SQL // given a sql type return schema type private static Schema getSchema(ResultSetMetaData metadata, int column) throws SQLException { int sqlType = metadata.getColumnType(column); - - // Type.STRING covers sql types - VARCHAR,CHAR,CLOB,LONGNVARCHAR,LONGVARCHAR,NCHAR,NCLOB,NVARCHAR - Schema schema = Schema.of(Schema.Type.STRING); switch (sqlType) { case Types.NULL: return Schema.of(Schema.Type.NULL); @@ -104,25 +101,28 @@ private static Schema getSchema(ResultSetMetaData metadata, int column) throws S case Types.TIMESTAMP: return Schema.of(Schema.LogicalType.TIMESTAMP_MICROS); + case Types.VARCHAR: + case Types.CHAR: + case Types.CLOB: + case Types.LONGNVARCHAR: + case Types.LONGVARCHAR: + case Types.NCHAR: + case Types.NCLOB: + case Types.NVARCHAR: + return Schema.of(Schema.Type.STRING); + case Types.BINARY: case Types.VARBINARY: case 
Types.LONGVARBINARY: case Types.BLOB: return Schema.of(Schema.Type.BYTES); - case Types.ARRAY: - case Types.DATALINK: - case Types.DISTINCT: - case Types.JAVA_OBJECT: - case Types.OTHER: - case Types.REF: - case Types.ROWID: - case Types.SQLXML: - case Types.STRUCT: - throw new SQLException(new UnsupportedTypeException("Unsupported SQL Type: " + sqlType)); + // Unsupported Types: ARRAY, DATALINK, DISTINCT, JAVA_OBJECT, OTHER, REF, ROWID, SQLXML, STRUCT + default: + throw new SQLException(new UnsupportedTypeException("Unsupported SQL Type: " + sqlType + " for column '" + + metadata.getColumnName(column) + "' with type '" + + metadata.getColumnTypeName(column) + "'")); } - - return schema; } public static StructuredRecord.Builder setValue(StructuredRecord.Builder record, int sqlColumnType, From dd679ee2b701f6e47cfb9957c7110607fae54cec Mon Sep 17 00:00:00 2001 From: itsankitjain-google Date: Mon, 10 Jan 2022 16:57:39 +0530 Subject: [PATCH 3/8] returning correct transformed value with decimal data type in oracle --- pom.xml | 8 ++++---- .../io/cdap/plugin/format/DBTableRecordReader.java | 4 +++- src/main/java/io/cdap/plugin/format/DBTypes.java | 12 ++++++++++-- .../cdap/plugin/format/SQLStatementRecordReader.java | 4 +++- 4 files changed, 20 insertions(+), 8 deletions(-) diff --git a/pom.xml b/pom.xml index 04f9326..488f6db 100644 --- a/pom.xml +++ b/pom.xml @@ -20,17 +20,17 @@ io.cdap.plugin multi-table-plugins - 1.4.0-SNAPSHOT + 1.3.1 jar Multiple Table Plugins UTF-8 - 6.1.1 + 6.5.1 2.3.0 2.2.4 - 2.2.0 + 2.7.1 widgets docs @@ -72,7 +72,7 @@ io.cdap.cdap - cdap-data-pipeline + cdap-data-pipeline2_2.11 ${cdap.version} test diff --git a/src/main/java/io/cdap/plugin/format/DBTableRecordReader.java b/src/main/java/io/cdap/plugin/format/DBTableRecordReader.java index 40648b0..ad3ac28 100644 --- a/src/main/java/io/cdap/plugin/format/DBTableRecordReader.java +++ b/src/main/java/io/cdap/plugin/format/DBTableRecordReader.java @@ -108,7 +108,9 @@ public RecordWrapper 
getCurrentValue() throws IOException { for (int i = 0; i < tableFields.size(); i++) { Schema.Field field = tableFields.get(i); int sqlColumnType = resultMeta.getColumnType(i + 1); - DBTypes.setValue(recordBuilder, sqlColumnType, results, field.getName()); + int sqlPrecision = resultMeta.getPrecision(i + 1); + int sqlScale = resultMeta.getScale(i + 1); + DBTypes.setValue(recordBuilder, sqlColumnType, results, field.getName(), sqlPrecision, sqlScale); } } catch (SQLException e) { throw new IOException("Error decoding row from table " + tableName, e); diff --git a/src/main/java/io/cdap/plugin/format/DBTypes.java b/src/main/java/io/cdap/plugin/format/DBTypes.java index 711d51e..3ad3394 100644 --- a/src/main/java/io/cdap/plugin/format/DBTypes.java +++ b/src/main/java/io/cdap/plugin/format/DBTypes.java @@ -90,6 +90,13 @@ private static Schema getSchema(ResultSetMetaData metadata, int column) throws S case Types.DECIMAL: int precision = metadata.getPrecision(column); int scale = metadata.getScale(column); + String typeName = metadata.getColumnTypeName(column); + // decimal type with precision 0 is not supported + if (precision == 0) { + throw new SQLException(new UnsupportedTypeException( + String.format("Column %s has unsupported SQL Type: '%s' with precision: '%s'", column, + typeName, precision))); + } return Schema.decimalOf(precision, scale); case Types.DOUBLE: return Schema.of(Schema.Type.DOUBLE); @@ -126,7 +133,8 @@ private static Schema getSchema(ResultSetMetaData metadata, int column) throws S } public static StructuredRecord.Builder setValue(StructuredRecord.Builder record, int sqlColumnType, - ResultSet resultSet, String fieldName) throws SQLException { + ResultSet resultSet, String fieldName, + int precision, int scale) throws SQLException { Object original = resultSet.getObject(fieldName); if (original != null) { switch (sqlColumnType) { @@ -135,7 +143,7 @@ public static StructuredRecord.Builder setValue(StructuredRecord.Builder record, return 
record.set(fieldName, ((Number) original).intValue()); case Types.NUMERIC: case Types.DECIMAL: - return record.setDecimal(fieldName, ((BigDecimal) original)); + return record.setDecimal(fieldName, resultSet.getBigDecimal(fieldName, scale)); case Types.DATE: return record.setDate(fieldName, resultSet.getDate(fieldName).toLocalDate()); case Types.TIME: diff --git a/src/main/java/io/cdap/plugin/format/SQLStatementRecordReader.java b/src/main/java/io/cdap/plugin/format/SQLStatementRecordReader.java index f2cdfb2..4e534e8 100644 --- a/src/main/java/io/cdap/plugin/format/SQLStatementRecordReader.java +++ b/src/main/java/io/cdap/plugin/format/SQLStatementRecordReader.java @@ -112,7 +112,9 @@ public RecordWrapper getCurrentValue() throws IOException { for (int i = 0; i < tableFields.size(); i++) { Schema.Field field = tableFields.get(i); int sqlColumnType = resultMeta.getColumnType(i + 1); - DBTypes.setValue(recordBuilder, sqlColumnType, results, field.getName()); + int sqlPrecision = resultMeta.getPrecision(i + 1); + int sqlScale = resultMeta.getScale(i + 1); + DBTypes.setValue(recordBuilder, sqlColumnType, results, field.getName(), sqlPrecision, sqlScale); } } catch (SQLException e) { throw new IOException("Error decoding row from statement : '%s'", e); From 319735a1180a2acfd219ce2bb902d389db6ce042 Mon Sep 17 00:00:00 2001 From: Sean Zhou Date: Tue, 11 Jan 2022 11:01:32 -0800 Subject: [PATCH 4/8] revert dependency version back and add new repo for dependency --- pom.xml | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/pom.xml b/pom.xml index 488f6db..8a8a00b 100644 --- a/pom.xml +++ b/pom.xml @@ -27,10 +27,10 @@ UTF-8 - 6.5.1 + 6.1.1 2.3.0 2.2.4 - 2.7.1 + 2.2.0 widgets docs @@ -50,6 +50,10 @@ sonatype-snapshots https://oss.sonatype.org/content/repositories/snapshots + + spring-plugins + https://repo.spring.io/plugins-release + @@ -72,7 +76,7 @@ io.cdap.cdap - cdap-data-pipeline2_2.11 + cdap-data-pipeline ${cdap.version} test From 
30f7327ea6b9e524910cf8b24493837d8a32bdfa Mon Sep 17 00:00:00 2001 From: itsankitjain-google Date: Tue, 8 Feb 2022 14:16:13 +0530 Subject: [PATCH 5/8] add set fetch size --- pom.xml | 2 +- .../io/cdap/plugin/ForwardingConnection.java | 327 ++++++++++++++++++ .../io/cdap/plugin/NoOpCommitConnection.java | 37 ++ .../format/ConnectionWithFetchSize.java | 108 ++++++ .../io/cdap/plugin/format/MultiTableConf.java | 18 +- widgets/MultiTableDatabase-batchsource.json | 24 +- 6 files changed, 512 insertions(+), 4 deletions(-) create mode 100644 src/main/java/io/cdap/plugin/ForwardingConnection.java create mode 100644 src/main/java/io/cdap/plugin/NoOpCommitConnection.java create mode 100644 src/main/java/io/cdap/plugin/format/ConnectionWithFetchSize.java diff --git a/pom.xml b/pom.xml index 8a8a00b..e8b33c4 100644 --- a/pom.xml +++ b/pom.xml @@ -20,7 +20,7 @@ io.cdap.plugin multi-table-plugins - 1.3.1 + 1.3.2-SNAPSHOT jar Multiple Table Plugins diff --git a/src/main/java/io/cdap/plugin/ForwardingConnection.java b/src/main/java/io/cdap/plugin/ForwardingConnection.java new file mode 100644 index 0000000..5db1ab9 --- /dev/null +++ b/src/main/java/io/cdap/plugin/ForwardingConnection.java @@ -0,0 +1,327 @@ +/* + * Copyright © 2021 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package io.cdap.plugin; + +import java.sql.Array; +import java.sql.Blob; +import java.sql.CallableStatement; +import java.sql.Clob; +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.NClob; +import java.sql.PreparedStatement; +import java.sql.SQLClientInfoException; +import java.sql.SQLException; +import java.sql.SQLWarning; +import java.sql.SQLXML; +import java.sql.Savepoint; +import java.sql.Statement; +import java.sql.Struct; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.Executor; + +/** + * A JDBC {@link Connection} that delegates all methods to another {@link Connection}. + */ +public abstract class ForwardingConnection implements Connection { + + private final Connection delegate; + + protected ForwardingConnection(Connection delegate) { + this.delegate = delegate; + } + + protected Connection getDelegate() { + return delegate; + } + + @Override + public Statement createStatement() throws SQLException { + return getDelegate().createStatement(); + } + + @Override + public PreparedStatement prepareStatement(String sql) throws SQLException { + return getDelegate().prepareStatement(sql); + } + + @Override + public CallableStatement prepareCall(String sql) throws SQLException { + return getDelegate().prepareCall(sql); + } + + @Override + public String nativeSQL(String sql) throws SQLException { + return getDelegate().nativeSQL(sql); + } + + @Override + public void setAutoCommit(boolean autoCommit) throws SQLException { + getDelegate().setAutoCommit(autoCommit); + } + + @Override + public boolean getAutoCommit() throws SQLException { + return getDelegate().getAutoCommit(); + } + + @Override + public void commit() throws SQLException { + getDelegate().commit(); + } + + @Override + public void rollback() throws SQLException { + getDelegate().rollback(); + } + + @Override + public void close() throws SQLException { + getDelegate().close(); + } + + @Override + public boolean isClosed() throws 
SQLException { + return getDelegate().isClosed(); + } + + @Override + public DatabaseMetaData getMetaData() throws SQLException { + return getDelegate().getMetaData(); + } + + @Override + public void setReadOnly(boolean readOnly) throws SQLException { + getDelegate().setReadOnly(readOnly); + } + + @Override + public boolean isReadOnly() throws SQLException { + return getDelegate().isReadOnly(); + } + + @Override + public void setCatalog(String catalog) throws SQLException { + getDelegate().setCatalog(catalog); + } + + @Override + public String getCatalog() throws SQLException { + return getDelegate().getCatalog(); + } + + @Override + public void setTransactionIsolation(int level) throws SQLException { + getDelegate().setTransactionIsolation(level); + } + + @Override + public int getTransactionIsolation() throws SQLException { + return getDelegate().getTransactionIsolation(); + } + + @Override + public SQLWarning getWarnings() throws SQLException { + return getDelegate().getWarnings(); + } + + @Override + public void clearWarnings() throws SQLException { + getDelegate().clearWarnings(); + } + + @Override + public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException { + return getDelegate().createStatement(resultSetType, resultSetConcurrency); + } + + @Override + public PreparedStatement prepareStatement(String sql, + int resultSetType, int resultSetConcurrency) throws SQLException { + return getDelegate().prepareStatement(sql, resultSetType, resultSetConcurrency); + } + + @Override + public CallableStatement prepareCall(String sql, + int resultSetType, int resultSetConcurrency) throws SQLException { + return getDelegate().prepareCall(sql, resultSetType, resultSetConcurrency); + } + + @Override + public Map> getTypeMap() throws SQLException { + return getDelegate().getTypeMap(); + } + + @Override + public void setTypeMap(Map> map) throws SQLException { + getDelegate().setTypeMap(map); + } + + @Override + public void 
setHoldability(int holdability) throws SQLException { + getDelegate().setHoldability(holdability); + } + + @Override + public int getHoldability() throws SQLException { + return getDelegate().getHoldability(); + } + + @Override + public Savepoint setSavepoint() throws SQLException { + return getDelegate().setSavepoint(); + } + + @Override + public Savepoint setSavepoint(String name) throws SQLException { + return getDelegate().setSavepoint(name); + } + + @Override + public void rollback(Savepoint savepoint) throws SQLException { + getDelegate().rollback(savepoint); + } + + @Override + public void releaseSavepoint(Savepoint savepoint) throws SQLException { + getDelegate().releaseSavepoint(savepoint); + } + + @Override + public Statement createStatement(int resultSetType, + int resultSetConcurrency, int resultSetHoldability) throws SQLException { + return getDelegate().createStatement(resultSetType, resultSetConcurrency, resultSetHoldability); + } + + @Override + public PreparedStatement prepareStatement(String sql, int resultSetType, + int resultSetConcurrency, int resultSetHoldability) throws SQLException { + return getDelegate().prepareStatement(sql, resultSetType, resultSetConcurrency, resultSetHoldability); + } + + @Override + public CallableStatement prepareCall(String sql, int resultSetType, + int resultSetConcurrency, int resultSetHoldability) throws SQLException { + return getDelegate().prepareCall(sql, resultSetType, resultSetConcurrency, resultSetHoldability); + } + + @Override + public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException { + return getDelegate().prepareStatement(sql, autoGeneratedKeys); + } + + @Override + public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException { + return getDelegate().prepareStatement(sql, columnIndexes); + } + + @Override + public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException { + return 
getDelegate().prepareStatement(sql, columnNames); + } + + @Override + public Clob createClob() throws SQLException { + return getDelegate().createClob(); + } + + @Override + public Blob createBlob() throws SQLException { + return getDelegate().createBlob(); + } + + @Override + public NClob createNClob() throws SQLException { + return getDelegate().createNClob(); + } + + @Override + public SQLXML createSQLXML() throws SQLException { + return getDelegate().createSQLXML(); + } + + @Override + public boolean isValid(int timeout) throws SQLException { + return getDelegate().isValid(timeout); + } + + @Override + public void setClientInfo(String name, String value) throws SQLClientInfoException { + getDelegate().setClientInfo(name, value); + } + + @Override + public void setClientInfo(Properties properties) throws SQLClientInfoException { + getDelegate().setClientInfo(properties); + } + + @Override + public String getClientInfo(String name) throws SQLException { + return getDelegate().getClientInfo(name); + } + + @Override + public Properties getClientInfo() throws SQLException { + return getDelegate().getClientInfo(); + } + + @Override + public Array createArrayOf(String typeName, Object[] elements) throws SQLException { + return getDelegate().createArrayOf(typeName, elements); + } + + @Override + public Struct createStruct(String typeName, Object[] attributes) throws SQLException { + return getDelegate().createStruct(typeName, attributes); + } + + @Override + public void setSchema(String schema) throws SQLException { + getDelegate().setSchema(schema); + } + + @Override + public String getSchema() throws SQLException { + return getDelegate().getSchema(); + } + + @Override + public void abort(Executor executor) throws SQLException { + getDelegate().abort(executor); + } + + @Override + public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException { + getDelegate().setNetworkTimeout(executor, milliseconds); + } + + @Override + public int 
getNetworkTimeout() throws SQLException { + return getDelegate().getNetworkTimeout(); + } + + @Override + public T unwrap(Class iface) throws SQLException { + return getDelegate().unwrap(iface); + } + + @Override + public boolean isWrapperFor(Class iface) throws SQLException { + return getDelegate().isWrapperFor(iface); + } +} diff --git a/src/main/java/io/cdap/plugin/NoOpCommitConnection.java b/src/main/java/io/cdap/plugin/NoOpCommitConnection.java new file mode 100644 index 0000000..b3c2d2d --- /dev/null +++ b/src/main/java/io/cdap/plugin/NoOpCommitConnection.java @@ -0,0 +1,37 @@ +/* + * Copyright © 2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin; + + +import java.sql.Connection; +import java.sql.SQLException; + +/** + * A hack to work around jdbc drivers that create connections that don't support commit. + * This is true of the Hive jdbc driver. Delegates all operations except commit, which is a no-op. 
+ */ +public class NoOpCommitConnection extends io.cdap.plugin.ForwardingConnection { + + public NoOpCommitConnection(Connection delegate) { + super(delegate); + } + + @Override + public void commit() throws SQLException { + // no-op + } +} diff --git a/src/main/java/io/cdap/plugin/format/ConnectionWithFetchSize.java b/src/main/java/io/cdap/plugin/format/ConnectionWithFetchSize.java new file mode 100644 index 0000000..3900520 --- /dev/null +++ b/src/main/java/io/cdap/plugin/format/ConnectionWithFetchSize.java @@ -0,0 +1,108 @@ +/* + * Copyright © 2021 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.format; + +import io.cdap.plugin.ForwardingConnection; + +import java.sql.CallableStatement; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.sql.Statement; + +/** + * A JDBC {@link Connection} that delegates all methods to another {@link Connection}, but automatically call + * the {@link Statement#setFetchSize(int)} for all {@link Statement} created via this class. 
+ */ +public class ConnectionWithFetchSize extends ForwardingConnection { + + private final int fetchSize; + + public ConnectionWithFetchSize(Connection delegate, int fetchSize) { + super(delegate); + this.fetchSize = fetchSize; + } + + private T setFetchSize(T stmt) throws SQLException { + stmt.setFetchSize(fetchSize); + return stmt; + } + + @Override + public Statement createStatement() throws SQLException { + return setFetchSize(super.createStatement()); + } + + @Override + public PreparedStatement prepareStatement(String sql) throws SQLException { + return setFetchSize(super.prepareStatement(sql)); + } + + @Override + public CallableStatement prepareCall(String sql) throws SQLException { + return setFetchSize(super.prepareCall(sql)); + } + + @Override + public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException { + return setFetchSize(super.createStatement(resultSetType, resultSetConcurrency)); + } + + @Override + public PreparedStatement prepareStatement(String sql, + int resultSetType, int resultSetConcurrency) throws SQLException { + return setFetchSize(super.prepareStatement(sql, resultSetType, resultSetConcurrency)); + } + + @Override + public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency) throws SQLException { + return setFetchSize(super.prepareCall(sql, resultSetType, resultSetConcurrency)); + } + + @Override + public Statement createStatement(int resultSetType, + int resultSetConcurrency, int resultSetHoldability) throws SQLException { + return setFetchSize(super.createStatement(resultSetType, resultSetConcurrency, resultSetHoldability)); + } + + @Override + public PreparedStatement prepareStatement(String sql, int resultSetType, + int resultSetConcurrency, int resultSetHoldability) throws SQLException { + return setFetchSize(super.prepareStatement(sql, resultSetType, resultSetConcurrency, resultSetHoldability)); + } + + @Override + public CallableStatement prepareCall(String 
sql, int resultSetType, + int resultSetConcurrency, int resultSetHoldability) throws SQLException { + return setFetchSize(super.prepareCall(sql, resultSetType, resultSetConcurrency, resultSetHoldability)); + } + + @Override + public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException { + return setFetchSize(super.prepareStatement(sql, autoGeneratedKeys)); + } + + @Override + public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException { + return setFetchSize(super.prepareStatement(sql, columnIndexes)); + } + + @Override + public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException { + return setFetchSize(super.prepareStatement(sql, columnNames)); + } +} diff --git a/src/main/java/io/cdap/plugin/format/MultiTableConf.java b/src/main/java/io/cdap/plugin/format/MultiTableConf.java index ad3f812..7f4d18b 100644 --- a/src/main/java/io/cdap/plugin/format/MultiTableConf.java +++ b/src/main/java/io/cdap/plugin/format/MultiTableConf.java @@ -23,6 +23,7 @@ import io.cdap.cdap.api.annotation.Name; import io.cdap.cdap.api.plugin.PluginConfig; import io.cdap.cdap.etl.api.FailureCollector; +import io.cdap.plugin.NoOpCommitConnection; import io.cdap.plugin.TransactionIsolationLevel; import java.sql.Connection; @@ -49,6 +50,7 @@ public class MultiTableConf extends PluginConfig { public static final String ERROR_HANDLING_FAIL_PIPELINE = "fail-pipeline"; public static final String NAME_TABLE_ALIASES = "tableAliases"; public static final String NAME_SQL_STATEMENTS = "sqlStatements"; + public static final String FETCH_SIZE = "fetchSize"; @Description("This will be used to uniquely identify this source for lineage, annotating metadata, etc.") private String referenceName; @@ -82,6 +84,13 @@ public class MultiTableConf extends PluginConfig { "an exception whenever a commit is called. 
For drivers like that, this should be set to true.") private Boolean enableAutoCommit; + @Nullable + @Name(FETCH_SIZE) + @Macro + @Description("The number of rows to fetch at a time per split. Larger fetch size can result in faster import, " + + "with the tradeoff of higher memory usage.") + private Integer fetchSize; + @Macro @Nullable @Description("A schema name pattern to read all the tables. By default all the schemas will " + @@ -297,8 +306,15 @@ protected static List splitTableAliases(String tableAliases) { public Connection getConnection() throws SQLException { Connection conn = user == null ? DriverManager.getConnection(connectionString) : DriverManager.getConnection(connectionString, user, password); - conn.setAutoCommit(enableAutoCommit); + conn.setAutoCommit(getEnableAutoCommit()); conn.setTransactionIsolation(TransactionIsolationLevel.getLevel(transactionIsolationLevel)); + if (getEnableAutoCommit()) { + // hack to work around jdbc drivers like the hive driver that throw exceptions on commit + conn = new NoOpCommitConnection(conn); + } + if (fetchSize != null && fetchSize > 0) { + conn = new ConnectionWithFetchSize(conn, fetchSize); + } return conn; } } diff --git a/widgets/MultiTableDatabase-batchsource.json b/widgets/MultiTableDatabase-batchsource.json index 70b78ba..355c8fb 100644 --- a/widgets/MultiTableDatabase-batchsource.json +++ b/widgets/MultiTableDatabase-batchsource.json @@ -104,9 +104,20 @@ "name": "tableNameField" }, { - "widget-type": "textbox", + "widget-type": "toggle", "label": "Enable Auto Commit", - "name": "enableAutoCommit" + "name": "enableAutoCommit", + "widget-attributes" : { + "on": { + "value": "true", + "label": "YES" + }, + "off": { + "value": "false", + "label": "NO" + }, + "default": "false" + } }, { "widget-type": "textbox", @@ -116,6 +127,15 @@ "default": "1" } }, + { + "widget-type": "number", + "label": "Fetch Size", + "name": "fetchSize", + "widget-attributes": { + "default": "1000", + "minimum": "0" + } + }, { 
"widget-type": "textbox", "label": "Query Timeout (Seconds)", From 65d7abaf1ef7f865c1eea41cd644bc27d20aca89 Mon Sep 17 00:00:00 2001 From: itsankitjain-google Date: Tue, 8 Feb 2022 14:19:10 +0530 Subject: [PATCH 6/8] updated markdown --- docs/MultiTableDatabase-batchsource.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/docs/MultiTableDatabase-batchsource.md b/docs/MultiTableDatabase-batchsource.md index 25b5430..4c47974 100644 --- a/docs/MultiTableDatabase-batchsource.md +++ b/docs/MultiTableDatabase-batchsource.md @@ -80,6 +80,9 @@ Defaults to TRANSACTION_SERIALIZABLE. See java.sql.Connection#setTransactionIsol The Phoenix jdbc driver will throw an exception if the Phoenix database does not have transactions enabled and this setting is set to true. For drivers like that, this should be set to TRANSACTION_NONE. +**Fetch Size:** The number of rows to fetch at a time per split. Larger fetch size can result in faster import, +with the tradeoff of higher memory usage. + ### Custom SQL Statements When using the **Data Selection Mode** called **SQL Statements**, the supplied list of SQL statements will be executed From 95e05546e6b66ddc88b1edaf1356654e65606e43 Mon Sep 17 00:00:00 2001 From: Baptiste Benet Date: Tue, 22 Feb 2022 21:18:38 +0100 Subject: [PATCH 7/8] BIGINT -> Decimal schema Check unsigned bigint Clean imports Assert backward compatibility protect against case Add Javadoc instead of comment --- .../java/io/cdap/plugin/format/DBTypes.java | 40 +++++++++++++------ .../java/io/cdap/plugin/PipelineTest.java | 8 ++-- 2 files changed, 31 insertions(+), 17 deletions(-) diff --git a/src/main/java/io/cdap/plugin/format/DBTypes.java b/src/main/java/io/cdap/plugin/format/DBTypes.java index 3ad3394..bc3dd14 100644 --- a/src/main/java/io/cdap/plugin/format/DBTypes.java +++ b/src/main/java/io/cdap/plugin/format/DBTypes.java @@ -21,7 +21,6 @@ import io.cdap.cdap.api.data.schema.Schema; import io.cdap.cdap.api.data.schema.UnsupportedTypeException; -import 
java.math.BigDecimal; import java.sql.Blob; import java.sql.Clob; import java.sql.ResultSet; @@ -63,9 +62,17 @@ public static List getSchemaFields(ResultSet resultSet) throws SQL return schemaFields; } - // given a sql type return schema type + /** + * Given the metadata and column number, return the Schema type corresponding to the SQL type. + * + * Note that the BIGINT SQL type can be signed or unsigned. An unsigned BIGINT will be mapped to a Decimal type while + * a signed BIGINT will be mapped to a Long. This distinction has to be made for backward compatibility reasons. + * + * @throws SQLException for unsupported types. + */ private static Schema getSchema(ResultSetMetaData metadata, int column) throws SQLException { int sqlType = metadata.getColumnType(column); + String typeName = metadata.getColumnTypeName(column); switch (sqlType) { case Types.NULL: return Schema.of(Schema.Type.NULL); @@ -80,7 +87,11 @@ private static Schema getSchema(ResultSetMetaData metadata, int column) throws S return Schema.of(Schema.Type.INT); case Types.BIGINT: - return Schema.of(Schema.Type.LONG); + if (typeName.toLowerCase().contains("unsigned")) { + return getDecimalSafe(metadata, column, typeName); + } else { + return Schema.of(Schema.Type.LONG); + } case Types.REAL: case Types.FLOAT: @@ -88,16 +99,7 @@ private static Schema getSchema(ResultSetMetaData metadata, int column) throws S case Types.NUMERIC: case Types.DECIMAL: - int precision = metadata.getPrecision(column); - int scale = metadata.getScale(column); - String typeName = metadata.getColumnTypeName(column); - // decimal type with precision 0 is not supported - if (precision == 0) { - throw new SQLException(new UnsupportedTypeException( - String.format("Column %s has unsupported SQL Type: '%s' with precision: '%s'", column, - typeName, precision))); - } - return Schema.decimalOf(precision, scale); + return getDecimalSafe(metadata, column, typeName); case Types.DOUBLE: return Schema.of(Schema.Type.DOUBLE); @@ -132,6 
+134,18 @@ private static Schema getSchema(ResultSetMetaData metadata, int column) throws S } } + private static Schema getDecimalSafe(ResultSetMetaData metadata, int column, String typeName) throws SQLException { + int precision = metadata.getPrecision(column); + int scale = metadata.getScale(column); + // decimal type with precision 0 is not supported + if (precision == 0) { + throw new SQLException(new UnsupportedTypeException( + String.format("Column %s has unsupported SQL Type: '%s' with precision: '%s'", column, + typeName, precision))); + } + return Schema.decimalOf(precision, scale); + } + public static StructuredRecord.Builder setValue(StructuredRecord.Builder record, int sqlColumnType, ResultSet resultSet, String fieldName, int precision, int scale) throws SQLException { diff --git a/src/test/java/io/cdap/plugin/PipelineTest.java b/src/test/java/io/cdap/plugin/PipelineTest.java index 14dc4eb..6aeae89 100644 --- a/src/test/java/io/cdap/plugin/PipelineTest.java +++ b/src/test/java/io/cdap/plugin/PipelineTest.java @@ -136,7 +136,7 @@ public void testMultiTableDump() throws Exception { stmt.execute("CREATE TABLE \"MULTI2\" (NAME VARCHAR(32) NOT NULL, EMAIL VARCHAR(64))"); stmt.execute("INSERT INTO \"MULTI2\" VALUES ('samuel', 'sj@example.com'), ('dwayne', 'rock@j.com')"); - stmt.execute("CREATE TABLE \"MULTI3\" (ITEM VARCHAR(32) NOT NULL, CODE INT)"); + stmt.execute("CREATE TABLE \"MULTI3\" (ITEM VARCHAR(32) NOT NULL, CODE BIGINT)"); stmt.execute("INSERT INTO \"MULTI3\" VALUES ('donut', 100), ('scotch', 707)"); stmt.execute("CREATE TABLE \"BLACKLIST1\" (ITEM VARCHAR(32) NOT NULL, CODE INT)"); @@ -209,7 +209,7 @@ public void testMultiTableDump() throws Exception { Schema schema3 = Schema.recordOf( "MULTI3", Schema.Field.of("ITEM", Schema.of(Schema.Type.STRING)), - Schema.Field.of("CODE", Schema.nullableOf(Schema.of(Schema.Type.INT))), + Schema.Field.of("CODE", Schema.nullableOf(Schema.of(Schema.Type.LONG))), Schema.Field.of("tablename", 
Schema.of(Schema.Type.STRING))); DataSetManager outputManager = getDataset("multiOutput"); @@ -224,8 +224,8 @@ public void testMultiTableDump() throws Exception { .set("tablename", "MULTI2").build(), StructuredRecord.builder(schema2).set("NAME", "dwayne").set("EMAIL", "rock@j.com") .set("tablename", "MULTI2").build(), - StructuredRecord.builder(schema3).set("ITEM", "donut").set("CODE", 100).set("tablename", "MULTI3").build(), - StructuredRecord.builder(schema3).set("ITEM", "scotch").set("CODE", 707).set("tablename", "MULTI3").build()); + StructuredRecord.builder(schema3).set("ITEM", "donut").set("CODE", 100L).set("tablename", "MULTI3").build(), + StructuredRecord.builder(schema3).set("ITEM", "scotch").set("CODE", 707L).set("tablename", "MULTI3").build()); Assert.assertEquals(actual.size(), outputRecords.size()); Assert.assertEquals(expected, actual); } From 31535b91d3a7a2e6317c26d1c1497f6e2e21302a Mon Sep 17 00:00:00 2001 From: albertshau Date: Thu, 24 Mar 2022 08:54:04 -0700 Subject: [PATCH 8/8] bump version to 1.3.2 --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index e8b33c4..f2f3a90 100644 --- a/pom.xml +++ b/pom.xml @@ -20,7 +20,7 @@ io.cdap.plugin multi-table-plugins - 1.3.2-SNAPSHOT + 1.3.2 jar Multiple Table Plugins