Skip to content

Commit 28cdb13

Browse files
ryzhykgz
authored andcommitted
[docs] Document the Kafka metadata feature.
Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
1 parent d92bb68 commit 28cdb13

4 files changed

Lines changed: 80 additions & 49 deletions

File tree

crates/feldera-types/src/transport/kafka.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -80,35 +80,35 @@ pub struct KafkaInputConfig {
8080
/// Whether to include Kafka headers in the record metadata.
8181
///
8282
/// When `true`, Kafka message headers are available via the `CONNECTOR_METADATA()` function.
83-
/// See [https://docs.feldera.com/connectors/sources/kafka#metadata] for details.
83+
/// See <https://docs.feldera.com/connectors/sources/kafka#metadata> for details.
8484
#[serde(default)]
8585
pub include_headers: Option<bool>,
8686

8787
/// Whether to include Kafka message timestamp in the record metadata.
8888
///
8989
/// When `true`, Kafka message timestamp is available via the `CONNECTOR_METADATA()` function.
90-
/// See [https://docs.feldera.com/connectors/sources/kafka#metadata] for details.
90+
/// See <https://docs.feldera.com/connectors/sources/kafka#metadata> for details.
9191
#[serde(default)]
9292
pub include_timestamp: Option<bool>,
9393

9494
/// Whether to include Kafka partition in the record metadata.
9595
///
9696
/// When `true`, Kafka partition from which the message was read is available via the `CONNECTOR_METADATA()` function.
97-
/// See [https://docs.feldera.com/connectors/sources/kafka#metadata] for details.
97+
/// See <https://docs.feldera.com/connectors/sources/kafka#metadata> for details.
9898
#[serde(default)]
9999
pub include_partition: Option<bool>,
100100

101101
/// Whether to include Kafka message offset in the record metadata.
102102
///
103103
/// When `true`, Kafka message offset is available via the `CONNECTOR_METADATA()` function.
104-
/// See [https://docs.feldera.com/connectors/sources/kafka#metadata] for details.
104+
/// See <https://docs.feldera.com/connectors/sources/kafka#metadata> for details.
105105
#[serde(default)]
106106
pub include_offset: Option<bool>,
107107

108108
/// Whether to include Kafka topic in the record metadata.
109109
///
110110
/// When `true`, Kafka topic from which the message was read is available via the `CONNECTOR_METADATA()` function.
111-
/// See [https://docs.feldera.com/connectors/sources/kafka#metadata] for details.
111+
/// See <https://docs.feldera.com/connectors/sources/kafka#metadata> for details.
112112
#[serde(default)]
113113
pub include_topic: Option<bool>,
114114
}

docs.feldera.com/docs/connectors/sources/kafka.md

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,11 @@ tolerance](/pipelines/fault-tolerance).
1818
| `group_join_timeout_secs` | seconds | 10 | Maximum timeout (in seconds) for the endpoint to join the Kafka consumer group during initialization. |
1919
| `poller_threads` | positive integer | 3 | Number of threads used to poll Kafka messages. Setting it to multiple threads can improve performance with small messages. Default is 3. |
2020
| `resume_earliest_if_data_expires` | boolean | false | See [Tolerating missing data on resume](#tolerating-missing-data-on-resume). |
21+
| `include_headers` | boolean | false | Whether to include Kafka headers in connector metadata (see [Accessing Kafka metadata](#metadata)). |
22+
| `include_topic` | boolean | false | Whether to include Kafka topic name in connector metadata (see [Accessing Kafka metadata](#metadata)). |
23+
| `include_partition` | boolean | false | Whether to include Kafka partition in connector metadata (see [Accessing Kafka metadata](#metadata)). |
24+
| `include_offset` | boolean | false | Whether to include Kafka offset name in connector metadata (see [Accessing Kafka metadata](#metadata)). |
25+
| `include_timestamp` | boolean | false | Whether to include Kafka timestamp in connector metadata (see [Accessing Kafka metadata](#metadata)). |
2126

2227
The connector passes additional options directly to [**librdkafka**](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md). Some of the relevant options:
2328

@@ -263,6 +268,76 @@ CREATE TABLE INPUT (
263268
);
264269
```
265270

271+
## <a name="metadata"></a>Accessing Kafka metadata
272+
273+
Kafka messages include several metadata attributes in addition to the payload. These can be extracted by the Kafka connector and accessed from SQL:
274+
275+
| Metadata attribute | SQL type | `CONNECTOR_METADATA()` field |Configuration option |
276+
|--------------------|--------------------------|------------------------------|---------------------|
277+
| Message headers | `MAP<STRING, VARBINARY>` | `kafka_headers` |`include_headers` |
278+
| Topic name | `VARCHAR` | `kafka_topic` |`include_topic` |
279+
| Partition | `INT` | `kafka_partition` |`include_partition` |
280+
| Message offset | `BIGINT` | `kafka_offset` |`include_offset` |
281+
| Timestamp | `TIMESTAMP` | `kafka_timestamp` |`include_timestamp` |
282+
283+
Some applications need to ingest and store these attributes alongside the message payload.
284+
The steps below describe how to extract and use Kafka metadata in SQL tables.
285+
286+
1. **Enable metadata extraction in the Kafka connector.**
287+
Use the configuration options listed in the table above to enable only the metadata fields your application needs.
288+
Extracting unnecessary attributes adds overhead to ingestion and processing.
289+
290+
2. **Use metadata values to populate table columns.**
291+
Enabled metadata attributes are exposed via the `CONNECTOR_METADATA()` function, which returns a
292+
`VARIANT` containing a map with all selected attributes. You can reference these values in `DEFAULT`
293+
expressions to initialize table columns:
294+
295+
```sql
296+
create table my_table(
297+
x int,
298+
kafka_headers MAP<STRING, VARBINARY> DEFAULT CAST(CONNECTOR_METADATA()['kafka_headers'] as MAP<STRING, VARBINARY>),
299+
kafka_timestamp TIMESTAMP DEFAULT CAST(CONNECTOR_METADATA()['kafka_timestamp'] as TIMESTAMP),
300+
kafka_topic VARCHAR DEFAULT CAST(CONNECTOR_METADATA()['kafka_topic'] AS VARCHAR),
301+
kafka_offset BIGINT DEFAULT CAST(CONNECTOR_METADATA()['kafka_offset'] AS BIGINT),
302+
kafka_partition INT DEFAULT CAST(CONNECTOR_METADATA()['kafka_partition'] AS INT)
303+
) with (
304+
'materialized' = 'true',
305+
'connectors' = '[{
306+
"transport": {
307+
"name": "kafka_input",
308+
"config": {
309+
"topic": "meta_topic",
310+
"start_from": "earliest",
311+
"bootstrap.servers": "localhost:19092",
312+
"include_headers": true,
313+
"include_topic": true,
314+
"include_offset": true,
315+
"include_partition": true,
316+
"include_timestamp": true
317+
}
318+
},
319+
"format": {
320+
"name": "json",
321+
"config": {
322+
"update_format": "raw",
323+
"array": false
324+
}
325+
}
326+
}]');
327+
```
328+
329+
### Converting Kafka header values to strings
330+
331+
Kafka headers can contain arbitrary byte arrays, but in practice they typically hold UTF-8–encoded strings.
332+
Use the `BIN2UTF8` function to convert binary values to text:
333+
334+
```sql
335+
create materialized view v as
336+
select
337+
BIN2UTF8(kafka_headers['my_header']) as my_header
338+
from t;
339+
```
340+
266341
## Tolerating missing data on resume
267342

268343
The `resume_earliest_if_data_expires` setting controls how the Kafka

sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/backend/rust/ToRustInnerVisitor.java

Lines changed: 0 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -37,15 +37,7 @@
3737
import org.dbsp.sqlCompiler.compiler.frontend.calciteCompiler.ProgramIdentifier;
3838
import org.dbsp.sqlCompiler.compiler.visitors.VisitDecision;
3939
import org.dbsp.sqlCompiler.compiler.visitors.inner.EliminateStructs;
40-
<<<<<<< Updated upstream
4140
import org.dbsp.sqlCompiler.compiler.visitors.inner.ExpressionTranslator;
42-
import org.dbsp.sqlCompiler.compiler.visitors.inner.InnerRewriteVisitor;
43-
=======
44-
<<<<<<< Updated upstream
45-
=======
46-
import org.dbsp.sqlCompiler.compiler.visitors.inner.ExpressionTranslator;
47-
>>>>>>> Stashed changes
48-
>>>>>>> Stashed changes
4941
import org.dbsp.sqlCompiler.compiler.visitors.inner.InnerVisitor;
5042
import org.dbsp.sqlCompiler.compiler.visitors.inner.CreateRuntimeErrorWrappers;
5143
import org.dbsp.sqlCompiler.ir.DBSPFunction;
@@ -1045,11 +1037,6 @@ protected void generateFromTrait(DBSPTypeStruct type) {
10451037
.newline();
10461038
}
10471039

1048-
<<<<<<< Updated upstream
1049-
=======
1050-
<<<<<<< Updated upstream
1051-
=======
1052-
>>>>>>> Stashed changes
10531040
/** Replace every instance of a call to connector_metadata() with
10541041
* a reference to a variable connector_metadata. */
10551042
static class RewriteConnectorMetadata extends ExpressionTranslator {
@@ -1066,16 +1053,13 @@ public static String variableName() {
10661053
}
10671054

10681055
@Override
1069-
<<<<<<< Updated upstream
1070-
=======
10711056
public void postorder(DBSPHandleErrorExpression expression) {
10721057
DBSPExpression source = this.getE(expression.source);
10731058
DBSPExpression result = new DBSPHandleErrorExpression(expression.getNode(), expression.index, source, false);
10741059
this.map(expression, result);
10751060
}
10761061

10771062
@Override
1078-
>>>>>>> Stashed changes
10791063
public void postorder(DBSPApplyExpression expression) {
10801064
String function = expression.getFunctionName();
10811065
if (function != null && function.equalsIgnoreCase(CustomFunctions.ConnectorMetadataFunction.NAME)) {
@@ -1086,10 +1070,6 @@ public void postorder(DBSPApplyExpression expression) {
10861070
}
10871071
}
10881072

1089-
<<<<<<< Updated upstream
1090-
=======
1091-
>>>>>>> Stashed changes
1092-
>>>>>>> Stashed changes
10931073
/**
10941074
* Generate calls to the Rust macros that generate serialization and deserialization code
10951075
* for the struct.
@@ -1132,31 +1112,17 @@ protected void generateRenameMacro(DBSPTypeStruct type,
11321112
field.type.accept(this);
11331113
this.builder.append(", ");
11341114
if (meta == null || meta.defaultValue == null) {
1135-
<<<<<<< Updated upstream
1136-
this.builder.append(field.type.mayBeNull ? "|_| Some(None)" : "|_| None");
1137-
=======
1138-
<<<<<<< Updated upstream
1139-
this.builder.append(field.type.mayBeNull ? "Some(None)" : "None");
1140-
} else {
1141-
=======
11421115
this.builder.append("|_| ");
11431116
if (isOption)
11441117
this.builder.append("Some(");
11451118
this.builder.append(field.type.mayBeNull ? "Some(None)" : "None");
1146-
>>>>>>> Stashed changes
11471119
} else {
11481120
RewriteConnectorMetadata rw = new RewriteConnectorMetadata(this.compiler);
11491121
this.builder.append("|")
11501122
.append(RewriteConnectorMetadata.variableName())
1151-
<<<<<<< Updated upstream
11521123
.append(": &Option<Variant>| Some(");
1153-
=======
1154-
.append(": &Option<Variant>| ");
11551124
if (isOption)
11561125
this.builder.append("Some(");
1157-
>>>>>>> Stashed changes
1158-
this.builder.append("Some(");
1159-
>>>>>>> Stashed changes
11601126
IDBSPInnerNode defaultValue = this.createErrorWrappers.apply(meta.defaultValue);
11611127
defaultValue = rw.apply(defaultValue);
11621128
defaultValue.accept(this);

sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/visitors/inner/CreateRuntimeErrorWrappers.java

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -68,19 +68,9 @@ public void postorder(DBSPCastExpression expression) {
6868
DBSPExpression cast = new DBSPCastExpression(expression.getNode(), source, expression.getType(), expression.safe);
6969
// Wrap the cast into an error handler
7070
DBSPHandleErrorExpression handler = new DBSPHandleErrorExpression(
71-
<<<<<<< Updated upstream
72-
expression.getNode(), this.getIndex(expression.getSourcePosition().start), cast,
73-
// source code may not be available outside an operator
74-
this.operatorContext != null);
75-
=======
76-
<<<<<<< Updated upstream
77-
expression.getNode(), this.getIndex(expression.getSourcePosition().start), cast);
78-
=======
7971
expression.getNode(), this.getIndex(expression.getSourcePosition().start), cast,
8072
// source code may not be available outside an operator
8173
true);
82-
>>>>>>> Stashed changes
83-
>>>>>>> Stashed changes
8474
this.map(expression, handler);
8575
}
8676

0 commit comments

Comments
 (0)