Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public static class JobProperties {
public static class JobUpdatesProperties {

private long timeoutSeconds;
private long pollingIntervalMillis;
}

@Getter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package feast.core.service;

import com.google.protobuf.InvalidProtocolBufferException;
import feast.core.CoreServiceProto.ListFeatureSetsRequest;
import feast.core.CoreServiceProto.ListStoresRequest.Filter;
import feast.core.CoreServiceProto.ListStoresResponse;
Expand Down Expand Up @@ -53,7 +54,6 @@
@Service
public class JobCoordinatorService {

private final long POLLING_INTERVAL_MILLISECONDS = 60000; // 1 min
private JobRepository jobRepository;
private FeatureSetRepository featureSetRepository;
private SpecService specService;
Expand Down Expand Up @@ -86,8 +86,8 @@ public JobCoordinatorService(
* <p>4) Updates Feature set statuses
*/
@Transactional
@Scheduled(fixedDelay = POLLING_INTERVAL_MILLISECONDS)
public void Poll() {
@Scheduled(fixedDelayString = "${feast.jobs.updates.pollingIntervalMillis}")
public void Poll() throws InvalidProtocolBufferException {
log.info("Polling for new jobs...");
List<JobUpdateTask> jobUpdateTasks = new ArrayList<>();
ListStoresResponse listStoresResponse = specService.listStores(Filter.newBuilder().build());
Expand Down
2 changes: 2 additions & 0 deletions core/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ feast:
# Key-value dict of job options to be passed to the population jobs.
options: {}
updates:
# Job update polling interval in milliseconds: how often Feast checks if new jobs should be sent to the runner.
pollingIntervalMillis: 60000
# Timeout in seconds for each attempt to update or submit a new job to the runner.
timeoutSeconds: 240
metrics:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public void setUp() {
}

@Test
public void shouldDoNothingIfNoStoresFound() {
public void shouldDoNothingIfNoStoresFound() throws InvalidProtocolBufferException {
when(specService.listStores(any())).thenReturn(ListStoresResponse.newBuilder().build());
JobCoordinatorService jcs =
new JobCoordinatorService(
Expand Down
1 change: 1 addition & 0 deletions infra/scripts/test-end-to-end-batch.sh
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ feast:
runner: DirectRunner
options: {}
updates:
pollingIntervalMillis: 30000
timeoutSeconds: 240
metrics:
enabled: false
Expand Down
1 change: 1 addition & 0 deletions infra/scripts/test-end-to-end.sh
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ feast:
runner: DirectRunner
options: {}
updates:
pollingIntervalMillis: 30000
timeoutSeconds: 240
metrics:
enabled: false
Expand Down