Skip to content

Commit 897cfa8

Browse files
incrypto32dborehamroysc
authored
Handle null blocks from Filecoin EVM (#5294)
Accommodate Filecoin EVM null blocks --------- Co-authored-by: David Boreham <david@bozemanpass.com> Co-authored-by: Roy Crihfield <roy@manteia.ltd>
1 parent 1c5ded5 commit 897cfa8

20 files changed

Lines changed: 373 additions & 169 deletions

File tree

.github/workflows/ci.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -223,4 +223,4 @@ jobs:
223223
uses: actions-rs/cargo@v1
224224
with:
225225
command: check
226-
args: --release
226+
args: --release

chain/arweave/src/chain.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,7 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
192192
_from: BlockNumber,
193193
_to: BlockNumber,
194194
_filter: &TriggerFilter,
195-
) -> Result<Vec<BlockWithTriggers<Chain>>, Error> {
195+
) -> Result<(Vec<BlockWithTriggers<Chain>>, BlockNumber), Error> {
196196
panic!("Should never be called since not used by FirehoseBlockStream")
197197
}
198198

@@ -241,6 +241,7 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
241241
&self,
242242
_ptr: BlockPtr,
243243
_offset: BlockNumber,
244+
_root: Option<BlockHash>,
244245
) -> Result<Option<codec::Block>, Error> {
245246
panic!("Should never be called since FirehoseBlockStream cannot resolve it")
246247
}

chain/cosmos/src/chain.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,7 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
186186
&self,
187187
_ptr: BlockPtr,
188188
_offset: BlockNumber,
189+
_root: Option<BlockHash>,
189190
) -> Result<Option<codec::Block>, Error> {
190191
panic!("Should never be called since not used by FirehoseBlockStream")
191192
}
@@ -195,7 +196,7 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
195196
_from: BlockNumber,
196197
_to: BlockNumber,
197198
_filter: &TriggerFilter,
198-
) -> Result<Vec<BlockWithTriggers<Chain>>, Error> {
199+
) -> Result<(Vec<BlockWithTriggers<Chain>>, BlockNumber), Error> {
199200
panic!("Should never be called since not used by FirehoseBlockStream")
200201
}
201202

chain/ethereum/src/adapter.rs

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1140,13 +1140,6 @@ pub trait EthereumAdapter: Send + Sync + 'static {
11401140
Box<dyn std::future::Future<Output = Result<EthereumBlock, bc::IngestorError>> + Send + '_>,
11411141
>;
11421142

1143-
/// Load block pointer for the specified `block number`.
1144-
fn block_pointer_from_number(
1145-
&self,
1146-
logger: &Logger,
1147-
block_number: BlockNumber,
1148-
) -> Box<dyn Future<Item = BlockPtr, Error = bc::IngestorError> + Send>;
1149-
11501143
/// Find a block by its number, according to the Ethereum node.
11511144
///
11521145
/// Careful: don't use this function without considering race conditions.
@@ -1162,6 +1155,17 @@ pub trait EthereumAdapter: Send + Sync + 'static {
11621155
block_number: BlockNumber,
11631156
) -> Box<dyn Future<Item = Option<H256>, Error = Error> + Send>;
11641157

1158+
/// Finds the hash and number of the lowest non-null block with height greater than or equal to
1159+
/// the given number.
1160+
///
1161+
/// Note that the same caveats on reorgs apply as for `block_hash_by_block_number`, and must
1162+
/// also be considered for the resolved block, in case it is higher than the requested number.
1163+
async fn next_existing_ptr_to_number(
1164+
&self,
1165+
logger: &Logger,
1166+
block_number: BlockNumber,
1167+
) -> Result<BlockPtr, Error>;
1168+
11651169
/// Call the function of a smart contract. A return of `None` indicates
11661170
/// that the call reverted. The returned `CallSource` indicates where
11671171
/// the result came from for accounting purposes

chain/ethereum/src/chain.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -465,9 +465,9 @@ impl Blockchain for Chain {
465465
.clone();
466466

467467
adapter
468-
.block_pointer_from_number(logger, number)
469-
.compat()
468+
.next_existing_ptr_to_number(logger, number)
470469
.await
470+
.map_err(From::from)
471471
}
472472
}
473473
}
@@ -673,7 +673,7 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
673673
from: BlockNumber,
674674
to: BlockNumber,
675675
filter: &TriggerFilter,
676-
) -> Result<Vec<BlockWithTriggers<Chain>>, Error> {
676+
) -> Result<(Vec<BlockWithTriggers<Chain>>, BlockNumber), Error> {
677677
blocks_with_triggers(
678678
self.chain_client.rpc()?.cheapest_with(&self.capabilities)?,
679679
self.logger.clone(),
@@ -707,7 +707,7 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
707707
BlockFinality::Final(_) => {
708708
let adapter = self.chain_client.rpc()?.cheapest_with(&self.capabilities)?;
709709
let block_number = block.number() as BlockNumber;
710-
let blocks = blocks_with_triggers(
710+
let (blocks, _) = blocks_with_triggers(
711711
adapter,
712712
logger.clone(),
713713
self.chain_store.clone(),
@@ -747,11 +747,12 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
747747
&self,
748748
ptr: BlockPtr,
749749
offset: BlockNumber,
750+
root: Option<BlockHash>,
750751
) -> Result<Option<BlockFinality>, Error> {
751752
let block: Option<EthereumBlock> = self
752753
.chain_store
753754
.cheap_clone()
754-
.ancestor_block(ptr, offset)
755+
.ancestor_block(ptr, offset, root)
755756
.await?
756757
.map(json::from_value)
757758
.transpose()?;

chain/ethereum/src/ethereum_adapter.rs

Lines changed: 89 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -791,6 +791,7 @@ impl EthereumAdapter {
791791
stream::iter_ok::<_, Error>(block_nums.into_iter().map(move |block_num| {
792792
let web3 = web3.clone();
793793
retry(format!("load block ptr {}", block_num), &logger)
794+
.when(|res| !res.is_ok() && !detect_null_block(res))
794795
.no_limit()
795796
.timeout_secs(ENV_VARS.json_rpc_timeout.as_secs())
796797
.run(move || {
@@ -810,8 +811,16 @@ impl EthereumAdapter {
810811
.boxed()
811812
.compat()
812813
.from_err()
814+
.then(|res| {
815+
if detect_null_block(&res) {
816+
Ok(None)
817+
} else {
818+
Some(res).transpose()
819+
}
820+
})
813821
}))
814822
.buffered(ENV_VARS.block_batch_size)
823+
.filter_map(|b| b)
815824
.map(|b| b.into())
816825
}
817826

@@ -830,13 +839,12 @@ impl EthereumAdapter {
830839
logger: &Logger,
831840
block_ptr: BlockPtr,
832841
) -> Result<bool, Error> {
833-
let block_hash = self
834-
.block_hash_by_block_number(logger, block_ptr.number)
835-
.compat()
842+
// TODO: This considers null blocks, but we could instead bail if we encounter one as a
843+
// small optimization.
844+
let canonical_block = self
845+
.next_existing_ptr_to_number(logger, block_ptr.number)
836846
.await?;
837-
block_hash
838-
.ok_or_else(|| anyhow!("Ethereum node is missing block #{}", block_ptr.number))
839-
.map(|block_hash| block_hash == block_ptr.hash_as_h256())
847+
Ok(canonical_block == block_ptr)
840848
}
841849

842850
pub(crate) fn logs_in_block_range(
@@ -1079,6 +1087,16 @@ impl EthereumAdapter {
10791087
}
10801088
}
10811089

1090+
// Detects null blocks as can occur on Filecoin EVM chains, by checking for the FEVM-specific
1091+
// error returned when requesting such a null round. Ideally there should be a defined reponse or
1092+
// message for this case, or a check that is less dependent on the Filecoin implementation.
1093+
fn detect_null_block<T>(res: &Result<T, Error>) -> bool {
1094+
match res {
1095+
Ok(_) => false,
1096+
Err(e) => e.to_string().contains("requested epoch was a null round"),
1097+
}
1098+
}
1099+
10821100
#[async_trait]
10831101
impl EthereumAdapterTrait for EthereumAdapter {
10841102
fn provider(&self) -> &str {
@@ -1363,26 +1381,6 @@ impl EthereumAdapterTrait for EthereumAdapter {
13631381
Box::pin(block_future)
13641382
}
13651383

1366-
fn block_pointer_from_number(
1367-
&self,
1368-
logger: &Logger,
1369-
block_number: BlockNumber,
1370-
) -> Box<dyn Future<Item = BlockPtr, Error = IngestorError> + Send> {
1371-
Box::new(
1372-
self.block_hash_by_block_number(logger, block_number)
1373-
.and_then(move |block_hash_opt| {
1374-
block_hash_opt.ok_or_else(|| {
1375-
anyhow!(
1376-
"Ethereum node could not find start block hash by block number {}",
1377-
&block_number
1378-
)
1379-
})
1380-
})
1381-
.from_err()
1382-
.map(move |block_hash| BlockPtr::from((block_hash, block_number))),
1383-
)
1384-
}
1385-
13861384
fn block_hash_by_block_number(
13871385
&self,
13881386
logger: &Logger,
@@ -1448,6 +1446,54 @@ impl EthereumAdapterTrait for EthereumAdapter {
14481446
Box::new(self.code(logger, address, block_ptr))
14491447
}
14501448

1449+
async fn next_existing_ptr_to_number(
1450+
&self,
1451+
logger: &Logger,
1452+
block_number: BlockNumber,
1453+
) -> Result<BlockPtr, Error> {
1454+
let mut next_number = block_number;
1455+
loop {
1456+
let retry_log_message = format!(
1457+
"eth_getBlockByNumber RPC call for block number {}",
1458+
next_number
1459+
);
1460+
let web3 = self.web3.clone();
1461+
let logger = logger.clone();
1462+
let res = retry(retry_log_message, &logger)
1463+
.when(|res| !res.is_ok() && !detect_null_block(res))
1464+
.no_limit()
1465+
.timeout_secs(ENV_VARS.json_rpc_timeout.as_secs())
1466+
.run(move || {
1467+
let web3 = web3.cheap_clone();
1468+
async move {
1469+
web3.eth()
1470+
.block(BlockId::Number(next_number.into()))
1471+
.await
1472+
.map(|block_opt| block_opt.and_then(|block| block.hash))
1473+
.map_err(Error::from)
1474+
}
1475+
})
1476+
.await
1477+
.map_err(move |e| {
1478+
e.into_inner().unwrap_or_else(move || {
1479+
anyhow!(
1480+
"Ethereum node took too long to return data for block #{}",
1481+
next_number
1482+
)
1483+
})
1484+
});
1485+
if detect_null_block(&res) {
1486+
next_number += 1;
1487+
continue;
1488+
}
1489+
return match res {
1490+
Ok(Some(hash)) => Ok(BlockPtr::new(hash.into(), next_number)),
1491+
Ok(None) => Err(anyhow!("Block {} does not contain hash", next_number)),
1492+
Err(e) => Err(e),
1493+
};
1494+
}
1495+
}
1496+
14511497
async fn contract_call(
14521498
&self,
14531499
logger: &Logger,
@@ -1652,9 +1698,10 @@ impl EthereumAdapterTrait for EthereumAdapter {
16521698
}
16531699
}
16541700

1655-
/// Returns blocks with triggers, corresponding to the specified range and filters.
1701+
/// Returns blocks with triggers, corresponding to the specified range and filters; and the resolved
1702+
/// `to` block, which is the nearest non-null block greater than or equal to the passed `to` block.
16561703
/// If a block contains no triggers, there may be no corresponding item in the stream.
1657-
/// However the `to` block will always be present, even if triggers are empty.
1704+
/// However the (resolved) `to` block will always be present, even if triggers are empty.
16581705
///
16591706
/// Careful: don't use this function without considering race conditions.
16601707
/// Chain reorgs could happen at any time, and could affect the answer received.
@@ -1674,7 +1721,7 @@ pub(crate) async fn blocks_with_triggers(
16741721
to: BlockNumber,
16751722
filter: &TriggerFilter,
16761723
unified_api_version: UnifiedMappingApiVersion,
1677-
) -> Result<Vec<BlockWithTriggers<crate::Chain>>, Error> {
1724+
) -> Result<(Vec<BlockWithTriggers<crate::Chain>>, BlockNumber), Error> {
16781725
// Each trigger filter needs to be queried for the same block range
16791726
// and the blocks yielded need to be deduped. If any error occurs
16801727
// while searching for a trigger type, the entire operation fails.
@@ -1685,6 +1732,13 @@ pub(crate) async fn blocks_with_triggers(
16851732
let trigger_futs: FuturesUnordered<BoxFuture<Result<Vec<EthereumTrigger>, anyhow::Error>>> =
16861733
FuturesUnordered::new();
16871734

1735+
// Resolve the nearest non-null "to" block
1736+
debug!(logger, "Finding nearest valid `to` block to {}", to);
1737+
1738+
let to_ptr = eth.next_existing_ptr_to_number(&logger, to).await?;
1739+
let to_hash = to_ptr.hash_as_h256();
1740+
let to = to_ptr.block_number();
1741+
16881742
// This is for `start` triggers which can be initialization handlers which needs to be run
16891743
// before all other triggers
16901744
if filter.block.trigger_every_block {
@@ -1753,28 +1807,11 @@ pub(crate) async fn blocks_with_triggers(
17531807
trigger_futs.push(block_future)
17541808
}
17551809

1756-
// Get hash for "to" block
1757-
let to_hash_fut = eth
1758-
.block_hash_by_block_number(&logger, to)
1759-
.and_then(|hash| match hash {
1760-
Some(hash) => Ok(hash),
1761-
None => {
1762-
warn!(logger,
1763-
"Ethereum endpoint is behind";
1764-
"url" => eth.provider()
1765-
);
1766-
bail!("Block {} not found in the chain", to)
1767-
}
1768-
})
1769-
.compat();
1770-
1771-
// Join on triggers and block hash resolution
1772-
let (triggers, to_hash) = futures03::join!(trigger_futs.try_concat(), to_hash_fut);
1773-
1774-
// Unpack and handle possible errors in the previously joined futures
1775-
let triggers =
1776-
triggers.with_context(|| format!("Failed to obtain triggers for block {}", to))?;
1777-
let to_hash = to_hash.with_context(|| format!("Failed to infer hash for block {}", to))?;
1810+
// Join on triggers, unpack and handle possible errors
1811+
let triggers = trigger_futs
1812+
.try_concat()
1813+
.await
1814+
.with_context(|| format!("Failed to obtain triggers for block {}", to))?;
17781815

17791816
let mut block_hashes: HashSet<H256> =
17801817
triggers.iter().map(EthereumTrigger::block_hash).collect();
@@ -1839,7 +1876,7 @@ pub(crate) async fn blocks_with_triggers(
18391876
));
18401877
}
18411878

1842-
Ok(blocks)
1879+
Ok((blocks, to))
18431880
}
18441881

18451882
pub(crate) async fn get_calls(

chain/near/src/chain.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -316,7 +316,7 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
316316
_from: BlockNumber,
317317
_to: BlockNumber,
318318
_filter: &TriggerFilter,
319-
) -> Result<Vec<BlockWithTriggers<Chain>>, Error> {
319+
) -> Result<(Vec<BlockWithTriggers<Chain>>, BlockNumber), Error> {
320320
panic!("Should never be called since not used by FirehoseBlockStream")
321321
}
322322

@@ -390,6 +390,7 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
390390
&self,
391391
_ptr: BlockPtr,
392392
_offset: BlockNumber,
393+
_root: Option<BlockHash>,
393394
) -> Result<Option<codec::Block>, Error> {
394395
panic!("Should never be called since FirehoseBlockStream cannot resolve it")
395396
}

chain/starknet/src/chain.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -358,6 +358,7 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
358358
&self,
359359
_ptr: BlockPtr,
360360
_offset: BlockNumber,
361+
_root: Option<BlockHash>,
361362
) -> Result<Option<codec::Block>, Error> {
362363
panic!("Should never be called since FirehoseBlockStream cannot resolve it")
363364
}
@@ -373,7 +374,7 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
373374
_from: BlockNumber,
374375
_to: BlockNumber,
375376
_filter: &crate::adapter::TriggerFilter,
376-
) -> Result<Vec<BlockWithTriggers<Chain>>, Error> {
377+
) -> Result<(Vec<BlockWithTriggers<Chain>>, BlockNumber), Error> {
377378
panic!("Should never be called since not used by FirehoseBlockStream")
378379
}
379380

0 commit comments

Comments
 (0)