diff --git a/plugins/producer_plugin/producer_plugin.cpp b/plugins/producer_plugin/producer_plugin.cpp index f5893366b9b..60d3ad5ff0c 100644 --- a/plugins/producer_plugin/producer_plugin.cpp +++ b/plugins/producer_plugin/producer_plugin.cpp @@ -179,12 +179,14 @@ class producer_plugin_impl : public std::enable_shared_from_this calculate_next_block_time(const account_name& producer_name, const block_timestamp_type& current_block_time) const; void schedule_production_loop(); + void schedule_maybe_produce_block( bool exhausted ); void produce_block(); bool maybe_produce_block(); bool remove_expired_trxs( const fc::time_point& deadline ); + bool block_is_exhausted() const; bool remove_expired_blacklisted_trxs( const fc::time_point& deadline ); bool process_unapplied_trxs( const fc::time_point& deadline ); - bool process_scheduled_and_incoming_trxs( const fc::time_point& deadline, size_t& pending_incoming_process_limit ); + void process_scheduled_and_incoming_trxs( const fc::time_point& deadline, size_t& pending_incoming_process_limit ); bool process_incoming_trxs( const fc::time_point& deadline, size_t& pending_incoming_process_limit ); boost::program_options::variables_map _options; @@ -205,6 +207,8 @@ class producer_plugin_impl : public std::enable_shared_from_thisprocess_incoming_transaction_async( future.get(), persist_until_expired, std::move( next ) ); + if( !self->process_incoming_transaction_async( future.get(), persist_until_expired, std::move( next ) ) ) { + if( self->_pending_block_mode == pending_block_mode::producing ) { + self->schedule_maybe_produce_block( true ); + } + } } CATCH_AND_CALL(next); } ); } }); } - void process_incoming_transaction_async(const transaction_metadata_ptr& trx, bool persist_until_expired, next_function next) { + bool process_incoming_transaction_async(const transaction_metadata_ptr& trx, bool persist_until_expired, next_function next) { + bool exhausted = false; chain::controller& chain = chain_plug->chain(); auto send_response = [this, &trx, &chain, &next](const fc::static_variant& response) { @@ -469,18 +478,18 @@ class producer_plugin_impl : public std::enable_shared_from_this( FC_LOG_MESSAGE( error, "expired transaction ${id}, expiration ${e}, block time ${bt}", ("id", id)("e", trx->packed_trx()->expiration())( "bt", bt ))))); - return; + return true; } if( chain.is_known_unexpired_transaction( id )) { send_response( std::static_pointer_cast( std::make_shared( FC_LOG_MESSAGE( error, "duplicate transaction ${id}", ("id", id)))) ); - return; + return true; } if( !chain.is_building_block()) { _unapplied_transactions.add_incoming( trx, persist_until_expired, next ); - return; + return true; } auto deadline = fc::time_point::now() + fc::milliseconds( _max_transaction_time_ms ); @@ -505,6 +514,8 @@ class producer_plugin_impl : public std::enable_shared_from_thisid())); } + if( !exhausted ) + exhausted = block_is_exhausted(); } else { auto e_ptr = trace->except->dynamic_copy_exception(); send_response( e_ptr ); @@ -525,6 +536,8 @@ class producer_plugin_impl : public std::enable_shared_from_this()->default_value(config::default_block_cpu_effort_pct / config::percent_1), "Percentage of cpu block production time used to produce last block. Whole number percentages, e.g. 80 for 80%") + ("max-block-cpu-usage-threshold-us", bpo::value()->default_value( 5000 ), + "Threshold of CPU block production to consider block full; when within threshold of max-block-cpu-usage block can be produced immediately") + ("max-block-net-usage-threshold-bytes", bpo::value()->default_value( 1024 ), + "Threshold of NET block production to consider block full; when within threshold of max-block-net-usage block can be produced immediately") ("max-scheduled-transaction-time-per-block-ms", boost::program_options::value()->default_value(100), "Maximum wall-clock time, in milliseconds, spent retiring scheduled transactions in any block before returning to normal transaction processing.") ("subjective-cpu-leeway-us", boost::program_options::value()->default_value( config::default_subjective_cpu_leeway_us ), @@ -796,6 +813,12 @@ void producer_plugin::plugin_initialize(const boost::program_options::variables_ my->_produce_time_offset_us = std::min( my->_produce_time_offset_us, cpu_effort_offset_us ); my->_last_block_time_offset_us = std::min( my->_last_block_time_offset_us, last_block_cpu_effort_offset_us ); + my->_max_block_cpu_usage_threshold_us = options.at( "max-block-cpu-usage-threshold-us" ).as(); + EOS_ASSERT( my->_max_block_cpu_usage_threshold_us < config::block_interval_us, plugin_config_exception, + "max-block-cpu-usage-threshold-us ${t} must be 0 .. ${bi}", ("bi", config::block_interval_us)("t", my->_max_block_cpu_usage_threshold_us) ); + + my->_max_block_net_usage_threshold_bytes = options.at( "max-block-net-usage-threshold-bytes" ).as(); + my->_max_scheduled_transaction_time_per_block_ms = options.at("max-scheduled-transaction-time-per-block-ms").as(); if( options.at( "subjective-cpu-leeway-us" ).as() != config::default_subjective_cpu_leeway_us ) { @@ -1547,7 +1570,7 @@ producer_plugin_impl::start_block_result producer_plugin_impl::start_block() { if( app().is_quiting() ) // db guard exception above in LOG_AND_DROP could have called app().quit() return start_block_result::failed; - if (preprocess_deadline <= fc::time_point::now()) { + if (preprocess_deadline <= fc::time_point::now() || block_is_exhausted()) { return start_block_result::exhausted; } else { if( !process_incoming_trxs( preprocess_deadline, pending_incoming_process_limit ) ) @@ -1670,6 +1693,11 @@ bool producer_plugin_impl::process_unapplied_trxs( const fc::time_point& deadlin auto trace = chain.push_transaction( trx, trx_deadline, trx->billed_cpu_time_us, false ); if( trace->except ) { if( exception_is_exhausted( *trace->except, deadline_is_subjective ) ) { + if( block_is_exhausted() ) { + exhausted = true; + // don't erase, subjective failure so try again next time + break; + } // don't erase, subjective failure so try again next time } else { // this failed our configured maximum transaction time, we don't want to replay it @@ -1696,7 +1724,7 @@ bool producer_plugin_impl::process_unapplied_trxs( const fc::time_point& deadlin return !exhausted; } -bool producer_plugin_impl::process_scheduled_and_incoming_trxs( const fc::time_point& deadline, size_t& pending_incoming_process_limit ) +void producer_plugin_impl::process_scheduled_and_incoming_trxs( const fc::time_point& deadline, size_t& pending_incoming_process_limit ) { // scheduled transactions int num_applied = 0; @@ -1715,7 +1743,7 @@ bool producer_plugin_impl::process_scheduled_and_incoming_trxs( const fc::time_p auto sch_itr = sch_idx.begin(); while( sch_itr != sch_idx.end() ) { if( sch_itr->delay_until > pending_block_time) break; // not scheduled yet - if( deadline <= fc::time_point::now() ) { + if( exhausted || deadline <= fc::time_point::now() ) { exhausted = true; break; } @@ -1751,10 +1779,13 @@ bool producer_plugin_impl::process_scheduled_and_incoming_trxs( const fc::time_p auto next = itr->next; bool persist_until_expired = itr->trx_type == trx_enum_type::incoming_persisted; itr = _unapplied_transactions.erase( itr ); - process_incoming_transaction_async( trx_meta, persist_until_expired, next ); + if( !process_incoming_transaction_async( trx_meta, persist_until_expired, next ) ) { + exhausted = true; + break; + } } - if (deadline <= fc::time_point::now()) { + if (exhausted || deadline <= fc::time_point::now()) { exhausted = true; break; } @@ -1770,6 +1801,10 @@ bool producer_plugin_impl::process_scheduled_and_incoming_trxs( const fc::time_p auto trace = chain.push_scheduled_transaction(trx_id, trx_deadline, 0, false); if (trace->except) { if (exception_is_exhausted(*trace->except, deadline_is_subjective)) { + if( block_is_exhausted() ) { + exhausted = true; + break; + } // do not blacklist } else { auto expiration = fc::time_point::now() + fc::seconds(chain.get_global_properties().configuration.deferred_trx_expiration_window); @@ -1794,8 +1829,6 @@ bool producer_plugin_impl::process_scheduled_and_incoming_trxs( const fc::time_p "Processed ${m} of ${n} scheduled transactions, Applied ${applied}, Failed/Dropped ${failed}", ( "m", num_processed )( "n", scheduled_trxs_size )( "applied", num_applied )( "failed", num_failed ) ); } - - return !exhausted; } bool producer_plugin_impl::process_incoming_trxs( const fc::time_point& deadline, size_t& pending_incoming_process_limit ) @@ -1816,14 +1849,28 @@ bool producer_plugin_impl::process_incoming_trxs( const fc::time_point& deadline auto next = itr->next; bool persist_until_expired = itr->trx_type == trx_enum_type::incoming_persisted; itr = _unapplied_transactions.erase( itr ); - process_incoming_transaction_async( trx_meta, persist_until_expired, next ); ++processed; + if( !process_incoming_transaction_async( trx_meta, persist_until_expired, next ) ) { + exhausted = true; + break; + } } fc_dlog( _log, "Processed ${n} pending transactions, ${p} left", ("n", processed)("p", _unapplied_transactions.incoming_size()) ); } return !exhausted; } +bool producer_plugin_impl::block_is_exhausted() const { + const chain::controller& chain = chain_plug->chain(); + const auto& rl = chain.get_resource_limits_manager(); + + const uint64_t cpu_limit = rl.get_block_cpu_limit(); + if( cpu_limit < _max_block_cpu_usage_threshold_us ) return true; + const uint64_t net_limit = rl.get_block_net_limit(); + if( net_limit < _max_block_net_usage_threshold_bytes ) return true; + return false; +} + // Example: // --> Start block A (block time x.500) at time x.000 // -> start_block() @@ -1831,9 +1878,7 @@ bool producer_plugin_impl::process_incoming_trxs( const fc::time_point& deadline // -> Idle // --> Start block B (block time y.000) at time x.500 void producer_plugin_impl::schedule_production_loop() { - chain::controller& chain = chain_plug->chain(); _timer.cancel(); - std::weak_ptr weak_this = shared_from_this(); auto result = start_block(); @@ -1843,7 +1888,7 @@ void producer_plugin_impl::schedule_production_loop() { // we failed to start a block, so try again later? _timer.async_wait( app().get_priority_queue().wrap( priority::high, - [weak_this, cid = ++_timer_corelation_id]( const boost::system::error_code& ec ) { + [weak_this = weak_from_this(), cid = ++_timer_corelation_id]( const boost::system::error_code& ec ) { auto self = weak_this.lock(); if( self && ec != boost::asio::error::operation_aborted && cid == self->_timer_corelation_id ) { self->schedule_production_loop(); @@ -1852,7 +1897,7 @@ void producer_plugin_impl::schedule_production_loop() { } else if (result == start_block_result::waiting_for_block){ if (!_producers.empty() && !production_disabled_by_policy()) { fc_dlog(_log, "Waiting till another block is received and scheduling Speculative/Production Change"); - schedule_delayed_production_loop(weak_this, calculate_producer_wake_up_time(calculate_pending_block_time())); + schedule_delayed_production_loop(weak_from_this(), calculate_producer_wake_up_time(calculate_pending_block_time())); } else { fc_dlog(_log, "Waiting till another block is received"); // nothing to do until more blocks arrive @@ -1862,52 +1907,52 @@ void producer_plugin_impl::schedule_production_loop() { // scheduled in start_block() } else if (_pending_block_mode == pending_block_mode::producing) { + schedule_maybe_produce_block( result == start_block_result::exhausted ); - // we succeeded but block may be exhausted - static const boost::posix_time::ptime epoch(boost::gregorian::date(1970, 1, 1)); - auto deadline = calculate_block_deadline(chain.pending_block_time()); - - if (deadline > fc::time_point::now()) { - // ship this block off no later than its deadline - EOS_ASSERT( chain.is_building_block(), missing_pending_block_state, "producing without pending_block_state, start_block succeeded" ); - _timer.expires_at( epoch + boost::posix_time::microseconds( deadline.time_since_epoch().count() )); - fc_dlog(_log, "Scheduling Block Production on Normal Block #${num} for ${time}", - ("num", chain.head_block_num()+1)("time",deadline)); - } else { - EOS_ASSERT( chain.is_building_block(), missing_pending_block_state, "producing without pending_block_state" ); - auto expect_time = chain.pending_block_time() - fc::microseconds(config::block_interval_us); - // ship this block off up to 1 block time earlier or immediately - if (fc::time_point::now() >= expect_time) { - _timer.expires_from_now( boost::posix_time::microseconds( 0 )); - fc_dlog(_log, "Scheduling Block Production on Exhausted Block #${num} immediately", - ("num", chain.head_block_num()+1)); - } else { - _timer.expires_at(epoch + boost::posix_time::microseconds(expect_time.time_since_epoch().count())); - fc_dlog(_log, "Scheduling Block Production on Exhausted Block #${num} at ${time}", - ("num", chain.head_block_num()+1)("time",expect_time)); - } - } - - _timer.async_wait( app().get_priority_queue().wrap( priority::high, - [&chain,weak_this,cid=++_timer_corelation_id](const boost::system::error_code& ec) { - auto self = weak_this.lock(); - if( self && ec != boost::asio::error::operation_aborted && cid == self->_timer_corelation_id ) { - // pending_block_state expected, but can't assert inside async_wait - auto block_num = chain.is_building_block() ? chain.head_block_num() + 1 : 0; - fc_dlog( _log, "Produce block timer for ${num} running at ${time}", ("num", block_num)("time", fc::time_point::now()) ); - auto res = self->maybe_produce_block(); - fc_dlog( _log, "Producing Block #${num} returned: ${res}", ("num", block_num)( "res", res ) ); - } - } ) ); } else if (_pending_block_mode == pending_block_mode::speculating && !_producers.empty() && !production_disabled_by_policy()){ + chain::controller& chain = chain_plug->chain(); fc_dlog(_log, "Speculative Block Created; Scheduling Speculative/Production Change"); EOS_ASSERT( chain.is_building_block(), missing_pending_block_state, "speculating without pending_block_state" ); - schedule_delayed_production_loop(weak_this, calculate_producer_wake_up_time(chain.pending_block_time())); + schedule_delayed_production_loop(weak_from_this(), calculate_producer_wake_up_time(chain.pending_block_time())); } else { fc_dlog(_log, "Speculative Block Created"); } } +void producer_plugin_impl::schedule_maybe_produce_block( bool exhausted ) { + chain::controller& chain = chain_plug->chain(); + + // we succeeded but block may be exhausted + static const boost::posix_time::ptime epoch( boost::gregorian::date( 1970, 1, 1 ) ); + auto deadline = calculate_block_deadline( chain.pending_block_time() ); + + if( !exhausted && deadline > fc::time_point::now() ) { + // ship this block off no later than its deadline + EOS_ASSERT( chain.is_building_block(), missing_pending_block_state, + "producing without pending_block_state, start_block succeeded" ); + _timer.expires_at( epoch + boost::posix_time::microseconds( deadline.time_since_epoch().count() ) ); + fc_dlog( _log, "Scheduling Block Production on Normal Block #${num} for ${time}", + ("num", chain.head_block_num() + 1)( "time", deadline ) ); + } else { + EOS_ASSERT( chain.is_building_block(), missing_pending_block_state, "producing without pending_block_state" ); + _timer.expires_from_now( boost::posix_time::microseconds( 0 ) ); + fc_dlog( _log, "Scheduling Block Production on ${desc} Block #${num} immediately", + ("num", chain.head_block_num() + 1)("desc", block_is_exhausted() ? "Exhausted" : "Deadline exceeded") ); + } + + _timer.async_wait( app().get_priority_queue().wrap( priority::high, + [&chain, weak_this = weak_from_this(), cid=++_timer_corelation_id](const boost::system::error_code& ec) { + auto self = weak_this.lock(); + if( self && ec != boost::asio::error::operation_aborted && cid == self->_timer_corelation_id ) { + // pending_block_state expected, but can't assert inside async_wait + auto block_num = chain.is_building_block() ? chain.head_block_num() + 1 : 0; + fc_dlog( _log, "Produce block timer for ${num} running at ${time}", ("num", block_num)("time", fc::time_point::now()) ); + auto res = self->maybe_produce_block(); + fc_dlog( _log, "Producing Block #${num} returned: ${res}", ("num", block_num)( "res", res ) ); + } + } ) ); +} + optional producer_plugin_impl::calculate_producer_wake_up_time( const block_timestamp_type& ref_block_time ) const { // if we have any producers then we should at least set a timer for our next available slot optional wake_up_time;