-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathtest_dataflow.py
More file actions
124 lines (109 loc) · 4.52 KB
/
test_dataflow.py
File metadata and controls
124 lines (109 loc) · 4.52 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
import copy
from dffml.df.types import DataFlow, Input
from dffml.df.memory import MemoryOrchestrator
from dffml.operation.dataflow import run_dataflow, RunDataFlowConfig
from dffml.operation.output import GetSingle
from dffml.util.asynctestcase import AsyncTestCase
from ..test_df import DATAFLOW, add, mult, parse_line
class TestRunDataFlowOnRecord(AsyncTestCase):
    """Tests that drive the ``run_dataflow`` operation from a parent dataflow.

    Both tests build a parent ``DataFlow`` containing ``run_dataflow`` and
    ``GetSingle``, execute it with a ``MemoryOrchestrator``, and compare the
    value produced for each context against a table of expected values.
    """

    async def test_run(self):
        """Run ``DATAFLOW`` as a subflow via the stock ``run_dataflow`` op.

        Each parent context carries one ``inputs`` value: a mapping from a
        sub-context name to the list of input dicts for that sub-context. The
        parent context is given the same name as the sub-context, so the
        yielded context handle doubles as the key into ``test_outputs``.
        """
        test_dataflow = DataFlow(
            operations={
                "run_dataflow": run_dataflow.op,
                "get_single": GetSingle.imp.op,
            },
            configs={"run_dataflow": RunDataFlowConfig(dataflow=DATAFLOW)},
            seed=[
                # Ask GetSingle (in the parent flow) for run_dataflow's
                # "results" output.
                Input(
                    value=[run_dataflow.op.outputs["results"].name],
                    definition=GetSingle.op.inputs["spec"],
                )
            ],
        )

        test_inputs = [
            {
                "add_op": [
                    {
                        "value": "add 40 and 2",
                        "definition": parse_line.op.inputs["line"].name,
                    },
                    {
                        "value": [add.op.outputs["sum"].name],
                        "definition": GetSingle.op.inputs["spec"].name,
                    },
                ]
            },
            {
                "mult_op": [
                    {
                        "value": "multiply 42 and 10",
                        "definition": parse_line.op.inputs["line"].name,
                    },
                    {
                        "value": [mult.op.outputs["product"].name],
                        "definition": GetSingle.op.inputs["spec"].name,
                    },
                ]
            },
        ]
        test_outputs = {"add_op": 42, "mult_op": 420}

        # Every assertion below runs inside the ``async for`` loop, so record
        # which contexts were actually yielded; otherwise an orchestrator that
        # yields nothing would let the test pass without checking anything.
        seen_contexts = []

        async with MemoryOrchestrator.withconfig({}) as orchestrator:
            async with orchestrator(test_dataflow) as octx:
                async for _ctx, results in octx.run(
                    {
                        # Each test_input has exactly one key; reuse it as the
                        # name of the parent context.
                        next(iter(test_input)): [
                            Input(
                                value=test_input,
                                definition=run_dataflow.op.inputs["inputs"],
                            )
                        ]
                        for test_input in test_inputs
                    }
                ):
                    ctx_str = (await _ctx.handle()).as_string()
                    seen_contexts.append(ctx_str)

                    self.assertIn("flow_results", results)
                    results = results["flow_results"]

                    # The subflow results are keyed by sub-context, which must
                    # match the parent context name.
                    self.assertIn(ctx_str, map(str, results.keys()))
                    self.assertIn(ctx_str, test_outputs)
                    results = results[next(iter(results))]

                    self.assertIn("result", results)
                    results = results["result"]

                    expected_results = test_outputs[ctx_str]
                    self.assertEqual(expected_results, results)

        # Each expected context must have been yielded exactly once.
        self.assertCountEqual(list(test_outputs), seen_contexts)

    async def test_run_custom(self):
        """Run a subflow through ``run_dataflow`` with replaced inputs/outputs.

        The ``run_dataflow`` operation is rebuilt with ``parse_line``'s inputs
        and a single output (the ``add`` operation's ``sum`` definition), so
        the parent flow is fed an input line directly and the expected value
        is read from the parent results under that output definition's name.
        """
        output_definition = add.op.outputs["sum"]
        get_single_spec_input = Input(
            value=[output_definition.name],
            definition=GetSingle.op.inputs["spec"],
        )

        # Work on a deep copy so appending to the seed does not modify the
        # shared DATAFLOW object imported from test_df.
        subflow = copy.deepcopy(DATAFLOW)
        subflow.seed.append(get_single_spec_input)

        test_dataflow = DataFlow(
            operations={
                "run_dataflow": run_dataflow.op._replace(
                    inputs=parse_line.op.inputs,
                    outputs={output_definition.name: output_definition},
                ),
                "get_single": GetSingle.imp.op,
            },
            configs={"run_dataflow": RunDataFlowConfig(dataflow=subflow)},
            seed=[get_single_spec_input],
        )

        # Maps each input line (also used as the context name) to the value
        # expected for it.
        test_outputs = {"add 40 and 2": 42, "multiply 42 and 10": 420}

        # Track yielded contexts so the test cannot pass vacuously when the
        # orchestrator produces no results.
        seen_contexts = []

        async with MemoryOrchestrator.withconfig({}) as orchestrator:
            async with orchestrator(test_dataflow) as octx:
                async for _ctx, results in octx.run(
                    {
                        input_line: [
                            Input(
                                value=input_line,
                                definition=parse_line.op.inputs["line"],
                            )
                        ]
                        for input_line in test_outputs
                    }
                ):
                    ctx_str = (await _ctx.handle()).as_string()
                    seen_contexts.append(ctx_str)

                    results = results[output_definition.name]
                    expected_results = test_outputs[ctx_str]
                    self.assertEqual(expected_results, results)

        # Each expected context must have been yielded exactly once.
        self.assertCountEqual(list(test_outputs), seen_contexts)