Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@ All notable changes to CloudQuery will be documented in this file.

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [Unreleased]

### :rocket: Added
* Added core migrations implementation
* Added fetch summary saving to `fetches` table


<!--
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ require (
github.com/georgysavva/scany v0.2.9
github.com/getsentry/sentry-go v0.12.0
github.com/google/go-cmp v0.5.6
github.com/google/uuid v1.3.0
github.com/hashicorp/go-getter v1.5.10
github.com/huandu/go-sqlbuilder v1.13.0
github.com/jackc/pgconn v1.10.0
Expand Down Expand Up @@ -72,7 +73,6 @@ require (
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/go-querystring v1.0.0 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/googleapis/gax-go/v2 v2.0.5 // indirect
github.com/googleapis/google-cloud-go-testing v0.0.0-20200911160855-bcd43fbb19e8 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect
Expand Down
440 changes: 437 additions & 3 deletions go.sum

Large diffs are not rendered by default.

106 changes: 100 additions & 6 deletions pkg/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,19 @@ package client

import (
"context"
"embed"
"errors"
"fmt"
"io"
"path/filepath"
"sort"
"strconv"
"strings"
"time"

"github.com/cloudquery/cq-provider-sdk/helpers"
"github.com/getsentry/sentry-go"
"google.golang.org/grpc/status"

"github.com/cloudquery/cloudquery/pkg/client/history"

"github.com/cloudquery/cloudquery/internal/logging"
"github.com/cloudquery/cloudquery/internal/telemetry"
"github.com/cloudquery/cloudquery/pkg/client/history"
"github.com/cloudquery/cloudquery/pkg/config"
"github.com/cloudquery/cloudquery/pkg/module"
"github.com/cloudquery/cloudquery/pkg/module/drift"
Expand All @@ -26,10 +23,13 @@ import (
"github.com/cloudquery/cloudquery/pkg/policy"
"github.com/cloudquery/cloudquery/pkg/ui"
"github.com/cloudquery/cq-provider-sdk/cqproto"
"github.com/cloudquery/cq-provider-sdk/helpers"
"github.com/cloudquery/cq-provider-sdk/provider"
"github.com/cloudquery/cq-provider-sdk/provider/schema"
"github.com/cloudquery/cq-provider-sdk/provider/schema/diag"
"github.com/getsentry/sentry-go"
"github.com/golang-migrate/migrate/v4"
"github.com/google/uuid"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-version"
"github.com/hashicorp/hcl/v2"
Expand All @@ -39,10 +39,17 @@ import (
otrace "go.opentelemetry.io/otel/trace"
"golang.org/x/sync/errgroup"
gcodes "google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

var (
ErrMigrationsNotSupported = errors.New("provider doesn't support migrations")
//go:embed migrations/*.sql
coreMigrations embed.FS
)

const (
latestVersion = "latest"
)

// FetchRequest is provided to the Client to execute a fetch on one or more providers
Expand Down Expand Up @@ -278,6 +285,13 @@ func New(ctx context.Context, options ...Option) (*Client, error) {
c.Logger.Warn("postgres validation warning", "err", err)
}
}
// migrate cloudquery core tables to latest version
if c.DSN != "" {
if err := c.MigrateCore(ctx); err != nil {
return nil, fmt.Errorf("failed to migrate cloudquery_core tables: %w", err)
}
}

if err := c.setupTableCreator(ctx); err != nil {
return nil, err
}
Expand Down Expand Up @@ -402,6 +416,11 @@ func (c *Client) Fetch(ctx context.Context, request FetchRequest) (res *FetchRes
// Ignoring gctx since we don't want to stop other running providers if one provider fails with an error
// future refactor should probably use a something else rather than error group.
errGroup, _ := errgroup.WithContext(ctx)
fetchId, err := uuid.NewUUID()
if err != nil {
return nil, err
}

for _, providerConfig := range request.Providers {
providerConfig := providerConfig
c.Logger.Debug("creating provider plugin", "provider", providerConfig.Name)
Expand All @@ -410,8 +429,22 @@ func (c *Client) Fetch(ctx context.Context, request FetchRequest) (res *FetchRes
c.Logger.Error("failed to create provider plugin", "provider", providerConfig.Name, "error", err)
return nil, err
}

// TODO: move this into an outer function
errGroup.Go(func() error {
fs := FetchSummary{
FetchId: fetchId,
ProviderName: providerConfig.Name,
ProviderVersion: providerPlugin.Version(),
Start: time.Now().UTC(),
}

defer func() {
if err := SaveFetchSummary(ctx, c.pool, &fs); err != nil {
c.Logger.Error("failed to save fetch summary", "err", err)
}
}()

pLog := c.Logger.With("provider", providerConfig.Name, "alias", providerConfig.Alias, "version", providerPlugin.Version())
pLog.Info("requesting provider to configure")
if c.HistoryCfg != nil {
Expand Down Expand Up @@ -453,6 +486,7 @@ func (c *Client) Fetch(ctx context.Context, request FetchRequest) (res *FetchRes
partialFetchResults []*cqproto.FailedResourceFetch
fetchedResources = make(map[string]cqproto.ResourceFetchSummary, len(providerConfig.Resources))
totalResources uint64 = 0
totalErrors uint64 = 0
)
for {
resp, err := stream.Recv()
Expand Down Expand Up @@ -481,6 +515,10 @@ func (c *Client) Fetch(ctx context.Context, request FetchRequest) (res *FetchRes
FetchResources: fetchedResources,
Status: status,
}
fs.Finish = time.Now().UTC()
fs.IsSuccess = true
fs.TotalErrorsCount = totalErrors
fs.TotalResourceCount = totalResources
return nil
}
pLog.Error("received provider fetch error", "error", err)
Expand All @@ -500,6 +538,7 @@ func (c *Client) Fetch(ctx context.Context, request FetchRequest) (res *FetchRes
}

totalResources += resp.ResourceCount
totalErrors += uint64(len(resp.PartialFetchFailedResources))
fetchedResources[resp.ResourceName] = resp.Summary

if resp.Error != "" {
Expand All @@ -511,6 +550,20 @@ func (c *Client) Fetch(ctx context.Context, request FetchRequest) (res *FetchRes
if request.UpdateCallback != nil {
request.UpdateCallback(update)
}

if fs.Resources == nil {
fs.Resources = &[]ResourceFetchSummary{}
}

*fs.Resources = append(*fs.Resources, ResourceFetchSummary{
ResourceName: resp.ResourceName,
FinishedResources: resp.FinishedResources,
Status: strconv.Itoa(int(resp.Summary.Status)), // todo use human readable representation of status
Error: resp.Error,
PartialFetchFailedResources: resp.PartialFetchFailedResources,
ResourceCount: resp.ResourceCount,
Diagnostics: resp.Summary.Diagnostics,
})
}
})
}
Expand Down Expand Up @@ -876,6 +929,33 @@ func (c *Client) buildProviderMigrator(migrations map[string][]byte, providerNam
return m, providerConfig, err
}

func (c *Client) MigrateCore(ctx context.Context) error {
err := createCoreSchema(ctx, c.pool)
if err != nil {
return err
}
migrations, err := provider.ReadMigrationFiles(c.Logger, coreMigrations)
if err != nil {
return err
}
dsn := c.DSN + "&search_path=cloudquery"
m, err := provider.NewMigrator(c.Logger, migrations, dsn, "cloudquery_core")
if err != nil {
return err
}

defer func() {
if err := m.Close(); err != nil {
c.Logger.Error("failed to close migrator connection", "error", err)
}
}()

if err := m.UpgradeProvider(latestVersion); err != nil && err != migrate.ErrNoChange {
return fmt.Errorf("failed to migrate cloudquery core schema: %w", err)
}
return nil
}

func (c *Client) getProviderConfig(providerName string) (*config.RequiredProvider, error) {
var providerConfig *config.RequiredProvider
for _, p := range c.Providers {
Expand Down Expand Up @@ -1029,3 +1109,17 @@ func reportFetchSummaryErrors(span otrace.Span, fetchSummaries map[string]Provid
attribute.Int64("fetch.errors.total", int64(totalErrors)),
)
}

func createCoreSchema(ctx context.Context, pool *pgxpool.Pool) error {
conn, err := pool.Acquire(ctx)
if err != nil {
return err
}
defer conn.Release()

_, err = conn.Exec(ctx, "CREATE SCHEMA IF NOT EXISTS cloudquery")
if err != nil {
return err
}
return nil
}
77 changes: 77 additions & 0 deletions pkg/client/fetch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package client

import (
"context"
"database/sql/driver"
"encoding/json"
"time"

"github.com/cloudquery/cq-provider-sdk/cqproto"
"github.com/cloudquery/cq-provider-sdk/provider/schema/diag"
"github.com/doug-martin/goqu/v9"
"github.com/google/uuid"
"github.com/jackc/pgx/v4/pgxpool"
)

// FetchSummary includes a summarized report of fetch, such as fetch id, fetch start and finish,
// resources fetch results
type FetchSummary struct {
CqId uuid.UUID `db:"id"`
// Unique Id of fetch session
FetchId uuid.UUID `db:"fetch_id"`
Start time.Time `db:"start"`
Finish time.Time `db:"finish"`
IsSuccess bool `db:"is_success"`
TotalResourceCount uint64 `db:"total_resource_count"`
TotalErrorsCount uint64 `db:"total_errors_count"`
ProviderName string `db:"provider_name"`
ProviderVersion string `db:"provider_version"`
Resources *[]ResourceFetchSummary `db:"results"`
}

// ResourceFetchSummary includes a data about fetching specific resource
type ResourceFetchSummary struct {
ResourceName string `json:"resource_name"`
// map of resources that have finished fetching
FinishedResources map[string]bool `json:"finished_resources"`
// Amount of resources collected so far
// Error value if any, if returned the stream will be canceled
Error string `json:"error"`
// list of resources where the fetching failed
PartialFetchFailedResources []*cqproto.FailedResourceFetch `json:"partial_fetch_failed_resources"`
// Execution status of resource
Status string `json:"status"`
// Total Amount of resources collected by this resource
ResourceCount uint64 `json:"resource_count"`
// Diagnostics of failed resource fetch, the diagnostic provides insights such as severity, summary and
// details on how to solve this issue
Diagnostics diag.Diagnostics `json:"diagnostics"`
}

// Value implements Valuer interface required by goqu
func (r ResourceFetchSummary) Value() (driver.Value, error) {
return json.Marshal(r)
}

// SaveFetchSummary saves fetch summary into fetches database
func SaveFetchSummary(ctx context.Context, pool *pgxpool.Pool, fs *FetchSummary) error {
conn, err := pool.Acquire(ctx)
if err != nil {
return err
}
defer conn.Release()

id, err := uuid.NewUUID()
if err != nil {
return err
}
fs.CqId = id
q := goqu.Dialect("postgres").Insert("cloudquery.fetches").Rows(fs)
sql, args, err := q.ToSQL()
if err != nil {
return err
}

_, err = conn.Exec(ctx, sql, args...)
return err
}
Loading