diff --git a/plugins/destination/snowflake/client/migrate.go b/plugins/destination/snowflake/client/migrate.go index 3c0bfe1611a331..fed93ffa851af4 100644 --- a/plugins/destination/snowflake/client/migrate.go +++ b/plugins/destination/snowflake/client/migrate.go @@ -9,10 +9,10 @@ import ( ) const ( - // Use ILIKE for case insensitivity. + // Use ILIKE for case insensitivity - isTableExistSQL = "SELECT count(*) FROM information_schema.tables WHERE table_name ILIKE '?';" - sqlTableInfo = "select column_name, data_type, is_nullable from information_schema.columns where table_name ILIKE '?';" + isTableExistSQL = "SELECT count(*) FROM information_schema.tables WHERE table_name ILIKE ?;" + sqlTableInfo = "select column_name, data_type, is_nullable from information_schema.columns where table_name ILIKE ?;" ) type columnInfo struct { @@ -84,11 +84,12 @@ func (c *Client) autoMigrateTable(_ context.Context, table *schema.Table) error switch { case snowflakeColumn == nil: c.logger.Debug().Str("table", table.Name).Str("column", col.Name).Msg("Column doesn't exist, creating") - sql := "alter table " + table.Name + " add column \"" + columnName + "\" \"" + columnType + `"` + sql := "alter table " + table.Name + " add column \"" + columnName + "\"" + columnType + fmt.Println(sql) if _, err := c.db.Exec(sql); err != nil { return fmt.Errorf("failed to add column %s on table %s: %w", col.Name, table.Name, err) } - case snowflakeColumn.typ != columnType: + case !strings.EqualFold(snowflakeColumn.typ, columnType): return fmt.Errorf("column %s on table %s has different type than schema, expected %s got %s. Try dropping the column and re-running", col.Name, table.Name, columnType, snowflakeColumn.typ) } } @@ -138,12 +139,23 @@ func (c *Client) getTableInfo(tableName string) (*tableInfo, error) { defer rows.Close() for rows.Next() { colInfo := columnInfo{} + + // 'information_schema.is_nullable' is a string containing 'YES' or 'NO'. + // We save it here as a string, and parse it later. + var isNullableTemp string + if err := rows.Scan( &colInfo.name, &colInfo.typ, - &colInfo.notNull); err != nil { + &isNullableTemp); err != nil { + return nil, err + } + + colInfo.notNull, err = parseYesNoString(isNullableTemp) + if err != nil { return nil, err } + colInfo.typ = strings.ToLower(colInfo.typ) info.columns = append(info.columns, colInfo) } @@ -152,3 +164,14 @@ func (c *Client) getTableInfo(tableName string) (*tableInfo, error) { } return &info, nil } + +func parseYesNoString(str string) (bool, error) { + switch str { + case "YES": + return true, nil + case "NO": + return false, nil + default: + return false, fmt.Errorf("failed to parse yes/no string: %s", str) + } +} diff --git a/plugins/destination/snowflake/client/types.go b/plugins/destination/snowflake/client/types.go index 625a4c079531af..e4da2b9cf5388e 100644 --- a/plugins/destination/snowflake/client/types.go +++ b/plugins/destination/snowflake/client/types.go @@ -9,9 +9,9 @@ func (*Client) SchemaTypeToSnowflake(t schema.ValueType) string { case schema.TypeBool: return "boolean" case schema.TypeInt: - return "bigint" + return "number" case schema.TypeFloat: - return "float8" + return "float" case schema.TypeUUID: return "text" case schema.TypeString: @@ -21,7 +21,7 @@ func (*Client) SchemaTypeToSnowflake(t schema.ValueType) string { case schema.TypeStringArray: return "array" case schema.TypeTimestamp: - return "timestamp" + return "timestamp_ntz" case schema.TypeJSON: return "variant" case schema.TypeUUIDArray: