Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 8 additions & 48 deletions pkg/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package client

import (
"context"
"embed"
"errors"
"fmt"
"io"
Expand All @@ -18,6 +17,7 @@ import (
"github.com/cloudquery/cloudquery/pkg/client/database"
"github.com/cloudquery/cloudquery/pkg/client/database/timescale"
"github.com/cloudquery/cloudquery/pkg/client/history"
"github.com/cloudquery/cloudquery/pkg/client/meta_storage"
"github.com/cloudquery/cloudquery/pkg/config"
"github.com/cloudquery/cloudquery/pkg/module"
"github.com/cloudquery/cloudquery/pkg/module/drift"
Expand All @@ -30,7 +30,6 @@ import (
"github.com/cloudquery/cq-provider-sdk/database/dsn"
"github.com/cloudquery/cq-provider-sdk/migration/migrator"
"github.com/cloudquery/cq-provider-sdk/provider/diag"
"github.com/cloudquery/cq-provider-sdk/provider/execution"
"github.com/cloudquery/cq-provider-sdk/provider/schema"
"github.com/getsentry/sentry-go"
"github.com/golang-migrate/migrate/v4"
Expand All @@ -49,8 +48,6 @@ import (

var (
ErrMigrationsNotSupported = errors.New("provider doesn't support migrations")
//go:embed migrations/*/*.sql
coreMigrations embed.FS
)

// FetchRequest is provided to the Client to execute a fetch on one or more providers
Expand Down Expand Up @@ -240,6 +237,8 @@ type Client struct {
// HistoryConfig defines configuration for CloudQuery history mode
HistoryCfg *history.Config

// metaStorage interacts with cloudquery core resources
metaStorage *meta_storage.Client
db *sdkdb.DB
dialectExecutor database.DialectExecutor
}
Expand Down Expand Up @@ -409,15 +408,15 @@ func (c *Client) Fetch(ctx context.Context, request FetchRequest) (res *FetchRes
for _, providerConfig := range request.Providers {
providerConfig := providerConfig
createdAt := time.Now().UTC()
fetchSummary := FetchSummary{
fetchSummary := meta_storage.FetchSummary{
FetchId: fetchId,
ProviderName: providerConfig.Name,
ProviderAlias: providerConfig.Alias,
CreatedAt: &createdAt,
CoreVersion: Version,
}
saveFetchSummary := func() {
if err := c.SaveFetchSummary(ctx, &fetchSummary); err != nil {
if err := c.metaStorage.SaveFetchSummary(ctx, &fetchSummary); err != nil {
c.Logger.Error("failed to save fetch summary", "err", err)
}
}
Expand Down Expand Up @@ -541,7 +540,7 @@ func (c *Client) Fetch(ctx context.Context, request FetchRequest) (res *FetchRes
request.UpdateCallback(update)
}

fetchSummary.Resources = append(fetchSummary.Resources, ResourceFetchSummary{
fetchSummary.Resources = append(fetchSummary.Resources, meta_storage.ResourceFetchSummary{
ResourceName: resp.ResourceName,
FinishedResources: resp.FinishedResources,
Status: resp.Summary.Status.String(),
Expand Down Expand Up @@ -970,42 +969,6 @@ func (c *Client) buildProviderMigrator(ctx context.Context, migrations map[strin
return m, providerConfig, err
}

func (c *Client) MigrateCore(ctx context.Context, de database.DialectExecutor) error {
err := createCoreSchema(ctx, c.db)
if err != nil {
return err
}

newDSN, err := de.Setup(ctx)
if err != nil {
return err
}

migrations, err := migrator.ReadMigrationFiles(c.Logger, coreMigrations)
if err != nil {
return err
}
newDSN, err = dsn.SetDSNElement(newDSN, map[string]string{"search_path": "cloudquery"})
if err != nil {
return err
}
m, err := migrator.New(c.Logger, schema.Postgres, migrations, newDSN, "cloudquery_core")
if err != nil {
return err
}

defer func() {
if err := m.Close(); err != nil {
c.Logger.Error("failed to close migrator connection", "error", err)
}
}()

if err := m.UpgradeProvider(migrator.Latest); err != nil && err != migrate.ErrNoChange {
return fmt.Errorf("failed to migrate cloudquery core schema: %w", err)
}
return nil
}

func (c *Client) getProviderConfig(providerName string) (*config.RequiredProvider, error) {
var providerConfig *config.RequiredProvider
for _, p := range c.Providers {
Expand Down Expand Up @@ -1139,10 +1102,6 @@ func reportFetchSummaryErrors(span trace.Span, fetchSummaries map[string]Provide
)
}

func createCoreSchema(ctx context.Context, db execution.QueryExecer) error {
return db.Exec(ctx, "CREATE SCHEMA IF NOT EXISTS cloudquery")
}

func (c *Client) initDatabase(ctx context.Context) error {
var err error
c.db, err = sdkdb.New(ctx, c.Logger, c.DSN)
Expand Down Expand Up @@ -1178,8 +1137,9 @@ func (c *Client) initDatabase(ctx context.Context) error {
c.Logger.Warn("database validation warning")
}

c.metaStorage = meta_storage.NewClient(c.db, c.Logger)
// migrate cloudquery core tables to latest version
if err := c.MigrateCore(ctx, c.dialectExecutor); err != nil {
if err := c.metaStorage.MigrateCore(ctx, c.dialectExecutor); err != nil {
return fmt.Errorf("failed to migrate cloudquery_core tables: %w", err)
}

Expand Down
69 changes: 69 additions & 0 deletions pkg/client/meta_storage/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Package meta_storage interacts with core database schema and stores cloudquery metadata such as fetch summaries
package meta_storage

import (
"context"
"embed"
"fmt"

"github.com/cloudquery/cloudquery/pkg/client/database"
"github.com/cloudquery/cq-provider-sdk/database/dsn"
"github.com/cloudquery/cq-provider-sdk/migration/migrator"
"github.com/cloudquery/cq-provider-sdk/provider/execution"
"github.com/cloudquery/cq-provider-sdk/provider/schema"
"github.com/golang-migrate/migrate/v4"
"github.com/hashicorp/go-hclog"
)

var (
//go:embed migrations/*/*.sql
coreMigrations embed.FS
)

type Client struct {
db execution.QueryExecer
Logger hclog.Logger
}

func NewClient(db execution.QueryExecer, logger hclog.Logger) *Client {
return &Client{
db: db,
Logger: logger,
}
}

func (c *Client) MigrateCore(ctx context.Context, de database.DialectExecutor) error {
err := c.db.Exec(ctx, "CREATE SCHEMA IF NOT EXISTS cloudquery")
if err != nil {
return err
}

newDSN, err := de.Setup(ctx)
if err != nil {
return err
}

migrations, err := migrator.ReadMigrationFiles(c.Logger, coreMigrations)
if err != nil {
return err
}
newDSN, err = dsn.SetDSNElement(newDSN, map[string]string{"search_path": "cloudquery"})
if err != nil {
return err
}
m, err := migrator.New(c.Logger, schema.Postgres, migrations, newDSN, "cloudquery_core")
if err != nil {
return err
}

defer func() {
if err := m.Close(); err != nil {
c.Logger.Error("failed to close migrator connection", "error", err)
}
}()

if err := m.UpgradeProvider(migrator.Latest); err != nil && err != migrate.ErrNoChange {
return fmt.Errorf("failed to migrate cloudquery core schema: %w", err)
}
return nil
}
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
package client
package meta_storage

import (
"context"
"database/sql/driver"
"encoding/json"
"errors"
"time"

"github.com/doug-martin/goqu/v9"
"github.com/georgysavva/scany/pgxscan"
"github.com/google/uuid"
"github.com/jackc/pgx/v4"
)

// FetchSummary includes a summarized report of fetch, such as fetch id, fetch start and finish,
// resources fetch results
// FetchSummary includes a summarized report of fetch, such as fetch id, fetch start and finish, resources fetch results
type FetchSummary struct {
CqId uuid.UUID `db:"id"`
// Unique Id of fetch session
Expand Down Expand Up @@ -64,6 +66,28 @@ func (c *Client) SaveFetchSummary(ctx context.Context, fs *FetchSummary) error {
if err != nil {
return err
}

return c.db.Exec(ctx, sql, args...)
}

// GetFetchSummaryForProvider gets latest fetch summary for specific provider
func (c *Client) GetFetchSummaryForProvider(ctx context.Context, provider string) (*FetchSummary, error) {
q := goqu.Dialect("postgres").
Select("provider_version", "is_success").
From("cloudquery.fetches").
Where(goqu.Ex{"provider_name": provider, "finish": goqu.Op{"isNot": nil}}).
Limit(1).
Order(goqu.I("finish").Desc())
sql, _, err := q.ToSQL()
if err != nil {
return nil, err
}
var data FetchSummary
err = pgxscan.Get(ctx, c.db, &data, sql)
if err != nil {
if errors.Is(err, pgx.ErrNoRows) {
return nil, errors.New("could not find a completed fetch for requested provider")
}
return nil, err
}
return &data, nil
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
package client
package meta_storage

import (
"context"
"errors"
"fmt"
"testing"
"time"

"github.com/cloudquery/cloudquery/pkg/client/database"
"github.com/cloudquery/cloudquery/pkg/client/history"
sdkdb "github.com/cloudquery/cq-provider-sdk/database"
"github.com/google/uuid"
"github.com/hashicorp/go-hclog"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -74,10 +79,19 @@ var fetchSummaryTests = []fetchSummaryTest{
},
}

func TestFetchSummary(t *testing.T) {
c, err := New(context.Background(), func(c *Client) {
c.DSN = testDBConnection
})
func TestFetchSaveSummary(t *testing.T) {
// create database connection
db, err := sdkdb.New(context.Background(), hclog.NewNullLogger(), testDBConnection)
assert.NoError(t, err)

fetchSummaryClient := NewClient(db, hclog.NewNullLogger())

_, de, err := database.GetExecutor(hclog.NewNullLogger(), testDBConnection, &history.Config{})
if err != nil {
t.Fatal(fmt.Errorf("getExecutor: %w", err))
}

err = fetchSummaryClient.MigrateCore(context.Background(), de)
assert.NoError(t, err)

fetchId := uuid.New()
Expand All @@ -87,7 +101,7 @@ func TestFetchSummary(t *testing.T) {
}
start := time.Now()
f.summary.Start = &start
err := c.SaveFetchSummary(context.Background(), &f.summary)
err := fetchSummaryClient.SaveFetchSummary(context.Background(), &f.summary)
if f.err != nil {
assert.EqualError(t, err, f.err.Error())
} else {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP TABLE fetches;
1 change: 0 additions & 1 deletion pkg/client/migrations/postgres/1_v0.19.2.down.sql

This file was deleted.

38 changes: 38 additions & 0 deletions pkg/policy/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,16 @@ import (
"strings"
"time"

"github.com/cloudquery/cloudquery/pkg/client/meta_storage"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-version"
"github.com/spf13/afero"
)

var ErrPolicyOrQueryNotFound = errors.New("selected policy/query not found")

const testDBConnection = "postgres://postgres:pass@localhost:5432/postgres?sslmode=disable"

type UpdateCallback func(update Update)

type Update struct {
Expand Down Expand Up @@ -133,6 +136,9 @@ func (e *Executor) Execute(ctx context.Context, req *ExecuteRequest, policy *Pol
if err := e.checkVersions(policy.Config, req.ProviderVersions); err != nil {
return nil, fmt.Errorf("%s: %w", policy.Name, err)
}
if err := e.checkFetches(ctx, policy.Config); err != nil {
return nil, fmt.Errorf("%s: %w, please run `cloudquery fetch` before running policy", policy.Name, err)
}
if err := e.createViews(ctx, policy); err != nil {
return nil, err
}
Expand Down Expand Up @@ -174,6 +180,38 @@ func (e *Executor) Execute(ctx context.Context, req *ExecuteRequest, policy *Pol
return &total, nil
}

// checkFetches checks if there are fetch reports in database that satisfy providers from policy
func (e *Executor) checkFetches(ctx context.Context, policyConfig *Configuration) error {
if policyConfig == nil {
return nil
}
metaStorage := meta_storage.NewClient(e.conn, e.log)
for _, p := range policyConfig.Providers {
c, err := version.NewConstraint(p.Version)
if err != nil {
return fmt.Errorf("failed to parse version constraint for provider %s: %w", p.Type, err)
}
fetchSummary, err := metaStorage.GetFetchSummaryForProvider(ctx, p.Type)
if err != nil {
return fmt.Errorf("failed to get fetch summary for provider %s: %w", p.Type, err)
}
if fetchSummary == nil {
return fmt.Errorf("could not find finished fetches for provider %s", p.Type)
}
if !fetchSummary.IsSuccess {
return fmt.Errorf("last fetch for provider %s wasn't successful", p.Type)
}
v, err := version.NewVersion(fetchSummary.ProviderVersion)
if err != nil {
return fmt.Errorf("failed to parse version for %s fetch summary: %w", p.Type, err)
}
if !c.Check(v) {
return fmt.Errorf("the latest fetch for provider %s does not satisfy version requirement %s", p.Type, c)
}
}
return nil
}

func (*Executor) checkVersions(policyConfig *Configuration, actual map[string]*version.Version) error {
if policyConfig == nil {
return nil
Expand Down
Loading