Skip to content

Commit 1c25afd

Browse files
committed
[SQL][SQLLIB] Use microsecond resolution for TIMESTAMP
Signed-off-by: Mihai Budiu <mbudiu@feldera.com>
1 parent db28cfe commit 1c25afd

File tree

66 files changed

+2103
-2069
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

66 files changed

+2103
-2069
lines changed

crates/adapters/src/format/avro/serializer.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -942,8 +942,8 @@ mod test {
942942
field: 1,
943943
field_0: Some("foo".to_string()),
944944
field_1: false,
945-
field_2: Timestamp::new(1713597703),
946-
field_3: Date::new(19833),
945+
field_2: Timestamp::from_milliseconds(1713597703),
946+
field_3: Date::from_days(19833),
947947
field_5: Some(EmbeddedStruct { field: true }),
948948
field_6: Some(BTreeMap::from([
949949
("foo".to_string(), 1),

crates/adapters/src/format/json/input.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1197,7 +1197,7 @@ mod test {
11971197
metadata.insert("kafka_topic", Variant::String(SqlString::from("my_topic")));
11981198
metadata.insert(
11991199
"kafka_timestamp",
1200-
Variant::Timestamp(Timestamp::new(1763626606441)),
1200+
Variant::Timestamp(Timestamp::from_milliseconds(1763626606441)),
12011201
);
12021202
metadata.insert("kafka_partition", Variant::Int(10));
12031203
metadata.insert("kafka_offset", Variant::Int(1_000_000));
@@ -1216,7 +1216,7 @@ mod test {
12161216
0,
12171217
Variant::Map(Arc::new(BTreeMap::new())),
12181218
SqlString::from("my_topic"),
1219-
Timestamp::new(1763626606441),
1219+
Timestamp::from_milliseconds(1763626606441),
12201220
10,
12211221
1_000_000,
12221222
),

crates/adapters/src/test/data.rs

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -420,9 +420,9 @@ impl Arbitrary for TestStruct2 {
420420
field: f,
421421
field_0: if f1 { Some(f0) } else { None },
422422
field_1: f1,
423-
field_2: Timestamp::new(f2 as i64 * 1_000),
424-
field_3: Date::new(f3 as i32),
425-
// field_4: Time::new(f4 * 1000),
423+
field_2: Timestamp::from_milliseconds(f2 as i64 * 1_000),
424+
field_3: Date::from_days(f3 as i32),
425+
// field_4: Time::from_nanoseconds(f4 * 1000),
426426
field_5: Some(f5),
427427
field_6: Some(f6),
428428
field_7: SqlDecimal::<10, 3>::new(f7_num, f7_scale as i32).unwrap(),
@@ -439,9 +439,9 @@ impl TestStruct2 {
439439
field: 1,
440440
field_0: Some("test".to_string()),
441441
field_1: false,
442-
field_2: Timestamp::new(1000),
443-
field_3: Date::new(1),
444-
// field_4: Time::new(1),
442+
field_2: Timestamp::from_milliseconds(1000),
443+
field_3: Date::from_days(1),
444+
// field_4: Time::from_nanoseconds(1),
445445
field_5: Some(EmbeddedStruct { field: false }),
446446
field_6: Some(BTreeMap::from([
447447
("foo".to_string(), 100),
@@ -453,9 +453,9 @@ impl TestStruct2 {
453453
field: 2,
454454
field_0: None,
455455
field_1: true,
456-
field_2: Timestamp::new(2000),
457-
field_3: Date::new(12),
458-
// field_4: Time::new(1_000_000_000),
456+
field_2: Timestamp::from_milliseconds(2000),
457+
field_3: Date::from_days(12),
458+
// field_4: Time::from_nanoseconds(1_000_000_000),
459459
field_5: Some(EmbeddedStruct { field: true }),
460460
field_6: Some(BTreeMap::new()),
461461
field_7: SqlDecimal::new(1, 3).unwrap(),
@@ -832,9 +832,9 @@ impl Arbitrary for IcebergTestStruct {
832832
r: F32::new(r),
833833
d: F64::new(d),
834834
dec: SqlDecimal::<10, 3>::new(dec_num, dec_scale as i32).unwrap(),
835-
dt: Date::new(dt),
836-
tm: Time::new(tm),
837-
ts: Timestamp::new(ts),
835+
dt: Date::from_days(dt),
836+
tm: Time::from_nanoseconds(tm),
837+
ts: Timestamp::from_milliseconds(ts),
838838
s: s.to_string(),
839839
// uuid: ByteArray::from_vec(uuid),
840840
fixed: ByteArray::from_vec(fixed),
@@ -1053,15 +1053,15 @@ impl Arbitrary for DeltaTestStruct {
10531053
bigint,
10541054
binary: ByteArray::from_vec(binary),
10551055
boolean,
1056-
date: Date::new(date),
1056+
date: Date::from_days(date),
10571057
decimal_10_3: SqlDecimal::<10, 3>::new(decimal_digits, 3).unwrap(),
10581058
double: F64::new(double.trunc()), // truncate to avoid rounding errors when serializing floats to/from JSON
10591059
float: F32::new(float.trunc()),
10601060
int,
10611061
smallint,
10621062
string: string.to_string(),
10631063
unused: Some(unused.to_string()),
1064-
timestamp_ntz: Timestamp::new(timestamp_ntz * 1000),
1064+
timestamp_ntz: Timestamp::from_milliseconds(timestamp_ntz * 1000),
10651065
tinyint,
10661066
string_array,
10671067
struct1,

crates/adapters/src/test/datagen.rs

Lines changed: 37 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1130,15 +1130,15 @@ fn test_time_types() {
11301130
let mut iter = zst.flushed.iter();
11311131
let first = iter.next().unwrap();
11321132
let record = first.unwrap_insert();
1133-
assert_eq!(record.field, Timestamp::new(0));
1134-
assert_eq!(record.field_1, Date::new(0));
1135-
assert_eq!(record.field_2, Time::new(0));
1133+
assert_eq!(record.field, Timestamp::from_milliseconds(0));
1134+
assert_eq!(record.field_1, Date::from_days(0));
1135+
assert_eq!(record.field_2, Time::from_nanoseconds(0));
11361136

11371137
let second = iter.next().unwrap();
11381138
let record = second.unwrap_insert();
1139-
assert_eq!(record.field, Timestamp::new(1));
1140-
assert_eq!(record.field_1, Date::new(1));
1141-
assert_eq!(record.field_2, Time::new(1000000));
1139+
assert_eq!(record.field, Timestamp::from_milliseconds(1));
1140+
assert_eq!(record.field_1, Date::from_days(1));
1141+
assert_eq!(record.field_2, Time::from_nanoseconds(1000000));
11421142
}
11431143

11441144
#[test]
@@ -1186,21 +1186,24 @@ fn test_time_types_with_integer_range() {
11861186
let mut iter = zst.flushed.iter();
11871187
let first = iter.next().unwrap();
11881188
let record = first.unwrap_insert();
1189-
assert_eq!(record.field, Timestamp::new(1724803200000));
1190-
assert_eq!(record.field_1, Date::new(19963));
1191-
assert_eq!(record.field_2, Time::new(5000000));
1189+
assert_eq!(record.field, Timestamp::from_milliseconds(1724803200000));
1190+
assert_eq!(record.field_1, Date::from_days(19963));
1191+
assert_eq!(record.field_2, Time::from_nanoseconds(5000000));
11921192

11931193
let second = iter.next().unwrap();
11941194
let record = second.unwrap_insert();
1195-
assert_eq!(record.field, Timestamp::new(1724803200000 + 1));
1196-
assert_eq!(record.field_1, Date::new(19963 + 1));
1197-
assert_eq!(record.field_2, Time::new(5000000 + 1000000));
1195+
assert_eq!(
1196+
record.field,
1197+
Timestamp::from_milliseconds(1724803200000 + 1)
1198+
);
1199+
assert_eq!(record.field_1, Date::from_days(19963 + 1));
1200+
assert_eq!(record.field_2, Time::from_nanoseconds(5000000 + 1000000));
11981201

11991202
let second = iter.next().unwrap();
12001203
let record = second.unwrap_insert();
1201-
assert_eq!(record.field, Timestamp::new(1724803200000));
1202-
assert_eq!(record.field_1, Date::new(19963));
1203-
assert_eq!(record.field_2, Time::new(5000000));
1204+
assert_eq!(record.field, Timestamp::from_milliseconds(1724803200000));
1205+
assert_eq!(record.field_1, Date::from_days(19963));
1206+
assert_eq!(record.field_2, Time::from_nanoseconds(5000000));
12041207
}
12051208

12061209
#[test]
@@ -1252,10 +1255,10 @@ fn test_uniform_dates_times_timestamps() {
12521255
let record = record.unwrap_insert();
12531256
assert!(record.field >= Timestamp::from_dateTime("2024-10-11T11:04:00Z".parse().unwrap()));
12541257
assert!(record.field < Timestamp::from_dateTime("2024-10-11T11:05:02Z".parse().unwrap()));
1255-
assert!(record.field_1 >= Date::new(19963));
1256-
assert!(record.field_1 < Date::new(19965));
1257-
assert!(record.field_2 >= Time::new(5000000));
1258-
assert!(record.field_2 < Time::new(7000000));
1258+
assert!(record.field_1 >= Date::from_days(19963));
1259+
assert!(record.field_1 < Date::from_days(19965));
1260+
assert!(record.field_2 >= Time::from_nanoseconds(5000000));
1261+
assert!(record.field_2 < Time::from_nanoseconds(7000000));
12591262
}
12601263
}
12611264

@@ -1402,21 +1405,24 @@ fn test_time_types_with_string_range() {
14021405
let mut iter = zst.flushed.iter();
14031406
let first = iter.next().unwrap();
14041407
let record = first.unwrap_insert();
1405-
assert_eq!(record.field, Timestamp::new(1724803200000));
1406-
assert_eq!(record.field_1, Date::new(19963));
1407-
assert_eq!(record.field_2, Time::new(5000000000));
1408+
assert_eq!(record.field, Timestamp::from_milliseconds(1724803200000));
1409+
assert_eq!(record.field_1, Date::from_days(19963));
1410+
assert_eq!(record.field_2, Time::from_nanoseconds(5000000000));
14081411

14091412
let second = iter.next().unwrap();
14101413
let record = second.unwrap_insert();
1411-
assert_eq!(record.field, Timestamp::new(1724803200000 + 1000));
1412-
assert_eq!(record.field_1, Date::new(19963 + 1));
1413-
assert_eq!(record.field_2, Time::new(6000000000));
1414+
assert_eq!(
1415+
record.field,
1416+
Timestamp::from_milliseconds(1724803200000 + 1000)
1417+
);
1418+
assert_eq!(record.field_1, Date::from_days(19963 + 1));
1419+
assert_eq!(record.field_2, Time::from_nanoseconds(6000000000));
14141420

14151421
let second = iter.next().unwrap();
14161422
let record = second.unwrap_insert();
1417-
assert_eq!(record.field, Timestamp::new(1724803200000));
1418-
assert_eq!(record.field_1, Date::new(19963));
1419-
assert_eq!(record.field_2, Time::new(5000000000));
1423+
assert_eq!(record.field, Timestamp::from_milliseconds(1724803200000));
1424+
assert_eq!(record.field_1, Date::from_days(19963));
1425+
assert_eq!(record.field_2, Time::from_nanoseconds(5000000000));
14201426
}
14211427

14221428
/// Field T not found, "t" is case sensitive.
@@ -1486,9 +1492,9 @@ fn test_case_insensitivity() {
14861492
let mut iter = zst.flushed.iter();
14871493
let first = iter.next().unwrap();
14881494
let record = first.unwrap_insert();
1489-
assert_eq!(record.field, Timestamp::new(0));
1490-
assert_eq!(record.field_1, Date::new(1));
1491-
assert_eq!(record.field_2, Time::new(1_000_000_000));
1495+
assert_eq!(record.field, Timestamp::from_milliseconds(0));
1496+
assert_eq!(record.field_1, Date::from_days(1));
1497+
assert_eq!(record.field_2, Time::from_nanoseconds(1_000_000_000));
14921498
}
14931499

14941500
#[test]

crates/adapters/src/transport/kafka/ft/input.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -988,7 +988,7 @@ impl PartitionReceiver {
988988
if let Some(timestamp) = timestamp {
989989
metadata.insert(
990990
"kafka_timestamp",
991-
Variant::Timestamp(Timestamp::from(timestamp)),
991+
Variant::Timestamp(Timestamp::from_milliseconds(timestamp)),
992992
);
993993
}
994994
}

crates/adapters/src/transport/kafka/ft/test.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2427,23 +2427,23 @@ fn test_kafka_metadata_json() {
24272427
),
24282428
]))),
24292429
SqlString::from(topic),
2430-
feldera_sqllib::Timestamp::new(0),
2430+
feldera_sqllib::Timestamp::from_microseconds(0),
24312431
0,
24322432
0,
24332433
),
24342434
TestStructMetadata::new(
24352435
1,
24362436
Variant::Map(Arc::new(BTreeMap::new())),
24372437
SqlString::from(topic),
2438-
feldera_sqllib::Timestamp::new(0),
2438+
feldera_sqllib::Timestamp::from_microseconds(0),
24392439
0,
24402440
1,
24412441
),
24422442
];
24432443

24442444
let mut received = wait_for_output_count(&zset, 2, flush);
2445-
received[0].kafka_timestamp = feldera_sqllib::Timestamp::new(0);
2446-
received[1].kafka_timestamp = feldera_sqllib::Timestamp::new(0);
2445+
received[0].kafka_timestamp = feldera_sqllib::Timestamp::from_microseconds(0);
2446+
received[1].kafka_timestamp = feldera_sqllib::Timestamp::from_microseconds(0);
24472447
assert_eq!(received, expected);
24482448
}
24492449

@@ -2607,22 +2607,22 @@ fn test_kafka_metadata_raw() {
26072607
),
26082608
]))),
26092609
SqlString::from(topic),
2610-
feldera_sqllib::Timestamp::new(0),
2610+
feldera_sqllib::Timestamp::from_microseconds(0),
26112611
0,
26122612
0,
26132613
),
26142614
TestRawStructMetadata::new(
26152615
SqlString::from("bar"),
26162616
Variant::Map(Arc::new(BTreeMap::new())),
26172617
SqlString::from(topic),
2618-
feldera_sqllib::Timestamp::new(0),
2618+
feldera_sqllib::Timestamp::from_microseconds(0),
26192619
0,
26202620
1,
26212621
),
26222622
];
26232623

26242624
let mut received = wait_for_output_count(&zset, 2, flush);
2625-
received[0].kafka_timestamp = feldera_sqllib::Timestamp::new(0);
2626-
received[1].kafka_timestamp = feldera_sqllib::Timestamp::new(0);
2625+
received[0].kafka_timestamp = feldera_sqllib::Timestamp::from_microseconds(0);
2626+
received[1].kafka_timestamp = feldera_sqllib::Timestamp::from_microseconds(0);
26272627
assert_eq!(received, expected);
26282628
}

crates/adapters/src/transport/redis/test.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,15 +31,15 @@ fn test_redis_output() {
3131
bigint: 1,
3232
binary: ByteArray::new(&[0, 1, 2]),
3333
boolean: false,
34-
date: Date::new(1),
34+
date: Date::from_days(1),
3535
decimal_10_3: SqlDecimal::new(123i128, 2).unwrap(),
3636
double: F64::from_str("1.123").unwrap(),
3737
float: F32::from_str("1.123").unwrap(),
3838
int: 1,
3939
smallint: 2,
4040
string: "test".to_owned(),
4141
unused: None,
42-
timestamp_ntz: Timestamp::new(1),
42+
timestamp_ntz: Timestamp::from_milliseconds(1),
4343
tinyint: 1,
4444
string_array: vec!["a".to_owned(), "b".to_owned()],
4545
struct1: TestStruct {

crates/dbsp/src/storage/file/format.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,11 +92,12 @@ use size_of::SizeOf;
9292
/// - v2: TODO.
9393
/// - v3: Bloom filter format change.
9494
/// - v4: Tup None optimizations.
95+
/// - v5: Change in representation for Timestamp, ShortInterval
9596
///
9697
/// When a new version is created, make sure to generate new golden
9798
/// files for it in crate `storage-test-compat` to check for
9899
/// backwards compatibility.
99-
pub const VERSION_NUMBER: u32 = 4;
100+
pub const VERSION_NUMBER: u32 = 5;
100101

101102
/// Magic number for data blocks.
102103
pub const DATA_BLOCK_MAGIC: [u8; 4] = *b"LFDB";

0 commit comments

Comments
 (0)