From 4ad4c065c4c360bc6009b6e6b0a7edeac2b65086 Mon Sep 17 00:00:00 2001 From: Andrii Romanenko Date: Wed, 26 Jan 2022 14:37:20 +0200 Subject: [PATCH 01/19] fetches check before running policy --- pkg/client/client.go | 9 ++- .../fetch_summary.go} | 41 ++++++++++++- .../fetch_summary_test.go} | 13 ++-- pkg/policy/execute.go | 38 ++++++++++++ pkg/policy/execute_test.go | 60 +++++++++++++++++++ 5 files changed, 150 insertions(+), 11 deletions(-) rename pkg/client/{fetch.go => fetch_summary/fetch_summary.go} (71%) rename pkg/client/{fetch_test.go => fetch_summary/fetch_summary_test.go} (83%) diff --git a/pkg/client/client.go b/pkg/client/client.go index 9b6dc72c490687..ce462f19787e69 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -5,6 +5,7 @@ import ( "embed" "errors" "fmt" + "github.com/cloudquery/cloudquery/pkg/client/fetch_summary" "io" "io/fs" "path/filepath" @@ -409,10 +410,12 @@ func (c *Client) Fetch(ctx context.Context, request FetchRequest) (res *FetchRes return nil, err } + fetchSummaryClient := fetch_summary.NewFetchSummaryClient(c.db) + for _, providerConfig := range request.Providers { providerConfig := providerConfig createdAt := time.Now().UTC() - fetchSummary := FetchSummary{ + fetchSummary := fetch_summary.FetchSummary{ FetchId: fetchId, ProviderName: providerConfig.Name, ProviderAlias: providerConfig.Alias, @@ -420,7 +423,7 @@ func (c *Client) Fetch(ctx context.Context, request FetchRequest) (res *FetchRes CoreVersion: Version, } saveFetchSummary := func() { - if err := c.SaveFetchSummary(ctx, &fetchSummary); err != nil { + if err := fetchSummaryClient.SaveFetchSummary(ctx, &fetchSummary); err != nil { c.Logger.Error("failed to save fetch summary", "err", err) } } @@ -543,7 +546,7 @@ func (c *Client) Fetch(ctx context.Context, request FetchRequest) (res *FetchRes request.UpdateCallback(update) } - fetchSummary.Resources = append(fetchSummary.Resources, ResourceFetchSummary{ + fetchSummary.Resources = append(fetchSummary.Resources, fetch_summary.ResourceFetchSummary{ ResourceName: resp.ResourceName, FinishedResources: resp.FinishedResources, Status: strconv.Itoa(int(resp.Summary.Status)), // todo use human readable representation of status diff --git a/pkg/client/fetch.go b/pkg/client/fetch_summary/fetch_summary.go similarity index 71% rename from pkg/client/fetch.go rename to pkg/client/fetch_summary/fetch_summary.go index 5b3d5ebfe42d40..87aee6a05b6973 100644 --- a/pkg/client/fetch.go +++ b/pkg/client/fetch_summary/fetch_summary.go @@ -1,4 +1,4 @@ -package client +package fetch_summary import ( "context" @@ -6,12 +6,25 @@ import ( "encoding/json" "time" + "github.com/cloudquery/cq-provider-sdk/provider/schema" + "github.com/georgysavva/scany/pgxscan" + "github.com/cloudquery/cq-provider-sdk/cqproto" "github.com/cloudquery/cq-provider-sdk/provider/schema/diag" "github.com/doug-martin/goqu/v9" "github.com/google/uuid" ) +type FetchSummaryClient struct { + db schema.QueryExecer +} + +func NewFetchSummaryClient(db schema.QueryExecer) *FetchSummaryClient { + return &FetchSummaryClient{ + db: db, + } +} + // FetchSummary includes a summarized report of fetch, such as fetch id, fetch start and finish, // resources fetch results type FetchSummary struct { @@ -61,7 +74,7 @@ type ResourceFetchSummary struct { } // SaveFetchSummary saves fetch summary into fetches database -func (c *Client) SaveFetchSummary(ctx context.Context, fs *FetchSummary) error { +func (c *FetchSummaryClient) SaveFetchSummary(ctx context.Context, fs *FetchSummary) error { id, err := uuid.NewUUID() if err != nil { return err @@ -72,6 +85,28 @@ func (c *Client) SaveFetchSummary(ctx context.Context, fs *FetchSummary) error { if err != nil { return err } - return c.db.Exec(ctx, sql, args...) } + +// GetLatestFetchSummaryForProvider gets latest fetch summary for specific provider +func (c *FetchSummaryClient) GetLatestFetchSummaryForProvider(ctx context.Context, provider string) (*FetchSummary, error) { + q := goqu.Dialect("postgres"). + Select("provider_version", "is_success"). + From("cloudquery.fetches"). + Where(goqu.Ex{"provider_name": provider, "finish": goqu.Op{"isNot": nil}}). + Limit(1). + Order(goqu.I("finish").Desc()) + sql, _, err := q.ToSQL() + if err != nil { + return nil, err + } + var data []FetchSummary + err = pgxscan.Select(ctx, c.db, &data, sql) + if err != nil { + return nil, err + } + if len(data) == 0 { + return nil, nil + } + return &data[0], nil +} diff --git a/pkg/client/fetch_test.go b/pkg/client/fetch_summary/fetch_summary_test.go similarity index 83% rename from pkg/client/fetch_test.go rename to pkg/client/fetch_summary/fetch_summary_test.go index 6a1d97841a22e5..dd3a1b5d16d785 100644 --- a/pkg/client/fetch_test.go +++ b/pkg/client/fetch_summary/fetch_summary_test.go @@ -1,8 +1,10 @@ -package client +package fetch_summary import ( "context" "errors" + sdkdb "github.com/cloudquery/cq-provider-sdk/database" + "github.com/hashicorp/go-hclog" "testing" "time" @@ -75,11 +77,12 @@ var fetchSummaryTests = []fetchSummaryTest{ } func TestFetchSummary(t *testing.T) { - c, err := New(context.Background(), func(c *Client) { - c.DSN = testDBConnection - }) + // create database connection + db, err := sdkdb.New(context.Background(), hclog.NewNullLogger(), testDBConnection) assert.NoError(t, err) + fetchSummaryClient := NewFetchSummaryClient(db) + fetchId := uuid.New() for _, f := range fetchSummaryTests { if !f.skipFetchId { @@ -87,7 +90,7 @@ func TestFetchSummary(t *testing.T) { } start := time.Now() f.summary.Start = &start - err := c.SaveFetchSummary(context.Background(), &f.summary) + err := fetchSummaryClient.SaveFetchSummary(context.Background(), &f.summary) if f.err != nil { assert.EqualError(t, err, f.err.Error()) } else { diff --git a/pkg/policy/execute.go b/pkg/policy/execute.go index b55353af320abe..e292c3b07932fe 100644 --- a/pkg/policy/execute.go +++ b/pkg/policy/execute.go @@ -5,6 +5,7 @@ import ( "encoding/json" "errors" "fmt" + "github.com/cloudquery/cloudquery/pkg/client/fetch_summary" "path" "path/filepath" "strings" @@ -17,6 +18,8 @@ import ( var ErrPolicyOrQueryNotFound = errors.New("selected policy/query is not found") +const testDBConnection = "postgres://postgres:pass@localhost:5432/postgres?sslmode=disable" + type UpdateCallback func(update Update) type Update struct { @@ -124,6 +127,9 @@ func (e *Executor) Execute(ctx context.Context, req *ExecuteRequest, policy *Pol if err := e.checkVersions(policy.Config, req.ProviderVersions); err != nil { return nil, fmt.Errorf("%s: %w", policy.Name, err) } + if err := e.checkFetches(ctx, policy.Config); err != nil { + return nil, fmt.Errorf("%s: %w", policy.Name, err) + } if err := e.createViews(ctx, policy); err != nil { return nil, err } @@ -181,6 +187,38 @@ func (e *Executor) Execute(ctx context.Context, req *ExecuteRequest, policy *Pol return &total, nil } +// checkFetches checks if there are fetch reports in database that satisfy providers from policy +func (e *Executor) checkFetches(ctx context.Context, policyConfig *Configuration) error { + if policyConfig == nil { + return nil + } + fetchSummaryClient := fetch_summary.NewFetchSummaryClient(e.conn) + for _, p := range policyConfig.Providers { + c, err := version.NewConstraint(p.Version) + if err != nil { + return fmt.Errorf("failed to parse version constraint for provider %s: %w", p.Type, err) + } + fetchSummary, err := fetchSummaryClient.GetLatestFetchSummaryForProvider(ctx, p.Type) + if err != nil { + return fmt.Errorf("failed to get fetch summary for provider %s: %w", p.Type, err) + } + if fetchSummary == nil { + return fmt.Errorf("there were no fetches for provider %s. please run cloudquery fetch berfore running policy", p.Type) + } + if fetchSummary.IsSuccess == false { + return fmt.Errorf("last fetch for provider %s wasn't successful", p.Type) + } + v, err := version.NewVersion(fetchSummary.ProviderVersion) + if err != nil { + return fmt.Errorf("failed to parse version for %s fetch summary: %w", p.Type, err) + } + if !c.Check(v) { + return fmt.Errorf("the latest fetch for provider %s does not satisfy version requirement %s", p.Type, c) + } + } + return nil +} + func (*Executor) checkVersions(policyConfig *Configuration, actual map[string]*version.Version) error { if policyConfig == nil { return nil diff --git a/pkg/policy/execute_test.go b/pkg/policy/execute_test.go index adfdf2de130df4..b3b1f6da93e8c9 100644 --- a/pkg/policy/execute_test.go +++ b/pkg/policy/execute_test.go @@ -3,7 +3,10 @@ package policy import ( "context" "fmt" + "github.com/cloudquery/cloudquery/pkg/client/fetch_summary" + uuid "github.com/google/uuid" "testing" + "time" sdkdb "github.com/cloudquery/cq-provider-sdk/database" "github.com/cloudquery/cq-provider-sdk/provider/schema" @@ -332,3 +335,60 @@ func TestExecutor_Execute(t *testing.T) { }) } } + +func setupCheckFetchDatabase(db schema.QueryExecer, fetchSummary *fetch_summary.FetchSummary) (error, func(t *testing.T)) { + if fetchSummary == nil { + return nil, func(t *testing.T) {} + } + fetchSummaryClient := fetch_summary.NewFetchSummaryClient(db) + err := fetchSummaryClient.SaveFetchSummary(context.Background(), fetchSummary) + if err != nil { + return err, nil + } + + // Return conn and tear down func + return nil, func(t *testing.T) { + err = db.Exec(context.Background(), fmt.Sprintf(`DELETE FROM "cloudquery"."fetches" WHERE "id" = '%s';`, fetchSummary.FetchId.String())) + assert.NoError(t, err) + } +} + +func TestExecutor_CheckFetches(t *testing.T) { + //todo migrate database to check it + + db, err := sdkdb.New(context.Background(), hclog.NewNullLogger(), testDBConnection) + executor := NewExecutor(db, hclog.Default(), nil) + + assert.NoError(t, err) + cases := []struct { + Name string + Config Configuration + f *fetch_summary.FetchSummary + err error + }{ + { + Name: "correct version", + Config: Configuration{ + Providers: []*Provider{ + {Type: "test1", Version: "~> v0.2.0"}, + }, + }, + f: &fetch_summary.FetchSummary{ProviderName: "test1", ProviderVersion: "v0.2.3", IsSuccess: true}, + err: nil, + }, + } + + for _, tc := range cases { + tc.f.CqId = uuid.New() + tc.f.FetchId = uuid.New() + finish := time.Now().UTC() + tc.f.Finish = &finish + err, clear := setupCheckFetchDatabase(db, tc.f) + assert.NoError(t, err) + + err = executor.checkFetches(context.Background(), &tc.Config) + assert.Equal(t, err, tc.err) + clear(t) + + } +} From 6e4d06adc5fe0dfecfe66ac75490a891cd5bd06d Mon Sep 17 00:00:00 2001 From: Andrii Romanenko Date: Wed, 26 Jan 2022 14:52:50 +0200 Subject: [PATCH 02/19] added more tests --- .../fetch_summary/fetch_summary_test.go | 1 + pkg/policy/execute.go | 4 +- pkg/policy/execute_test.go | 74 +++++++++++++++---- 3 files changed, 62 insertions(+), 17 deletions(-) diff --git a/pkg/client/fetch_summary/fetch_summary_test.go b/pkg/client/fetch_summary/fetch_summary_test.go index dd3a1b5d16d785..2d34635d3161cc 100644 --- a/pkg/client/fetch_summary/fetch_summary_test.go +++ b/pkg/client/fetch_summary/fetch_summary_test.go @@ -77,6 +77,7 @@ var fetchSummaryTests = []fetchSummaryTest{ } func TestFetchSummary(t *testing.T) { + //todo be sure that it is running after core migrations // create database connection db, err := sdkdb.New(context.Background(), hclog.NewNullLogger(), testDBConnection) assert.NoError(t, err) diff --git a/pkg/policy/execute.go b/pkg/policy/execute.go index e292c3b07932fe..50129c3afe033e 100644 --- a/pkg/policy/execute.go +++ b/pkg/policy/execute.go @@ -128,7 +128,7 @@ func (e *Executor) Execute(ctx context.Context, req *ExecuteRequest, policy *Pol return nil, fmt.Errorf("%s: %w", policy.Name, err) } if err := e.checkFetches(ctx, policy.Config); err != nil { - return nil, fmt.Errorf("%s: %w", policy.Name, err) + return nil, fmt.Errorf("%s: %w, please run `cloudquery fetch` berfore running policy", policy.Name, err) } if err := e.createViews(ctx, policy); err != nil { return nil, err @@ -203,7 +203,7 @@ func (e *Executor) checkFetches(ctx context.Context, policyConfig *Configuration return fmt.Errorf("failed to get fetch summary for provider %s: %w", p.Type, err) } if fetchSummary == nil { - return fmt.Errorf("there were no fetches for provider %s. please run cloudquery fetch berfore running policy", p.Type) + return fmt.Errorf("there is no finished fetches for provider %s", p.Type) } if fetchSummary.IsSuccess == false { return fmt.Errorf("last fetch for provider %s wasn't successful", p.Type) diff --git a/pkg/policy/execute_test.go b/pkg/policy/execute_test.go index b3b1f6da93e8c9..5e3826857c5530 100644 --- a/pkg/policy/execute_test.go +++ b/pkg/policy/execute_test.go @@ -2,16 +2,16 @@ package policy import ( "context" + "errors" "fmt" "github.com/cloudquery/cloudquery/pkg/client/fetch_summary" - uuid "github.com/google/uuid" - "testing" - "time" - sdkdb "github.com/cloudquery/cq-provider-sdk/database" "github.com/cloudquery/cq-provider-sdk/provider/schema" + "github.com/google/uuid" "github.com/hashicorp/go-hclog" "github.com/stretchr/testify/assert" + "testing" + "time" ) func setupPolicyDatabase(t *testing.T, tableName string) (schema.QueryExecer, func(t *testing.T)) { @@ -340,6 +340,10 @@ func setupCheckFetchDatabase(db schema.QueryExecer, fetchSummary *fetch_summary. if fetchSummary == nil { return nil, func(t *testing.T) {} } + fetchSummary.CqId = uuid.New() + fetchSummary.FetchId = uuid.New() + finish := time.Now().UTC() + fetchSummary.Finish = &finish fetchSummaryClient := fetch_summary.NewFetchSummaryClient(db) err := fetchSummaryClient.SaveFetchSummary(context.Background(), fetchSummary) if err != nil { @@ -354,8 +358,7 @@ func setupCheckFetchDatabase(db schema.QueryExecer, fetchSummary *fetch_summary. } func TestExecutor_CheckFetches(t *testing.T) { - //todo migrate database to check it - + //todo be sure that it is running after core migrations db, err := sdkdb.New(context.Background(), hclog.NewNullLogger(), testDBConnection) executor := NewExecutor(db, hclog.Default(), nil) @@ -376,19 +379,60 @@ func TestExecutor_CheckFetches(t *testing.T) { f: &fetch_summary.FetchSummary{ProviderName: "test1", ProviderVersion: "v0.2.3", IsSuccess: true}, err: nil, }, + { + Name: "no fetches", + Config: Configuration{ + Providers: []*Provider{ + {Type: "test2", Version: "~> v0.2.0"}, + }, + }, + err: errors.New("there is no finished fetches for provider test2"), + }, + { + Name: "no fetches", + Config: Configuration{ + Providers: []*Provider{ + {Type: "test3", Version: "~> v0.2.0"}, + }, + }, + f: &fetch_summary.FetchSummary{ProviderName: "test3", ProviderVersion: "v0.2.3", IsSuccess: false}, + err: errors.New("last fetch for provider test3 wasn't successful"), + }, + { + Name: "no fetches", + Config: Configuration{ + Providers: []*Provider{ + {Type: "test4", Version: "~> v0.3.0"}, + }, + }, + f: &fetch_summary.FetchSummary{ProviderName: "test4", ProviderVersion: "v0.2.3", IsSuccess: true}, + err: errors.New("the latest fetch for provider test4 does not satisfy version requirement ~> v0.3.0"), + }, + { + Name: "no fetches", + Config: Configuration{ + Providers: []*Provider{ + {Type: "test4", Version: ""}, + }, + }, + f: &fetch_summary.FetchSummary{ProviderName: "test4", ProviderVersion: "v0.2.3", IsSuccess: true}, + err: errors.New("failed to parse version constraint for provider test4: Malformed constraint: "), + }, } for _, tc := range cases { - tc.f.CqId = uuid.New() - tc.f.FetchId = uuid.New() - finish := time.Now().UTC() - tc.f.Finish = &finish - err, clear := setupCheckFetchDatabase(db, tc.f) - assert.NoError(t, err) + t.Run(tc.Name, func(t *testing.T) { + err, clear := setupCheckFetchDatabase(db, tc.f) + assert.NoError(t, err) - err = executor.checkFetches(context.Background(), &tc.Config) - assert.Equal(t, err, tc.err) - clear(t) + err = executor.checkFetches(context.Background(), &tc.Config) + if tc.err != nil { + assert.Equal(t, tc.err.Error(), err.Error()) + } else { + assert.NoError(t, err) + } + clear(t) + }) } } From 7bd5c5ad5213aa86049f963a91beb06b8bc20d04 Mon Sep 17 00:00:00 2001 From: Andrii Romanenko Date: Wed, 26 Jan 2022 14:57:09 +0200 Subject: [PATCH 03/19] added more tests --- pkg/client/client.go | 3 ++- pkg/client/fetch_summary/fetch_summary_test.go | 7 ++++--- pkg/policy/execute.go | 5 +++-- pkg/policy/execute_test.go | 7 ++++--- 4 files changed, 13 insertions(+), 9 deletions(-) diff --git a/pkg/client/client.go b/pkg/client/client.go index ce462f19787e69..6469e2c86fd8f3 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -5,7 +5,6 @@ import ( "embed" "errors" "fmt" - "github.com/cloudquery/cloudquery/pkg/client/fetch_summary" "io" "io/fs" "path/filepath" @@ -14,6 +13,8 @@ import ( "strings" "time" + "github.com/cloudquery/cloudquery/pkg/client/fetch_summary" + "github.com/cloudquery/cloudquery/internal/logging" "github.com/cloudquery/cloudquery/internal/telemetry" "github.com/cloudquery/cloudquery/pkg/client/database" diff --git a/pkg/client/fetch_summary/fetch_summary_test.go b/pkg/client/fetch_summary/fetch_summary_test.go index 2d34635d3161cc..a9bc4df04fdcc2 100644 --- a/pkg/client/fetch_summary/fetch_summary_test.go +++ b/pkg/client/fetch_summary/fetch_summary_test.go @@ -3,11 +3,12 @@ package fetch_summary import ( "context" "errors" - sdkdb "github.com/cloudquery/cq-provider-sdk/database" - "github.com/hashicorp/go-hclog" "testing" "time" + sdkdb "github.com/cloudquery/cq-provider-sdk/database" + "github.com/hashicorp/go-hclog" + "github.com/google/uuid" "github.com/stretchr/testify/assert" ) @@ -77,7 +78,7 @@ var fetchSummaryTests = []fetchSummaryTest{ } func TestFetchSummary(t *testing.T) { - //todo be sure that it is running after core migrations + // todo be sure that it is running after core migrations // create database connection db, err := sdkdb.New(context.Background(), hclog.NewNullLogger(), testDBConnection) assert.NoError(t, err) diff --git a/pkg/policy/execute.go b/pkg/policy/execute.go index 50129c3afe033e..8a41dbda882544 100644 --- a/pkg/policy/execute.go +++ b/pkg/policy/execute.go @@ -5,11 +5,12 @@ import ( "encoding/json" "errors" "fmt" - "github.com/cloudquery/cloudquery/pkg/client/fetch_summary" "path" "path/filepath" "strings" + "github.com/cloudquery/cloudquery/pkg/client/fetch_summary" + "github.com/cloudquery/cq-provider-sdk/provider/schema" "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-version" @@ -205,7 +206,7 @@ func (e *Executor) checkFetches(ctx context.Context, policyConfig *Configuration if fetchSummary == nil { return fmt.Errorf("there is no finished fetches for provider %s", p.Type) } - if fetchSummary.IsSuccess == false { + if !fetchSummary.IsSuccess { return fmt.Errorf("last fetch for provider %s wasn't successful", p.Type) } v, err := version.NewVersion(fetchSummary.ProviderVersion) diff --git a/pkg/policy/execute_test.go b/pkg/policy/execute_test.go index 5e3826857c5530..f75f6c47582130 100644 --- a/pkg/policy/execute_test.go +++ b/pkg/policy/execute_test.go @@ -4,14 +4,15 @@ import ( "context" "errors" "fmt" + "testing" + "time" + "github.com/cloudquery/cloudquery/pkg/client/fetch_summary" sdkdb "github.com/cloudquery/cq-provider-sdk/database" "github.com/cloudquery/cq-provider-sdk/provider/schema" "github.com/google/uuid" "github.com/hashicorp/go-hclog" "github.com/stretchr/testify/assert" - "testing" - "time" ) func setupPolicyDatabase(t *testing.T, tableName string) (schema.QueryExecer, func(t *testing.T)) { @@ -358,7 +359,7 @@ func setupCheckFetchDatabase(db schema.QueryExecer, fetchSummary *fetch_summary. } func TestExecutor_CheckFetches(t *testing.T) { - //todo be sure that it is running after core migrations + // todo be sure that it is running after core migrations db, err := sdkdb.New(context.Background(), hclog.NewNullLogger(), testDBConnection) executor := NewExecutor(db, hclog.Default(), nil) From 7a28410fc05a96bf096eab6bdb54d362aa761f3d Mon Sep 17 00:00:00 2001 From: Andrii Romanenko Date: Mon, 31 Jan 2022 15:58:53 +0200 Subject: [PATCH 04/19] fix --- pkg/client/client.go | 3 +-- pkg/client/fetch_summary/fetch_summary.go | 14 ++++++++------ pkg/policy/execute_test.go | 20 +++++++++++++++----- 3 files changed, 24 insertions(+), 13 deletions(-) diff --git a/pkg/client/client.go b/pkg/client/client.go index 6469e2c86fd8f3..98fd5e0ca7a779 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -13,12 +13,11 @@ import ( "strings" "time" - "github.com/cloudquery/cloudquery/pkg/client/fetch_summary" - "github.com/cloudquery/cloudquery/internal/logging" "github.com/cloudquery/cloudquery/internal/telemetry" "github.com/cloudquery/cloudquery/pkg/client/database" "github.com/cloudquery/cloudquery/pkg/client/database/timescale" + "github.com/cloudquery/cloudquery/pkg/client/fetch_summary" "github.com/cloudquery/cloudquery/pkg/client/history" "github.com/cloudquery/cloudquery/pkg/config" "github.com/cloudquery/cloudquery/pkg/module" diff --git a/pkg/client/fetch_summary/fetch_summary.go b/pkg/client/fetch_summary/fetch_summary.go index 87aee6a05b6973..83cafe5c0ad87b 100644 --- a/pkg/client/fetch_summary/fetch_summary.go +++ b/pkg/client/fetch_summary/fetch_summary.go @@ -4,6 +4,8 @@ import ( "context" "database/sql/driver" "encoding/json" + "fmt" + "github.com/jackc/pgx/v4" "time" "github.com/cloudquery/cq-provider-sdk/provider/schema" @@ -100,13 +102,13 @@ func (c *FetchSummaryClient) GetLatestFetchSummaryForProvider(ctx context.Contex if err != nil { return nil, err } - var data []FetchSummary - err = pgxscan.Select(ctx, c.db, &data, sql) + var data FetchSummary + err = pgxscan.Get(ctx, c.db, &data, sql) if err != nil { + if err == pgx.ErrNoRows { + return nil, fmt.Errorf("there is no successful fetch for provider %s: %w", provider, err) + } return nil, err } - if len(data) == 0 { - return nil, nil - } - return &data[0], nil + return &data, nil } diff --git a/pkg/policy/execute_test.go b/pkg/policy/execute_test.go index f75f6c47582130..bf4965b37c54c9 100644 --- a/pkg/policy/execute_test.go +++ b/pkg/policy/execute_test.go @@ -363,6 +363,7 @@ func TestExecutor_CheckFetches(t *testing.T) { db, err := sdkdb.New(context.Background(), hclog.NewNullLogger(), testDBConnection) executor := NewExecutor(db, hclog.Default(), nil) + finish := time.Now().UTC() assert.NoError(t, err) cases := []struct { Name string @@ -377,7 +378,7 @@ func TestExecutor_CheckFetches(t *testing.T) { {Type: "test1", Version: "~> v0.2.0"}, }, }, - f: &fetch_summary.FetchSummary{ProviderName: "test1", ProviderVersion: "v0.2.3", IsSuccess: true}, + f: &fetch_summary.FetchSummary{ProviderName: "test1", ProviderVersion: "v0.2.3", Finish: &finish, IsSuccess: true}, err: nil, }, { @@ -387,7 +388,16 @@ func TestExecutor_CheckFetches(t *testing.T) { {Type: "test2", Version: "~> v0.2.0"}, }, }, - err: errors.New("there is no finished fetches for provider test2"), + err: errors.New("failed to get fetch summary for provider test2: no rows in result set"), + }, { + Name: "no finished fetches", + Config: Configuration{ + Providers: []*Provider{ + {Type: "no_finish", Version: "~> v0.2.0"}, + }, + }, + f: &fetch_summary.FetchSummary{ProviderName: "test3", ProviderVersion: "v0.2.3", IsSuccess: false}, + err: errors.New("failed to get fetch summary for provider no_finish: no rows in result set"), }, { Name: "no fetches", @@ -396,7 +406,7 @@ func TestExecutor_CheckFetches(t *testing.T) { {Type: "test3", Version: "~> v0.2.0"}, }, }, - f: &fetch_summary.FetchSummary{ProviderName: "test3", ProviderVersion: "v0.2.3", IsSuccess: false}, + f: &fetch_summary.FetchSummary{ProviderName: "test3", ProviderVersion: "v0.2.3", Finish: &finish, IsSuccess: false}, err: errors.New("last fetch for provider test3 wasn't successful"), }, { @@ -406,7 +416,7 @@ func TestExecutor_CheckFetches(t *testing.T) { {Type: "test4", Version: "~> v0.3.0"}, }, }, - f: &fetch_summary.FetchSummary{ProviderName: "test4", ProviderVersion: "v0.2.3", IsSuccess: true}, + f: &fetch_summary.FetchSummary{ProviderName: "test4", ProviderVersion: "v0.2.3", Finish: &finish, IsSuccess: true}, err: errors.New("the latest fetch for provider test4 does not satisfy version requirement ~> v0.3.0"), }, { @@ -416,7 +426,7 @@ func TestExecutor_CheckFetches(t *testing.T) { {Type: "test4", Version: ""}, }, }, - f: &fetch_summary.FetchSummary{ProviderName: "test4", ProviderVersion: "v0.2.3", IsSuccess: true}, + f: &fetch_summary.FetchSummary{ProviderName: "test4", ProviderVersion: "v0.2.3", Finish: &finish, IsSuccess: true}, err: errors.New("failed to parse version constraint for provider test4: Malformed constraint: "), }, } From c8420f53c53f4a5ed4bfb6dd68bee2083d89839c Mon Sep 17 00:00:00 2001 From: Andrii Romanenko Date: Mon, 31 Jan 2022 17:27:05 +0200 Subject: [PATCH 05/19] fix --- pkg/client/fetch_summary/fetch_summary.go | 6 +++--- pkg/policy/execute_test.go | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/pkg/client/fetch_summary/fetch_summary.go b/pkg/client/fetch_summary/fetch_summary.go index 83cafe5c0ad87b..c5c14cf070b055 100644 --- a/pkg/client/fetch_summary/fetch_summary.go +++ b/pkg/client/fetch_summary/fetch_summary.go @@ -4,7 +4,7 @@ import ( "context" "database/sql/driver" "encoding/json" - "fmt" + "errors" "github.com/jackc/pgx/v4" "time" @@ -105,8 +105,8 @@ func (c *FetchSummaryClient) GetLatestFetchSummaryForProvider(ctx context.Contex var data FetchSummary err = pgxscan.Get(ctx, c.db, &data, sql) if err != nil { - if err == pgx.ErrNoRows { - return nil, fmt.Errorf("there is no successful fetch for provider %s: %w", provider, err) + if errors.Is(err, pgx.ErrNoRows) { + return nil, errors.New("there is no successful fetch for requested provider") } return nil, err } diff --git a/pkg/policy/execute_test.go b/pkg/policy/execute_test.go index bf4965b37c54c9..da8893c6be3a3e 100644 --- a/pkg/policy/execute_test.go +++ b/pkg/policy/execute_test.go @@ -388,7 +388,7 @@ func TestExecutor_CheckFetches(t *testing.T) { {Type: "test2", Version: "~> v0.2.0"}, }, }, - err: errors.New("failed to get fetch summary for provider test2: no rows in result set"), + err: errors.New("failed to get fetch summary for provider test2: there is no successful fetch for requested provider"), }, { Name: "no finished fetches", Config: Configuration{ @@ -397,7 +397,7 @@ func TestExecutor_CheckFetches(t *testing.T) { }, }, f: &fetch_summary.FetchSummary{ProviderName: "test3", ProviderVersion: "v0.2.3", IsSuccess: false}, - err: errors.New("failed to get fetch summary for provider no_finish: no rows in result set"), + err: errors.New("failed to get fetch summary for provider no_finish: there is no successful fetch for requested provider"), }, { Name: "no fetches", From 897382d53a8541ac98fe452fa3aa05bd6a0c0865 Mon Sep 17 00:00:00 2001 From: Andrii Romanenko Date: Mon, 31 Jan 2022 18:17:28 +0200 Subject: [PATCH 06/19] fix --- pkg/client/fetch_summary/fetch_summary.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/client/fetch_summary/fetch_summary.go b/pkg/client/fetch_summary/fetch_summary.go index c5c14cf070b055..33b8a372698808 100644 --- a/pkg/client/fetch_summary/fetch_summary.go +++ b/pkg/client/fetch_summary/fetch_summary.go @@ -5,15 +5,15 @@ import ( "database/sql/driver" "encoding/json" "errors" - "github.com/jackc/pgx/v4" "time" - "github.com/cloudquery/cq-provider-sdk/provider/schema" - "github.com/georgysavva/scany/pgxscan" + "github.com/jackc/pgx/v4" "github.com/cloudquery/cq-provider-sdk/cqproto" + "github.com/cloudquery/cq-provider-sdk/provider/schema" "github.com/cloudquery/cq-provider-sdk/provider/schema/diag" "github.com/doug-martin/goqu/v9" + "github.com/georgysavva/scany/pgxscan" "github.com/google/uuid" ) From a689ca66b15cfe91c40e785e43da7eca4863dfde Mon Sep 17 00:00:00 2001 From: Andrii Romanenko Date: Mon, 31 Jan 2022 19:30:39 +0200 Subject: [PATCH 07/19] naming fixes --- pkg/client/client.go | 4 ++-- pkg/client/fetch_summary/fetch_summary.go | 14 +++++++------- pkg/client/fetch_summary/fetch_summary_test.go | 7 +++---- pkg/policy/execute.go | 4 ++-- pkg/policy/execute_test.go | 4 ++-- 5 files changed, 16 insertions(+), 17 deletions(-) diff --git a/pkg/client/client.go b/pkg/client/client.go index 98fd5e0ca7a779..cf17fb02f58fc9 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -410,7 +410,7 @@ func (c *Client) Fetch(ctx context.Context, request FetchRequest) (res *FetchRes return nil, err } - fetchSummaryClient := fetch_summary.NewFetchSummaryClient(c.db) + fetchSummaryClient := fetch_summary.NewClient(c.db) for _, providerConfig := range request.Providers { providerConfig := providerConfig @@ -423,7 +423,7 @@ func (c *Client) Fetch(ctx context.Context, request FetchRequest) (res *FetchRes CoreVersion: Version, } saveFetchSummary := func() { - if err := fetchSummaryClient.SaveFetchSummary(ctx, &fetchSummary); err != nil { + if err := fetchSummaryClient.Save(ctx, &fetchSummary); err != nil { c.Logger.Error("failed to save fetch summary", "err", err) } } diff --git a/pkg/client/fetch_summary/fetch_summary.go b/pkg/client/fetch_summary/fetch_summary.go index 33b8a372698808..17f62d01958de3 100644 --- a/pkg/client/fetch_summary/fetch_summary.go +++ b/pkg/client/fetch_summary/fetch_summary.go @@ -17,12 +17,12 @@ import ( "github.com/google/uuid" ) -type FetchSummaryClient struct { +type Client struct { db schema.QueryExecer } -func NewFetchSummaryClient(db schema.QueryExecer) *FetchSummaryClient { - return &FetchSummaryClient{ +func NewClient(db schema.QueryExecer) *Client { + return &Client{ db: db, } } @@ -75,8 +75,8 @@ type ResourceFetchSummary struct { Diagnostics diag.Diagnostics `json:"diagnostics"` } -// SaveFetchSummary saves fetch summary into fetches database -func (c *FetchSummaryClient) SaveFetchSummary(ctx context.Context, fs *FetchSummary) error { +// Save saves fetch summary into fetches database +func (c *Client) Save(ctx context.Context, fs *FetchSummary) error { id, err := uuid.NewUUID() if err != nil { return err @@ -90,8 +90,8 @@ func (c *FetchSummaryClient) SaveFetchSummary(ctx context.Context, fs *FetchSumm return c.db.Exec(ctx, sql, args...) } -// GetLatestFetchSummaryForProvider gets latest fetch summary for specific provider -func (c *FetchSummaryClient) GetLatestFetchSummaryForProvider(ctx context.Context, provider string) (*FetchSummary, error) { +// GetForProvider gets latest fetch summary for specific provider +func (c *Client) GetForProvider(ctx context.Context, provider string) (*FetchSummary, error) { q := goqu.Dialect("postgres"). Select("provider_version", "is_success"). From("cloudquery.fetches"). diff --git a/pkg/client/fetch_summary/fetch_summary_test.go b/pkg/client/fetch_summary/fetch_summary_test.go index a9bc4df04fdcc2..ca0128477b5881 100644 --- a/pkg/client/fetch_summary/fetch_summary_test.go +++ b/pkg/client/fetch_summary/fetch_summary_test.go @@ -7,9 +7,8 @@ import ( "time" sdkdb "github.com/cloudquery/cq-provider-sdk/database" - "github.com/hashicorp/go-hclog" - "github.com/google/uuid" + "github.com/hashicorp/go-hclog" "github.com/stretchr/testify/assert" ) @@ -83,7 +82,7 @@ func TestFetchSummary(t *testing.T) { db, err := sdkdb.New(context.Background(), hclog.NewNullLogger(), testDBConnection) assert.NoError(t, err) - fetchSummaryClient := NewFetchSummaryClient(db) + fetchSummaryClient := NewClient(db) fetchId := uuid.New() for _, f := range fetchSummaryTests { @@ -92,7 +91,7 @@ func TestFetchSummary(t *testing.T) { } start := time.Now() f.summary.Start = &start - err := fetchSummaryClient.SaveFetchSummary(context.Background(), &f.summary) + err := fetchSummaryClient.Save(context.Background(), &f.summary) if f.err != nil { assert.EqualError(t, err, f.err.Error()) } else { diff --git a/pkg/policy/execute.go b/pkg/policy/execute.go index 8a41dbda882544..9aff0c3bbe8043 100644 --- a/pkg/policy/execute.go +++ b/pkg/policy/execute.go @@ -193,13 +193,13 @@ func (e *Executor) checkFetches(ctx context.Context, policyConfig *Configuration if policyConfig == nil { return nil } - fetchSummaryClient := fetch_summary.NewFetchSummaryClient(e.conn) + fetchSummaryClient := fetch_summary.NewClient(e.conn) for _, p := range policyConfig.Providers { c, err := version.NewConstraint(p.Version) if err != nil { return fmt.Errorf("failed to parse version constraint for provider %s: %w", p.Type, err) } - fetchSummary, err := fetchSummaryClient.GetLatestFetchSummaryForProvider(ctx, p.Type) + fetchSummary, err := fetchSummaryClient.GetForProvider(ctx, p.Type) if err != nil { return fmt.Errorf("failed to get fetch summary for provider %s: %w", p.Type, err) } diff --git a/pkg/policy/execute_test.go b/pkg/policy/execute_test.go index da8893c6be3a3e..fd6c41f5ff8ffc 100644 --- a/pkg/policy/execute_test.go +++ b/pkg/policy/execute_test.go @@ -345,8 +345,8 @@ func setupCheckFetchDatabase(db schema.QueryExecer, fetchSummary *fetch_summary. fetchSummary.FetchId = uuid.New() finish := time.Now().UTC() fetchSummary.Finish = &finish - fetchSummaryClient := fetch_summary.NewFetchSummaryClient(db) - err := fetchSummaryClient.SaveFetchSummary(context.Background(), fetchSummary) + fetchSummaryClient := fetch_summary.NewClient(db) + err := fetchSummaryClient.Save(context.Background(), fetchSummary) if err != nil { return err, nil } From dbc0d68752966637a4a75860440ce6339e07c4e0 Mon Sep 17 00:00:00 2001 From: Andrii Romanenko Date: Tue, 1 Feb 2022 16:53:30 +0200 Subject: [PATCH 08/19] core migration moved to fetch summary client added migrations for timescaledb --- pkg/client/client.go | 62 +++----------- .../{fetch_summary => fetch}/fetch_summary.go | 81 +++++++++++++++---- .../fetch_summary_test.go | 36 ++++++--- .../migrations/postgres/1_v0.19.2.down.sql | 0 .../migrations/postgres/1_v0.19.2.up.sql | 0 .../migrations/postgres/2_v0.19.3.down.sql | 0 .../migrations/postgres/2_v0.19.3.up.sql | 0 .../migrations/timescale/1_v0.19.2.down.sql | 1 + .../migrations/timescale/1_v0.19.2.up.sql | 16 ++++ .../migrations/timescale/2_v0.19.3.down.sql | 5 ++ .../migrations/timescale/2_v0.19.3.up.sql | 10 +++ pkg/policy/execute.go | 6 +- pkg/policy/execute_test.go | 47 +++++++---- 13 files changed, 165 insertions(+), 99 deletions(-) rename pkg/client/{fetch_summary => fetch}/fetch_summary.go (62%) rename pkg/client/{fetch_summary => fetch}/fetch_summary_test.go (71%) rename pkg/client/{ => fetch}/migrations/postgres/1_v0.19.2.down.sql (100%) rename pkg/client/{ => fetch}/migrations/postgres/1_v0.19.2.up.sql (100%) rename pkg/client/{ => fetch}/migrations/postgres/2_v0.19.3.down.sql (100%) rename pkg/client/{ => fetch}/migrations/postgres/2_v0.19.3.up.sql (100%) create mode 100644 pkg/client/fetch/migrations/timescale/1_v0.19.2.down.sql create mode 100644 pkg/client/fetch/migrations/timescale/1_v0.19.2.up.sql create mode 100644 pkg/client/fetch/migrations/timescale/2_v0.19.3.down.sql create mode 100644 pkg/client/fetch/migrations/timescale/2_v0.19.3.up.sql diff --git a/pkg/client/client.go b/pkg/client/client.go index cf17fb02f58fc9..a40fa6b455f67b 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -2,7 +2,6 @@ package client import ( "context" - "embed" "errors" "fmt" "io" @@ -17,7 +16,7 @@ import ( "github.com/cloudquery/cloudquery/internal/telemetry" "github.com/cloudquery/cloudquery/pkg/client/database" "github.com/cloudquery/cloudquery/pkg/client/database/timescale" - "github.com/cloudquery/cloudquery/pkg/client/fetch_summary" + "github.com/cloudquery/cloudquery/pkg/client/fetch" "github.com/cloudquery/cloudquery/pkg/client/history" "github.com/cloudquery/cloudquery/pkg/config" "github.com/cloudquery/cloudquery/pkg/module" @@ -49,8 +48,6 @@ import ( var ( ErrMigrationsNotSupported = errors.New("provider doesn't support migrations") - //go:embed migrations/*/*.sql - coreMigrations embed.FS ) // FetchRequest is provided to the Client to execute a fetch on one or more providers @@ -247,8 +244,10 @@ type Client struct { // HistoryConfig defines configuration for CloudQuery history mode HistoryCfg *history.Config - db *sdkdb.DB - dialectExecutor database.DialectExecutor + // fetchSummaryClient interacts with cloudquery core resources + fetchSummaryClient *fetch.Client + db *sdkdb.DB + dialectExecutor database.DialectExecutor } func New(ctx context.Context, options ...Option) (*Client, error) { @@ -410,12 +409,10 @@ func (c *Client) Fetch(ctx context.Context, request FetchRequest) (res *FetchRes return nil, err } - fetchSummaryClient := fetch_summary.NewClient(c.db) - for _, providerConfig := range request.Providers { providerConfig := providerConfig createdAt := time.Now().UTC() - fetchSummary := fetch_summary.FetchSummary{ + fetchSummary := fetch.Summary{ FetchId: fetchId, ProviderName: providerConfig.Name, ProviderAlias: providerConfig.Alias, @@ -423,7 +420,7 @@ func (c *Client) Fetch(ctx context.Context, request FetchRequest) (res *FetchRes CoreVersion: Version, } saveFetchSummary := func() { - if err := fetchSummaryClient.Save(ctx, &fetchSummary); err != nil { + if err := c.fetchSummaryClient.Save(ctx, &fetchSummary); err != nil { c.Logger.Error("failed to save fetch summary", "err", err) } } @@ -546,7 +543,7 @@ func (c *Client) Fetch(ctx context.Context, request FetchRequest) (res *FetchRes request.UpdateCallback(update) } - fetchSummary.Resources = append(fetchSummary.Resources, fetch_summary.ResourceFetchSummary{ + fetchSummary.Resources = append(fetchSummary.Resources, fetch.ResourceSummary{ ResourceName: resp.ResourceName, FinishedResources: resp.FinishedResources, Status: strconv.Itoa(int(resp.Summary.Status)), // todo use human readable representation of status @@ -935,42 +932,6 @@ func (c *Client) buildProviderMigrator(ctx context.Context, migrations map[strin return m, providerConfig, err } -func (c *Client) MigrateCore(ctx context.Context, de database.DialectExecutor) error { - err := createCoreSchema(ctx, c.db) - if err != nil { - return err - } - - newDSN, err := de.Setup(ctx) - if err != nil { - return err - } - - migrations, err := migrator.ReadMigrationFiles(c.Logger, coreMigrations) - if err != nil { - return err - } - newDSN, err = dsn.SetDSNElement(newDSN, map[string]string{"search_path": "cloudquery"}) - if err != nil { - return err - } - m, err := migrator.New(c.Logger, schema.Postgres, migrations, newDSN, "cloudquery_core", nil) - if err != nil { - return err - } - - defer func() { - if err := m.Close(); err != nil { - c.Logger.Error("failed to close migrator connection", "error", err) - } - }() - - if err := m.UpgradeProvider(migrator.Latest); err != nil && err != migrate.ErrNoChange { - return fmt.Errorf("failed to migrate cloudquery core schema: %w", err) - } - return nil -} - func (c *Client) getProviderConfig(providerName string) (*config.RequiredProvider, error) { var providerConfig *config.RequiredProvider for _, p := range c.Providers { @@ -1087,10 +1048,6 @@ func reportFetchSummaryErrors(span otrace.Span, fetchSummaries map[string]Provid ) } -func createCoreSchema(ctx context.Context, db schema.QueryExecer) error { - return db.Exec(ctx, "CREATE SCHEMA IF NOT EXISTS cloudquery") -} - func (c *Client) initDatabase(ctx context.Context) error { var err error c.db, err = sdkdb.New(ctx, c.Logger, c.DSN) @@ -1123,8 +1080,9 @@ func (c *Client) initDatabase(ctx context.Context) error { c.Logger.Warn("postgres validation warning") } + c.fetchSummaryClient = fetch.NewClient(c.db, c.Logger) // migrate cloudquery core tables to latest version - if err := c.MigrateCore(ctx, c.dialectExecutor); err != nil { + if err := c.fetchSummaryClient.MigrateCore(ctx, c.dialectExecutor); err != nil { return fmt.Errorf("failed to migrate cloudquery_core tables: %w", err) } diff --git a/pkg/client/fetch_summary/fetch_summary.go b/pkg/client/fetch/fetch_summary.go similarity index 62% rename from pkg/client/fetch_summary/fetch_summary.go rename to pkg/client/fetch/fetch_summary.go index 17f62d01958de3..f0b239c832f2ff 100644 --- a/pkg/client/fetch_summary/fetch_summary.go +++ b/pkg/client/fetch/fetch_summary.go @@ -1,35 +1,48 @@ -package fetch_summary +package fetch import ( "context" "database/sql/driver" + "embed" "encoding/json" "errors" + "fmt" "time" - "github.com/jackc/pgx/v4" - + "github.com/cloudquery/cloudquery/pkg/client/database" "github.com/cloudquery/cq-provider-sdk/cqproto" + "github.com/cloudquery/cq-provider-sdk/database/dsn" + "github.com/cloudquery/cq-provider-sdk/migration/migrator" "github.com/cloudquery/cq-provider-sdk/provider/schema" "github.com/cloudquery/cq-provider-sdk/provider/schema/diag" "github.com/doug-martin/goqu/v9" "github.com/georgysavva/scany/pgxscan" + "github.com/golang-migrate/migrate/v4" "github.com/google/uuid" + "github.com/hashicorp/go-hclog" + "github.com/jackc/pgx/v4" +) + +var ( + //go:embed migrations/*/*.sql + coreMigrations embed.FS ) type Client struct { - db schema.QueryExecer + db schema.QueryExecer + Logger hclog.Logger } -func NewClient(db schema.QueryExecer) *Client { +func NewClient(db schema.QueryExecer, logger hclog.Logger) *Client { return &Client{ - db: db, + db: db, + Logger: logger, } } -// FetchSummary includes a summarized report of fetch, such as fetch id, fetch start and finish, +// Summary includes a summarized report of fetch, such as fetch id, fetch start and finish, // resources fetch results -type FetchSummary struct { +type Summary struct { CqId uuid.UUID `db:"id"` // Unique Id of fetch session FetchId uuid.UUID `db:"fetch_id"` @@ -46,7 +59,7 @@ type FetchSummary struct { Resources ResourceFetchSummaries `db:"results"` } -type ResourceFetchSummaries []ResourceFetchSummary +type ResourceFetchSummaries []ResourceSummary // Value implements Valuer interface required by goqu func (r ResourceFetchSummaries) Value() (driver.Value, error) { @@ -56,8 +69,8 @@ func (r ResourceFetchSummaries) Value() (driver.Value, error) { return json.Marshal(r) } -// ResourceFetchSummary includes a data about fetching specific resource -type ResourceFetchSummary struct { +// ResourceSummary includes a data about fetching specific resource +type ResourceSummary struct { ResourceName string `json:"resource_name"` // map of resources that have finished fetching FinishedResources map[string]bool `json:"finished_resources"` @@ -76,7 +89,7 @@ type ResourceFetchSummary struct { } // Save saves fetch summary into fetches database -func (c *Client) Save(ctx context.Context, fs *FetchSummary) error { +func (c *Client) Save(ctx context.Context, fs *Summary) error { id, err := uuid.NewUUID() if err != nil { return err @@ -91,7 +104,7 @@ func (c *Client) Save(ctx context.Context, fs *FetchSummary) error { } // GetForProvider gets latest fetch summary for specific provider -func (c *Client) GetForProvider(ctx context.Context, provider string) (*FetchSummary, error) { +func (c *Client) GetForProvider(ctx context.Context, provider string) (*Summary, error) { q := goqu.Dialect("postgres"). Select("provider_version", "is_success"). From("cloudquery.fetches"). @@ -102,7 +115,7 @@ func (c *Client) GetForProvider(ctx context.Context, provider string) (*FetchSum if err != nil { return nil, err } - var data FetchSummary + var data Summary err = pgxscan.Get(ctx, c.db, &data, sql) if err != nil { if errors.Is(err, pgx.ErrNoRows) { @@ -112,3 +125,43 @@ func (c *Client) GetForProvider(ctx context.Context, provider string) (*FetchSum } return &data, nil } + +func (c *Client) MigrateCore(ctx context.Context, de database.DialectExecutor) error { + err := createCoreSchema(ctx, c.db) + if err != nil { + return err + } + + newDSN, err := de.Setup(ctx) + if err != nil { + return err + } + + migrations, err := migrator.ReadMigrationFiles(c.Logger, coreMigrations) + if err != nil { + return err + } + newDSN, err = dsn.SetDSNElement(newDSN, map[string]string{"search_path": "cloudquery"}) + if err != nil { + return err + } + m, err := migrator.New(c.Logger, schema.Postgres, migrations, newDSN, "cloudquery_core", nil) + if err != nil { + return err + } + + defer func() { + if err := m.Close(); err != nil { + c.Logger.Error("failed to close migrator connection", "error", err) + } + }() + + if err := m.UpgradeProvider(migrator.Latest); err != nil && err != migrate.ErrNoChange { + return fmt.Errorf("failed to migrate cloudquery core schema: %w", err) + } + return nil +} + +func createCoreSchema(ctx context.Context, db schema.QueryExecer) error { + return db.Exec(ctx, "CREATE SCHEMA IF NOT EXISTS cloudquery") +} diff --git a/pkg/client/fetch_summary/fetch_summary_test.go b/pkg/client/fetch/fetch_summary_test.go similarity index 71% rename from pkg/client/fetch_summary/fetch_summary_test.go rename to pkg/client/fetch/fetch_summary_test.go index ca0128477b5881..52d0698d174fe3 100644 --- a/pkg/client/fetch_summary/fetch_summary_test.go +++ b/pkg/client/fetch/fetch_summary_test.go @@ -1,11 +1,14 @@ -package fetch_summary +package fetch import ( "context" "errors" + "fmt" "testing" "time" + "github.com/cloudquery/cloudquery/pkg/client/database" + "github.com/cloudquery/cloudquery/pkg/client/history" sdkdb "github.com/cloudquery/cq-provider-sdk/database" "github.com/google/uuid" "github.com/hashicorp/go-hclog" @@ -15,22 +18,22 @@ import ( const testDBConnection = "postgres://postgres:pass@localhost:5432/postgres?sslmode=disable" type fetchSummaryTest struct { - summary FetchSummary + summary Summary err error skipFetchId bool } var fetchSummaryTests = []fetchSummaryTest{ { - summary: FetchSummary{ + summary: Summary{ ProviderName: "test", ProviderVersion: "v0.0.0", }, }, { - summary: FetchSummary{ + summary: Summary{ ProviderName: "test1", - Resources: []ResourceFetchSummary{ + Resources: []ResourceSummary{ { ResourceName: "test", ResourceCount: 99, @@ -39,15 +42,15 @@ var fetchSummaryTests = []fetchSummaryTest{ }, }, { - summary: FetchSummary{ + summary: Summary{ ProviderName: "test2", ProviderVersion: "v0.0.1", }, }, { - summary: FetchSummary{ + summary: Summary{ ProviderName: "test4", - Resources: []ResourceFetchSummary{ + Resources: []ResourceSummary{ { ResourceName: "test", ResourceCount: 99, @@ -60,14 +63,14 @@ var fetchSummaryTests = []fetchSummaryTest{ }, }, { - summary: FetchSummary{ + summary: Summary{ ProviderName: "test2", ProviderVersion: "v0.0.1", }, err: errors.New("ERROR: duplicate key value violates unique constraint \"fetches_pk\" (SQLSTATE 23505)"), }, { - summary: FetchSummary{ + summary: Summary{ ProviderName: "test3", ProviderVersion: "v0.0.1", }, @@ -76,13 +79,20 @@ var fetchSummaryTests = []fetchSummaryTest{ }, } -func TestFetchSummary(t *testing.T) { - // todo be sure that it is running after core migrations +func TestFetchSaveSummary(t *testing.T) { // create database connection db, err := sdkdb.New(context.Background(), hclog.NewNullLogger(), testDBConnection) assert.NoError(t, err) - fetchSummaryClient := NewClient(db) + fetchSummaryClient := NewClient(db, hclog.NewNullLogger()) + + _, de, err := database.GetExecutor(hclog.NewNullLogger(), testDBConnection, &history.Config{}) + if err != nil { + t.Fatal(fmt.Errorf("getExecutor: %w", err)) + } + + err = fetchSummaryClient.MigrateCore(context.Background(), de) + assert.NoError(t, err) fetchId := uuid.New() for _, f := range fetchSummaryTests { diff --git a/pkg/client/migrations/postgres/1_v0.19.2.down.sql b/pkg/client/fetch/migrations/postgres/1_v0.19.2.down.sql similarity index 100% rename from pkg/client/migrations/postgres/1_v0.19.2.down.sql rename to pkg/client/fetch/migrations/postgres/1_v0.19.2.down.sql diff --git a/pkg/client/migrations/postgres/1_v0.19.2.up.sql b/pkg/client/fetch/migrations/postgres/1_v0.19.2.up.sql similarity index 100% rename from pkg/client/migrations/postgres/1_v0.19.2.up.sql rename to pkg/client/fetch/migrations/postgres/1_v0.19.2.up.sql diff --git a/pkg/client/migrations/postgres/2_v0.19.3.down.sql b/pkg/client/fetch/migrations/postgres/2_v0.19.3.down.sql similarity index 100% rename from pkg/client/migrations/postgres/2_v0.19.3.down.sql rename to pkg/client/fetch/migrations/postgres/2_v0.19.3.down.sql diff --git a/pkg/client/migrations/postgres/2_v0.19.3.up.sql b/pkg/client/fetch/migrations/postgres/2_v0.19.3.up.sql similarity index 100% rename from pkg/client/migrations/postgres/2_v0.19.3.up.sql rename to pkg/client/fetch/migrations/postgres/2_v0.19.3.up.sql diff --git a/pkg/client/fetch/migrations/timescale/1_v0.19.2.down.sql b/pkg/client/fetch/migrations/timescale/1_v0.19.2.down.sql new file mode 100644 index 00000000000000..c22d574b4800f1 --- /dev/null +++ b/pkg/client/fetch/migrations/timescale/1_v0.19.2.down.sql @@ -0,0 +1 @@ +DROP TABLE cq_fetches; diff --git a/pkg/client/fetch/migrations/timescale/1_v0.19.2.up.sql b/pkg/client/fetch/migrations/timescale/1_v0.19.2.up.sql new file mode 100644 index 00000000000000..ab3f35fc1b2d7a --- /dev/null +++ b/pkg/client/fetch/migrations/timescale/1_v0.19.2.up.sql @@ -0,0 +1,16 @@ +CREATE TABLE IF NOT EXISTS fetches +( + id UUID NOT NULL, + fetch_id UUID NOT NULL, + START TIMESTAMP, + finish TIMESTAMP, + total_resource_count BIGINT, + total_errors_count BIGINT, + provider_name TEXT, + provider_version TEXT, + is_success BOOLEAN, + results jsonb, + CONSTRAINT "fetches_id" PRIMARY KEY (id), + CONSTRAINT "fetches_pk" UNIQUE (fetch_id, provider_name), + CONSTRAINT "non_nil_fetch_id" CHECK (fetch_id != '00000000-0000-0000-0000-000000000000') +); \ No newline at end of file diff --git a/pkg/client/fetch/migrations/timescale/2_v0.19.3.down.sql b/pkg/client/fetch/migrations/timescale/2_v0.19.3.down.sql new file mode 100644 index 00000000000000..fb0f9b44c67913 --- /dev/null +++ b/pkg/client/fetch/migrations/timescale/2_v0.19.3.down.sql @@ -0,0 +1,5 @@ +ALTER TABLE IF EXISTS fetches + DROP CONSTRAINT fetches_pk; +ALTER TABLE IF EXISTS fetches ADD CONSTRAINT "fetches_pk" UNIQUE (fetch_id, provider_name); +ALTER TABLE IF EXISTS fetches + DROP COLUMN provider_alias; \ No newline at end of file diff --git a/pkg/client/fetch/migrations/timescale/2_v0.19.3.up.sql b/pkg/client/fetch/migrations/timescale/2_v0.19.3.up.sql new file mode 100644 index 00000000000000..21a6dd24d0444e --- /dev/null +++ b/pkg/client/fetch/migrations/timescale/2_v0.19.3.up.sql @@ -0,0 +1,10 @@ +ALTER TABLE IF EXISTS fetches + ADD COLUMN IF NOT EXISTS provider_alias TEXT; +ALTER TABLE IF EXISTS fetches + ADD COLUMN IF NOT EXISTS core_version TEXT; +ALTER TABLE IF EXISTS fetches + ADD COLUMN IF NOT EXISTS created_at TIMESTAMP; +ALTER TABLE IF EXISTS fetches + DROP CONSTRAINT fetches_pk; +ALTER TABLE IF EXISTS fetches + ADD CONSTRAINT "fetches_pk" UNIQUE (fetch_id, provider_name, provider_alias); \ No newline at end of file diff --git a/pkg/policy/execute.go b/pkg/policy/execute.go index 9aff0c3bbe8043..d3dd9e5f6ae58c 100644 --- a/pkg/policy/execute.go +++ b/pkg/policy/execute.go @@ -9,7 +9,7 @@ import ( "path/filepath" "strings" - "github.com/cloudquery/cloudquery/pkg/client/fetch_summary" + "github.com/cloudquery/cloudquery/pkg/client/fetch" "github.com/cloudquery/cq-provider-sdk/provider/schema" "github.com/hashicorp/go-hclog" @@ -193,13 +193,13 @@ func (e *Executor) checkFetches(ctx context.Context, policyConfig *Configuration if policyConfig == nil { return nil } - fetchSummaryClient := fetch_summary.NewClient(e.conn) + fsClient := fetch.NewClient(e.conn, e.log) for _, p := range policyConfig.Providers { c, err := version.NewConstraint(p.Version) if err != nil { return fmt.Errorf("failed to parse version constraint for provider %s: %w", p.Type, err) } - fetchSummary, err := fetchSummaryClient.GetForProvider(ctx, p.Type) + fetchSummary, err := fsClient.GetForProvider(ctx, p.Type) if err != nil { return fmt.Errorf("failed to get fetch summary for provider %s: %w", p.Type, err) } diff --git a/pkg/policy/execute_test.go b/pkg/policy/execute_test.go index fd6c41f5ff8ffc..0953d9e40fe996 100644 --- a/pkg/policy/execute_test.go +++ b/pkg/policy/execute_test.go @@ -7,7 +7,9 @@ import ( "testing" "time" - "github.com/cloudquery/cloudquery/pkg/client/fetch_summary" + "github.com/cloudquery/cloudquery/pkg/client/database" + "github.com/cloudquery/cloudquery/pkg/client/fetch" + "github.com/cloudquery/cloudquery/pkg/client/history" sdkdb "github.com/cloudquery/cq-provider-sdk/database" "github.com/cloudquery/cq-provider-sdk/provider/schema" "github.com/google/uuid" @@ -337,30 +339,41 @@ func TestExecutor_Execute(t *testing.T) { } } -func setupCheckFetchDatabase(db schema.QueryExecer, fetchSummary *fetch_summary.FetchSummary) (error, func(t *testing.T)) { - if fetchSummary == nil { +func setupCheckFetchDatabase(db schema.QueryExecer, summary *fetch.Summary, c *fetch.Client) (error, func(t *testing.T)) { + if summary == nil { return nil, func(t *testing.T) {} } - fetchSummary.CqId = uuid.New() - fetchSummary.FetchId = uuid.New() + summary.CqId = uuid.New() + summary.FetchId = uuid.New() finish := time.Now().UTC() - fetchSummary.Finish = &finish - fetchSummaryClient := fetch_summary.NewClient(db) - err := fetchSummaryClient.Save(context.Background(), fetchSummary) + summary.Finish = &finish + err := c.Save(context.Background(), summary) if err != nil { return err, nil } // Return conn and tear down func return nil, func(t *testing.T) { - err = db.Exec(context.Background(), fmt.Sprintf(`DELETE FROM "cloudquery"."fetches" WHERE "id" = '%s';`, fetchSummary.FetchId.String())) + err = db.Exec(context.Background(), fmt.Sprintf(`DELETE FROM "cloudquery"."fetches" WHERE "id" = '%summary';`, summary.FetchId.String())) assert.NoError(t, err) } } func TestExecutor_CheckFetches(t *testing.T) { - // todo be sure that it is running after core migrations + // create database connection db, err := sdkdb.New(context.Background(), hclog.NewNullLogger(), testDBConnection) + assert.NoError(t, err) + + fetchSummaryClient := fetch.NewClient(db, hclog.NewNullLogger()) + + _, de, err := database.GetExecutor(hclog.NewNullLogger(), testDBConnection, &history.Config{}) + if err != nil { + t.Fatal(fmt.Errorf("getExecutor: %w", err)) + } + + err = fetchSummaryClient.MigrateCore(context.Background(), de) + assert.NoError(t, err) + executor := NewExecutor(db, hclog.Default(), nil) finish := time.Now().UTC() @@ -368,7 +381,7 @@ func TestExecutor_CheckFetches(t *testing.T) { cases := []struct { Name string Config Configuration - f *fetch_summary.FetchSummary + f *fetch.Summary err error }{ { @@ -378,7 +391,7 @@ func TestExecutor_CheckFetches(t *testing.T) { {Type: "test1", Version: "~> v0.2.0"}, }, }, - f: &fetch_summary.FetchSummary{ProviderName: "test1", ProviderVersion: "v0.2.3", Finish: &finish, IsSuccess: true}, + f: &fetch.Summary{ProviderName: "test1", ProviderVersion: "v0.2.3", Finish: &finish, IsSuccess: true}, err: nil, }, { @@ -396,7 +409,7 @@ func TestExecutor_CheckFetches(t *testing.T) { {Type: "no_finish", Version: "~> v0.2.0"}, }, }, - f: &fetch_summary.FetchSummary{ProviderName: "test3", ProviderVersion: "v0.2.3", IsSuccess: false}, + f: &fetch.Summary{ProviderName: "test3", ProviderVersion: "v0.2.3", IsSuccess: false}, err: errors.New("failed to get fetch summary for provider no_finish: there is no successful fetch for requested provider"), }, { @@ -406,7 +419,7 @@ func TestExecutor_CheckFetches(t *testing.T) { {Type: "test3", Version: "~> v0.2.0"}, }, }, - f: &fetch_summary.FetchSummary{ProviderName: "test3", ProviderVersion: "v0.2.3", Finish: &finish, IsSuccess: false}, + f: &fetch.Summary{ProviderName: "test3", ProviderVersion: "v0.2.3", Finish: &finish, IsSuccess: false}, err: errors.New("last fetch for provider test3 wasn't successful"), }, { @@ -416,7 +429,7 @@ func TestExecutor_CheckFetches(t *testing.T) { {Type: "test4", Version: "~> v0.3.0"}, }, }, - f: &fetch_summary.FetchSummary{ProviderName: "test4", ProviderVersion: "v0.2.3", Finish: &finish, IsSuccess: true}, + f: &fetch.Summary{ProviderName: "test4", ProviderVersion: "v0.2.3", Finish: &finish, IsSuccess: true}, err: errors.New("the latest fetch for provider test4 does not satisfy version requirement ~> v0.3.0"), }, { @@ -426,14 +439,14 @@ func TestExecutor_CheckFetches(t *testing.T) { {Type: "test4", Version: ""}, }, }, - f: &fetch_summary.FetchSummary{ProviderName: "test4", ProviderVersion: "v0.2.3", Finish: &finish, IsSuccess: true}, + f: &fetch.Summary{ProviderName: "test4", ProviderVersion: "v0.2.3", Finish: &finish, IsSuccess: true}, err: errors.New("failed to parse version constraint for provider test4: Malformed constraint: "), }, } for _, tc := range cases { t.Run(tc.Name, func(t *testing.T) { - err, clear := setupCheckFetchDatabase(db, tc.f) + err, clear := setupCheckFetchDatabase(db, tc.f, fetchSummaryClient) assert.NoError(t, err) err = executor.checkFetches(context.Background(), &tc.Config) From 544d4305504f604842dd6657089dc9940dac128e Mon Sep 17 00:00:00 2001 From: Andrii Romanenko Date: Tue, 1 Feb 2022 16:59:35 +0200 Subject: [PATCH 09/19] linter fix --- pkg/policy/execute.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/policy/execute.go b/pkg/policy/execute.go index d3dd9e5f6ae58c..455cde7bce39fa 100644 --- a/pkg/policy/execute.go +++ b/pkg/policy/execute.go @@ -10,7 +10,6 @@ import ( "strings" "github.com/cloudquery/cloudquery/pkg/client/fetch" - "github.com/cloudquery/cq-provider-sdk/provider/schema" "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-version" From a9bc4c56dda5c8ff85dfdfc340713f050bd09e64 Mon Sep 17 00:00:00 2001 From: Andrii Romanenko Date: Tue, 1 Feb 2022 17:07:44 +0200 Subject: [PATCH 10/19] doc --- pkg/client/fetch/{fetch_summary.go => fetch.go} | 1 + pkg/client/fetch/{fetch_summary_test.go => fetch_test.go} | 0 2 files changed, 1 insertion(+) rename pkg/client/fetch/{fetch_summary.go => fetch.go} (98%) rename pkg/client/fetch/{fetch_summary_test.go => fetch_test.go} (100%) diff --git a/pkg/client/fetch/fetch_summary.go b/pkg/client/fetch/fetch.go similarity index 98% rename from pkg/client/fetch/fetch_summary.go rename to pkg/client/fetch/fetch.go index f0b239c832f2ff..e7deb1b8ad0c2a 100644 --- a/pkg/client/fetch/fetch_summary.go +++ b/pkg/client/fetch/fetch.go @@ -1,3 +1,4 @@ +// Package fetch interacts with core database and handles fetch summary routines. package fetch import ( diff --git a/pkg/client/fetch/fetch_summary_test.go b/pkg/client/fetch/fetch_test.go similarity index 100% rename from pkg/client/fetch/fetch_summary_test.go rename to pkg/client/fetch/fetch_test.go From b19dc9f6fb86ca5ae93aa2b7f18aff6404afc0fd Mon Sep 17 00:00:00 2001 From: Andrii Romanenko Date: Tue, 1 Feb 2022 17:23:43 +0200 Subject: [PATCH 11/19] fixed typo --- pkg/policy/execute_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/policy/execute_test.go b/pkg/policy/execute_test.go index 0953d9e40fe996..ce312f236077f3 100644 --- a/pkg/policy/execute_test.go +++ b/pkg/policy/execute_test.go @@ -354,7 +354,7 @@ func setupCheckFetchDatabase(db schema.QueryExecer, summary *fetch.Summary, c *f // Return conn and tear down func return nil, func(t *testing.T) { - err = db.Exec(context.Background(), fmt.Sprintf(`DELETE FROM "cloudquery"."fetches" WHERE "id" = '%summary';`, summary.FetchId.String())) + err = db.Exec(context.Background(), fmt.Sprintf(`DELETE FROM "cloudquery"."fetches" WHERE "id" = '%s';`, summary.FetchId.String())) assert.NoError(t, err) } } From c6291c1a8430ba02122291e2ce08691d6654a951 Mon Sep 17 00:00:00 2001 From: Andrii Romanenko Date: Mon, 7 Feb 2022 13:25:25 +0200 Subject: [PATCH 12/19] merge --- pkg/client/fetch.go | 0 pkg/client/fetch/fetch.go | 14 ++++++-------- pkg/policy/execute.go | 2 +- pkg/policy/execute_test.go | 4 ++-- 4 files changed, 9 insertions(+), 11 deletions(-) delete mode 100644 pkg/client/fetch.go diff --git a/pkg/client/fetch.go b/pkg/client/fetch.go deleted file mode 100644 index e69de29bb2d1d6..00000000000000 diff --git a/pkg/client/fetch/fetch.go b/pkg/client/fetch/fetch.go index e7deb1b8ad0c2a..c5f190ec5b4cf0 100644 --- a/pkg/client/fetch/fetch.go +++ b/pkg/client/fetch/fetch.go @@ -10,12 +10,14 @@ import ( "fmt" "time" + "github.com/cloudquery/cq-provider-sdk/provider/execution" + "github.com/cloudquery/cloudquery/pkg/client/database" "github.com/cloudquery/cq-provider-sdk/cqproto" "github.com/cloudquery/cq-provider-sdk/database/dsn" "github.com/cloudquery/cq-provider-sdk/migration/migrator" + "github.com/cloudquery/cq-provider-sdk/provider/diag" "github.com/cloudquery/cq-provider-sdk/provider/schema" - "github.com/cloudquery/cq-provider-sdk/provider/schema/diag" "github.com/doug-martin/goqu/v9" "github.com/georgysavva/scany/pgxscan" "github.com/golang-migrate/migrate/v4" @@ -30,11 +32,11 @@ var ( ) type Client struct { - db schema.QueryExecer + db execution.QueryExecer Logger hclog.Logger } -func NewClient(db schema.QueryExecer, logger hclog.Logger) *Client { +func NewClient(db execution.QueryExecer, logger hclog.Logger) *Client { return &Client{ db: db, Logger: logger, @@ -128,7 +130,7 @@ func (c *Client) GetForProvider(ctx context.Context, provider string) (*Summary, } func (c *Client) MigrateCore(ctx context.Context, de database.DialectExecutor) error { - err := createCoreSchema(ctx, c.db) + err := c.db.Exec(ctx, "CREATE SCHEMA IF NOT EXISTS cloudquery") if err != nil { return err } @@ -162,7 +164,3 @@ func (c *Client) MigrateCore(ctx context.Context, de database.DialectExecutor) e } return nil } - -func createCoreSchema(ctx context.Context, db schema.QueryExecer) error { - return db.Exec(ctx, "CREATE SCHEMA IF NOT EXISTS cloudquery") -} diff --git a/pkg/policy/execute.go b/pkg/policy/execute.go index ec904a70af8330..83981f39b3a675 100644 --- a/pkg/policy/execute.go +++ b/pkg/policy/execute.go @@ -10,7 +10,7 @@ import ( "time" "github.com/cloudquery/cloudquery/pkg/client/fetch" - "github.com/cloudquery/cq-provider-sdk/provider/schema" + "github.com/cloudquery/cq-provider-sdk/provider/execution" "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-version" "github.com/spf13/afero" diff --git a/pkg/policy/execute_test.go b/pkg/policy/execute_test.go index 9b9c2e0db9590c..0b361d8c337fb5 100644 --- a/pkg/policy/execute_test.go +++ b/pkg/policy/execute_test.go @@ -11,7 +11,7 @@ import ( "github.com/cloudquery/cloudquery/pkg/client/fetch" "github.com/cloudquery/cloudquery/pkg/client/history" sdkdb "github.com/cloudquery/cq-provider-sdk/database" - "github.com/cloudquery/cq-provider-sdk/provider/schema" + "github.com/cloudquery/cq-provider-sdk/provider/execution" "github.com/google/uuid" "github.com/hashicorp/go-hclog" "github.com/stretchr/testify/assert" @@ -345,7 +345,7 @@ func TestExecutor_Execute(t *testing.T) { } } -func setupCheckFetchDatabase(db schema.QueryExecer, summary *fetch.Summary, c *fetch.Client) (error, func(t *testing.T)) { +func setupCheckFetchDatabase(db execution.QueryExecer, summary *fetch.Summary, c *fetch.Client) (error, func(t *testing.T)) { if summary == nil { return nil, func(t *testing.T) {} } From 30a4a9a56399ded2cc1ff05d9de260c3c46bdf78 Mon Sep 17 00:00:00 2001 From: Andrii Romanenko Date: Mon, 7 Feb 2022 21:37:56 +0200 Subject: [PATCH 13/19] package renamed --- pkg/client/client.go | 20 ++--- .../migrations/postgres/1_v0.19.2.down.sql | 1 - .../migrations/timescale/1_v0.19.2.down.sql | 1 - pkg/client/meta_storage/client.go | 69 +++++++++++++++ .../fetch_summary.go} | 87 +++---------------- .../fetch_summary_test.go} | 22 ++--- .../migrations/postgres/1_v0.19.2.down.sql | 1 + .../migrations/postgres/1_v0.19.2.up.sql | 0 .../migrations/postgres/2_v0.19.3.down.sql | 0 .../migrations/postgres/2_v0.19.3.up.sql | 0 .../migrations/timescale/1_v0.19.2.down.sql | 1 + .../migrations/timescale/1_v0.19.2.up.sql | 0 .../migrations/timescale/2_v0.19.3.down.sql | 0 .../migrations/timescale/2_v0.19.3.up.sql | 0 pkg/policy/execute.go | 6 +- pkg/policy/execute_test.go | 24 ++--- 16 files changed, 118 insertions(+), 114 deletions(-) delete mode 100644 pkg/client/fetch/migrations/postgres/1_v0.19.2.down.sql delete mode 100644 pkg/client/fetch/migrations/timescale/1_v0.19.2.down.sql create mode 100644 pkg/client/meta_storage/client.go rename pkg/client/{fetch/fetch.go => meta_storage/fetch_summary.go} (56%) rename pkg/client/{fetch/fetch_test.go => meta_storage/fetch_summary_test.go} (85%) create mode 100644 pkg/client/meta_storage/migrations/postgres/1_v0.19.2.down.sql rename pkg/client/{fetch => meta_storage}/migrations/postgres/1_v0.19.2.up.sql (100%) rename pkg/client/{fetch => meta_storage}/migrations/postgres/2_v0.19.3.down.sql (100%) rename pkg/client/{fetch => meta_storage}/migrations/postgres/2_v0.19.3.up.sql (100%) create mode 100644 pkg/client/meta_storage/migrations/timescale/1_v0.19.2.down.sql rename pkg/client/{fetch => meta_storage}/migrations/timescale/1_v0.19.2.up.sql (100%) rename pkg/client/{fetch => meta_storage}/migrations/timescale/2_v0.19.3.down.sql (100%) rename pkg/client/{fetch => meta_storage}/migrations/timescale/2_v0.19.3.up.sql (100%) diff --git a/pkg/client/client.go b/pkg/client/client.go index 60b476dda1e3c2..7c478c648c3d39 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -16,8 +16,8 @@ import ( "github.com/cloudquery/cloudquery/internal/telemetry" "github.com/cloudquery/cloudquery/pkg/client/database" "github.com/cloudquery/cloudquery/pkg/client/database/timescale" - "github.com/cloudquery/cloudquery/pkg/client/fetch" "github.com/cloudquery/cloudquery/pkg/client/history" + "github.com/cloudquery/cloudquery/pkg/client/meta_storage" "github.com/cloudquery/cloudquery/pkg/config" "github.com/cloudquery/cloudquery/pkg/module" "github.com/cloudquery/cloudquery/pkg/module/drift" @@ -245,10 +245,10 @@ type Client struct { // HistoryConfig defines configuration for CloudQuery history mode HistoryCfg *history.Config - // fetchSummaryClient interacts with cloudquery core resources - fetchSummaryClient *fetch.Client - db *sdkdb.DB - dialectExecutor database.DialectExecutor + // metaStorage interacts with cloudquery core resources + metaStorage *meta_storage.Client + db *sdkdb.DB + dialectExecutor database.DialectExecutor } func New(ctx context.Context, options ...Option) (*Client, error) { @@ -416,7 +416,7 @@ func (c *Client) Fetch(ctx context.Context, request FetchRequest) (res *FetchRes for _, providerConfig := range request.Providers { providerConfig := providerConfig createdAt := time.Now().UTC() - fetchSummary := fetch.Summary{ + fetchSummary := meta_storage.FetchSummary{ FetchId: fetchId, ProviderName: providerConfig.Name, ProviderAlias: providerConfig.Alias, @@ -424,7 +424,7 @@ func (c *Client) Fetch(ctx context.Context, request FetchRequest) (res *FetchRes CoreVersion: Version, } saveFetchSummary := func() { - if err := c.fetchSummaryClient.Save(ctx, &fetchSummary); err != nil { + if err := c.metaStorage.SaveFetchSummary(ctx, &fetchSummary); err != nil { c.Logger.Error("failed to save fetch summary", "err", err) } } @@ -547,7 +547,7 @@ func (c *Client) Fetch(ctx context.Context, request FetchRequest) (res *FetchRes request.UpdateCallback(update) } - fetchSummary.Resources = append(fetchSummary.Resources, fetch.ResourceSummary{ + fetchSummary.Resources = append(fetchSummary.Resources, meta_storage.ResourceFetchSummary{ ResourceName: resp.ResourceName, FinishedResources: resp.FinishedResources, Status: strconv.Itoa(int(resp.Summary.Status)), // todo use human readable representation of status @@ -1119,9 +1119,9 @@ func (c *Client) initDatabase(ctx context.Context) error { c.Logger.Warn("database validation warning") } - c.fetchSummaryClient = fetch.NewClient(c.db, c.Logger) + c.metaStorage = meta_storage.NewClient(c.db, c.Logger) // migrate cloudquery core tables to latest version - if err := c.fetchSummaryClient.MigrateCore(ctx, c.dialectExecutor); err != nil { + if err := c.metaStorage.MigrateCore(ctx, c.dialectExecutor); err != nil { return fmt.Errorf("failed to migrate cloudquery_core tables: %w", err) } diff --git a/pkg/client/fetch/migrations/postgres/1_v0.19.2.down.sql b/pkg/client/fetch/migrations/postgres/1_v0.19.2.down.sql deleted file mode 100644 index c22d574b4800f1..00000000000000 --- a/pkg/client/fetch/migrations/postgres/1_v0.19.2.down.sql +++ /dev/null @@ -1 +0,0 @@ -DROP TABLE cq_fetches; diff --git a/pkg/client/fetch/migrations/timescale/1_v0.19.2.down.sql b/pkg/client/fetch/migrations/timescale/1_v0.19.2.down.sql deleted file mode 100644 index c22d574b4800f1..00000000000000 --- a/pkg/client/fetch/migrations/timescale/1_v0.19.2.down.sql +++ /dev/null @@ -1 +0,0 @@ -DROP TABLE cq_fetches; diff --git a/pkg/client/meta_storage/client.go b/pkg/client/meta_storage/client.go new file mode 100644 index 00000000000000..d535a9eb54529d --- /dev/null +++ b/pkg/client/meta_storage/client.go @@ -0,0 +1,69 @@ +// Package meta_storage interacts with core database schema and stores cloudquery metadata such as fetch summaries +package meta_storage + +import ( + "context" + "embed" + "fmt" + + "github.com/cloudquery/cloudquery/pkg/client/database" + "github.com/cloudquery/cq-provider-sdk/database/dsn" + "github.com/cloudquery/cq-provider-sdk/migration/migrator" + "github.com/cloudquery/cq-provider-sdk/provider/execution" + "github.com/cloudquery/cq-provider-sdk/provider/schema" + "github.com/golang-migrate/migrate/v4" + "github.com/hashicorp/go-hclog" +) + +var ( + //go:embed migrations/*/*.sql + coreMigrations embed.FS +) + +type Client struct { + db execution.QueryExecer + Logger hclog.Logger +} + +func NewClient(db execution.QueryExecer, logger hclog.Logger) *Client { + return &Client{ + db: db, + Logger: logger, + } +} + +func (c *Client) MigrateCore(ctx context.Context, de database.DialectExecutor) error { + err := c.db.Exec(ctx, "CREATE SCHEMA IF NOT EXISTS cloudquery") + if err != nil { + return err + } + + newDSN, err := de.Setup(ctx) + if err != nil { + return err + } + + migrations, err := migrator.ReadMigrationFiles(c.Logger, coreMigrations) + if err != nil { + return err + } + newDSN, err = dsn.SetDSNElement(newDSN, map[string]string{"search_path": "cloudquery"}) + if err != nil { + return err + } + m, err := migrator.New(c.Logger, schema.Postgres, migrations, newDSN, "cloudquery_core", nil) + if err != nil { + return err + } + + defer func() { + if err := m.Close(); err != nil { + c.Logger.Error("failed to close migrator connection", "error", err) + } + }() + + if err := m.UpgradeProvider(migrator.Latest); err != nil && err != migrate.ErrNoChange { + return fmt.Errorf("failed to migrate cloudquery core schema: %w", err) + } + return nil +} diff --git a/pkg/client/fetch/fetch.go b/pkg/client/meta_storage/fetch_summary.go similarity index 56% rename from pkg/client/fetch/fetch.go rename to pkg/client/meta_storage/fetch_summary.go index c5f190ec5b4cf0..4c55878d25a940 100644 --- a/pkg/client/fetch/fetch.go +++ b/pkg/client/meta_storage/fetch_summary.go @@ -1,51 +1,22 @@ -// Package fetch interacts with core database and handles fetch summary routines. -package fetch +package meta_storage import ( "context" "database/sql/driver" - "embed" "encoding/json" "errors" - "fmt" "time" - "github.com/cloudquery/cq-provider-sdk/provider/execution" - - "github.com/cloudquery/cloudquery/pkg/client/database" "github.com/cloudquery/cq-provider-sdk/cqproto" - "github.com/cloudquery/cq-provider-sdk/database/dsn" - "github.com/cloudquery/cq-provider-sdk/migration/migrator" "github.com/cloudquery/cq-provider-sdk/provider/diag" - "github.com/cloudquery/cq-provider-sdk/provider/schema" "github.com/doug-martin/goqu/v9" "github.com/georgysavva/scany/pgxscan" - "github.com/golang-migrate/migrate/v4" "github.com/google/uuid" - "github.com/hashicorp/go-hclog" "github.com/jackc/pgx/v4" ) -var ( - //go:embed migrations/*/*.sql - coreMigrations embed.FS -) - -type Client struct { - db execution.QueryExecer - Logger hclog.Logger -} - -func NewClient(db execution.QueryExecer, logger hclog.Logger) *Client { - return &Client{ - db: db, - Logger: logger, - } -} - -// Summary includes a summarized report of fetch, such as fetch id, fetch start and finish, -// resources fetch results -type Summary struct { +// FetchSummary includes a summarized report of fetch, such as fetch id, fetch start and finish, resources fetch results +type FetchSummary struct { CqId uuid.UUID `db:"id"` // Unique Id of fetch session FetchId uuid.UUID `db:"fetch_id"` @@ -62,7 +33,7 @@ type Summary struct { Resources ResourceFetchSummaries `db:"results"` } -type ResourceFetchSummaries []ResourceSummary +type ResourceFetchSummaries []ResourceFetchSummary // Value implements Valuer interface required by goqu func (r ResourceFetchSummaries) Value() (driver.Value, error) { @@ -72,8 +43,8 @@ func (r ResourceFetchSummaries) Value() (driver.Value, error) { return json.Marshal(r) } -// ResourceSummary includes a data about fetching specific resource -type ResourceSummary struct { +// ResourceFetchSummary includes a data about fetching specific resource +type ResourceFetchSummary struct { ResourceName string `json:"resource_name"` // map of resources that have finished fetching FinishedResources map[string]bool `json:"finished_resources"` @@ -91,8 +62,8 @@ type ResourceSummary struct { Diagnostics diag.Diagnostics `json:"diagnostics"` } -// Save saves fetch summary into fetches database -func (c *Client) Save(ctx context.Context, fs *Summary) error { +// SaveFetchSummary saves fetch summary into fetches database +func (c *Client) SaveFetchSummary(ctx context.Context, fs *FetchSummary) error { id, err := uuid.NewUUID() if err != nil { return err @@ -106,8 +77,8 @@ func (c *Client) Save(ctx context.Context, fs *Summary) error { return c.db.Exec(ctx, sql, args...) } -// GetForProvider gets latest fetch summary for specific provider -func (c *Client) GetForProvider(ctx context.Context, provider string) (*Summary, error) { +// GetFetchSummaryForProvider gets latest fetch summary for specific provider +func (c *Client) GetFetchSummaryForProvider(ctx context.Context, provider string) (*FetchSummary, error) { q := goqu.Dialect("postgres"). Select("provider_version", "is_success"). From("cloudquery.fetches"). @@ -118,7 +89,7 @@ func (c *Client) GetForProvider(ctx context.Context, provider string) (*Summary, if err != nil { return nil, err } - var data Summary + var data FetchSummary err = pgxscan.Get(ctx, c.db, &data, sql) if err != nil { if errors.Is(err, pgx.ErrNoRows) { @@ -128,39 +99,3 @@ func (c *Client) GetForProvider(ctx context.Context, provider string) (*Summary, } return &data, nil } - -func (c *Client) MigrateCore(ctx context.Context, de database.DialectExecutor) error { - err := c.db.Exec(ctx, "CREATE SCHEMA IF NOT EXISTS cloudquery") - if err != nil { - return err - } - - newDSN, err := de.Setup(ctx) - if err != nil { - return err - } - - migrations, err := migrator.ReadMigrationFiles(c.Logger, coreMigrations) - if err != nil { - return err - } - newDSN, err = dsn.SetDSNElement(newDSN, map[string]string{"search_path": "cloudquery"}) - if err != nil { - return err - } - m, err := migrator.New(c.Logger, schema.Postgres, migrations, newDSN, "cloudquery_core", nil) - if err != nil { - return err - } - - defer func() { - if err := m.Close(); err != nil { - c.Logger.Error("failed to close migrator connection", "error", err) - } - }() - - if err := m.UpgradeProvider(migrator.Latest); err != nil && err != migrate.ErrNoChange { - return fmt.Errorf("failed to migrate cloudquery core schema: %w", err) - } - return nil -} diff --git a/pkg/client/fetch/fetch_test.go b/pkg/client/meta_storage/fetch_summary_test.go similarity index 85% rename from pkg/client/fetch/fetch_test.go rename to pkg/client/meta_storage/fetch_summary_test.go index 52d0698d174fe3..0ea35d06f0d0d7 100644 --- a/pkg/client/fetch/fetch_test.go +++ b/pkg/client/meta_storage/fetch_summary_test.go @@ -1,4 +1,4 @@ -package fetch +package meta_storage import ( "context" @@ -18,22 +18,22 @@ import ( const testDBConnection = "postgres://postgres:pass@localhost:5432/postgres?sslmode=disable" type fetchSummaryTest struct { - summary Summary + summary FetchSummary err error skipFetchId bool } var fetchSummaryTests = []fetchSummaryTest{ { - summary: Summary{ + summary: FetchSummary{ ProviderName: "test", ProviderVersion: "v0.0.0", }, }, { - summary: Summary{ + summary: FetchSummary{ ProviderName: "test1", - Resources: []ResourceSummary{ + Resources: []ResourceFetchSummary{ { ResourceName: "test", ResourceCount: 99, @@ -42,15 +42,15 @@ var fetchSummaryTests = []fetchSummaryTest{ }, }, { - summary: Summary{ + summary: FetchSummary{ ProviderName: "test2", ProviderVersion: "v0.0.1", }, }, { - summary: Summary{ + summary: FetchSummary{ ProviderName: "test4", - Resources: []ResourceSummary{ + Resources: []ResourceFetchSummary{ { ResourceName: "test", ResourceCount: 99, @@ -63,14 +63,14 @@ var fetchSummaryTests = []fetchSummaryTest{ }, }, { - summary: Summary{ + summary: FetchSummary{ ProviderName: "test2", ProviderVersion: "v0.0.1", }, err: errors.New("ERROR: duplicate key value violates unique constraint \"fetches_pk\" (SQLSTATE 23505)"), }, { - summary: Summary{ + summary: FetchSummary{ ProviderName: "test3", ProviderVersion: "v0.0.1", }, @@ -101,7 +101,7 @@ func TestFetchSaveSummary(t *testing.T) { } start := time.Now() f.summary.Start = &start - err := fetchSummaryClient.Save(context.Background(), &f.summary) + err := fetchSummaryClient.SaveFetchSummary(context.Background(), &f.summary) if f.err != nil { assert.EqualError(t, err, f.err.Error()) } else { diff --git a/pkg/client/meta_storage/migrations/postgres/1_v0.19.2.down.sql b/pkg/client/meta_storage/migrations/postgres/1_v0.19.2.down.sql new file mode 100644 index 00000000000000..de917492044789 --- /dev/null +++ b/pkg/client/meta_storage/migrations/postgres/1_v0.19.2.down.sql @@ -0,0 +1 @@ +DROP TABLE fetches; diff --git a/pkg/client/fetch/migrations/postgres/1_v0.19.2.up.sql b/pkg/client/meta_storage/migrations/postgres/1_v0.19.2.up.sql similarity index 100% rename from pkg/client/fetch/migrations/postgres/1_v0.19.2.up.sql rename to pkg/client/meta_storage/migrations/postgres/1_v0.19.2.up.sql diff --git a/pkg/client/fetch/migrations/postgres/2_v0.19.3.down.sql b/pkg/client/meta_storage/migrations/postgres/2_v0.19.3.down.sql similarity index 100% rename from pkg/client/fetch/migrations/postgres/2_v0.19.3.down.sql rename to pkg/client/meta_storage/migrations/postgres/2_v0.19.3.down.sql diff --git a/pkg/client/fetch/migrations/postgres/2_v0.19.3.up.sql b/pkg/client/meta_storage/migrations/postgres/2_v0.19.3.up.sql similarity index 100% rename from pkg/client/fetch/migrations/postgres/2_v0.19.3.up.sql rename to pkg/client/meta_storage/migrations/postgres/2_v0.19.3.up.sql diff --git a/pkg/client/meta_storage/migrations/timescale/1_v0.19.2.down.sql b/pkg/client/meta_storage/migrations/timescale/1_v0.19.2.down.sql new file mode 100644 index 00000000000000..de917492044789 --- /dev/null +++ b/pkg/client/meta_storage/migrations/timescale/1_v0.19.2.down.sql @@ -0,0 +1 @@ +DROP TABLE fetches; diff --git a/pkg/client/fetch/migrations/timescale/1_v0.19.2.up.sql b/pkg/client/meta_storage/migrations/timescale/1_v0.19.2.up.sql similarity index 100% rename from pkg/client/fetch/migrations/timescale/1_v0.19.2.up.sql rename to pkg/client/meta_storage/migrations/timescale/1_v0.19.2.up.sql diff --git a/pkg/client/fetch/migrations/timescale/2_v0.19.3.down.sql b/pkg/client/meta_storage/migrations/timescale/2_v0.19.3.down.sql similarity index 100% rename from pkg/client/fetch/migrations/timescale/2_v0.19.3.down.sql rename to pkg/client/meta_storage/migrations/timescale/2_v0.19.3.down.sql diff --git a/pkg/client/fetch/migrations/timescale/2_v0.19.3.up.sql b/pkg/client/meta_storage/migrations/timescale/2_v0.19.3.up.sql similarity index 100% rename from pkg/client/fetch/migrations/timescale/2_v0.19.3.up.sql rename to pkg/client/meta_storage/migrations/timescale/2_v0.19.3.up.sql diff --git a/pkg/policy/execute.go b/pkg/policy/execute.go index 83981f39b3a675..18b7e24398441a 100644 --- a/pkg/policy/execute.go +++ b/pkg/policy/execute.go @@ -9,7 +9,7 @@ import ( "strings" "time" - "github.com/cloudquery/cloudquery/pkg/client/fetch" + "github.com/cloudquery/cloudquery/pkg/client/meta_storage" "github.com/cloudquery/cq-provider-sdk/provider/execution" "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-version" @@ -182,13 +182,13 @@ func (e *Executor) checkFetches(ctx context.Context, policyConfig *Configuration if policyConfig == nil { return nil } - fsClient := fetch.NewClient(e.conn, e.log) + metaStorage := meta_storage.NewClient(e.conn, e.log) for _, p := range policyConfig.Providers { c, err := version.NewConstraint(p.Version) if err != nil { return fmt.Errorf("failed to parse version constraint for provider %s: %w", p.Type, err) } - fetchSummary, err := fsClient.GetForProvider(ctx, p.Type) + fetchSummary, err := metaStorage.GetFetchSummaryForProvider(ctx, p.Type) if err != nil { return fmt.Errorf("failed to get fetch summary for provider %s: %w", p.Type, err) } diff --git a/pkg/policy/execute_test.go b/pkg/policy/execute_test.go index 0b361d8c337fb5..97b9ae65af5798 100644 --- a/pkg/policy/execute_test.go +++ b/pkg/policy/execute_test.go @@ -8,8 +8,8 @@ import ( "time" "github.com/cloudquery/cloudquery/pkg/client/database" - "github.com/cloudquery/cloudquery/pkg/client/fetch" "github.com/cloudquery/cloudquery/pkg/client/history" + "github.com/cloudquery/cloudquery/pkg/client/meta_storage" sdkdb "github.com/cloudquery/cq-provider-sdk/database" "github.com/cloudquery/cq-provider-sdk/provider/execution" "github.com/google/uuid" @@ -345,7 +345,7 @@ func TestExecutor_Execute(t *testing.T) { } } -func setupCheckFetchDatabase(db execution.QueryExecer, summary *fetch.Summary, c *fetch.Client) (error, func(t *testing.T)) { +func setupCheckFetchDatabase(db execution.QueryExecer, summary *meta_storage.FetchSummary, c *meta_storage.Client) (error, func(t *testing.T)) { if summary == nil { return nil, func(t *testing.T) {} } @@ -353,7 +353,7 @@ func setupCheckFetchDatabase(db execution.QueryExecer, summary *fetch.Summary, c summary.FetchId = uuid.New() finish := time.Now().UTC() summary.Finish = &finish - err := c.Save(context.Background(), summary) + err := c.SaveFetchSummary(context.Background(), summary) if err != nil { return err, nil } @@ -370,14 +370,14 @@ func TestExecutor_CheckFetches(t *testing.T) { db, err := sdkdb.New(context.Background(), hclog.NewNullLogger(), testDBConnection) assert.NoError(t, err) - fetchSummaryClient := fetch.NewClient(db, hclog.NewNullLogger()) + metaStorage := meta_storage.NewClient(db, hclog.NewNullLogger()) _, de, err := database.GetExecutor(hclog.NewNullLogger(), testDBConnection, &history.Config{}) if err != nil { t.Fatal(fmt.Errorf("getExecutor: %w", err)) } - err = fetchSummaryClient.MigrateCore(context.Background(), de) + err = metaStorage.MigrateCore(context.Background(), de) assert.NoError(t, err) executor := NewExecutor(db, hclog.Default(), nil) @@ -387,7 +387,7 @@ func TestExecutor_CheckFetches(t *testing.T) { cases := []struct { Name string Config Configuration - f *fetch.Summary + f *meta_storage.FetchSummary err error }{ { @@ -397,7 +397,7 @@ func TestExecutor_CheckFetches(t *testing.T) { {Type: "test1", Version: "~> v0.2.0"}, }, }, - f: &fetch.Summary{ProviderName: "test1", ProviderVersion: "v0.2.3", Finish: &finish, IsSuccess: true}, + f: &meta_storage.FetchSummary{ProviderName: "test1", ProviderVersion: "v0.2.3", Finish: &finish, IsSuccess: true}, err: nil, }, { @@ -415,7 +415,7 @@ func TestExecutor_CheckFetches(t *testing.T) { {Type: "no_finish", Version: "~> v0.2.0"}, }, }, - f: &fetch.Summary{ProviderName: "test3", ProviderVersion: "v0.2.3", IsSuccess: false}, + f: &meta_storage.FetchSummary{ProviderName: "test3", ProviderVersion: "v0.2.3", IsSuccess: false}, err: errors.New("failed to get fetch summary for provider no_finish: there is no successful fetch for requested provider"), }, { @@ -425,7 +425,7 @@ func TestExecutor_CheckFetches(t *testing.T) { {Type: "test3", Version: "~> v0.2.0"}, }, }, - f: &fetch.Summary{ProviderName: "test3", ProviderVersion: "v0.2.3", Finish: &finish, IsSuccess: false}, + f: &meta_storage.FetchSummary{ProviderName: "test3", ProviderVersion: "v0.2.3", Finish: &finish, IsSuccess: false}, err: errors.New("last fetch for provider test3 wasn't successful"), }, { @@ -435,7 +435,7 @@ func TestExecutor_CheckFetches(t *testing.T) { {Type: "test4", Version: "~> v0.3.0"}, }, }, - f: &fetch.Summary{ProviderName: "test4", ProviderVersion: "v0.2.3", Finish: &finish, IsSuccess: true}, + f: &meta_storage.FetchSummary{ProviderName: "test4", ProviderVersion: "v0.2.3", Finish: &finish, IsSuccess: true}, err: errors.New("the latest fetch for provider test4 does not satisfy version requirement ~> v0.3.0"), }, { @@ -445,14 +445,14 @@ func TestExecutor_CheckFetches(t *testing.T) { {Type: "test4", Version: ""}, }, }, - f: &fetch.Summary{ProviderName: "test4", ProviderVersion: "v0.2.3", Finish: &finish, IsSuccess: true}, + f: &meta_storage.FetchSummary{ProviderName: "test4", ProviderVersion: "v0.2.3", Finish: &finish, IsSuccess: true}, err: errors.New("failed to parse version constraint for provider test4: Malformed constraint: "), }, } for _, tc := range cases { t.Run(tc.Name, func(t *testing.T) { - err, clear := setupCheckFetchDatabase(db, tc.f, fetchSummaryClient) + err, clear := setupCheckFetchDatabase(db, tc.f, metaStorage) assert.NoError(t, err) err = executor.checkFetches(context.Background(), &tc.Config) From 29ecd47a537e5199b4767720f188386a376393a2 Mon Sep 17 00:00:00 2001 From: Andrii Romanenko Date: Tue, 8 Feb 2022 09:36:15 +0200 Subject: [PATCH 14/19] removed timescale migrations --- .../migrations/timescale/1_v0.19.2.down.sql | 1 - .../migrations/timescale/1_v0.19.2.up.sql | 16 ---------------- .../migrations/timescale/2_v0.19.3.down.sql | 5 ----- .../migrations/timescale/2_v0.19.3.up.sql | 10 ---------- 4 files changed, 32 deletions(-) delete mode 100644 pkg/client/meta_storage/migrations/timescale/1_v0.19.2.down.sql delete mode 100644 pkg/client/meta_storage/migrations/timescale/1_v0.19.2.up.sql delete mode 100644 pkg/client/meta_storage/migrations/timescale/2_v0.19.3.down.sql delete mode 100644 pkg/client/meta_storage/migrations/timescale/2_v0.19.3.up.sql diff --git a/pkg/client/meta_storage/migrations/timescale/1_v0.19.2.down.sql b/pkg/client/meta_storage/migrations/timescale/1_v0.19.2.down.sql deleted file mode 100644 index de917492044789..00000000000000 --- a/pkg/client/meta_storage/migrations/timescale/1_v0.19.2.down.sql +++ /dev/null @@ -1 +0,0 @@ -DROP TABLE fetches; diff --git a/pkg/client/meta_storage/migrations/timescale/1_v0.19.2.up.sql b/pkg/client/meta_storage/migrations/timescale/1_v0.19.2.up.sql deleted file mode 100644 index ab3f35fc1b2d7a..00000000000000 --- a/pkg/client/meta_storage/migrations/timescale/1_v0.19.2.up.sql +++ /dev/null @@ -1,16 +0,0 @@ -CREATE TABLE IF NOT EXISTS fetches -( - id UUID NOT NULL, - fetch_id UUID NOT NULL, - START TIMESTAMP, - finish TIMESTAMP, - total_resource_count BIGINT, - total_errors_count BIGINT, - provider_name TEXT, - provider_version TEXT, - is_success BOOLEAN, - results jsonb, - CONSTRAINT "fetches_id" PRIMARY KEY (id), - CONSTRAINT "fetches_pk" UNIQUE (fetch_id, provider_name), - CONSTRAINT "non_nil_fetch_id" CHECK (fetch_id != '00000000-0000-0000-0000-000000000000') -); \ No newline at end of file diff --git a/pkg/client/meta_storage/migrations/timescale/2_v0.19.3.down.sql b/pkg/client/meta_storage/migrations/timescale/2_v0.19.3.down.sql deleted file mode 100644 index fb0f9b44c67913..00000000000000 --- a/pkg/client/meta_storage/migrations/timescale/2_v0.19.3.down.sql +++ /dev/null @@ -1,5 +0,0 @@ -ALTER TABLE IF EXISTS fetches - DROP CONSTRAINT fetches_pk; -ALTER TABLE IF EXISTS fetches ADD CONSTRAINT "fetches_pk" UNIQUE (fetch_id, provider_name); -ALTER TABLE IF EXISTS fetches - DROP COLUMN provider_alias; \ No newline at end of file diff --git a/pkg/client/meta_storage/migrations/timescale/2_v0.19.3.up.sql b/pkg/client/meta_storage/migrations/timescale/2_v0.19.3.up.sql deleted file mode 100644 index 21a6dd24d0444e..00000000000000 --- a/pkg/client/meta_storage/migrations/timescale/2_v0.19.3.up.sql +++ /dev/null @@ -1,10 +0,0 @@ -ALTER TABLE IF EXISTS fetches - ADD COLUMN IF NOT EXISTS provider_alias TEXT; -ALTER TABLE IF EXISTS fetches - ADD COLUMN IF NOT EXISTS core_version TEXT; -ALTER TABLE IF EXISTS fetches - ADD COLUMN IF NOT EXISTS created_at TIMESTAMP; -ALTER TABLE IF EXISTS fetches - DROP CONSTRAINT fetches_pk; -ALTER TABLE IF EXISTS fetches - ADD CONSTRAINT "fetches_pk" UNIQUE (fetch_id, provider_name, provider_alias); \ No newline at end of file From f091b1e02b2953cce138463872f0d00c49e5bf89 Mon Sep 17 00:00:00 2001 From: Andrii Romanenko Date: Tue, 8 Feb 2022 15:46:15 +0200 Subject: [PATCH 15/19] Update pkg/policy/execute.go Co-authored-by: Kemal <223029+disq@users.noreply.github.com> --- pkg/policy/execute.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/policy/execute.go b/pkg/policy/execute.go index 18b7e24398441a..f6d9341809bfb5 100644 --- a/pkg/policy/execute.go +++ b/pkg/policy/execute.go @@ -134,7 +134,7 @@ func (e *Executor) Execute(ctx context.Context, req *ExecuteRequest, policy *Pol return nil, fmt.Errorf("%s: %w", policy.Name, err) } if err := e.checkFetches(ctx, policy.Config); err != nil { - return nil, fmt.Errorf("%s: %w, please run `cloudquery fetch` berfore running policy", policy.Name, err) + return nil, fmt.Errorf("%s: %w, please run `cloudquery fetch` before running policy", policy.Name, err) } if err := e.createViews(ctx, policy); err != nil { return nil, err From 9f870d5e826e16276858d8c2a83fc845aac65b8f Mon Sep 17 00:00:00 2001 From: Andrii Romanenko Date: Tue, 8 Feb 2022 15:46:23 +0200 Subject: [PATCH 16/19] Update pkg/policy/execute.go Co-authored-by: Kemal <223029+disq@users.noreply.github.com> --- pkg/policy/execute.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/policy/execute.go b/pkg/policy/execute.go index f6d9341809bfb5..ae016e1c356a1e 100644 --- a/pkg/policy/execute.go +++ b/pkg/policy/execute.go @@ -193,7 +193,7 @@ func (e *Executor) checkFetches(ctx context.Context, policyConfig *Configuration return fmt.Errorf("failed to get fetch summary for provider %s: %w", p.Type, err) } if fetchSummary == nil { - return fmt.Errorf("there is no finished fetches for provider %s", p.Type) + return fmt.Errorf("could not find finished fetches for provider %s", p.Type) } if !fetchSummary.IsSuccess { return fmt.Errorf("last fetch for provider %s wasn't successful", p.Type) From 13ca3d21704860472629bbf1fbe5603aa52e43fe Mon Sep 17 00:00:00 2001 From: Andrii Romanenko Date: Tue, 8 Feb 2022 15:46:29 +0200 Subject: [PATCH 17/19] Update pkg/client/meta_storage/fetch_summary.go Co-authored-by: Kemal <223029+disq@users.noreply.github.com> --- pkg/client/meta_storage/fetch_summary.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/client/meta_storage/fetch_summary.go b/pkg/client/meta_storage/fetch_summary.go index 4c55878d25a940..e523fccff66c2f 100644 --- a/pkg/client/meta_storage/fetch_summary.go +++ b/pkg/client/meta_storage/fetch_summary.go @@ -93,7 +93,7 @@ func (c *Client) GetFetchSummaryForProvider(ctx context.Context, provider string err = pgxscan.Get(ctx, c.db, &data, sql) if err != nil { if errors.Is(err, pgx.ErrNoRows) { - return nil, errors.New("there is no successful fetch for requested provider") + return nil, errors.New("could not find a completed fetch for requested provider") } return nil, err } From 2cc1be77c4991a64d928e48478e4e1bc15610813 Mon Sep 17 00:00:00 2001 From: Andrii Romanenko Date: Tue, 8 Feb 2022 17:03:50 +0200 Subject: [PATCH 18/19] tests fix --- pkg/policy/execute_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/policy/execute_test.go b/pkg/policy/execute_test.go index 97b9ae65af5798..2295665223e5d2 100644 --- a/pkg/policy/execute_test.go +++ b/pkg/policy/execute_test.go @@ -407,7 +407,7 @@ func TestExecutor_CheckFetches(t *testing.T) { {Type: "test2", Version: "~> v0.2.0"}, }, }, - err: errors.New("failed to get fetch summary for provider test2: there is no successful fetch for requested provider"), + err: errors.New("failed to get fetch summary for provider test2: could not find a completed fetch for requested provider"), }, { Name: "no finished fetches", Config: Configuration{ @@ -416,7 +416,7 @@ func TestExecutor_CheckFetches(t *testing.T) { }, }, f: &meta_storage.FetchSummary{ProviderName: "test3", ProviderVersion: "v0.2.3", IsSuccess: false}, - err: errors.New("failed to get fetch summary for provider no_finish: there is no successful fetch for requested provider"), + err: errors.New("failed to get fetch summary for provider no_finish: could not find a completed fetch for requested provider"), }, { Name: "no fetches", From 79bb5f2fc2b861c69a806ad1d5e015168074ece7 Mon Sep 17 00:00:00 2001 From: Andrii Romanenko Date: Mon, 7 Mar 2022 20:09:25 +0200 Subject: [PATCH 19/19] tests fix --- pkg/client/client.go | 3 +-- pkg/client/meta_storage/client.go | 2 +- pkg/policy/execute.go | 1 - 3 files changed, 2 insertions(+), 4 deletions(-) diff --git a/pkg/client/client.go b/pkg/client/client.go index 3f62d778e77b6e..0c73016756d2bc 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -30,7 +30,6 @@ import ( "github.com/cloudquery/cq-provider-sdk/database/dsn" "github.com/cloudquery/cq-provider-sdk/migration/migrator" "github.com/cloudquery/cq-provider-sdk/provider/diag" - "github.com/cloudquery/cq-provider-sdk/provider/execution" "github.com/cloudquery/cq-provider-sdk/provider/schema" "github.com/getsentry/sentry-go" "github.com/golang-migrate/migrate/v4" @@ -541,7 +540,7 @@ func (c *Client) Fetch(ctx context.Context, request FetchRequest) (res *FetchRes request.UpdateCallback(update) } - fetchSummary.Resources = append(fetchSummary.Resources, ResourceFetchSummary{ + fetchSummary.Resources = append(fetchSummary.Resources, meta_storage.ResourceFetchSummary{ ResourceName: resp.ResourceName, FinishedResources: resp.FinishedResources, Status: resp.Summary.Status.String(), diff --git a/pkg/client/meta_storage/client.go b/pkg/client/meta_storage/client.go index d535a9eb54529d..9509dc901ea85e 100644 --- a/pkg/client/meta_storage/client.go +++ b/pkg/client/meta_storage/client.go @@ -51,7 +51,7 @@ func (c *Client) MigrateCore(ctx context.Context, de database.DialectExecutor) e if err != nil { return err } - m, err := migrator.New(c.Logger, schema.Postgres, migrations, newDSN, "cloudquery_core", nil) + m, err := migrator.New(c.Logger, schema.Postgres, migrations, newDSN, "cloudquery_core") if err != nil { return err } diff --git a/pkg/policy/execute.go b/pkg/policy/execute.go index 4566d646b69507..92e0dd5919748d 100644 --- a/pkg/policy/execute.go +++ b/pkg/policy/execute.go @@ -10,7 +10,6 @@ import ( "time" "github.com/cloudquery/cloudquery/pkg/client/meta_storage" - "github.com/cloudquery/cq-provider-sdk/provider/execution" "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-version" "github.com/spf13/afero"