Skip to content

Commit 6bedf64

Browse files
author
zhilingc
committed
Rename to ingestion_id
1 parent fc0656d commit 6bedf64

6 files changed

Lines changed: 24 additions & 21 deletions

File tree

protos/feast/core/Store.proto

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ message Store {
6969
// ====================|==================|================================
7070
// - event_timestamp | TIMESTAMP | event time of the FeatureRow
7171
// - created_timestamp | TIMESTAMP | processing time of the ingestion of the FeatureRow
72-
// - dataset_id | STRING | identifier of the batch dataset a row belongs to
72+
// - ingestion_id | STRING | unique id identifying groups of rows that have been ingested together
7373
// - job_id | STRING | identifier for the job that writes the FeatureRow to the corresponding BigQuery table
7474
//
7575
// BigQuery table created will be partitioned by the field "event_timestamp"

protos/feast/types/FeatureRow.proto

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@ message FeatureRow {
4040
// rows, and write the values to the correct tables.
4141
string feature_set = 6;
4242

43-
// Identifier tying this feature row to a specific ingestion dataset. For
44-
// batch loads, this dataset id can be attributed to a single ingestion job.
45-
string dataset_id = 7;
43+
// Identifier tying this feature row to a specific ingestion job.
44+
string ingestion_id = 7;
4645
}

sdk/python/feast/client.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -826,15 +826,15 @@ def ingest(
826826
# Loop optimization declarations
827827
produce = producer.produce
828828
flush = producer.flush
829-
dataset_id = _generate_dataset_id(feature_set)
829+
ingestion_id = _generate_ingestion_id(feature_set)
830830

831831
# Transform and push data to Kafka
832832
if feature_set.source.source_type == "Kafka":
833833
for chunk in get_feature_row_chunks(
834834
file=dest_path,
835835
row_groups=list(range(pq_file.num_row_groups)),
836836
fs=feature_set,
837-
dataset_id=dataset_id,
837+
ingestion_id=ingestion_id,
838838
max_workers=max_workers,
839839
):
840840

@@ -919,7 +919,7 @@ def _build_feature_references(
919919
return features
920920

921921

922-
def _generate_dataset_id(feature_set: FeatureSet) -> str:
922+
def _generate_ingestion_id(feature_set: FeatureSet) -> str:
923923
"""
924924
Generates a UUID from the feature set name, version, and the current time.
925925

sdk/python/feast/loaders/ingest.py

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626

2727

2828
def _encode_pa_tables(
29-
file: str, feature_set: str, fields: dict, dataset_id: str, row_group_idx: int
29+
file: str, feature_set: str, fields: dict, ingestion_id: str, row_group_idx: int
3030
) -> List[bytes]:
3131
"""
3232
Helper function to encode a PyArrow table(s) read from parquet file(s) into
@@ -49,8 +49,8 @@ def _encode_pa_tables(
4949
fields (dict[str, enum.Enum.ValueType]):
5050
A mapping of field names to their value types.
5151
52-
dataset_id (str):
53-
UUID unique to this dataset.
52+
ingestion_id (str):
53+
UUID unique to this ingestion job.
5454
5555
row_group_idx(int):
5656
Row group index to read and encode into byte like FeatureRow
@@ -86,7 +86,7 @@ def _encode_pa_tables(
8686
feature_row = FeatureRow(
8787
event_timestamp=datetime_col[row_idx],
8888
feature_set=feature_set,
89-
dataset_id=dataset_id,
89+
ingestion_id=ingestion_id,
9090
)
9191
# Loop optimization declaration
9292
ext = feature_row.fields.extend
@@ -102,7 +102,11 @@ def _encode_pa_tables(
102102

103103

104104
def get_feature_row_chunks(
105-
file: str, row_groups: List[int], fs: FeatureSet, dataset_id: str, max_workers: int
105+
file: str,
106+
row_groups: List[int],
107+
fs: FeatureSet,
108+
ingestion_id: str,
109+
max_workers: int,
106110
) -> Iterable[List[bytes]]:
107111
"""
108112
Iterator function to encode a PyArrow table read from a parquet file to
@@ -120,8 +124,8 @@ def get_feature_row_chunks(
120124
fs (feast.feature_set.FeatureSet):
121125
FeatureSet describing parquet files.
122126
123-
dataset_id (str):
124-
UUID unique to this dataset.
127+
ingestion_id (str):
128+
UUID unique to this ingestion job.
125129
126130
max_workers (int):
127131
Maximum number of workers to spawn.
@@ -136,7 +140,7 @@ def get_feature_row_chunks(
136140
field_map = {field.name: field.dtype for field in fs.fields.values()}
137141

138142
pool = Pool(max_workers)
139-
func = partial(_encode_pa_tables, file, feature_set, field_map, dataset_id)
143+
func = partial(_encode_pa_tables, file, feature_set, field_map, ingestion_id)
140144
for chunk in pool.imap(func, row_groups):
141145
yield chunk
142146
return

storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/writer/BigQueryFeatureSink.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,8 @@ public abstract class BigQueryFeatureSink implements FeatureSink {
4242
"Event time for the FeatureRow";
4343
public static final String BIGQUERY_CREATED_TIMESTAMP_FIELD_DESCRIPTION =
4444
"Processing time of the FeatureRow ingestion in Feast\"";
45-
public static final String BIGQUERY_DATASET_ID_FIELD_DESCRIPTION =
46-
"Identifier of the batch dataset a row belongs to";
45+
public static final String BIGQUERY_INGESTION_ID_FIELD_DESCRIPTION =
46+
"Unique id identifying groups of rows that have been ingested together";
4747
public static final String BIGQUERY_JOB_ID_FIELD_DESCRIPTION =
4848
"Feast import job ID for the FeatureRow";
4949

@@ -171,8 +171,8 @@ private TableDefinition createBigQueryTableDefinition(FeatureSetProto.FeatureSet
171171
"created_timestamp",
172172
Pair.of(
173173
StandardSQLTypeName.TIMESTAMP, BIGQUERY_CREATED_TIMESTAMP_FIELD_DESCRIPTION),
174-
"dataset_id",
175-
Pair.of(StandardSQLTypeName.STRING, BIGQUERY_DATASET_ID_FIELD_DESCRIPTION),
174+
"ingestion_id",
175+
Pair.of(StandardSQLTypeName.STRING, BIGQUERY_INGESTION_ID_FIELD_DESCRIPTION),
176176
"job_id",
177177
Pair.of(StandardSQLTypeName.STRING, BIGQUERY_JOB_ID_FIELD_DESCRIPTION));
178178
for (Map.Entry<String, Pair<StandardSQLTypeName, String>> entry :

storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/writer/FeatureRowToTableRow.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
public class FeatureRowToTableRow implements SerializableFunction<FeatureRow, TableRow> {
3232
private static final String EVENT_TIMESTAMP_COLUMN = "event_timestamp";
3333
private static final String CREATED_TIMESTAMP_COLUMN = "created_timestamp";
34-
private static final String DATASET_ID_COLUMN = "dataset_id";
34+
private static final String INGESTION_ID_COLUMN = "ingestion_id";
3535
private static final String JOB_ID_COLUMN = "job_id";
3636
private final String jobId;
3737

@@ -48,7 +48,7 @@ public TableRow apply(FeatureRow featureRow) {
4848
TableRow tableRow = new TableRow();
4949
tableRow.set(EVENT_TIMESTAMP_COLUMN, Timestamps.toString(featureRow.getEventTimestamp()));
5050
tableRow.set(CREATED_TIMESTAMP_COLUMN, Instant.now().toString());
51-
tableRow.set(DATASET_ID_COLUMN, featureRow.getDatasetId());
51+
tableRow.set(INGESTION_ID_COLUMN, featureRow.getIngestionId());
5252
tableRow.set(JOB_ID_COLUMN, jobId);
5353

5454
for (Field field : featureRow.getFieldsList()) {

0 commit comments

Comments
 (0)