props) throws Exception {
- AwsTableInfo tableInfo = new AwsTableInfo();
- tableInfo.setName(tableName);
- parseFieldsInfo(fieldsInfo, tableInfo);
-
- tableInfo.setParallelism(MathUtil.getIntegerVal(props.get(AwsConstantKey.PARALLELISM_KEY.toLowerCase())));
- tableInfo.setAccessKey(MathUtil.getString(props.get(AwsConstantKey.ACCESS_KEY.toLowerCase())));
- tableInfo.setSecretKey(MathUtil.getString(props.get(AwsConstantKey.SECRET_KEY.toLowerCase())));
- tableInfo.setStorageType(MathUtil.getString(props.get(AwsConstantKey.STORAGE_TYPE.toLowerCase())));
- tableInfo.setHostname(MathUtil.getString(props.get(AwsConstantKey.HOST_NAME.toLowerCase())));
- tableInfo.setBucketAcl(MathUtil.getString(props.get(AwsConstantKey.BUCKET_ACL.toLowerCase())));
- tableInfo.setBucketName(MathUtil.getString(props.get(AwsConstantKey.BUCKET_KEY.toLowerCase())));
- tableInfo.setObjectName(MathUtil.getString(props.get(AwsConstantKey.OBJECT_NAME.toLowerCase())));
-
- tableInfo.check();
-
- return tableInfo;
- }
-}
diff --git a/aws/aws-sink/src/main/java/com/dtstack/flink/sql/sink/aws/table/AwsTableInfo.java b/aws/aws-sink/src/main/java/com/dtstack/flink/sql/sink/aws/table/AwsTableInfo.java
deleted file mode 100644
index f02826679..000000000
--- a/aws/aws-sink/src/main/java/com/dtstack/flink/sql/sink/aws/table/AwsTableInfo.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.dtstack.flink.sql.sink.aws.table;
-
-import com.dtstack.flink.sql.table.AbstractTargetTableInfo;
-import com.google.common.base.Preconditions;
-
-/**
- * @author tiezhu
- * date 2020/12/1
- * company dtstack
- */
-public class AwsTableInfo extends AbstractTargetTableInfo {
-
- private static final String CURRENT_TYPE = "aws";
-
- private String accessKey;
-
- private String secretKey;
-
- private String hostname;
-
- private String bucketName;
-
- /**
- * 写入s3的objectName
- */
- private String objectName;
-
- /**
- * 文件存储的类型,分为Standard【标准存储】和 Standard-ia【低频存储】
- */
- private String storageType;
-
- /**
- * 设置bucket的访问权限,有基本的两种,CannedAccessControlList和AccessControlList
- */
- private String bucketAcl;
-
- private String userId;
-
- private String userDisplayName;
-
- private String ownerId;
-
- private String ownerDisplayName;
-
- public String getAccessKey() {
- return accessKey;
- }
-
- public void setAccessKey(String accessKey) {
- this.accessKey = accessKey;
- }
-
- public String getSecretKey() {
- return secretKey;
- }
-
- public void setSecretKey(String secretKey) {
- this.secretKey = secretKey;
- }
-
- public String getHostname() {
- return hostname;
- }
-
- public void setHostname(String hostname) {
- this.hostname = hostname;
- }
-
- public String getBucketName() {
- return bucketName;
- }
-
- public void setBucketName(String bucketName) {
- this.bucketName = bucketName;
- }
-
- public String getStorageType() {
- return storageType;
- }
-
- public void setStorageType(String storageType) {
- this.storageType = storageType;
- }
-
- public String getBucketAcl() {
- return bucketAcl;
- }
-
- public void setBucketAcl(String bucketAcl) {
- this.bucketAcl = bucketAcl;
- }
-
- public String getUserId() {
- return userId;
- }
-
- public void setUserId(String userId) {
- this.userId = userId;
- }
-
- public String getUserDisplayName() {
- return userDisplayName;
- }
-
- public void setUserDisplayName(String userDisplayName) {
- this.userDisplayName = userDisplayName;
- }
-
- public String getOwnerId() {
- return ownerId;
- }
-
- public void setOwnerId(String ownerId) {
- this.ownerId = ownerId;
- }
-
- public String getOwnerDisplayName() {
- return ownerDisplayName;
- }
-
- public void setOwnerDisplayName(String ownerDisplayName) {
- this.ownerDisplayName = ownerDisplayName;
- }
-
- public String getObjectName() {
- return objectName;
- }
-
- public void setObjectName(String objectName) {
- this.objectName = objectName;
- }
-
- @Override
- public boolean check() {
- Preconditions.checkNotNull(accessKey, "S3 field of AccessKey is required!");
- Preconditions.checkNotNull(secretKey, "S3 field of SecretKey is required!");
- Preconditions.checkNotNull(bucketName, "S3 field of BucketName is required!");
- Preconditions.checkNotNull(objectName, "S3 field of ObjectName is required!");
-
- return true;
- }
-
- @Override
- public String getType() {
- return CURRENT_TYPE;
- }
-}
diff --git a/aws/aws-sink/src/main/java/com/dtstack/flink/sql/sink/aws/util/AwsManager.java b/aws/aws-sink/src/main/java/com/dtstack/flink/sql/sink/aws/util/AwsManager.java
deleted file mode 100644
index 9cf1b3ccd..000000000
--- a/aws/aws-sink/src/main/java/com/dtstack/flink/sql/sink/aws/util/AwsManager.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.dtstack.flink.sql.sink.aws.util;
-
-import com.amazonaws.ClientConfiguration;
-import com.amazonaws.auth.AWSCredentials;
-import com.amazonaws.auth.BasicAWSCredentials;
-import com.amazonaws.services.s3.AmazonS3Client;
-import com.amazonaws.services.s3.model.AccessControlList;
-import com.amazonaws.services.s3.model.CannedAccessControlList;
-import com.amazonaws.services.s3.model.CanonicalGrantee;
-import com.amazonaws.services.s3.model.CreateBucketRequest;
-import com.amazonaws.services.s3.model.ObjectMetadata;
-import com.amazonaws.services.s3.model.Owner;
-import com.amazonaws.services.s3.model.Permission;
-import com.amazonaws.services.s3.model.S3Object;
-import com.amazonaws.services.s3.model.SetBucketAclRequest;
-import com.amazonaws.services.s3.model.StorageClass;
-import com.dtstack.flink.sql.sink.aws.AwsConstantKey;
-import org.apache.commons.lang3.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Objects;
-
-/**
- * @author tiezhu
- * date 2020/12/1
- * company dtstack
- *
- * 一些针对s3的操作api
- */
-public class AwsManager {
- private static final Logger LOG = LoggerFactory.getLogger(AwsManager.class);
-
- /**
- * 创建s3 client
- *
- * @param accessKey accessKey
- * @param secretKey secretKey
- * @param hostname hostname
- * @return 客户端
- */
- public static AmazonS3Client initClient(String accessKey,
- String secretKey,
- String hostname) {
- ClientConfiguration configuration = new ClientConfiguration();
- // TODO 这个参数要对外开发出去
- configuration.setSignerOverride("S3SignerType");
- AWSCredentials credentials = new BasicAWSCredentials(accessKey, secretKey);
- AmazonS3Client client = new AmazonS3Client(credentials, configuration);
- client.setEndpoint(hostname);
- return client;
- }
-
- /**
- * 创建bucket
- *
- * @param bucketName bucketName,不能为空
- * @param client 客户端,必须可用
- * @param readWrite 读写权限,默认私有
- * @param storageClass 存储类型,有STANDARD 【标准存储】 和 STANDARD_IA 【低频存储】 默认标准存储
- */
- public static void createBucket(String bucketName,
- AmazonS3Client client,
- String readWrite,
- String storageClass) {
- if (StringUtils.isBlank(readWrite)) {
- client.createBucket(bucketName);
- return;
- }
-
- CreateBucketRequest request = new CreateBucketRequest(bucketName);
-
- // TODO 完善多个类型
- if (readWrite.equalsIgnoreCase(AwsConstantKey.PUBLIC_READ_WRITE)) {
- request.setCannedAcl(CannedAccessControlList.PublicReadWrite);
- }
-
- // TODO 完善多个存储类型
- if (storageClass.equalsIgnoreCase(AwsConstantKey.STANDARD)) {
- request.setStorageClass(StorageClass.Standard);
- }
- }
-
- /**
- * 给bucket设置访问权限CannedAccess
- *
- * @param client 客户端
- * @param bucketName bucketName
- * @param aclName 权限名,权限名分为CannedAccessControlList 与AccessControlList 两种格式。
- */
- public static void setBucketCannedAccessControl(AmazonS3Client client,
- String bucketName,
- String aclName) {
- client.setBucketAcl(bucketName, CannedAccessControlList.valueOf(aclName));
- }
-
- /**
- * 给bucket 设置访问权限AccessControl
- *
- * @param client 客户端
- * @param bucketName bucketName
- * @param aclName 权限名
- */
- public static void setBucketAccessControl(AmazonS3Client client,
- String bucketName,
- String aclName) {
- AccessControlList list = new AccessControlList();
- // set owner
- Owner owner = new Owner("owner-id", "owner-displayName");
- list.setOwner(owner);
-
- // set owner id
- CanonicalGrantee grantee = new CanonicalGrantee("owner-id");
-
- // set owner display name
- grantee.setDisplayName("user-displayName");
- // set access of acl
- //TODO 完善Permission选择
- list.grantPermission(grantee, Permission.parsePermission(aclName));
- SetBucketAclRequest request = new SetBucketAclRequest(bucketName, list);
- client.setBucketAcl(request);
- }
-
- /**
- * 获取当前 Object 字节位数,如果 Object 不存在,那么返回 0
- *
- * @param bucket bucket
- * @param objectName object name
- * @param client client
- * @return object bytes 位数
- */
- public static long getObjectPosition(String bucket, String objectName, AmazonS3Client client) {
- if (!client.doesObjectExist(bucket, objectName)) {
- return 0;
- }
-
- S3Object object = client.getObject(bucket, objectName);
- ObjectMetadata objectMetadata = object.getObjectMetadata();
- return objectMetadata.getContentLength();
- }
-}
diff --git a/aws/pom.xml b/aws/pom.xml
deleted file mode 100644
index 7525d712c..000000000
--- a/aws/pom.xml
+++ /dev/null
@@ -1,32 +0,0 @@
-
-
-
- flink.sql
- com.dtstack.flink
- 1.0-SNAPSHOT
- ../pom.xml
-
- 4.0.0
-
- sql.aws
- pom
-
- aws-sink
-
-
-
- 1.0-SNAPSHOT
-
-
-
-
- com.dtstack.flink
- sql.core
- ${sql.core.version}
- provided
-
-
-
-
\ No newline at end of file
diff --git a/cassandra/cassandra-side/cassandra-all-side/pom.xml b/cassandra/cassandra-side/cassandra-all-side/pom.xml
index 5715ab605..461e70beb 100644
--- a/cassandra/cassandra-side/cassandra-all-side/pom.xml
+++ b/cassandra/cassandra-side/cassandra-all-side/pom.xml
@@ -36,10 +36,9 @@
shade
- false
- org.slf4j
+
@@ -70,14 +69,14 @@
-
+
-
+
diff --git a/cassandra/cassandra-side/cassandra-all-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAllReqRow.java b/cassandra/cassandra-side/cassandra-all-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAllReqRow.java
index 28830135a..eecb1350e 100644
--- a/cassandra/cassandra-side/cassandra-all-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAllReqRow.java
+++ b/cassandra/cassandra-side/cassandra-all-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAllReqRow.java
@@ -28,26 +28,25 @@
import com.datastax.driver.core.SocketOptions;
import com.datastax.driver.core.policies.DowngradingConsistencyRetryPolicy;
import com.datastax.driver.core.policies.RetryPolicy;
-import com.dtstack.flink.sql.side.AbstractSideTableInfo;
-import com.dtstack.flink.sql.side.BaseAllReqRow;
+import com.dtstack.flink.sql.side.AllReqRow;
import com.dtstack.flink.sql.side.FieldInfo;
import com.dtstack.flink.sql.side.JoinInfo;
+import com.dtstack.flink.sql.side.SideTableInfo;
import com.dtstack.flink.sql.side.cassandra.table.CassandraSideTableInfo;
-import com.dtstack.flink.sql.util.RowDataComplete;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
import org.apache.calcite.sql.JoinType;
import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.table.dataformat.BaseRow;
-import org.apache.flink.table.dataformat.GenericRow;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
+import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetAddress;
import java.sql.SQLException;
+import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.List;
@@ -60,12 +59,14 @@
*
* @author xuqianjin
*/
-public class CassandraAllReqRow extends BaseAllReqRow {
+public class CassandraAllReqRow extends AllReqRow {
private static final long serialVersionUID = 54015343561288219L;
private static final Logger LOG = LoggerFactory.getLogger(CassandraAllReqRow.class);
+ private static final String cassandra_DRIVER = "com.cassandra.jdbc.Driver";
+
private static final int CONN_RETRY_NUM = 3;
private static final int FETCH_SIZE = 1000;
@@ -75,10 +76,36 @@ public class CassandraAllReqRow extends BaseAllReqRow {
private AtomicReference