-
Notifications
You must be signed in to change notification settings - Fork 59
Expand file tree
/
Copy pathprocesses.ML
More file actions
392 lines (357 loc) · 16.9 KB
/
Copy pathprocesses.ML
File metadata and controls
392 lines (357 loc) · 16.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
(*
Title: Process package for ML.
Author: David C. J. Matthews
Copyright (c) 2007
This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public
License as published by the Free Software Foundation; either
version 2.1 of the License, or (at your option) any later version.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public
License along with this library; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*)
(* This is provided for backwards compatibility. New programs should use
the Thread structure directly. *)
structure Process:
sig
(* A channel on which values of type 'a are passed between processes. *)
type 'a channel
(* Create a new channel with no waiting senders or receivers. *)
val channel: unit -> '_a channel
(* Send a value on a channel.  If no receiver can be matched the caller is
suspended on the channel. *)
val send: 'a channel * 'a -> unit
(* Receive a value from a channel.  If no sender can be matched the caller is
suspended on the channel. *)
val receive: 'a channel -> 'a
(* Run the function as a new process that does not accept broadcast
interrupts. *)
val fork: (unit->unit)->unit
(* Run the function as a new process that accepts broadcast interrupts.  The
result is a function that interrupts that process. *)
val console: (unit->unit)->(unit->unit)
(* Run the two functions as alternative processes that share a new choice
synchroniser.  Returns without waiting for either of them. *)
val choice: (unit->unit)*(unit->unit)->unit
(* Bound to broadcastInterrupt from the opened thread structures. *)
val interruptConsoleProcesses: unit->unit
end =
struct
open Thread.Thread Thread.Mutex Thread.ConditionVar
(* When set, new_process prints the data of each process it creates. *)
val debug = ref false
(* Counter used to number processes (the processNo field). *)
val identifiers = ref 0
(* Counter used to number channels (the Id field). *)
val ids = ref 0
(* The two sides of a choice. *)
datatype direction = DirLeft | DirRight

(* Whether a choice has been committed and, if so, to which side. *)
datatype choicestate = ChoiceUntaken | ChoiceTaken of direction

(* A choice point shared by the processes that are alternatives. *)
datatype synchroniser =
    SYNCH of {
        synchLock: mutex,           (* Protects the state variable. *)
        state: choicestate ref      (* Current state of the choice. *)
    }

(* Per-process information.  Processes made by fork, console or choice
   carry this as thread-local data. *)
datatype processData =
    PROC of {
        processNo: int,             (* Identifier, for debugging. *)
        blocker: conditionVar,      (* The process blocks on this. *)
        synchro: (synchroniser * direction) list ref    (* Synchroniser chain. *)
    }
(* Tag under which a thread's processData is kept as thread-local data. *)
val procTag : processData Universal.tag = Universal.tag ()
(* Return the process data of the calling thread.  A thread that has no
   entry under procTag yet (for instance one started directly through the
   Thread interface) is given a fresh entry with the next process number,
   an empty synchroniser chain and its own condition variable. *)
fun get_process_data () : processData =
    let
        (* Build, register and return the data for a thread seen for the first time. *)
        fun register () =
            let
                val () = identifiers := !identifiers + 1
                val number = !identifiers
                val fresh =
                    PROC { processNo = number, blocker = conditionVar (), synchro = ref [] }
            in
                setLocal (procTag, fresh);
                fresh
            end
    in
        case getLocal procTag of
            NONE => register ()
          | SOME existing => existing
    end
(* A channel holds the processes currently suspended on it: senders carry
   the value they want to send, receivers carry a "basket" reference into
   which a sender stores the value. *)
datatype 'a channel =
    CHAN of {
        Id: int,
        chanLock: mutex,
        receivers: 'a option ref procVal list ref,
        senders: 'a procVal list ref
    }
(* A suspended process: its condition variable, its process data and the
   payload (a value to send, or a basket for the result). *)
withtype 'a procVal = conditionVar * processData * 'a
(* Make a channel with no waiting senders or receivers, its own mutex and
   the next channel number. *)
fun channel () : 'a channel =
    let
        val () = ids := !ids + 1
        val number = !ids
    in
        CHAN { Id = number, chanLock = mutex (), receivers = ref [], senders = ref [] }
    end
(* Outcome of looking for a partner: the matched suspended process, or nothing. *)
datatype 'a synchResult =
    FoundMatch of 'a procVal
  | NoMatch
(* Prunes the synchroniser list to remove committed choices.
Leading entries whose choice has been taken in this process's own direction
are dropped.  The result is the remainder of the list, starting at the first
non-committed synchroniser or the first committed synchroniser with a choice
that is taken in the "wrong" direction (i.e. which indicates that this process
must not be allowed to communicate).  The pruned list is also stored back in
the process's synchro reference.
If unlockAfter is true no synchroniser mutex is left locked on return.
If unlockAfter is false the first untaken synchroniser is left locked, as is
every later untaken synchroniser reached before the end of the list or before
an entry taken in a different direction. *)
fun getActiveSynchroniser(PROC{synchro, ...}, unlockAfter) =
let
fun getSynch [] = []
| getSynch(l as (SYNCH{state, synchLock}, dir) :: t) =
(
(* The state may only be examined while holding the synchroniser's mutex. *)
lock synchLock;
case ! state of
ChoiceUntaken => (* This is untaken. Stop here. *)
(
if unlockAfter
then unlock synchLock
(* The recursive call is made only for its locking effect: its result is
discarded and the list starting at this entry is returned. *)
else (getSynch t; ()); (* We need to lock any others. *)
l
)
| ChoiceTaken d =>
(* This is taken. Stop here if it is taken in a different way. *)
(
unlock synchLock;
if d = dir
then getSynch t
else l
)
)
val newSynchList = getSynch(! synchro)
in
(* We can update the list for this process. We don't need to lock the
synchro variable since it is only updated either by the process itself or
when this process is waiting on a channel, which is locked before access. *)
synchro := newSynchList;
newSynchList
end
(* Try to find a matching process. toSearch is the list of corresponding
processes i.e. receivers if we are trying to send, senders if we are
trying to receive. The result is a pair of the updated version of the
toSearch, with the matching process removed if a match has been found
and the matching process's data.
When a match is found the untaken choices of both processes are committed,
their synchroniser locks are released and both synchroniser lists are
emptied.  When no match is found the locks taken on this process's untaken
synchronisers are released.  If this process's first active synchroniser is
already taken, toSearch is returned unchanged together with NoMatch. *)
fun synchronise (toSearch: 'a procVal list, thisProcess) :
'a procVal list * 'a synchResult =
let
(* Release the lock on the synchroniser for the process that is looking for
a partner. This is only called if no matching process can be found.
Every entry of the process's list whose state is still ChoiceUntaken has its
mutex unlocked. *)
fun releaseLock(PROC{synchro = ref synchro, ...}) =
List.app
(fn (SYNCH{synchLock, state=ref ChoiceUntaken}, _) => unlock synchLock
| _ => ()) synchro
(* Commit the choices and release the locks. If some entries are shared
between the two processes then we may find some entries already set.
Entries that are already taken are left as they are. *)
fun commitChoices(PROC{synchro=synchro as ref synch, ...}) =
(
List.app
(fn (SYNCH{synchLock, state=state as ref ChoiceUntaken}, dir) =>
(state := ChoiceTaken dir; unlock synchLock)
| _ => ()) synch;
synchro := [] (* Since all are taken we can set this to the empty list. *)
)
(* Get the first synchroniser and lock it unless it is already committed. *)
val mySynch = getActiveSynchroniser(thisProcess, false (* Leave locked. *))
(* Get the list of synchronisers for a potential matching process. Generally
any process on the sender list will match a receiver and vice versa. The
exception is if the two processes are alternative choices. We have to be
careful with the synchroniser lists. We've already locked the list for our
process so we mustn't lock any synchronisers that are shared. *)
(* MrTaken: the candidate has a choice taken in a different direction.
MrAlternatives: the candidate shares a synchroniser with this process but in
the other direction.  MrOK: the candidate may communicate with this process. *)
datatype matchResult =
MrTaken | MrAlternatives | MrOK of (synchroniser * direction) list
fun getMatchingSynchs(PROC{synchro, ...}) =
let
(* Walk the candidate's list alongside this process's list (mySynch).
Shared entries are recognised by comparing the state references. *)
fun getSynch([], _) = MrOK []
| getSynch(l as (SYNCH{state, ...}, dir) :: t,
myL as (SYNCH{state=s, ...}, myDir) :: myT) =
if s <> state
then (* Different references - safe to lock. *)
lockSynch(l, myL)
else (* Same reference - already locked. *)
if dir <> myDir (* These are different choices. *)
then MrAlternatives (* Not allowed to communicate. *)
else (* OK, same choice: test the rest*)
getSynch(t, myT)
| getSynch(l, []) =
(* The list of synchronisers for the original process is empty or
has run out before this. *)
lockSynch(l, [])
(* Lock the head entry of the candidate's list and examine its state. *)
and lockSynch(l as (SYNCH{state, synchLock}, dir) :: t, myL) =
(
lock synchLock;
case ! state of
ChoiceUntaken => (* This is untaken. Stop here. *)
(
(* Called only for its locking effect; the result is discarded. *)
getSynch(t, myL); (* We need to lock any others. *)
MrOK l
)
| ChoiceTaken d =>
(* This is taken. Stop here if it is taken in a different way. *)
(
unlock synchLock;
if d = dir
then getSynch(t, myL)
else MrTaken
)
)
| lockSynch _ = raise Match (* Suppress warning *)
in
getSynch(! synchro, mySynch)
end
fun findAProcess [] =
(* Find a process that matches and return the list of remaining partners
together with the match result. *)
(* No match *) ([], NoMatch)
| findAProcess((entry as (_,d,_)) :: t) =
case getMatchingSynchs d of
MrTaken =>
(* This process is a committed choice in a different direction. Drop
it from the list since it can never communicate. *)
findAProcess t
| MrAlternatives =>
(* This process is an alternative choice with our process. It can
still communicate, just not with us. Skip this and try the next. *)
let
val (clist, result) = findAProcess t
in
(entry :: clist, result)
end
| MrOK _ =>
(t, FoundMatch entry) (* Return the new list. *)
in
case mySynch of
(SYNCH{state = ref (ChoiceTaken _), ...}, _) :: _ =>
(* This choice is already taken - kill this process.
Actually all we do at this stage is pretend that the process
cannot communicate, and suspend it. Later it may be removed
from the channel. *)
(toSearch, NoMatch)
| _ => (* No synch or uncommitted choice. *)
case findAProcess toSearch of
t as (_, NoMatch) => (releaseLock thisProcess; t)
| t as (_, FoundMatch(_,p,_)) =>
(commitChoices thisProcess; commitChoices p; t)
end
(* Run f with interrupts delivered synchronously, so that an interrupt
   cannot arrive while a lock is held during synchronisation.  If the
   thread already defers interrupts or handles them synchronously f is
   simply called.  Otherwise the interrupt state is switched to
   synchronous for the duration of the call and the previous attributes
   are restored afterwards, whether f returns or raises. *)
fun blockInterrupt (f: unit->'a) =
    let
        open Thread
        val savedAttributes = getAttributes ()

        fun isInterruptState (InterruptState _) = true
          | isInterruptState _ = false

        (* Call f in synchronous mode and put the saved attributes back. *)
        fun callSynchronously () =
            let
                val () = setAttributes [InterruptState InterruptSynch]
                (* f may raise Interrupt if it has to wait; restore before re-raising. *)
                val outcome =
                    f () handle exn => (setAttributes savedAttributes; raise exn)
            in
                setAttributes savedAttributes;
                outcome
            end
    in
        case List.find isInterruptState savedAttributes of
            SOME (InterruptState InterruptSynch) => f ()    (* Already synchronous. *)
          | SOME (InterruptState InterruptDefer) => f ()    (* Keep deferring. *)
          | _ => callSynchronously ()                       (* Unset or asynchronous. *)
    end
(* Send v on the channel.  With the channel locked, look for a receiver
   that may communicate with this process.  If one is found the value is
   put in its basket and it is woken; otherwise this process is queued as
   a sender and blocks on its condition variable.  The whole operation
   runs under blockInterrupt. *)
fun send (CHAN {senders, receivers, chanLock, ...}, v:'a) =
    let
        fun trySend () =
            let
                val () = lock chanLock
                val me as PROC { blocker = myCondVar, ... } = get_process_data ()
                val (remainingReceivers, outcome) = synchronise (!receivers, me)
            in
                case outcome of
                    NoMatch =>
                    (
                        (* Nobody to talk to yet: queue ourselves and sleep. *)
                        senders := (myCondVar, me, v) :: !senders;
                        receivers := remainingReceivers;
                        (* wait gives up the channel lock while blocked and holds it
                           again on return, including when an exception is raised,
                           so the handler has to release it too. *)
                        wait (myCondVar, chanLock)
                            handle exn => (unlock chanLock; raise exn);
                        unlock chanLock
                    )
                  | FoundMatch (partnerCondVar, _, basket) =>
                    (
                        basket := SOME v;               (* Hand the value over. *)
                        receivers := remainingReceivers;
                        signal partnerCondVar;          (* Wake the receiver. *)
                        unlock chanLock
                    )
            end
    in
        blockInterrupt trySend
    end
(* Receive a value from the channel.  With the channel locked, look for a
   sender that may communicate with this process.  If one is found its
   value is returned and it is woken.  Otherwise this process is queued as
   a receiver with an empty basket and blocks until a sender has filled
   the basket.  The whole operation runs under blockInterrupt.

   The wait is repeated until the basket holds a value.  The condition
   variable belongs to the process, not to this channel, and an interrupted
   wait leaves its entry in the channel's list, so a signal meant for such
   a stale entry (or any other wake-up without a matching send) can make
   wait return while the basket is still empty.  Previously that made
   valOf raise Option. *)
fun receive (CHAN {senders, receivers, chanLock, ...}): 'a =
    blockInterrupt(fn () =>
        let
            val () = lock chanLock;
            val myProcessData as PROC { blocker, ...} = get_process_data()
        in
            case synchronise(!senders, myProcessData)
             of (newlist, FoundMatch (p,_,v)) (* Success *) =>
                (
                    senders := newlist;
                    signal p; (* Wake up the sending thread. *)
                    unlock chanLock;
                    v (* This is our result *)
                )
              | (newlist, NoMatch) (* Failure *) =>
                (* Set the new receiver/sender list to include this process,
                   and suspend ourselves, releasing the lock. *)
                let
                    val basket = ref NONE (* Create a basket to receive the result. *)
                    (* Block until a sender has stored a value in the basket.
                       Called, and returns, with chanLock held.  wait gives the
                       lock up while blocked and holds it again afterwards,
                       including when an exception is raised, so the handler
                       must release it before re-raising. *)
                    fun awaitValue () =
                        case !basket of
                            SOME received => received
                          | NONE =>
                            (
                                wait(blocker, chanLock)
                                    handle exn => (unlock chanLock; raise exn);
                                awaitValue ()
                            )
                in
                    receivers := (blocker, myProcessData, basket) :: !receivers;
                    senders := newlist;
                    let
                        val received = awaitValue ()
                    in
                        (* We don't need the lock any longer. *)
                        unlock chanLock;
                        received
                    end
                end
        end
    )
(* Start f as a new process with the given synchroniser chain and thread
   attributes, and return the new thread.  The thread first installs its
   process data as thread-local data; any exception raised by f is
   discarded.  When debug is set the process data is printed. *)
fun new_process f synch attrs =
    let
        val () = identifiers := !identifiers + 1
        val number = !identifiers
        val info =
            PROC { blocker = conditionVar (), processNo = number, synchro = ref synch }
        val child =
            fork (fn () => (setLocal (procTag, info); f () handle _ => ()), attrs)
        val () =
            if !debug then (PolyML.print ("new_process:", info); ()) else ()
    in
        child
    end
local
    (* The caller's synchroniser chain with redundant entries removed and
       no synchroniser left locked.  New processes share it. *)
    fun inheritedSynch () = getActiveSynchroniser (get_process_data (), true)
in
    (* Start f as a process that shares the parent's synchronisers and
       does not accept broadcast interrupts. *)
    fun fork f =
        (new_process f (inheritedSynch ()) [EnableBroadcastInterrupt false]; ())

    (* Start f as a process that shares the parent's synchronisers and
       accepts broadcast interrupts.  The result interrupts that process
       when called. *)
    fun console f =
        let
            val child = new_process f (inheritedSynch ()) [EnableBroadcastInterrupt true]
        in
            fn () => interrupt child
        end

    (* Fork a pair of "choice" processes.  Both get the parent's chain
       followed by one new synchroniser, each in a different direction.
       If the parent is itself a choice (taken or not) the new processes
       run in parallel with it: in choice( (choice(a,b); c), d) both a and
       c (say) may communicate, because "choice" returns immediately and c
       runs alongside a and b.  That is equivalent to a.c + b.c + d . *)
    fun choice (f, g) =
        let
            val parentChain = inheritedSynch ()
            val shared = SYNCH { synchLock = mutex (), state = ref ChoiceUntaken }
            fun start body dir =
                new_process body (parentChain @ [(shared, dir)])
                    [EnableBroadcastInterrupt false]
        in
            start g DirLeft;
            start f DirRight;
            ()
        end
end
(* Send a broadcast interrupt; console processes are the ones started with
   broadcast interrupts enabled. *)
fun interruptConsoleProcesses () = broadcastInterrupt ()
end;