Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 27 additions & 22 deletions pkg/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ type FetchUpdate struct {
// ProviderFetchSummary represents a request for the FetchFinishCallback
type ProviderFetchSummary struct {
ProviderName string
ProviderAlias string
Version string
PartialFetchErrors []*cqproto.FailedResourceFetch
FetchErrors []error
Expand Down Expand Up @@ -410,6 +411,19 @@ func (c *Client) Fetch(ctx context.Context, request FetchRequest) (res *FetchRes

for _, providerConfig := range request.Providers {
providerConfig := providerConfig
createdAt := time.Now().UTC()
fetchSummary := FetchSummary{
FetchId: fetchId,
Comment thread
amanenk marked this conversation as resolved.
ProviderName: providerConfig.Name,
ProviderAlias: providerConfig.Alias,
CreatedAt: &createdAt,
CoreVersion: Version,
}
saveFetchSummary := func() {
if err := c.SaveFetchSummary(ctx, &fetchSummary); err != nil {
c.Logger.Error("failed to save fetch summary", "err", err)
}
}
c.Logger.Debug("creating provider plugin", "provider", providerConfig.Name)
providerPlugin, err := c.Manager.CreatePlugin(providerConfig.Name, providerConfig.Alias, providerConfig.Env)
if err != nil {
Expand All @@ -419,19 +433,7 @@ func (c *Client) Fetch(ctx context.Context, request FetchRequest) (res *FetchRes

// TODO: move this into an outer function
errGroup.Go(func() error {
fs := FetchSummary{
FetchId: fetchId,
ProviderName: providerConfig.Name,
ProviderVersion: providerPlugin.Version(),
Start: time.Now().UTC(),
}

defer func() {
if err := c.SaveFetchSummary(ctx, &fs); err != nil {
c.Logger.Error("failed to save fetch summary", "err", err)
}
}()

defer saveFetchSummary()
pLog := c.Logger.With("provider", providerConfig.Name, "alias", providerConfig.Alias, "version", providerPlugin.Version())
pLog.Info("requesting provider to configure")
if c.HistoryCfg != nil {
Expand All @@ -458,6 +460,7 @@ func (c *Client) Fetch(ctx context.Context, request FetchRequest) (res *FetchRes

pLog.Info("requesting provider fetch", "partial_fetch_enabled", providerConfig.EnablePartialFetch)
fetchStart := time.Now()
fetchSummary.Start = &fetchStart
stream, err := providerPlugin.Provider().FetchResources(ctx,
&cqproto.FetchResourcesRequest{
Resources: providerConfig.Resources,
Expand All @@ -483,10 +486,10 @@ func (c *Client) Fetch(ctx context.Context, request FetchRequest) (res *FetchRes

if (ok && st.Code() == gcodes.Canceled) || err == io.EOF {
message := "provider finished fetch"
status := "Finished"
fetchStatus := "Finished"
if ok && st.Code() == gcodes.Canceled {
message = "provider fetch canceled"
status = "Canceled"
fetchStatus = "Canceled"
}

pLog.Info(message, "execution", time.Since(fetchStart).String())
Expand All @@ -495,17 +498,19 @@ func (c *Client) Fetch(ctx context.Context, request FetchRequest) (res *FetchRes
}
fetchSummaries <- ProviderFetchSummary{
ProviderName: providerConfig.Name,
ProviderAlias: providerConfig.Alias,
Version: providerPlugin.Version(),
TotalResourcesFetched: totalResources,
PartialFetchErrors: partialFetchResults,
FetchErrors: fetchErrors,
FetchResources: fetchedResources,
Status: status,
Status: fetchStatus,
}
fs.Finish = time.Now().UTC()
fs.IsSuccess = true
fs.TotalErrorsCount = totalErrors
fs.TotalResourceCount = totalResources
t := time.Now().UTC()
fetchSummary.Finish = &t
fetchSummary.IsSuccess = true
fetchSummary.TotalErrorsCount = totalErrors
fetchSummary.TotalResourceCount = totalResources
return nil
}
pLog.Error("received provider fetch error", "error", err)
Expand Down Expand Up @@ -538,7 +543,7 @@ func (c *Client) Fetch(ctx context.Context, request FetchRequest) (res *FetchRes
request.UpdateCallback(update)
}

fs.Resources = append(fs.Resources, ResourceFetchSummary{
fetchSummary.Resources = append(fetchSummary.Resources, ResourceFetchSummary{
ResourceName: resp.ResourceName,
FinishedResources: resp.FinishedResources,
Status: strconv.Itoa(int(resp.Summary.Status)), // todo use human readable representation of status
Expand All @@ -560,7 +565,7 @@ func (c *Client) Fetch(ctx context.Context, request FetchRequest) (res *FetchRes
close(fetchSummaries)

for ps := range fetchSummaries {
response.ProviderFetchSummary[ps.ProviderName] = ps
response.ProviderFetchSummary[fmt.Sprintf("%s(%s)", ps.ProviderName, ps.ProviderAlias)] = ps
}

reportFetchSummaryErrors(otrace.SpanFromContext(ctx), response.ProviderFetchSummary)
Expand Down
10 changes: 6 additions & 4 deletions pkg/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,10 @@ func TestClient_FailOnFetchWithPartialFetch(t *testing.T) {
},
})
assert.Nil(t, err)
testSummary, ok := result.ProviderFetchSummary["test"]
if result == nil {
return
}
testSummary, ok := result.ProviderFetchSummary["test(test_alias)"]
assert.True(t, ok)
assert.True(t, testSummary.HasErrors())
assert.Len(t, testSummary.PartialFetchErrors, 2)
Expand Down Expand Up @@ -139,7 +142,7 @@ func TestClient_FailOnFetch(t *testing.T) {
},
})
assert.Nil(t, err)
testSummary, ok := result.ProviderFetchSummary["test"]
testSummary, ok := result.ProviderFetchSummary["test(test_alias)"]
assert.True(t, ok)
assert.True(t, testSummary.HasErrors())
assert.Len(t, testSummary.FetchErrors, 2)
Expand Down Expand Up @@ -173,7 +176,7 @@ func TestClient_PartialFetch(t *testing.T) {
},
})
assert.Nil(t, err)
testSummary, ok := result.ProviderFetchSummary["test"]
testSummary, ok := result.ProviderFetchSummary["test(test_alias)"]
assert.True(t, ok)
assert.Len(t, testSummary.PartialFetchErrors, 2)
}
Expand Down Expand Up @@ -578,7 +581,6 @@ func setupTestPlugin(t *testing.T) context.CancelFunc {
}

func Test_normalizeResources(t *testing.T) {

tests := []struct {
name string
requested []string
Expand Down
7 changes: 5 additions & 2 deletions pkg/client/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,16 @@ type FetchSummary struct {
CqId uuid.UUID `db:"id"`
// Unique Id of fetch session
FetchId uuid.UUID `db:"fetch_id"`
Start time.Time `db:"start"`
Finish time.Time `db:"finish"`
CreatedAt *time.Time `db:"created_at"`
Start *time.Time `db:"start"`
Finish *time.Time `db:"finish"`
IsSuccess bool `db:"is_success"`
TotalResourceCount uint64 `db:"total_resource_count"`
TotalErrorsCount uint64 `db:"total_errors_count"`
ProviderName string `db:"provider_name"`
ProviderAlias string `db:"provider_alias"`
ProviderVersion string `db:"provider_version"`
CoreVersion string `db:"core_version"`
Resources ResourceFetchSummaries `db:"results"`
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/client/fetch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ func TestFetchSummary(t *testing.T) {
if !f.skipFetchId {
f.summary.FetchId = fetchId
}
f.summary.Start = time.Now()
start := time.Now()
f.summary.Start = &start
err := c.SaveFetchSummary(context.Background(), &f.summary)
if f.err != nil {
assert.EqualError(t, err, f.err.Error())
Expand Down
5 changes: 5 additions & 0 deletions pkg/client/migrations/postgres/2_v0.19.3.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
ALTER TABLE IF EXISTS fetches
DROP CONSTRAINT fetches_pk;
ALTER TABLE IF EXISTS fetches ADD CONSTRAINT "fetches_pk" UNIQUE (fetch_id, provider_name);
ALTER TABLE IF EXISTS fetches
DROP COLUMN provider_alias;
10 changes: 10 additions & 0 deletions pkg/client/migrations/postgres/2_v0.19.3.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
ALTER TABLE IF EXISTS fetches
ADD COLUMN IF NOT EXISTS provider_alias TEXT;
ALTER TABLE IF EXISTS fetches
ADD COLUMN IF NOT EXISTS core_version TEXT;
ALTER TABLE IF EXISTS fetches
ADD COLUMN IF NOT EXISTS created_at TIMESTAMP;
ALTER TABLE IF EXISTS fetches
DROP CONSTRAINT fetches_pk;
ALTER TABLE IF EXISTS fetches
ADD CONSTRAINT "fetches_pk" UNIQUE (fetch_id, provider_name, provider_alias);
6 changes: 5 additions & 1 deletion pkg/ui/console/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,12 @@ func (c Client) Fetch(ctx context.Context, failOnError bool) error {
if summary.Status == "Canceled" {
status = emojiStatus[ui.StatusError] + " (canceled)"
}
key := summary.ProviderName
if summary.ProviderName != summary.ProviderAlias {
key = fmt.Sprintf("%s(%s)", summary.ProviderName, summary.ProviderAlias)
}
ui.ColorizedOutput(ui.ColorHeader, "Provider %s fetch summary: %s Total Resources fetched: %d\t ⚠️ Warnings: %d\t ❌ Errors: %d\n",
summary.ProviderName, status, summary.TotalResourcesFetched,
key, status, summary.TotalResourcesFetched,
summary.Diagnostics().Warnings(), summary.Diagnostics().Errors())
if failOnError && summary.HasErrors() {
err = fmt.Errorf("provider fetch has one or more errors")
Expand Down