diff --git a/pkg/client/client.go b/pkg/client/client.go index 9d77734600fd76..0c73016756d2bc 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -2,7 +2,6 @@ package client import ( "context" - "embed" "errors" "fmt" "io" @@ -18,6 +17,7 @@ import ( "github.com/cloudquery/cloudquery/pkg/client/database" "github.com/cloudquery/cloudquery/pkg/client/database/timescale" "github.com/cloudquery/cloudquery/pkg/client/history" + "github.com/cloudquery/cloudquery/pkg/client/meta_storage" "github.com/cloudquery/cloudquery/pkg/config" "github.com/cloudquery/cloudquery/pkg/module" "github.com/cloudquery/cloudquery/pkg/module/drift" @@ -30,7 +30,6 @@ import ( "github.com/cloudquery/cq-provider-sdk/database/dsn" "github.com/cloudquery/cq-provider-sdk/migration/migrator" "github.com/cloudquery/cq-provider-sdk/provider/diag" - "github.com/cloudquery/cq-provider-sdk/provider/execution" "github.com/cloudquery/cq-provider-sdk/provider/schema" "github.com/getsentry/sentry-go" "github.com/golang-migrate/migrate/v4" @@ -49,8 +48,6 @@ import ( var ( ErrMigrationsNotSupported = errors.New("provider doesn't support migrations") - //go:embed migrations/*/*.sql - coreMigrations embed.FS ) // FetchRequest is provided to the Client to execute a fetch on one or more providers @@ -240,6 +237,8 @@ type Client struct { // HistoryConfig defines configuration for CloudQuery history mode HistoryCfg *history.Config + // metaStorage interacts with cloudquery core resources + metaStorage *meta_storage.Client db *sdkdb.DB dialectExecutor database.DialectExecutor } @@ -409,7 +408,7 @@ func (c *Client) Fetch(ctx context.Context, request FetchRequest) (res *FetchRes for _, providerConfig := range request.Providers { providerConfig := providerConfig createdAt := time.Now().UTC() - fetchSummary := FetchSummary{ + fetchSummary := meta_storage.FetchSummary{ FetchId: fetchId, ProviderName: providerConfig.Name, ProviderAlias: providerConfig.Alias, @@ -417,7 +416,7 @@ func (c *Client) Fetch(ctx context.Context, request FetchRequest) (res *FetchRes CoreVersion: Version, } saveFetchSummary := func() { - if err := c.SaveFetchSummary(ctx, &fetchSummary); err != nil { + if err := c.metaStorage.SaveFetchSummary(ctx, &fetchSummary); err != nil { c.Logger.Error("failed to save fetch summary", "err", err) } } @@ -541,7 +540,7 @@ func (c *Client) Fetch(ctx context.Context, request FetchRequest) (res *FetchRes request.UpdateCallback(update) } - fetchSummary.Resources = append(fetchSummary.Resources, ResourceFetchSummary{ + fetchSummary.Resources = append(fetchSummary.Resources, meta_storage.ResourceFetchSummary{ ResourceName: resp.ResourceName, FinishedResources: resp.FinishedResources, Status: resp.Summary.Status.String(), @@ -970,42 +969,6 @@ func (c *Client) buildProviderMigrator(ctx context.Context, migrations map[strin return m, providerConfig, err } -func (c *Client) MigrateCore(ctx context.Context, de database.DialectExecutor) error { - err := createCoreSchema(ctx, c.db) - if err != nil { - return err - } - - newDSN, err := de.Setup(ctx) - if err != nil { - return err - } - - migrations, err := migrator.ReadMigrationFiles(c.Logger, coreMigrations) - if err != nil { - return err - } - newDSN, err = dsn.SetDSNElement(newDSN, map[string]string{"search_path": "cloudquery"}) - if err != nil { - return err - } - m, err := migrator.New(c.Logger, schema.Postgres, migrations, newDSN, "cloudquery_core") - if err != nil { - return err - } - - defer func() { - if err := m.Close(); err != nil { - c.Logger.Error("failed to close migrator connection", "error", err) - } - }() - - if err := m.UpgradeProvider(migrator.Latest); err != nil && err != migrate.ErrNoChange { - return fmt.Errorf("failed to migrate cloudquery core schema: %w", err) - } - return nil -} - func (c *Client) getProviderConfig(providerName string) (*config.RequiredProvider, error) { var providerConfig *config.RequiredProvider for _, p := range c.Providers { @@ -1139,10 +1102,6 @@ func reportFetchSummaryErrors(span trace.Span, fetchSummaries map[string]Provide ) } -func createCoreSchema(ctx context.Context, db execution.QueryExecer) error { - return db.Exec(ctx, "CREATE SCHEMA IF NOT EXISTS cloudquery") -} - func (c *Client) initDatabase(ctx context.Context) error { var err error c.db, err = sdkdb.New(ctx, c.Logger, c.DSN) @@ -1178,8 +1137,9 @@ func (c *Client) initDatabase(ctx context.Context) error { c.Logger.Warn("database validation warning") } + c.metaStorage = meta_storage.NewClient(c.db, c.Logger) // migrate cloudquery core tables to latest version - if err := c.MigrateCore(ctx, c.dialectExecutor); err != nil { + if err := c.metaStorage.MigrateCore(ctx, c.dialectExecutor); err != nil { return fmt.Errorf("failed to migrate cloudquery_core tables: %w", err) } diff --git a/pkg/client/meta_storage/client.go b/pkg/client/meta_storage/client.go new file mode 100644 index 00000000000000..9509dc901ea85e --- /dev/null +++ b/pkg/client/meta_storage/client.go @@ -0,0 +1,69 @@ +// Package meta_storage interacts with core database schema and stores cloudquery metadata such as fetch summaries +package meta_storage + +import ( + "context" + "embed" + "fmt" + + "github.com/cloudquery/cloudquery/pkg/client/database" + "github.com/cloudquery/cq-provider-sdk/database/dsn" + "github.com/cloudquery/cq-provider-sdk/migration/migrator" + "github.com/cloudquery/cq-provider-sdk/provider/execution" + "github.com/cloudquery/cq-provider-sdk/provider/schema" + "github.com/golang-migrate/migrate/v4" + "github.com/hashicorp/go-hclog" +) + +var ( + //go:embed migrations/*/*.sql + coreMigrations embed.FS +) + +type Client struct { + db execution.QueryExecer + Logger hclog.Logger +} + +func NewClient(db execution.QueryExecer, logger hclog.Logger) *Client { + return &Client{ + db: db, + Logger: logger, + } +} + +func (c *Client) MigrateCore(ctx context.Context, de database.DialectExecutor) error { + err := c.db.Exec(ctx, "CREATE SCHEMA IF NOT EXISTS cloudquery") + if err != nil { + return err + } + + newDSN, err := de.Setup(ctx) + if err != nil { + return err + } + + migrations, err := migrator.ReadMigrationFiles(c.Logger, coreMigrations) + if err != nil { + return err + } + newDSN, err = dsn.SetDSNElement(newDSN, map[string]string{"search_path": "cloudquery"}) + if err != nil { + return err + } + m, err := migrator.New(c.Logger, schema.Postgres, migrations, newDSN, "cloudquery_core") + if err != nil { + return err + } + + defer func() { + if err := m.Close(); err != nil { + c.Logger.Error("failed to close migrator connection", "error", err) + } + }() + + if err := m.UpgradeProvider(migrator.Latest); err != nil && err != migrate.ErrNoChange { + return fmt.Errorf("failed to migrate cloudquery core schema: %w", err) + } + return nil +} diff --git a/pkg/client/fetch.go b/pkg/client/meta_storage/fetch_summary.go similarity index 71% rename from pkg/client/fetch.go rename to pkg/client/meta_storage/fetch_summary.go index 38da583f277a3a..2fcd6ef4036569 100644 --- a/pkg/client/fetch.go +++ b/pkg/client/meta_storage/fetch_summary.go @@ -1,17 +1,19 @@ -package client +package meta_storage import ( "context" "database/sql/driver" "encoding/json" + "errors" "time" "github.com/doug-martin/goqu/v9" + "github.com/georgysavva/scany/pgxscan" "github.com/google/uuid" + "github.com/jackc/pgx/v4" ) -// FetchSummary includes a summarized report of fetch, such as fetch id, fetch start and finish, -// resources fetch results +// FetchSummary includes a summarized report of fetch, such as fetch id, fetch start and finish, resources fetch results type FetchSummary struct { CqId uuid.UUID `db:"id"` // Unique Id of fetch session @@ -64,6 +66,28 @@ func (c *Client) SaveFetchSummary(ctx context.Context, fs *FetchSummary) error { if err != nil { return err } - return c.db.Exec(ctx, sql, args...) } + +// GetFetchSummaryForProvider gets latest fetch summary for specific provider +func (c *Client) GetFetchSummaryForProvider(ctx context.Context, provider string) (*FetchSummary, error) { + q := goqu.Dialect("postgres"). + Select("provider_version", "is_success"). + From("cloudquery.fetches"). + Where(goqu.Ex{"provider_name": provider, "finish": goqu.Op{"isNot": nil}}). + Limit(1). + Order(goqu.I("finish").Desc()) + sql, _, err := q.ToSQL() + if err != nil { + return nil, err + } + var data FetchSummary + err = pgxscan.Get(ctx, c.db, &data, sql) + if err != nil { + if errors.Is(err, pgx.ErrNoRows) { + return nil, errors.New("could not find a completed fetch for requested provider") + } + return nil, err + } + return &data, nil +} diff --git a/pkg/client/fetch_test.go b/pkg/client/meta_storage/fetch_summary_test.go similarity index 68% rename from pkg/client/fetch_test.go rename to pkg/client/meta_storage/fetch_summary_test.go index 6a1d97841a22e5..0ea35d06f0d0d7 100644 --- a/pkg/client/fetch_test.go +++ b/pkg/client/meta_storage/fetch_summary_test.go @@ -1,12 +1,17 @@ -package client +package meta_storage import ( "context" "errors" + "fmt" "testing" "time" + "github.com/cloudquery/cloudquery/pkg/client/database" + "github.com/cloudquery/cloudquery/pkg/client/history" + sdkdb "github.com/cloudquery/cq-provider-sdk/database" "github.com/google/uuid" + "github.com/hashicorp/go-hclog" "github.com/stretchr/testify/assert" ) @@ -74,10 +79,19 @@ var fetchSummaryTests = []fetchSummaryTest{ }, } -func TestFetchSummary(t *testing.T) { - c, err := New(context.Background(), func(c *Client) { - c.DSN = testDBConnection - }) +func TestFetchSaveSummary(t *testing.T) { + // create database connection + db, err := sdkdb.New(context.Background(), hclog.NewNullLogger(), testDBConnection) + assert.NoError(t, err) + + fetchSummaryClient := NewClient(db, hclog.NewNullLogger()) + + _, de, err := database.GetExecutor(hclog.NewNullLogger(), testDBConnection, &history.Config{}) + if err != nil { + t.Fatal(fmt.Errorf("getExecutor: %w", err)) + } + + err = fetchSummaryClient.MigrateCore(context.Background(), de) assert.NoError(t, err) fetchId := uuid.New() @@ -87,7 +101,7 @@ func TestFetchSummary(t *testing.T) { } start := time.Now() f.summary.Start = &start - err := c.SaveFetchSummary(context.Background(), &f.summary) + err := fetchSummaryClient.SaveFetchSummary(context.Background(), &f.summary) if f.err != nil { assert.EqualError(t, err, f.err.Error()) } else { diff --git a/pkg/client/meta_storage/migrations/postgres/1_v0.19.2.down.sql b/pkg/client/meta_storage/migrations/postgres/1_v0.19.2.down.sql new file mode 100644 index 00000000000000..de917492044789 --- /dev/null +++ b/pkg/client/meta_storage/migrations/postgres/1_v0.19.2.down.sql @@ -0,0 +1 @@ +DROP TABLE fetches; diff --git a/pkg/client/migrations/postgres/1_v0.19.2.up.sql b/pkg/client/meta_storage/migrations/postgres/1_v0.19.2.up.sql similarity index 100% rename from pkg/client/migrations/postgres/1_v0.19.2.up.sql rename to pkg/client/meta_storage/migrations/postgres/1_v0.19.2.up.sql diff --git a/pkg/client/migrations/postgres/2_v0.19.3.down.sql b/pkg/client/meta_storage/migrations/postgres/2_v0.19.3.down.sql similarity index 100% rename from pkg/client/migrations/postgres/2_v0.19.3.down.sql rename to pkg/client/meta_storage/migrations/postgres/2_v0.19.3.down.sql diff --git a/pkg/client/migrations/postgres/2_v0.19.3.up.sql b/pkg/client/meta_storage/migrations/postgres/2_v0.19.3.up.sql similarity index 100% rename from pkg/client/migrations/postgres/2_v0.19.3.up.sql rename to pkg/client/meta_storage/migrations/postgres/2_v0.19.3.up.sql diff --git a/pkg/client/migrations/postgres/1_v0.19.2.down.sql b/pkg/client/migrations/postgres/1_v0.19.2.down.sql deleted file mode 100644 index c22d574b4800f1..00000000000000 --- a/pkg/client/migrations/postgres/1_v0.19.2.down.sql +++ /dev/null @@ -1 +0,0 @@ -DROP TABLE cq_fetches; diff --git a/pkg/policy/execute.go b/pkg/policy/execute.go index b9434499702f2f..92e0dd5919748d 100644 --- a/pkg/policy/execute.go +++ b/pkg/policy/execute.go @@ -9,6 +9,7 @@ import ( "strings" "time" + "github.com/cloudquery/cloudquery/pkg/client/meta_storage" "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-version" "github.com/spf13/afero" @@ -16,6 +17,8 @@ import ( var ErrPolicyOrQueryNotFound = errors.New("selected policy/query not found") +const testDBConnection = "postgres://postgres:pass@localhost:5432/postgres?sslmode=disable" + type UpdateCallback func(update Update) type Update struct { @@ -133,6 +136,9 @@ func (e *Executor) Execute(ctx context.Context, req *ExecuteRequest, policy *Pol if err := e.checkVersions(policy.Config, req.ProviderVersions); err != nil { return nil, fmt.Errorf("%s: %w", policy.Name, err) } + if err := e.checkFetches(ctx, policy.Config); err != nil { + return nil, fmt.Errorf("%s: %w, please run `cloudquery fetch` before running policy", policy.Name, err) + } if err := e.createViews(ctx, policy); err != nil { return nil, err } @@ -174,6 +180,38 @@ func (e *Executor) Execute(ctx context.Context, req *ExecuteRequest, policy *Pol return &total, nil } +// checkFetches checks if there are fetch reports in database that satisfy providers from policy +func (e *Executor) checkFetches(ctx context.Context, policyConfig *Configuration) error { + if policyConfig == nil { + return nil + } + metaStorage := meta_storage.NewClient(e.conn, e.log) + for _, p := range policyConfig.Providers { + c, err := version.NewConstraint(p.Version) + if err != nil { + return fmt.Errorf("failed to parse version constraint for provider %s: %w", p.Type, err) + } + fetchSummary, err := metaStorage.GetFetchSummaryForProvider(ctx, p.Type) + if err != nil { + return fmt.Errorf("failed to get fetch summary for provider %s: %w", p.Type, err) + } + if fetchSummary == nil { + return fmt.Errorf("could not find finished fetches for provider %s", p.Type) + } + if !fetchSummary.IsSuccess { + return fmt.Errorf("last fetch for provider %s wasn't successful", p.Type) + } + v, err := version.NewVersion(fetchSummary.ProviderVersion) + if err != nil { + return fmt.Errorf("failed to parse version for %s fetch summary: %w", p.Type, err) + } + if !c.Check(v) { + return fmt.Errorf("the latest fetch for provider %s does not satisfy version requirement %s", p.Type, c) + } + } + return nil +} + func (*Executor) checkVersions(policyConfig *Configuration, actual map[string]*version.Version) error { if policyConfig == nil { return nil diff --git a/pkg/policy/execute_test.go b/pkg/policy/execute_test.go index e2b598a0e87a9a..fa0894f229320b 100644 --- a/pkg/policy/execute_test.go +++ b/pkg/policy/execute_test.go @@ -2,10 +2,17 @@ package policy import ( "context" + "errors" "fmt" "testing" + "time" + "github.com/cloudquery/cloudquery/pkg/client/database" + "github.com/cloudquery/cloudquery/pkg/client/history" + "github.com/cloudquery/cloudquery/pkg/client/meta_storage" sdkdb "github.com/cloudquery/cq-provider-sdk/database" + "github.com/cloudquery/cq-provider-sdk/provider/execution" + "github.com/google/uuid" "github.com/hashicorp/go-hclog" "github.com/stretchr/testify/assert" ) @@ -347,3 +354,125 @@ func TestExecutor_Execute(t *testing.T) { }) } } + +func setupCheckFetchDatabase(db execution.QueryExecer, summary *meta_storage.FetchSummary, c *meta_storage.Client) (error, func(t *testing.T)) { + if summary == nil { + return nil, func(t *testing.T) {} + } + summary.CqId = uuid.New() + summary.FetchId = uuid.New() + finish := time.Now().UTC() + summary.Finish = &finish + err := c.SaveFetchSummary(context.Background(), summary) + if err != nil { + return err, nil + } + + // Return conn and tear down func + return nil, func(t *testing.T) { + err = db.Exec(context.Background(), fmt.Sprintf(`DELETE FROM "cloudquery"."fetches" WHERE "id" = '%s';`, summary.FetchId.String())) + assert.NoError(t, err) + } +} + +func TestExecutor_CheckFetches(t *testing.T) { + // create database connection + db, err := sdkdb.New(context.Background(), hclog.NewNullLogger(), testDBConnection) + assert.NoError(t, err) + + metaStorage := meta_storage.NewClient(db, hclog.NewNullLogger()) + + _, de, err := database.GetExecutor(hclog.NewNullLogger(), testDBConnection, &history.Config{}) + if err != nil { + t.Fatal(fmt.Errorf("getExecutor: %w", err)) + } + + err = metaStorage.MigrateCore(context.Background(), de) + assert.NoError(t, err) + + executor := NewExecutor(db, hclog.Default(), nil) + + finish := time.Now().UTC() + assert.NoError(t, err) + cases := []struct { + Name string + Config Configuration + f *meta_storage.FetchSummary + err error + }{ + { + Name: "correct version", + Config: Configuration{ + Providers: []*Provider{ + {Type: "test1", Version: "~> v0.2.0"}, + }, + }, + f: &meta_storage.FetchSummary{ProviderName: "test1", ProviderVersion: "v0.2.3", Finish: &finish, IsSuccess: true}, + err: nil, + }, + { + Name: "no fetches", + Config: Configuration{ + Providers: []*Provider{ + {Type: "test2", Version: "~> v0.2.0"}, + }, + }, + err: errors.New("failed to get fetch summary for provider test2: could not find a completed fetch for requested provider"), + }, { + Name: "no finished fetches", + Config: Configuration{ + Providers: []*Provider{ + {Type: "no_finish", Version: "~> v0.2.0"}, + }, + }, + f: &meta_storage.FetchSummary{ProviderName: "test3", ProviderVersion: "v0.2.3", IsSuccess: false}, + err: errors.New("failed to get fetch summary for provider no_finish: could not find a completed fetch for requested provider"), + }, + { + Name: "no fetches", + Config: Configuration{ + Providers: []*Provider{ + {Type: "test3", Version: "~> v0.2.0"}, + }, + }, + f: &meta_storage.FetchSummary{ProviderName: "test3", ProviderVersion: "v0.2.3", Finish: &finish, IsSuccess: false}, + err: errors.New("last fetch for provider test3 wasn't successful"), + }, + { + Name: "no fetches", + Config: Configuration{ + Providers: []*Provider{ + {Type: "test4", Version: "~> v0.3.0"}, + }, + }, + f: &meta_storage.FetchSummary{ProviderName: "test4", ProviderVersion: "v0.2.3", Finish: &finish, IsSuccess: true}, + err: errors.New("the latest fetch for provider test4 does not satisfy version requirement ~> v0.3.0"), + }, + { + Name: "no fetches", + Config: Configuration{ + Providers: []*Provider{ + {Type: "test4", Version: ""}, + }, + }, + f: &meta_storage.FetchSummary{ProviderName: "test4", ProviderVersion: "v0.2.3", Finish: &finish, IsSuccess: true}, + err: errors.New("failed to parse version constraint for provider test4: Malformed constraint: "), + }, + } + + for _, tc := range cases { + t.Run(tc.Name, func(t *testing.T) { + err, clear := setupCheckFetchDatabase(db, tc.f, metaStorage) + assert.NoError(t, err) + + err = executor.checkFetches(context.Background(), &tc.Config) + if tc.err != nil { + assert.Equal(t, tc.err.Error(), err.Error()) + } else { + assert.NoError(t, err) + } + clear(t) + }) + + } +}