@@ -791,6 +791,7 @@ impl EthereumAdapter {
791791 stream:: iter_ok :: < _ , Error > ( block_nums. into_iter ( ) . map ( move |block_num| {
792792 let web3 = web3. clone ( ) ;
793793 retry ( format ! ( "load block ptr {}" , block_num) , & logger)
794+ . when ( |res| !res. is_ok ( ) && !detect_null_block ( res) )
794795 . no_limit ( )
795796 . timeout_secs ( ENV_VARS . json_rpc_timeout . as_secs ( ) )
796797 . run ( move || {
@@ -810,8 +811,16 @@ impl EthereumAdapter {
810811 . boxed ( )
811812 . compat ( )
812813 . from_err ( )
814+ . then ( |res| {
815+ if detect_null_block ( & res) {
816+ Ok ( None )
817+ } else {
818+ Some ( res) . transpose ( )
819+ }
820+ } )
813821 } ) )
814822 . buffered ( ENV_VARS . block_batch_size )
823+ . filter_map ( |b| b)
815824 . map ( |b| b. into ( ) )
816825 }
817826
@@ -830,13 +839,12 @@ impl EthereumAdapter {
830839 logger : & Logger ,
831840 block_ptr : BlockPtr ,
832841 ) -> Result < bool , Error > {
833- let block_hash = self
834- . block_hash_by_block_number ( logger, block_ptr. number )
835- . compat ( )
842+ // TODO: This considers null blocks, but we could instead bail if we encounter one as a
843+ // small optimization.
844+ let canonical_block = self
845+ . next_existing_ptr_to_number ( logger, block_ptr. number )
836846 . await ?;
837- block_hash
838- . ok_or_else ( || anyhow ! ( "Ethereum node is missing block #{}" , block_ptr. number) )
839- . map ( |block_hash| block_hash == block_ptr. hash_as_h256 ( ) )
847+ Ok ( canonical_block == block_ptr)
840848 }
841849
842850 pub ( crate ) fn logs_in_block_range (
@@ -1079,6 +1087,16 @@ impl EthereumAdapter {
10791087 }
10801088}
10811089
1090+ // Detects null blocks as can occur on Filecoin EVM chains, by checking for the FEVM-specific
1091+ // error returned when requesting such a null round. Ideally there should be a defined reponse or
1092+ // message for this case, or a check that is less dependent on the Filecoin implementation.
1093+ fn detect_null_block < T > ( res : & Result < T , Error > ) -> bool {
1094+ match res {
1095+ Ok ( _) => false ,
1096+ Err ( e) => e. to_string ( ) . contains ( "requested epoch was a null round" ) ,
1097+ }
1098+ }
1099+
10821100#[ async_trait]
10831101impl EthereumAdapterTrait for EthereumAdapter {
10841102 fn provider ( & self ) -> & str {
@@ -1363,26 +1381,6 @@ impl EthereumAdapterTrait for EthereumAdapter {
13631381 Box :: pin ( block_future)
13641382 }
13651383
1366- fn block_pointer_from_number (
1367- & self ,
1368- logger : & Logger ,
1369- block_number : BlockNumber ,
1370- ) -> Box < dyn Future < Item = BlockPtr , Error = IngestorError > + Send > {
1371- Box :: new (
1372- self . block_hash_by_block_number ( logger, block_number)
1373- . and_then ( move |block_hash_opt| {
1374- block_hash_opt. ok_or_else ( || {
1375- anyhow ! (
1376- "Ethereum node could not find start block hash by block number {}" ,
1377- & block_number
1378- )
1379- } )
1380- } )
1381- . from_err ( )
1382- . map ( move |block_hash| BlockPtr :: from ( ( block_hash, block_number) ) ) ,
1383- )
1384- }
1385-
13861384 fn block_hash_by_block_number (
13871385 & self ,
13881386 logger : & Logger ,
@@ -1448,6 +1446,54 @@ impl EthereumAdapterTrait for EthereumAdapter {
14481446 Box :: new ( self . code ( logger, address, block_ptr) )
14491447 }
14501448
1449+ async fn next_existing_ptr_to_number (
1450+ & self ,
1451+ logger : & Logger ,
1452+ block_number : BlockNumber ,
1453+ ) -> Result < BlockPtr , Error > {
1454+ let mut next_number = block_number;
1455+ loop {
1456+ let retry_log_message = format ! (
1457+ "eth_getBlockByNumber RPC call for block number {}" ,
1458+ next_number
1459+ ) ;
1460+ let web3 = self . web3 . clone ( ) ;
1461+ let logger = logger. clone ( ) ;
1462+ let res = retry ( retry_log_message, & logger)
1463+ . when ( |res| !res. is_ok ( ) && !detect_null_block ( res) )
1464+ . no_limit ( )
1465+ . timeout_secs ( ENV_VARS . json_rpc_timeout . as_secs ( ) )
1466+ . run ( move || {
1467+ let web3 = web3. cheap_clone ( ) ;
1468+ async move {
1469+ web3. eth ( )
1470+ . block ( BlockId :: Number ( next_number. into ( ) ) )
1471+ . await
1472+ . map ( |block_opt| block_opt. and_then ( |block| block. hash ) )
1473+ . map_err ( Error :: from)
1474+ }
1475+ } )
1476+ . await
1477+ . map_err ( move |e| {
1478+ e. into_inner ( ) . unwrap_or_else ( move || {
1479+ anyhow ! (
1480+ "Ethereum node took too long to return data for block #{}" ,
1481+ next_number
1482+ )
1483+ } )
1484+ } ) ;
1485+ if detect_null_block ( & res) {
1486+ next_number += 1 ;
1487+ continue ;
1488+ }
1489+ return match res {
1490+ Ok ( Some ( hash) ) => Ok ( BlockPtr :: new ( hash. into ( ) , next_number) ) ,
1491+ Ok ( None ) => Err ( anyhow ! ( "Block {} does not contain hash" , next_number) ) ,
1492+ Err ( e) => Err ( e) ,
1493+ } ;
1494+ }
1495+ }
1496+
14511497 async fn contract_call (
14521498 & self ,
14531499 logger : & Logger ,
@@ -1652,9 +1698,10 @@ impl EthereumAdapterTrait for EthereumAdapter {
16521698 }
16531699}
16541700
1655- /// Returns blocks with triggers, corresponding to the specified range and filters.
1701+ /// Returns blocks with triggers, corresponding to the specified range and filters; and the resolved
1702+ /// `to` block, which is the nearest non-null block greater than or equal to the passed `to` block.
16561703/// If a block contains no triggers, there may be no corresponding item in the stream.
1657- /// However the `to` block will always be present, even if triggers are empty.
1704+ /// However the (resolved) `to` block will always be present, even if triggers are empty.
16581705///
16591706/// Careful: don't use this function without considering race conditions.
16601707/// Chain reorgs could happen at any time, and could affect the answer received.
@@ -1674,7 +1721,7 @@ pub(crate) async fn blocks_with_triggers(
16741721 to : BlockNumber ,
16751722 filter : & TriggerFilter ,
16761723 unified_api_version : UnifiedMappingApiVersion ,
1677- ) -> Result < Vec < BlockWithTriggers < crate :: Chain > > , Error > {
1724+ ) -> Result < ( Vec < BlockWithTriggers < crate :: Chain > > , BlockNumber ) , Error > {
16781725 // Each trigger filter needs to be queried for the same block range
16791726 // and the blocks yielded need to be deduped. If any error occurs
16801727 // while searching for a trigger type, the entire operation fails.
@@ -1685,6 +1732,13 @@ pub(crate) async fn blocks_with_triggers(
16851732 let trigger_futs: FuturesUnordered < BoxFuture < Result < Vec < EthereumTrigger > , anyhow:: Error > > > =
16861733 FuturesUnordered :: new ( ) ;
16871734
1735+ // Resolve the nearest non-null "to" block
1736+ debug ! ( logger, "Finding nearest valid `to` block to {}" , to) ;
1737+
1738+ let to_ptr = eth. next_existing_ptr_to_number ( & logger, to) . await ?;
1739+ let to_hash = to_ptr. hash_as_h256 ( ) ;
1740+ let to = to_ptr. block_number ( ) ;
1741+
16881742 // This is for `start` triggers which can be initialization handlers which needs to be run
16891743 // before all other triggers
16901744 if filter. block . trigger_every_block {
@@ -1753,28 +1807,11 @@ pub(crate) async fn blocks_with_triggers(
17531807 trigger_futs. push ( block_future)
17541808 }
17551809
1756- // Get hash for "to" block
1757- let to_hash_fut = eth
1758- . block_hash_by_block_number ( & logger, to)
1759- . and_then ( |hash| match hash {
1760- Some ( hash) => Ok ( hash) ,
1761- None => {
1762- warn ! ( logger,
1763- "Ethereum endpoint is behind" ;
1764- "url" => eth. provider( )
1765- ) ;
1766- bail ! ( "Block {} not found in the chain" , to)
1767- }
1768- } )
1769- . compat ( ) ;
1770-
1771- // Join on triggers and block hash resolution
1772- let ( triggers, to_hash) = futures03:: join!( trigger_futs. try_concat( ) , to_hash_fut) ;
1773-
1774- // Unpack and handle possible errors in the previously joined futures
1775- let triggers =
1776- triggers. with_context ( || format ! ( "Failed to obtain triggers for block {}" , to) ) ?;
1777- let to_hash = to_hash. with_context ( || format ! ( "Failed to infer hash for block {}" , to) ) ?;
1810+ // Join on triggers, unpack and handle possible errors
1811+ let triggers = trigger_futs
1812+ . try_concat ( )
1813+ . await
1814+ . with_context ( || format ! ( "Failed to obtain triggers for block {}" , to) ) ?;
17781815
17791816 let mut block_hashes: HashSet < H256 > =
17801817 triggers. iter ( ) . map ( EthereumTrigger :: block_hash) . collect ( ) ;
@@ -1839,7 +1876,7 @@ pub(crate) async fn blocks_with_triggers(
18391876 ) ) ;
18401877 }
18411878
1842- Ok ( blocks)
1879+ Ok ( ( blocks, to ) )
18431880}
18441881
18451882pub ( crate ) async fn get_calls (
0 commit comments