forked from ray-project/ray
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathservices.py
More file actions
1502 lines (1330 loc) · 64.2 KB
/
services.py
File metadata and controls
1502 lines (1330 loc) · 64.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import binascii
from collections import namedtuple, OrderedDict
from datetime import datetime
import json
import os
import psutil
import pyarrow
import random
import redis
import resource
import shutil
import signal
import socket
import subprocess
import sys
import time
import threading
# Ray modules
import ray.local_scheduler
import ray.plasma
import ray.global_scheduler as global_scheduler
PROCESS_TYPE_MONITOR = "monitor"
PROCESS_TYPE_LOG_MONITOR = "log_monitor"
PROCESS_TYPE_WORKER = "worker"
PROCESS_TYPE_LOCAL_SCHEDULER = "local_scheduler"
PROCESS_TYPE_PLASMA_MANAGER = "plasma_manager"
PROCESS_TYPE_PLASMA_STORE = "plasma_store"
PROCESS_TYPE_GLOBAL_SCHEDULER = "global_scheduler"
PROCESS_TYPE_REDIS_SERVER = "redis_server"
PROCESS_TYPE_WEB_UI = "web_ui"

# Registry of every process started by this services module, keyed by process
# type. The key order matters: cleanup() walks this dict front to back, so it
# is the order in which processes get terminated when Ray exits. Some orders
# cause spurious errors to be logged to the screen.
all_processes = OrderedDict(
    (process_type, []) for process_type in (
        PROCESS_TYPE_MONITOR,
        PROCESS_TYPE_LOG_MONITOR,
        PROCESS_TYPE_WORKER,
        PROCESS_TYPE_LOCAL_SCHEDULER,
        PROCESS_TYPE_PLASMA_MANAGER,
        PROCESS_TYPE_PLASMA_STORE,
        PROCESS_TYPE_GLOBAL_SCHEDULER,
        PROCESS_TYPE_REDIS_SERVER,
        PROCESS_TYPE_WEB_UI,
    ))
# Set any of these to True to run the corresponding process under the
# valgrind profiler; kill_process() then sends SIGINT first so the profiler
# can flush its data.
RUN_LOCAL_SCHEDULER_PROFILER = RUN_PLASMA_MANAGER_PROFILER = \
    RUN_PLASMA_STORE_PROFILER = False
# Directory containing this module; the binaries below live relative to it.
_SERVICES_DIR = os.path.abspath(os.path.dirname(__file__))

# Location of the redis server and the Ray redis module.
REDIS_EXECUTABLE = os.path.join(
    _SERVICES_DIR, "core/src/common/thirdparty/redis/src/redis-server")
REDIS_MODULE = os.path.join(
    _SERVICES_DIR, "core/src/common/redis_module/libray_redis_module.so")

# Location of the credis server and its master/member modules.
CREDIS_EXECUTABLE = os.path.join(
    _SERVICES_DIR, "core/src/credis/redis/src/redis-server")
CREDIS_MASTER_MODULE = os.path.join(
    _SERVICES_DIR, "core/src/credis/build/src/libmaster.so")
CREDIS_MEMBER_MODULE = os.path.join(
    _SERVICES_DIR, "core/src/credis/build/src/libmember.so")
# Everything needed to connect to an object store:
#   name:          socket name of the object store
#   manager_name:  socket name of the object store manager
#   manager_port:  Internet port the object store manager listens on
ObjectStoreAddress = namedtuple("ObjectStoreAddress",
                                "name manager_name manager_port")
def address(ip_address, port):
    """Join a host/IP string and a port into an "ip:port" address string."""
    return ":".join([ip_address, str(port)])
def get_ip_address(address):
    """Return the part of an "ip:port" string before the first colon.

    Raises:
        AssertionError: If address is not exactly a str.
    """
    assert type(address) is str, "Address must be a string"
    host, _, _ = address.partition(":")
    return host
def get_port(address):
    """Parse the integer port out of an "ip:port" string.

    Raises:
        Exception: If the text after the first colon is missing or is not an
            integer.
    """
    try:
        return int(address.split(":")[1])
    except Exception:
        raise Exception("Unable to parse port from address {}".format(address))
def new_port():
    """Pick a random port in [10000, 65535]; availability is not checked."""
    lowest, highest = 10000, 65535
    return random.randint(lowest, highest)
def random_name():
    """Return a random decimal string drawn from [0, 99999999]."""
    value = random.randint(0, 99999999)
    return str(value)
def kill_process(p):
    """Kill a subprocess, first asking it to terminate politely.

    If any profiler flag is enabled, the process is first sent SIGINT and given
    0.1s to write its profiling data. It is then sent SIGTERM; a timer
    escalates to SIGKILL if it has not exited within one second.

    Args:
        p: The subprocess.Popen handle of the process to kill.

    Returns:
        True if the process has exited, False otherwise.
    """
    if p.poll() is not None:
        # Nothing to do: the process has already exited.
        return True

    profiling = any([RUN_LOCAL_SCHEDULER_PROFILER,
                     RUN_PLASMA_MANAGER_PROFILER,
                     RUN_PLASMA_STORE_PROFILER])
    if profiling:
        # SIGINT tells the profiler to dump its data; give it time to do so.
        os.kill(p.pid, signal.SIGINT)
        time.sleep(0.1)

    # Ask nicely, and arm a one-second timer that force kills as a fallback,
    # so that p.wait() below cannot block forever.
    p.terminate()
    force_kill_timer = threading.Timer(1, p.kill)
    try:
        force_kill_timer.start()
        p.wait()
    finally:
        force_kill_timer.cancel()

    if p.poll() is None:
        # Still alive: force kill it ourselves.
        p.kill()
    # Report whether the process is finally gone.
    return p.poll() is not None
def cleanup():
    """Shut down every process recorded in all_processes (local mode).

    This kills the processes started through this services module (e.g. by
    services.start_ray_head()): schedulers, object stores, workers and so on.
    Driver processes are started and disconnected by worker.py and are not
    touched here.

    Process types are handled in the iteration order of all_processes, which is
    the order its keys were declared in. Each list is emptied afterwards. A
    warning is printed if any process could not be killed.
    """
    all_killed = True
    for process_type, processes in all_processes.items():
        for proc in processes:
            if not kill_process(proc):
                all_killed = False
        # Forget the processes of this type now that they have been handled.
        all_processes[process_type] = []
    if not all_killed:
        print("Ray did not shut down properly.")
def all_processes_alive(exclude=None):
    """Check whether every tracked process is still running.

    Args:
        exclude: Optional iterable of process types (keys of all_processes)
            whose processes should not be checked. Defaults to None, meaning
            every type is checked.

    Returns:
        True if all checked processes are alive. Otherwise False, after
        printing the type of the first dead process found.
    """
    # None instead of a mutable [] default, which would be shared across calls.
    if exclude is None:
        exclude = []
    for process_type, processes in all_processes.items():
        if process_type in exclude:
            continue
        # p.poll() returns the exit code once the process has exited; None
        # means the process is still running.
        if not all(p.poll() is None for p in processes):
            print("A process of type {} has died.".format(process_type))
            return False
    return True
def address_to_ip(address):
    """Replace the hostname part of an address with a numerical IP address.

    The hostname is resolved with socket.gethostbyname, so an address that
    already holds a numerical IP comes back unchanged. If the name resolves to
    127.0.0.1, the node's externally visible IP is used instead.

    Args:
        address: Either "host:port" or just "host".

    Returns:
        The same address with the host replaced by a numerical IP address.
    """
    host, separator, remainder = address.partition(":")
    resolved = socket.gethostbyname(host)
    # Don't hand out the loopback address; other nodes couldn't reach it.
    if resolved == "127.0.0.1":
        resolved = get_node_ip_address()
    return resolved + separator + remainder
def get_node_ip_address(address="8.8.8.8:53"):
    """Determine the IP address of the local node.

    A UDP socket is "connected" to the given address. This sends no packets,
    but it makes the OS choose the outgoing interface, whose local address is
    then read back.

    Args:
        address (str): "ip:port" of any known live service on the network you
            care about.

    Returns:
        The IP address of the current node, or "127.0.0.1" if it could not be
        determined (e.g. there is no network connection).
    """
    ip_address, port = address.split(":")
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        # Raises if there is no route to the target (e.g. no internet).
        s.connect((ip_address, int(port)))
        node_ip_address = s.getsockname()[0]
    except Exception:
        # Deliberate best-effort fallback.
        node_ip_address = "127.0.0.1"
    finally:
        # Always release the socket; it previously leaked on every call.
        s.close()
    return node_ip_address
def record_log_files_in_redis(redis_address, node_ip_address, log_files):
    """Record in Redis that new log files have been created.

    Each log monitor reads these entries to find out which log files it is
    responsible for. File names are appended, in order, to the Redis list
    "LOG_FILENAMES:<node_ip_address>".

    Args:
        redis_address: The "ip:port" address of the redis server.
        node_ip_address: The IP address of the node that the log files exist
            on.
        log_files: A list of file handles for the log files. Entries that are
            None are ignored.
    """
    filenames = [log_file.name for log_file in log_files
                 if log_file is not None]
    if not filenames:
        # Nothing to record, so don't open a Redis connection at all.
        return
    # One client for all files; the old code built a new client per file.
    redis_ip_address, redis_port = redis_address.split(":")
    redis_client = redis.StrictRedis(host=redis_ip_address,
                                     port=int(redis_port))
    log_file_list_key = "LOG_FILENAMES:{}".format(node_ip_address)
    # A multi-value RPUSH appends the values in order, in one round trip.
    redis_client.rpush(log_file_list_key, *filenames)
def create_redis_client(redis_address):
    """Create a Redis client.

    Args:
        redis_address: The "ip:port" address of the Redis server.

    Returns:
        A redis.StrictRedis client for that server.
    """
    host, port = redis_address.split(":")
    # This only works if some client on the Redis machine has already run
    # "CONFIG SET protected-mode no".
    return redis.StrictRedis(host=host, port=int(port))
def wait_for_redis_to_start(redis_ip_address, redis_port, num_retries=5):
    """Block until a Redis server answers a command.

    The server is probed with a CLIENT LIST command. After each failed
    connection attempt the function sleeps for one second.

    Args:
        redis_ip_address (str): The IP address of the redis server.
        redis_port (int): The port of the redis server.
        num_retries (int): Maximum number of connection attempts.

    Raises:
        Exception: If no attempt succeeded.
    """
    probe_client = redis.StrictRedis(host=redis_ip_address, port=redis_port)
    for _ in range(num_retries):
        print("Waiting for redis server at {}:{} to respond..."
              .format(redis_ip_address, redis_port))
        try:
            # Any cheap command works as a liveness probe.
            probe_client.client_list()
        except redis.ConnectionError:
            time.sleep(1)
            print("Failed to connect to the redis server, retrying.")
        else:
            # Connected.
            break
    else:
        raise Exception("Unable to connect to Redis. If the Redis instance is "
                        "on a different machine, check that your firewall is "
                        "configured properly.")
def _autodetect_num_gpus():
"""Attempt to detect the number of GPUs on this machine.
TODO(rkn): This currently assumes Nvidia GPUs and Linux.
Returns:
The number of GPUs if any were detected, otherwise 0.
"""
proc_gpus_path = "/proc/driver/nvidia/gpus"
if os.path.isdir(proc_gpus_path):
return len(os.listdir(proc_gpus_path))
return 0
def _compute_version_info():
    """Return (ray_version, python_version, pyarrow_version) for this process.

    The Python version is formatted as "major.minor.micro".
    """
    return (ray.__version__,
            ".".join(str(part) for part in sys.version_info[:3]),
            pyarrow.__version__)
def _put_version_info_in_redis(redis_client):
    """Write this process's version tuple to the "VERSION_INFO" key as JSON.

    Workers and drivers later compare against this (see check_version_info)
    to detect mismatched Python, pyarrow or Ray versions.

    Args:
        redis_client: A client for the primary Redis shard.
    """
    version_json = json.dumps(_compute_version_info())
    redis_client.set("VERSION_INFO", version_json)
def check_version_info(redis_client):
    """Check this process's versions against those recorded in Redis.

    The cluster's versions are read from the "VERSION_INFO" key. If that key is
    missing, no check is done; this makes it easier to start processes by hand.

    Args:
        redis_client: A client for the primary Redis shard.

    Raises:
        Exception: If the Ray or Python version differs. A pyarrow-only
            mismatch just prints the error message.
    """
    redis_reply = redis_client.get("VERSION_INFO")
    if redis_reply is None:
        return

    true_version_info = tuple(json.loads(redis_reply.decode("ascii")))
    version_info = _compute_version_info()
    if version_info == true_version_info:
        return

    # Call the module-level helper directly rather than going back through
    # the ray.services package attribute.
    node_ip_address = get_node_ip_address()
    error_message = ("Version mismatch: The cluster was started with:\n"
                     " Ray: {}\n"
                     " Python: {}\n"
                     " Pyarrow: {}\n"
                     "This process on node {} was started with:\n"
                     " Ray: {}\n"
                     " Python: {}\n"
                     " Pyarrow: {}").format(
                         true_version_info[0], true_version_info[1],
                         true_version_info[2], node_ip_address,
                         version_info[0], version_info[1], version_info[2])
    # Only Ray/Python mismatches are fatal; pyarrow differences are a warning.
    if version_info[:2] != true_version_info[:2]:
        raise Exception(error_message)
    print(error_message)
def start_credis(node_ip_address,
                 port=None,
                 redirect_output=False,
                 cleanup=True):
    """Start the credis global state store.

    Credis is a chain-replicated, reliable redis store. It consists of one
    master process that acts as a controller plus a number of chain members
    (currently two, the head and the tail). The members are registered with
    the master via MASTER.ADD.

    Args:
        node_ip_address: The IP address of the current node. This is only used
            for recording the log filenames in Redis.
        port (int): If provided, the credis master is started on this port.
            The chain members always get randomly chosen ports.
        redirect_output (bool): True if output should be redirected to a file
            and false otherwise.
        cleanup (bool): True if using Ray in local mode. If cleanup is true,
            then all Redis processes started by this method will be killed by
            services.cleanup() when the Python process that imported services
            exits.

    Returns:
        The address (ip_address:port) of the credis master process.
    """
    components = [("credis_master", CREDIS_MASTER_MODULE),
                  ("credis_head", CREDIS_MEMBER_MODULE),
                  ("credis_tail", CREDIS_MEMBER_MODULE)]
    ports = []
    for i, (component, module) in enumerate(components):
        stdout_file, stderr_file = new_log_files(component, redirect_output)
        # Only the master may use the requested port. Passing the same port
        # to every instance made the head and tail fail to bind.
        assigned_port, _ = start_redis_instance(
            node_ip_address=node_ip_address,
            port=port if i == 0 else None,
            stdout_file=stdout_file,
            stderr_file=stderr_file,
            cleanup=cleanup,
            module=module,
            executable=CREDIS_EXECUTABLE)
        ports.append(assigned_port)
    master_port, head_port, tail_port = ports

    # Connect the chain members to the master.
    master_client = redis.StrictRedis(host=node_ip_address, port=master_port)
    master_client.execute_command("MASTER.ADD", node_ip_address, head_port)
    master_client.execute_command("MASTER.ADD", node_ip_address, tail_port)
    return address(node_ip_address, master_port)
def start_redis(node_ip_address,
                port=None,
                redis_shard_ports=None,
                num_redis_shards=1,
                redis_max_clients=None,
                redirect_output=False,
                redirect_worker_output=False,
                cleanup=True):
    """Start the Redis global state store: a primary shard plus data shards.

    The primary shard stores "NumRedisShards", "RedirectOutput", the version
    info, and the "RedisShards" list of every other shard's address.

    Args:
        node_ip_address: The IP address of the current node. This is only used
            for recording the log filenames in Redis.
        port (int): If provided, the primary Redis shard will be started on
            this port.
        redis_shard_ports: A list of the ports to use for the non-primary Redis
            shards. Its length must equal num_redis_shards.
        num_redis_shards (int): The number of Redis shards to start in
            addition to the primary one. Defaults to one.
        redis_max_clients: If this is provided, Ray will attempt to configure
            Redis with this maxclients number.
        redirect_output (bool): True if output should be redirected to a file
            and false otherwise.
        redirect_worker_output (bool): True if worker output should be
            redirected to a file and false otherwise. Workers will have access
            to this value when they start up.
        cleanup (bool): True if using Ray in local mode. If cleanup is true,
            then all Redis processes started by this method will be killed by
            services.cleanup() when the Python process that imported services
            exits.

    Returns:
        A tuple of the address for the primary Redis shard and a list of
        addresses for the remaining shards.

    Raises:
        Exception: If redis_shard_ports has the wrong length.
    """
    primary_stdout, primary_stderr = new_log_files("redis", redirect_output)
    if redis_shard_ports is None:
        redis_shard_ports = [None] * num_redis_shards
    elif len(redis_shard_ports) != num_redis_shards:
        raise Exception("The number of Redis shard ports does not match the "
                        "number of Redis shards.")

    primary_port, _ = start_redis_instance(
        node_ip_address=node_ip_address, port=port,
        redis_max_clients=redis_max_clients,
        stdout_file=primary_stdout, stderr_file=primary_stderr,
        cleanup=cleanup)
    if port is not None:
        assert primary_port == port
    primary_address = address(node_ip_address, primary_port)

    primary_client = redis.StrictRedis(host=node_ip_address, port=primary_port)
    # Clients read this to know how many entries to expect under RedisShards.
    primary_client.set("NumRedisShards", str(num_redis_shards))
    # Workers read this to decide whether to redirect their own output.
    primary_client.set("RedirectOutput", 1 if redirect_worker_output else 0)
    _put_version_info_in_redis(primary_client)

    # Each remaining shard logs to its own "redis-<shard number>" files.
    shard_addresses = []
    for shard_index, requested_port in enumerate(redis_shard_ports):
        shard_stdout, shard_stderr = new_log_files(
            "redis-{}".format(shard_index), redirect_output)
        shard_port, _ = start_redis_instance(
            node_ip_address=node_ip_address,
            port=requested_port,
            redis_max_clients=redis_max_clients,
            stdout_file=shard_stdout, stderr_file=shard_stderr,
            cleanup=cleanup)
        if requested_port is not None:
            assert shard_port == requested_port
        shard_address = address(node_ip_address, shard_port)
        shard_addresses.append(shard_address)
        primary_client.rpush("RedisShards", shard_address)
    return primary_address, shard_addresses
def start_redis_instance(node_ip_address="127.0.0.1",
                         port=None,
                         redis_max_clients=None,
                         num_retries=20,
                         stdout_file=None,
                         stderr_file=None,
                         cleanup=True,
                         executable=REDIS_EXECUTABLE,
                         module=REDIS_MODULE):
    """Start a single Redis server and configure it for Ray.

    Args:
        node_ip_address (str): The IP address of the current node. This is only
            used for recording the log filenames in Redis.
        port (int): If provided, start a Redis server with this port.
        redis_max_clients: If this is provided, Ray will attempt to configure
            Redis with this maxclients number.
        num_retries (int): The number of times to attempt to start Redis on a
            random port. If a port is provided, only one attempt is made,
            regardless of this value.
        stdout_file: A file handle opened for writing to redirect stdout to. If
            no redirection should happen, then this should be None.
        stderr_file: A file handle opened for writing to redirect stderr to. If
            no redirection should happen, then this should be None.
        cleanup (bool): True if using Ray in local mode. If cleanup is true,
            then this process will be killed by services.cleanup() when the
            Python process that imported services exits.
        executable (str): Full path to the redis-server executable.
        module (str): Full path to the redis module that will be loaded in this
            redis server.

    Returns:
        A tuple of the port used by Redis and a handle to the process that was
        started. If a port is passed in, then the returned port value is
        the same.

    Raises:
        Exception: An exception is raised if Redis could not be started.
    """
    assert os.path.isfile(executable)
    assert os.path.isfile(module)

    if port is None:
        port = new_port()
    else:
        # A caller-chosen port is tried exactly once.
        num_retries = 1

    for attempt in range(num_retries):
        if attempt:
            print("Redis failed to start, retrying now.")
        redis_command = [executable,
                         "--port", str(port),
                         "--loglevel", "warning",
                         "--loadmodule", module]
        p = subprocess.Popen(redis_command,
                             stdout=stdout_file, stderr=stderr_file)
        time.sleep(0.1)
        # Treat "still running after 0.1s" as a successful start.
        if p.poll() is None:
            if cleanup:
                all_processes[PROCESS_TYPE_REDIS_SERVER].append(p)
            break
        # The server exited (most likely the port was taken): pick another.
        port = new_port()
    else:
        raise Exception("Couldn't start Redis.")

    # This client is used only to configure the freshly started server.
    redis_client = redis.StrictRedis(host="127.0.0.1", port=port)
    wait_for_redis_to_start("127.0.0.1", port)

    # Generate keyspace notifications. TODO(rkn): Change this to only
    # generate notifications for the export keys.
    redis_client.config_set("notify-keyspace-events", "Kl")
    # Allow processes on other hosts to connect. TODO(rkn): Do this in a more
    # secure way.
    redis_client.config_set("protected-mode", "no")

    if redis_max_clients is None:
        # Raise maxclients toward the soft file-descriptor limit (what
        # `ulimit -n` reports), keeping a safety buffer below it.
        current_max_clients = int(
            redis_client.config_get("maxclients")["maxclients"])
        ulimit_n = resource.getrlimit(resource.RLIMIT_NOFILE)[0]
        redis_client_buffer = 32
        if current_max_clients < ulimit_n - redis_client_buffer:
            redis_client.config_set("maxclients",
                                    ulimit_n - redis_client_buffer)
    else:
        redis_client.config_set("maxclients", str(redis_max_clients))

    # Raise the hard and soft pubsub output buffer limits to 128MB. This is a
    # hack that makes it less likely that pubsub messages are dropped and
    # pubsub connections are killed as a result.
    buffer_limits = (redis_client.config_get("client-output-buffer-limit")
                     ["client-output-buffer-limit"]).split()
    assert len(buffer_limits) == 12
    buffer_limits[8:] = ["pubsub", "134217728", "134217728", "60"]
    redis_client.config_set("client-output-buffer-limit",
                            " ".join(buffer_limits))

    # Time stamp marking when this server was started.
    redis_client.set("redis_start_time", time.time())
    record_log_files_in_redis(address(node_ip_address, port), node_ip_address,
                              [stdout_file, stderr_file])
    return port, p
def start_log_monitor(redis_address, node_ip_address, stdout_file=None,
                      stderr_file=None, cleanup=True):
    """Start a log monitor process (log_monitor.py next to this module).

    Args:
        redis_address (str): The address of the Redis instance.
        node_ip_address (str): The IP address of the node that this log monitor
            is running on.
        stdout_file: A file handle opened for writing to redirect stdout to. If
            no redirection should happen, then this should be None.
        stderr_file: A file handle opened for writing to redirect stderr to. If
            no redirection should happen, then this should be None.
        cleanup (bool): True if using Ray in local mode. If cleanup is true,
            then this process will be killed by services.cleanup() when the
            Python process that imported services exits. The default used to
            be the `cleanup` function object itself (truthy); it is now
            explicitly True.
    """
    log_monitor_filepath = os.path.join(
        os.path.dirname(os.path.abspath(__file__)),
        "log_monitor.py")
    # "-u" runs Python unbuffered so log output appears promptly.
    p = subprocess.Popen([sys.executable, "-u", log_monitor_filepath,
                          "--redis-address", redis_address,
                          "--node-ip-address", node_ip_address],
                         stdout=stdout_file, stderr=stderr_file)
    if cleanup:
        all_processes[PROCESS_TYPE_LOG_MONITOR].append(p)
    record_log_files_in_redis(redis_address, node_ip_address,
                              [stdout_file, stderr_file])
def start_global_scheduler(redis_address, node_ip_address,
                           stdout_file=None, stderr_file=None, cleanup=True):
    """Launch a global scheduler via global_scheduler.start_global_scheduler.

    Args:
        redis_address (str): The address of the Redis instance.
        node_ip_address: The IP address of the node that this scheduler will
            run on.
        stdout_file: A file handle opened for writing to redirect stdout to. If
            no redirection should happen, then this should be None.
        stderr_file: A file handle opened for writing to redirect stderr to. If
            no redirection should happen, then this should be None.
        cleanup (bool): True if using Ray in local mode. If cleanup is true,
            then this process will be killed by services.cleanup() when the
            Python process that imported services exits.
    """
    scheduler_process = global_scheduler.start_global_scheduler(
        redis_address, node_ip_address,
        stdout_file=stdout_file, stderr_file=stderr_file)
    if cleanup:
        all_processes[PROCESS_TYPE_GLOBAL_SCHEDULER].append(scheduler_process)
    log_handles = [stdout_file, stderr_file]
    record_log_files_in_redis(redis_address, node_ip_address, log_handles)
def start_ui(redis_address, stdout_file=None, stderr_file=None, cleanup=True):
    """Start the web UI: a Jupyter notebook server on a copy of WebUI.ipynb.

    Args:
        redis_address: The address of the primary Redis shard. It is passed to
            the notebook through the REDIS_ADDRESS environment variable.
        stdout_file: A file handle opened for writing to redirect stdout to. If
            no redirection should happen, then this should be None.
        stderr_file: A file handle opened for writing to redirect stderr to. If
            no redirection should happen, then this should be None.
        cleanup (bool): True if using Ray in local mode. If cleanup is true,
            then this process will be killed by services.cleanup() when the
            Python process that imported services exits.

    Returns:
        The URL of the web UI, including the authentication token. Note that
        the URL is returned (and printed) even if launching jupyter failed.
    """
    notebook_filepath = os.path.join(
        os.path.dirname(os.path.abspath(__file__)),
        "WebUI.ipynb")
    # Copy the notebook so that the user cannot modify the original.
    random_ui_id = random.randint(0, 100000)
    new_notebook_filepath = "/tmp/raylogs/ray_ui{}.ipynb".format(random_ui_id)
    new_notebook_directory = os.path.dirname(new_notebook_filepath)
    if not os.path.isdir(new_notebook_directory):
        try:
            os.makedirs(new_notebook_directory)
        except OSError:
            # Another process may have created it concurrently.
            if not os.path.isdir(new_notebook_directory):
                raise
    shutil.copy(notebook_filepath, new_notebook_filepath)

    # Find the first port >= 8888 that can be bound on localhost. The probe
    # socket is closed whether or not bind succeeds; it used to leak when
    # bind failed.
    port = 8888
    while True:
        port_test_socket = socket.socket()
        try:
            port_test_socket.bind(("127.0.0.1", port))
            break
        except socket.error:
            port += 1
        finally:
            port_test_socket.close()

    new_env = os.environ.copy()
    new_env["REDIS_ADDRESS"] = redis_address
    # Generate the authentication token ourselves so that we don't have to
    # query the jupyter server for it.
    token = binascii.hexlify(os.urandom(24)).decode("ascii")
    command = ["jupyter", "notebook", "--no-browser",
               "--port={}".format(port),
               "--NotebookApp.iopub_data_rate_limit=10000000000",
               "--NotebookApp.open_browser=False",
               "--NotebookApp.token={}".format(token)]
    # Jupyter refuses to run as root without this flag.
    if os.geteuid() == 0:
        command.append("--allow-root")
    try:
        ui_process = subprocess.Popen(command, env=new_env,
                                      cwd=new_notebook_directory,
                                      stdout=stdout_file, stderr=stderr_file)
    except Exception:
        print("Failed to start the UI, you may need to run "
              "'pip install jupyter'.")
    else:
        if cleanup:
            all_processes[PROCESS_TYPE_WEB_UI].append(ui_process)
    webui_url = ("http://localhost:{}/notebooks/ray_ui{}.ipynb?token={}"
                 .format(port, random_ui_id, token))
    print("\n" + "=" * 70)
    print("View the web UI at {}".format(webui_url))
    print("=" * 70 + "\n")
    return webui_url
def start_local_scheduler(redis_address,
                          node_ip_address,
                          plasma_store_name,
                          plasma_manager_name,
                          worker_path,
                          plasma_address=None,
                          stdout_file=None,
                          stderr_file=None,
                          cleanup=True,
                          resources=None,
                          num_workers=0):
    """Start a local scheduler process.

    Args:
        redis_address (str): The address of the Redis instance.
        node_ip_address (str): The IP address of the node that this local
            scheduler is running on.
        plasma_store_name (str): The name of the plasma store socket to connect
            to.
        plasma_manager_name (str): The name of the plasma manager socket to
            connect to.
        worker_path (str): The path of the script to use when the local
            scheduler starts up new workers.
        plasma_address: Forwarded unchanged to
            ray.local_scheduler.start_local_scheduler.
        stdout_file: A file handle opened for writing to redirect stdout to. If
            no redirection should happen, then this should be None.
        stderr_file: A file handle opened for writing to redirect stderr to. If
            no redirection should happen, then this should be None.
        cleanup (bool): True if using Ray in local mode. If cleanup is true,
            then this process will be killed by services.cleanup() when the
            Python process that imported services exits.
        resources: A dictionary mapping the name of a resource to the available
            quantity of that resource. Missing "CPU"/"GPU" entries are filled
            in, and this is done IN PLACE on the dict passed by the caller.
        num_workers (int): The number of workers that the local scheduler
            should start.

    Returns:
        The name of the local scheduler socket.

    Raises:
        Exception: If more GPUs are requested than CUDA_VISIBLE_DEVICES lists.
    """
    if resources is None:
        resources = {}
    if "CPU" not in resources:
        # Default to the number of hardware execution threads.
        resources["CPU"] = psutil.cpu_count()

    # None means CUDA_VISIBLE_DEVICES has not been set.
    visible_gpu_ids = ray.utils.get_cuda_visible_devices()
    gpus_requested = "GPU" in resources
    if (gpus_requested and visible_gpu_ids is not None and
            resources["GPU"] > len(visible_gpu_ids)):
        raise Exception("Attempting to start local scheduler with {} GPUs, "
                        "but CUDA_VISIBLE_DEVICES contains {}.".format(
                            resources["GPU"], visible_gpu_ids))
    if not gpus_requested:
        # Auto-detect, but never exceed what CUDA_VISIBLE_DEVICES allows.
        detected_gpus = _autodetect_num_gpus()
        if visible_gpu_ids is not None:
            detected_gpus = min(detected_gpus, len(visible_gpu_ids))
        resources["GPU"] = detected_gpus

    print("Starting local scheduler with the following resources: {}."
          .format(resources))
    local_scheduler_name, scheduler_process = (
        ray.local_scheduler.start_local_scheduler(
            plasma_store_name,
            plasma_manager_name,
            worker_path=worker_path,
            node_ip_address=node_ip_address,
            redis_address=redis_address,
            plasma_address=plasma_address,
            use_profiler=RUN_LOCAL_SCHEDULER_PROFILER,
            stdout_file=stdout_file,
            stderr_file=stderr_file,
            static_resources=resources,
            num_workers=num_workers))
    if cleanup:
        all_processes[PROCESS_TYPE_LOCAL_SCHEDULER].append(scheduler_process)
    record_log_files_in_redis(redis_address, node_ip_address,
                              [stdout_file, stderr_file])
    return local_scheduler_name
def start_objstore(node_ip_address, redis_address,
                   object_manager_port=None, store_stdout_file=None,
                   store_stderr_file=None, manager_stdout_file=None,
                   manager_stderr_file=None, objstore_memory=None,
                   cleanup=True, plasma_directory=None,
                   huge_pages=False):
    """This method starts an object store process.

    It launches a Plasma store and then a Plasma manager attached to it.

    Args:
        node_ip_address (str): The IP address of the node running the object
            store.
        redis_address (str): The address of the Redis instance to connect to.
        object_manager_port (int): The port to use for the object manager. If
            this is not provided, one will be generated randomly.
        store_stdout_file: A file handle opened for writing to redirect stdout
            to. If no redirection should happen, then this should be None.
        store_stderr_file: A file handle opened for writing to redirect stderr
            to. If no redirection should happen, then this should be None.
        manager_stdout_file: A file handle opened for writing to redirect
            stdout to. If no redirection should happen, then this should be
            None.
        manager_stderr_file: A file handle opened for writing to redirect
            stderr to. If no redirection should happen, then this should be
            None.
        objstore_memory: The amount of memory (in bytes) to start the object
            store with. If None, a fraction of the system memory is used
            (0.4 on Linux, capped by the space free in /dev/shm; 0.8
            elsewhere).
        cleanup (bool): True if using Ray in local mode. If cleanup is true,
            then this process will be killed by services.cleanup() when the
            Python process that imported services exits.
        plasma_directory: A directory where the Plasma memory mapped files will
            be created.
        huge_pages: Boolean flag indicating whether to start the Object
            Store with hugetlbfs support. Requires plasma_directory.

    Return:
        An ObjectStoreAddress built from the Plasma store socket name, the
        Plasma manager socket name, and the Plasma manager port.
    """
    if objstore_memory is None:
        # Compute a fraction of the system memory for the Plasma store to use.
        system_memory = psutil.virtual_memory().total
        if sys.platform == "linux" or sys.platform == "linux2":
            # On linux we use /dev/shm, its size is half the size of the
            # physical memory. To not overflow it, we set the plasma memory
            # limit to 0.4 times the size of the physical memory.
            objstore_memory = int(system_memory * 0.4)
            # Compare the requested memory size to the memory available in
            # /dev/shm. os.statvfs queries by path, so no file descriptor
            # has to be opened and closed manually.
            shm_fs_stats = os.statvfs("/dev/shm")
            # Per POSIX, f_bavail (available blocks) is counted in units of
            # the fragment size f_frsize, not the preferred I/O size f_bsize.
            shm_avail = shm_fs_stats.f_frsize * shm_fs_stats.f_bavail
            if objstore_memory > shm_avail:
                print("Warning: Reducing object store memory because "
                      "/dev/shm has only {} bytes available. You may be "
                      "able to free up space by deleting files in "
                      "/dev/shm. If you are inside a Docker container, "
                      "you may need to pass an argument with the flag "
                      "'--shm-size' to 'docker run'.".format(shm_avail))
                objstore_memory = int(shm_avail * 0.8)
        else:
            objstore_memory = int(system_memory * 0.8)
    # Start the Plasma store.
    plasma_store_name, p1 = ray.plasma.start_plasma_store(
        plasma_store_memory=objstore_memory,
        use_profiler=RUN_PLASMA_STORE_PROFILER,
        stdout_file=store_stdout_file,
        stderr_file=store_stderr_file,
        plasma_directory=plasma_directory,
        huge_pages=huge_pages)
    # Start the plasma manager. When the caller asks for a specific port, it
    # is forwarded together with num_retries=1, and afterwards we check that
    # the manager really ended up on that port.
    manager_kwargs = {}
    if object_manager_port is not None:
        manager_kwargs["plasma_manager_port"] = object_manager_port
        manager_kwargs["num_retries"] = 1
    (plasma_manager_name, p2,
     plasma_manager_port) = ray.plasma.start_plasma_manager(
        plasma_store_name,
        redis_address,
        node_ip_address=node_ip_address,
        run_profiler=RUN_PLASMA_MANAGER_PROFILER,
        stdout_file=manager_stdout_file,
        stderr_file=manager_stderr_file,
        **manager_kwargs)
    if object_manager_port is not None:
        assert plasma_manager_port == object_manager_port
    if cleanup:
        all_processes[PROCESS_TYPE_PLASMA_STORE].append(p1)
        all_processes[PROCESS_TYPE_PLASMA_MANAGER].append(p2)
    record_log_files_in_redis(redis_address, node_ip_address,
                              [store_stdout_file, store_stderr_file,
                               manager_stdout_file, manager_stderr_file])
    return ObjectStoreAddress(plasma_store_name, plasma_manager_name,
                              plasma_manager_port)
def start_worker(node_ip_address, object_store_name, object_store_manager_name,
                 local_scheduler_name, redis_address, worker_path,
                 stdout_file=None, stderr_file=None, cleanup=True):
    """Launch one worker as an unbuffered child Python process.

    The worker script at ``worker_path`` is run with the current interpreter
    and is told, through command-line flags, where to find the node, the
    object store, the object store manager, the local scheduler and Redis.

    Args:
        node_ip_address (str): The IP address of the node the worker runs on.
        object_store_name (str): The name of the object store.
        object_store_manager_name (str): The name of the object store manager.
        local_scheduler_name (str): The name of the local scheduler.
        redis_address (str): The address that the Redis server is listening on.
        worker_path (str): The path of the source code which the worker process
            will run.
        stdout_file: A writable file handle that receives the worker's stdout,
            or None to leave stdout unredirected.
        stderr_file: A writable file handle that receives the worker's stderr,
            or None to leave stderr unredirected.
        cleanup (bool): True if using Ray in local mode. When set, the spawned
            process is registered so that services.cleanup() kills it when the
            Python process that imported services exits. Defaults to True.
    """
    # Each entry is (flag prefix, value); they are glued together below.
    flag_values = [
        ("--node-ip-address=", node_ip_address),
        ("--object-store-name=", object_store_name),
        ("--object-store-manager-name=", object_store_manager_name),
        ("--local-scheduler-name=", local_scheduler_name),
        ("--redis-address=", str(redis_address)),
    ]
    # "-u" keeps the worker's output unbuffered.
    worker_command = [sys.executable, "-u", worker_path]
    worker_command.extend([prefix + value for prefix, value in flag_values])

    worker_proc = subprocess.Popen(worker_command,
                                   stdout=stdout_file,
                                   stderr=stderr_file)
    if cleanup:
        all_processes[PROCESS_TYPE_WORKER].append(worker_proc)
    record_log_files_in_redis(redis_address, node_ip_address,
                              [stdout_file, stderr_file])
def start_monitor(redis_address, node_ip_address, stdout_file=None,
stderr_file=None, cleanup=True, autoscaling_config=None):
"""Run a process to monitor the other processes.