Skip to content

Commit 488246b

Browse files
committed
Merge remote-tracking branch 'origin/master' into custom_support2
2 parents 4f4178a + 2d6bdfe commit 488246b

9 files changed

Lines changed: 172 additions & 15 deletions

File tree

R/R/flags.R

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,17 @@ parse_arguments <- function(arguments = NULL) {
3838
}
3939
next
4040
}
41+
42+
if (argument == "step-functions"){
43+
i <- i + 1
44+
if (i <= n){
45+
values$step_functions <- arguments[i]
46+
} else {
47+
values$step_functions <- ""
48+
}
49+
next
50+
}
51+
4152
if (!grepl("^--", argument)) {
4253
if (grepl("batch", argument)) {
4354
values$batch <- parse_batch(arguments)
@@ -131,11 +142,15 @@ split_parameters <- function(flags) {
131142
"package_suffixes", "no-pylint",
132143
"help", "resume",
133144
"max_num_splits", "max_workers",
134-
"other_args", "show",
135-
"authorize",
136-
"my_runs", "run_id", "user",
137-
"origin_run_id", "with",
138-
"tag"
145+
"other_args", "show", "user",
146+
"my_runs", "run_id",
147+
"origin_run_id", "with", "tag",
148+
# step-functions subcommands and options
149+
"authorize", "step_functions",
150+
"only_json", "generate_new_token",
151+
"running", "succeeded", "failed",
152+
"timed_out", "aborted", "namespace",
153+
"new_token", "workflow_timeout"
139154
)
140155
parameters <- flags[parameters]
141156
if (length(parameters) == 0) {

R/R/flow.R

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ Flow <- R6::R6Class("Flow",
7373
)
7474

7575
header <- function(flow, flow_decorators = NULL) {
76-
imports <- paste0(c("FlowSpec", "step", "Parameter", "retry", "environment", "batch", "catch", "resources"), collapse = ", ")
76+
imports <- paste0(c("FlowSpec", "step", "Parameter", "retry", "environment", "batch", "catch", "resources", "schedule"), collapse = ", ")
7777
paste0(
7878
"from metaflow import ", imports, space(1, type = "v"),
7979
"from metaflow.R import call_r", space(3, type = "v"),

R/R/run.R

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,28 @@ run_cmd <- function(flow_file, ...) {
9090
batch <- ""
9191
}
9292

93+
if ("step_functions" %in% names(flags)) {
94+
sfn_cmd <- paste("step-functions", flags$step_functions)
95+
for (subcommand in c("authorize", "generate_new_token",
96+
"only_json", "running", "succeeded",
97+
"failed", "timed_out", "aborted")){
98+
if (subcommand %in% names(flags)){
99+
subcommand_valid <- gsub("_", "-", subcommand)
100+
sfn_cmd <- paste(sfn_cmd, paste0("--", subcommand_valid))
101+
}
102+
}
103+
104+
for (subcommand in c("new_token", "tag", "namespace",
105+
"max_workers", "workflow_timeout")){
106+
if (subcommand %in% names(flags)){
107+
subcommand_valid <- gsub("_", "-", subcommand)
108+
sfn_cmd <- paste(sfn_cmd, paste0("--", subcommand_valid), flags[[subcommand]])
109+
}
110+
}
111+
} else {
112+
sfn_cmd <- ""
113+
}
114+
93115
if ("max_workers" %in% names(flags)) {
94116
max_workers <- paste0("--max-workers=", flags$max_workers)
95117
} else {
@@ -100,12 +122,15 @@ run_cmd <- function(flow_file, ...) {
100122
} else {
101123
max_num_splits <- ""
102124
}
125+
103126
if ("other_args" %in% names(flags)) {
104127
other_args <- paste(flags$other_args)
105128
} else {
106129
other_args <- ""
107130
}
131+
108132
parameters <- split_parameters(flags)
133+
109134
if ("with" %in% names(flags)) {
110135
with <- unlist(lapply(seq_along(flags$with), function(x) {
111136
paste(paste0("--with ", unlist(flags$with[x])), collapse = " ")
@@ -114,6 +139,7 @@ run_cmd <- function(flow_file, ...) {
114139
} else {
115140
with <- ""
116141
}
142+
117143
if ("tag" %in% names(flags)) {
118144
tag <- unlist(lapply(seq_along(flags$tag), function(x) {
119145
paste(paste0("--tag ", unlist(flags$tag[x])), collapse = " ")
@@ -159,15 +185,20 @@ run_cmd <- function(flow_file, ...) {
159185
cmd <- paste("Rscript", run_path, flow_RDS, show)
160186
}
161187

188+
if ("step_functions" %in% names(flags)){
189+
cmd <- paste("Rscript", run_path, flow_RDS,
190+
"--no-pylint", package_suffixes, sfn_cmd,
191+
parameters, other_args)
192+
}
193+
162194
if ("help" %in% names(flags) && flags$help) {
163195
# if help is specified by the run(...) R functions
164196
if ("help" %in% names(run_options) && run_options$help) {
165197
help_cmd <- "--help"
166198
} else { # if help is specified in command line
167199
help_cmd <- paste(commandArgs(trailingOnly = TRUE), collapse = " ")
168200
}
169-
cmd <- paste("Rscript", run_path, flow_RDS, help_cmd)
201+
cmd <- paste("Rscript", run_path, flow_RDS, "--no-pylint", help_cmd)
170202
}
171-
172203
cmd
173204
}

R/tests/testthat/test-flow.R

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ context("test-flow.R")
33
test_that("header() formatted correctly", {
44
skip_if_no_metaflow()
55
actual <- header("TestFlow")
6-
expected <- "from metaflow import FlowSpec, step, Parameter, retry, environment, batch, catch, resources\nfrom metaflow.R import call_r\n\n\nclass TestFlow(FlowSpec):\n"
6+
expected <- "from metaflow import FlowSpec, step, Parameter, retry, environment, batch, catch, resources, schedule\nfrom metaflow.R import call_r\n\n\nclass TestFlow(FlowSpec):\n"
77
expect_equal(actual, expected)
88
})
99

@@ -27,7 +27,7 @@ test_that("get_flow() returns correct string", {
2727
) %>%
2828
step(step = "end")
2929
actual <- TestFlow$get_flow()
30-
expected <- "from metaflow import FlowSpec, step, Parameter, retry, environment, batch, catch, resources\nfrom metaflow.R import call_r\n\n\nclass TestFlow(FlowSpec):\n\n @step\n def start(self):\n self.next(self.middle)\n\n @step\n def middle(self):\n self.next(self.end)\n\n @step\n def end(self):\n pass\n\n\nFLOW=TestFlow\nif __name__ == '__main__':\n TestFlow()"
30+
expected <- "from metaflow import FlowSpec, step, Parameter, retry, environment, batch, catch, resources, schedule\nfrom metaflow.R import call_r\n\n\nclass TestFlow(FlowSpec):\n\n @step\n def start(self):\n self.next(self.middle)\n\n @step\n def middle(self):\n self.next(self.end)\n\n @step\n def end(self):\n pass\n\n\nFLOW=TestFlow\nif __name__ == '__main__':\n TestFlow()"
3131
expect_equal(actual, expected)
3232
TestFlow$get_flow(save = TRUE)
3333
actual <- readChar("flow.py", nchars = nchar(expected))

R/tests/testthat/test-run-cmd.R

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
#!/usr/bin/env Rscript
2+
library(metaflow)
3+
4+
run_cmd <- metaflow:::run_cmd("flow.RDS")
5+
saveRDS(run_cmd, "run_cmd.RDS")

R/tests/testthat/test-run.R

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,6 @@ test_that("test run_cmd correctly parses help", {
3535
actual <- run_cmd("flow.RDS", help = TRUE) %>%
3636
as.character() %>%
3737
extract_args()
38-
expected <- c("--flowRDS=flow.RDS", "--help")
38+
expected <- c("--flowRDS=flow.RDS", "--no-pylint", "--help")
3939
expect_equal(actual, expected)
4040
})
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
test_that("SFN create", {
2+
skip_if_no_metaflow()
3+
4+
cmd <- "Rscript test-run-cmd.R step-functions create"
5+
system(cmd)
6+
7+
# Rscript /Library/../metaflow/run.R --flowRDS=flow.RDS step-functions create
8+
run_cmd <- strsplit(trimws(readRDS("run_cmd.RDS")), split=" ")[[1]]
9+
actual <- paste(run_cmd[3:length(run_cmd)], collapse=" ")
10+
11+
expected <- "--flowRDS=flow.RDS --no-pylint step-functions create"
12+
13+
expect_equal(actual, expected)
14+
on.exit(file.remove("run_cmd.RDS"))
15+
})
16+
17+
test_that("SFN create --help", {
18+
skip_if_no_metaflow()
19+
20+
cmd <- "Rscript test-run-cmd.R step-functions create --help"
21+
system(cmd)
22+
23+
run_cmd <- strsplit(trimws(readRDS("run_cmd.RDS")), split=" ")[[1]]
24+
actual <- paste(run_cmd[3:length(run_cmd)], collapse=" ")
25+
26+
expected <- "--flowRDS=flow.RDS --no-pylint step-functions create --help"
27+
28+
expect_equal(actual, expected)
29+
on.exit(file.remove("run_cmd.RDS"))
30+
})
31+
32+
test_that("SFN create --package-suffixes", {
33+
skip_if_no_metaflow()
34+
35+
cmd <- "Rscript test-run-cmd.R --package-suffixes=.csv,.RDS,.R step-functions create"
36+
system(cmd)
37+
38+
run_cmd <- strsplit(trimws(readRDS("run_cmd.RDS")), split=" ")[[1]]
39+
actual <- paste(run_cmd[3:length(run_cmd)], collapse=" ")
40+
41+
expected <- "--flowRDS=flow.RDS --no-pylint --package-suffixes=.csv,.RDS,.R step-functions create"
42+
43+
expect_equal(actual, expected)
44+
on.exit(file.remove("run_cmd.RDS"))
45+
})
46+
47+
test_that("SFN create --generate-new-token", {
48+
skip_if_no_metaflow()
49+
50+
cmd <- "Rscript test-run-cmd.R step-functions create --generate-new-token"
51+
system(cmd)
52+
53+
run_cmd <- strsplit(trimws(readRDS("run_cmd.RDS")), split=" ")[[1]]
54+
actual <- paste(run_cmd[3:length(run_cmd)], collapse=" ")
55+
56+
expected <- "--flowRDS=flow.RDS --no-pylint step-functions create --generate-new-token"
57+
58+
expect_equal(actual, expected)
59+
on.exit(file.remove("run_cmd.RDS"))
60+
})
61+
62+
test_that("SFN create --generate-new-token --max-workers 100 --lr 0.01", {
63+
skip_if_no_metaflow()
64+
65+
cmd <- "Rscript test-run-cmd.R step-functions create --generate-new-token --max-workers 100 --lr 0.01"
66+
system(cmd)
67+
68+
run_cmd <- strsplit(trimws(readRDS("run_cmd.RDS")), split=" ")[[1]]
69+
actual <- paste(run_cmd[3:length(run_cmd)], collapse=" ")
70+
71+
expected <- "--flowRDS=flow.RDS --no-pylint step-functions create --generate-new-token --max-workers 100 --lr 0.01"
72+
expect_equal(actual, expected)
73+
on.exit(file.remove("run_cmd.RDS"))
74+
})
75+
76+
77+
test_that("SFN trigger", {
78+
skip_if_no_metaflow()
79+
80+
cmd <- "Rscript test-run-cmd.R step-functions trigger"
81+
system(cmd)
82+
83+
run_cmd <- strsplit(trimws(readRDS("run_cmd.RDS")), split=" ")[[1]]
84+
actual <- paste(run_cmd[3:length(run_cmd)], collapse=" ")
85+
86+
expected <- "--flowRDS=flow.RDS --no-pylint step-functions trigger"
87+
88+
expect_equal(actual, expected)
89+
on.exit(file.remove("run_cmd.RDS"))
90+
})
91+
92+
93+
test_that("SFN list-runs --running", {
94+
skip_if_no_metaflow()
95+
96+
cmd <- "Rscript test-run-cmd.R step-functions list-runs --running"
97+
system(cmd)
98+
99+
run_cmd <- strsplit(trimws(readRDS("run_cmd.RDS")), split=" ")[[1]]
100+
actual <- paste(run_cmd[3:length(run_cmd)], collapse=" ")
101+
102+
expected <- "--flowRDS=flow.RDS --no-pylint step-functions list-runs --running"
103+
104+
expect_equal(actual, expected)
105+
on.exit(file.remove("run_cmd.RDS"))
106+
})

R/tests/testthat/test-step.R

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ test_that("test join step", {
1010
next_step = "end"
1111
)
1212
actual <- TestFlow$get_flow()
13-
expected <- "from metaflow import FlowSpec, step, Parameter, retry, environment, batch, catch, resources\nfrom metaflow.R import call_r\n\n\nclass TestFlow(FlowSpec):\n\n @step\n def join(self, inputs):\n self.next(self.end)\n\n\nFLOW=TestFlow\nif __name__ == '__main__':\n TestFlow()"
13+
expected <- "from metaflow import FlowSpec, step, Parameter, retry, environment, batch, catch, resources, schedule\nfrom metaflow.R import call_r\n\n\nclass TestFlow(FlowSpec):\n\n @step\n def join(self, inputs):\n self.next(self.end)\n\n\nFLOW=TestFlow\nif __name__ == '__main__':\n TestFlow()"
1414
expect_equal(actual, expected)
1515
})
1616

@@ -23,7 +23,7 @@ test_that("test foreach step", {
2323
next_step = "end"
2424
)
2525
actual <- TestFlow$get_flow()
26-
expected <- "from metaflow import FlowSpec, step, Parameter, retry, environment, batch, catch, resources\nfrom metaflow.R import call_r\n\n\nclass TestFlow(FlowSpec):\n\n @step\n def join(self):\n self.next(self.end, foreach='parameters')\n\n\nFLOW=TestFlow\nif __name__ == '__main__':\n TestFlow()"
26+
expected <- "from metaflow import FlowSpec, step, Parameter, retry, environment, batch, catch, resources, schedule\nfrom metaflow.R import call_r\n\n\nclass TestFlow(FlowSpec):\n\n @step\n def join(self):\n self.next(self.end, foreach='parameters')\n\n\nFLOW=TestFlow\nif __name__ == '__main__':\n TestFlow()"
2727
expect_equal(actual, expected)
2828
})
2929

@@ -40,7 +40,7 @@ test_that("test join + r_function step", {
4040
next_step = "end"
4141
)
4242
actual <- TestFlow$get_flow()
43-
expected <- "from metaflow import FlowSpec, step, Parameter, retry, environment, batch, catch, resources\nfrom metaflow.R import call_r\n\n\nclass TestFlow(FlowSpec):\n\n @step\n def join(self, inputs):\n r_inputs = {node._current_step : node for node in inputs} if len(inputs[0].foreach_stack()) == 0 else list(inputs)\n call_r('join_fun', (self, r_inputs))\n self.next(self.end)\n\n\nFLOW=TestFlow\nif __name__ == '__main__':\n TestFlow()"
43+
expected <- "from metaflow import FlowSpec, step, Parameter, retry, environment, batch, catch, resources, schedule\nfrom metaflow.R import call_r\n\n\nclass TestFlow(FlowSpec):\n\n @step\n def join(self, inputs):\n r_inputs = {node._current_step : node for node in inputs} if len(inputs[0].foreach_stack()) == 0 else list(inputs)\n call_r('join_fun', (self, r_inputs))\n self.next(self.end)\n\n\nFLOW=TestFlow\nif __name__ == '__main__':\n TestFlow()"
4444
expect_equal(actual, expected)
4545
})
4646

metaflow/R.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,4 +111,4 @@ def run(flow_script,
111111
os.remove(tmp.name)
112112
os._exit(1)
113113
finally:
114-
os.remove(tmp.name)
114+
os.remove(tmp.name)

0 commit comments

Comments
 (0)