@@ -19,13 +19,18 @@ import (
1919 "errors"
2020 "fmt"
2121 "io/ioutil"
22+ "time"
2223
2324 "github.com/gojek/feast/cli/feast/pkg/parse"
2425 "github.com/gojek/feast/protos/generated/go/feast/core"
2526
2627 "github.com/spf13/cobra"
2728)
2829
30+ var (
31+ waitJobComplete = false
32+ )
33+
2934// jobsCmd represents the jobs command
3035var jobsCmd = & cobra.Command {
3136 Use : "jobs" ,
@@ -63,6 +68,7 @@ var jobsAbortCmd = &cobra.Command{
6368}
6469
6570func init () {
71+ jobsRunCmd .Flags ().BoolVar (& waitJobComplete , "wait" , false , "wait for job to run to completion" )
6672 jobsCmd .AddCommand (jobsRunCmd )
6773 jobsCmd .AddCommand (jobsAbortCmd )
6874 rootCmd .AddCommand (jobsCmd )
@@ -86,9 +92,35 @@ func runJob(ctx context.Context, path string) error {
8692 return fmt .Errorf ("[jobs] failed to start job: %v" , err )
8793 }
8894 fmt .Printf ("[jobs] started job with ID: %s" , out .GetJobId ())
95+ if waitJobComplete {
96+ return waitJob (ctx , jobsClient , out .GetJobId ())
97+ }
8998 return nil
9099}
91100
101+ func waitJob (ctx context.Context , jobsClient core.JobServiceClient , jobID string ) error {
102+ for {
103+ response , err := jobsClient .GetJob (ctx , & core.JobServiceTypes_GetJobRequest {
104+ Id : jobID ,
105+ })
106+ if err != nil {
107+ return fmt .Errorf ("[jobs] error while querying job id %s: %v" , jobID , err )
108+ }
109+
110+ status := response .GetJob ().GetStatus ()
111+ fmt .Printf ("\r [jobs] job id %s is currently: %s\n " , jobID , status )
112+ switch status {
113+ case "COMPLETED" :
114+ return nil
115+ case "ABORTED" :
116+ return fmt .Errorf ("[jobs] job id %s failed: Job was aborted" , jobID )
117+ case "ERROR" :
118+ return fmt .Errorf ("[jobs] job id %s failed: Job terminated with error. For more information, refer to job logs" , jobID )
119+ }
120+ time .Sleep (5 * time .Second )
121+ }
122+ }
123+
92124func abortJob (ctx context.Context , id string ) error {
93125 initConn ()
94126 jobsClient := core .NewJobServiceClient (coreConn )
0 commit comments