Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Speculative validation optimizations #9593

Merged
merged 4 commits into from
Oct 28, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,6 @@ struct unapplied_transaction {
* Persisted are first so that they can be applied in each block until expired.
*/
class unapplied_transaction_queue {
public:
enum class process_mode {
non_speculative, // HEAD, READ_ONLY, IRREVERSIBLE
speculative_non_producer, // will never produce
speculative_producer // can produce
};

private:
struct by_trx_id;
struct by_type;
Expand All @@ -72,7 +65,6 @@ class unapplied_transaction_queue {
> unapplied_trx_queue_type;

unapplied_trx_queue_type queue;
process_mode mode = process_mode::speculative_producer;
uint64_t max_transaction_queue_size = 1024*1024*1024; // enforced for incoming
uint64_t size_in_bytes = 0;
size_t incoming_count = 0;
Expand All @@ -81,13 +73,6 @@ class unapplied_transaction_queue {

void set_max_transaction_queue_size( uint64_t v ) { max_transaction_queue_size = v; }

void set_mode( process_mode new_mode ) {
if( new_mode != mode ) {
FC_ASSERT( empty(), "set_mode, queue required to be empty" );
}
mode = new_mode;
}

bool empty() const {
return queue.empty();
}
Expand Down Expand Up @@ -162,7 +147,6 @@ class unapplied_transaction_queue {
}

void add_forked( const branch_type& forked_branch ) {
if( mode == process_mode::non_speculative || mode == process_mode::speculative_non_producer ) return;
// forked_branch is in reverse order
for( auto ritr = forked_branch.rbegin(), rend = forked_branch.rend(); ritr != rend; ++ritr ) {
const block_state_ptr& bsptr = *ritr;
Expand All @@ -176,7 +160,6 @@ class unapplied_transaction_queue {
}

void add_aborted( deque<transaction_metadata_ptr> aborted_trxs ) {
if( mode == process_mode::non_speculative || mode == process_mode::speculative_non_producer ) return;
for( auto& trx : aborted_trxs ) {
fc::time_point expiry = trx->packed_trx()->expiration();
auto insert_itr = queue.insert( { std::move( trx ), expiry, trx_enum_type::aborted } );
Expand All @@ -185,7 +168,6 @@ class unapplied_transaction_queue {
}

void add_persisted( const transaction_metadata_ptr& trx ) {
if( mode == process_mode::non_speculative ) return;
auto itr = queue.get<by_trx_id>().find( trx->id() );
if( itr == queue.get<by_trx_id>().end() ) {
fc::time_point expiry = trx->packed_trx()->expiration();
Expand Down
23 changes: 16 additions & 7 deletions plugins/producer_plugin/producer_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,14 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
return true;
}

void restart_speculative_block() {
chain::controller& chain = chain_plug->chain();
// abort the pending block
_unapplied_transactions.add_aborted( chain.abort_block() );

schedule_production_loop();
}

void on_incoming_transaction_async(const packed_transaction_ptr& trx, bool persist_until_expired, next_function<transaction_trace_ptr> next) {
chain::controller& chain = chain_plug->chain();
const auto max_trx_time_ms = _max_transaction_time_ms.load();
Expand All @@ -445,6 +453,8 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
if( !self->process_incoming_transaction_async( result, persist_until_expired, next ) ) {
if( self->_pending_block_mode == pending_block_mode::producing ) {
self->schedule_maybe_produce_block( true );
} else {
self->restart_speculative_block();
}
}
} CATCH_AND_CALL(exception_handler);
Expand Down Expand Up @@ -723,11 +733,6 @@ void producer_plugin::plugin_initialize(const boost::program_options::variables_
LOAD_VALUE_SET(options, "producer-name", my->_producers)

chain::controller& chain = my->chain_plug->chain();
unapplied_transaction_queue::process_mode unapplied_mode =
(chain.get_read_mode() != chain::db_read_mode::SPECULATIVE) ? unapplied_transaction_queue::process_mode::non_speculative :
my->_producers.empty() ? unapplied_transaction_queue::process_mode::speculative_non_producer :
unapplied_transaction_queue::process_mode::speculative_producer;
my->_unapplied_transactions.set_mode( unapplied_mode );

if( options.count("private-key") )
{
Expand Down Expand Up @@ -1356,8 +1361,12 @@ fc::time_point producer_plugin_impl::calculate_pending_block_time() const {
}

fc::time_point producer_plugin_impl::calculate_block_deadline( const fc::time_point& block_time ) const {
bool last_block = ((block_timestamp_type(block_time).slot % config::producer_repetitions) == config::producer_repetitions - 1);
return block_time + fc::microseconds(last_block ? _last_block_time_offset_us : _produce_time_offset_us);
if( _pending_block_mode == pending_block_mode::producing ) {
bool last_block = ((block_timestamp_type( block_time ).slot % config::producer_repetitions) == config::producer_repetitions - 1);
return block_time + fc::microseconds(last_block ? _last_block_time_offset_us : _produce_time_offset_us);
} else {
return block_time + fc::microseconds(_produce_time_offset_us);
}
}

producer_plugin_impl::start_block_result producer_plugin_impl::start_block() {
Expand Down