Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 29 additions & 6 deletions plugins/destination/snowflake/client/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ import (
)

const (
// Use ILIKE for case insensitivity.
// Use ILIKE for case insensitivity

isTableExistSQL = "SELECT count(*) FROM information_schema.tables WHERE table_name ILIKE '?';"
sqlTableInfo = "select column_name, data_type, is_nullable from information_schema.columns where table_name ILIKE '?';"
isTableExistSQL = "SELECT count(*) FROM information_schema.tables WHERE table_name ILIKE ?;"
sqlTableInfo = "select column_name, data_type, is_nullable from information_schema.columns where table_name ILIKE ?;"
)

type columnInfo struct {
Expand Down Expand Up @@ -84,11 +84,12 @@ func (c *Client) autoMigrateTable(_ context.Context, table *schema.Table) error
switch {
case snowflakeColumn == nil:
c.logger.Debug().Str("table", table.Name).Str("column", col.Name).Msg("Column doesn't exist, creating")
sql := "alter table " + table.Name + " add column \"" + columnName + "\" \"" + columnType + `"`
sql := "alter table " + table.Name + " add column \"" + columnName + "\"" + columnType
fmt.Println(sql)
if _, err := c.db.Exec(sql); err != nil {
return fmt.Errorf("failed to add column %s on table %s: %w", col.Name, table.Name, err)
}
case snowflakeColumn.typ != columnType:
case !strings.EqualFold(snowflakeColumn.typ, columnType):
return fmt.Errorf("column %s on table %s has different type than schema, expected %s got %s. Try dropping the column and re-running", col.Name, table.Name, columnType, snowflakeColumn.typ)
}
}
Expand Down Expand Up @@ -138,12 +139,23 @@ func (c *Client) getTableInfo(tableName string) (*tableInfo, error) {
defer rows.Close()
for rows.Next() {
colInfo := columnInfo{}

// 'information_schema.is_nullable' is a string containing 'YES' or 'NO'.
// We save it here as a string, and parse it later.
var isNullableTemp string

if err := rows.Scan(
&colInfo.name,
&colInfo.typ,
&colInfo.notNull); err != nil {
&isNullableTemp); err != nil {
return nil, err
}

colInfo.notNull, err = parseYesNoString(isNullableTemp)
if err != nil {
return nil, err
}

colInfo.typ = strings.ToLower(colInfo.typ)
info.columns = append(info.columns, colInfo)
}
Expand All @@ -152,3 +164,14 @@ func (c *Client) getTableInfo(tableName string) (*tableInfo, error) {
}
return &info, nil
}

func parseYesNoString(str string) (bool, error) {
switch str {
case "YES":
return true, nil
case "NO":
return false, nil
default:
return false, fmt.Errorf("failed to parse yes/no string: %s", str)
}
}
6 changes: 3 additions & 3 deletions plugins/destination/snowflake/client/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ func (*Client) SchemaTypeToSnowflake(t schema.ValueType) string {
case schema.TypeBool:
return "boolean"
case schema.TypeInt:
return "bigint"
return "number"
case schema.TypeFloat:
return "float8"
return "float"
case schema.TypeUUID:
return "text"
case schema.TypeString:
Expand All @@ -21,7 +21,7 @@ func (*Client) SchemaTypeToSnowflake(t schema.ValueType) string {
case schema.TypeStringArray:
return "array"
case schema.TypeTimestamp:
return "timestamp"
return "timestamp_ntz"
case schema.TypeJSON:
return "variant"
case schema.TypeUUIDArray:
Expand Down