-
Notifications
You must be signed in to change notification settings - Fork 128
Expand file tree
/
Copy pathcsp.pxi
More file actions
62 lines (49 loc) · 1.5 KB
/
csp.pxi
File metadata and controls
62 lines (49 loc) · 1.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
(ns pixie.csp
(:require [pixie.stacklets :as st]
[pixie.buffers :as b]
[pixie.channels :as chans]))
;; Re-export of chans/chan so callers can create channels through pixie.csp
;; without requiring pixie.channels themselves.
(def chan chans/chan)
(defn close!
  "Closes the channel c by delegating to chans/-close!. Future writes will
  be rejected; future reads will drain the channel before returning nil."
  [c]
  (chans/-close! c))
;; Callback that ignores its single argument and returns nil. Used by put!
;; when the caller does not supply a completion callback.
(def -null-callback (fn [_] nil))
(defn put!
  "Puts the value v into the channel c via chans/-put!. The optional
  callback f is handed to chans/-put! as the completion callback; when it
  is omitted, -null-callback (ignores its argument, returns nil) is used."
  ([c v]
   ;; Two-argument form: same as the three-argument form with a no-op callback.
   (put! c v -null-callback))
  ([c v f]
   (chans/-put! c v f)))
(defn take!
  "Takes a value from the channel c, passing f to chans/-take! as the
  callback to call when the take has completed."
  [c f]
  (chans/-take! c f))
(defn >!
  "Puts v into the channel c, using the caller's continuation as the
  completion callback: the continuation k captured by st/call-cc is wrapped
  with st/run-and-process and passed to chans/-put!.
  NOTE(review): presumably this suspends the calling stacklet until the put
  completes - confirm against pixie.stacklets."
  [c v]
  (st/call-cc
   (fn [k]
     (let [resume (partial st/run-and-process k)]
       (chans/-put! c v resume)))))
(defn <!
  "Takes a value from the channel c, using the caller's continuation as the
  completion callback: the continuation k captured by st/call-cc is wrapped
  with st/run-and-process and passed to chans/-take!.
  NOTE(review): presumably this suspends the calling stacklet until a value
  is available and evaluates to that value - confirm against pixie.stacklets."
  [c]
  (st/call-cc
   (fn [k]
     (let [resume (partial st/run-and-process k)]
       (chans/-take! c resume)))))
;; go: expands to code that creates a channel with (chans/chan 1), hands two
;; forms to st/spawn - one that puts the value of body (wrapped in an implicit
;; do) onto that channel, and one that closes it - and then evaluates to the
;; channel itself, so the caller can read the body's result from it.
(defmacro go [& body]
  `(let [result-chan# (chans/chan 1)]
     (st/spawn (put! result-chan# (do ~@body))
               (close! result-chan#))
     result-chan#))
;; Makes anything implementing chans/IReadPort reducible: values are taken
;; from the port one at a time with <! and folded into the accumulator.
(extend -reduce chans/IReadPort
  (fn [port rf init]
    (loop [result init]
      (if (reduced? result)
        ;; The reducing function asked for early termination; unwrap and stop
        ;; without taking another value from the port.
        @result
        (let [item (<! port)]
          (if (not (nil? item))
            (recur (rf result item))
            ;; A nil take ends the reduction; return what has been accumulated.
            result))))))
(defn alts!
  "Calls chans/alts! on ops, using the caller's continuation (captured by
  st/call-cc and wrapped with st/run-and-process) as the completion callback.
  With only ops, nil is passed as the options argument. Any further arguments
  are treated as key/value pairs and collected into a map with hashmap, which
  is passed as the options argument.
  NOTE(review): the shape of ops and the supported option keys are defined by
  chans/alts! - confirm there."
  ([ops]
   (st/call-cc
    (fn [k]
      (let [resume (partial st/run-and-process k)]
        (chans/alts! ops resume nil)))))
  ([ops & opts]
   (st/call-cc
    (fn [k]
      (let [resume  (partial st/run-and-process k)
            opt-map (apply hashmap opts)]
        (chans/alts! ops resume opt-map))))))