Skip to content

Commit 913cdac

Browse files
committed
Remove Weight type parameter
Signed-off-by: Mihai Budiu <mbudiu@feldera.com>
1 parent bd31ba3 commit 913cdac

26 files changed

Lines changed: 326 additions & 236 deletions

sql-to-dbsp-compiler/simulator/src/main/java/org/dbsp/simulator/AggregateDescription.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
package org.dbsp.simulator;
22

3+
import org.dbsp.simulator.types.Weight;
34
import org.dbsp.simulator.util.TriFunction;
45

56
import java.util.function.Function;
67

7-
public class AggregateDescription<Result, IntermediateResult, Data, Weight> {
8+
public class AggregateDescription<Result, IntermediateResult, Data> {
89
public final IntermediateResult initialValue;
910
public final TriFunction<IntermediateResult, Data, Weight, IntermediateResult> update;
1011
public final Function<IntermediateResult, Result> finalize;
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package org.dbsp.simulator;
2+
3+
import org.dbsp.simulator.types.DataType;
4+
import org.dbsp.simulator.types.FunctionType;
5+
6+
import java.util.function.Function;
7+
8+
public class RuntimeFunction {
9+
final FunctionType type;
10+
final Function<DataType, DataType> implementation;
11+
12+
public RuntimeFunction(FunctionType type, Function<DataType, DataType> implementation) {
13+
this.type = type;
14+
this.implementation = implementation;
15+
assert type.getParameterCount() == 1;
16+
}
17+
18+
public Function<DataType, DataType> getFunction() {
19+
return this.implementation;
20+
}
21+
}
Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,31 @@
11
package org.dbsp.simulator.collections;
22

3+
import org.dbsp.simulator.types.DataType;
4+
import org.dbsp.simulator.types.WeightType;
35
import org.dbsp.simulator.util.ICastable;
46
import org.dbsp.simulator.util.IndentStream;
57
import org.dbsp.simulator.util.ToIndentableString;
8+
import org.dbsp.simulator.values.SqlTuple;
9+
10+
import java.util.function.Function;
11+
import java.util.function.Predicate;
12+
13+
public abstract class BaseCollection implements ICastable, ToIndentableString, DataType {
14+
final WeightType weightType;
15+
16+
protected BaseCollection(WeightType weightType) {
17+
this.weightType = weightType;
18+
}
619

7-
public abstract class BaseCollection<Weight> implements ICastable, ToIndentableString {
820
@Override
921
public String toString() {
1022
StringBuilder builder = new StringBuilder();
1123
IndentStream stream = new IndentStream(builder);
1224
this.toString(stream);
1325
return stream.toString();
1426
}
27+
28+
ZSet<DataType> as() {
29+
return (ZSet<DataType>) this;
30+
}
1531
}

sql-to-dbsp-compiler/simulator/src/main/java/org/dbsp/simulator/collections/IndexedZSet.java

Lines changed: 25 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,28 @@
11
package org.dbsp.simulator.collections;
22

33
import org.dbsp.simulator.AggregateDescription;
4+
import org.dbsp.simulator.types.Weight;
45
import org.dbsp.simulator.types.WeightType;
56
import org.dbsp.simulator.util.IIndentStream;
67
import org.dbsp.simulator.util.ToIndentableString;
8+
import org.dbsp.simulator.values.SqlTuple;
79

810
import java.util.HashMap;
911
import java.util.Map;
1012
import java.util.function.BiFunction;
13+
import java.util.function.Function;
14+
import java.util.function.Predicate;
1115

12-
public class IndexedZSet<Key, Value, Weight> extends BaseCollection<Weight> implements ToIndentableString {
13-
final WeightType<Weight> weightType;
14-
final Map<Key, ZSet<Value, Weight>> index;
16+
public class IndexedZSet<Key, Value> extends BaseCollection implements ToIndentableString {
17+
final Map<Key, ZSet<Value>> index;
1518

16-
public IndexedZSet(WeightType<Weight> weightType) {
19+
public IndexedZSet(WeightType weightType) {
20+
super(weightType);
1721
index = new HashMap<>();
18-
this.weightType = weightType;
1922
}
2023

2124
public void append(Key key, Value value, Weight weight) {
22-
ZSet<Value, Weight> zset = this.index.getOrDefault(
25+
ZSet<Value> zset = this.index.getOrDefault(
2326
key, new ZSet<>(this.weightType));
2427
if (zset.isEmpty())
2528
// This is a new key
@@ -30,41 +33,41 @@ public void append(Key key, Value value, Weight weight) {
3033
this.index.remove(key);
3134
}
3235

33-
public <Result, OtherValue> IndexedZSet<Key, Result, Weight> join(
34-
IndexedZSet<Key, OtherValue, Weight> other,
36+
public <Result, OtherValue> IndexedZSet<Key, Result> join(
37+
IndexedZSet<Key, OtherValue> other,
3538
BiFunction<Value, OtherValue, Result> combiner) {
36-
IndexedZSet<Key, Result, Weight> result = new IndexedZSet<>(this.weightType);
39+
IndexedZSet<Key, Result> result = new IndexedZSet<>(this.weightType);
3740
for (Key key: this.index.keySet()) {
3841
if (other.index.containsKey(key)) {
39-
ZSet<Value, Weight> left = this.index.get(key);
40-
ZSet<OtherValue, Weight> right = other.index.get(key);
41-
ZSet<Result, Weight> product = left.multiply(right, combiner);
42+
ZSet<Value> left = this.index.get(key);
43+
ZSet<OtherValue> right = other.index.get(key);
44+
ZSet<Result> product = left.multiply(right, combiner);
4245
result.index.put(key, product);
4346
}
4447
}
4548
return result;
4649
}
4750

48-
public <Result, IntermediateResult> IndexedZSet<Key, Result, Weight>
49-
aggregate(AggregateDescription<Result, IntermediateResult, Value, Weight> aggregate) {
50-
IndexedZSet<Key, Result, Weight> result = new IndexedZSet<>(this.weightType);
51+
public <Result, IntermediateResult> IndexedZSet<Key, Result>
52+
aggregate(AggregateDescription<Result, IntermediateResult, Value> aggregate) {
53+
IndexedZSet<Key, Result> result = new IndexedZSet<>(this.weightType);
5154
for (Key key: this.index.keySet()) {
52-
ZSet<Value, Weight> set = this.index.get(key);
55+
ZSet<Value> set = this.index.get(key);
5356
Result agg = set.aggregate(aggregate);
5457
result.append(key, agg, this.weightType.one());
5558
}
5659
return result;
5760
}
5861

59-
public ZSet<Value, Weight> deindex() {
62+
public ZSet<Value> deindex() {
6063
return this.flatten((k, v) -> v);
6164
}
6265

63-
public <Result> ZSet<Result, Weight> flatten(BiFunction<Key, Value, Result> combine) {
64-
ZSet<Result, Weight> result = new ZSet<>(this.weightType);
66+
public <Result> ZSet<Result> flatten(BiFunction<Key, Value, Result> combine) {
67+
ZSet<Result> result = new ZSet<>(this.weightType);
6568
for (Key key: this.index.keySet()) {
66-
ZSet<Value, Weight> set = this.index.get(key);
67-
ZSet<Result, Weight> map = set.map(v -> combine.apply(key, v));
69+
ZSet<Value> set = this.index.get(key);
70+
ZSet<Result> map = set.map(v -> combine.apply(key, v));
6871
result.append(map);
6972
}
7073
return result;
@@ -78,7 +81,7 @@ public IIndentStream toString(IIndentStream stream) {
7881
stream.append("{").increase();
7982
boolean first = true;
8083
for (Key key: this.index.keySet()) {
81-
ZSet<Value, Weight> data = this.index.get(key);
84+
ZSet<Value> data = this.index.get(key);
8285
if (!first)
8386
stream.append(",\n");
8487
first = false;

sql-to-dbsp-compiler/simulator/src/main/java/org/dbsp/simulator/collections/ZSet.java

Lines changed: 29 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package org.dbsp.simulator.collections;
22

33
import org.dbsp.simulator.AggregateDescription;
4+
import org.dbsp.simulator.types.Weight;
45
import org.dbsp.simulator.types.WeightType;
56
import org.dbsp.simulator.util.IIndentStream;
67
import org.dbsp.simulator.util.ToIndentableString;
@@ -11,15 +12,14 @@
1112
import java.util.function.Function;
1213
import java.util.function.Predicate;
1314

14-
public class ZSet<Data, Weight> extends BaseCollection<Weight> implements ToIndentableString {
15+
public class ZSet<Data> extends BaseCollection implements ToIndentableString {
1516
/** Maps values to weights. Invariant: weights are never zero */
1617
final Map<Data, Weight> data;
17-
final WeightType<Weight> weightType;
1818

1919
/** Create a Z-set by cloning the data from the specified map. */
20-
public ZSet(Map<Data, Weight> data, WeightType<Weight> weightType) {
20+
public ZSet(Map<Data, Weight> data, WeightType weightType) {
21+
super(weightType);
2122
this.data = new HashMap<>();
22-
this.weightType = weightType;
2323
for (Map.Entry<Data, Weight> datum: data.entrySet()) {
2424
if (!this.weightType.isZero(datum.getValue()))
2525
this.data.put(datum.getKey(), datum.getValue());
@@ -37,28 +37,28 @@ public int entryCount() {
3737
}
3838

3939
/** Create an empty Z-set */
40-
public ZSet(WeightType<Weight> weightType) {
40+
public ZSet(WeightType weightType) {
41+
super(weightType);
4142
this.data = new HashMap<>();
42-
this.weightType = weightType;
4343
}
4444

45-
public ZSet(Collection<Data> data, WeightType<Weight> weightType) {
45+
public ZSet(Collection<Data> data, WeightType weightType) {
46+
super(weightType);
4647
this.data = new HashMap<>();
47-
this.weightType = weightType;
4848
for (Data datum: data) {
4949
this.data.merge(datum, this.weightType.one(), this::merger);
5050
}
5151
}
5252

53-
public ZSet<Data, Weight> negate() {
53+
public ZSet<Data> negate() {
5454
Map<Data, Weight> result = new HashMap<>();
5555
for (Map.Entry<Data, Weight> entry: this.data.entrySet()) {
5656
result.put(entry.getKey(), this.weightType.negate(entry.getValue()));
5757
}
5858
return new ZSet<>(result, this.weightType);
5959
}
6060

61-
public static <Data, Weight> ZSet<Data, Weight> zero(WeightType<Weight> weightType) {
61+
public static <Data, Weight> ZSet<Data> zero(WeightType weightType) {
6262
return new ZSet<>(weightType);
6363
}
6464

@@ -70,18 +70,18 @@ Weight merger(Weight oldWeight, Weight newWeight) {
7070
return w;
7171
}
7272

73-
public ZSet<Data, Weight> add(ZSet<Data, Weight> other) {
73+
public ZSet<Data> add(ZSet<Data> other) {
7474
Map<Data, Weight> result = new HashMap<>(this.data);
7575
for (Map.Entry<Data, Weight> entry: other.data.entrySet()) {
7676
result.merge(entry.getKey(), entry.getValue(), this::merger);
7777
}
7878
return new ZSet<>(result, this.weightType);
7979
}
8080

81-
public <OtherData, Result> ZSet<Result, Weight> multiply(
82-
ZSet<OtherData, Weight> other,
81+
public <OtherData, Result> ZSet<Result> multiply(
82+
ZSet<OtherData> other,
8383
BiFunction<Data, OtherData, Result> combiner) {
84-
ZSet<Result, Weight> result = new ZSet<>(this.weightType);
84+
ZSet<Result> result = new ZSet<>(this.weightType);
8585
for (Map.Entry<Data, Weight> entry: this.data.entrySet()) {
8686
for (Map.Entry<OtherData, Weight> otherEntry: other.data.entrySet()) {
8787
Result data = combiner.apply(entry.getKey(), otherEntry.getKey());
@@ -92,36 +92,36 @@ public <OtherData, Result> ZSet<Result, Weight> multiply(
9292
return result;
9393
}
9494

95-
public ZSet<Data, Weight> subtract(ZSet<Data, Weight> other) {
95+
public ZSet<Data> subtract(ZSet<Data> other) {
9696
Map<Data, Weight> result = new HashMap<>(this.data);
9797
for (Map.Entry<Data, Weight> entry: other.data.entrySet()) {
9898
result.merge(entry.getKey(), this.weightType.negate(entry.getValue()), this::merger);
9999
}
100100
return new ZSet<>(result, this.weightType);
101101
}
102102

103-
public ZSet<Data, Weight> append(Data data, Weight weight) {
103+
public ZSet<Data> append(Data data, Weight weight) {
104104
this.data.merge(data, weight, this::merger);
105105
return this;
106106
}
107107

108-
public boolean equals(ZSet<Data, Weight> other) {
108+
public boolean equals(ZSet<Data> other) {
109109
return this.subtract(other).isEmpty();
110110
}
111111

112-
public ZSet<Data, Weight> append(Data data) {
112+
public ZSet<Data> append(Data data) {
113113
this.append(data, this.weightType.one());
114114
return this;
115115
}
116116

117-
public ZSet<Data, Weight> append(ZSet<Data, Weight> other) {
117+
public ZSet<Data> append(ZSet<Data> other) {
118118
for (Map.Entry<Data, Weight> entry: other.data.entrySet()) {
119119
this.data.merge(entry.getKey(), entry.getValue(), this::merger);
120120
}
121121
return this;
122122
}
123123

124-
public ZSet<Data, Weight> positive(boolean set) {
124+
public ZSet<Data> positive(boolean set) {
125125
Map<Data, Weight> result = new HashMap<>();
126126
for (Map.Entry<Data, Weight> entry: this.data.entrySet()) {
127127
Weight weight = entry.getValue();
@@ -134,11 +134,11 @@ public ZSet<Data, Weight> positive(boolean set) {
134134
return new ZSet<>(result, this.weightType);
135135
}
136136

137-
public ZSet<Data, Weight> distinct() {
137+
public ZSet<Data> distinct() {
138138
return this.positive(true);
139139
}
140140

141-
public <OData> ZSet<OData, Weight> map(Function<Data, OData> tupleTransform) {
141+
public <OData> ZSet<OData> map(Function<Data, OData> tupleTransform) {
142142
Map<OData, Weight> result = new HashMap<>();
143143
for (Map.Entry<Data, Weight> entry: this.data.entrySet()) {
144144
Weight weight = entry.getValue();
@@ -148,7 +148,7 @@ public <OData> ZSet<OData, Weight> map(Function<Data, OData> tupleTransform) {
148148
return new ZSet<>(result, this.weightType);
149149
}
150150

151-
public ZSet<Data, Weight> filter(Predicate<Data> keep) {
151+
public ZSet<Data> filter(Predicate<Data> keep) {
152152
Map<Data, Weight> result = new HashMap<>();
153153
for (Map.Entry<Data, Weight> entry: this.data.entrySet()) {
154154
Weight weight = entry.getValue();
@@ -158,8 +158,8 @@ public ZSet<Data, Weight> filter(Predicate<Data> keep) {
158158
return new ZSet<>(result, this.weightType);
159159
}
160160

161-
public <Key> IndexedZSet<Key, Data, Weight> index(Function<Data, Key> key) {
162-
IndexedZSet<Key, Data, Weight> result = new IndexedZSet<>(this.weightType);
161+
public <Key> IndexedZSet<Key, Data> index(Function<Data, Key> key) {
162+
IndexedZSet<Key, Data> result = new IndexedZSet<>(this.weightType);
163163
for (Map.Entry<Data, Weight> entry: this.data.entrySet()) {
164164
Weight weight = entry.getValue();
165165
Key keyValue = key.apply(entry.getKey());
@@ -186,20 +186,20 @@ public Collection<Data> toCollection() {
186186
return result;
187187
}
188188

189-
public ZSet<Data, Weight> union(ZSet<Data, Weight> other) {
189+
public ZSet<Data> union(ZSet<Data> other) {
190190
return this.add(other).distinct();
191191
}
192192

193-
public ZSet<Data, Weight> union_all(ZSet<Data, Weight> other) {
193+
public ZSet<Data> union_all(ZSet<Data> other) {
194194
return this.add(other);
195195
}
196196

197-
public ZSet<Data, Weight> except(ZSet<Data, Weight> other) {
197+
public ZSet<Data> except(ZSet<Data> other) {
198198
return this.distinct().subtract(other.distinct()).distinct();
199199
}
200200

201201
public <Result, IntermediateResult> Result aggregate(
202-
AggregateDescription<Result, IntermediateResult, Data, Weight> aggregate) {
202+
AggregateDescription<Result, IntermediateResult, Data> aggregate) {
203203
IntermediateResult result = aggregate.initialValue;
204204
for (Map.Entry<Data, Weight> entry : this.data.entrySet()) {
205205
Weight weight = entry.getValue();

sql-to-dbsp-compiler/simulator/src/main/java/org/dbsp/simulator/operators/BaseOperator.java

Lines changed: 8 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,23 @@
11

22
package org.dbsp.simulator.operators;
33

4-
import org.dbsp.simulator.collections.BaseCollection;
5-
import org.dbsp.simulator.types.WeightType;
4+
import org.dbsp.simulator.types.DataType;
65

7-
import javax.annotation.Nullable;
8-
import java.util.Objects;
6+
public abstract class BaseOperator {
7+
final Stream[] inputs;
8+
final Stream output;
99

10-
public abstract class BaseOperator<Weight> {
11-
final WeightType<Weight> weightType;
12-
final BaseOperator<Weight>[] inputs;
13-
@Nullable
14-
BaseCollection<Weight> nextOutput;
15-
16-
@SafeVarargs
17-
protected BaseOperator(WeightType<Weight> weightType, BaseOperator<Weight>... inputs) {
18-
this.weightType = weightType;
10+
protected BaseOperator(DataType outputType, Stream... inputs) {
1911
this.inputs = inputs;
20-
this.nextOutput = null;
12+
this.output = new Stream(outputType, this);
2113
}
2214

2315
/** Execute one computation step: gather data from the inputs,
2416
* and compute the current output. */
2517
public abstract void step();
2618

27-
public BaseCollection<Weight> getOutput() {
28-
return Objects.requireNonNull(this.nextOutput);
19+
public Stream getOutput() {
20+
return this.output;
2921
}
3022

3123
public int getInputCount() {

0 commit comments

Comments
 (0)