Downloading correct block during state sync
State sync was downloading the block at `sync_hash`, but actually needs to download the previous block.
Generally nothing was breaking when downloading at `sync_hash`, except that if the next block could not be applied or was not yet known, `body_head` would for a while point at a non-existent block (specifically the prev of `sync_hash`).
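
To make the relationship concrete, here is a minimal sketch using toy stand-in types (a `u64` hash and a hypothetical `block_to_download` helper, not the real nearcore API):

    use std::collections::HashMap;

    // Toy stand-ins for nearcore's types, illustrative only.
    type CryptoHash = u64;

    struct BlockHeader {
        prev_hash: CryptoHash,
    }

    struct Chain {
        headers: HashMap<CryptoHash, BlockHeader>,
    }

    impl Chain {
        // What state sync must actually fetch: the block *preceding* `sync_hash`.
        fn block_to_download(&self, sync_hash: &CryptoHash) -> Option<CryptoHash> {
            self.headers.get(sync_hash).map(|h| h.prev_hash)
        }
    }

    fn main() {
        let mut headers = HashMap::new();
        headers.insert(2, BlockHeader { prev_hash: 1 }); // block 2 is `sync_hash`
        let chain = Chain { headers };
        // Fetching block 2 itself would leave `body_head` pointing at the
        // still-missing block 1 whenever block 2 cannot be applied yet.
        assert_eq!(chain.block_to_download(&2), Some(1));
    }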

Also added logic to reset the state sync if the header head gets two epochs ahead of the sync block. This logic doesn't currently trigger, because we do not move the header head during state sync. Tracking in #1174
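
The restart condition itself reduces to an epoch comparison. A minimal sketch, where `EpochId` and `should_restart_state_sync` are simplified stand-ins for the `runtime_adapter` lookups in the client.rs diff below:

    // Simplified sketch of the new check in client.rs: restart state sync
    // unless the header head is in the sync block's epoch or the next one.
    #[derive(PartialEq, Debug)]
    struct EpochId(u64);

    fn should_restart_state_sync(
        current_epoch: &EpochId,     // epoch derived from the prev of `sync_hash`
        next_epoch: &EpochId,        // the epoch after it
        header_head_epoch: &EpochId, // epoch of the current header head
    ) -> bool {
        current_epoch != header_head_epoch && next_epoch != header_head_epoch
    }

    fn main() {
        let (e1, e2, e3) = (EpochId(1), EpochId(2), EpochId(3));
        assert!(!should_restart_state_sync(&e1, &e2, &e1)); // same epoch: keep going
        assert!(!should_restart_state_sync(&e1, &e2, &e2)); // next epoch: keep going
        assert!(should_restart_state_sync(&e1, &e2, &e3)); // two epochs ahead: restart
    }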
SkidanovAlex committed Aug 15, 2019
1 parent ea785c7 commit ad8551c
Showing 4 changed files with 61 additions and 15 deletions.
1 change: 0 additions & 1 deletion chain/chain/src/chain.rs
@@ -493,7 +493,6 @@ impl Chain {
         // We can't fail beyond this point because the caller will not process accepted blocks
         // and the blocks with missing chunks if this method fails
         self.check_orphans(me, hash, block_accepted, block_misses_chunks);
-        self.check_orphans(me, sync_hash, block_accepted, block_misses_chunks);
         Ok(())
     }

51 changes: 44 additions & 7 deletions chain/client/src/client.rs
@@ -268,11 +268,13 @@ impl Handler<NetworkClientMessages> for ClientActor {
             }
             NetworkClientMessages::Block(block, peer_id, was_requested) => {
                 if let SyncStatus::StateSync(sync_hash, _) = &mut self.sync_status {
-                    if block.hash() == *sync_hash {
-                        if let Err(_) = self.chain.save_block(&block) {
-                            error!(target: "client", "Failed to save a block during state sync");
-                        }
-                        return NetworkClientResponses::NoResponse;
-                    }
+                    if let Ok(header) = self.chain.get_block_header(sync_hash) {
+                        if block.hash() == header.prev_hash {
+                            if let Err(_) = self.chain.save_block(&block) {
+                                error!(target: "client", "Failed to save a block during state sync");
+                            }
+                            return NetworkClientResponses::NoResponse;
+                        }
+                    }
                 }
                 self.receive_block(ctx, block, peer_id, was_requested)
@@ -1511,14 +1513,46 @@ impl ClientActor {
             _ => false,
         };
         if sync_state {
-            let me = &self.block_producer.as_ref().map(|x| x.account_id.clone());
-
             let (sync_hash, mut new_shard_sync) = match &self.sync_status {
                 SyncStatus::StateSync(sync_hash, shard_sync) => {
-                    (sync_hash.clone(), shard_sync.clone())
+                    let mut need_to_restart = false;
+
+                    if let Ok(sync_block_header) = self.chain.get_block_header(&sync_hash) {
+                        let prev_hash = sync_block_header.prev_hash;
+
+                        if let Ok(current_epoch) =
+                            self.runtime_adapter.get_epoch_id_from_prev_block(&prev_hash)
+                        {
+                            if let Ok(next_epoch) = self
+                                .runtime_adapter
+                                .get_next_epoch_id_from_prev_block(&prev_hash)
+                            {
+                                if let Ok(header_head) = self.chain.header_head() {
+                                    let header_head_epoch = header_head.epoch_id;
+
+                                    if current_epoch != header_head_epoch
+                                        && next_epoch != header_head_epoch
+                                    {
+                                        error!(target: "client", "Header head is not within two epochs of state sync hash, restarting state sync");
+                                        debug!(target: "client", "Current epoch: {:?}, Next epoch: {:?}, Header head epoch: {:?}", current_epoch, next_epoch, header_head_epoch);
+                                        need_to_restart = true;
+                                    }
+                                }
+                            }
+                        }
+                    }
+
+                    if need_to_restart {
+                        (unwrap_or_run_later!(self.find_sync_hash()), HashMap::default())
+                    } else {
+                        (sync_hash.clone(), shard_sync.clone())
+                    }
                 }
                 _ => (unwrap_or_run_later!(self.find_sync_hash()), HashMap::default()),
             };

+            let me = &self.block_producer.as_ref().map(|x| x.account_id.clone());
             let shards_to_sync = (0..self.runtime_adapter.num_shards())
                 .filter(|x| match me {
                     Some(me) => {
@@ -1541,7 +1575,10 @@
                     most_weight_peer(&self.network_info.most_weight_peers)
                 {
                     if fetch_block {
-                        self.request_block_by_hash(sync_hash, peer_info.peer_info.id);
+                        if let Ok(header) = self.chain.get_block_header(&sync_hash) {
+                            let prev_hash = header.prev_hash;
+                            self.request_block_by_hash(prev_hash, peer_info.peer_info.id);
+                        }
                     }
                 }
             }
5 changes: 3 additions & 2 deletions chain/client/src/sync.rs
@@ -434,13 +434,14 @@ impl StateSync {
         let mut sync_need_restart = HashSet::new();

         debug!("MOO run state sync tracking shards: {:?}", tracking_shards);
+        let prev_hash = chain.get_block_header(&sync_hash)?.prev_hash.clone();

         let now = Utc::now();
-        let (request_block, have_block) = if !chain.block_exists(&sync_hash)? {
+        let (request_block, have_block) = if !chain.block_exists(&prev_hash)? {
             match self.last_time_block_requested {
                 None => (true, false),
                 Some(last_time) => {
-                    error!(target: "sync", "State sync: block request for {} timed out in {} seconds", sync_hash, STATE_SYNC_TIMEOUT);
+                    error!(target: "sync", "State sync: block request for {} timed out in {} seconds", prev_hash, STATE_SYNC_TIMEOUT);
                     (now - last_time >= Duration::seconds(STATE_SYNC_TIMEOUT), false)
                 }
             }
19 changes: 14 additions & 5 deletions near/tests/sync_state_nodes.rs
@@ -94,8 +94,9 @@ fn sync_state_nodes_multishard() {
     heavy_test(|| {
         init_test_logger();

-        let genesis_config =
+        let mut genesis_config =
             GenesisConfig::test_sharded(vec!["test1", "test2", "test3", "test4"], vec![2, 2]);
+        genesis_config.epoch_length = 150; // so that by the time test2 joins it is not kicked out yet

         let system = System::new("NEAR");

@@ -186,16 +187,24 @@
                                 Ok(Ok(b)) if b.header.height < 101 => {
                                     println!("SECOND STAGE {}", b.header.height)
                                 }
-                                Err(_) => return futures::future::err(()),
-                                _ => {}
+                                Ok(Err(e)) => {
+                                    println!("SECOND STAGE ERROR1: {:?}", e);
+                                    return futures::future::err(());
+                                }
+                                Err(e) => {
+                                    println!("SECOND STAGE ERROR2: {:?}", e);
+                                    return futures::future::err(());
+                                }
+                                _ => {
+                                    assert!(false);
+                                }
                             };
                             futures::future::ok(())
                         }));
                     } else {
                     }
                 }),
                 100,
-                60000,
+                120000,
             )
             .start();

