Skip to content

Commit cbd36c9

Browse files
committed
[adapters] Simplify initialization of mostly-default KafkaInputConfig.
This prepares for adding another configuration setting.

Signed-off-by: Ben Pfaff <blp@feldera.com>
1 parent 3532660 commit cbd36c9

File tree

2 files changed

+49
-55
lines changed
  • crates

2 files changed

+49
-55
lines changed

crates/adapters/src/transport/kafka/ft/test.rs

Lines changed: 27 additions & 55 deletions
Original file line number | Diff line number | Diff line change
@@ -33,8 +33,7 @@ use feldera_types::deserialize_table_record;
3333
use feldera_types::program_schema::{ColumnType, Field, Relation, SqlIdentifier};
3434
use feldera_types::secret_resolver::default_secrets_directory;
3535
use feldera_types::transport::kafka::{
36-
KafkaInputConfig, KafkaLogLevel, KafkaStartFromConfig, default_group_join_timeout_secs,
37-
default_redpanda_server,
36+
KafkaInputConfig, KafkaLogLevel, KafkaStartFromConfig, default_redpanda_server,
3837
};
3938
use parquet::data_type::AsBytes;
4039
use proptest::prelude::*;
@@ -1343,30 +1342,23 @@ fn test_offset(
13431342
producer.send_string("", topic);
13441343
}
13451344

1345+
let mut kafka_options = BTreeMap::new();
1346+
let auto_offset_reset = match start_from {
1347+
KafkaStartFromConfig::Earliest => Some("earliest"),
1348+
KafkaStartFromConfig::Latest => Some("latest"),
1349+
KafkaStartFromConfig::Offsets(_) | KafkaStartFromConfig::Timestamp(_) => None,
1350+
};
1351+
if let Some(auto_offset_reset) = auto_offset_reset {
1352+
kafka_options.insert("auto.offset.reset".into(), auto_offset_reset.into());
1353+
}
1354+
kafka_options.insert("bootstrap.servers".into(), default_redpanda_server());
1355+
kafka_options.insert("group.id".into(), "test-client".into());
1356+
13461357
let config = InputEndpointConfig {
13471358
stream: Cow::from("test_input"),
13481359
connector_config: ConnectorConfig {
13491360
transport: TransportConfig::KafkaInput(KafkaInputConfig {
1350-
kafka_options: {
1351-
let mut kafka_options = BTreeMap::new();
1352-
let auto_offset_reset = match start_from {
1353-
KafkaStartFromConfig::Earliest => Some("earliest"),
1354-
KafkaStartFromConfig::Latest => Some("latest"),
1355-
KafkaStartFromConfig::Offsets(_) | KafkaStartFromConfig::Timestamp(_) => {
1356-
None
1357-
}
1358-
};
1359-
if let Some(auto_offset_reset) = auto_offset_reset {
1360-
kafka_options.insert("auto.offset.reset".into(), auto_offset_reset.into());
1361-
}
1362-
kafka_options.insert("bootstrap.servers".into(), default_redpanda_server());
1363-
kafka_options.insert("group.id".into(), "test-client".into());
1364-
kafka_options
1365-
},
1366-
topic: topic.into(),
13671361
log_level: Some(KafkaLogLevel::Debug),
1368-
group_join_timeout_secs: default_group_join_timeout_secs(),
1369-
poller_threads: None,
13701362
start_from: match start_from {
13711363
KafkaStartFromConfig::Timestamp(_) => {
13721364
sleep(Duration::from_secs(2));
@@ -1381,14 +1373,7 @@ fn test_offset(
13811373
}
13821374
other => other,
13831375
},
1384-
region: None,
1385-
partitions: None,
1386-
resume_earliest_if_data_expires: false,
1387-
include_headers: None,
1388-
include_timestamp: None,
1389-
include_partition: None,
1390-
include_offset: None,
1391-
include_topic: None,
1376+
..KafkaInputConfig::default(kafka_options, topic)
13921377
}),
13931378
format: Some(FormatConfig {
13941379
name: Cow::from("csv"),
@@ -1796,39 +1781,26 @@ fn test_input_partition(
17961781
) {
17971782
let _kafka = KafkaResources::create_topics(&[(topic, n_partitions)]);
17981783

1784+
let mut kafka_options = BTreeMap::new();
1785+
let auto_offset_reset = match start_from {
1786+
KafkaStartFromConfig::Earliest => Some("earliest"),
1787+
KafkaStartFromConfig::Latest => Some("latest"),
1788+
KafkaStartFromConfig::Offsets(_) | KafkaStartFromConfig::Timestamp(_) => None,
1789+
};
1790+
if let Some(auto_offset_reset) = auto_offset_reset {
1791+
kafka_options.insert("auto.offset.reset".into(), auto_offset_reset.into());
1792+
}
1793+
kafka_options.insert("bootstrap.servers".into(), default_redpanda_server());
1794+
kafka_options.insert("group.id".into(), "test-client".into());
1795+
17991796
let config = InputEndpointConfig {
18001797
stream: Cow::from("test_input"),
18011798
connector_config: ConnectorConfig {
18021799
transport: TransportConfig::KafkaInput(KafkaInputConfig {
1803-
kafka_options: {
1804-
let mut kafka_options = BTreeMap::new();
1805-
let auto_offset_reset = match start_from {
1806-
KafkaStartFromConfig::Earliest => Some("earliest"),
1807-
KafkaStartFromConfig::Latest => Some("latest"),
1808-
KafkaStartFromConfig::Offsets(_) | KafkaStartFromConfig::Timestamp(_) => {
1809-
None
1810-
}
1811-
};
1812-
if let Some(auto_offset_reset) = auto_offset_reset {
1813-
kafka_options.insert("auto.offset.reset".into(), auto_offset_reset.into());
1814-
}
1815-
kafka_options.insert("bootstrap.servers".into(), default_redpanda_server());
1816-
kafka_options.insert("group.id".into(), "test-client".into());
1817-
kafka_options
1818-
},
1819-
topic: topic.into(),
18201800
log_level: Some(KafkaLogLevel::Debug),
1821-
group_join_timeout_secs: default_group_join_timeout_secs(),
1822-
poller_threads: None,
18231801
start_from: start_from.clone(),
1824-
region: None,
18251802
partitions: Some(partitions.clone()),
1826-
resume_earliest_if_data_expires: false,
1827-
include_headers: None,
1828-
include_timestamp: None,
1829-
include_partition: None,
1830-
include_offset: None,
1831-
include_topic: None,
1803+
..KafkaInputConfig::default(kafka_options, topic)
18321804
}),
18331805
format: Some(FormatConfig {
18341806
name: Cow::from("csv"),

crates/feldera-types/src/transport/kafka.rs

Lines changed: 22 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -114,6 +114,28 @@ pub struct KafkaInputConfig {
114114
}
115115

116116
impl KafkaInputConfig {
117+
/// Returns a default [KafkaInputConfig] with the given `kafka_options` and
118+
/// `topic`. To be a usable configuration, `kafka_options` must contain at
119+
/// least `bootstrap.servers`.
120+
pub fn default(kafka_options: BTreeMap<String, String>, topic: impl Into<String>) -> Self {
121+
Self {
122+
kafka_options,
123+
topic: topic.into(),
124+
log_level: None,
125+
group_join_timeout_secs: default_group_join_timeout_secs(),
126+
poller_threads: None,
127+
start_from: KafkaStartFromConfig::default(),
128+
region: None,
129+
partitions: None,
130+
resume_earliest_if_data_expires: false,
131+
include_headers: None,
132+
include_timestamp: None,
133+
include_partition: None,
134+
include_offset: None,
135+
include_topic: None,
136+
}
137+
}
138+
117139
// Returns the number of threads to use based on configuration, defaults,
118140
// and system resources.
119141
pub fn poller_threads(&self) -> usize {

0 commit comments

Comments (0)