@@ -33,8 +33,7 @@ use feldera_types::deserialize_table_record;
3333use feldera_types:: program_schema:: { ColumnType , Field , Relation , SqlIdentifier } ;
3434use feldera_types:: secret_resolver:: default_secrets_directory;
3535use feldera_types:: transport:: kafka:: {
36- KafkaInputConfig , KafkaLogLevel , KafkaStartFromConfig , default_group_join_timeout_secs,
37- default_redpanda_server,
36+ KafkaInputConfig , KafkaLogLevel , KafkaStartFromConfig , default_redpanda_server,
3837} ;
3938use parquet:: data_type:: AsBytes ;
4039use proptest:: prelude:: * ;
@@ -1343,30 +1342,23 @@ fn test_offset(
13431342 producer. send_string ( "" , topic) ;
13441343 }
13451344
1345+ let mut kafka_options = BTreeMap :: new ( ) ;
1346+ let auto_offset_reset = match start_from {
1347+ KafkaStartFromConfig :: Earliest => Some ( "earliest" ) ,
1348+ KafkaStartFromConfig :: Latest => Some ( "latest" ) ,
1349+ KafkaStartFromConfig :: Offsets ( _) | KafkaStartFromConfig :: Timestamp ( _) => None ,
1350+ } ;
1351+ if let Some ( auto_offset_reset) = auto_offset_reset {
1352+ kafka_options. insert ( "auto.offset.reset" . into ( ) , auto_offset_reset. into ( ) ) ;
1353+ }
1354+ kafka_options. insert ( "bootstrap.servers" . into ( ) , default_redpanda_server ( ) ) ;
1355+ kafka_options. insert ( "group.id" . into ( ) , "test-client" . into ( ) ) ;
1356+
13461357 let config = InputEndpointConfig {
13471358 stream : Cow :: from ( "test_input" ) ,
13481359 connector_config : ConnectorConfig {
13491360 transport : TransportConfig :: KafkaInput ( KafkaInputConfig {
1350- kafka_options : {
1351- let mut kafka_options = BTreeMap :: new ( ) ;
1352- let auto_offset_reset = match start_from {
1353- KafkaStartFromConfig :: Earliest => Some ( "earliest" ) ,
1354- KafkaStartFromConfig :: Latest => Some ( "latest" ) ,
1355- KafkaStartFromConfig :: Offsets ( _) | KafkaStartFromConfig :: Timestamp ( _) => {
1356- None
1357- }
1358- } ;
1359- if let Some ( auto_offset_reset) = auto_offset_reset {
1360- kafka_options. insert ( "auto.offset.reset" . into ( ) , auto_offset_reset. into ( ) ) ;
1361- }
1362- kafka_options. insert ( "bootstrap.servers" . into ( ) , default_redpanda_server ( ) ) ;
1363- kafka_options. insert ( "group.id" . into ( ) , "test-client" . into ( ) ) ;
1364- kafka_options
1365- } ,
1366- topic : topic. into ( ) ,
13671361 log_level : Some ( KafkaLogLevel :: Debug ) ,
1368- group_join_timeout_secs : default_group_join_timeout_secs ( ) ,
1369- poller_threads : None ,
13701362 start_from : match start_from {
13711363 KafkaStartFromConfig :: Timestamp ( _) => {
13721364 sleep ( Duration :: from_secs ( 2 ) ) ;
@@ -1381,14 +1373,7 @@ fn test_offset(
13811373 }
13821374 other => other,
13831375 } ,
1384- region : None ,
1385- partitions : None ,
1386- resume_earliest_if_data_expires : false ,
1387- include_headers : None ,
1388- include_timestamp : None ,
1389- include_partition : None ,
1390- include_offset : None ,
1391- include_topic : None ,
1376+ ..KafkaInputConfig :: default ( kafka_options, topic)
13921377 } ) ,
13931378 format : Some ( FormatConfig {
13941379 name : Cow :: from ( "csv" ) ,
@@ -1796,39 +1781,26 @@ fn test_input_partition(
17961781) {
17971782 let _kafka = KafkaResources :: create_topics ( & [ ( topic, n_partitions) ] ) ;
17981783
1784+ let mut kafka_options = BTreeMap :: new ( ) ;
1785+ let auto_offset_reset = match start_from {
1786+ KafkaStartFromConfig :: Earliest => Some ( "earliest" ) ,
1787+ KafkaStartFromConfig :: Latest => Some ( "latest" ) ,
1788+ KafkaStartFromConfig :: Offsets ( _) | KafkaStartFromConfig :: Timestamp ( _) => None ,
1789+ } ;
1790+ if let Some ( auto_offset_reset) = auto_offset_reset {
1791+ kafka_options. insert ( "auto.offset.reset" . into ( ) , auto_offset_reset. into ( ) ) ;
1792+ }
1793+ kafka_options. insert ( "bootstrap.servers" . into ( ) , default_redpanda_server ( ) ) ;
1794+ kafka_options. insert ( "group.id" . into ( ) , "test-client" . into ( ) ) ;
1795+
17991796 let config = InputEndpointConfig {
18001797 stream : Cow :: from ( "test_input" ) ,
18011798 connector_config : ConnectorConfig {
18021799 transport : TransportConfig :: KafkaInput ( KafkaInputConfig {
1803- kafka_options : {
1804- let mut kafka_options = BTreeMap :: new ( ) ;
1805- let auto_offset_reset = match start_from {
1806- KafkaStartFromConfig :: Earliest => Some ( "earliest" ) ,
1807- KafkaStartFromConfig :: Latest => Some ( "latest" ) ,
1808- KafkaStartFromConfig :: Offsets ( _) | KafkaStartFromConfig :: Timestamp ( _) => {
1809- None
1810- }
1811- } ;
1812- if let Some ( auto_offset_reset) = auto_offset_reset {
1813- kafka_options. insert ( "auto.offset.reset" . into ( ) , auto_offset_reset. into ( ) ) ;
1814- }
1815- kafka_options. insert ( "bootstrap.servers" . into ( ) , default_redpanda_server ( ) ) ;
1816- kafka_options. insert ( "group.id" . into ( ) , "test-client" . into ( ) ) ;
1817- kafka_options
1818- } ,
1819- topic : topic. into ( ) ,
18201800 log_level : Some ( KafkaLogLevel :: Debug ) ,
1821- group_join_timeout_secs : default_group_join_timeout_secs ( ) ,
1822- poller_threads : None ,
18231801 start_from : start_from. clone ( ) ,
1824- region : None ,
18251802 partitions : Some ( partitions. clone ( ) ) ,
1826- resume_earliest_if_data_expires : false ,
1827- include_headers : None ,
1828- include_timestamp : None ,
1829- include_partition : None ,
1830- include_offset : None ,
1831- include_topic : None ,
1803+ ..KafkaInputConfig :: default ( kafka_options, topic)
18321804 } ) ,
18331805 format : Some ( FormatConfig {
18341806 name : Cow :: from ( "csv" ) ,
0 commit comments