-
Notifications
You must be signed in to change notification settings - Fork 128
Expand file tree
/
Copy pathio-blocking.pxi
More file actions
165 lines (143 loc) · 4.43 KB
/
io-blocking.pxi
File metadata and controls
165 lines (143 loc) · 4.43 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
(ns pixie.io-blocking
(:require [pixie.streams :as st :refer :all]))
(def DEFAULT-BUFFER-SIZE (* 32 1024))
(def fopen (ffi-fn libc "fopen" [CCharP CCharP] CVoidP))
(def fseek (ffi-fn libc "fseek" [CVoidP CInt CInt] CInt))
(def ftell (ffi-fn libc "ftell" [CVoidP] CInt))
(def -rewind (ffi-fn libc "rewind" [CVoidP] CVoidP))
(def fread (ffi-fn libc "fread" [CVoidP CInt CInt CVoidP] CInt))
(def fgetc (ffi-fn libc "fgetc" [CVoidP] CInt))
(def fputc (ffi-fn libc "fputc" [CInt CVoidP] CInt))
(def fwrite (ffi-fn libc "fwrite" [CVoidP CInt CInt CVoidP] CInt))
(def fclose (ffi-fn libc "fclose" [CVoidP] CInt))
(def popen (ffi-fn libc "popen" [CCharP CCharP] CVoidP))
(def pclose (ffi-fn libc "pclose" [CVoidP] CInt))
(defn stream-reducer [this f init]
(let [buf (buffer DEFAULT-BUFFER-SIZE)
rrf (preserving-reduced f)]
(loop [acc init]
(let [read-count (read this buf DEFAULT-BUFFER-SIZE)]
(if (> read-count 0)
(let [result (reduce rrf acc buf)]
(if (not (reduced? result))
(recur result)
@result))
acc)))))
(deftype FileStream [fp]
IInputStream
(read [this buffer len]
(assert (>= (buffer-capacity buffer) len)
"Not enough capacity in the buffer")
(let [read-count (fread buffer 1 len fp)]
(set-buffer-count! buffer read-count)
read-count))
(read-byte [this]
(fgetc buffer))
ISeekableStream
(seek [this pos]
(fseek fp pos 0))
(rewind [this]
(-rewind fp))
IDisposable
(-dispose! [this]
(fclose fp))
IReduce
(-reduce [this f init]
(stream-reducer this f init)))
(defn open-read
{:doc "Open a file for reading, returning a IInputStream"
:added "0.1"}
[filename]
(assert (string? filename) "Filename must be a string")
(->FileStream (fopen filename "r")))
(defn read-line
"Read one line from input-stream for each invocation.
nil when all lines have been read"
[input-stream]
(let [line-feed (into #{} (map int [\newline \return]))
buf (buffer 1)]
(loop [acc []]
(let [len (read input-stream buf 1)]
(cond
(and (pos? len) (not (line-feed (first buf))))
(recur (conj acc (first buf)))
(and (zero? len) (empty? acc)) nil
:else (apply str (map char acc)))))))
(defn line-seq
"Returns the lines of text from input-stream as a lazy sequence of strings.
input-stream must implement IInputStream"
[input-stream]
(when-let [line (read-line input-stream)]
(cons line (lazy-seq (line-seq input-stream)))))
(deftype FileOutputStream [fp]
IByteOutputStream
(write-byte [this val]
(assert (integer? val) "Value must be a int")
(println val)
(fputc val fp))
IOutputStream
(write [this buffer]
(fwrite buffer 1 (count buffer) fp))
IDisposable
(-dispose! [this]
(fclose fp)))
(defn file-output-rf [filename]
(let [fp (->FileOutputStream (fopen filename "w"))]
(fn ([] 0)
([cnt] (dispose! fp) nil)
([cnt chr]
(assert (integer? chr))
(let [written (write-byte fp chr)]
(if (= written 0)
(reduced cnt)
(+ cnt written)))))))
(defn spit [filename val]
(transduce (map int)
(file-output-rf filename)
(str val)))
(defn slurp [filename]
(let [c (->FileStream (fopen filename "r"))
result (transduce
(map char)
string-builder
c)]
(dispose! c)
result))
(defn slurp-stream [stream]
(let [c stream
result (transduce
(map char)
string-builder
c)]
(dispose! c)
result))
(deftype ProcessInputStream [fp]
IInputStream
(read [this buffer len]
(assert (<= (buffer-capacity buffer) len)
"Not enough capacity in the buffer")
(let [read-count (fread buffer 1 len fp)]
(set-buffer-count! buffer read-count)
read-count))
(read-byte [this]
(fgetc fp))
IDisposable
(-dispose! [this]
(pclose fp))
IReduce
(-reduce [this f init]
(stream-reducer this f init)))
(defn popen-read
{:doc "Open a file for reading, returning a IInputStream"
:added "0.1"}
[command]
(assert (string? command) "Command must be a string")
(->ProcessInputStream (popen command "r")))
(defn run-command [command]
(let [c (->ProcessInputStream (popen command "r"))
result (transduce
(map char)
string-builder
c)]
(dispose! c)
result))