Skip to content

Commit 316eac4

Browse files
zhilingcfeast-ci-bot
authored andcommitted
Add wait flag for jobs, fix go proto path for dataset service (#138)
1 parent 5dbb66e commit 316eac4

1 file changed

Lines changed: 32 additions & 0 deletions

File tree

cli/feast/cmd/jobs.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,18 @@ import (
1919
"errors"
2020
"fmt"
2121
"io/ioutil"
22+
"time"
2223

2324
"github.com/gojek/feast/cli/feast/pkg/parse"
2425
"github.com/gojek/feast/protos/generated/go/feast/core"
2526

2627
"github.com/spf13/cobra"
2728
)
2829

30+
var (
31+
waitJobComplete = false
32+
)
33+
2934
// jobsCmd represents the jobs command
3035
var jobsCmd = &cobra.Command{
3136
Use: "jobs",
@@ -63,6 +68,7 @@ var jobsAbortCmd = &cobra.Command{
6368
}
6469

6570
func init() {
71+
jobsRunCmd.Flags().BoolVar(&waitJobComplete, "wait", false, "wait for job to run to completion")
6672
jobsCmd.AddCommand(jobsRunCmd)
6773
jobsCmd.AddCommand(jobsAbortCmd)
6874
rootCmd.AddCommand(jobsCmd)
@@ -86,9 +92,35 @@ func runJob(ctx context.Context, path string) error {
8692
return fmt.Errorf("[jobs] failed to start job: %v", err)
8793
}
8894
fmt.Printf("[jobs] started job with ID: %s", out.GetJobId())
95+
if waitJobComplete {
96+
return waitJob(ctx, jobsClient, out.GetJobId())
97+
}
8998
return nil
9099
}
91100

101+
func waitJob(ctx context.Context, jobsClient core.JobServiceClient, jobID string) error {
102+
for {
103+
response, err := jobsClient.GetJob(ctx, &core.JobServiceTypes_GetJobRequest{
104+
Id: jobID,
105+
})
106+
if err != nil {
107+
return fmt.Errorf("[jobs] error while querying job id %s: %v", jobID, err)
108+
}
109+
110+
status := response.GetJob().GetStatus()
111+
fmt.Printf("\r[jobs] job id %s is currently: %s\n", jobID, status)
112+
switch status {
113+
case "COMPLETED":
114+
return nil
115+
case "ABORTED":
116+
return fmt.Errorf("[jobs] job id %s failed: Job was aborted", jobID)
117+
case "ERROR":
118+
return fmt.Errorf("[jobs] job id %s failed: Job terminated with error. For more information, refer to job logs", jobID)
119+
}
120+
time.Sleep(5 * time.Second)
121+
}
122+
}
123+
92124
func abortJob(ctx context.Context, id string) error {
93125
initConn()
94126
jobsClient := core.NewJobServiceClient(coreConn)

0 commit comments

Comments
 (0)