-
Notifications
You must be signed in to change notification settings - Fork 4.5k
Expand file tree
/
Copy pathyaml_specifiable.py
More file actions
59 lines (49 loc) · 2.13 KB
/
yaml_specifiable.py
File metadata and controls
59 lines (49 loc) · 2.13 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from apache_beam.io.filesystems import FileSystems
from apache_beam.ml.anomaly.specifiable import Spec
from apache_beam.ml.anomaly.transforms import AnomalyDetection
from apache_beam.ml.anomaly.transforms import Specifiable
from apache_beam.utils import python_callable
from apache_beam.yaml.yaml_provider import InlineProvider
def maybe_make_specifiable(v):
if isinstance(v, dict):
if "type" in v and "config" in v:
return Specifiable.from_spec(
Spec(type=v["type"], config=maybe_make_specifiable(v["config"])))
if "callable" in v:
if "path" in v or "name" in v:
raise ValueError(
"Cannot specify 'callable' with 'path' and 'name' for function.")
else:
return python_callable.PythonCallableWithSource(v["callable"])
if "path" in v and "name" in v:
return python_callable.PythonCallableWithSource.load_from_script(
FileSystems.open(v["path"]).read().decode(), v["name"])
ret = {k: maybe_make_specifiable(v[k]) for k in v}
return ret
else:
return v
class SpecProvider(InlineProvider):
def create_transform(self, type, args, yaml_create_transform):
return self._transform_factories[type](
**{
k: maybe_make_specifiable(v)
for k, v in args.items()
})
def create_spec_providers():
return SpecProvider({"AnomalyDetection": AnomalyDetection})