Skip to content

Commit 2a53800

Browse files
authored
Add created_timestamp_column to DataSource. Rename timestamp_column -> event_timestamp_column (feast-dev#1048)
* timestamp_column -> event_timestamp_column; new created_timestamp_column Signed-off-by: Oleksii Moskalenko <moskalenko.alexey@gmail.com> * e2e tests Signed-off-by: Oleksii Moskalenko <moskalenko.alexey@gmail.com> * lint Signed-off-by: Oleksii Moskalenko <moskalenko.alexey@gmail.com>
1 parent ccafcca commit 2a53800

22 files changed

Lines changed: 174 additions & 148 deletions

File tree

common-test/src/main/java/feast/common/it/DataGenerator.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -243,7 +243,7 @@ public static DataSource createFileDataSourceSpec(
243243
.setType(DataSource.SourceType.BATCH_FILE)
244244
.setFileOptions(
245245
FileOptions.newBuilder().setFileFormat(fileFormat).setFileUrl(fileURL).build())
246-
.setTimestampColumn(timestampColumn)
246+
.setEventTimestampColumn(timestampColumn)
247247
.setDatePartitionColumn(datePartitionColumn)
248248
.build();
249249
}
@@ -253,7 +253,7 @@ public static DataSource createBigQueryDataSourceSpec(
253253
return DataSource.newBuilder()
254254
.setType(DataSource.SourceType.BATCH_BIGQUERY)
255255
.setBigqueryOptions(BigQueryOptions.newBuilder().setTableRef(bigQueryTableRef).build())
256-
.setTimestampColumn(timestampColumn)
256+
.setEventTimestampColumn(timestampColumn)
257257
.setDatePartitionColumn(datePartitionColumn)
258258
.build();
259259
}
@@ -268,7 +268,7 @@ public static DataSource createKafkaDataSourceSpec(
268268
.setBootstrapServers(servers)
269269
.setClassPath(classPath)
270270
.build())
271-
.setTimestampColumn(timestampColumn)
271+
.setEventTimestampColumn(timestampColumn)
272272
.build();
273273
}
274274
}

core/src/main/java/feast/core/model/DataSource.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,10 @@ public class DataSource {
6363
private String fieldMapJSON;
6464

6565
@Column(name = "timestamp_column")
66-
private String timestampColumn;
66+
private String eventTimestampColumn;
67+
68+
@Column(name = "created_timestamp_column")
69+
private String createdTimestampColumn;
6770

6871
@Column(name = "date_partition_column")
6972
private String datePartitionColumn;
@@ -115,7 +118,8 @@ public static DataSource fromProto(DataSourceProto.DataSource spec) {
115118
source.setFieldMapJSON(TypeConversion.convertMapToJsonString(spec.getFieldMappingMap()));
116119

117120
// Set timestamp mapping columns
118-
source.setTimestampColumn(spec.getTimestampColumn());
121+
source.setEventTimestampColumn(spec.getEventTimestampColumn());
122+
source.setCreatedTimestampColumn(spec.getCreatedTimestampColumn());
119123
source.setDatePartitionColumn(spec.getDatePartitionColumn());
120124

121125
return source;
@@ -163,7 +167,8 @@ public DataSourceProto.DataSource toProto() {
163167
// Parse field mapping and options from JSON
164168
spec.putAllFieldMapping(TypeConversion.convertJsonStringToMap(getFieldMapJSON()));
165169

166-
spec.setTimestampColumn(getTimestampColumn());
170+
spec.setEventTimestampColumn(getEventTimestampColumn());
171+
spec.setCreatedTimestampColumn(getCreatedTimestampColumn());
167172
spec.setDatePartitionColumn(getDatePartitionColumn());
168173

169174
return spec.build();
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
ALTER TABLE data_sources ADD COLUMN created_timestamp_column character varying(255);

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ require (
2525
golang.org/x/lint v0.0.0-20200302205851-738671d3881b // indirect
2626
golang.org/x/net v0.0.0-20200822124328-c89045814202
2727
golang.org/x/sys v0.0.0-20200515095857-1151b9dac4a9 // indirect
28-
golang.org/x/tools v0.0.0-20201001230009-b5b87423c93b // indirect
28+
golang.org/x/tools v0.0.0-20201013053347-2db1cd791039 // indirect
2929
google.golang.org/grpc v1.29.1
3030
google.golang.org/protobuf v1.25.0 // indirect
3131
gopkg.in/russross/blackfriday.v2 v2.0.0 // indirect

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -490,6 +490,8 @@ golang.org/x/tools v0.0.0-20200929223013-bf155c11ec6f h1:7+Nz9MyPqt2qMCTvNiRy1G0
490490
golang.org/x/tools v0.0.0-20200929223013-bf155c11ec6f/go.mod h1:z6u4i615ZeAfBE4XtMziQW1fSVJXACjjbWkB/mvPzlU=
491491
golang.org/x/tools v0.0.0-20201001230009-b5b87423c93b h1:07IVqnnzaip3TGyl/cy32V5YP3FguWG4BybYDTBNpm0=
492492
golang.org/x/tools v0.0.0-20201001230009-b5b87423c93b/go.mod h1:z6u4i615ZeAfBE4XtMziQW1fSVJXACjjbWkB/mvPzlU=
493+
golang.org/x/tools v0.0.0-20201013053347-2db1cd791039 h1:kLBxO4OPBgPwjg8Vvu+/0DCHIfDwYIGNFcD66NU9kpo=
494+
golang.org/x/tools v0.0.0-20201013053347-2db1cd791039/go.mod h1:z6u4i615ZeAfBE4XtMziQW1fSVJXACjjbWkB/mvPzlU=
493495
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
494496
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
495497
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=

protos/feast/core/DataSource.proto

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,13 +38,16 @@ message DataSource {
3838
// and fields in parent FeatureTable.
3939
map<string, string> field_mapping = 2;
4040

41-
// Must specify timestamp column name
42-
string timestamp_column = 3;
41+
// Must specify event timestamp column name
42+
string event_timestamp_column = 3;
4343

4444
// (Optional) Specify partition column
4545
// useful for file sources
4646
string date_partition_column = 4;
4747

48+
// Must specify creation timestamp column name
49+
string created_timestamp_column = 5;
50+
4851
// Defines options for DataSource that sources features from a file
4952
message FileOptions {
5053
// File Format of the file containing the features

sdk/python/feast/client.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -656,7 +656,7 @@ def ingest(
656656
_check_field_mappings(
657657
column_names,
658658
name,
659-
feature_table.batch_source.timestamp_column,
659+
feature_table.batch_source.event_timestamp_column,
660660
feature_table.batch_source.field_mapping,
661661
)
662662

@@ -671,7 +671,7 @@ def ingest(
671671
column_names,
672672
pyarrow_table,
673673
feature_table.batch_source.date_partition_column,
674-
feature_table.batch_source.timestamp_column,
674+
feature_table.batch_source.event_timestamp_column,
675675
)
676676
else:
677677
dir_path, dest_path = _write_non_partitioned_table_from_source(
@@ -680,12 +680,12 @@ def ingest(
680680

681681
try:
682682
if issubclass(type(feature_table.batch_source), FileSource):
683-
file_url = feature_table.batch_source.file_options.file_url[:-1]
683+
file_url = feature_table.batch_source.file_options.file_url.rstrip("*")
684684
_upload_to_file_source(file_url, with_partitions, dest_path)
685685
if issubclass(type(feature_table.batch_source), BigQuerySource):
686686
bq_table_ref = feature_table.batch_source.bigquery_options.table_ref
687687
feature_table_timestamp_column = (
688-
feature_table.batch_source.timestamp_column
688+
feature_table.batch_source.event_timestamp_column
689689
)
690690

691691
_upload_to_bq_source(

0 commit comments

Comments
 (0)