-
Notifications
You must be signed in to change notification settings - Fork 551
Expand file tree
/
Copy pathKuduUtil.java
More file actions
549 lines (515 loc) · 22.5 KB
/
KuduUtil.java
File metadata and controls
549 lines (515 loc) · 22.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.util;
import static java.lang.String.format;
import static org.apache.impala.service.KuduCatalogOpExecutor.GOT_KUDU_CLIENT;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.sql.Date;
import java.time.LocalDate;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.Nullable;
import org.apache.impala.analysis.Analyzer;
import org.apache.impala.analysis.DescriptorTable;
import org.apache.impala.analysis.Expr;
import org.apache.impala.analysis.InsertStmt;
import org.apache.impala.analysis.KuduPartitionExpr;
import org.apache.impala.analysis.LiteralExpr;
import org.apache.impala.catalog.ArrayType;
import org.apache.impala.catalog.FeKuduTable;
import org.apache.impala.catalog.ScalarType;
import org.apache.impala.catalog.Type;
import org.apache.impala.common.AnalysisException;
import org.apache.impala.common.ImpalaRuntimeException;
import org.apache.impala.common.Pair;
import org.apache.impala.service.BackendConfig;
import org.apache.impala.thrift.TColumn;
import org.apache.impala.thrift.TColumnEncoding;
import org.apache.impala.thrift.TExpr;
import org.apache.impala.thrift.TExprNode;
import org.apache.impala.thrift.TExprNodeType;
import org.apache.impala.thrift.THdfsCompression;
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.ColumnSchema.CompressionAlgorithm;
import org.apache.kudu.ColumnSchema.Encoding;
import org.apache.kudu.ColumnTypeAttributes;
import org.apache.kudu.Schema;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduClient.KuduClientBuilder;
import org.apache.kudu.client.PartialRow;
import org.apache.kudu.client.RangePartitionBound;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
public class KuduUtil {
private static final String KUDU_TABLE_NAME_PREFIX = "impala::";
// Number of worker threads created by each KuduClient, regardless of whether or not
// they're needed. Impala does not share KuduClients between operations, so the number
// of threads created can get very large under concurrent workloads. This number should
// be sufficient for the Frontend/Catalog use, and has been tested in stress tests.
private static int KUDU_CLIENT_WORKER_THREAD_COUNT = 5;
// Maps lists of master addresses to KuduClients, for sharing clients across the FE.
private static Map<String, KuduClient> kuduClients_ =
new ConcurrentHashMap<String, KuduClient>();
/**
* Gets a KuduClient for the specified Kudu master addresses (as a comma-separated
* list of host:port pairs). It will look up and share an existing KuduClient, if
* possible, or it will create a new one to return.
* The 'admin operation timeout' and the 'operation timeout' are set to
* BackendConfig.getKuduClientTimeoutMs(). The 'admin operations timeout' is used for
* operations like creating/deleting tables. The 'operation timeout' is used when
* fetching tablet metadata.
*/
public static KuduClient getKuduClient(String kuduMasters) {
KuduClient client = kuduClients_.computeIfAbsent(kuduMasters, k -> {
KuduClientBuilder b = new KuduClient.KuduClientBuilder(kuduMasters);
b.defaultAdminOperationTimeoutMs(BackendConfig.INSTANCE.getKuduClientTimeoutMs());
b.defaultOperationTimeoutMs(BackendConfig.INSTANCE.getKuduClientTimeoutMs());
b.workerCount(KUDU_CLIENT_WORKER_THREAD_COUNT);
b.saslProtocolName(BackendConfig.INSTANCE.getKuduSaslProtocolName());
return b.build();
});
return client;
}
/**
* Wrapper to get kudu client and mark the given 'catalogTimeline' when it finishes.
*/
public static KuduClient getKuduClient(String masterHosts,
EventSequence catalogTimeline) {
KuduClient kudu = KuduUtil.getKuduClient(masterHosts);
catalogTimeline.markEvent(GOT_KUDU_CLIENT);
return kudu;
}
/**
* Creates a PartialRow from a list of range partition boundary values.
* 'rangePartitionColumns' must be specified in Kudu case.
*/
private static PartialRow parseRangePartitionBoundaryValues(Schema schema,
List<String> rangePartitionColumns, List<TExpr> boundaryValues)
throws ImpalaRuntimeException {
Preconditions.checkState(rangePartitionColumns.size() == boundaryValues.size());
PartialRow bound = new PartialRow(schema);
for (int i = 0; i < boundaryValues.size(); ++i) {
String colName = rangePartitionColumns.get(i);
ColumnSchema col = schema.getColumn(colName);
Preconditions.checkNotNull(col);
setKey(col, boundaryValues.get(i), schema.getColumnIndex(colName), bound);
}
return bound;
}
/**
* Builds and returns a range-partition bound used in the creation of a Kudu
* table. The range-partition bound consists of a PartialRow with the boundary
* values and a RangePartitionBound indicating if the bound is inclusive or exclusive.
* Throws an ImpalaRuntimeException if an error occurs while parsing the boundary
* values. 'rangePartitionColumns' must be specified in Kudu case.
*/
public static Pair<PartialRow, RangePartitionBound> buildRangePartitionBound(
Schema schema, List<String> rangePartitionColumns, List<TExpr> boundaryValues,
boolean isInclusiveBound) throws ImpalaRuntimeException {
if (boundaryValues == null || boundaryValues.isEmpty()) {
// TODO: Do we need to set the bound type?
return new Pair<PartialRow, RangePartitionBound>(new PartialRow(schema),
RangePartitionBound.INCLUSIVE_BOUND);
}
PartialRow bound =
parseRangePartitionBoundaryValues(schema, rangePartitionColumns, boundaryValues);
RangePartitionBound boundType = null;
if (isInclusiveBound) {
boundType = RangePartitionBound.INCLUSIVE_BOUND;
} else {
boundType = RangePartitionBound.EXCLUSIVE_BOUND;
}
return new Pair<PartialRow, RangePartitionBound>(bound, boundType);
}
/**
* Sets the value 'boundaryVal' in 'key' at 'pos'. Checks if 'boundaryVal' has the
* expected data type.
*/
private static void setKey(ColumnSchema col, TExpr boundaryVal, int pos, PartialRow key)
throws ImpalaRuntimeException {
Preconditions.checkState(boundaryVal.getNodes().size() == 1);
TExprNode literal = boundaryVal.getNodes().get(0);
String colName = col.getName();
org.apache.kudu.Type type = col.getType();
String strLiteral;
switch (type) {
case INT8:
checkCorrectType(literal.isSetInt_literal(), type, colName, literal);
key.addByte(pos, (byte) literal.getInt_literal().getValue());
break;
case INT16:
checkCorrectType(literal.isSetInt_literal(), type, colName, literal);
key.addShort(pos, (short) literal.getInt_literal().getValue());
break;
case INT32:
checkCorrectType(literal.isSetInt_literal(), type, colName, literal);
key.addInt(pos, (int) literal.getInt_literal().getValue());
break;
case INT64:
checkCorrectType(literal.isSetInt_literal(), type, colName, literal);
key.addLong(pos, literal.getInt_literal().getValue());
break;
case VARCHAR:
checkCorrectType(literal.isSetString_literal(), type, colName, literal);
key.addVarchar(pos,
StringUtils.fromUtf8Buffer(literal.getString_literal().value, false));
break;
case STRING:
checkCorrectType(literal.isSetString_literal(), type, colName, literal);
key.addString(pos,
StringUtils.fromUtf8Buffer(literal.getString_literal().value, false));
break;
case UNIXTIME_MICROS:
checkCorrectType(literal.isSetInt_literal(), type, colName, literal);
key.addLong(pos, literal.getInt_literal().getValue());
break;
case DATE:
checkCorrectType(literal.isSetDate_literal(), type, colName, literal);
key.addDate(pos, Date.valueOf(LocalDate.ofEpochDay(
literal.getDate_literal().getDays_since_epoch())));
break;
case DECIMAL:
checkCorrectType(literal.isSetDecimal_literal(), type, colName, literal);
BigInteger unscaledVal = new BigInteger(literal.getDecimal_literal().getValue());
int scale = col.getTypeAttributes().getScale();
key.addDecimal(pos, new BigDecimal(unscaledVal, scale));
break;
default:
throw new ImpalaRuntimeException("Key columns not supported for type: "
+ type.toString());
}
}
/**
* Returns the actual value of the specified defaultValue literal. The returned type is
* the value type stored by Kudu for the column. For example, The `impalaType` is
* translated to a Kudu Type 'type' and if 'type' is 'INT8', the returned
* value is a Java byte, or if 'type' is 'UNIXTIME_MICROS', the returned value is
* a Java long.
*/
public static Object getKuduDefaultValue(
TExpr defaultValue, Type impalaType, String colName) throws ImpalaRuntimeException {
Preconditions.checkState(defaultValue.getNodes().size() == 1);
TExprNode literal = defaultValue.getNodes().get(0);
if (literal.getNode_type() == TExprNodeType.NULL_LITERAL) return null;
org.apache.kudu.Type type = KuduUtil.fromImpalaType(impalaType);
if (type == null) {
throw new ImpalaRuntimeException(
String.format("Type %s is not supported in Kudu", impalaType.toSql()));
}
switch (type) {
case INT8:
checkCorrectType(literal.isSetInt_literal(), type, colName, literal);
return (byte) literal.getInt_literal().getValue();
case INT16:
checkCorrectType(literal.isSetInt_literal(), type, colName, literal);
return (short) literal.getInt_literal().getValue();
case INT32:
checkCorrectType(literal.isSetInt_literal(), type, colName, literal);
return (int) literal.getInt_literal().getValue();
case INT64:
checkCorrectType(literal.isSetInt_literal(), type, colName, literal);
return (long) literal.getInt_literal().getValue();
case FLOAT:
checkCorrectType(literal.isSetFloat_literal(), type, colName, literal);
return (float) literal.getFloat_literal().getValue();
case DOUBLE:
checkCorrectType(literal.isSetFloat_literal(), type, colName, literal);
return (double) literal.getFloat_literal().getValue();
case VARCHAR:
case STRING:
checkCorrectType(literal.isSetString_literal(), type, colName, literal);
return StringUtils.fromUtf8Buffer(literal.getString_literal().value, false);
case BOOL:
checkCorrectType(literal.isSetBool_literal(), type, colName, literal);
return literal.getBool_literal().isValue();
case UNIXTIME_MICROS:
checkCorrectType(literal.isSetInt_literal(), type, colName, literal);
return literal.getInt_literal().getValue();
case DATE:
checkCorrectType(literal.isSetDate_literal(), type, colName, literal);
return Date.valueOf(LocalDate.ofEpochDay(
literal.getDate_literal().getDays_since_epoch()));
case DECIMAL:
checkCorrectType(literal.isSetDecimal_literal(), type, colName, literal);
BigInteger unscaledVal = new BigInteger(literal.getDecimal_literal().getValue());
return new BigDecimal(unscaledVal, impalaType.getDecimalDigits());
default:
throw new ImpalaRuntimeException("Unsupported value for column type: " +
type.toString());
}
}
public static Encoding fromThrift(TColumnEncoding encoding)
throws ImpalaRuntimeException {
switch (encoding) {
case AUTO:
return Encoding.AUTO_ENCODING;
case PLAIN:
return Encoding.PLAIN_ENCODING;
case PREFIX:
return Encoding.PREFIX_ENCODING;
case GROUP_VARINT:
return Encoding.GROUP_VARINT;
case RLE:
return Encoding.RLE;
case DICTIONARY:
return Encoding.DICT_ENCODING;
case BIT_SHUFFLE:
return Encoding.BIT_SHUFFLE;
default:
throw new ImpalaRuntimeException("Unsupported encoding: " +
encoding.toString());
}
}
public static TColumnEncoding toThrift(Encoding encoding)
throws ImpalaRuntimeException {
switch (encoding) {
case AUTO_ENCODING:
return TColumnEncoding.AUTO;
case PLAIN_ENCODING:
return TColumnEncoding.PLAIN;
case PREFIX_ENCODING:
return TColumnEncoding.PREFIX;
case GROUP_VARINT:
return TColumnEncoding.GROUP_VARINT;
case RLE:
return TColumnEncoding.RLE;
case DICT_ENCODING:
return TColumnEncoding.DICTIONARY;
case BIT_SHUFFLE:
return TColumnEncoding.BIT_SHUFFLE;
default:
throw new ImpalaRuntimeException("Unsupported encoding: " +
encoding.toString());
}
}
public static CompressionAlgorithm fromThrift(THdfsCompression compression)
throws ImpalaRuntimeException {
switch (compression) {
case DEFAULT:
return CompressionAlgorithm.DEFAULT_COMPRESSION;
case NONE:
return CompressionAlgorithm.NO_COMPRESSION;
case SNAPPY:
return CompressionAlgorithm.SNAPPY;
case LZ4:
return CompressionAlgorithm.LZ4;
case ZLIB:
return CompressionAlgorithm.ZLIB;
default:
throw new ImpalaRuntimeException("Unsupported compression algorithm: " +
compression.toString());
}
}
public static THdfsCompression toThrift(CompressionAlgorithm compression)
throws ImpalaRuntimeException {
switch (compression) {
case NO_COMPRESSION:
return THdfsCompression.NONE;
case DEFAULT_COMPRESSION:
return THdfsCompression.DEFAULT;
case SNAPPY:
return THdfsCompression.SNAPPY;
case LZ4:
return THdfsCompression.LZ4;
case ZLIB:
return THdfsCompression.ZLIB;
default:
throw new ImpalaRuntimeException("Unsupported compression algorithm: " +
compression.toString());
}
}
public static TColumn setColumnOptions(TColumn column, boolean isKey,
boolean isPrimaryKeyUnique, Boolean isNullable, boolean isAutoIncrementing,
Encoding encoding, CompressionAlgorithm compression, Expr defaultValue,
Integer blockSize, String kuduName) {
column.setIs_key(isKey);
column.setIs_primary_key_unique(isPrimaryKeyUnique);
if (isNullable != null) column.setIs_nullable(isNullable);
column.setIs_auto_incrementing(isAutoIncrementing);
try {
if (encoding != null) column.setEncoding(toThrift(encoding));
if (compression != null) column.setCompression(toThrift(compression));
} catch (ImpalaRuntimeException e) {
// This shouldn't happen
throw new IllegalStateException(String.format("Error parsing " +
"encoding/compression values for Kudu column '%s': %s", column.getColumnName(),
e.getMessage()));
}
if (defaultValue != null) {
Preconditions.checkState(defaultValue instanceof LiteralExpr);
column.setDefault_value(defaultValue.treeToThrift());
}
if (blockSize != null) column.setBlock_size(blockSize);
Preconditions.checkNotNull(kuduName);
column.setKudu_column_name(kuduName);
return column;
}
/**
* If correctType is true, returns. Otherwise throws a formatted error message
* indicating problems with the type of the literal of the range literal.
*/
private static void checkCorrectType(boolean correctType, org.apache.kudu.Type t,
String colName, TExprNode boundaryVal) throws ImpalaRuntimeException {
if (correctType) return;
throw new ImpalaRuntimeException(
format("Expected '%s' literal for column '%s' got '%s'", t.getName(), colName,
Type.fromThrift(boundaryVal.getType()).toSql()));
}
public static boolean isSupportedKeyType(org.apache.impala.catalog.Type type) {
return type.isFixedPointType() || type.isStringType() || type.isTimestamp()
|| type.isDate();
}
/**
* When Kudu's integration with the Hive Metastore is enabled, returns the Kudu
* table name with the format 'metastoreDbName.metastoreTableName'. Otherwise,
* returns with the format 'impala::metastoreDbName.metastoreTableName'. This
* should only be used for managed table.
*/
public static String getDefaultKuduTableName(String metastoreDbName,
String metastoreTableName, boolean isHMSIntegrationEnabled) {
return isHMSIntegrationEnabled ? metastoreDbName + "." + metastoreTableName :
KUDU_TABLE_NAME_PREFIX + metastoreDbName + "." + metastoreTableName;
}
/**
* Check if the given name is the default Kudu table name for managed table
* whether Kudu's integration with the Hive Metastore is enabled or not.
*/
public static boolean isDefaultKuduTableName(String name,
String metastoreDbName, String metastoreTableName) {
return getDefaultKuduTableName(metastoreDbName,
metastoreTableName, true).equals(name) ||
getDefaultKuduTableName(metastoreDbName,
metastoreTableName, false).equals(name);
}
/**
* Converts a given Impala catalog type or its item type to the Kudu type.
* Returns null if the type cannot be converted instead of throwing an exception so
* that the caller can report the full type in the error message. Since this function
* contains recursion, only the outer-most caller has the full type info.
*/
public static @Nullable org.apache.kudu.Type fromImpalaType(Type t) {
if (!t.isScalarType()) {
// Kudu does not support complex types other than ARRAY.
if (!t.isArrayType()) { return null; }
Type itemType = ((ArrayType) t).getItemType();
// Kudu does not support array of non-scalar types or 16-byte DECIMAL.
if (!itemType.isScalarType()
|| ((ScalarType) itemType).storageBytesForDecimal() == 16) {
return null;
}
return KuduUtil.fromImpalaType(itemType);
}
ScalarType s = (ScalarType) t;
switch (s.getPrimitiveType()) {
case TINYINT: return org.apache.kudu.Type.INT8;
case SMALLINT: return org.apache.kudu.Type.INT16;
case INT: return org.apache.kudu.Type.INT32;
case BIGINT: return org.apache.kudu.Type.INT64;
case BOOLEAN: return org.apache.kudu.Type.BOOL;
case STRING: return org.apache.kudu.Type.STRING;
case DOUBLE: return org.apache.kudu.Type.DOUBLE;
case FLOAT: return org.apache.kudu.Type.FLOAT;
case TIMESTAMP: return org.apache.kudu.Type.UNIXTIME_MICROS;
case DECIMAL: return org.apache.kudu.Type.DECIMAL;
case DATE: return org.apache.kudu.Type.DATE;
case VARCHAR: return org.apache.kudu.Type.VARCHAR;
case BINARY: return org.apache.kudu.Type.BINARY;
/* Fall through below */
case INVALID_TYPE:
case NULL_TYPE:
case DATETIME:
case CHAR:
default: return null;
}
}
/**
* Converts a given Kudu scalar type to its matching Impala scalar type.
*/
public static ScalarType toImpalaScalarType(org.apache.kudu.Type t,
ColumnTypeAttributes typeAttributes) throws ImpalaRuntimeException {
Preconditions.checkState(t != org.apache.kudu.Type.NESTED);
switch (t) {
case BOOL: return Type.BOOLEAN;
case DOUBLE: return Type.DOUBLE;
case FLOAT: return Type.FLOAT;
case INT8: return Type.TINYINT;
case INT16: return Type.SMALLINT;
case INT32: return Type.INT;
case INT64: return Type.BIGINT;
case STRING: return Type.STRING;
case UNIXTIME_MICROS: return Type.TIMESTAMP;
case DATE: return Type.DATE;
case DECIMAL:
return ScalarType.createDecimalType(
typeAttributes.getPrecision(), typeAttributes.getScale());
case VARCHAR: return ScalarType.createVarcharType(typeAttributes.getLength());
case BINARY: return Type.BINARY;
default:
throw new ImpalaRuntimeException(String.format(
"Kudu type '%s' is not supported in Impala", t.getName()));
}
}
/**
* Converts a given Kudu colSchema to its matching Impala type. If the Kudu type is
* NESTED, it must be an array and the element type is converted to the
* corresponding Impala type.
*/
public static Type toImpalaType(org.apache.kudu.ColumnSchema colSchema)
throws ImpalaRuntimeException {
org.apache.kudu.Type t = colSchema.getType();
ColumnTypeAttributes typeAttributes = colSchema.getTypeAttributes();
if (t != org.apache.kudu.Type.NESTED) {
return toImpalaScalarType(t, typeAttributes);
}
Preconditions.checkState(colSchema.getNestedTypeDescriptor() != null);
Preconditions.checkState(colSchema.getNestedTypeDescriptor().isArray());
org.apache.kudu.Type kuduElementType =
colSchema.getNestedTypeDescriptor().getArrayDescriptor().getElemType();
return new ArrayType(toImpalaScalarType(kuduElementType, typeAttributes));
}
/**
* Creates and returns an Expr that takes rows being inserted by 'insertStmt' and
* returns the partition number for each row.
*/
public static Expr createPartitionExpr(InsertStmt insertStmt, Analyzer analyzer)
throws AnalysisException {
Preconditions.checkState(insertStmt.getTargetTable() instanceof FeKuduTable);
Expr kuduPartitionExpr = new KuduPartitionExpr(DescriptorTable.TABLE_SINK_ID,
(FeKuduTable) insertStmt.getTargetTable(),
Lists.newArrayList(insertStmt.getPartitionKeyExprs()),
insertStmt.getPartitionColPos());
kuduPartitionExpr.analyze(analyzer);
return kuduPartitionExpr;
}
// Used for test assertions
public static int getkuduClientsSize() {
return kuduClients_.size();
}
// Used for generating log messages
public static String getPrimaryKeyString(boolean isPrimaryKeyUnique) {
StringBuilder sb = new StringBuilder();
if (!isPrimaryKeyUnique) sb.append("NON UNIQUE ");
sb.append("PRIMARY KEY");
return sb.toString();
}
// Get auto-incrementing column name of Kudu table
public static String getAutoIncrementingColumnName() {
return Schema.getAutoIncrementingColumnName();
}
}