forked from intel/dffml
-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathop.py
More file actions
88 lines (75 loc) · 2.83 KB
/
Copy pathop.py
File metadata and controls
88 lines (75 loc) · 2.83 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
from typing import List
from ..record import Record
from ..high_level.dataflow import run
from ..base import config, field
from ..df.types import DataFlow, Input
from ..util.entrypoint import entrypoint
from ..operation.output import GetSingle
from ..df.base import OperationImplementation
from .memory import MemorySource, MemorySourceContext
class OnlyOneOutputAllowedError(Exception):
    """Signals that the given opimp declares more than one output."""
class EmptyError(Exception):
    """Signals that the source is still empty once the opimp has been run."""
class NotEnoughArgs(Exception):
    """Signals that the source was not given one arg per operation input."""
@config
class OpSourceConfig:
    """
    Configuration for ``OpSource``.
    """

    # Operation implementation that is run to produce the source's records
    opimp: OperationImplementation
    # Values passed to the operation, paired positionally with its inputs
    args: List[str] = field(
        "Arguments to operation in input order", default_factory=lambda: [],
    )
    # OpSource raises EmptyError for an empty result only when this is False,
    # so the help text describes the flag as suppressing the error.
    allowempty: bool = field(
        "Do not raise an error if the source is empty after running the loading operation",
        default=False,
    )
@entrypoint("op")
class OpSource(MemorySource):
    """
    Memory source whose contents are produced by running one operation.

    When the source is entered, the configured operation implementation is
    run through a DataFlow and the value of its single output is stored in
    ``self.mem``, with every non-``Record`` value wrapped in a ``Record``.
    """

    CONTEXT = MemorySourceContext
    CONFIG = OpSourceConfig

    async def __aenter__(self):
        """
        Run the configured operation and load its output into ``self.mem``.

        Raises:
            OnlyOneOutputAllowedError: The operation does not declare exactly
                one output.
            NotEnoughArgs: The number of configured args differs from the
                number of operation inputs.
            EmptyError: ``self.mem`` is empty afterwards and ``allowempty``
                is not set.
        """
        await super().__aenter__()
        cfg = self.config
        # The operation must declare exactly one output
        op_outputs = cfg.opimp.op.outputs
        if len(op_outputs) != 1:
            raise OnlyOneOutputAllowedError(op_outputs)
        # Build a DataFlow from the operation plus the get_single output stage
        flow = DataFlow.auto(cfg.opimp.__class__, GetSingle)
        # Name of the sole output, used both to seed get_single and to pick
        # the value out of each result
        output_name = next(iter(op_outputs.values())).name
        flow.seed.append(
            Input(value=[output_name], definition=GetSingle.op.inputs["spec"])
        )
        # Every operation input needs exactly one argument
        op_inputs = cfg.opimp.op.inputs
        if len(cfg.args) != len(op_inputs):
            raise NotEnoughArgs(
                f"Args: {self.config.args}, Inputs: {self.config.opimp.op.inputs}"
            )
        # Pair arguments with input definitions in declaration order
        flow.seed.extend(
            Input(value=arg, definition=definition)
            for arg, definition in zip(cfg.args, op_inputs.values())
        )
        # Execute the DataFlow and collect what the operation produced
        async for _ctx, results in run(flow):
            records = results[output_name]
            # Wrap anything that is not already a Record, in place
            for key, value in records.items():
                if isinstance(value, Record):
                    continue
                records[key] = Record(key, data=value)
            self.mem = records
        # An empty source is an error unless explicitly allowed
        if not (self.mem or self.config.allowempty):
            raise EmptyError()
        return self