Skip to content

Commit add429a

Browse files
committed
[SQL] Support for TIMESTAMP WITH TIME ZONE
Signed-off-by: Mihai Budiu <mbudiu@feldera.com>
1 parent 2df0115 commit add429a

38 files changed

Lines changed: 1753 additions & 101 deletions

File tree

crates/adapters/src/adhoc/table.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ use uuid::Uuid;
4646
pub const fn input_adhoc_arrow_serde_config() -> &'static SqlSerdeConfig {
4747
&SqlSerdeConfig {
4848
timestamp_format: TimestampFormat::String("%FT%T%.f"),
49+
timestamp_tz_format: TimestampFormat::String("%FT%T%.f%Z"),
4950
time_format: TimeFormat::String("%T"),
5051
date_format: DateFormat::String("%Y-%m-%d"),
5152
decimal_format: DecimalFormat::String,
@@ -59,6 +60,7 @@ pub const fn input_adhoc_arrow_serde_config() -> &'static SqlSerdeConfig {
5960
pub const fn output_adhoc_arrow_serde_config() -> &'static SqlSerdeConfig {
6061
&SqlSerdeConfig {
6162
timestamp_format: TimestampFormat::MicrosSinceEpoch,
63+
timestamp_tz_format: TimestampFormat::MicrosSinceEpoch,
6264
time_format: TimeFormat::NanosSigned,
6365
date_format: DateFormat::String("%Y-%m-%d"),
6466
decimal_format: DecimalFormat::String,

crates/adapters/src/format/avro/input.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ use super::{schema::schema_unwrap_optional, schema_registry_settings};
4646
pub const fn avro_de_config() -> &'static SqlSerdeConfig {
4747
&SqlSerdeConfig {
4848
timestamp_format: TimestampFormat::MicrosSinceEpoch,
49+
timestamp_tz_format: TimestampFormat::MicrosSinceEpoch,
4950
time_format: TimeFormat::Micros,
5051
date_format: DateFormat::DaysSinceEpoch,
5152
decimal_format: DecimalFormat::String,

crates/adapters/src/format/avro/schema.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -358,7 +358,7 @@ pub fn validate_field_schema(
358358
SqlType::Date => {
359359
return validate_date_schema(avro_schema);
360360
}
361-
SqlType::Timestamp => {
361+
SqlType::Timestamp | SqlType::TimestampTz => {
362362
return validate_timestamp_schema(avro_schema);
363363
}
364364
SqlType::Interval(_) => {
@@ -585,7 +585,7 @@ impl AvroSchemaBuilder {
585585
SqlType::Varbinary => AvroSchema::Bytes,
586586
SqlType::Time => AvroSchema::TimeMicros,
587587
SqlType::Date => AvroSchema::Date,
588-
SqlType::Timestamp => AvroSchema::TimestampMicros,
588+
SqlType::Timestamp | SqlType::TimestampTz => AvroSchema::TimestampMicros,
589589
SqlType::Interval(_) => {
590590
return Err("not implemented: Avro encoding for the SQL interval type".to_string());
591591
}

crates/adapters/src/format/json/schema.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -244,7 +244,7 @@ mod kafka_connect_json_converter {
244244
SqlType::Real => RepresentationType::Float,
245245
SqlType::Double => RepresentationType::Double,
246246
SqlType::Time => RepresentationType::Int64,
247-
SqlType::Timestamp => RepresentationType::Int64,
247+
SqlType::Timestamp | SqlType::TimestampTz => RepresentationType::Int64,
248248
SqlType::Date => RepresentationType::Int32,
249249
SqlType::Binary | SqlType::Varbinary => RepresentationType::Bytes,
250250
SqlType::Array => RepresentationType::Array {

crates/adapters/src/format/parquet.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ pub mod test;
4747
pub const fn default_arrow_serde_config() -> &'static SqlSerdeConfig {
4848
&SqlSerdeConfig {
4949
timestamp_format: TimestampFormat::MicrosSinceEpoch,
50+
timestamp_tz_format: TimestampFormat::MicrosSinceEpoch,
5051
time_format: TimeFormat::NanosSigned,
5152
date_format: DateFormat::String("%Y-%m-%d"),
5253
decimal_format: DecimalFormat::String,
@@ -296,6 +297,7 @@ pub fn relation_to_arrow_fields(fields: &[Field], delta_lake: bool) -> Vec<Arrow
296297
TimeUnit::Microsecond,
297298
if delta_lake { Some("UTC".into()) } else { None },
298299
),
300+
SqlType::TimestampTz => DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
299301
SqlType::Date => DataType::Date32,
300302
SqlType::Null => DataType::Null,
301303
// Today all supported connectors happen to use string encoding for UUID.

crates/adapters/src/integrated/delta_table/output.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ use tracing::{Instrument, info, info_span, warn};
4444
pub const fn delta_arrow_serde_config() -> &'static SqlSerdeConfig {
4545
&SqlSerdeConfig {
4646
timestamp_format: TimestampFormat::MicrosSinceEpoch,
47+
timestamp_tz_format: TimestampFormat::MicrosSinceEpoch,
4748
time_format: TimeFormat::NanosSigned,
4849
date_format: DateFormat::String("%Y-%m-%d"),
4950
decimal_format: DecimalFormat::String,

crates/datagen/src/lib.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1058,6 +1058,7 @@ impl<'a> RecordGenerator<'a> {
10581058
SqlType::Char
10591059
| SqlType::Varchar
10601060
| SqlType::Timestamp
1061+
| SqlType::TimestampTz
10611062
| SqlType::Date
10621063
| SqlType::Variant
10631064
| SqlType::Time => Value::String(String::new()),
@@ -1135,7 +1136,9 @@ impl<'a> RecordGenerator<'a> {
11351136
SqlType::Char | SqlType::Varchar => {
11361137
self.generate_string(field, settings, incr, rng, obj)
11371138
}
1138-
SqlType::Timestamp => self.generate_timestamp(field, settings, incr, rng, obj),
1139+
SqlType::Timestamp | SqlType::TimestampTz => {
1140+
self.generate_timestamp(field, settings, incr, rng, obj)
1141+
}
11391142
SqlType::Date => self.generate_date(field, settings, incr, rng, obj),
11401143
SqlType::Time => self.generate_time(field, settings, incr, rng, obj),
11411144
SqlType::Interval(_unit) => {

crates/feldera-types/src/program_schema.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -447,6 +447,8 @@ pub enum SqlType {
447447
Date,
448448
/// SQL `TIMESTAMP` type.
449449
Timestamp,
450+
/// SQL `TIMESTAMP WITH TIME ZONE` type.
451+
TimestampTz,
450452
/// SQL `INTERVAL ... X` type where `X` is a unit.
451453
Interval(IntervalUnit),
452454
/// SQL `ARRAY` type.
@@ -509,6 +511,7 @@ impl<'de> Deserialize<'de> for SqlType {
509511
"time" => Ok(SqlType::Time),
510512
"date" => Ok(SqlType::Date),
511513
"timestamp" => Ok(SqlType::Timestamp),
514+
"timestamp_tz" => Ok(SqlType::TimestampTz),
512515
"array" => Ok(SqlType::Array),
513516
"struct" => Ok(SqlType::Struct),
514517
"map" => Ok(SqlType::Map),
@@ -547,6 +550,7 @@ impl Serialize for SqlType {
547550
SqlType::Time => "TIME",
548551
SqlType::Date => "DATE",
549552
SqlType::Timestamp => "TIMESTAMP",
553+
SqlType::TimestampTz => "TIMESTAMP_TZ",
550554
SqlType::Interval(interval_unit) => match interval_unit {
551555
IntervalUnit::Day => "INTERVAL_DAY",
552556
IntervalUnit::DayToHour => "INTERVAL_DAY_HOUR",
@@ -907,6 +911,19 @@ impl ColumnType {
907911
}
908912
}
909913

914+
pub fn timestamp_tz(nullable: bool) -> Self {
915+
ColumnType {
916+
typ: SqlType::TimestampTz,
917+
nullable,
918+
precision: None,
919+
scale: None,
920+
component: None,
921+
fields: None,
922+
key: None,
923+
value: None,
924+
}
925+
}
926+
910927
pub fn variant(nullable: bool) -> Self {
911928
ColumnType {
912929
typ: SqlType::Variant,
@@ -1014,6 +1031,7 @@ mod tests {
10141031
("Time", SqlType::Time),
10151032
("Date", SqlType::Date),
10161033
("Timestamp", SqlType::Timestamp),
1034+
("TimestampTz", SqlType::TimestampTz),
10171035
("Interval_Day", SqlType::Interval(IntervalUnit::Day)),
10181036
(
10191037
"Interval_Day_Hour",

crates/feldera-types/src/serde_with_context/serde_config.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,8 @@ pub struct SqlSerdeConfig {
127127
pub date_format: DateFormat,
128128
/// `TIMESTAMP` format.
129129
pub timestamp_format: TimestampFormat,
130+
/// `TIMESTAMP WITH TIME ZONE` format.
131+
pub timestamp_tz_format: TimestampFormat,
130132
/// `DECIMAL` format.
131133
pub decimal_format: DecimalFormat,
132134
/// `VARIANT` format
@@ -190,6 +192,7 @@ impl From<JsonFlavor> for SqlSerdeConfig {
190192
time_format: TimeFormat::Millis,
191193
date_format: DateFormat::DaysSinceEpoch,
192194
timestamp_format: TimestampFormat::MillisSinceEpoch,
195+
timestamp_tz_format: TimestampFormat::MillisSinceEpoch,
193196
decimal_format: DecimalFormat::String,
194197
variant_format: VariantFormat::JsonString,
195198
binary_format: BinaryFormat::Array,
@@ -198,7 +201,9 @@ impl From<JsonFlavor> for SqlSerdeConfig {
198201
JsonFlavor::DebeziumMySql => Self {
199202
time_format: TimeFormat::Micros,
200203
date_format: DateFormat::DaysSinceEpoch,
204+
// Why is this missing fractions of a second?
201205
timestamp_format: TimestampFormat::String("%Y-%m-%dT%H:%M:%S%Z"),
206+
timestamp_tz_format: TimestampFormat::String("%Y-%m-%dT%H:%M:%S%Z"),
202207
decimal_format: DecimalFormat::String,
203208
variant_format: VariantFormat::JsonString,
204209
binary_format: BinaryFormat::Array,
@@ -208,6 +213,7 @@ impl From<JsonFlavor> for SqlSerdeConfig {
208213
time_format: TimeFormat::Micros,
209214
date_format: DateFormat::DaysSinceEpoch,
210215
timestamp_format: TimestampFormat::MillisSinceEpoch,
216+
timestamp_tz_format: TimestampFormat::MillisSinceEpoch,
211217
decimal_format: DecimalFormat::String,
212218
variant_format: VariantFormat::JsonString,
213219
binary_format: BinaryFormat::Array,
@@ -217,6 +223,7 @@ impl From<JsonFlavor> for SqlSerdeConfig {
217223
time_format: TimeFormat::String("%H:%M:%S%.f"),
218224
date_format: DateFormat::String("%Y-%m-%d"),
219225
timestamp_format: TimestampFormat::String("%Y-%m-%dT%H:%M:%S%.f%:z"),
226+
timestamp_tz_format: TimestampFormat::String("%Y-%m-%dT%H:%M:%S%.f%:z"),
220227
decimal_format: DecimalFormat::String,
221228
variant_format: VariantFormat::JsonString,
222229
binary_format: BinaryFormat::Array,
@@ -226,6 +233,7 @@ impl From<JsonFlavor> for SqlSerdeConfig {
226233
time_format: TimeFormat::String("%H:%M:%S%.f"),
227234
date_format: DateFormat::String("%Y-%m-%d"),
228235
timestamp_format: TimestampFormat::MillisSinceEpoch,
236+
timestamp_tz_format: TimestampFormat::MillisSinceEpoch,
229237
decimal_format: DecimalFormat::String,
230238
variant_format: VariantFormat::JsonString,
231239
binary_format: BinaryFormat::Array,
@@ -239,6 +247,7 @@ impl From<JsonFlavor> for SqlSerdeConfig {
239247
time_format: TimeFormat::Nanos,
240248
date_format: DateFormat::String("%Y-%m-%d"),
241249
timestamp_format: TimestampFormat::String("%Y-%m-%d %H:%M:%S.%f %:z"), // 2023-11-04 15:33:47.123 +00:00
250+
timestamp_tz_format: TimestampFormat::String("%Y-%m-%d %H:%M:%S.%f %:z"), // 2023-11-04 15:33:47.123 +00:00
242251
decimal_format: DecimalFormat::String,
243252
variant_format: VariantFormat::JsonString,
244253
binary_format: BinaryFormat::Base64,
@@ -251,6 +260,7 @@ impl From<JsonFlavor> for SqlSerdeConfig {
251260
time_format: TimeFormat::default(), // H-M-S
252261
date_format: DateFormat::default(), // Y-m-d
253262
timestamp_format: TimestampFormat::default(),
263+
timestamp_tz_format: TimestampFormat::default(),
254264
decimal_format: DecimalFormat::String,
255265
variant_format: VariantFormat::Json,
256266
// We need [`BinaryFormat::PgHex`] only because we serialize

crates/pipeline-manager/src/compiler/sql_compiler.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -980,6 +980,7 @@ mod test {
980980
val_varbinary VARBINARY,
981981
val_time TIME,
982982
val_timestamp TIMESTAMP,
983+
val_timestampTz TIMESTAMP WITH TIME ZONE,
983984
val_date DATE,
984985
val_row ROW(l INT NULL, r VARCHAR),
985986
val_array INT ARRAY,
@@ -1008,7 +1009,7 @@ mod test {
10081009
for relation in [table, view] {
10091010
assert!(!relation.materialized);
10101011
assert!(relation.properties.is_empty());
1011-
assert_eq!(relation.fields.len(), 22);
1012+
assert_eq!(relation.fields.len(), 23);
10121013

10131014
// BOOLEAN, TINYINT, SMALLINT, INTEGER, BIGINT, UUID
10141015
assert_eq!(
@@ -1096,6 +1097,10 @@ mod test {
10961097
relation.field("val_timestamp").unwrap().columntype.typ,
10971098
SqlType::Timestamp
10981099
);
1100+
assert_eq!(
1101+
relation.field("val_timestampTz").unwrap().columntype.typ,
1102+
SqlType::TimestampTz
1103+
);
10991104
assert_eq!(
11001105
relation.field("val_date").unwrap().columntype.typ,
11011106
SqlType::Date

0 commit comments

Comments
 (0)