Skip to content

Commit ce3b5cb

Browse files
committed
[SQL] Support for JOIN implementation hints
Signed-off-by: Mihai Budiu <mbudiu@feldera.com>
1 parent afdc48a commit ce3b5cb

25 files changed

Lines changed: 917 additions & 71 deletions

crates/adapters/src/integrated.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ where
3636
}
3737

3838
/// Create an instance of an integrated output endpoint given its config
39-
/// and output relation schema.p
39+
/// and output relation schema.
4040
#[allow(unused)]
4141
pub fn create_integrated_output_endpoint(
4242
endpoint_id: EndpointId,

docs.feldera.com/docs/sql/grammar.md

Lines changed: 57 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -347,7 +347,7 @@ values
347347
: { VALUES | VALUE } expression [, expression ]*
348348
349349
select
350-
: SELECT [ ALL | DISTINCT ]
350+
: SELECT [ hintComment ] [ ALL | DISTINCT ]
351351
{ projectItem [, projectItem ]* }
352352
FROM tableExpression
353353
[ WHERE booleanExpression ]
@@ -360,7 +360,7 @@ select
360360
```
361361
tablePrimary
362362
: tableName '(' TABLE tableName ')'
363-
| tablePrimary '(' columnDecl [, columnDecl ]* ')'
363+
| tablePrimary [ hintComment ] '(' columnDecl [, columnDecl ]* ')'
364364
| [ LATERAL ] '(' query ')'
365365
| UNNEST '(' expression ')' [ WITH ORDINALITY ]
366366
| TABLE '(' functionName '(' expression [, expression ]* ')' ')'
@@ -442,6 +442,61 @@ the column names are *not* used to reorder columns.
442442
In `orderItem`, if expression is a positive integer n, it denotes the
443443
nth item in the `SELECT` clause.
444444

445+
## SQL hints
446+
447+
A hint is an instruction to the optimizer. When writing SQL, you may
448+
know information about the data unknown to the optimizer. Hints
449+
enable you to make decisions normally made by the optimizer.
450+
451+
We support hints in two locations:
452+
453+
- Query Hint: right after the `SELECT` keyword;
454+
- Table Hint: right after the referenced table or view name.
455+
456+
```
457+
SELECT /*+ broadcast(S), shard(T) */
458+
FROM
459+
T /*+ size(5) */
460+
JOIN
461+
S
462+
```
463+
464+
The syntax of hints is:
465+
466+
```
467+
hintComment
468+
: '/*+' hint [, hint ]* '*/'
469+
470+
hint:
471+
hintName
472+
| hintName '(' optionKey '=' optionVal [, optionKey '=' optionVal ]* ')'
473+
| hintName '(' hintOption [, hintOption ]* ')'
474+
475+
optionKey
476+
: simpleIdentifier
477+
| stringLiteral
478+
479+
optionVal
480+
: simpleIdentifier
481+
| stringLiteral
482+
483+
hintOption
484+
: simpleIdentifier
485+
| numericLiteral
486+
| stringLiteral
487+
```
488+
489+
### Supported hints and their impact on query implementation
490+
491+
- `broadcast(`*table*`)`: Indicates that the following `JOIN` should be implemented using
492+
a broadcast-join strategy by broadcasting the input with alias *table*
493+
- `shard(`*table*`)`: Indicates that the following `JOIN` should be implemented using
494+
a hash-join strategy by sharding the input with alias *table*
495+
- `balanced(`*table*`)`: Indicates that the following `JOIN` should be implemented using
496+
a balanced strategy by hashing on all fields the input with alias *table*
497+
498+
Note: specifying hints may inhibit some compiler optimizations.
499+
445500
## Creating indexes
446501

447502
Feldera supports the `CREATE INDEX` SQL statement with the following
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
package org.dbsp.sqlCompiler.circuit.annotation;
2+
3+
import com.fasterxml.jackson.databind.JsonNode;
4+
import org.dbsp.sqlCompiler.compiler.errors.SourcePositionRange;
5+
import org.dbsp.util.JsonStream;
6+
import org.dbsp.util.Utilities;
7+
8+
public class JoinStrategy extends Annotation {
9+
public final Strategy strategy;
10+
/** Which input to apply the strategy to */
11+
public final int input;
12+
/** Original input name; used for error reporting only */
13+
public final String inputName;
14+
15+
public enum Strategy {
16+
/** Broadcast */
17+
Broadcast,
18+
/** Shard */
19+
Shard,
20+
/** Dynamic join balancing by hashing entire record */
21+
Balance,
22+
}
23+
24+
public JoinStrategy(Strategy strategy, int input, String inputName) {
25+
this.strategy = strategy;
26+
this.input = input;
27+
this.inputName = inputName;
28+
}
29+
30+
@Override
31+
public void asJson(JsonStream stream) {
32+
stream.beginObject()
33+
.appendClass(this)
34+
.label("strategy")
35+
.append(this.strategy.name())
36+
.label("input")
37+
.append(this.input)
38+
.label("inputName")
39+
.append(this.inputName)
40+
.endObject();
41+
}
42+
43+
public SourcePositionRange getPosition() {
44+
// TODO: https://issues.apache.org/jira/browse/CALCITE-7503
45+
return SourcePositionRange.INVALID;
46+
}
47+
48+
public boolean compatible(JoinStrategy other) {
49+
return this.strategy == other.strategy && this.input == other.input;
50+
}
51+
52+
@Override
53+
public String toString() {
54+
return this.strategy.name() + "(" + this.inputName + ")";
55+
}
56+
57+
public String toRust() {
58+
// Does not contain input; the strategy is actually applied to the corresponding input stream
59+
String policy = this.strategy.name();
60+
return "BalancerHint::Policy(Some(PartitioningPolicy::" + policy + "))";
61+
}
62+
63+
public static JoinStrategy fromJson(JsonNode node) {
64+
Strategy strategy = Strategy.valueOf(Utilities.getStringProperty(node, "strategy"));
65+
int input = Utilities.getIntProperty(node, "input");
66+
String inputName = Utilities.getStringProperty(node, "inputName");
67+
return new JoinStrategy(strategy, input, inputName);
68+
}
69+
}

sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/DBSPCompiler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@
8484
import org.dbsp.util.IndentStream;
8585
import org.dbsp.util.Linq;
8686
import org.dbsp.util.Logger;
87-
import org.dbsp.util.RelJsonWriter;
87+
import org.dbsp.sqlCompiler.compiler.frontend.calciteCompiler.RelJsonWriter;
8888
import org.dbsp.util.Utilities;
8989

9090
import javax.annotation.Nullable;

sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/backend/rust/BaseRustCodeGenerator.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,10 +108,19 @@ public void addDependency(String crate) {
108108
UnimplementedSemigroup, DefaultSemigroup, HasOne, HasZero, AddByRef, NegByRef,
109109
AddAssignByRef,
110110
},
111-
circuit::{checkpointer::Checkpoint, Circuit, CircuitConfig, NestedCircuit, RootCircuit, Stream, StepSize},
111+
circuit::{
112+
checkpointer::Checkpoint,
113+
circuit_builder::CircuitBase,
114+
Circuit, CircuitConfig,
115+
NestedCircuit,
116+
RootCircuit,
117+
Stream,
118+
StepSize
119+
},
112120
operator::{
113121
apply_n,
114122
dynamic::aggregate::{ArgMinSome, Max, Min, MinSome1, Postprocess},
123+
dynamic::balance::*,
115124
ConstantGenerator,
116125
Generator,
117126
Fold,

sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/backend/rust/ToRustInnerVisitor.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2143,7 +2143,7 @@ public VisitDecision preorder(DBSPClosureExpression expression) {
21432143
if (!this.compact) {
21442144
DBSPType resultType = expression.getResultType();
21452145
if (!resultType.is(DBSPTypeVoid.class)) {
2146-
this.builder.append("-> ").newline();
2146+
this.builder.append("->").newline();
21472147
resultType.accept(this);
21482148
this.builder.append(" ");
21492149
}
@@ -2445,8 +2445,7 @@ public VisitDecision preorder(DBSPRawTupleExpression expression) {
24452445
this.builder.increase();
24462446
for (DBSPExpression field : expression.fields) {
24472447
field.accept(this);
2448-
this.builder.append(", ");
2449-
this.builder.newline();
2448+
this.builder.append(",").newline();
24502449
}
24512450
this.builder.decrease();
24522451
this.builder.append(")");
@@ -2470,9 +2469,11 @@ public VisitDecision preorder(DBSPTupleExpression expression) {
24702469
boolean first = true;
24712470
for (DBSPExpression field : expression.fields) {
24722471
if (!first) {
2473-
this.builder.append(", ");
2472+
this.builder.append(",");
24742473
if (newlines)
24752474
this.builder.newline();
2475+
else
2476+
this.builder.append(" ");
24762477
}
24772478
first = false;
24782479
field.accept(this);

sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/backend/rust/ToRustVisitor.java

Lines changed: 46 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import com.fasterxml.jackson.databind.JsonNode;
2727
import com.fasterxml.jackson.databind.node.ObjectNode;
2828
import org.dbsp.sqlCompiler.circuit.DBSPCircuit;
29+
import org.dbsp.sqlCompiler.circuit.annotation.JoinStrategy;
2930
import org.dbsp.sqlCompiler.circuit.operator.*;
3031
import org.dbsp.sqlCompiler.circuit.OutputPort;
3132
import org.dbsp.sqlCompiler.circuit.annotation.OperatorHash;
@@ -90,6 +91,7 @@
9091
import java.util.Iterator;
9192
import java.util.List;
9293
import java.util.Locale;
94+
import java.util.Objects;
9395
import java.util.Set;
9496

9597
/** This visitor generates a Rust implementation of a circuit. */
@@ -1022,7 +1024,7 @@ public VisitDecision preorder(DBSPSinkOperator operator) {
10221024
this.builder.append(",").newline();
10231025
raw.tupFields[1].accept(this.innerVisitor);
10241026
this.builder.decrease().append(">");
1025-
this.builder.append("(hash, ").newline()
1027+
this.builder.append("(hash,").newline()
10261028
.append(this.getInputName(operator, 0))
10271029
.append(".clone()")
10281030
.append(", &SqlIdentifier::from(\"");
@@ -1145,7 +1147,7 @@ public VisitDecision preorder(DBSPChainAggregateOperator operator) {
11451147
this.operationCall(operator);
11461148
this.builder.increase();
11471149
operator.init.accept(this.innerVisitor);
1148-
this.builder.append(", ").newline();
1150+
this.builder.append(",").newline();
11491151
operator.getFunction().accept(this.innerVisitor);
11501152
this.innerVisitor.setOperatorContext(null);
11511153
this.builder.newline().decrease().append(")")
@@ -1331,9 +1333,9 @@ public VisitDecision preorder(DBSPConcreteAsofJoinOperator operator) {
13311333
.append(", ")
13321334
.newline();
13331335
operator.getFunction().accept(this.innerVisitor);
1334-
this.builder.append(", ").newline();
1336+
this.builder.append(",").newline();
13351337
operator.leftTimestamp.accept(this.innerVisitor);
1336-
this.builder.append(", ").newline();
1338+
this.builder.append(",").newline();
13371339
operator.rightTimestamp.accept(this.innerVisitor);
13381340
this.builder.newline()
13391341
.decrease()
@@ -1376,6 +1378,8 @@ VisitDecision processJoinIndexOperator(DBSPJoinBaseOperator operator) {
13761378
.append(this.markDistinct(operator))
13771379
.append(";");
13781380
this.tagStream(operator);
1381+
this.builder.newline();
1382+
this.emitBalancerHints(operator);
13791383
this.innerVisitor.setOperatorContext(null);
13801384
return VisitDecision.STOP;
13811385
}
@@ -1413,9 +1417,9 @@ public VisitDecision preorder(DBSPLagOperator operator) {
14131417
.increase();
14141418
DBSPISizeLiteral offset = new DBSPISizeLiteral(operator.offset);
14151419
offset.accept(this.innerVisitor);
1416-
this.builder.append(", ").newline();
1420+
this.builder.append(",").newline();
14171421
operator.projection.accept(this.innerVisitor);
1418-
this.builder.append(", ").newline();
1422+
this.builder.append(",").newline();
14191423
operator.getFunction().accept(this.innerVisitor);
14201424
this.builder.newline()
14211425
.decrease()
@@ -1464,9 +1468,9 @@ public VisitDecision preorder(DBSPPartitionedRollingAggregateOperator operator)
14641468
this.operationCall(operator);
14651469
this.builder.increase();
14661470
operator.partitioningFunction.accept(this.innerVisitor);
1467-
this.builder.append(", ").newline();
1471+
this.builder.append(",").newline();
14681472
operator.getFunction().accept(this.innerVisitor);
1469-
this.builder.append(", ").newline();
1473+
this.builder.append(",").newline();
14701474
this.emitWindowBounds(operator.lower, operator.upper);
14711475
this.builder
14721476
.decrease()
@@ -1497,9 +1501,9 @@ public VisitDecision preorder(DBSPPartitionedRollingAggregateWithWaterlineOperat
14971501
.append(this.getInputName(operator, 1))
14981502
.append(", ");
14991503
operator.partitioningFunction.accept(this.innerVisitor);
1500-
this.builder.append(", ").newline();
1504+
this.builder.append(",").newline();
15011505
operator.getFunction().accept(this.innerVisitor);
1502-
this.builder.append(", ").newline();
1506+
this.builder.append(",").newline();
15031507
this.emitWindowBounds(operator.lower, operator.upper);
15041508
this.builder
15051509
.decrease()
@@ -1553,7 +1557,7 @@ public VisitDecision preorder(DBSPAggregateLinearPostprocessOperator operator) {
15531557
this.operationCall(operator);
15541558
this.builder.increase();
15551559
operator.getFunction().accept(this.innerVisitor);
1556-
this.builder.append(", ").newline();
1560+
this.builder.append(",").newline();
15571561
operator.postProcess.accept(this.innerVisitor);
15581562
this.builder.newline()
15591563
.decrease()
@@ -1583,11 +1587,11 @@ public VisitDecision preorder(DBSPAggregateLinearPostprocessRetainKeysOperator o
15831587
.append(this.getInputName(operator, 1))
15841588
// FIXME: temporary workaround until the compiler learns about TypedBox
15851589
.append(".apply(|bound| TypedBox::<_, DynData>::new(bound.clone()))")
1586-
.append(", ").newline();
1590+
.append(",").newline();
15871591
operator.retainKeysFunction.accept(this.innerVisitor);
1588-
this.builder.append(", ").newline();
1592+
this.builder.append(",").newline();
15891593
operator.getFunction().accept(this.innerVisitor);
1590-
this.builder.append(", ").newline();
1594+
this.builder.append(",").newline();
15911595
operator.postProcess.accept(this.innerVisitor);
15921596
this.builder.newline()
15931597
.decrease()
@@ -1689,6 +1693,27 @@ IIndentStream writeComments(DBSPOperator operator) {
16891693
(more ? (operator.comment != null ? "\n" + operator.comment : "") : ""));
16901694
}
16911695

1696+
/** Generate hints for the dynamic join balancer based on user-supplied annotations */
1697+
void emitBalancerHints(DBSPBinaryOperator operator) {
1698+
var strategies = operator.annotations.get(JoinStrategy.class);
1699+
for (var strategy: strategies) {
1700+
switch (strategy.strategy) {
1701+
case Shard:
1702+
case Balance:
1703+
case Broadcast: {
1704+
this.builder.append("circuit.set_balancer_hint(").increase();
1705+
var input = strategy.input == 0 ? operator.left() : operator.right();
1706+
var inputHash = OperatorHash.getHash(input.node(), true);
1707+
this.builder.append(Utilities.doubleQuote(Objects.requireNonNull(inputHash).toString(), true))
1708+
.append(",").newline();
1709+
this.builder.append(strategy.toRust()).newline();
1710+
this.builder.decrease().append(");").newline();
1711+
break;
1712+
}
1713+
}
1714+
}
1715+
}
1716+
16921717
VisitDecision processJoinBase(DBSPJoinBaseOperator operator) {
16931718
this.computeHash(operator);
16941719
this.innerVisitor.setOperatorContext(operator);
@@ -1711,10 +1736,17 @@ VisitDecision processJoinBase(DBSPJoinBaseOperator operator) {
17111736
.append(this.markDistinct(operator))
17121737
.append(";");
17131738
this.tagStream(operator);
1739+
this.builder.newline();
1740+
this.emitBalancerHints(operator);
17141741
this.innerVisitor.setOperatorContext(null);
17151742
return VisitDecision.STOP;
17161743
}
17171744

1745+
@Override
1746+
public VisitDecision preorder(DBSPStreamJoinOperator operator) {
1747+
return this.processJoinBase(operator);
1748+
}
1749+
17181750
@Override
17191751
public VisitDecision preorder(DBSPJoinOperator operator) {
17201752
return this.processJoinBase(operator);

0 commit comments

Comments
 (0)