Skip to content

Commit

Permalink
Add a flag for storing invalid blocks (sigp#4194)
Browse files Browse the repository at this point in the history
## Issue Addressed

NA

## Proposed Changes

Adds a flag to store invalid blocks on disk for debugging purposes. Only *some* invalid blocks are stored, namely those which:

- Were received via gossip (rather than RPC, for instance)
    - This keeps things simple to start with and should capture most blocks.
- Passed gossip verification
    - This reduces the ability for random people to fill up our disk. A proposer signature is required to write something to disk.

## Additional Info

It's possible that we'll store blocks that aren't necessarily invalid, but we had an internal error during verification. Those blocks seem like they might be useful sometimes.
  • Loading branch information
paulhauner authored and isaac.asimov committed Jul 13, 2023
1 parent b939894 commit 4b04910
Show file tree
Hide file tree
Showing 9 changed files with 173 additions and 27 deletions.
4 changes: 4 additions & 0 deletions beacon_node/lighthouse_network/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,9 @@ pub struct Config {

/// Configuration for the outbound rate limiter (requests made by this node).
pub outbound_rate_limiter_config: Option<OutboundRateLimiterConfig>,

/// Configures if/where invalid blocks should be stored.
pub invalid_block_storage: Option<PathBuf>,
}

impl Config {
Expand Down Expand Up @@ -329,6 +332,7 @@ impl Default for Config {
metrics_enabled: false,
enable_light_client_server: false,
outbound_rate_limiter_config: None,
invalid_block_storage: None,
}
}
}
Expand Down
55 changes: 36 additions & 19 deletions beacon_node/network/src/beacon_processor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ use logging::TimeLatch;
use slog::{crit, debug, error, trace, warn, Logger};
use std::collections::VecDeque;
use std::future::Future;
use std::path::PathBuf;
use std::pin::Pin;
use std::sync::{Arc, Weak};
use std::task::Context;
Expand Down Expand Up @@ -982,6 +983,13 @@ impl<T: BeaconChainTypes> Stream for InboundEvents<T> {
}
}

/// Defines if and where we will store the SSZ files of invalid blocks.
///
/// Derives `Debug` so the setting can be logged and compared in test assertions.
#[derive(Debug, Clone)]
pub enum InvalidBlockStorage {
    /// Invalid blocks are written as files inside the given directory.
    Enabled(PathBuf),
    /// Invalid blocks are not written to disk.
    Disabled,
}

/// A multi-threaded processor for messages received on the network
/// that need to be processed by the `BeaconChain`
///
Expand All @@ -995,6 +1003,7 @@ pub struct BeaconProcessor<T: BeaconChainTypes> {
pub max_workers: usize,
pub current_workers: usize,
pub importing_blocks: DuplicateCache,
pub invalid_block_storage: InvalidBlockStorage,
pub log: Logger,
}

Expand Down Expand Up @@ -1676,32 +1685,40 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
peer_client,
block,
seen_timestamp,
} => task_spawner.spawn_async(async move {
worker
.process_gossip_block(
message_id,
peer_id,
peer_client,
block,
work_reprocessing_tx,
duplicate_cache,
seen_timestamp,
)
.await
}),
} => {
let invalid_block_storage = self.invalid_block_storage.clone();
task_spawner.spawn_async(async move {
worker
.process_gossip_block(
message_id,
peer_id,
peer_client,
block,
work_reprocessing_tx,
duplicate_cache,
invalid_block_storage,
seen_timestamp,
)
.await
})
}
/*
* Import for blocks that we received earlier than their intended slot.
*/
Work::DelayedImportBlock {
peer_id,
block,
seen_timestamp,
} => task_spawner.spawn_async(worker.process_gossip_verified_block(
peer_id,
*block,
work_reprocessing_tx,
seen_timestamp,
)),
} => {
let invalid_block_storage = self.invalid_block_storage.clone();
task_spawner.spawn_async(worker.process_gossip_verified_block(
peer_id,
*block,
work_reprocessing_tx,
invalid_block_storage,
seen_timestamp,
))
}
/*
* Voluntary exits received on gossip.
*/
Expand Down
1 change: 1 addition & 0 deletions beacon_node/network/src/beacon_processor/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ impl TestRig {
max_workers: cmp::max(1, num_cpus::get()),
current_workers: 0,
importing_blocks: duplicate_cache.clone(),
invalid_block_storage: InvalidBlockStorage::Disabled,
log: log.clone(),
}
.spawn_manager(beacon_processor_rx, Some(work_journal_tx));
Expand Down
93 changes: 86 additions & 7 deletions beacon_node/network/src/beacon_processor/worker/gossip_methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,12 @@ use beacon_chain::{
};
use lighthouse_network::{Client, MessageAcceptance, MessageId, PeerAction, PeerId, ReportSource};
use operation_pool::ReceivedPreCapella;
use slog::{crit, debug, error, info, trace, warn};
use slog::{crit, debug, error, info, trace, warn, Logger};
use slot_clock::SlotClock;
use ssz::Encode;
use std::fs;
use std::io::Write;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use store::hot_cold_store::HotColdDBError;
Expand All @@ -34,7 +37,7 @@ use super::{
},
Worker,
};
use crate::beacon_processor::DuplicateCache;
use crate::beacon_processor::{DuplicateCache, InvalidBlockStorage};

/// Set to `true` to introduce stricter penalties for peers who send some types of late consensus
/// messages.
Expand Down Expand Up @@ -663,6 +666,7 @@ impl<T: BeaconChainTypes> Worker<T> {
block: Arc<SignedBeaconBlock<T::EthSpec>>,
reprocess_tx: mpsc::Sender<ReprocessQueueMessage<T>>,
duplicate_cache: DuplicateCache,
invalid_block_storage: InvalidBlockStorage,
seen_duration: Duration,
) {
if let Some(gossip_verified_block) = self
Expand All @@ -683,6 +687,7 @@ impl<T: BeaconChainTypes> Worker<T> {
peer_id,
gossip_verified_block,
reprocess_tx,
invalid_block_storage,
seen_duration,
)
.await;
Expand Down Expand Up @@ -935,28 +940,30 @@ impl<T: BeaconChainTypes> Worker<T> {
peer_id: PeerId,
verified_block: GossipVerifiedBlock<T>,
reprocess_tx: mpsc::Sender<ReprocessQueueMessage<T>>,
invalid_block_storage: InvalidBlockStorage,
// This value is not used presently, but it might come in handy for debugging.
_seen_duration: Duration,
) {
let block: Arc<_> = verified_block.block.clone();
let block_root = verified_block.block_root;

match self
let result = self
.chain
.process_block(
block_root,
verified_block,
CountUnrealized::True,
NotifyExecutionLayer::Yes,
)
.await
{
.await;

match &result {
Ok(block_root) => {
metrics::inc_counter(&metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_IMPORTED_TOTAL);

if reprocess_tx
.try_send(ReprocessQueueMessage::BlockImported {
block_root,
block_root: *block_root,
parent_root: block.message().parent_root(),
})
.is_err()
Expand Down Expand Up @@ -986,7 +993,11 @@ impl<T: BeaconChainTypes> Worker<T> {
"Block with unknown parent attempted to be processed";
"peer_id" => %peer_id
);
self.send_sync_message(SyncMessage::UnknownBlock(peer_id, block, block_root));
self.send_sync_message(SyncMessage::UnknownBlock(
peer_id,
block.clone(),
block_root,
));
}
Err(ref e @ BlockError::ExecutionPayloadError(ref epe)) if !epe.penalize_peer() => {
debug!(
Expand Down Expand Up @@ -1015,6 +1026,16 @@ impl<T: BeaconChainTypes> Worker<T> {
);
}
};

if let Err(e) = &result {
self.maybe_store_invalid_block(
&invalid_block_storage,
block_root,
&block,
e,
&self.log,
);
}
}

pub fn process_gossip_voluntary_exit(
Expand Down Expand Up @@ -2486,4 +2507,62 @@ impl<T: BeaconChainTypes> Worker<T> {

self.propagate_if_timely(is_timely, message_id, peer_id)
}

/// Stores a block as a SSZ file, if and where `invalid_block_storage` dictates.
///
/// When storage is `Enabled`, two files named `{slot}_{block_root}` are written into the
/// configured directory:
///
/// - `.ssz`: the SSZ encoding of `block`.
/// - `.error`: the string rendering (`to_string()`) of `error`.
///
/// Files that already exist are left untouched. This function is best-effort: any failure to
/// open or write a file is logged at `error` level and never propagated to the caller.
/// When storage is `Disabled` this is a no-op.
fn maybe_store_invalid_block(
    &self,
    invalid_block_storage: &InvalidBlockStorage,
    block_root: Hash256,
    block: &SignedBeaconBlock<T::EthSpec>,
    error: &BlockError<T::EthSpec>,
    log: &Logger,
) {
    if let InvalidBlockStorage::Enabled(base_dir) = invalid_block_storage {
        let block_path = base_dir.join(format!("{}_{:?}.ssz", block.slot(), block_root));
        let error_path = base_dir.join(format!("{}_{:?}.error", block.slot(), block_root));

        let write_file = |path: PathBuf, bytes: &[u8]| {
            // No need to write the same file twice. For the error file,
            // this means that we'll remember the first error message but
            // forget the rest.
            if path.exists() {
                return;
            }

            // Write to the file.
            let write_result = fs::OpenOptions::new()
                // Only succeed if the file doesn't already exist. We should
                // have checked for this earlier.
                .create_new(true)
                .write(true)
                .open(&path)
                .map_err(|e| format!("Failed to open file: {:?}", e))
                // Use `and_then` rather than `map`: with `map` the write error would be
                // nested inside an outer `Ok` and a failed write would be reported as a
                // success below.
                .and_then(|mut file| {
                    file.write_all(bytes)
                        .map_err(|e| format!("Failed to write file: {:?}", e))
                });
            if let Err(e) = write_result {
                error!(
                    log,
                    "Failed to store invalid block/error";
                    "error" => e,
                    "path" => ?path,
                    "root" => ?block_root,
                    "slot" => block.slot(),
                )
            } else {
                info!(
                    log,
                    "Stored invalid block/error ";
                    "path" => ?path,
                    "root" => ?block_root,
                    "slot" => block.slot(),
                )
            }
        };

        write_file(block_path, &block.as_ssz_bytes());
        write_file(error_path, error.to_string().as_bytes());
    }
}
}
4 changes: 3 additions & 1 deletion beacon_node/network/src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
#![allow(clippy::unit_arg)]

use crate::beacon_processor::{
BeaconProcessor, WorkEvent as BeaconWorkEvent, MAX_WORK_EVENT_QUEUE_LEN,
BeaconProcessor, InvalidBlockStorage, WorkEvent as BeaconWorkEvent, MAX_WORK_EVENT_QUEUE_LEN,
};
use crate::error;
use crate::service::{NetworkMessage, RequestId};
Expand Down Expand Up @@ -81,6 +81,7 @@ impl<T: BeaconChainTypes> Router<T> {
network_globals: Arc<NetworkGlobals<T::EthSpec>>,
network_send: mpsc::UnboundedSender<NetworkMessage<T::EthSpec>>,
executor: task_executor::TaskExecutor,
invalid_block_storage: InvalidBlockStorage,
log: slog::Logger,
) -> error::Result<mpsc::UnboundedSender<RouterMessage<T::EthSpec>>> {
let message_handler_log = log.new(o!("service"=> "router"));
Expand Down Expand Up @@ -112,6 +113,7 @@ impl<T: BeaconChainTypes> Router<T> {
max_workers: cmp::max(1, num_cpus::get()),
current_workers: 0,
importing_blocks: Default::default(),
invalid_block_storage,
log: log.clone(),
}
.spawn_manager(beacon_processor_receive, None);
Expand Down
8 changes: 8 additions & 0 deletions beacon_node/network/src/service.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use super::sync::manager::RequestId as SyncId;
use crate::beacon_processor::InvalidBlockStorage;
use crate::persisted_dht::{clear_dht, load_dht, persist_dht};
use crate::router::{Router, RouterMessage};
use crate::subnet_service::SyncCommitteeService;
Expand Down Expand Up @@ -295,6 +296,12 @@ impl<T: BeaconChainTypes> NetworkService<T> {
}
}

let invalid_block_storage = config
.invalid_block_storage
.clone()
.map(InvalidBlockStorage::Enabled)
.unwrap_or(InvalidBlockStorage::Disabled);

// launch derived network services

// router task
Expand All @@ -303,6 +310,7 @@ impl<T: BeaconChainTypes> NetworkService<T> {
network_globals.clone(),
network_senders.network_send(),
executor.clone(),
invalid_block_storage,
network_log.clone(),
)?;

Expand Down
9 changes: 9 additions & 0 deletions beacon_node/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1093,4 +1093,13 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
// always using the builder.
.conflicts_with("builder-profit-threshold")
)
.arg(
Arg::with_name("invalid-gossip-verified-blocks-path")
.long("invalid-gossip-verified-blocks-path")
.value_name("PATH")
.help("If a block succeeds gossip validation whilst failing full validation, store \
the block SSZ as a file at this path. This feature is only recommended for \
developers. This directory is not pruned, users should be careful to avoid \
filling up their disks.")
)
}
5 changes: 5 additions & 0 deletions beacon_node/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -793,6 +793,11 @@ pub fn get_config<E: EthSpec>(
client_config.chain.enable_backfill_rate_limiting =
!cli_args.is_present("disable-backfill-rate-limiting");

if let Some(path) = clap_utils::parse_optional(cli_args, "invalid-gossip-verified-blocks-path")?
{
client_config.network.invalid_block_storage = Some(path);
}

Ok(client_config)
}

Expand Down
21 changes: 21 additions & 0 deletions lighthouse/tests/beacon_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2199,3 +2199,24 @@ fn disable_optimistic_finalized_sync() {
assert!(!config.chain.optimistic_finalized_sync);
});
}

/// With no `--invalid-gossip-verified-blocks-path` flag supplied, the parsed network config
/// must leave invalid block storage unset.
#[test]
fn invalid_gossip_verified_blocks_path_default() {
    let completed = CommandLineTest::new().run_with_zero_port();
    completed.with_config(|cfg| {
        assert_eq!(cfg.network.invalid_block_storage, None);
    });
}

/// Supplying `--invalid-gossip-verified-blocks-path` must carry the given path through to
/// `network.invalid_block_storage` in the parsed config.
#[test]
fn invalid_gossip_verified_blocks_path() {
    let path = "/home/karlm/naughty-blocks";
    let completed = CommandLineTest::new()
        .flag("invalid-gossip-verified-blocks-path", Some(path))
        .run_with_zero_port();
    completed.with_config(|cfg| {
        let expected = Some(PathBuf::from(path));
        assert_eq!(cfg.network.invalid_block_storage, expected);
    });
}

0 comments on commit 4b04910

Please sign in to comment.