diff --git a/plugins/chain_interface/include/eosio/chain/plugin_interface.hpp b/plugins/chain_interface/include/eosio/chain/plugin_interface.hpp index 5463dd55e0a..bd5d950172b 100644 --- a/plugins/chain_interface/include/eosio/chain/plugin_interface.hpp +++ b/plugins/chain_interface/include/eosio/chain/plugin_interface.hpp @@ -45,6 +45,7 @@ namespace eosio { namespace chain { namespace plugin_interface { namespace methods { // synchronously push a block/trx to a single provider using block_sync = method_decl&), first_provider_policy>; + using blockvault_sync = method_decl; using transaction_async = method_decl), first_provider_policy>; } } diff --git a/plugins/chain_plugin/chain_plugin.cpp b/plugins/chain_plugin/chain_plugin.cpp index 8d8193f4b3d..81d78bdf1df 100644 --- a/plugins/chain_plugin/chain_plugin.cpp +++ b/plugins/chain_plugin/chain_plugin.cpp @@ -177,6 +177,7 @@ class chain_plugin_impl { ,applied_transaction_channel(app().get_channel()) ,incoming_block_channel(app().get_channel()) ,incoming_block_sync_method(app().get_method()) + ,incoming_blockvault_sync_method(app().get_method()) ,incoming_transaction_async_method(app().get_method()) {} @@ -208,6 +209,7 @@ class chain_plugin_impl { // retained references to methods for easy calling incoming::methods::block_sync::method_type& incoming_block_sync_method; + incoming::methods::blockvault_sync::method_type& incoming_blockvault_sync_method; incoming::methods::transaction_async::method_type& incoming_transaction_async_method; // method provider handles diff --git a/plugins/chain_plugin/include/eosio/chain_plugin/blockvault_sync_strategy.hpp b/plugins/chain_plugin/include/eosio/chain_plugin/blockvault_sync_strategy.hpp index c12c5ac7153..b5268063665 100644 --- a/plugins/chain_plugin/include/eosio/chain_plugin/blockvault_sync_strategy.hpp +++ b/plugins/chain_plugin/include/eosio/chain_plugin/blockvault_sync_strategy.hpp @@ -45,11 +45,15 @@ struct blockvault_sync_strategy : public sync_callback { ilog("Received no data from blockvault."); run_startup(); } + + ilog("Sync from blockvault completed. ${snap}. ${blks} blocks received. ${ulnk} blocks unlinkable", + ("snap", _received_snapshot ? "Got snapshot" : "No snapshot") + ("blks", _num_blocks_received)("ulnk", _num_unlinkable_blocks)); } void on_snapshot(const char* snapshot_filename) override final { ilog("Received snapshot from blockvault ${fn}", ("fn", snapshot_filename)); - EOS_ASSERT(!_received_snapshot, plugin_exception, "Received multiple snapshots from blockvault.", ); + EOS_ASSERT(!_received_snapshot, plugin_exception, "Received multiple snapshots from blockvault." ); _received_snapshot = true; if (_check_shutdown()) { @@ -63,9 +67,15 @@ struct blockvault_sync_strategy : public sync_callback { _startup_run = true; infile.close(); + + _snapshot_height = _blockchain_provider.chain->head_block_num(); } void on_block(eosio::chain::signed_block_ptr block) override final { + if (0 == (_num_blocks_received % 100)) { + dlog("Received block number ${bn}", ("bn", block->block_num())); + } + if (_check_shutdown()) { _shutdown(); } @@ -75,8 +85,14 @@ struct blockvault_sync_strategy : public sync_callback { } try { + ++_num_blocks_received; - _blockchain_provider.incoming_block_sync_method(block, block->calculate_id()); + auto rc = _blockchain_provider.incoming_blockvault_sync_method(block, + !(_received_snapshot && block->block_num() == _snapshot_height +1)); + + EOS_ASSERT(rc, plugin_exception, + "Unable to sync block from blockvault, block num=${bnum}, block id=${bid}", + ("bnum", block->block_num())("bid", block->calculate_id())); } catch (unlinkable_block_exception& e) { if (block->block_num() == 2) { elog("Received unlinkable block 2. Please double check if --genesis-json and --genesis-timestamp are " @@ -96,6 +112,7 @@ struct blockvault_sync_strategy : public sync_callback { bool _received_snapshot; uint32_t _num_unlinkable_blocks = 0; uint32_t _num_blocks_received = 0; + uint32_t _snapshot_height = 0; }; } // namespace blockvault diff --git a/plugins/chain_plugin/test/test_blockvault_sync_strategy.cpp b/plugins/chain_plugin/test/test_blockvault_sync_strategy.cpp index 167ed59d28f..a82fa14eddd 100644 --- a/plugins/chain_plugin/test/test_blockvault_sync_strategy.cpp +++ b/plugins/chain_plugin/test/test_blockvault_sync_strategy.cpp @@ -43,6 +43,9 @@ struct mock_chain_t { std::shared_ptr _reader; bool _startup_reader_called; + + uint32_t _head_block_num = 0; + uint32_t head_block_num() {return _head_block_num;} }; struct mock_blockvault_t : public block_vault_interface { @@ -70,9 +73,8 @@ struct mock_chain_plugin_t { chain = std::make_unique(); } - bool incoming_block_sync_method(const chain::signed_block_ptr& block, const chain::block_id_type& id) { + bool incoming_blockvault_sync_method(const chain::signed_block_ptr& block, bool check_connectivity) { _block = block; - _id = id; return _accept_block_rc; } @@ -147,7 +149,6 @@ BOOST_FIXTURE_TEST_CASE(on_block_no_snapshot, TESTER) { BOOST_TEST(plugin._startup_non_snapshot_called); BOOST_TEST(!plugin.chain->_startup_reader_called); BOOST_TEST(plugin._block->calculate_id() == b->calculate_id()); - BOOST_TEST(plugin._block->calculate_id() == plugin._id); } FC_LOG_AND_RETHROW() } diff --git a/plugins/producer_plugin/producer_plugin.cpp b/plugins/producer_plugin/producer_plugin.cpp index 2e2de5f16c4..bb57e6b9429 100644 --- a/plugins/producer_plugin/producer_plugin.cpp +++ b/plugins/producer_plugin/producer_plugin.cpp @@ -176,6 +176,7 @@ class producer_plugin_impl : public std::enable_shared_from_thischain(); + + const auto& id = block->calculate_id(); + auto blk_num = block->block_num(); + + fc_dlog(_log, "syncing blockvault block ${n} ${id}", ("n", blk_num)("id", id)); + + if (check_connectivity) { + auto previous = chain.fetch_block_by_id(block->previous); + if (!previous) { + dlog("Don't have previous block for block number ${bn}, looking for block id ${pbi}", + ("bn", block->block_num())("pbi", block->previous)); + return true; + } + } + + // start processing of block + auto bsf = chain.create_block_state_future( id, block ); + + // abort the pending block + _unapplied_transactions.add_aborted( chain.abort_block() ); + + // push the new block + auto handle_error = [&](const auto& e) + { + elog((e.to_detail_string())); + throw; + }; + + try { + block_state_ptr blk_state = chain.push_block( bsf, [this]( const branch_type& forked_branch ) { + _unapplied_transactions.add_forked( forked_branch ); + }, [this]( const transaction_id_type& id ) { + return _unapplied_transactions.get_trx( id ); + } ); + } catch ( const guard_exception& e ) { + chain_plugin::handle_guard_exception(e); + return false; + } catch ( const std::bad_alloc& ) { + chain_plugin::handle_bad_alloc(); + } catch ( boost::interprocess::bad_alloc& ) { + chain_plugin::handle_db_exhaustion(); + } catch( const fc::exception& e ) { + handle_error(e); + } catch (const std::exception& e) { + handle_error(fc::std_exception_wrapper::from_current_exception(e)); + } + + return true; + } + bool on_incoming_block(const signed_block_ptr& block, const std::optional& block_id) { auto& chain = chain_plug->chain(); - if ( _pending_block_mode == pending_block_mode::producing ) { + if ( _pending_block_mode == pending_block_mode::producing) { fc_wlog( _log, "dropped incoming block #${num} id: ${id}", ("num", block->block_num())("id", block_id ? (*block_id).str() : "UNKNOWN") ); return false; @@ -813,6 +866,11 @@ void producer_plugin::plugin_initialize(const boost::program_options::variables_ return my->on_incoming_block(block, block_id); }); + my->_incoming_blockvault_sync_provider = app().get_method().register_provider( + [this](const signed_block_ptr& block, bool check_connectivity) { + return my->on_sync_block(block, check_connectivity); + }); + my->_incoming_transaction_async_provider = app().get_method().register_provider( [this](const packed_transaction_ptr& trx, bool persist_until_expired, next_function next) -> void { return my->on_incoming_transaction_async(trx, persist_until_expired, next );