Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Drop late blocks - develop #8497

Merged
merged 3 commits into from
Jan 25, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ namespace eosio { namespace chain { namespace plugin_interface {

namespace methods {
// synchronously push a block/trx to a single provider
using block_sync = method_decl<chain_plugin_interface, void(const signed_block_ptr&), first_provider_policy>;
using block_sync = method_decl<chain_plugin_interface, bool(const signed_block_ptr&, const std::optional<block_id_type>&), first_provider_policy>;
using transaction_async = method_decl<chain_plugin_interface, void(const packed_transaction_ptr&, bool, next_function<transaction_trace_ptr>), first_provider_policy>;
}
}
Expand Down
6 changes: 3 additions & 3 deletions plugins/chain_plugin/chain_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1139,8 +1139,8 @@ void chain_apis::read_write::validate() const {
EOS_ASSERT( !db.in_immutable_mode(), missing_chain_api_plugin_exception, "Not allowed, node in read-only mode" );
}

void chain_plugin::accept_block(const signed_block_ptr& block ) {
my->incoming_block_sync_method(block);
bool chain_plugin::accept_block(const signed_block_ptr& block, const block_id_type& id ) {
return my->incoming_block_sync_method(block, id);
}

void chain_plugin::accept_transaction(const chain::packed_transaction_ptr& trx, next_function<chain::transaction_trace_ptr> next) {
Expand Down Expand Up @@ -2072,7 +2072,7 @@ fc::variant read_only::get_block_header_state(const get_block_header_state_param

void read_write::push_block(read_write::push_block_params&& params, next_function<read_write::push_block_results> next) {
try {
app().get_method<incoming::methods::block_sync>()(std::make_shared<signed_block>(std::move(params)));
app().get_method<incoming::methods::block_sync>()(std::make_shared<signed_block>(std::move(params)), {});
next(read_write::push_block_results{});
} catch ( boost::interprocess::bad_alloc& ) {
chain_plugin::handle_db_exhaustion();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -706,7 +706,7 @@ class chain_plugin : public plugin<chain_plugin> {
chain_apis::read_only get_read_only_api() const { return chain_apis::read_only(chain(), get_abi_serializer_max_time()); }
chain_apis::read_write get_read_write_api() { return chain_apis::read_write(chain(), get_abi_serializer_max_time()); }

void accept_block( const chain::signed_block_ptr& block );
bool accept_block( const chain::signed_block_ptr& block, const chain::block_id_type& id );
void accept_transaction(const chain::packed_transaction_ptr& trx, chain::plugin_interface::next_function<chain::transaction_trace_ptr> next);

bool block_is_on_preferred_chain(const chain::block_id_type& block_id);
Expand Down
4 changes: 3 additions & 1 deletion plugins/net_plugin/net_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2936,8 +2936,9 @@ namespace eosio {

go_away_reason reason = fatal_other;
try {
my_impl->chain_plug->accept_block(msg);
bool accepted = my_impl->chain_plug->accept_block(msg, blk_id);
my_impl->update_chain_info();
if( !accepted ) return;
reason = no_reason;
} catch( const unlinkable_block_exception &ex) {
peer_elog(c, "bad signed_block ${n} ${id}...: ${m}", ("n", blk_num)("id", blk_id.str().substr(8,16))("m",ex.what()));
Expand All @@ -2959,6 +2960,7 @@ namespace eosio {

if( reason == no_reason ) {
boost::asio::post( my_impl->thread_pool->get_executor(), [dispatcher = my_impl->dispatcher.get(), cid=c->connection_id, blk_id, msg]() {
fc_dlog( logger, "accepted signed_block : #${n} ${id}...", ("n", msg->block_num())("id", blk_id.str().substr(8,16)) );
dispatcher->add_peer_block( blk_id, cid );
dispatcher->update_txns_block_num( msg );
});
Expand Down
28 changes: 18 additions & 10 deletions plugins/producer_plugin/producer_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -334,20 +334,26 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
}
};

void on_incoming_block(const signed_block_ptr& block) {
auto id = block->id();
bool on_incoming_block(const signed_block_ptr& block, const std::optional<block_id_type>& block_id) {
auto& chain = chain_plug->chain();
if ( chain.is_building_block() && _pending_block_mode == pending_block_mode::producing ) {
fc_wlog( _log, "dropped incoming block #${num} while producing #${pbn} for ${bt}, id: ${id}",
("num", block->block_num())("pbn", chain.head_block_num() + 1)
("bt", chain.pending_block_time())("id", block_id ? (*block_id).str() : "UNKNOWN") );
return false;
}

const auto& id = block_id ? *block_id : block->id();
auto blk_num = block->block_num();

fc_dlog(_log, "received incoming block ${n} ${id}", ("n", blk_num)("id", id));

EOS_ASSERT( block->timestamp < (fc::time_point::now() + fc::seconds( 7 )), block_from_the_future,
"received a block from the future, ignoring it: ${id}", ("id", id) );

chain::controller& chain = chain_plug->chain();

/* de-dupe here... no point in aborting block if we already know the block */
auto existing = chain.fetch_block_by_id( id );
if( existing ) { return; }
if( existing ) { return false; }

// start processing of block
auto bsf = chain.create_block_state_future( block );
Expand All @@ -369,7 +375,7 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
} );
} catch ( const guard_exception& e ) {
chain_plugin::handle_guard_exception(e);
return;
return false;
} catch( const fc::exception& e ) {
elog((e.to_detail_string()));
app().get_channel<channels::rejected_block>().publish( priority::medium, block );
Expand Down Expand Up @@ -397,6 +403,8 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
("confs", hbs->block->confirmed)("latency", (fc::time_point::now() - hbs->block->timestamp).count()/1000 ) );
}
}

return true;
}

void on_incoming_transaction_async(const packed_transaction_ptr& trx, bool persist_until_expired, next_function<transaction_trace_ptr> next) {
Expand Down Expand Up @@ -609,7 +617,7 @@ void producer_plugin::set_program_options(
"Limit (between 1 and 1000) on the multiple that CPU/NET virtual resources can extend during low usage (only enforced subjectively; use 1000 to not enforce any limit)")
("produce-time-offset-us", boost::program_options::value<int32_t>()->default_value(0),
"offset of non last block producing time in microseconds. Negative number results in blocks to go out sooner, and positive number results in blocks to go out later")
("last-block-time-offset-us", boost::program_options::value<int32_t>()->default_value(0),
("last-block-time-offset-us", boost::program_options::value<int32_t>()->default_value(-200000),
"offset of last block producing time in microseconds. Negative number results in blocks to go out sooner, and positive number results in blocks to go out later")
("max-scheduled-transaction-time-per-block-ms", boost::program_options::value<int32_t>()->default_value(100),
"Maximum wall-clock time, in milliseconds, spent retiring scheduled transactions in any block before returning to normal transaction processing.")
Expand Down Expand Up @@ -798,7 +806,7 @@ void producer_plugin::plugin_initialize(const boost::program_options::variables_
my->_incoming_block_subscription = app().get_channel<incoming::channels::block>().subscribe(
[this](const signed_block_ptr& block) {
try {
my->on_incoming_block(block);
my->on_incoming_block(block, {});
} LOG_AND_DROP();
});

Expand All @@ -810,8 +818,8 @@ void producer_plugin::plugin_initialize(const boost::program_options::variables_
});

my->_incoming_block_sync_provider = app().get_method<incoming::methods::block_sync>().register_provider(
[this](const signed_block_ptr& block) {
my->on_incoming_block(block);
[this](const signed_block_ptr& block, const std::optional<block_id_type>& block_id) {
return my->on_incoming_block(block, block_id);
});

my->_incoming_transaction_async_provider = app().get_method<incoming::methods::transaction_async>().register_provider(
Expand Down