Skip to content

Commit 598df0a

Browse files
committed
Add Iteratee and IO for efficient, failsafe file processing (with CheckIO/CheckIteratee tests). The WordCount sample (and tests) demonstrate the usage.
1 parent f03172c commit 598df0a

File tree

6 files changed

+1258
-0
lines changed

6 files changed

+1258
-0
lines changed

core/src/main/java/fj/data/IO.java

Lines changed: 309 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,309 @@
1+
package fj.data;
2+
3+
import static fj.Bottom.errorF;
4+
import static fj.Function.constant;
5+
import static fj.Function.partialApply2;
6+
7+
import java.io.BufferedReader;
8+
import java.io.File;
9+
import java.io.FileInputStream;
10+
import java.io.IOException;
11+
import java.io.InputStreamReader;
12+
import java.io.Reader;
13+
import java.nio.charset.Charset;
14+
import java.util.Arrays;
15+
16+
import fj.F;
17+
import fj.Function;
18+
import fj.P;
19+
import fj.P1;
20+
import fj.P2;
21+
import fj.Unit;
22+
import fj.data.Iteratee.Input;
23+
import fj.data.Iteratee.IterV;
24+
25+
/**
26+
* IO monad for processing files, with main methods {@link #enumFileLines(File, Option, IterV)},
27+
* {@link #enumFileChars(File, Option, IterV)} and {@link #enumFileCharChunks(File, Option, IterV)}
28+
* (the latter one is the fastest as char chunks read from the file are directly passed to the iteratee
29+
* without indirection in between).
30+
*
31+
* @author Martin Grotzke
32+
*
33+
* @param <A> the type of the value produced by running this IO action
34+
*/
35+
public abstract class IO<A> {
36+
37+
/** Length of the <code>char[]</code> buffer allocated for each read by the char chunk readers. */
private static final int DEFAULT_BUFFER_SIZE = 1024 * 4;
38+
39+
public static final F<Reader, IO<Unit>> closeReader =
40+
new F<Reader, IO<Unit>>() {
41+
@Override
42+
public IO<Unit> f(final Reader r) {
43+
return closeReader(r);
44+
}
45+
};
46+
47+
public static IO<Unit> closeReader(final Reader r) {
48+
return new IO<Unit>() {
49+
@Override
50+
public Unit run() throws IOException {
51+
r.close();
52+
return Unit.unit();
53+
}
54+
};
55+
}
56+
57+
/**
58+
* An IO monad that reads lines from the given file (using a {@link BufferedReader}) and passes
59+
* lines to the provided iteratee. May not be suitable for files with very long
60+
* lines, consider to use {@link #enumFileCharChunks(File, IterV)} or {@link #enumFileChars(File, IterV)}
61+
* as an alternative.
62+
*
63+
* @param f the file to read, must not be <code>null</code>
64+
* @param encoding the encoding to use, {@link Option#none()} means platform default
65+
* @param i the iteratee that is fed with lines read from the file
66+
*/
67+
public static <A> IO<IterV<String, A>> enumFileLines(final File f, final Option<Charset> encoding, final IterV<String, A> i) {
68+
return bracket(bufferedReader(f, encoding)
69+
, Function.<BufferedReader, IO<Unit>>vary(closeReader)
70+
, partialApply2(IO.<A>lineReader(), i));
71+
}
72+
73+
/**
74+
* An IO monad that reads char chunks from the given file and passes them to the given iteratee.
75+
*
76+
* @param f the file to read, must not be <code>null</code>
77+
* @param encoding the encoding to use, {@link Option#none()} means platform default
78+
* @param i the iteratee that is fed with char chunks read from the file
79+
*/
80+
public static <A> IO<IterV<char[], A>> enumFileCharChunks(final File f, final Option<Charset> encoding, final IterV<char[], A> i) {
81+
return bracket(fileReader(f, encoding)
82+
, Function.<Reader, IO<Unit>>vary(closeReader)
83+
, partialApply2(IO.<A>charChunkReader(), i));
84+
}
85+
86+
/**
87+
* An IO monad that reads char chunks from the given file and passes single chars to the given iteratee.
88+
*
89+
* @param f the file to read, must not be <code>null</code>
90+
* @param encoding the encoding to use, {@link Option#none()} means platform default
91+
* @param i the iteratee that is fed with chars read from the file
92+
*/
93+
public static <A> IO<IterV<Character, A>> enumFileChars(final File f, final Option<Charset> encoding, final IterV<Character, A> i) {
94+
return bracket(fileReader(f, encoding)
95+
, Function.<Reader, IO<Unit>>vary(closeReader)
96+
, partialApply2(IO.<A>charChunkReader2(), i));
97+
}
98+
99+
public static IO<BufferedReader> bufferedReader(final File f, final Option<Charset> encoding) {
100+
return fileReader(f, encoding).map(new F<Reader, BufferedReader>() {
101+
@Override
102+
public BufferedReader f(final Reader a) {
103+
return new BufferedReader(a);
104+
}});
105+
}
106+
107+
public static IO<Reader> fileReader(final File f, final Option<Charset> encoding) {
108+
return new IO<Reader>() {
109+
@Override
110+
public Reader run() throws IOException {
111+
final FileInputStream fis = new FileInputStream(f);
112+
return encoding.isNone() ? new InputStreamReader(fis) : new InputStreamReader(fis, encoding.some());
113+
}
114+
};
115+
}
116+
117+
public static final <A, B, C> IO<C> bracket(final IO<A> init, final F<A, IO<B>> fin, final F<A, IO<C>> body) {
118+
return new IO<C>() {
119+
@Override
120+
public C run() throws IOException {
121+
final A a = init.run();
122+
try {
123+
return body.f(a).run();
124+
} catch (final IOException e) {
125+
throw e;
126+
} finally {
127+
fin.f(a);
128+
}
129+
}
130+
};
131+
}
132+
133+
public static final <A> IO<A> unit(final A a) {
134+
return new IO<A>() {
135+
@Override
136+
public A run() throws IOException {
137+
return a;
138+
}
139+
};
140+
}
141+
142+
/**
143+
* A function that feeds an iteratee with lines read from a {@link BufferedReader}.
144+
*/
145+
public static <A> F<BufferedReader, F<IterV<String, A>, IO<IterV<String, A>>>> lineReader() {
146+
final F<IterV<String, A>, Boolean> isDone =
147+
new F<Iteratee.IterV<String, A>, Boolean>() {
148+
final F<P2<A, Input<String>>, P1<Boolean>> done = constant(P.p(true));
149+
final F<F<Input<String>, IterV<String, A>>, P1<Boolean>> cont = constant(P.p(false));
150+
151+
@Override
152+
public Boolean f(final IterV<String, A> i) {
153+
return i.fold(done, cont)._1();
154+
}
155+
};
156+
157+
return new F<BufferedReader, F<IterV<String, A>, IO<IterV<String, A>>>>() {
158+
@Override
159+
public F<IterV<String, A>, IO<IterV<String, A>>> f(final BufferedReader r) {
160+
return new F<IterV<String, A>, IO<IterV<String, A>>>() {
161+
final F<P2<A, Input<String>>, P1<IterV<String, A>>> done = errorF("iteratee is done"); //$NON-NLS-1$
162+
163+
@Override
164+
public IO<IterV<String, A>> f(final IterV<String, A> it) {
165+
// use loop instead of recursion because of missing TCO
166+
return new IO<Iteratee.IterV<String, A>>() {
167+
@Override
168+
public IterV<String, A> run() throws IOException {
169+
IterV<String, A> i = it;
170+
while (!isDone.f(i)) {
171+
final String s = r.readLine();
172+
if (s == null) { return i; }
173+
final Input<String> input = Input.<String>el(s);
174+
final F<F<Input<String>, IterV<String, A>>, P1<IterV<String, A>>> cont = Function.<Input<String>, IterV<String, A>>apply(input).lazy();
175+
i = i.fold(done, cont)._1();
176+
}
177+
return i;
178+
}
179+
};
180+
}
181+
};
182+
}
183+
};
184+
}
185+
186+
/**
187+
* A function that feeds an iteratee with character chunks read from a {@link Reader}
188+
* (char[] of size {@link #DEFAULT_BUFFER_SIZE}).
189+
*/
190+
public static <A> F<Reader, F<IterV<char[], A>, IO<IterV<char[], A>>>> charChunkReader() {
191+
final F<IterV<char[], A>, Boolean> isDone =
192+
new F<Iteratee.IterV<char[], A>, Boolean>() {
193+
final F<P2<A, Input<char[]>>, P1<Boolean>> done = constant(P.p(true));
194+
final F<F<Input<char[]>, IterV<char[], A>>, P1<Boolean>> cont = constant(P.p(false));
195+
196+
@Override
197+
public Boolean f(final IterV<char[], A> i) {
198+
return i.fold(done, cont)._1();
199+
}
200+
};
201+
202+
return new F<Reader, F<IterV<char[], A>, IO<IterV<char[], A>>>>() {
203+
@Override
204+
public F<IterV<char[], A>, IO<IterV<char[], A>>> f(final Reader r) {
205+
return new F<IterV<char[], A>, IO<IterV<char[], A>>>() {
206+
final F<P2<A, Input<char[]>>, P1<IterV<char[], A>>> done = errorF("iteratee is done"); //$NON-NLS-1$
207+
208+
@Override
209+
public IO<IterV<char[], A>> f(final IterV<char[], A> it) {
210+
// use loop instead of recursion because of missing TCO
211+
return new IO<Iteratee.IterV<char[], A>>() {
212+
@Override
213+
public IterV<char[], A> run() throws IOException {
214+
215+
IterV<char[], A> i = it;
216+
while (!isDone.f(i)) {
217+
char[] buffer = new char[DEFAULT_BUFFER_SIZE];
218+
final int numRead = r.read(buffer);
219+
if (numRead == -1) { return i; }
220+
if(numRead < buffer.length) {
221+
buffer = Arrays.copyOfRange(buffer, 0, numRead);
222+
}
223+
final Input<char[]> input = Input.<char[]>el(buffer);
224+
final F<F<Input<char[]>, IterV<char[], A>>, P1<IterV<char[], A>>> cont =
225+
Function.<Input<char[]>, IterV<char[], A>>apply(input).lazy();
226+
i = i.fold(done, cont)._1();
227+
}
228+
return i;
229+
}
230+
};
231+
}
232+
};
233+
}
234+
};
235+
}
236+
237+
/**
238+
* A function that feeds an iteratee with characters read from a {@link Reader}
239+
* (chars are read in chunks of size {@link #DEFAULT_BUFFER_SIZE}).
240+
*/
241+
public static <A> F<Reader, F<IterV<Character, A>, IO<IterV<Character, A>>>> charChunkReader2() {
242+
final F<IterV<Character, A>, Boolean> isDone =
243+
new F<Iteratee.IterV<Character, A>, Boolean>() {
244+
final F<P2<A, Input<Character>>, P1<Boolean>> done = constant(P.p(true));
245+
final F<F<Input<Character>, IterV<Character, A>>, P1<Boolean>> cont = constant(P.p(false));
246+
247+
@Override
248+
public Boolean f(final IterV<Character, A> i) {
249+
return i.fold(done, cont)._1();
250+
}
251+
};
252+
253+
return new F<Reader, F<IterV<Character, A>, IO<IterV<Character, A>>>>() {
254+
@Override
255+
public F<IterV<Character, A>, IO<IterV<Character, A>>> f(final Reader r) {
256+
return new F<IterV<Character, A>, IO<IterV<Character, A>>>() {
257+
final F<P2<A, Input<Character>>, IterV<Character, A>> done = errorF("iteratee is done"); //$NON-NLS-1$
258+
259+
@Override
260+
public IO<IterV<Character, A>> f(final IterV<Character, A> it) {
261+
// use loop instead of recursion because of missing TCO
262+
return new IO<Iteratee.IterV<Character, A>>() {
263+
@Override
264+
public IterV<Character, A> run() throws IOException {
265+
266+
IterV<Character, A> i = it;
267+
while (!isDone.f(i)) {
268+
char[] buffer = new char[DEFAULT_BUFFER_SIZE];
269+
final int numRead = r.read(buffer);
270+
if (numRead == -1) { return i; }
271+
if(numRead < buffer.length) {
272+
buffer = Arrays.copyOfRange(buffer, 0, numRead);
273+
}
274+
for(int c = 0; c < buffer.length; c++) {
275+
final Input<Character> input = Input.el(buffer[c]);
276+
final F<F<Input<Character>, IterV<Character, A>>, IterV<Character, A>> cont =
277+
Function.<Input<Character>, IterV<Character, A>>apply(input);
278+
i = i.fold(done, cont);
279+
}
280+
}
281+
return i;
282+
}
283+
};
284+
}
285+
};
286+
}
287+
};
288+
}
289+
290+
/**
 * Executes this action and returns its result.
 *
 * @return the value produced by the action
 * @throws IOException if the action fails with an I/O error
 */
public abstract A run() throws IOException;
291+
292+
public final <B> IO<B> map(final F<A, B> f) {
293+
return new IO<B>() {
294+
@Override
295+
public B run() throws IOException {
296+
return f.f(IO.this.run());
297+
}
298+
};
299+
}
300+
301+
public final <B> IO<B> bind(final F<A, IO<B>> f) {
302+
return new IO<B>() {
303+
@Override
304+
public B run() throws IOException {
305+
return f.f(IO.this.run()).run();
306+
}
307+
};
308+
}
309+
}

0 commit comments

Comments
 (0)