From 2798487d1a31ef24b5a6ac1e56ce67c8728e61a9 Mon Sep 17 00:00:00 2001 From: Andrii Romanenko Date: Wed, 19 Jan 2022 23:40:38 +0200 Subject: [PATCH 1/7] added alias to fetches table, alias used in console fetch reports, fetch summary start and end are nullable --- pkg/client/client.go | 20 ++++++++++---------- pkg/client/fetch.go | 5 +++-- pkg/client/migrations/2_v0.19.3.down.sql | 5 +++++ pkg/client/migrations/2_v0.19.3.up.sql | 7 +++++++ 4 files changed, 25 insertions(+), 12 deletions(-) create mode 100644 pkg/client/migrations/2_v0.19.3.down.sql create mode 100644 pkg/client/migrations/2_v0.19.3.up.sql diff --git a/pkg/client/client.go b/pkg/client/client.go index 7113e63fef8..70ebf37494b 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -423,28 +423,26 @@ func (c *Client) Fetch(ctx context.Context, request FetchRequest) (res *FetchRes for _, providerConfig := range request.Providers { providerConfig := providerConfig + fs := FetchSummary{ + FetchId: fetchId, + ProviderName: providerConfig.Name, + ProviderAlias: providerConfig.Alias, + } c.Logger.Debug("creating provider plugin", "provider", providerConfig.Name) providerPlugin, err := c.Manager.CreatePlugin(providerConfig.Name, providerConfig.Alias, providerConfig.Env) if err != nil { c.Logger.Error("failed to create provider plugin", "provider", providerConfig.Name, "error", err) return nil, err } + fs.ProviderVersion = providerPlugin.Version() // TODO: move this into an outer function errGroup.Go(func() error { - fs := FetchSummary{ - FetchId: fetchId, - ProviderName: providerConfig.Name, - ProviderVersion: providerPlugin.Version(), - Start: time.Now().UTC(), - } - defer func() { if err := SaveFetchSummary(ctx, c.pool, &fs); err != nil { c.Logger.Error("failed to save fetch summary", "err", err) } }() - pLog := c.Logger.With("provider", providerConfig.Name, "alias", providerConfig.Alias, "version", providerPlugin.Version()) pLog.Info("requesting provider to configure") if c.HistoryCfg != nil { @@ -471,6 +469,7 @@ func (c *Client) Fetch(ctx context.Context, request FetchRequest) (res *FetchRes pLog.Info("requesting provider fetch", "partial_fetch_enabled", providerConfig.EnablePartialFetch) fetchStart := time.Now() + fs.Start = &fetchStart stream, err := providerPlugin.Provider().FetchResources(ctx, &cqproto.FetchResourcesRequest{ Resources: providerConfig.Resources, @@ -507,7 +506,7 @@ func (c *Client) Fetch(ctx context.Context, request FetchRequest) (res *FetchRes pLog.Warn("received partial fetch error", parsePartialFetchKV(fetchError)...) } fetchSummaries <- ProviderFetchSummary{ - ProviderName: providerConfig.Name, + ProviderName: providerConfig.Alias, Version: providerPlugin.Version(), TotalResourcesFetched: totalResources, PartialFetchErrors: partialFetchResults, @@ -515,7 +514,8 @@ func (c *Client) Fetch(ctx context.Context, request FetchRequest) (res *FetchRes FetchResources: fetchedResources, Status: status, } - fs.Finish = time.Now().UTC() + t := time.Now().UTC() + fs.Finish = &t fs.IsSuccess = true fs.TotalErrorsCount = totalErrors fs.TotalResourceCount = totalResources diff --git a/pkg/client/fetch.go b/pkg/client/fetch.go index 6fe89c3b9ed..25d8ec39ef1 100644 --- a/pkg/client/fetch.go +++ b/pkg/client/fetch.go @@ -19,12 +19,13 @@ type FetchSummary struct { CqId uuid.UUID `db:"id"` // Unique Id of fetch session FetchId uuid.UUID `db:"fetch_id"` - Start time.Time `db:"start"` - Finish time.Time `db:"finish"` + Start *time.Time `db:"start"` + Finish *time.Time `db:"finish"` IsSuccess bool `db:"is_success"` TotalResourceCount uint64 `db:"total_resource_count"` TotalErrorsCount uint64 `db:"total_errors_count"` ProviderName string `db:"provider_name"` + ProviderAlias string `db:"provider_alias"` ProviderVersion string `db:"provider_version"` Resources ResourceFetchSummaries `db:"results"` } diff --git a/pkg/client/migrations/2_v0.19.3.down.sql b/pkg/client/migrations/2_v0.19.3.down.sql new file mode 100644 index 00000000000..fb0f9b44c67 --- /dev/null +++ b/pkg/client/migrations/2_v0.19.3.down.sql @@ -0,0 +1,5 @@ +ALTER TABLE IF EXISTS fetches + DROP CONSTRAINT fetches_pk; +ALTER TABLE IF EXISTS fetches ADD CONSTRAINT "fetches_pk" UNIQUE (fetch_id, provider_name); +ALTER TABLE IF EXISTS fetches + DROP COLUMN provider_alias; \ No newline at end of file diff --git a/pkg/client/migrations/2_v0.19.3.up.sql b/pkg/client/migrations/2_v0.19.3.up.sql new file mode 100644 index 00000000000..fc26c75d66d --- /dev/null +++ b/pkg/client/migrations/2_v0.19.3.up.sql @@ -0,0 +1,7 @@ +ALTER TABLE IF EXISTS fetches + ADD COLUMN provider_alias TEXT; +ALTER TABLE IF EXISTS fetches + DROP CONSTRAINT fetches_pk; +ALTER TABLE IF EXISTS fetches + ADD CONSTRAINT "fetches_pk" UNIQUE (fetch_id, provider_name, provider_alias); + From b9d9a5c4231c5bea8252d3f82c1a1cbd250ed3a5 Mon Sep 17 00:00:00 2001 From: Andrii Romanenko Date: Wed, 19 Jan 2022 23:45:39 +0200 Subject: [PATCH 2/7] test fix, fetch summary save on provider creation error. --- pkg/client/client.go | 12 +++++++----- pkg/client/fetch_test.go | 3 ++- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/pkg/client/client.go b/pkg/client/client.go index 70ebf37494b..4783b332196 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -428,21 +428,23 @@ func (c *Client) Fetch(ctx context.Context, request FetchRequest) (res *FetchRes ProviderName: providerConfig.Name, ProviderAlias: providerConfig.Alias, } + saveFetchSummary := func() { + if err := SaveFetchSummary(ctx, c.pool, &fs); err != nil { + c.Logger.Error("failed to save fetch summary", "err", err) + } + } c.Logger.Debug("creating provider plugin", "provider", providerConfig.Name) providerPlugin, err := c.Manager.CreatePlugin(providerConfig.Name, providerConfig.Alias, providerConfig.Env) if err != nil { c.Logger.Error("failed to create provider plugin", "provider", providerConfig.Name, "error", err) + saveFetchSummary() return nil, err } fs.ProviderVersion = providerPlugin.Version() // TODO: move this into an outer function errGroup.Go(func() error { - defer func() { - if err := SaveFetchSummary(ctx, c.pool, &fs); err != nil { - c.Logger.Error("failed to save fetch summary", "err", err) - } - }() + defer saveFetchSummary() pLog := c.Logger.With("provider", providerConfig.Name, "alias", providerConfig.Alias, "version", providerPlugin.Version()) pLog.Info("requesting provider to configure") if c.HistoryCfg != nil { diff --git a/pkg/client/fetch_test.go b/pkg/client/fetch_test.go index 244d346deb5..4d41ef69ef9 100644 --- a/pkg/client/fetch_test.go +++ b/pkg/client/fetch_test.go @@ -103,7 +103,8 @@ func TestFetchSummary(t *testing.T) { if !f.skipFetchId { f.summary.FetchId = fetchId } - f.summary.Start = time.Now() + start := time.Now() + f.summary.Start = &start err := SaveFetchSummary(context.Background(), pool, &f.summary) if f.err != nil { assert.Equal(t, f.err.Error(), err.Error()) From 2de22c72b7523ff9fd65aa62c16b652a4935942a Mon Sep 17 00:00:00 2001 From: Andrii Romanenko Date: Thu, 20 Jan 2022 10:18:38 +0200 Subject: [PATCH 3/7] added created_at column to fetch summary --- pkg/client/client.go | 2 ++ pkg/client/fetch.go | 1 + pkg/client/migrations/2_v0.19.3.up.sql | 2 ++ 3 files changed, 5 insertions(+) diff --git a/pkg/client/client.go b/pkg/client/client.go index 4783b332196..9567d1d5412 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -423,10 +423,12 @@ func (c *Client) Fetch(ctx context.Context, request FetchRequest) (res *FetchRes for _, providerConfig := range request.Providers { providerConfig := providerConfig + createdAt := time.Now().UTC() fs := FetchSummary{ FetchId: fetchId, ProviderName: providerConfig.Name, ProviderAlias: providerConfig.Alias, + CreatedAt: &createdAt, } saveFetchSummary := func() { if err := SaveFetchSummary(ctx, c.pool, &fs); err != nil { diff --git a/pkg/client/fetch.go b/pkg/client/fetch.go index 25d8ec39ef1..912b6960ba4 100644 --- a/pkg/client/fetch.go +++ b/pkg/client/fetch.go @@ -19,6 +19,7 @@ type FetchSummary struct { CqId uuid.UUID `db:"id"` // Unique Id of fetch session FetchId uuid.UUID `db:"fetch_id"` + CreatedAt *time.Time `db:"created_at"` Start *time.Time `db:"start"` Finish *time.Time `db:"finish"` IsSuccess bool `db:"is_success"` diff --git a/pkg/client/migrations/2_v0.19.3.up.sql b/pkg/client/migrations/2_v0.19.3.up.sql index fc26c75d66d..971407c6c33 100644 --- a/pkg/client/migrations/2_v0.19.3.up.sql +++ b/pkg/client/migrations/2_v0.19.3.up.sql @@ -1,5 +1,7 @@ ALTER TABLE IF EXISTS fetches ADD COLUMN provider_alias TEXT; +ALTER TABLE IF EXISTS fetches + ADD COLUMN created_at TIMESTAMP; ALTER TABLE IF EXISTS fetches DROP CONSTRAINT fetches_pk; ALTER TABLE IF EXISTS fetches From c16f57cfcac6171acdbc097e5e06715217740a3a Mon Sep 17 00:00:00 2001 From: Andrii Romanenko Date: Thu, 20 Jan 2022 14:20:45 +0200 Subject: [PATCH 4/7] test fixes --- pkg/client/client.go | 10 ++++++++-- pkg/client/client_test.go | 14 ++++++++------ 2 files changed, 16 insertions(+), 8 deletions(-) diff --git a/pkg/client/client.go b/pkg/client/client.go index 9567d1d5412..8de9932a075 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -83,6 +83,7 @@ type FetchUpdate struct { // ProviderFetchSummary represents a request for the FetchFinishCallback type ProviderFetchSummary struct { ProviderName string + ProviderAlias string Version string PartialFetchErrors []*cqproto.FailedResourceFetch FetchErrors []error @@ -510,7 +511,8 @@ func (c *Client) Fetch(ctx context.Context, request FetchRequest) (res *FetchRes pLog.Warn("received partial fetch error", parsePartialFetchKV(fetchError)...) } fetchSummaries <- ProviderFetchSummary{ - ProviderName: providerConfig.Alias, + ProviderName: providerConfig.Name, + ProviderAlias: providerConfig.Alias, Version: providerPlugin.Version(), TotalResourcesFetched: totalResources, PartialFetchErrors: partialFetchResults, @@ -577,7 +579,11 @@ func (c *Client) Fetch(ctx context.Context, request FetchRequest) (res *FetchRes close(fetchSummaries) for ps := range fetchSummaries { - response.ProviderFetchSummary[ps.ProviderName] = ps + key := fmt.Sprintf("%s(%s)", ps.ProviderName, ps.ProviderAlias) + if ps.ProviderName == ps.ProviderAlias { + key = ps.ProviderName + } + response.ProviderFetchSummary[key] = ps } reportFetchSummaryErrors(otrace.SpanFromContext(ctx), response.ProviderFetchSummary) diff --git a/pkg/client/client_test.go b/pkg/client/client_test.go index 968d07351b7..5e7b00af08e 100644 --- a/pkg/client/client_test.go +++ b/pkg/client/client_test.go @@ -31,7 +31,7 @@ var ( { Name: "test", Source: &providerSrc, - Version: "latest", + Version: "v0.0.9", }, } ) @@ -60,7 +60,10 @@ func TestClient_FailOnFetchWithPartialFetch(t *testing.T) { }, }) assert.Nil(t, err) - testSummary, ok := result.ProviderFetchSummary["test"] + if result == nil { + return + } + testSummary, ok := result.ProviderFetchSummary["test(test_alias)"] assert.True(t, ok) assert.True(t, testSummary.HasErrors()) assert.Len(t, testSummary.PartialFetchErrors, 2) @@ -91,7 +94,7 @@ func TestClient_FailOnFetch(t *testing.T) { }, }) assert.Nil(t, err) - testSummary, ok := result.ProviderFetchSummary["test"] + testSummary, ok := result.ProviderFetchSummary["test(test_alias)"] assert.True(t, ok) assert.True(t, testSummary.HasErrors()) assert.Len(t, testSummary.FetchErrors, 2) @@ -121,7 +124,7 @@ func TestClient_PartialFetch(t *testing.T) { }, }) assert.Nil(t, err) - testSummary, ok := result.ProviderFetchSummary["test"] + testSummary, ok := result.ProviderFetchSummary["test(test_alias)"] assert.True(t, ok) assert.Len(t, testSummary.PartialFetchErrors, 2) } @@ -489,7 +492,6 @@ func setupTestPlugin(t *testing.T) context.CancelFunc { } func Test_normalizeResources(t *testing.T) { - tests := []struct { name string requested []string @@ -655,7 +657,7 @@ func Test_CheckForProviderUpdates(t *testing.T) { { Name: "test", Source: &source, - Version: "v0.0.9", + Version: "v0.0.11", }, }, 0, From 4e23e7491baa8dd88e1ea4400283231adf9849aa Mon Sep 17 00:00:00 2001 From: Andrii Romanenko Date: Thu, 20 Jan 2022 14:38:03 +0200 Subject: [PATCH 5/7] console output adjustments, added core version column to fetches --- pkg/client/client.go | 7 ++----- pkg/client/fetch.go | 1 + pkg/client/migrations/2_v0.19.3.up.sql | 9 +++++---- pkg/ui/console/client.go | 6 +++++- 4 files changed, 13 insertions(+), 10 deletions(-) diff --git a/pkg/client/client.go b/pkg/client/client.go index 8de9932a075..cd6b950bf33 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -430,6 +430,7 @@ func (c *Client) Fetch(ctx context.Context, request FetchRequest) (res *FetchRes ProviderName: providerConfig.Name, ProviderAlias: providerConfig.Alias, CreatedAt: &createdAt, + CoreVersion: Version, } saveFetchSummary := func() { if err := SaveFetchSummary(ctx, c.pool, &fs); err != nil { @@ -579,11 +580,7 @@ func (c *Client) Fetch(ctx context.Context, request FetchRequest) (res *FetchRes close(fetchSummaries) for ps := range fetchSummaries { - key := fmt.Sprintf("%s(%s)", ps.ProviderName, ps.ProviderAlias) - if ps.ProviderName == ps.ProviderAlias { - key = ps.ProviderName - } - response.ProviderFetchSummary[key] = ps + response.ProviderFetchSummary[fmt.Sprintf("%s(%s)", ps.ProviderName, ps.ProviderAlias)] = ps } reportFetchSummaryErrors(otrace.SpanFromContext(ctx), response.ProviderFetchSummary) diff --git a/pkg/client/fetch.go b/pkg/client/fetch.go index 912b6960ba4..889f52c8d6b 100644 --- a/pkg/client/fetch.go +++ b/pkg/client/fetch.go @@ -28,6 +28,7 @@ type FetchSummary struct { ProviderName string `db:"provider_name"` ProviderAlias string `db:"provider_alias"` ProviderVersion string `db:"provider_version"` + CoreVersion string `db:"core_version"` Resources ResourceFetchSummaries `db:"results"` } diff --git a/pkg/client/migrations/2_v0.19.3.up.sql b/pkg/client/migrations/2_v0.19.3.up.sql index 971407c6c33..21a6dd24d04 100644 --- a/pkg/client/migrations/2_v0.19.3.up.sql +++ b/pkg/client/migrations/2_v0.19.3.up.sql @@ -1,9 +1,10 @@ ALTER TABLE IF EXISTS fetches - ADD COLUMN provider_alias TEXT; + ADD COLUMN IF NOT EXISTS provider_alias TEXT; ALTER TABLE IF EXISTS fetches - ADD COLUMN created_at TIMESTAMP; + ADD COLUMN IF NOT EXISTS core_version TEXT; +ALTER TABLE IF EXISTS fetches + ADD COLUMN IF NOT EXISTS created_at TIMESTAMP; ALTER TABLE IF EXISTS fetches DROP CONSTRAINT fetches_pk; ALTER TABLE IF EXISTS fetches - ADD CONSTRAINT "fetches_pk" UNIQUE (fetch_id, provider_name, provider_alias); - + ADD CONSTRAINT "fetches_pk" UNIQUE (fetch_id, provider_name, provider_alias); \ No newline at end of file diff --git a/pkg/ui/console/client.go b/pkg/ui/console/client.go index da12f271f88..a5638bb3abb 100644 --- a/pkg/ui/console/client.go +++ b/pkg/ui/console/client.go @@ -143,8 +143,12 @@ func (c Client) Fetch(ctx context.Context, failOnError bool) error { if summary.Status == "Canceled" { status = emojiStatus[ui.StatusError] + " (canceled)" } + key := summary.ProviderName + if summary.ProviderName != summary.ProviderAlias { + key = fmt.Sprintf("%s(%s)", summary.ProviderName, summary.ProviderAlias) + } ui.ColorizedOutput(ui.ColorHeader, "Provider %s fetch summary: %s Total Resources fetched: %d\t ⚠️ Warnings: %d\t ❌ Errors: %d\n", - summary.ProviderName, status, summary.TotalResourcesFetched, + key, status, summary.TotalResourcesFetched, summary.Diagnostics().Warnings(), summary.Diagnostics().Errors()) if failOnError && summary.HasErrors() { err = fmt.Errorf("provider fetch has one or more errors") From d85286c21616305918f6b0a4e41b33c4589964de Mon Sep 17 00:00:00 2001 From: Andrii Romanenko Date: Thu, 20 Jan 2022 17:39:29 +0200 Subject: [PATCH 6/7] some adjustements after merge --- pkg/client/client.go | 2 +- pkg/client/client_test.go | 2 +- pkg/client/migrations/{ => postgres}/2_v0.19.3.down.sql | 0 pkg/client/migrations/{ => postgres}/2_v0.19.3.up.sql | 0 4 files changed, 2 insertions(+), 2 deletions(-) rename pkg/client/migrations/{ => postgres}/2_v0.19.3.down.sql (100%) rename pkg/client/migrations/{ => postgres}/2_v0.19.3.up.sql (100%) diff --git a/pkg/client/client.go b/pkg/client/client.go index 10b72543e68..c329e9e92ca 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -420,7 +420,7 @@ func (c *Client) Fetch(ctx context.Context, request FetchRequest) (res *FetchRes CoreVersion: Version, } saveFetchSummary := func() { - if err := SaveFetchSummary(ctx, c.pool, &fs); err != nil { + if err := c.SaveFetchSummary(ctx, &fs); err != nil { c.Logger.Error("failed to save fetch summary", "err", err) } } diff --git a/pkg/client/client_test.go b/pkg/client/client_test.go index 795f6f47dad..525943828a5 100644 --- a/pkg/client/client_test.go +++ b/pkg/client/client_test.go @@ -37,7 +37,7 @@ func requiredTestProviders() []*config.RequiredProvider { { Name: "test", Source: &providerSrc, - Version: "v0.0.9", + Version: "latest", }, } } diff --git a/pkg/client/migrations/2_v0.19.3.down.sql b/pkg/client/migrations/postgres/2_v0.19.3.down.sql similarity index 100% rename from pkg/client/migrations/2_v0.19.3.down.sql rename to pkg/client/migrations/postgres/2_v0.19.3.down.sql diff --git a/pkg/client/migrations/2_v0.19.3.up.sql b/pkg/client/migrations/postgres/2_v0.19.3.up.sql similarity index 100% rename from pkg/client/migrations/2_v0.19.3.up.sql rename to pkg/client/migrations/postgres/2_v0.19.3.up.sql From 74c5760093ce2f55045497fd3a846b838000d114 Mon Sep 17 00:00:00 2001 From: Andrii Romanenko Date: Mon, 24 Jan 2022 09:25:43 +0200 Subject: [PATCH 7/7] status -> fetchStatus --- pkg/client/client.go | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/pkg/client/client.go b/pkg/client/client.go index c329e9e92ca..9b6dc72c490 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -412,7 +412,7 @@ func (c *Client) Fetch(ctx context.Context, request FetchRequest) (res *FetchRes for _, providerConfig := range request.Providers { providerConfig := providerConfig createdAt := time.Now().UTC() - fs := FetchSummary{ + fetchSummary := FetchSummary{ FetchId: fetchId, ProviderName: providerConfig.Name, ProviderAlias: providerConfig.Alias, @@ -420,7 +420,7 @@ func (c *Client) Fetch(ctx context.Context, request FetchRequest) (res *FetchRes CoreVersion: Version, } saveFetchSummary := func() { - if err := c.SaveFetchSummary(ctx, &fs); err != nil { + if err := c.SaveFetchSummary(ctx, &fetchSummary); err != nil { c.Logger.Error("failed to save fetch summary", "err", err) } } @@ -460,7 +460,7 @@ func (c *Client) Fetch(ctx context.Context, request FetchRequest) (res *FetchRes pLog.Info("requesting provider fetch", "partial_fetch_enabled", providerConfig.EnablePartialFetch) fetchStart := time.Now() - fs.Start = &fetchStart + fetchSummary.Start = &fetchStart stream, err := providerPlugin.Provider().FetchResources(ctx, &cqproto.FetchResourcesRequest{ Resources: providerConfig.Resources, @@ -486,10 +486,10 @@ func (c *Client) Fetch(ctx context.Context, request FetchRequest) (res *FetchRes if (ok && st.Code() == gcodes.Canceled) || err == io.EOF { message := "provider finished fetch" - status := "Finished" + fetchStatus := "Finished" if ok && st.Code() == gcodes.Canceled { message = "provider fetch canceled" - status = "Canceled" + fetchStatus = "Canceled" } pLog.Info(message, "execution", time.Since(fetchStart).String()) @@ -504,13 +504,13 @@ func (c *Client) Fetch(ctx context.Context, request FetchRequest) (res *FetchRes PartialFetchErrors: partialFetchResults, FetchErrors: fetchErrors, FetchResources: fetchedResources, - Status: status, + Status: fetchStatus, } t := time.Now().UTC() - fs.Finish = &t - fs.IsSuccess = true - fs.TotalErrorsCount = totalErrors - fs.TotalResourceCount = totalResources + fetchSummary.Finish = &t + fetchSummary.IsSuccess = true + fetchSummary.TotalErrorsCount = totalErrors + fetchSummary.TotalResourceCount = totalResources return nil } pLog.Error("received provider fetch error", "error", err) @@ -543,7 +543,7 @@ func (c *Client) Fetch(ctx context.Context, request FetchRequest) (res *FetchRes request.UpdateCallback(update) } - fs.Resources = append(fs.Resources, ResourceFetchSummary{ + fetchSummary.Resources = append(fetchSummary.Resources, ResourceFetchSummary{ ResourceName: resp.ResourceName, FinishedResources: resp.FinishedResources, Status: strconv.Itoa(int(resp.Summary.Status)), // todo use human readable representation of status