Skip to content

Commit 2e9f888

Browse files
authored
[refactor](dialect) make http sql converter plugin and audit loader as builtin plugin (#29692)
Followup to #28890. Make HttpSqlConverterPlugin and AuditLoader builtin plugins of Doris, to make it simple for users to support SQL dialects and use the audit loader. HttpSqlConverterPlugin: By default, nothing is changed. There is a new global variable sql_converter_service, default is empty; if it is set, the HttpSqlConverterPlugin will be enabled: set global sql_converter_service = "http://127.0.0.1:5001/api/v1/convert". AuditLoader: By default, nothing is changed. There is a new global variable enable_audit_plugin, default is false; if it is set to true, the audit loader plugin will be enabled. Doris will create the audit_log table in __internal_schema at startup. If enable_audit_plugin is true, audit logs will be inserted into the audit_log table. 3 other global variables are related to this plugin: audit_plugin_max_batch_interval_sec: the max interval for the audit loader to insert a batch of audit logs. audit_plugin_max_batch_bytes: the max batch size for the audit loader to insert a batch of audit logs. audit_plugin_max_sql_length: the max length of a statement in the audit log.
1 parent 1bd1530 commit 2e9f888

34 files changed

Lines changed: 684 additions & 368 deletions

fe/fe-common/src/main/java/org/apache/doris/common/Config.java

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2398,13 +2398,6 @@ public class Config extends ConfigBase {
23982398
"Whether to enable the function of getting log files through http interface"})
23992399
public static boolean enable_get_log_file_api = false;
24002400

2401-
// This config is deprecated and has not taken effect anymore,
2402-
// please use dialect plugin: fe_plugins/http-dialect-converter for instead
2403-
@Deprecated
2404-
@ConfField(description = {"用于SQL方言转换的服务地址。",
2405-
"The service address for SQL dialect conversion."})
2406-
public static String sql_convertor_service = "";
2407-
24082401
@ConfField(mutable = true)
24092402
public static boolean enable_profile_when_analyze = false;
24102403
@ConfField(mutable = true)

fe/fe-core/src/main/java/org/apache/doris/analysis/InlineViewRef.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,7 @@ public void analyze(Analyzer analyzer) throws AnalysisException, UserException {
200200
if (view == null && !hasExplicitAlias()) {
201201
String dialect = ConnectContext.get().getSessionVariable().getSqlDialect();
202202
Dialect sqlDialect = Dialect.getByName(dialect);
203-
if (Dialect.SPARK_SQL != sqlDialect) {
203+
if (Dialect.SPARK != sqlDialect) {
204204
ErrorReport.reportAnalysisException(ErrorCode.ERR_DERIVED_MUST_HAVE_ALIAS);
205205
}
206206
hasExplicitAlias = true;

fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java

Lines changed: 80 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import org.apache.doris.analysis.DropTableStmt;
2525
import org.apache.doris.analysis.HashDistributionDesc;
2626
import org.apache.doris.analysis.KeysDesc;
27+
import org.apache.doris.analysis.PartitionDesc;
28+
import org.apache.doris.analysis.RangePartitionDesc;
2729
import org.apache.doris.analysis.TableName;
2830
import org.apache.doris.analysis.TypeDef;
2931
import org.apache.doris.common.Config;
@@ -33,6 +35,7 @@
3335
import org.apache.doris.common.util.PropertyAnalyzer;
3436
import org.apache.doris.datasource.InternalCatalog;
3537
import org.apache.doris.ha.FrontendNodeType;
38+
import org.apache.doris.plugin.audit.AuditLoaderPlugin;
3639
import org.apache.doris.statistics.StatisticConstants;
3740
import org.apache.doris.statistics.util.StatisticsUtil;
3841

@@ -53,6 +56,33 @@ public class InternalSchemaInitializer extends Thread {
5356

5457
private static final Logger LOG = LogManager.getLogger(InternalSchemaInitializer.class);
5558

59+
public static final List<ColumnDef> AUDIT_TABLE_COLUMNS;
60+
61+
static {
62+
AUDIT_TABLE_COLUMNS = new ArrayList<>();
63+
AUDIT_TABLE_COLUMNS.add(new ColumnDef("query_id", TypeDef.createVarchar(48), true));
64+
AUDIT_TABLE_COLUMNS.add(new ColumnDef("time", TypeDef.create(PrimitiveType.DATETIME), true));
65+
AUDIT_TABLE_COLUMNS.add(new ColumnDef("client_ip", TypeDef.createVarchar(128), true));
66+
AUDIT_TABLE_COLUMNS.add(new ColumnDef("user", TypeDef.createVarchar(128), true));
67+
AUDIT_TABLE_COLUMNS.add(new ColumnDef("catalog", TypeDef.createVarchar(128), true));
68+
AUDIT_TABLE_COLUMNS.add(new ColumnDef("db", TypeDef.createVarchar(128), true));
69+
AUDIT_TABLE_COLUMNS.add(new ColumnDef("state", TypeDef.createVarchar(128), true));
70+
AUDIT_TABLE_COLUMNS.add(new ColumnDef("error_code", TypeDef.create(PrimitiveType.INT), true));
71+
AUDIT_TABLE_COLUMNS.add(new ColumnDef("error_message", TypeDef.create(PrimitiveType.STRING), true));
72+
AUDIT_TABLE_COLUMNS.add(new ColumnDef("query_time", TypeDef.create(PrimitiveType.BIGINT), true));
73+
AUDIT_TABLE_COLUMNS.add(new ColumnDef("scan_bytes", TypeDef.create(PrimitiveType.BIGINT), true));
74+
AUDIT_TABLE_COLUMNS.add(new ColumnDef("scan_rows", TypeDef.create(PrimitiveType.BIGINT), true));
75+
AUDIT_TABLE_COLUMNS.add(new ColumnDef("return_rows", TypeDef.create(PrimitiveType.BIGINT), true));
76+
AUDIT_TABLE_COLUMNS.add(new ColumnDef("stmt_id", TypeDef.create(PrimitiveType.BIGINT), true));
77+
AUDIT_TABLE_COLUMNS.add(new ColumnDef("is_query", TypeDef.create(PrimitiveType.TINYINT), true));
78+
AUDIT_TABLE_COLUMNS.add(new ColumnDef("frontend_ip", TypeDef.createVarchar(128), true));
79+
AUDIT_TABLE_COLUMNS.add(new ColumnDef("cpu_time_ms", TypeDef.create(PrimitiveType.BIGINT), true));
80+
AUDIT_TABLE_COLUMNS.add(new ColumnDef("sql_hash", TypeDef.createVarchar(128), true));
81+
AUDIT_TABLE_COLUMNS.add(new ColumnDef("sql_digest", TypeDef.createVarchar(128), true));
82+
AUDIT_TABLE_COLUMNS.add(new ColumnDef("peak_memory_bytes", TypeDef.create(PrimitiveType.BIGINT), true));
83+
AUDIT_TABLE_COLUMNS.add(new ColumnDef("stmt", TypeDef.create(PrimitiveType.STRING), true));
84+
}
85+
5686
public void run() {
5787
if (!FeConstants.enableInternalSchemaDb) {
5888
return;
@@ -83,6 +113,7 @@ public void run() {
83113
Database database = op.get();
84114
modifyTblReplicaCount(database, StatisticConstants.STATISTIC_TBL_NAME);
85115
modifyTblReplicaCount(database, StatisticConstants.HISTOGRAM_TBL_NAME);
116+
modifyTblReplicaCount(database, AuditLoaderPlugin.AUDIT_LOG_TABLE);
86117
}
87118

88119
public void modifyTblReplicaCount(Database database, String tblName) {
@@ -103,8 +134,8 @@ public void modifyTblReplicaCount(Database database, String tblName) {
103134
>= StatisticConstants.STATISTIC_INTERNAL_TABLE_REPLICA_NUM) {
104135
return;
105136
}
137+
colStatsTbl.writeLock();
106138
try {
107-
colStatsTbl.writeLock();
108139
Env.getCurrentEnv().modifyTableReplicaAllocation(database, (OlapTable) colStatsTbl, props);
109140
} finally {
110141
colStatsTbl.writeUnlock();
@@ -123,8 +154,11 @@ public void modifyTblReplicaCount(Database database, String tblName) {
123154
}
124155

125156
private void createTbl() throws UserException {
157+
// statistics
126158
Env.getCurrentEnv().getInternalCatalog().createTable(buildStatisticsTblStmt());
127159
Env.getCurrentEnv().getInternalCatalog().createTable(buildHistogramTblStmt());
160+
// audit table
161+
Env.getCurrentEnv().getInternalCatalog().createTable(buildAuditTblStmt());
128162
}
129163

130164
@VisibleForTesting
@@ -212,7 +246,40 @@ public CreateTableStmt buildHistogramTblStmt() throws UserException {
212246
return createTableStmt;
213247
}
214248

249+
private CreateTableStmt buildAuditTblStmt() throws UserException {
250+
TableName tableName = new TableName("",
251+
FeConstants.INTERNAL_DB_NAME, AuditLoaderPlugin.AUDIT_LOG_TABLE);
252+
253+
String engineName = "olap";
254+
ArrayList<String> dupKeys = Lists.newArrayList("query_id", "time", "client_ip");
255+
KeysDesc keysDesc = new KeysDesc(KeysType.DUP_KEYS, dupKeys);
256+
// partition
257+
PartitionDesc partitionDesc = new RangePartitionDesc(Lists.newArrayList("time"), Lists.newArrayList());
258+
// distribution
259+
int bucketNum = 2;
260+
DistributionDesc distributionDesc = new HashDistributionDesc(bucketNum, Lists.newArrayList("query_id"));
261+
Map<String, String> properties = new HashMap<String, String>() {
262+
{
263+
put("dynamic_partition.time_unit", "DAY");
264+
put("dynamic_partition.start", "-30");
265+
put("dynamic_partition.end", "3");
266+
put("dynamic_partition.prefix", "p");
267+
put("dynamic_partition.buckets", String.valueOf(bucketNum));
268+
put("dynamic_partition.enable", "true");
269+
put("replication_num", String.valueOf(Math.max(1,
270+
Config.min_replication_num_per_tablet)));
271+
}
272+
};
273+
CreateTableStmt createTableStmt = new CreateTableStmt(true, false,
274+
tableName, AUDIT_TABLE_COLUMNS, engineName, keysDesc, partitionDesc, distributionDesc,
275+
properties, null, "Doris internal audit table, DO NOT MODIFY IT", null);
276+
StatisticsUtil.analyze(createTableStmt);
277+
return createTableStmt;
278+
}
279+
280+
215281
private boolean created() {
282+
// 1. check database exist
216283
Optional<Database> optionalDatabase =
217284
Env.getCurrentEnv().getInternalCatalog()
218285
.getDb(FeConstants.INTERNAL_DB_NAME);
@@ -225,6 +292,7 @@ private boolean created() {
225292
return false;
226293
}
227294

295+
// 2. check statistic tables
228296
Table statsTbl = optionalStatsTbl.get();
229297
Optional<Column> optionalColumn =
230298
statsTbl.fullSchema.stream().filter(c -> c.getName().equals("count")).findFirst();
@@ -238,7 +306,17 @@ private boolean created() {
238306
}
239307
return false;
240308
}
241-
return db.getTable(StatisticConstants.HISTOGRAM_TBL_NAME).isPresent();
309+
optionalStatsTbl = db.getTable(StatisticConstants.HISTOGRAM_TBL_NAME);
310+
if (!optionalStatsTbl.isPresent()) {
311+
return false;
312+
}
313+
314+
// 3. check audit table
315+
optionalStatsTbl = db.getTable(AuditLoaderPlugin.AUDIT_LOG_TABLE);
316+
if (!optionalStatsTbl.isPresent()) {
317+
return false;
318+
}
319+
return true;
242320
}
243321

244322
}

fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java

Lines changed: 21 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.doris.httpv2.entity.ResponseEntityBuilder;
2727
import org.apache.doris.httpv2.entity.RestBaseResult;
2828
import org.apache.doris.httpv2.exception.UnauthorizedException;
29+
import org.apache.doris.mysql.privilege.Auth;
2930
import org.apache.doris.mysql.privilege.PrivPredicate;
3031
import org.apache.doris.qe.ConnectContext;
3132
import org.apache.doris.resource.Tag;
@@ -104,21 +105,21 @@ public Object streamLoad(HttpServletRequest request,
104105
return redirectToHttps(request);
105106
}
106107

107-
try {
108-
executeCheckPassword(request, response);
109-
} catch (UnauthorizedException unauthorizedException) {
110-
if (LOG.isDebugEnabled()) {
111-
LOG.debug("Check password failed, going to check auth token, request: {}", request.toString());
108+
String authToken = request.getHeader("token");
109+
// if auth token is not null, check it first
110+
if (!Strings.isNullOrEmpty(authToken)) {
111+
if (!checkClusterToken(authToken)) {
112+
throw new UnauthorizedException("Invalid token: " + authToken);
112113
}
113-
114-
if (!checkClusterToken(request)) {
115-
throw unauthorizedException;
116-
} else {
117-
return executeWithClusterToken(request, db, table, true);
114+
return executeWithClusterToken(request, db, table, true);
115+
} else {
116+
try {
117+
executeCheckPassword(request, response);
118+
return executeWithoutPassword(request, response, db, table, true, groupCommit);
119+
} finally {
120+
ConnectContext.remove();
118121
}
119122
}
120-
121-
return executeWithoutPassword(request, response, db, table, true, groupCommit);
122123
}
123124

124125
@RequestMapping(path = "/api/_http_stream",
@@ -363,18 +364,8 @@ private TNetworkAddress selectRedirectBackend(boolean groupCommit) throws LoadEx
363364
// AuditlogPlugin should be re-designed carefully, and the below method focuses on
364365
// temporarily addressing the users' needs for audit logs.
365366
// So this function is not widely tested under general scenario
366-
private boolean checkClusterToken(HttpServletRequest request) {
367-
if (LOG.isDebugEnabled()) {
368-
LOG.debug("Checking cluser token, request {}", request.toString());
369-
}
370-
371-
String authToken = request.getHeader("token");
372-
373-
if (Strings.isNullOrEmpty(authToken)) {
374-
return false;
375-
}
376-
377-
return Env.getCurrentEnv().getLoadManager().getTokenManager().checkAuthToken(authToken);
367+
private boolean checkClusterToken(String token) {
368+
return Env.getCurrentEnv().getLoadManager().getTokenManager().checkAuthToken(token);
378369
}
379370

380371
// NOTE: This function can only be used for AuditlogPlugin stream load for now.
@@ -388,6 +379,9 @@ private Object executeWithClusterToken(HttpServletRequest request, String db,
388379
ctx.setEnv(Env.getCurrentEnv());
389380
ctx.setThreadLocalInfo();
390381
ctx.setRemoteIP(request.getRemoteAddr());
382+
// set user to ADMIN_USER, so that we can get the proper resource tag
383+
ctx.setQualifiedUser(Auth.ADMIN_USER);
384+
ctx.setThreadLocalInfo();
391385

392386
String dbName = db;
393387
String tableName = table;
@@ -444,8 +438,10 @@ private Object executeWithClusterToken(HttpServletRequest request, String db,
444438

445439
return redirectView;
446440
} catch (Exception e) {
447-
LOG.warn("Failed to execute stream load with cluster token, {}", e);
441+
LOG.warn("Failed to execute stream load with cluster token, {}", e.getMessage(), e);
448442
return new RestBaseResult(e.getMessage());
443+
} finally {
444+
ConnectContext.remove();
449445
}
450446
}
451447
}

fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadRecordMgr.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,9 @@
2828
import org.apache.doris.common.util.MasterDaemon;
2929
import org.apache.doris.common.util.TimeUtils;
3030
import org.apache.doris.persist.gson.GsonUtils;
31-
import org.apache.doris.plugin.AuditEvent;
32-
import org.apache.doris.plugin.AuditEvent.EventType;
33-
import org.apache.doris.plugin.StreamLoadAuditEvent;
31+
import org.apache.doris.plugin.audit.AuditEvent;
32+
import org.apache.doris.plugin.audit.AuditEvent.EventType;
33+
import org.apache.doris.plugin.audit.StreamLoadAuditEvent;
3434
import org.apache.doris.system.Backend;
3535
import org.apache.doris.thrift.BackendService;
3636
import org.apache.doris.thrift.TNetworkAddress;

fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,8 @@
4141
import org.apache.doris.load.BrokerFileGroupAggInfo;
4242
import org.apache.doris.load.EtlJobType;
4343
import org.apache.doris.load.FailMsg;
44-
import org.apache.doris.plugin.AuditEvent;
45-
import org.apache.doris.plugin.LoadAuditEvent;
44+
import org.apache.doris.plugin.audit.AuditEvent;
45+
import org.apache.doris.plugin.audit.LoadAuditEvent;
4646
import org.apache.doris.qe.ConnectContext;
4747
import org.apache.doris.qe.OriginStatement;
4848
import org.apache.doris.qe.SessionVariable;

fe/fe-core/src/main/java/org/apache/doris/nereids/parser/Dialect.java

Lines changed: 11 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -36,25 +36,25 @@ public enum Dialect {
3636
*/
3737
PRESTO("presto"),
3838
/**
39-
* Spark sql parser dialect
39+
* Spark3 sql parser dialect
4040
*/
41-
SPARK_SQL("spark_sql"),
41+
SPARK("spark"),
4242
/**
43-
* Hive parser dialect
43+
* Spark2 sql parser dialect
4444
*/
45-
HIVE("hive"),
45+
SPARK2("spark2"),
4646
/**
47-
* Alibaba max compute parser dialect
47+
* Flink sql parser dialect
4848
*/
49-
MAX_COMPUTE("max_compute"),
49+
FLINK("flink"),
5050
/**
51-
* Mysql parser dialect
51+
* Hive parser dialect
5252
*/
53-
MYSQL("mysql"),
53+
HIVE("hive"),
5454
/**
5555
* Postgresql parser dialect
5656
*/
57-
POSTGRESQL("postgresql"),
57+
POSTGRES("postgres"),
5858
/**
5959
* Sqlserver parser dialect
6060
*/
@@ -64,13 +64,9 @@ public enum Dialect {
6464
*/
6565
CLICKHOUSE("clickhouse"),
6666
/**
67-
* Sap hana parser dialect
68-
*/
69-
SAP_HANA("sap_hana"),
70-
/**
71-
* OceanBase parser dialect
67+
* oracle parser dialect
7268
*/
73-
OCEANBASE("oceanbase");
69+
ORACLE("oracle");
7470

7571
public static final int MAX_DIALECT_SIZE = Dialect.values().length;
7672

fe/fe-core/src/main/java/org/apache/doris/plugin/AuditPlugin.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.doris.plugin;
1919

20+
import org.apache.doris.plugin.audit.AuditEvent;
21+
2022
/**
2123
* Audit plugin interface describe.
2224
*/

fe/fe-core/src/main/java/org/apache/doris/plugin/PluginMgr.java

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,9 @@
2727
import org.apache.doris.nereids.parser.Dialect;
2828
import org.apache.doris.plugin.PluginInfo.PluginType;
2929
import org.apache.doris.plugin.PluginLoader.PluginStatus;
30-
import org.apache.doris.qe.AuditLogBuilder;
30+
import org.apache.doris.plugin.audit.AuditLoaderPlugin;
31+
import org.apache.doris.plugin.audit.AuditLogBuilder;
32+
import org.apache.doris.plugin.dialect.HttpDialectConverterPlugin;
3133

3234
import com.google.common.base.Strings;
3335
import com.google.common.collect.ImmutableList;
@@ -104,12 +106,24 @@ private boolean removeDynamicPluginName(String name) {
104106
}
105107

106108
private void initBuiltinPlugins() {
107-
// AuditLog
109+
// AuditLog: log audit log to file
108110
AuditLogBuilder auditLogBuilder = new AuditLogBuilder();
109111
if (!registerBuiltinPlugin(auditLogBuilder.getPluginInfo(), auditLogBuilder)) {
110112
LOG.warn("failed to register audit log builder");
111113
}
112114

115+
// AuditLoader: log audit log to internal table
116+
AuditLoaderPlugin auditLoaderPlugin = new AuditLoaderPlugin();
117+
if (!registerBuiltinPlugin(auditLoaderPlugin.getPluginInfo(), auditLoaderPlugin)) {
118+
LOG.warn("failed to register audit log builder");
119+
}
120+
121+
// sql dialect converter
122+
HttpDialectConverterPlugin httpDialectConverterPlugin = new HttpDialectConverterPlugin();
123+
if (!registerBuiltinPlugin(httpDialectConverterPlugin.getPluginInfo(), httpDialectConverterPlugin)) {
124+
LOG.warn("failed to register http dialect converter plugin");
125+
}
126+
113127
// other builtin plugins
114128
}
115129

@@ -217,11 +231,17 @@ public boolean registerBuiltinPlugin(PluginInfo pluginInfo, Plugin plugin) {
217231
}
218232

219233
PluginLoader loader = new BuiltinPluginLoader(Config.plugin_dir, pluginInfo, plugin);
220-
PluginLoader checkLoader = plugins[pluginInfo.getTypeId()].putIfAbsent(pluginInfo.getName(), loader);
234+
try {
235+
loader.install();
236+
} catch (Exception e) {
237+
LOG.warn("failed to register builtin plugin {}", pluginInfo.getName(), e);
238+
return false;
239+
}
221240
// add dialect plugin
222241
if (plugin instanceof DialectConverterPlugin) {
223242
addDialectPlugin((DialectConverterPlugin) plugin, pluginInfo);
224243
}
244+
PluginLoader checkLoader = plugins[pluginInfo.getTypeId()].putIfAbsent(pluginInfo.getName(), loader);
225245
return checkLoader == null;
226246
}
227247

0 commit comments

Comments
 (0)