Skip to content

Commit

Permalink
Merge pull request #4424 from pwojcikdev/blockprocessor-context
Browse files Browse the repository at this point in the history
Blockprocessor context
  • Loading branch information
pwojcikdev authored Feb 14, 2024
2 parents 1b9807a + 5927a95 commit 6c0c27a
Show file tree
Hide file tree
Showing 25 changed files with 258 additions and 386 deletions.
6 changes: 6 additions & 0 deletions nano/core_test/active_transactions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,12 @@ TEST (active_transactions, keep_local)
auto const send4 = wallet.send_action (nano::dev::genesis_key.pub, key4.pub, node.config.receive_minimum.number ());
auto const send5 = wallet.send_action (nano::dev::genesis_key.pub, key5.pub, node.config.receive_minimum.number ());
auto const send6 = wallet.send_action (nano::dev::genesis_key.pub, key6.pub, node.config.receive_minimum.number ());
ASSERT_NE (nullptr, send1);
ASSERT_NE (nullptr, send2);
ASSERT_NE (nullptr, send3);
ASSERT_NE (nullptr, send4);
ASSERT_NE (nullptr, send5);
ASSERT_NE (nullptr, send6);

// force-confirm blocks
for (auto const & block : { send1, send2, send3, send4, send5, send6 })
Expand Down
49 changes: 1 addition & 48 deletions nano/core_test/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2111,53 +2111,6 @@ TEST (node, block_confirm)
ASSERT_TIMELY_EQ (10s, node1.active.recently_cemented.list ().size (), 1);
}

TEST (node, block_arrival)
{
nano::test::system system (1);
auto & node (*system.nodes[0]);
ASSERT_EQ (0, node.block_arrival.arrival.size ());
nano::block_hash hash1 (1);
node.block_arrival.add (hash1);
ASSERT_EQ (1, node.block_arrival.arrival.size ());
node.block_arrival.add (hash1);
ASSERT_EQ (1, node.block_arrival.arrival.size ());
nano::block_hash hash2 (2);
node.block_arrival.add (hash2);
ASSERT_EQ (2, node.block_arrival.arrival.size ());
}

TEST (node, block_arrival_size)
{
nano::test::system system (1);
auto & node (*system.nodes[0]);
auto time (std::chrono::steady_clock::now () - nano::block_arrival::arrival_time_min - std::chrono::seconds (5));
nano::block_hash hash (0);
for (auto i (0); i < nano::block_arrival::arrival_size_min * 2; ++i)
{
node.block_arrival.arrival.push_back (nano::block_arrival_info{ time, hash });
++hash.qwords[0];
}
ASSERT_EQ (nano::block_arrival::arrival_size_min * 2, node.block_arrival.arrival.size ());
node.block_arrival.recent (0);
ASSERT_EQ (nano::block_arrival::arrival_size_min, node.block_arrival.arrival.size ());
}

TEST (node, block_arrival_time)
{
nano::test::system system (1);
auto & node (*system.nodes[0]);
auto time (std::chrono::steady_clock::now ());
nano::block_hash hash (0);
for (auto i (0); i < nano::block_arrival::arrival_size_min * 2; ++i)
{
node.block_arrival.arrival.push_back (nano::block_arrival_info{ time, hash });
++hash.qwords[0];
}
ASSERT_EQ (nano::block_arrival::arrival_size_min * 2, node.block_arrival.arrival.size ());
node.block_arrival.recent (0);
ASSERT_EQ (nano::block_arrival::arrival_size_min * 2, node.block_arrival.arrival.size ());
}

TEST (node, confirm_quorum)
{
nano::test::system system (1);
Expand Down Expand Up @@ -2958,7 +2911,7 @@ TEST (node, block_processor_reject_state)
send1->signature.bytes[0] ^= 1;
ASSERT_FALSE (node.ledger.block_or_pruned_exists (send1->hash ()));
node.process_active (send1);
ASSERT_TIMELY_EQ (5s, 1, node.stats.count (nano::stat::type::blockprocessor, nano::stat::detail::bad_signature));
ASSERT_TIMELY_EQ (5s, 1, node.stats.count (nano::stat::type::blockprocessor_result, nano::stat::detail::bad_signature));
ASSERT_FALSE (node.ledger.block_or_pruned_exists (send1->hash ()));
auto send2 = builder.make_block ()
.account (nano::dev::genesis_key.pub)
Expand Down
2 changes: 0 additions & 2 deletions nano/lib/locks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -251,8 +251,6 @@ char const * nano::mutex_identifier (mutexes mutex)
{
case mutexes::active:
return "active";
case mutexes::block_arrival:
return "block_arrival";
case mutexes::block_processor:
return "block_processor";
case mutexes::block_uniquer:
Expand Down
1 change: 0 additions & 1 deletion nano/lib/locks.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ bool any_filters_registered ();
enum class mutexes
{
active,
block_arrival,
block_processor,
block_uniquer,
blockstore_cache,
Expand Down
16 changes: 16 additions & 0 deletions nano/lib/stats_enums.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ enum class type : uint8_t
vote_cache,
hinting,
blockprocessor,
blockprocessor_source,
blockprocessor_result,
bootstrap_server,
active,
active_started,
Expand Down Expand Up @@ -71,6 +73,7 @@ enum class detail : uint8_t
top,
none,
success,
unknown,

// processing queue
queue,
Expand Down Expand Up @@ -110,6 +113,19 @@ enum class detail : uint8_t
representative_mismatch,
block_position,

// blockprocessor
process_blocking,
process_blocking_timeout,
force,

// block source
live,
bootstrap,
bootstrap_legacy,
unchecked,
local,
forced,

// message specific
not_a_type,
invalid,
Expand Down
4 changes: 0 additions & 4 deletions nano/node/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,8 @@ add_library(
backlog_population.cpp
bandwidth_limiter.hpp
bandwidth_limiter.cpp
block_arrival.hpp
block_arrival.cpp
block_broadcast.cpp
block_broadcast.hpp
blocking_observer.cpp
blocking_observer.hpp
blockprocessor.hpp
blockprocessor.cpp
bootstrap/block_deserializer.hpp
Expand Down
4 changes: 2 additions & 2 deletions nano/node/active_transactions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@ nano::active_transactions::active_transactions (nano::node & node_a, nano::confi
});

// Notify elections about alternative (forked) blocks
block_processor.processed.add ([this] (auto const & result, auto const & block) {
block_processor.block_processed.add ([this] (auto const & result, auto const & context) {
switch (result.code)
{
case nano::process_result::fork:
publish (block);
publish (context.block);
break;
default:
break;
Expand Down
35 changes: 0 additions & 35 deletions nano/node/block_arrival.cpp

This file was deleted.

49 changes: 0 additions & 49 deletions nano/node/block_arrival.hpp

This file was deleted.

40 changes: 7 additions & 33 deletions nano/node/block_broadcast.cpp
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
#include <nano/node/block_arrival.hpp>
#include <nano/node/block_broadcast.hpp>
#include <nano/node/blockprocessor.hpp>
#include <nano/node/network.hpp>

nano::block_broadcast::block_broadcast (nano::network & network, nano::block_arrival & block_arrival, bool enabled) :
nano::block_broadcast::block_broadcast (nano::network & network, bool enabled) :
network{ network },
block_arrival{ block_arrival },
enabled{ enabled }
{
}
Expand All @@ -16,34 +14,30 @@ void nano::block_broadcast::connect (nano::block_processor & block_processor)
{
return;
}
block_processor.processed.add ([this] (auto const & result, auto const & block) {
block_processor.block_processed.add ([this] (auto const & result, auto const & context) {
switch (result.code)
{
case nano::process_result::progress:
observe (block);
observe (context);
break;
default:
break;
}
erase (block);
});
}

void nano::block_broadcast::observe (std::shared_ptr<nano::block> block)
void nano::block_broadcast::observe (nano::block_processor::context const & context)
{
nano::unique_lock<nano::mutex> lock{ mutex };
auto existing = local.find (block);
auto local_l = existing != local.end ();
lock.unlock ();
if (local_l)
auto const & block = context.block;
if (context.source == nano::block_source::local)
{
// Block created on this node
// Perform more agressive initial flooding
network.flood_block_initial (block);
}
else
{
if (block_arrival.recent (block->hash ()))
if (context.source != nano::block_source::bootstrap && context.source != nano::block_source::bootstrap_legacy)
{
// Block arrived from realtime traffic, do normal gossip.
network.flood_block (block, nano::transport::buffer_drop_policy::limiter);
Expand All @@ -55,23 +49,3 @@ void nano::block_broadcast::observe (std::shared_ptr<nano::block> block)
}
}
}

void nano::block_broadcast::set_local (std::shared_ptr<nano::block> block)
{
if (!enabled)
{
return;
}
nano::lock_guard<nano::mutex> lock{ mutex };
local.insert (block);
}

void nano::block_broadcast::erase (std::shared_ptr<nano::block> block)
{
if (!enabled)
{
return;
}
nano::lock_guard<nano::mutex> lock{ mutex };
local.erase (block);
}
14 changes: 4 additions & 10 deletions nano/node/block_broadcast.hpp
Original file line number Diff line number Diff line change
@@ -1,34 +1,28 @@
#pragma once

#include <nano/lib/blocks.hpp>
#include <nano/node/blockprocessor.hpp>

#include <memory>
#include <unordered_set>

namespace nano
{
class block_arrival;
class block_processor;
class network;

// This class tracks blocks that originated from this node.
class block_broadcast
{
public:
block_broadcast (nano::network & network, nano::block_arrival & block_arrival, bool enabled = false);
block_broadcast (nano::network & network, bool enabled = false);
// Add batch_processed observer to block_processor if enabled
void connect (nano::block_processor & block_processor);
// Mark a block as originating locally
void set_local (std::shared_ptr<nano::block> block);
void erase (std::shared_ptr<nano::block> block);

private:
// Block_processor observer
void observe (std::shared_ptr<nano::block> block);
void observe (nano::block_processor::context const &);

nano::network & network;
nano::block_arrival & block_arrival;
std::unordered_set<std::shared_ptr<nano::block>> local; // Blocks originated on this node
nano::mutex mutex;
bool enabled;
};
}
Loading

0 comments on commit 6c0c27a

Please sign in to comment.