Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions crates/adapters/src/adhoc/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ use uuid::Uuid;
pub const fn input_adhoc_arrow_serde_config() -> &'static SqlSerdeConfig {
&SqlSerdeConfig {
timestamp_format: TimestampFormat::String("%FT%T%.f"),
timestamp_tz_format: TimestampFormat::String("%FT%T%.f%Z"),
time_format: TimeFormat::String("%T"),
date_format: DateFormat::String("%Y-%m-%d"),
decimal_format: DecimalFormat::String,
Expand All @@ -59,6 +60,7 @@ pub const fn input_adhoc_arrow_serde_config() -> &'static SqlSerdeConfig {
pub const fn output_adhoc_arrow_serde_config() -> &'static SqlSerdeConfig {
&SqlSerdeConfig {
timestamp_format: TimestampFormat::MicrosSinceEpoch,
timestamp_tz_format: TimestampFormat::MicrosSinceEpoch,
time_format: TimeFormat::NanosSigned,
date_format: DateFormat::String("%Y-%m-%d"),
decimal_format: DecimalFormat::String,
Expand Down
1 change: 1 addition & 0 deletions crates/adapters/src/format/avro/input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ use super::{schema::schema_unwrap_optional, schema_registry_settings};
pub const fn avro_de_config() -> &'static SqlSerdeConfig {
&SqlSerdeConfig {
timestamp_format: TimestampFormat::MicrosSinceEpoch,
timestamp_tz_format: TimestampFormat::MicrosSinceEpoch,
time_format: TimeFormat::Micros,
date_format: DateFormat::DaysSinceEpoch,
decimal_format: DecimalFormat::String,
Expand Down
4 changes: 2 additions & 2 deletions crates/adapters/src/format/avro/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ pub fn validate_field_schema(
SqlType::Date => {
return validate_date_schema(avro_schema);
}
SqlType::Timestamp => {
SqlType::Timestamp | SqlType::TimestampTz => {
return validate_timestamp_schema(avro_schema);
}
SqlType::Interval(_) => {
Expand Down Expand Up @@ -585,7 +585,7 @@ impl AvroSchemaBuilder {
SqlType::Varbinary => AvroSchema::Bytes,
SqlType::Time => AvroSchema::TimeMicros,
SqlType::Date => AvroSchema::Date,
SqlType::Timestamp => AvroSchema::TimestampMicros,
SqlType::Timestamp | SqlType::TimestampTz => AvroSchema::TimestampMicros,
SqlType::Interval(_) => {
return Err("not implemented: Avro encoding for the SQL interval type".to_string());
}
Expand Down
2 changes: 1 addition & 1 deletion crates/adapters/src/format/json/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ mod kafka_connect_json_converter {
SqlType::Real => RepresentationType::Float,
SqlType::Double => RepresentationType::Double,
SqlType::Time => RepresentationType::Int64,
SqlType::Timestamp => RepresentationType::Int64,
SqlType::Timestamp | SqlType::TimestampTz => RepresentationType::Int64,
SqlType::Date => RepresentationType::Int32,
SqlType::Binary | SqlType::Varbinary => RepresentationType::Bytes,
SqlType::Array => RepresentationType::Array {
Expand Down
2 changes: 2 additions & 0 deletions crates/adapters/src/format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ pub mod test;
pub const fn default_arrow_serde_config() -> &'static SqlSerdeConfig {
&SqlSerdeConfig {
timestamp_format: TimestampFormat::MicrosSinceEpoch,
timestamp_tz_format: TimestampFormat::MicrosSinceEpoch,
time_format: TimeFormat::NanosSigned,
date_format: DateFormat::String("%Y-%m-%d"),
decimal_format: DecimalFormat::String,
Expand Down Expand Up @@ -296,6 +297,7 @@ pub fn relation_to_arrow_fields(fields: &[Field], delta_lake: bool) -> Vec<Arrow
TimeUnit::Microsecond,
if delta_lake { Some("UTC".into()) } else { None },
),
SqlType::TimestampTz => DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
SqlType::Date => DataType::Date32,
SqlType::Null => DataType::Null,
// Today all supported connectors happen to use string encoding for UUID.
Expand Down
1 change: 1 addition & 0 deletions crates/adapters/src/integrated/delta_table/output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ use tracing::{Instrument, info, info_span, warn};
pub const fn delta_arrow_serde_config() -> &'static SqlSerdeConfig {
&SqlSerdeConfig {
timestamp_format: TimestampFormat::MicrosSinceEpoch,
timestamp_tz_format: TimestampFormat::MicrosSinceEpoch,
time_format: TimeFormat::NanosSigned,
date_format: DateFormat::String("%Y-%m-%d"),
decimal_format: DecimalFormat::String,
Expand Down
5 changes: 4 additions & 1 deletion crates/datagen/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1058,6 +1058,7 @@ impl<'a> RecordGenerator<'a> {
SqlType::Char
| SqlType::Varchar
| SqlType::Timestamp
| SqlType::TimestampTz
| SqlType::Date
| SqlType::Variant
| SqlType::Time => Value::String(String::new()),
Expand Down Expand Up @@ -1135,7 +1136,9 @@ impl<'a> RecordGenerator<'a> {
SqlType::Char | SqlType::Varchar => {
self.generate_string(field, settings, incr, rng, obj)
}
SqlType::Timestamp => self.generate_timestamp(field, settings, incr, rng, obj),
SqlType::Timestamp | SqlType::TimestampTz => {
self.generate_timestamp(field, settings, incr, rng, obj)
}
SqlType::Date => self.generate_date(field, settings, incr, rng, obj),
SqlType::Time => self.generate_time(field, settings, incr, rng, obj),
SqlType::Interval(_unit) => {
Expand Down
18 changes: 18 additions & 0 deletions crates/feldera-types/src/program_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,8 @@ pub enum SqlType {
Date,
/// SQL `TIMESTAMP` type.
Timestamp,
/// SQL `TIMESTAMP WITH TIME ZONE` type.
TimestampTz,
/// SQL `INTERVAL ... X` type where `X` is a unit.
Interval(IntervalUnit),
/// SQL `ARRAY` type.
Expand Down Expand Up @@ -509,6 +511,7 @@ impl<'de> Deserialize<'de> for SqlType {
"time" => Ok(SqlType::Time),
"date" => Ok(SqlType::Date),
"timestamp" => Ok(SqlType::Timestamp),
"timestamp_tz" => Ok(SqlType::TimestampTz),
"array" => Ok(SqlType::Array),
"struct" => Ok(SqlType::Struct),
"map" => Ok(SqlType::Map),
Expand Down Expand Up @@ -547,6 +550,7 @@ impl Serialize for SqlType {
SqlType::Time => "TIME",
SqlType::Date => "DATE",
SqlType::Timestamp => "TIMESTAMP",
SqlType::TimestampTz => "TIMESTAMP_TZ",
SqlType::Interval(interval_unit) => match interval_unit {
IntervalUnit::Day => "INTERVAL_DAY",
IntervalUnit::DayToHour => "INTERVAL_DAY_HOUR",
Expand Down Expand Up @@ -907,6 +911,19 @@ impl ColumnType {
}
}

pub fn timestamp_tz(nullable: bool) -> Self {
ColumnType {
typ: SqlType::TimestampTz,
nullable,
precision: None,
scale: None,
component: None,
fields: None,
key: None,
value: None,
}
}

pub fn variant(nullable: bool) -> Self {
ColumnType {
typ: SqlType::Variant,
Expand Down Expand Up @@ -1014,6 +1031,7 @@ mod tests {
("Time", SqlType::Time),
("Date", SqlType::Date),
("Timestamp", SqlType::Timestamp),
("TimestampTz", SqlType::TimestampTz),
("Interval_Day", SqlType::Interval(IntervalUnit::Day)),
(
"Interval_Day_Hour",
Expand Down
10 changes: 10 additions & 0 deletions crates/feldera-types/src/serde_with_context/serde_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,8 @@ pub struct SqlSerdeConfig {
pub date_format: DateFormat,
/// `TIMESTAMP` format.
pub timestamp_format: TimestampFormat,
/// `TIMESTAMP WITH TIME ZONE` format.
pub timestamp_tz_format: TimestampFormat,
/// `DECIMAL` format.
pub decimal_format: DecimalFormat,
/// `VARIANT` format
Expand Down Expand Up @@ -190,6 +192,7 @@ impl From<JsonFlavor> for SqlSerdeConfig {
time_format: TimeFormat::Millis,
date_format: DateFormat::DaysSinceEpoch,
timestamp_format: TimestampFormat::MillisSinceEpoch,
timestamp_tz_format: TimestampFormat::MillisSinceEpoch,
decimal_format: DecimalFormat::String,
variant_format: VariantFormat::JsonString,
binary_format: BinaryFormat::Array,
Expand All @@ -198,7 +201,9 @@ impl From<JsonFlavor> for SqlSerdeConfig {
JsonFlavor::DebeziumMySql => Self {
time_format: TimeFormat::Micros,
date_format: DateFormat::DaysSinceEpoch,
// Why is this missing fractions of second?
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stray // Why is this missing fractions of second? comment with a question to nobody. Either answer it (Debezium MySQL really does emit second-precision timestamps in this layout) or open an issue and reference it. Don't ship a TODO disguised as a question.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still here from prior review. Either answer the question (does Debezium MySQL really emit second-precision timestamps in this layout? — if so, say so) or open an issue and reference it. Don't ship a TODO disguised as a question.

timestamp_format: TimestampFormat::String("%Y-%m-%dT%H:%M:%S%Z"),
timestamp_tz_format: TimestampFormat::String("%Y-%m-%dT%H:%M:%S%Z"),
decimal_format: DecimalFormat::String,
variant_format: VariantFormat::JsonString,
binary_format: BinaryFormat::Array,
Expand All @@ -208,6 +213,7 @@ impl From<JsonFlavor> for SqlSerdeConfig {
time_format: TimeFormat::Micros,
date_format: DateFormat::DaysSinceEpoch,
timestamp_format: TimestampFormat::MillisSinceEpoch,
timestamp_tz_format: TimestampFormat::MillisSinceEpoch,
decimal_format: DecimalFormat::String,
variant_format: VariantFormat::JsonString,
binary_format: BinaryFormat::Array,
Expand All @@ -217,6 +223,7 @@ impl From<JsonFlavor> for SqlSerdeConfig {
time_format: TimeFormat::String("%H:%M:%S%.f"),
date_format: DateFormat::String("%Y-%m-%d"),
timestamp_format: TimestampFormat::String("%Y-%m-%dT%H:%M:%S%.f%:z"),
timestamp_tz_format: TimestampFormat::String("%Y-%m-%dT%H:%M:%S%.f%:z"),
decimal_format: DecimalFormat::String,
variant_format: VariantFormat::JsonString,
binary_format: BinaryFormat::Array,
Expand All @@ -226,6 +233,7 @@ impl From<JsonFlavor> for SqlSerdeConfig {
time_format: TimeFormat::String("%H:%M:%S%.f"),
date_format: DateFormat::String("%Y-%m-%d"),
timestamp_format: TimestampFormat::MillisSinceEpoch,
timestamp_tz_format: TimestampFormat::MillisSinceEpoch,
decimal_format: DecimalFormat::String,
variant_format: VariantFormat::JsonString,
binary_format: BinaryFormat::Array,
Expand All @@ -239,6 +247,7 @@ impl From<JsonFlavor> for SqlSerdeConfig {
time_format: TimeFormat::Nanos,
date_format: DateFormat::String("%Y-%m-%d"),
timestamp_format: TimestampFormat::String("%Y-%m-%d %H:%M:%S.%f %:z"), // 2023-11-04 15:33:47.123 +00:00
timestamp_tz_format: TimestampFormat::String("%Y-%m-%d %H:%M:%S.%f %:z"), // 2023-11-04 15:33:47.123 +00:00
decimal_format: DecimalFormat::String,
variant_format: VariantFormat::JsonString,
binary_format: BinaryFormat::Base64,
Expand All @@ -251,6 +260,7 @@ impl From<JsonFlavor> for SqlSerdeConfig {
time_format: TimeFormat::default(), // H-M-S
date_format: DateFormat::default(), // Y-m-d
timestamp_format: TimestampFormat::default(),
timestamp_tz_format: TimestampFormat::default(),
decimal_format: DecimalFormat::String,
variant_format: VariantFormat::Json,
// We need [`BinaryFormat::PgHex`] only because we serialize
Expand Down
7 changes: 6 additions & 1 deletion crates/pipeline-manager/src/compiler/sql_compiler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -980,6 +980,7 @@ mod test {
val_varbinary VARBINARY,
val_time TIME,
val_timestamp TIMESTAMP,
val_timestampTz TIMESTAMP WITH TIME ZONE,
val_date DATE,
val_row ROW(l INT NULL, r VARCHAR),
val_array INT ARRAY,
Expand Down Expand Up @@ -1008,7 +1009,7 @@ mod test {
for relation in [table, view] {
assert!(!relation.materialized);
assert!(relation.properties.is_empty());
assert_eq!(relation.fields.len(), 22);
assert_eq!(relation.fields.len(), 23);

// BOOLEAN, TINYINT, SMALLINT, INTEGER, BIGINT, UUID
assert_eq!(
Expand Down Expand Up @@ -1096,6 +1097,10 @@ mod test {
relation.field("val_timestamp").unwrap().columntype.typ,
SqlType::Timestamp
);
assert_eq!(
relation.field("val_timestampTz").unwrap().columntype.typ,
SqlType::TimestampTz
);
assert_eq!(
relation.field("val_date").unwrap().columntype.typ,
SqlType::Date
Expand Down
Loading
Loading