-
Notifications
You must be signed in to change notification settings - Fork 111
Expand file tree
/
Copy pathdatafusion.rs
More file actions
174 lines (153 loc) · 5.92 KB
/
datafusion.rs
File metadata and controls
174 lines (153 loc) · 5.92 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
use crate::errors::journal::ControllerError;
use anyhow::{Error as AnyError, anyhow};
use arrow::array::Array;
use datafusion::common::arrow::array::{AsArray, RecordBatch};
use datafusion::logical_expr::sqlparser::parser::ParserError;
use datafusion::prelude::{SQLOptions, SessionContext};
use datafusion::sql::sqlparser::dialect::GenericDialect;
use datafusion::sql::sqlparser::parser::Parser;
use feldera_types::program_schema::{ColumnType, Field, Relation, SqlType};
/// Execute a SQL query and collect all results in a vector of `RecordBatch`'s.
pub async fn execute_query_collect(
datafusion: &SessionContext,
query: &str,
) -> Result<Vec<RecordBatch>, AnyError> {
let options = SQLOptions::new()
.with_allow_ddl(false)
.with_allow_dml(false);
let df = datafusion
.sql_with_options(query, options)
.await
.map_err(|e| anyhow!("error compiling query '{query}': {e}"))?;
df.collect()
.await
.map_err(|e| anyhow!("error executing query '{query}': {e}"))
}
/// Execute a SQL query that returns a result with exactly one row and column of type `string`.
pub async fn execute_singleton_query(
    datafusion: &SessionContext,
    query: &str,
) -> Result<String, AnyError> {
    let batches = execute_query_collect(datafusion, query).await?;

    // The result must be exactly one batch ...
    let [batch] = batches.as_slice() else {
        return Err(anyhow!(
            "internal error: query '{query}' returned {} batches; expected: 1",
            batches.len()
        ));
    };
    // ... with exactly one row ...
    if batch.num_rows() != 1 {
        return Err(anyhow!(
            "internal error: query '{query}' returned {} rows; expected: 1",
            batch.num_rows()
        ));
    }
    // ... and exactly one column.
    if batch.num_columns() != 1 {
        return Err(anyhow!(
            "internal error: query '{query}' returned {} columns; expected: 1",
            batch.num_columns()
        ));
    }

    // Extract the single cell as a string.
    match array_to_string(batch.column(0)) {
        Some(value) => Ok(value),
        None => Err(anyhow!(
            "internal error: cannot retrieve the output of query '{query}' as a string"
        )),
    }
}
/// Extract the first element of a string-typed Arrow array as an owned `String`.
///
/// Supports `StringViewArray` and UTF-8 `StringArray` (i32 offsets). Returns
/// `None` if the array is empty or has any other type.
pub fn array_to_string(array: &dyn Array) -> Option<String> {
    // Guard against out-of-bounds access: `value(0)` panics on an empty array.
    if array.is_empty() {
        return None;
    }
    if let Some(string_view_array) = array.as_string_view_opt() {
        Some(string_view_array.value(0).to_string())
    } else {
        array
            .as_string_opt::<i32>()
            .map(|array| array.value(0).to_string())
    }
}
/// Parse expression only to validate it.
pub fn validate_sql_expression(expr: &str) -> Result<(), ParserError> {
    // Attempt to parse the expression and discard the resulting AST; we only
    // care whether parsing succeeds.
    Parser::new(&GenericDialect)
        .try_with_sql(expr)?
        .parse_expr()
        .map(|_| ())
}
/// Convert a value of the timestamp column returned by a SQL query into a valid
/// SQL expression.
///
/// `TIMESTAMP` and `DATE` values are wrapped in the corresponding typed
/// literal syntax (`timestamp '...'` / `date '...'`); values of any other type
/// are returned unchanged.
pub fn timestamp_to_sql_expression(column_type: &ColumnType, expr: &str) -> String {
    // Escape embedded single quotes via SQL-standard doubling so a value
    // containing `'` cannot break out of the quoted literal and produce
    // malformed (or injectable) SQL. Quote-free values are unaffected.
    match column_type.typ {
        SqlType::Timestamp => format!("timestamp '{}'", expr.replace('\'', "''")),
        SqlType::Date => format!("date '{}'", expr.replace('\'', "''")),
        _ => expr.to_string(),
    }
}
/// Check that the `timestamp` field has one of supported types.
pub fn validate_timestamp_type(
    endpoint_name: &str,
    timestamp: &Field,
    docs: &str,
) -> Result<(), ControllerError> {
    let column_type = &timestamp.columntype;
    // Supported timestamp types: any integral type, DATE, or TIMESTAMP.
    let supported = column_type.is_integral_type()
        || matches!(column_type.typ, SqlType::Date | SqlType::Timestamp);
    if supported {
        Ok(())
    } else {
        Err(ControllerError::invalid_transport_configuration(
            endpoint_name,
            &format!(
                "timestamp column '{}' has unsupported type {}; supported types for 'timestamp_column' are integer types, DATE, and TIMESTAMP; {docs}",
                timestamp.name,
                serde_json::to_string(column_type).unwrap()
            ),
        ))
    }
}
/// Validate 'timestamp_column'.
pub async fn validate_timestamp_column(
    endpoint_name: &str,
    timestamp_column: &str,
    datafusion: &SessionContext,
    schema: &Relation,
    docs: &str,
) -> Result<(), ControllerError> {
    // All failures below map to the same error constructor for this endpoint.
    let invalid =
        |message: &str| ControllerError::invalid_transport_configuration(endpoint_name, message);

    // The column must exist in the table schema.
    let field = schema.field(timestamp_column).ok_or_else(|| {
        invalid(&format!(
            "timestamp column '{timestamp_column}' not found in table schema"
        ))
    })?;

    // The column must have one of the supported types.
    validate_timestamp_type(endpoint_name, field, docs)?;

    // The column must carry a LATENESS attribute.
    let lateness = field.lateness.as_ref().ok_or_else(|| {
        invalid(&format!(
            "timestamp column '{timestamp_column}' does not have a LATENESS attribute; {docs}"
        ))
    })?;

    // The LATENESS attribute must be a well-formed SQL expression.
    validate_sql_expression(lateness).map_err(|e| {
        invalid(&format!(
            "error parsing LATENESS attribute '{lateness}' of the timestamp column '{timestamp_column}': {e}; {docs}"
        ))
    })?;

    // Lateness has to be >0. Zero would mean that we need to ingest data strictly in order.
    // If we need to support this case in the future, we could revert to our old (and very
    // costly) strategy of issuing a single `select *` query with the
    // 'ORDER BY timestamp_column' clause, which requires storing and sorting the entire
    // collection locally. The zero test uses `x + x = x`, which holds iff `x` is zero,
    // without needing to know the expression's exact type.
    let is_zero = execute_singleton_query(
        datafusion,
        &format!("select cast((({lateness} + {lateness}) = {lateness}) as string)"),
    )
    .await
    .map_err(|e| invalid(&e.to_string()))?;

    if is_zero == "true" {
        return Err(invalid(&format!(
            "invalid LATENESS attribute '{lateness}' of the timestamp column '{timestamp_column}': LATENESS must be greater than zero; {docs}"
        )));
    }
    Ok(())
}