2424import org .apache .doris .plugin .PluginContext ;
2525import org .apache .doris .plugin .PluginException ;
2626import org .apache .doris .plugin .PluginInfo ;
27+ import org .apache .doris .common .Config ;
2728
2829import org .apache .logging .log4j .LogManager ;
2930import org .apache .logging .log4j .Logger ;
@@ -56,8 +57,10 @@ public class AuditLoaderPlugin extends Plugin implements AuditPlugin {
5657 private static final ThreadLocal <SimpleDateFormat > dateFormatContainer = ThreadLocal .withInitial (
5758 () -> new SimpleDateFormat ("yyyy-MM-dd HH:mm:ss" ));
5859
59- private StringBuilder auditBuffer = new StringBuilder ();
60- private long lastLoadTime = 0 ;
60+ private StringBuilder auditLogBuffer = new StringBuilder ();
61+ private StringBuilder slowLogBuffer = new StringBuilder ();
62+ private long lastLoadTimeAuditLog = 0 ;
63+ private long lastLoadTimeSlowLog = 0 ;
6164
6265 private BlockingQueue <AuditEvent > auditEventQueue ;
6366 private DorisStreamLoader streamLoader ;
@@ -75,7 +78,8 @@ public void init(PluginInfo info, PluginContext ctx) throws PluginException {
7578 if (isInit ) {
7679 return ;
7780 }
78- this .lastLoadTime = System .currentTimeMillis ();
81+ this .lastLoadTimeAuditLog = System .currentTimeMillis ();
82+ this .lastLoadTimeSlowLog = System .currentTimeMillis ();
7983
8084 loadConfig (ctx , info .getProperties ());
8185
@@ -146,28 +150,35 @@ public void exec(AuditEvent event) {
146150 }
147151
148152 private void assembleAudit (AuditEvent event ) {
149- auditBuffer .append (event .queryId ).append ("\t " );
150- auditBuffer .append (longToTimeString (event .timestamp )).append ("\t " );
151- auditBuffer .append (event .clientIp ).append ("\t " );
152- auditBuffer .append (event .user ).append ("\t " );
153- auditBuffer .append (event .db ).append ("\t " );
154- auditBuffer .append (event .state ).append ("\t " );
155- auditBuffer .append (event .queryTime ).append ("\t " );
156- auditBuffer .append (event .scanBytes ).append ("\t " );
157- auditBuffer .append (event .scanRows ).append ("\t " );
158- auditBuffer .append (event .returnRows ).append ("\t " );
159- auditBuffer .append (event .stmtId ).append ("\t " );
160- auditBuffer .append (event .isQuery ? 1 : 0 ).append ("\t " );
161- auditBuffer .append (event .feIp ).append ("\t " );
162- auditBuffer .append (event .cpuTimeMs ).append ("\t " );
163- auditBuffer .append (event .sqlHash ).append ("\t " );
164- auditBuffer .append (event .sqlDigest ).append ("\t " );
165- auditBuffer .append (event .peakMemoryBytes ).append ("\t " );
153+ if (conf .enableSlowLog && event .queryTime > Config .qe_slow_log_ms ) {
154+ fillLogBuffer (event , slowLogBuffer );
155+ }
156+ fillLogBuffer (event , auditLogBuffer );
157+ }
158+
159+ private void fillLogBuffer (AuditEvent event , StringBuilder logBuffer ) {
160+ logBuffer .append (event .queryId ).append ("\t " );
161+ logBuffer .append (longToTimeString (event .timestamp )).append ("\t " );
162+ logBuffer .append (event .clientIp ).append ("\t " );
163+ logBuffer .append (event .user ).append ("\t " );
164+ logBuffer .append (event .db ).append ("\t " );
165+ logBuffer .append (event .state ).append ("\t " );
166+ logBuffer .append (event .queryTime ).append ("\t " );
167+ logBuffer .append (event .scanBytes ).append ("\t " );
168+ logBuffer .append (event .scanRows ).append ("\t " );
169+ logBuffer .append (event .returnRows ).append ("\t " );
170+ logBuffer .append (event .stmtId ).append ("\t " );
171+ logBuffer .append (event .isQuery ? 1 : 0 ).append ("\t " );
172+ logBuffer .append (event .feIp ).append ("\t " );
173+ logBuffer .append (event .cpuTimeMs ).append ("\t " );
174+ logBuffer .append (event .sqlHash ).append ("\t " );
175+ logBuffer .append (event .sqlDigest ).append ("\t " );
176+ logBuffer .append (event .peakMemoryBytes ).append ("\t " );
166177 // trim the query to avoid too long
167178 // use `getBytes().length` to get real byte length
168179 String stmt = truncateByBytes (event .stmt ).replace ("\n " , " " ).replace ("\t " , " " );
169180 LOG .debug ("receive audit event with stmt: {}" , stmt );
170- auditBuffer .append (stmt ).append ("\n " );
181+ logBuffer .append (stmt ).append ("\n " );
171182 }
172183
173184 private String truncateByBytes (String str ) {
@@ -186,21 +197,34 @@ private String truncateByBytes(String str) {
186197 return new String (charBuffer .array (), 0 , charBuffer .position ());
187198 }
188199
189- private void loadIfNecessary (DorisStreamLoader loader ) {
190- if (auditBuffer .length () < conf .maxBatchSize && System .currentTimeMillis () - lastLoadTime < conf .maxBatchIntervalSec * 1000 ) {
191- return ;
200+ private void loadIfNecessary (DorisStreamLoader loader , boolean slowLog ) {
201+ StringBuilder logBuffer = slowLog ? slowLogBuffer : auditLogBuffer ;
202+ long lastLoadTime = slowLog ? lastLoadTimeSlowLog : lastLoadTimeAuditLog ;
203+ long currentTime = System .currentTimeMillis ();
204+
205+ if (logBuffer .length () >= conf .maxBatchSize || currentTime - lastLoadTime >= conf .maxBatchIntervalSec * 1000 ) {
206+ // begin to load
207+ try {
208+ DorisStreamLoader .LoadResponse response = loader .loadBatch (logBuffer , slowLog );
209+ LOG .debug ("audit loader response: {}" , response );
210+ } catch (Exception e ) {
211+ LOG .debug ("encounter exception when putting current audit batch, discard current batch" , e );
212+ } finally {
213+ // make a new string builder to receive following events.
214+ resetLogBufferAndLastLoadTime (currentTime , slowLog );
215+ }
192216 }
193217
194- lastLoadTime = System . currentTimeMillis () ;
195- // begin to load
196- try {
197- DorisStreamLoader . LoadResponse response = loader . loadBatch ( auditBuffer );
198- LOG . debug ( "audit loader response: {}" , response );
199- } catch ( Exception e ) {
200- LOG . debug ( "encounter exception when putting current audit batch, discard current batch" , e ) ;
201- } finally {
202- // make a new string builder to receive following events.
203- this . auditBuffer = new StringBuilder () ;
218+ return ;
219+ }
220+
221+ private void resetLogBufferAndLastLoadTime ( long currentTime , boolean slowLog ) {
222+ if ( slowLog ) {
223+ this . slowLogBuffer = new StringBuilder ();
224+ lastLoadTimeSlowLog = currentTime ;
225+ } else {
226+ this . auditLogBuffer = new StringBuilder ();
227+ lastLoadTimeAuditLog = currentTime ;
204228 }
205229
206230 return ;
@@ -215,6 +239,9 @@ public static class AuditLoaderConf {
215239 public static final String PROP_PASSWORD = "password" ;
216240 public static final String PROP_DATABASE = "database" ;
217241 public static final String PROP_TABLE = "table" ;
242+ public static final String PROP_AUDIT_LOG_TABLE = "audit_log_table" ;
243+ public static final String PROP_SLOW_LOG_TABLE = "slow_log_table" ;
244+ public static final String PROP_ENABLE_SLOW_LOG = "enable_slow_log" ;
218245 // the max stmt length to be loaded in audit table.
219246 public static final String MAX_STMT_LENGTH = "max_stmt_length" ;
220247
@@ -225,7 +252,9 @@ public static class AuditLoaderConf {
225252 public String user = "root" ;
226253 public String password = "" ;
227254 public String database = "doris_audit_db__" ;
228- public String table = "doris_audit_tbl__" ;
255+ public String auditLogTable = "doris_audit_log_tbl__" ;
256+ public String slowLogTable = "doris_slow_log_tbl__" ;
257+ public boolean enableSlowLog = false ;
229258 // the identity of FE which run this plugin
230259 public String feIdentity = "" ;
231260 public int max_stmt_length = 4096 ;
@@ -253,8 +282,18 @@ public void init(Map<String, String> properties) throws PluginException {
253282 if (properties .containsKey (PROP_DATABASE )) {
254283 database = properties .get (PROP_DATABASE );
255284 }
285+ // If plugin.conf is not changed, the audit logs are imported to previous table
256286 if (properties .containsKey (PROP_TABLE )) {
257- table = properties .get (PROP_TABLE );
287+ auditLogTable = properties .get (PROP_TABLE );
288+ }
289+ if (properties .containsKey (PROP_AUDIT_LOG_TABLE )) {
290+ auditLogTable = properties .get (PROP_AUDIT_LOG_TABLE );
291+ }
292+ if (properties .containsKey (PROP_SLOW_LOG_TABLE )) {
293+ slowLogTable = properties .get (PROP_SLOW_LOG_TABLE );
294+ }
295+ if (properties .containsKey (PROP_ENABLE_SLOW_LOG )) {
296+ enableSlowLog = Boolean .valueOf (properties .get (PROP_ENABLE_SLOW_LOG ));
258297 }
259298 if (properties .containsKey (MAX_STMT_LENGTH )) {
260299 max_stmt_length = Integer .parseInt (properties .get (MAX_STMT_LENGTH ));
@@ -278,7 +317,12 @@ public void run() {
278317 AuditEvent event = auditEventQueue .poll (5 , TimeUnit .SECONDS );
279318 if (event != null ) {
280319 assembleAudit (event );
281- loadIfNecessary (loader );
320+ // process slow audit logs
321+ if (conf .enableSlowLog ) {
322+ loadIfNecessary (loader , true );
323+ }
324+ // process all audit logs
325+ loadIfNecessary (loader , false );
282326 }
283327 } catch (InterruptedException ie ) {
284328 LOG .debug ("encounter exception when loading current audit batch" , ie );
0 commit comments