Skip to content

Commit 51dc6ad

Browse files
author
Aapo Kyrola
committed
fixed a subtle synchronization issue related to randomaccessfile buffers. Thanks for Michael Leznik for debugging!
1 parent 4d6d347 commit 51dc6ad

4 files changed

Lines changed: 84 additions & 4 deletions

File tree

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
package edu.cmu.graphchi.apps;
2+
3+
import edu.cmu.graphchi.ChiVertex;
4+
import edu.cmu.graphchi.GraphChiContext;
5+
import edu.cmu.graphchi.GraphChiProgram;
6+
import edu.cmu.graphchi.datablocks.IntConverter;
7+
import edu.cmu.graphchi.engine.GraphChiEngine;
8+
import edu.cmu.graphchi.engine.VertexInterval;
9+
10+
import java.util.TreeSet;
11+
12+
/**
13+
* Tests that the system works.
14+
* @author akyrola
15+
* Date: 7/11/12
16+
*/
17+
public class SmokeTest implements GraphChiProgram<Integer, Integer> {
18+
19+
20+
public void update(ChiVertex<Integer, Integer> vertex, GraphChiContext context) {
21+
if (context.getIteration() == 0) {
22+
vertex.setValue(vertex.getId() + context.getIteration());
23+
} else {
24+
int curval = vertex.getValue();
25+
int vexpected = vertex.getId() + context.getIteration() - 1;
26+
if (curval != vexpected) {
27+
throw new RuntimeException("Mismatch (vertex). Expected: " + vexpected + " but had " +
28+
curval);
29+
30+
}
31+
for(int i=0; i<vertex.numInEdges(); i++) {
32+
int has = vertex.inEdge(i).getValue();
33+
int correction = vertex.getId() > vertex.inEdge(i).getVertexId() ? +1 : 0;
34+
int expected = vertex.inEdge(i).getVertexId() + context.getIteration() - 1 + correction;
35+
if (expected != has)
36+
throw new RuntimeException("Mismatch (edge): " + expected + " expected but had "+ has +
37+
". Iteration:" + context.getIteration() + ", edge:" + vertex.inEdge(i).getVertexId()
38+
+ " -> " + vertex.getId());
39+
}
40+
vertex.setValue(vertex.getId() + context.getIteration());
41+
42+
}
43+
int val = vertex.getValue();
44+
for(int i=0; i<vertex.numOutEdges(); i++) {
45+
vertex.outEdge(i).setValue(val);
46+
}
47+
}
48+
49+
50+
public void beginIteration(GraphChiContext ctx) {}
51+
52+
public void endIteration(GraphChiContext ctx) {}
53+
54+
public void beginInterval(GraphChiContext ctx, VertexInterval interval) {}
55+
56+
public void endInterval(GraphChiContext ctx, VertexInterval interval) {}
57+
58+
public static void main(String[] args) throws Exception {
59+
String baseFilename = args[0];
60+
int nShards = Integer.parseInt(args[1]);
61+
62+
GraphChiEngine<Integer, Integer> engine = new GraphChiEngine<Integer, Integer>(baseFilename, nShards);
63+
engine.setEdataConverter(new IntConverter());
64+
engine.setVertexDataConverter(new IntConverter());
65+
engine.setModifiesInedges(false); // Important optimization
66+
engine.run(new SmokeTest(), 5);
67+
68+
System.out.println("Ready.");
69+
70+
}
71+
}

src/edu/cmu/graphchi/engine/auxdata/VertexData.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,11 +58,8 @@ public VertexData(int nvertices, String baseFilename, BytesToValueConverter<Vert
5858
fos.close();
5959
}
6060

61-
6261
vertexDataFile = new RandomAccessFile(ChiFilenames.getFilenameOfVertexData(baseFilename, converter), "rwd");
6362
vertexEn = vertexSt = 0;
64-
65-
6663
}
6764

6865
public void releaseAndCommit() throws IOException {

src/edu/cmu/graphchi/shards/MemoryShard.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,14 +55,17 @@ public void commitAndRelease(boolean modifiesInedges, boolean modifiesOutedges)
5555
if (modifiesInedges) {
5656
FileOutputStream fos = new FileOutputStream(new File(edgeDataFilename));
5757
fos.write(data);
58+
fos.flush();
5859
fos.close();
5960
} else if (modifiesOutedges) {
6061
ucar.unidata.io.RandomAccessFile rFile =
6162
new ucar.unidata.io.RandomAccessFile(edgeDataFilename, "rwd");
6263
rFile.seek(rangeStartEdgePtr);
6364
int last = streamingOffsetEdgePtr;
6465
if (last == 0) last = edataFilesize;
66+
6567
rFile.write(data, rangeStartEdgePtr, last - rangeStartEdgePtr);
68+
rFile.flush();
6669
rFile.close();
6770
}
6871
dataBlockManager.release(blockId);

src/edu/cmu/graphchi/shards/SlidingShard.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,10 @@ public SlidingShard(String edgeDataFilename, String adjDataFilename,
6262
adjFilesize = new File(adjDataFilename).length();
6363
edataFilesize = new File(edgeDataFilename).length();
6464
activeBlocks = new ArrayList<Block>();
65-
edataFile = new RandomAccessFile(this.edgeDataFilename, "rwd");
65+
}
66+
67+
void openEdataFile() throws IOException {
68+
edataFile = new RandomAccessFile(this.edgeDataFilename, "rwd");
6669
}
6770

6871
public void finalize() {
@@ -105,6 +108,8 @@ public void skip(int n) throws IOException {
105108
}
106109

107110
public void readNextVertices(ChiVertex[] vertices, int start, boolean disableWrites) throws IOException {
111+
if (edataFile == null) openEdataFile();
112+
108113
int nvecs = vertices.length;
109114
curBlock = null;
110115
releasePriorToOffset(false, disableWrites);
@@ -186,6 +191,10 @@ public void readNextVertices(ChiVertex[] vertices, int start, boolean disableWri
186191

187192
public void flush() throws IOException {
188193
releasePriorToOffset(true, false);
194+
if (edataFile != null) {
195+
edataFile.close();
196+
edataFile = null; // Close the file to avoid any buffering issues
197+
}
189198
}
190199

191200
public void setOffset(int newoff, int _curvid, int edgeptr) {

0 commit comments

Comments
 (0)