forked from tensorflow/tensorflow
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdevice_setter.py
More file actions
167 lines (144 loc) · 6.04 KB
/
device_setter.py
File metadata and controls
167 lines (144 loc) · 6.04 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
# Copyright 2015 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Device function for replicated training."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from tensorflow.core.framework import graph_pb2
from tensorflow.python.framework import device as pydev
from tensorflow.python.platform import tf_logging as logging
from tensorflow.python.training import server_lib
class _ReplicaDeviceChooser(object):
  """Picks a device for each Op built in a replicated-training setup.

  Users should not instantiate this class themselves; it backs the device
  function returned by `replica_device_setter()`.
  """

  def __init__(self, ps_tasks, ps_device, worker_device, merge_devices, ps_ops):
    """Initializes the chooser.

    Args:
      ps_tasks: Number of tasks in the `ps` job. Ops whose type is listed in
        `ps_ops` are spread over these tasks round-robin.
      ps_device: String. Device prefix of the `ps` job (e.g. "/job:ps"); a
        "/task:<n>" suffix is appended to it for each ps op.
      worker_device: String. Device of the `worker` job, used for ops that are
        not placed on a `ps` task (subject to `merge_devices`). If empty, no
        worker device is assigned.
      merge_devices: Boolean. If True, the chosen device is merged with any
        device the op already requested. If False, an op that already has a
        device keeps it unchanged.
      ps_ops: List of op type names (compared against `NodeDef.op`) whose ops
        must be placed on `ps` tasks. If None, ["Variable"] is used.
    """
    # TODO(sherrym): Variables in the LOCAL_VARIABLES collection should not be
    # placed in the parameter server.
    self._ps_ops = ["Variable"] if ps_ops is None else ps_ops
    self._ps_tasks = ps_tasks
    self._ps_device = ps_device
    self._worker_device = worker_device
    self._merge_devices = merge_devices
    # Index of the `ps` task that the next ps op will be assigned to.
    self._next_task_num = 0

  def _next_task(self):
    """Returns the current `ps` task index and advances the round-robin cursor.

    Returns:
      An int in [0, ps_tasks).
    """
    current, self._next_task_num = (
        self._next_task_num, (self._next_task_num + 1) % self._ps_tasks)
    return current

  def device_function(self, op):
    """Chooses the device for `op`.

    Ops whose type is in `ps_ops` go to the next `ps` task (round-robin);
    all other ops get the worker device, if one was configured.

    Args:
      op: An `Operation` (or a `NodeDef`).

    Returns:
      The device string to use for `op`.
    """
    merge = self._merge_devices
    if not merge and op.device:
      # Without merging, an op that already requested a device keeps it.
      return op.device

    requested = pydev.DeviceSpec.from_string(op.device or "")

    if self._ps_tasks and self._ps_device:
      node_def = op if isinstance(op, graph_pb2.NodeDef) else op.node_def
      if node_def.op in self._ps_ops:
        ps_string = "%s/task:%d" % (self._ps_device, self._next_task())
        if not merge:
          return ps_string
        ps_spec = pydev.DeviceSpec.from_string(ps_string)
        # Fold the op's own requested device into the ps placement.
        ps_spec.merge_from(requested)
        return ps_spec.to_string()

    if not merge:
      # No merging: use the worker device, or leave the op unplaced.
      return self._worker_device or ""

    if self._worker_device:
      chosen = pydev.DeviceSpec.from_string(self._worker_device)
    else:
      chosen = pydev.DeviceSpec()
    chosen.merge_from(requested)
    return chosen.to_string()
def replica_device_setter(ps_tasks=0, ps_device="/job:ps",
                          worker_device="/job:worker", merge_devices=True,
                          cluster=None, ps_ops=None):
  """Return a `device function` to use when building a Graph for replicas.

  Device Functions are used in a `with tf.device(device_function):` statement
  to automatically assign devices to `Operation` objects as they are
  constructed. Device constraints are added from the inner-most context first,
  working outwards. The merging behavior adds constraints to fields that are
  yet unset by a more inner context. Currently the fields are (job, task,
  cpu/gpu).

  If `cluster` is `None` and `ps_tasks` is 0, `None` is returned instead of a
  device function.

  For example,

  ```python
  # To build a cluster with two ps tasks on hosts ps0 and ps1, and 3 worker
  # tasks on hosts worker0, worker1 and worker2.
  cluster_spec = {
      "ps": ["ps0:2222", "ps1:2222"],
      "worker": ["worker0:2222", "worker1:2222", "worker2:2222"]}
  with tf.device(tf.train.replica_device_setter(cluster=cluster_spec)):
    # Build your graph
    v1 = tf.Variable(...)  # assigned to /job:ps/task:0
    v2 = tf.Variable(...)  # assigned to /job:ps/task:1
    v3 = tf.Variable(...)  # assigned to /job:ps/task:0
  # Run compute
  ```

  Args:
    ps_tasks: Number of tasks in the `ps` job. Ignored when `cluster` is
      given; the number of tasks is then taken from the cluster.
    ps_device: String. Device of the `ps` job (e.g. "/job:ps"). If empty no
      `ps` job is used. Defaults to "/job:ps".
    worker_device: String. Device of the `worker` job. If empty no `worker`
      job is used.
    merge_devices: `Boolean`. If `True`, the chosen device specification is
      merged into any device constraints already set on an op, rather than
      overriding them. If `False`, an op that already has a device keeps it,
      and only ops without a device are assigned one.
    cluster: `ClusterSpec`, or a value accepted by the `ClusterSpec`
      constructor such as a dict mapping job names to address lists or a
      `ClusterDef` proto.
    ps_ops: List of op type names (e.g. "Variable") whose ops must be placed
      on `ps` devices. Defaults to ["Variable"].

  Returns:
    A function to pass to `tf.device()`, or `None` if there are no `ps` tasks
    (including when `cluster` has no entry for the `ps` job).

  Raises:
    TypeError if `cluster` is not a dictionary or `ClusterDef` protocol buffer.
  """
  if cluster is not None:
    if isinstance(cluster, server_lib.ClusterSpec):
      cluster_spec = cluster.as_dict()
    else:
      cluster_spec = server_lib.ClusterSpec(cluster).as_dict()
    # Extract the job name from ps_device (e.g. "/job:ps" -> "ps") by parsing
    # it as a device spec. `str.lstrip("/job:")` must not be used here: it
    # strips a *set* of characters, so job names starting with any of
    # '/', 'j', 'o', 'b', ':' get mangled (e.g. "/job:bob" -> "").
    ps_job_name = pydev.DeviceSpec.from_string(ps_device).job
    ps_job_tasks = cluster_spec.get(ps_job_name)
    if ps_job_tasks is None:
      return None
    ps_tasks = len(ps_job_tasks)

  if ps_tasks == 0:
    return None

  if not merge_devices:
    logging.warning(
        "DEPRECATION: It is recommended to set merge_devices=true in "
        "replica_device_setter")
  chooser = _ReplicaDeviceChooser(
      ps_tasks, ps_device, worker_device, merge_devices, ps_ops)
  return chooser.device_function