Skip to content

Commit 6297ef1

Browse files
authored
[enhancement](plugin) import audit logs for slow queries into a separate table (#14100)
* import audit logs for slow queries into a separate table
1 parent 12652eb commit 6297ef1

5 files changed

Lines changed: 183 additions & 51 deletions

File tree

docs/en/docs/ecosystem/audit-plugin.md

Lines changed: 39 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,10 +52,46 @@ You can place this file on an http download server or copy(or unzip) it to the s
5252

5353
### Installation
5454

55-
After deployment is complete, and before installing the plugin, you need to create the audit database and tables previously specified in `plugin.conf`. The table creation statement is as follows:
55+
After deployment is complete, and before installing the plugin, you need to create the audit database and tables previously specified in `plugin.conf`. The database and table creation statement is as follows:
5656

5757
```
58-
create table doris_audit_tbl__
58+
create database doris_audit_db__;
59+
60+
create table doris_audit_db__.doris_audit_log_tbl__
61+
(
62+
query_id varchar(48) comment "Unique query id",
63+
`time` datetime not null comment "Query start time",
64+
client_ip varchar(32) comment "Client IP",
65+
user varchar(64) comment "User name",
66+
db varchar(96) comment "Database of this query",
67+
state varchar(8) comment "Query result state. EOF, ERR, OK",
68+
query_time bigint comment "Query execution time in millisecond",
69+
scan_bytes bigint comment "Total scan bytes of this query",
70+
scan_rows bigint comment "Total scan rows of this query",
71+
return_rows bigint comment "Returned rows of this query",
72+
stmt_id int comment "An incremental id of statement",
73+
is_query tinyint comment "Is this statemt a query. 1 or 0",
74+
frontend_ip varchar(32) comment "Frontend ip of executing this statement",
75+
cpu_time_ms bigint comment "Total scan cpu time in millisecond of this query",
76+
sql_hash varchar(48) comment "Hash value for this query",
77+
sql_digest varchar(48) comment "Sql digest for this query",
78+
peak_memory_bytes bigint comment "Peak memory bytes used on all backends of this query",
79+
stmt string comment "The original statement, trimed if longer than 2G "
80+
) engine=OLAP
81+
duplicate key(query_id, `time`, client_ip)
82+
partition by range(`time`) ()
83+
distributed by hash(query_id) buckets 1
84+
properties(
85+
"dynamic_partition.time_unit" = "DAY",
86+
"dynamic_partition.start" = "-30",
87+
"dynamic_partition.end" = "3",
88+
"dynamic_partition.prefix" = "p",
89+
"dynamic_partition.buckets" = "1",
90+
"dynamic_partition.enable" = "true",
91+
"replication_num" = "3"
92+
);
93+
94+
create table doris_audit_db__.doris_slow_log_tbl__
5995
(
6096
query_id varchar(48) comment "Unique query id",
6197
`time` datetime not null comment "Query start time",
@@ -71,7 +107,7 @@ create table doris_audit_tbl__
71107
is_query tinyint comment "Is this statemt a query. 1 or 0",
72108
frontend_ip varchar(32) comment "Frontend ip of executing this statement",
73109
cpu_time_ms bigint comment "Total scan cpu time in millisecond of this query",
74-
sql_hash varchar(50) comment "Hash value for this query",
110+
sql_hash varchar(48) comment "Hash value for this query",
75111
sql_digest varchar(48) comment "Sql digest for this query",
76112
peak_memory_bytes bigint comment "Peak memory bytes used on all backends of this query",
77113
stmt string comment "The original statement, trimed if longer than 2G"

docs/zh-CN/docs/ecosystem/audit-plugin.md

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,10 +52,46 @@ auditloader plugin的配置位于`${DORIS}/fe_plugins/auditloader/src/main/assem
5252

5353
### 安装
5454

55-
部署完成后,安装插件前,需要创建之前在 `plugin.conf` 中指定的审计数据库和表。其中建表语句如下
55+
部署完成后,安装插件前,需要创建之前在 `plugin.conf` 中指定的审计数据库和表。其中建库与建表语句如下
5656

5757
```
58-
create table doris_audit_tbl__
58+
create database doris_audit_db__;
59+
60+
create table doris_audit_db__.doris_audit_log_tbl__
61+
(
62+
query_id varchar(48) comment "Unique query id",
63+
`time` datetime not null comment "Query start time",
64+
client_ip varchar(32) comment "Client IP",
65+
user varchar(64) comment "User name",
66+
db varchar(96) comment "Database of this query",
67+
state varchar(8) comment "Query result state. EOF, ERR, OK",
68+
query_time bigint comment "Query execution time in millisecond",
69+
scan_bytes bigint comment "Total scan bytes of this query",
70+
scan_rows bigint comment "Total scan rows of this query",
71+
return_rows bigint comment "Returned rows of this query",
72+
stmt_id int comment "An incremental id of statement",
73+
is_query tinyint comment "Is this statemt a query. 1 or 0",
74+
frontend_ip varchar(32) comment "Frontend ip of executing this statement",
75+
cpu_time_ms bigint comment "Total scan cpu time in millisecond of this query",
76+
sql_hash varchar(48) comment "Hash value for this query",
77+
sql_digest varchar(48) comment "Sql digest for this query",
78+
peak_memory_bytes bigint comment "Peak memory bytes used on all backends of this query",
79+
stmt string comment "The original statement, trimed if longer than 2G"
80+
) engine=OLAP
81+
duplicate key(query_id, `time`, client_ip)
82+
partition by range(`time`) ()
83+
distributed by hash(query_id) buckets 1
84+
properties(
85+
"dynamic_partition.time_unit" = "DAY",
86+
"dynamic_partition.start" = "-30",
87+
"dynamic_partition.end" = "3",
88+
"dynamic_partition.prefix" = "p",
89+
"dynamic_partition.buckets" = "1",
90+
"dynamic_partition.enable" = "true",
91+
"replication_num" = "3"
92+
);
93+
94+
create table doris_audit_db__.doris_slow_log_tbl__
5995
(
6096
query_id varchar(48) comment "Unique query id",
6197
`time` datetime not null comment "Query start time",

fe_plugins/auditloader/src/main/assembly/plugin.conf

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,14 @@ frontend_host_port=127.0.0.1:8030
3636
# Database of the audit table
3737
database=doris_audit_db__
3838

39-
# Audit table name, to save the audit data.
40-
table=doris_audit_tbl__
39+
# Audit table name, to save the audit log data.
40+
audit_log_table=doris_audit_log_tbl__
41+
42+
# Audit table name, to save the slow log data.
43+
slow_log_table=doris_slow_log_tbl__
44+
45+
# Whether import slow logs into a separate slow table, default is false
46+
enable_slow_log=false
4147

4248
# Doris user. This user must have LOAD_PRIV to the audit table.
4349
user=root

fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java

Lines changed: 81 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.doris.plugin.PluginContext;
2525
import org.apache.doris.plugin.PluginException;
2626
import org.apache.doris.plugin.PluginInfo;
27+
import org.apache.doris.common.Config;
2728

2829
import org.apache.logging.log4j.LogManager;
2930
import org.apache.logging.log4j.Logger;
@@ -56,8 +57,10 @@ public class AuditLoaderPlugin extends Plugin implements AuditPlugin {
5657
private static final ThreadLocal<SimpleDateFormat> dateFormatContainer = ThreadLocal.withInitial(
5758
() -> new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));
5859

59-
private StringBuilder auditBuffer = new StringBuilder();
60-
private long lastLoadTime = 0;
60+
private StringBuilder auditLogBuffer = new StringBuilder();
61+
private StringBuilder slowLogBuffer = new StringBuilder();
62+
private long lastLoadTimeAuditLog = 0;
63+
private long lastLoadTimeSlowLog = 0;
6164

6265
private BlockingQueue<AuditEvent> auditEventQueue;
6366
private DorisStreamLoader streamLoader;
@@ -75,7 +78,8 @@ public void init(PluginInfo info, PluginContext ctx) throws PluginException {
7578
if (isInit) {
7679
return;
7780
}
78-
this.lastLoadTime = System.currentTimeMillis();
81+
this.lastLoadTimeAuditLog = System.currentTimeMillis();
82+
this.lastLoadTimeSlowLog = System.currentTimeMillis();
7983

8084
loadConfig(ctx, info.getProperties());
8185

@@ -146,28 +150,35 @@ public void exec(AuditEvent event) {
146150
}
147151

148152
private void assembleAudit(AuditEvent event) {
149-
auditBuffer.append(event.queryId).append("\t");
150-
auditBuffer.append(longToTimeString(event.timestamp)).append("\t");
151-
auditBuffer.append(event.clientIp).append("\t");
152-
auditBuffer.append(event.user).append("\t");
153-
auditBuffer.append(event.db).append("\t");
154-
auditBuffer.append(event.state).append("\t");
155-
auditBuffer.append(event.queryTime).append("\t");
156-
auditBuffer.append(event.scanBytes).append("\t");
157-
auditBuffer.append(event.scanRows).append("\t");
158-
auditBuffer.append(event.returnRows).append("\t");
159-
auditBuffer.append(event.stmtId).append("\t");
160-
auditBuffer.append(event.isQuery ? 1 : 0).append("\t");
161-
auditBuffer.append(event.feIp).append("\t");
162-
auditBuffer.append(event.cpuTimeMs).append("\t");
163-
auditBuffer.append(event.sqlHash).append("\t");
164-
auditBuffer.append(event.sqlDigest).append("\t");
165-
auditBuffer.append(event.peakMemoryBytes).append("\t");
153+
if (conf.enableSlowLog && event.queryTime > Config.qe_slow_log_ms) {
154+
fillLogBuffer(event, slowLogBuffer);
155+
}
156+
fillLogBuffer(event, auditLogBuffer);
157+
}
158+
159+
private void fillLogBuffer(AuditEvent event, StringBuilder logBuffer) {
160+
logBuffer.append(event.queryId).append("\t");
161+
logBuffer.append(longToTimeString(event.timestamp)).append("\t");
162+
logBuffer.append(event.clientIp).append("\t");
163+
logBuffer.append(event.user).append("\t");
164+
logBuffer.append(event.db).append("\t");
165+
logBuffer.append(event.state).append("\t");
166+
logBuffer.append(event.queryTime).append("\t");
167+
logBuffer.append(event.scanBytes).append("\t");
168+
logBuffer.append(event.scanRows).append("\t");
169+
logBuffer.append(event.returnRows).append("\t");
170+
logBuffer.append(event.stmtId).append("\t");
171+
logBuffer.append(event.isQuery ? 1 : 0).append("\t");
172+
logBuffer.append(event.feIp).append("\t");
173+
logBuffer.append(event.cpuTimeMs).append("\t");
174+
logBuffer.append(event.sqlHash).append("\t");
175+
logBuffer.append(event.sqlDigest).append("\t");
176+
logBuffer.append(event.peakMemoryBytes).append("\t");
166177
// trim the query to avoid too long
167178
// use `getBytes().length` to get real byte length
168179
String stmt = truncateByBytes(event.stmt).replace("\n", " ").replace("\t", " ");
169180
LOG.debug("receive audit event with stmt: {}", stmt);
170-
auditBuffer.append(stmt).append("\n");
181+
logBuffer.append(stmt).append("\n");
171182
}
172183

173184
private String truncateByBytes(String str) {
@@ -186,21 +197,34 @@ private String truncateByBytes(String str) {
186197
return new String(charBuffer.array(), 0, charBuffer.position());
187198
}
188199

189-
private void loadIfNecessary(DorisStreamLoader loader) {
190-
if (auditBuffer.length() < conf.maxBatchSize && System.currentTimeMillis() - lastLoadTime < conf.maxBatchIntervalSec * 1000) {
191-
return;
200+
private void loadIfNecessary(DorisStreamLoader loader, boolean slowLog) {
201+
StringBuilder logBuffer = slowLog ? slowLogBuffer : auditLogBuffer;
202+
long lastLoadTime = slowLog ? lastLoadTimeSlowLog : lastLoadTimeAuditLog;
203+
long currentTime = System.currentTimeMillis();
204+
205+
if (logBuffer.length() >= conf.maxBatchSize || currentTime - lastLoadTime >= conf.maxBatchIntervalSec * 1000) {
206+
// begin to load
207+
try {
208+
DorisStreamLoader.LoadResponse response = loader.loadBatch(logBuffer, slowLog);
209+
LOG.debug("audit loader response: {}", response);
210+
} catch (Exception e) {
211+
LOG.debug("encounter exception when putting current audit batch, discard current batch", e);
212+
} finally {
213+
// make a new string builder to receive following events.
214+
resetLogBufferAndLastLoadTime(currentTime, slowLog);
215+
}
192216
}
193217

194-
lastLoadTime = System.currentTimeMillis();
195-
// begin to load
196-
try {
197-
DorisStreamLoader.LoadResponse response = loader.loadBatch(auditBuffer);
198-
LOG.debug("audit loader response: {}", response);
199-
} catch (Exception e) {
200-
LOG.debug("encounter exception when putting current audit batch, discard current batch", e);
201-
} finally {
202-
// make a new string builder to receive following events.
203-
this.auditBuffer = new StringBuilder();
218+
return;
219+
}
220+
221+
private void resetLogBufferAndLastLoadTime(long currentTime, boolean slowLog) {
222+
if (slowLog) {
223+
this.slowLogBuffer = new StringBuilder();
224+
lastLoadTimeSlowLog = currentTime;
225+
} else {
226+
this.auditLogBuffer = new StringBuilder();
227+
lastLoadTimeAuditLog = currentTime;
204228
}
205229

206230
return;
@@ -215,6 +239,9 @@ public static class AuditLoaderConf {
215239
public static final String PROP_PASSWORD = "password";
216240
public static final String PROP_DATABASE = "database";
217241
public static final String PROP_TABLE = "table";
242+
public static final String PROP_AUDIT_LOG_TABLE = "audit_log_table";
243+
public static final String PROP_SLOW_LOG_TABLE = "slow_log_table";
244+
public static final String PROP_ENABLE_SLOW_LOG = "enable_slow_log";
218245
// the max stmt length to be loaded in audit table.
219246
public static final String MAX_STMT_LENGTH = "max_stmt_length";
220247

@@ -225,7 +252,9 @@ public static class AuditLoaderConf {
225252
public String user = "root";
226253
public String password = "";
227254
public String database = "doris_audit_db__";
228-
public String table = "doris_audit_tbl__";
255+
public String auditLogTable = "doris_audit_log_tbl__";
256+
public String slowLogTable = "doris_slow_log_tbl__";
257+
public boolean enableSlowLog = false;
229258
// the identity of FE which run this plugin
230259
public String feIdentity = "";
231260
public int max_stmt_length = 4096;
@@ -253,8 +282,18 @@ public void init(Map<String, String> properties) throws PluginException {
253282
if (properties.containsKey(PROP_DATABASE)) {
254283
database = properties.get(PROP_DATABASE);
255284
}
285+
// If plugin.conf is not changed, the audit logs are imported to previous table
256286
if (properties.containsKey(PROP_TABLE)) {
257-
table = properties.get(PROP_TABLE);
287+
auditLogTable = properties.get(PROP_TABLE);
288+
}
289+
if (properties.containsKey(PROP_AUDIT_LOG_TABLE)) {
290+
auditLogTable = properties.get(PROP_AUDIT_LOG_TABLE);
291+
}
292+
if (properties.containsKey(PROP_SLOW_LOG_TABLE)) {
293+
slowLogTable = properties.get(PROP_SLOW_LOG_TABLE);
294+
}
295+
if (properties.containsKey(PROP_ENABLE_SLOW_LOG)) {
296+
enableSlowLog = Boolean.valueOf(properties.get(PROP_ENABLE_SLOW_LOG));
258297
}
259298
if (properties.containsKey(MAX_STMT_LENGTH)) {
260299
max_stmt_length = Integer.parseInt(properties.get(MAX_STMT_LENGTH));
@@ -278,7 +317,12 @@ public void run() {
278317
AuditEvent event = auditEventQueue.poll(5, TimeUnit.SECONDS);
279318
if (event != null) {
280319
assembleAudit(event);
281-
loadIfNecessary(loader);
320+
// process slow audit logs
321+
if (conf.enableSlowLog) {
322+
loadIfNecessary(loader, true);
323+
}
324+
// process all audit logs
325+
loadIfNecessary(loader, false);
282326
}
283327
} catch (InterruptedException ie) {
284328
LOG.debug("encounter exception when loading current audit batch", ie);

fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/DorisStreamLoader.java

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -38,21 +38,25 @@ public class DorisStreamLoader {
3838
private static String loadUrlPattern = "http://%s/api/%s/%s/_stream_load?";
3939
private String hostPort;
4040
private String db;
41-
private String tbl;
41+
private String auditLogTbl;
42+
private String slowLogTbl;
4243
private String user;
4344
private String passwd;
44-
private String loadUrlStr;
45+
private String auditLogLoadUrlStr;
46+
private String slowLogLoadUrlStr;
4547
private String authEncoding;
4648
private String feIdentity;
4749

4850
public DorisStreamLoader(AuditLoaderPlugin.AuditLoaderConf conf) {
4951
this.hostPort = conf.frontendHostPort;
5052
this.db = conf.database;
51-
this.tbl = conf.table;
53+
this.auditLogTbl = conf.auditLogTable;
54+
this.slowLogTbl = conf.slowLogTable;
5255
this.user = conf.user;
5356
this.passwd = conf.password;
5457

55-
this.loadUrlStr = String.format(loadUrlPattern, hostPort, db, tbl);
58+
this.auditLogLoadUrlStr = String.format(loadUrlPattern, hostPort, db, auditLogTbl);
59+
this.slowLogLoadUrlStr = String.format(loadUrlPattern, hostPort, db, slowLogTbl);
5660
this.authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8));
5761
// currently, FE identity is FE's IP, so we replace the "." in IP to make it suitable for label
5862
this.feIdentity = conf.feIdentity.replaceAll("\\.", "_");
@@ -112,9 +116,9 @@ private String getContent(HttpURLConnection conn) {
112116
return response.toString();
113117
}
114118

115-
public LoadResponse loadBatch(StringBuilder sb) {
119+
public LoadResponse loadBatch(StringBuilder sb, boolean slowLog) {
116120
Calendar calendar = Calendar.getInstance();
117-
String label = String.format("audit_%s%02d%02d_%02d%02d%02d_%s",
121+
String label = String.format("_log_%s%02d%02d_%02d%02d%02d_%s",
118122
calendar.get(Calendar.YEAR), calendar.get(Calendar.MONTH) + 1, calendar.get(Calendar.DAY_OF_MONTH),
119123
calendar.get(Calendar.HOUR_OF_DAY), calendar.get(Calendar.MINUTE), calendar.get(Calendar.SECOND),
120124
feIdentity);
@@ -123,7 +127,13 @@ public LoadResponse loadBatch(StringBuilder sb) {
123127
HttpURLConnection beConn = null;
124128
try {
125129
// build request and send to fe
126-
feConn = getConnection(loadUrlStr, label);
130+
if (slowLog) {
131+
label = "slow" + label;
132+
feConn = getConnection(slowLogLoadUrlStr, label);
133+
} else {
134+
label = "audit" + label;
135+
feConn = getConnection(auditLogLoadUrlStr, label);
136+
}
127137
int status = feConn.getResponseCode();
128138
// fe send back http response code TEMPORARY_REDIRECT 307 and new be location
129139
if (status != 307) {

0 commit comments

Comments
 (0)