forked from pixie-lang/pixie
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathio.pxi
More file actions
214 lines (186 loc) · 6.49 KB
/
io.pxi
File metadata and controls
214 lines (186 loc) · 6.49 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
(ns pixie.io
(:require [pixie.streams :as st :refer :all]
[pixie.streams.utf8 :as utf8]
[pixie.io-blocking :as io-blocking]
[pixie.uv :as uv]
[pixie.stacklets :as st]
[pixie.ffi :as ffi]
[pixie.ffi-infer :as ffi-infer]))
;; File-system primitives declared through the uv/defuvfsfn macro. Each form
;; gives the function name and its argument vector.
;; NOTE(review): presumably each one wraps the libuv uv_fs_* call of the same
;; name and `:result` selects the request's result field as the return value
;; (callers in this file treat a negative return as an error) -- confirm
;; against pixie.uv.
(uv/defuvfsfn fs_open [path flags mode] :result)
(uv/defuvfsfn fs_read [file bufs nbufs offset] :result)
(uv/defuvfsfn fs_write [file bufs nbufs offset] :result)
(uv/defuvfsfn fs_close [file] :result)
;; Size passed to `buffer` when a caller does not supply one: used for the
;; scratch buffer in FileStream's -reduce and as the default size of the
;; buffered input/output streams.
(def DEFAULT-BUFFER-SIZE 1024)
;; Readable, seekable stream over a file handle.
;;   fp     - handle returned by fs_open
;;   offset - file position used by the next read; advanced by each read
;;   uvbuf  - uv buffer struct that is re-pointed at the caller's buffer on
;;            every read
(deftype FileStream [fp offset uvbuf]
  IInputStream
  ;; Reads at most `len` bytes at the current offset into `buffer`, advances
  ;; the offset by the number of bytes read, sets the buffer's count to that
  ;; number and returns it. Fails an assertion when `buffer` cannot hold `len`
  ;; bytes or when fs_read reports a negative result.
  (read [this buffer len]
    ;; The buffer must be able to hold the requested number of bytes.
    (assert (<= len (buffer-capacity buffer))
            "Not enough capacity in the buffer")
    (let [_ (pixie.ffi/set! uvbuf :base buffer)
          ;; Ask for exactly the number of bytes the caller requested.
          _ (pixie.ffi/set! uvbuf :len len)
          read-count (fs_read fp uvbuf 1 offset)]
      (assert (not (neg? read-count)) "Read Error")
      (set-field! this :offset (+ offset read-count))
      (set-buffer-count! buffer read-count)
      read-count))
  ISeekableStream
  ;; Current read position.
  (position [this]
    offset)
  ;; Moves the read position back to the start of the file.
  (rewind [this]
    (set-field! this :offset 0))
  ;; Moves the read position to `pos`.
  (seek [this pos]
    (set-field! this :offset pos))
  IDisposable
  ;; Disposes the uv buffer struct and closes the file handle.
  (-dispose! [this]
    (dispose! uvbuf)
    (fs_close fp))
  IReduce
  ;; Reduces `f` over the file contents one DEFAULT-BUFFER-SIZE chunk at a
  ;; time, starting from the current offset. Stops when a read returns no
  ;; bytes or when `f` returns a reduced value, which is unwrapped.
  (-reduce [this f init]
    (let [buf (buffer DEFAULT-BUFFER-SIZE)
          ;; Keeps a reduced value wrapped so it survives the inner reduce
          ;; over a single chunk and can be detected below.
          rrf (preserving-reduced f)]
      (loop [acc init]
        (let [read-count (read this buf DEFAULT-BUFFER-SIZE)]
          (if (> read-count 0)
            (let [result (reduce rrf acc buf)]
              (if (not (reduced? result))
                (recur result)
                @result))
            acc))))))
(defn open-read
  {:doc "Open a file for reading, returning an IInputStream.
Throws the libuv error name when the file cannot be opened."
   :added "0.1"}
  [filename]
  (assert (string? filename) "Filename must be a string")
  (let [fp (fs_open filename uv/O_RDONLY 0)]
    ;; A negative result is an error code, not a usable handle. Fail here,
    ;; the same way open-write does, instead of handing back a stream whose
    ;; first read would fail.
    (when (neg? fp)
      (throw (uv/uv_err_name fp)))
    ;; The uv buffer struct is only allocated once the open has succeeded.
    (->FileStream fp 0 (uv/uv_buf_t))))
(defn read-line
  "Reads and returns the next line of input-stream as a string, consuming
  one byte at a time until a newline or carriage return is seen or the
  stream has no more data. Returns nil once nothing is left to read."
  [input-stream]
  (let [terminator? (into #{} (map int [\newline \return]))
        cell (buffer 1)]
    (loop [collected []]
      (let [n (read input-stream cell 1)
            ;; true while a byte arrived and it is not a line terminator
            more? (and (pos? n) (not (terminator? (first cell))))]
        (cond
          more? (recur (conj collected (first cell)))
          ;; end of data with nothing accumulated: no more lines
          (and (zero? n) (empty? collected)) nil
          ;; terminator reached, or data ran out after a partial line
          :else (apply str (map char collected)))))))
(defn line-seq
  "Lazily yields the lines of input-stream as strings; the sequence ends
  when read-line returns nil. input-stream must implement IInputStream."
  [input-stream]
  (let [line (read-line input-stream)]
    (when line
      ;; Only the head is read eagerly; the tail is read on demand.
      (cons line
            (lazy-seq (line-seq input-stream))))))
;; Writable stream over a file handle.
;;   fp     - handle returned by fs_open
;;   offset - file position used by the next write; advanced by each write
;;   uvbuf  - uv buffer struct that is re-pointed at the pending bytes before
;;            every fs_write call
(deftype FileOutputStream [fp offset uvbuf]
  IOutputStream
  ;; Writes the whole content of `buffer` at the current offset, calling
  ;; fs_write again for the remainder whenever a call writes fewer bytes than
  ;; requested. Throws the libuv error name when fs_write reports a negative
  ;; result. Returns the byte count of the final fs_write call.
  (write [this buffer]
    (loop [buffer-offset 0]
      (let [_ (pixie.ffi/set! uvbuf :base (ffi/ptr-add buffer buffer-offset))
            _ (pixie.ffi/set! uvbuf :len (- (count buffer) buffer-offset))
            write-count (fs_write fp uvbuf 1 offset)]
        (when (neg? write-count)
          ;; Report the failing write's own result code.
          (throw (uv/uv_err_name write-count)))
        (set-field! this :offset (+ offset write-count))
        (if (< (+ buffer-offset write-count) (count buffer))
          ;; Partial write: continue with the bytes still pending.
          (recur (+ buffer-offset write-count))
          write-count))))
  IDisposable
  ;; Disposes the uv buffer struct and closes the file handle, mirroring
  ;; FileStream's -dispose!.
  (-dispose! [this]
    (dispose! uvbuf)
    (fs_close fp)))
;; Byte-oriented output stream that collects bytes in `buffer` and passes
;; them to `downstream` in bulk. `idx` is the number of bytes currently held.
(deftype BufferedOutputStream [downstream idx buffer]
  IByteOutputStream
  ;; Stores `val` at the next free slot; once the buffer is full its whole
  ;; content is written downstream and the fill index goes back to zero.
  (write-byte [this val]
    (pixie.ffi/pack! buffer idx CUInt8 val)
    (let [filled (inc idx)
          capacity (buffer-capacity buffer)]
      (set-field! this :idx filled)
      (when (= filled capacity)
        (set-buffer-count! buffer capacity)
        (write downstream buffer)
        (set-field! this :idx 0))))
  IDisposable
  ;; Writes the bytes still held in the buffer downstream.
  (-dispose! [this]
    (let [pending idx]
      (set-buffer-count! buffer pending)
      (write downstream buffer)))
  IFlushableStream
  ;; Writes the bytes still held in the buffer downstream and resets the
  ;; fill index so the buffer can be reused.
  (flush [this]
    (let [pending idx]
      (set-buffer-count! buffer pending)
      (set-field! this :idx 0)
      (write downstream buffer))))
;; Byte-oriented input stream that pulls data from `upstream` one buffer at a
;; time. `idx` is the position of the next byte to hand out from `buffer`.
(deftype BufferedInputStream [upstream idx buffer]
  IByteInputStream
  ;; Returns the next byte, refilling the buffer from upstream first when
  ;; every buffered byte has already been handed out.
  (read-byte [this]
    (when (= (count buffer) idx)
      (set-field! this :idx 0)
      (read upstream buffer (buffer-capacity buffer)))
    (let [cursor idx
          val (nth buffer cursor)]
      (set-field! this :idx (inc cursor))
      val))
  IDisposable
  ;; Disposes the internal buffer.
  (-dispose! [this]
    (dispose! buffer)))
;; Wraps `downstream` in a BufferedOutputStream that starts out empty.
;; `size` is the size of the internal buffer and defaults to
;; DEFAULT-BUFFER-SIZE.
(defn buffered-output-stream
  ([downstream]
   (buffered-output-stream downstream DEFAULT-BUFFER-SIZE))
  ([downstream size]
   (let [scratch (buffer size)]
     (->BufferedOutputStream downstream 0 scratch))))
;; Wraps `upstream` in a BufferedInputStream. `size` is the size of the
;; internal buffer and defaults to DEFAULT-BUFFER-SIZE.
(defn buffered-input-stream
  ([upstream]
   (buffered-input-stream upstream DEFAULT-BUFFER-SIZE))
  ([upstream size]
   (let [scratch (buffer size)]
     ;; Count and index both start at `size`, so the stream looks fully
     ;; consumed and the first read-byte refills it from upstream.
     (set-buffer-count! scratch size)
     (->BufferedInputStream upstream size scratch))))
(defn throw-on-error
  "Returns result unchanged unless it is negative, in which case the value
  of (uv/uv_err_name result) is thrown."
  [result]
  (if (neg? result)
    (throw (uv/uv_err_name result))
    result))
(defn open-write
  {:doc "Opens filename for writing (creating it when needed) and returns an
IOutputStream positioned at offset 0. Throws when the open fails."
   :added "0.1"}
  [filename]
  (assert (string? filename) "Filename must be a string")
  (let [open-flags (bit-or uv/O_WRONLY uv/O_CREAT)
        ;; throw-on-error turns a negative fs_open result into a throw
        handle (throw-on-error (fs_open filename open-flags uv/S_IRWXU))]
    (->FileOutputStream handle 0 (uv/uv_buf_t))))
(defn spit
  "Writes (str content) to output. output is either a filename, which is
  opened with open-write, or a value satisfying IOutputStream; anything else
  throws."
  [output content]
  (let [sink (cond
               (string? output) (open-write output)
               (satisfies? IOutputStream output) output
               :else (throw "Expected a string or IOutputStream"))
        ;; buffer the sink, then wrap it as a UTF-8 encoding reducing function
        rf (-> sink
               buffered-output-stream
               utf8/utf8-output-stream-rf)]
    (transduce (map identity) rf (str content))))
(defn slurp
  "Reads everything from input and returns it as a string. input is either
  a filename, which is opened with open-read, or a value satisfying
  IInputStream; anything else throws. The stream is disposed afterwards."
  [input]
  (let [source (if (string? input)
                 (open-read input)
                 (if (satisfies? IInputStream input)
                   input
                   (throw "Expected a string or an IInputStream")))
        ;; map every element of the stream to a char and collect into a string
        text (transduce (map char) string-builder source)]
    (dispose! source)
    text))
;; Runs `command` by handing io-blocking/run-command and `command` to
;; st/apply-blocking, and returns whatever that call returns.
;; NOTE(review): the ns form binds the alias `st` to both pixie.streams and
;; pixie.stacklets; apply-blocking is presumably the pixie.stacklets one --
;; confirm the alias resolves as intended.
(defn run-command [command]
(st/apply-blocking io-blocking/run-command command))
;; Scratch code wrapped in `comment` forms.
;; NOTE(review): presumably these forms are never evaluated; `tcp-server` is
;; not defined anywhere in this file -- confirm before reviving this snippet.
(comment
(st/apply-blocking println "FROM OTHER THREAD <---!!!!!")
(tcp-server "0.0.0.0" 4242 nil))
;; Unfinished sketch of a readline binding built with ffi-infer: the macro
;; binds `libname` in a `let` with an empty body, and the trailing
;; compile-library call is given no arguments.
(comment
(defmacro make-readline-async []
`(let [libname ~(ffi-infer/compile-library {:prefix "pixie.io.readline"
:includes ["uv.h" "editline/readline.h"]})]))
(ffi-infer/compile-library))