From ba140b9c807d4401129b0ee333d4f5a094de80a5 Mon Sep 17 00:00:00 2001 From: bbernays Date: Mon, 4 Aug 2025 11:33:30 -0500 Subject: [PATCH 1/5] Create analyze.go --- cli/cmd/analyze.go | 189 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 189 insertions(+) create mode 100644 cli/cmd/analyze.go diff --git a/cli/cmd/analyze.go b/cli/cmd/analyze.go new file mode 100644 index 00000000000000..8a5dcd22d1f2da --- /dev/null +++ b/cli/cmd/analyze.go @@ -0,0 +1,189 @@ +package cmd + +import ( + "bufio" + "fmt" + "os" + "regexp" + "sort" + "time" + + "github.com/spf13/cobra" +) + +type logEntry struct { + invocationID string + key string + ts string + typ string +} + +type tableData struct { + times []time.Time + types []string +} + +const ( + analyzeShort = "Analyze CloudQuery log files to detect stalled tables and calculate execution times" + analyzeExample = `# Analyze a CloudQuery log file to find stalled tables +cloudquery analyze --file path/to/cloudquery.log` +) + +func NewCmdAnalyze() *cobra.Command { + cmd := &cobra.Command{ + Use: "analyze", + Short: analyzeShort, + Long: analyzeShort, + Example: analyzeExample, + RunE: analyze, + } + cmd.Flags().StringP("file", "f", "", "Path to the CloudQuery log file") + _ = cmd.MarkFlagRequired("file") + + return cmd +} + +func analyze(cmd *cobra.Command, args []string) error { + logFile, err := cmd.Flags().GetString("file") + if err != nil { + return err + } + return analyzeLogFile(logFile) +} + +func extractInvocationID(line string) string { + invocationPattern := regexp.MustCompile(`invocation_id=([^\s]+)`) + if matches := invocationPattern.FindStringSubmatch(line); matches != nil { + return matches[1] + } + return "" +} + +func analyzeLogFile(filePath string) error { + // Define the regular expression patterns + patternEnd := regexp.MustCompile(`^(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z)\s+(INF|ERR|WRN)\s+(.*)\s+client=(.*)\s(.*)+errors=(\d+)?\s+module=([a-zA-Z-]+)?\s+resources=(\d+)?\s+table=(\w+)?`) + patternStart := regexp.MustCompile(`^(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z)\s+(INF|ERR|WRN)\s+(.*)\s+client=(.*)\s+(.*)\s+module=([a-zA-Z-]+)?\s+table=(\w+)?`) + + fmt.Printf("Analyzing log file: %s\n", filePath) + + // Open the log file + file, err := os.Open(filePath) + if err != nil { + return fmt.Errorf("failed to open log file: %w", err) + } + defer file.Close() + + // Process log entries + allData := []logEntry{} + scanner := bufio.NewScanner(file) + for scanner.Scan() { + line := scanner.Text() + + // Try to match end pattern + if matches := patternEnd.FindStringSubmatch(line); matches != nil { + allData = append(allData, logEntry{ + invocationID: extractInvocationID(line), + key: matches[4] + matches[9], + ts: matches[1], + typ: "end", + }) + continue + } + + // Try to match start pattern + if matches := patternStart.FindStringSubmatch(line); matches != nil { + allData = append(allData, logEntry{ + invocationID: extractInvocationID(line), + key: matches[4] + matches[7], + ts: matches[1], + typ: "start", + }) + continue + } + } + + if err := scanner.Err(); err != nil { + return fmt.Errorf("error reading log file: %w", err) + } + + // Normalize data + normalizedData := make(map[string]map[string]*tableData) + for _, data := range allData { + clientPair := data.key + ts := data.ts + + if _, exists := normalizedData[data.invocationID]; !exists { + normalizedData[data.invocationID] = make(map[string]*tableData) + } + if _, exists := normalizedData[data.invocationID][data.key]; !exists { + normalizedData[data.invocationID][clientPair] = &tableData{ + times: []time.Time{}, + types: []string{}, + } + } + + // Parse timestamp + parsedTime, err := time.Parse("2006-01-02T15:04:05Z", ts) + if err != nil { + fmt.Printf("Warning: could not parse timestamp %s: %v\n", ts, err) + continue + } + + normalizedData[data.invocationID][clientPair].times = append(normalizedData[data.invocationID][clientPair].times, parsedTime) + normalizedData[data.invocationID][clientPair].types = append(normalizedData[data.invocationID][clientPair].types, data.typ) + } + + // Sort times for each entry + for invocationID := range normalizedData { + for _, data := range normalizedData[invocationID] { + sort.Slice(data.times, func(i, j int) bool { + return data.times[i].Before(data.times[j]) + }) + } + } + + // Calculate time differences and find tables that never completed + type timeDiffKeyPair struct { + timeDiff int + key string + } + + timeDiffKeyPairs := make(map[string][]timeDiffKeyPair) + + for invocationID := range normalizedData { + for key, data := range normalizedData[invocationID] { + if len(data.times) > 1 { + // Calculate time difference in minutes + timeDiff := int(data.times[len(data.times)-1].Sub(data.times[0]).Seconds() / 60) + if timeDiff > 0 { + if _, exists := timeDiffKeyPairs[invocationID]; !exists { + timeDiffKeyPairs[invocationID] = []timeDiffKeyPair{} + } + timeDiffKeyPairs[invocationID] = append(timeDiffKeyPairs[invocationID], timeDiffKeyPair{ + timeDiff: timeDiff, + key: key, + }) + } + } else if len(data.types) == 1 && data.types[0] == "start" { + fmt.Printf("Table never completed: %s for invocation %s\n", key, invocationID) + } + } + } + + for invocationID := range timeDiffKeyPairs { + // Sort time differences in descending order + sort.Slice(timeDiffKeyPairs[invocationID], func(i, j int) bool { + return timeDiffKeyPairs[invocationID][i].timeDiff > timeDiffKeyPairs[invocationID][j].timeDiff + }) + } + + for invocationID := range timeDiffKeyPairs { + fmt.Printf("Invocation ID: %s\n", invocationID) + // Print time differences + for _, pair := range timeDiffKeyPairs[invocationID] { + fmt.Printf(" %d minutes - %s\n", pair.timeDiff, pair.key) + } + } + + return nil +} From 103873df8c08b906071420757afbe5d159f2681c Mon Sep 17 00:00:00 2001 From: bbernays Date: Mon, 4 Aug 2025 11:33:33 -0500 Subject: [PATCH 2/5] Update root.go --- cli/cmd/root.go | 1 + 1 file changed, 1 insertion(+) diff --git a/cli/cmd/root.go b/cli/cmd/root.go index e5d10214008efc..aaa8c28ab1dba6 100644 --- a/cli/cmd/root.go +++ b/cli/cmd/root.go @@ -208,6 +208,7 @@ func NewCmdRoot() *cobra.Command { newCmdSwitch(), newCmdTestConnection(), newCmdValidateConfig(), + NewCmdAnalyze(), newCmdPluginInstall(true), // legacy pluginCmd, addonCmd, From 59f0dc903942b62add904c2431a4f4b1103c4107 Mon Sep 17 00:00:00 2001 From: bbernays Date: Mon, 4 Aug 2025 11:58:05 -0500 Subject: [PATCH 3/5] Update analyze.go --- cli/cmd/analyze.go | 49 ++++++++++++++++++++++++++++++++-------------- 1 file changed, 34 insertions(+), 15 deletions(-) diff --git a/cli/cmd/analyze.go b/cli/cmd/analyze.go index 8a5dcd22d1f2da..8287c1160d15e5 100644 --- a/cli/cmd/analyze.go +++ b/cli/cmd/analyze.go @@ -26,7 +26,10 @@ type tableData struct { const ( analyzeShort = "Analyze CloudQuery log files to detect stalled tables and calculate execution times" analyzeExample = `# Analyze a CloudQuery log file to find stalled tables -cloudquery analyze --file path/to/cloudquery.log` +cloudquery analyze --file path/to/cloudquery.log + +# Analyze data for a specific invocation ID only +cloudquery analyze --file path/to/cloudquery.log --invocation-id abc123` ) func NewCmdAnalyze() *cobra.Command { @@ -38,6 +41,7 @@ func NewCmdAnalyze() *cobra.Command { RunE: analyze, } cmd.Flags().StringP("file", "f", "", "Path to the CloudQuery log file") + cmd.Flags().StringP("invocation-id", "i", "", "Only analyze data for a specific invocation ID") _ = cmd.MarkFlagRequired("file") return cmd @@ -48,7 +52,13 @@ func analyze(cmd *cobra.Command, args []string) error { if err != nil { return err } - return analyzeLogFile(logFile) + + invocationID, err := cmd.Flags().GetString("invocation-id") + if err != nil { + return err + } + + return analyzeLogFile(logFile, invocationID) } func extractInvocationID(line string) string { @@ -59,9 +69,9 @@ func extractInvocationID(line string) string { return "" } -func analyzeLogFile(filePath string) error { +func analyzeLogFile(filePath string, filterInvocationID string) error { // Define the regular expression patterns - patternEnd := regexp.MustCompile(`^(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z)\s+(INF|ERR|WRN)\s+(.*)\s+client=(.*)\s(.*)+errors=(\d+)?\s+module=([a-zA-Z-]+)?\s+resources=(\d+)?\s+table=(\w+)?`) + patternEnd := regexp.MustCompile(`^(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z)\s+(INF|ERR|WRN)\s+(.*)\s+client=(.*)\s+(.*)\s+errors=(\d+)?\s+(.*)+\s+module=([a-zA-Z-]+)?\s+resources=(\d+)?\s+table=(\w+)?`) patternStart := regexp.MustCompile(`^(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z)\s+(INF|ERR|WRN)\s+(.*)\s+client=(.*)\s+(.*)\s+module=([a-zA-Z-]+)?\s+table=(\w+)?`) fmt.Printf("Analyzing log file: %s\n", filePath) @@ -79,11 +89,19 @@ func analyzeLogFile(filePath string) error { for scanner.Scan() { line := scanner.Text() + // Extract invocation ID for filtering + extractedInvocationID := extractInvocationID(line) + + // Skip if we're filtering by invocation ID and this line doesn't match + if filterInvocationID != "" && extractedInvocationID != filterInvocationID { + continue + } + // Try to match end pattern if matches := patternEnd.FindStringSubmatch(line); matches != nil { allData = append(allData, logEntry{ - invocationID: extractInvocationID(line), - key: matches[4] + matches[9], + invocationID: extractedInvocationID, + key: matches[4] + matches[10], ts: matches[1], typ: "end", }) @@ -93,7 +111,7 @@ func analyzeLogFile(filePath string) error { // Try to match start pattern if matches := patternStart.FindStringSubmatch(line); matches != nil { allData = append(allData, logEntry{ - invocationID: extractInvocationID(line), + invocationID: extractedInvocationID, key: matches[4] + matches[7], ts: matches[1], typ: "start", @@ -155,15 +173,15 @@ func analyzeLogFile(filePath string) error { if len(data.times) > 1 { // Calculate time difference in minutes timeDiff := int(data.times[len(data.times)-1].Sub(data.times[0]).Seconds() / 60) - if timeDiff > 0 { - if _, exists := timeDiffKeyPairs[invocationID]; !exists { - timeDiffKeyPairs[invocationID] = []timeDiffKeyPair{} - } - timeDiffKeyPairs[invocationID] = append(timeDiffKeyPairs[invocationID], timeDiffKeyPair{ - timeDiff: timeDiff, - key: key, - }) + + if _, exists := timeDiffKeyPairs[invocationID]; !exists { + timeDiffKeyPairs[invocationID] = []timeDiffKeyPair{} } + timeDiffKeyPairs[invocationID] = append(timeDiffKeyPairs[invocationID], timeDiffKeyPair{ + timeDiff: timeDiff, + key: key, + }) + } else if len(data.types) == 1 && data.types[0] == "start" { fmt.Printf("Table never completed: %s for invocation %s\n", key, invocationID) } @@ -177,6 +195,7 @@ func analyzeLogFile(filePath string) error { }) } + // Display results for all invocation IDs for invocationID := range timeDiffKeyPairs { fmt.Printf("Invocation ID: %s\n", invocationID) // Print time differences From afab67645d821f912355d239740efc8f27699c89 Mon Sep 17 00:00:00 2001 From: bbernays Date: Mon, 11 Aug 2025 11:44:13 -0500 Subject: [PATCH 4/5] Update analyze.go --- cli/cmd/analyze.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/cli/cmd/analyze.go b/cli/cmd/analyze.go index 8287c1160d15e5..7a0cfcfe38baec 100644 --- a/cli/cmd/analyze.go +++ b/cli/cmd/analyze.go @@ -172,7 +172,7 @@ func analyzeLogFile(filePath string, filterInvocationID string) error { for key, data := range normalizedData[invocationID] { if len(data.times) > 1 { // Calculate time difference in minutes - timeDiff := int(data.times[len(data.times)-1].Sub(data.times[0]).Seconds() / 60) + timeDiff := int(data.times[len(data.times)-1].Sub(data.times[0]).Seconds()) if _, exists := timeDiffKeyPairs[invocationID]; !exists { timeDiffKeyPairs[invocationID] = []timeDiffKeyPair{} @@ -183,11 +183,13 @@ func analyzeLogFile(filePath string, filterInvocationID string) error { }) } else if len(data.types) == 1 && data.types[0] == "start" { - fmt.Printf("Table never completed: %s for invocation %s\n", key, invocationID) + fmt.Printf("Table never completed: %s\n", key) } } } + + for invocationID := range timeDiffKeyPairs { // Sort time differences in descending order sort.Slice(timeDiffKeyPairs[invocationID], func(i, j int) bool { @@ -200,7 +202,7 @@ func analyzeLogFile(filePath string, filterInvocationID string) error { fmt.Printf("Invocation ID: %s\n", invocationID) // Print time differences for _, pair := range timeDiffKeyPairs[invocationID] { - fmt.Printf(" %d minutes - %s\n", pair.timeDiff, pair.key) + fmt.Printf(" %d Seconds - %s\n", pair.timeDiff, pair.key) } } From 951328120298a779a4c7841bc82abf4bb69b3e36 Mon Sep 17 00:00:00 2001 From: bbernays Date: Mon, 11 Aug 2025 13:19:56 -0500 Subject: [PATCH 5/5] Update analyze.go --- cli/cmd/analyze.go | 41 +++++++++++++++++++++++++++++++++-------- 1 file changed, 33 insertions(+), 8 deletions(-) diff --git a/cli/cmd/analyze.go b/cli/cmd/analyze.go index 7a0cfcfe38baec..33b6b8ac8c51f1 100644 --- a/cli/cmd/analyze.go +++ b/cli/cmd/analyze.go @@ -151,12 +151,21 @@ func analyzeLogFile(filePath string, filterInvocationID string) error { normalizedData[data.invocationID][clientPair].types = append(normalizedData[data.invocationID][clientPair].types, data.typ) } - // Sort times for each entry + // Sort times for each entry and find the latest timestamp in the log + latestTimestamp := time.Time{} for invocationID := range normalizedData { for _, data := range normalizedData[invocationID] { sort.Slice(data.times, func(i, j int) bool { return data.times[i].Before(data.times[j]) }) + + // Update latest timestamp if this entry has later timestamps + for _, ts := range data.times { + if !ts.After(latestTimestamp) { + continue + } + latestTimestamp = data.times[len(data.times)-1] + } } } @@ -169,6 +178,11 @@ func analyzeLogFile(filePath string, filterInvocationID string) error { timeDiffKeyPairs := make(map[string][]timeDiffKeyPair) for invocationID := range normalizedData { + // Skip if filtering by invocation ID and this isn't the one + if filterInvocationID != "" && filterInvocationID != invocationID { + continue + } + for key, data := range normalizedData[invocationID] { if len(data.times) > 1 { // Calculate time difference in minutes @@ -183,25 +197,36 @@ func analyzeLogFile(filePath string, filterInvocationID string) error { }) } else if len(data.types) == 1 && data.types[0] == "start" { - fmt.Printf("Table never completed: %s\n", key) + // Calculate how long the table was running based on the log's latest timestamp + if !latestTimestamp.IsZero() { + runningTime := int(latestTimestamp.Sub(data.times[0]).Seconds()) + fmt.Printf("Table never completed (running for %d seconds as of last log entry): %s for invocation %s\n", + runningTime, key, invocationID) + } else { + fmt.Printf("Table never completed: %s for invocation %s (cannot determine running time)\n", + key, invocationID) + } } } } - - + // Sort time differences in descending order for invocationID := range timeDiffKeyPairs { - // Sort time differences in descending order sort.Slice(timeDiffKeyPairs[invocationID], func(i, j int) bool { return timeDiffKeyPairs[invocationID][i].timeDiff > timeDiffKeyPairs[invocationID][j].timeDiff }) } - // Display results for all invocation IDs - for invocationID := range timeDiffKeyPairs { + // Display results + for invocationID, pairs := range timeDiffKeyPairs { + // Skip if filtering by invocation ID and this isn't the one + if filterInvocationID != "" && filterInvocationID != invocationID { + continue + } + fmt.Printf("Invocation ID: %s\n", invocationID) // Print time differences - for _, pair := range timeDiffKeyPairs[invocationID] { + for _, pair := range pairs { fmt.Printf(" %d Seconds - %s\n", pair.timeDiff, pair.key) } }