Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Zipkin - develop #9631

Merged
merged 10 commits into from
Nov 9, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions libraries/rodeos/rodeos.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <b1/rodeos/callbacks/kv.hpp>
#include <b1/rodeos/rodeos_tables.hpp>
#include <fc/log/trace.hpp>

namespace b1::rodeos {

Expand Down Expand Up @@ -162,6 +163,13 @@ void rodeos_db_snapshot::write_block_info(uint32_t block_num, const eosio::check
table.put(info);
}

namespace {
std::string to_string( const eosio::checksum256& cs ) {
auto bytes = cs.extract_as_byte_array();
return fc::to_hex((const char*)bytes.data(), bytes.size());
}
}

void rodeos_db_snapshot::write_block_info(const ship_protocol::get_blocks_result_v0& result) {
check_write(result);
if (!result.block)
Expand All @@ -172,6 +180,12 @@ void rodeos_db_snapshot::write_block_info(const ship_protocol::get_blocks_result
signed_block_header block;
from_bin(block, bin);

auto blk_trace = fc_create_trace_with_id( "Block", result.this_block->block_id );
auto blk_span = fc_create_span( blk_trace, "rodeos-received" );
fc_add_tag( blk_span, "block_id", to_string( result.this_block->block_id ) );
fc_add_tag( blk_span, "block_num", block_num );
fc_add_tag( blk_span, "block_time", block.timestamp.to_time_point().elapsed.count() );

write_block_info(block_num, result.this_block->block_id, block);
}

Expand All @@ -184,6 +198,13 @@ void rodeos_db_snapshot::write_block_info(const ship_protocol::get_blocks_result

const signed_block_header& header =
std::visit([](const auto& blk) { return static_cast<const signed_block_header&>(blk); }, *result.block);

auto blk_trace = fc_create_trace_with_id( "Block", result.this_block->block_id );
auto blk_span = fc_create_span( blk_trace, "rodeos-received" );
fc_add_tag( blk_span, "block_id", to_string( result.this_block->block_id ) );
fc_add_tag( blk_span, "block_num", block_num );
fc_add_tag( blk_span, "block_time", eosio::microseconds_to_str( header.timestamp.to_time_point().elapsed.count() ) );

write_block_info(block_num, result.this_block->block_id, header);
}

Expand Down
70 changes: 64 additions & 6 deletions plugins/chain_plugin/chain_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

#include <fc/io/json.hpp>
#include <fc/variant.hpp>
#include <fc/log/trace.hpp>
#include <signal.h>
#include <cstdlib>

Expand Down Expand Up @@ -322,6 +323,12 @@ void chain_plugin::set_program_options(options_description& cli, options_descrip
"print contract's output to console")
("deep-mind", bpo::bool_switch()->default_value(false),
"print deeper information about chain operations")
("telemetry-url", bpo::value<std::string>(),
"Send Zipkin spans to url. e.g. http://127.0.0.1:9411/api/v2/spans" )
("telemetry-service-name", bpo::value<std::string>()->default_value("nodeos"),
"Zipkin localEndpoint.serviceName sent with each span" )
("telemetry-timeout-us", bpo::value<uint32_t>()->default_value(200000),
"Timeout for sending Zipkin span." )
("actor-whitelist", boost::program_options::value<vector<string>>()->composing()->multitoken(),
"Account added to actor whitelist (may specify multiple times)")
("actor-blacklist", boost::program_options::value<vector<string>>()->composing()->multitoken(),
Expand Down Expand Up @@ -1205,6 +1212,13 @@ void chain_plugin::plugin_initialize(const variables_map& options) {
my->chain->enable_deep_mind( &_deep_mind_log );
}

if (options.count("telemetry-url")) {
fc::zipkin_config::init( options["telemetry-url"].as<std::string>(),
options["telemetry-service-name"].as<std::string>(),
options["telemetry-timeout-us"].as<uint32_t>() );
}


// set up method providers
my->get_block_by_number_provider = app().get_method<methods::get_block_by_number>().register_provider(
[this]( uint32_t block_num ) -> signed_block_ptr {
Expand Down Expand Up @@ -1351,6 +1365,7 @@ void chain_plugin::plugin_shutdown() {
if(app().is_quiting())
my->chain->get_wasm_interface().indicate_shutting_down();
my->chain.reset();
zipkin_config::shutdown();
}

void chain_plugin::handle_sighup() {
Expand Down Expand Up @@ -2772,12 +2787,34 @@ void read_write::push_transaction(const read_write::push_transaction_params& par
input_trx = std::make_shared<packed_transaction>( std::move( input_trx_v0 ), true );
} EOS_RETHROW_EXCEPTIONS(chain::packed_transaction_type_exception, "Invalid packed transaction")

auto trx_trace = fc_create_trace_with_id("Transaction", input_trx->id());
auto trx_span = fc_create_span(trx_trace, "HTTP Received");
fc_add_tag(trx_span, "trx_id", input_trx->id());
fc_add_tag(trx_span, "method", "push_transaction");

app().get_method<incoming::methods::transaction_async>()(input_trx, true,
[this, next](const std::variant<fc::exception_ptr, transaction_trace_ptr>& result) -> void {
[this, token=trx_trace.get_token(), input_trx, next]
(const std::variant<fc::exception_ptr, transaction_trace_ptr>& result) -> void {

auto trx_span = fc_create_span_from_token(token, "Processed");
fc_add_tag(trx_span, "trx_id", input_trx->id());

if (std::holds_alternative<fc::exception_ptr>(result)) {
next(std::get<fc::exception_ptr>(result));
auto& eptr = std::get<fc::exception_ptr>(result);
fc_add_tag(trx_span, "error", eptr->to_string());
next(eptr);
} else {
auto trx_trace_ptr = std::get<transaction_trace_ptr>(result);
auto& trx_trace_ptr = std::get<transaction_trace_ptr>(result);

fc_add_tag(trx_span, "block_num", trx_trace_ptr->block_num);
fc_add_tag(trx_span, "block_time", trx_trace_ptr->block_time.to_time_point());
fc_add_tag(trx_span, "elapsed", trx_trace_ptr->elapsed.count());
if( trx_trace_ptr->receipt ) {
fc_add_tag(trx_span, "status", std::string(trx_trace_ptr->receipt->status));
}
if( trx_trace_ptr->except ) {
fc_add_tag(trx_span, "error", trx_trace_ptr->except->to_string());
}

try {
fc::variant output;
Expand Down Expand Up @@ -2893,12 +2930,33 @@ void read_write::send_transaction(const read_write::send_transaction_params& par
input_trx = std::make_shared<packed_transaction>( std::move( input_trx_v0 ), true );
} EOS_RETHROW_EXCEPTIONS(chain::packed_transaction_type_exception, "Invalid packed transaction")

auto trx_trace = fc_create_trace_with_id("Transaction", input_trx->id());
auto trx_span = fc_create_span(trx_trace, "HTTP Received");
fc_add_tag(trx_span, "trx_id", input_trx->id());
fc_add_tag(trx_span, "method", "send_transaction");

app().get_method<incoming::methods::transaction_async>()(input_trx, true,
[this, next](const std::variant<fc::exception_ptr, transaction_trace_ptr>& result) -> void {
[this, token=trx_trace.get_token(), input_trx, next]
(const std::variant<fc::exception_ptr, transaction_trace_ptr>& result) -> void {
auto trx_span = fc_create_span_from_token(token, "Processed");
fc_add_tag(trx_span, "trx_id", input_trx->id());

if (std::holds_alternative<fc::exception_ptr>(result)) {
next(std::get<fc::exception_ptr>(result));
auto& eptr = std::get<fc::exception_ptr>(result);
fc_add_tag(trx_span, "error", eptr->to_string());
next(eptr);
} else {
auto trx_trace_ptr = std::get<transaction_trace_ptr>(result);
auto& trx_trace_ptr = std::get<transaction_trace_ptr>(result);

fc_add_tag(trx_span, "block_num", trx_trace_ptr->block_num);
fc_add_tag(trx_span, "block_time", trx_trace_ptr->block_time.to_time_point());
fc_add_tag(trx_span, "elapsed", trx_trace_ptr->elapsed.count());
if( trx_trace_ptr->receipt ) {
fc_add_tag(trx_span, "status", std::string(trx_trace_ptr->receipt->status));
}
if( trx_trace_ptr->except ) {
fc_add_tag(trx_span, "error", trx_trace_ptr->except->to_string());
}

try {
fc::variant output;
Expand Down
16 changes: 16 additions & 0 deletions plugins/net_plugin/net_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include <fc/io/raw.hpp>
#include <fc/log/appender.hpp>
#include <fc/log/logger_config.hpp>
#include <fc/log/trace.hpp>
#include <fc/reflect/variant.hpp>
#include <fc/crypto/rand.hpp>
#include <fc/exception/exception.hpp>
Expand Down Expand Up @@ -3313,6 +3314,13 @@ namespace eosio {
controller& cc = chain_plug->chain();
dispatcher->strand.post( [this, bs]() {
fc_dlog( logger, "signaled accepted_block, blk num = ${num}, id = ${id}", ("num", bs->block_num)("id", bs->id) );

auto blk_trace = fc_create_trace_with_id( "Block", bs->id );
auto blk_span = fc_create_span( blk_trace, "Accepted" );
fc_add_tag( blk_span, "block_id", bs->id );
fc_add_tag( blk_span, "block_num", bs->block_num );
fc_add_tag( blk_span, "block_time", bs->block->timestamp.to_time_point() );

dispatcher->bcast_block( bs->block, bs->id );
});
}
Expand All @@ -3325,6 +3333,14 @@ namespace eosio {
dispatcher->strand.post( [this, block]() {
auto id = block->calculate_id();
fc_dlog( logger, "signaled pre_accepted_block, blk num = ${num}, id = ${id}", ("num", block->block_num())("id", id) );

auto blk_trace = fc_create_trace_with_id("Block", id);
auto blk_span = fc_create_span(blk_trace, "PreAccepted");
fc_add_tag(blk_span, "block_id", id);
fc_add_tag(blk_span, "block_num", block->block_num());
fc_add_tag(blk_span, "block_time", block->timestamp.to_time_point());
fc_add_tag(blk_span, "producer", block->producer.to_string());

dispatcher->bcast_block( block, id );
});
}
Expand Down
20 changes: 20 additions & 0 deletions plugins/state_history_plugin/state_history_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
#include <eosio/state_history/serialization.hpp>
#include <eosio/state_history_plugin/state_history_plugin.hpp>

#include <fc/log/trace.hpp>

#include <boost/asio/bind_executor.hpp>
#include <boost/asio/ip/host_name.hpp>
#include <boost/asio/ip/tcp.hpp>
Expand Down Expand Up @@ -237,6 +239,19 @@ struct state_history_plugin_impl : std::enable_shared_from_this<state_history_pl
--current_request->max_messages_in_flight;
need_to_send_update = current_request->start_block_num <= current &&
current_request->start_block_num < current_request->end_block_num;

std::visit( []( auto&& ptr ) {
if( ptr ) {
if (fc::zipkin_config::is_enabled()) {
auto id = ptr->calculate_id();
auto blk_trace = fc_create_trace_with_id( "Block", id );
auto blk_span = fc_create_span( blk_trace, "SHiP-Send" );
fc_add_tag( blk_span, "block_id", id );
fc_add_tag( blk_span, "block_num", ptr->block_num() );
fc_add_tag( blk_span, "block_time", ptr->timestamp.to_time_point() );
}
}
}, result.block );
}

void send_update(const block_state_ptr& block_state) {
Expand Down Expand Up @@ -352,6 +367,11 @@ struct state_history_plugin_impl : std::enable_shared_from_this<state_history_pl
}

void on_accepted_block(const block_state_ptr& block_state) {
auto blk_trace = fc_create_trace_with_id("Block", block_state->id);
auto blk_span = fc_create_span(blk_trace, "SHiP-Accepted");
fc_add_tag(blk_span, "block_id", block_state->id);
fc_add_tag(blk_span, "block_num", block_state->block_num);
fc_add_tag(blk_span, "block_time", block_state->block->timestamp.to_time_point());
if (trace_log)
trace_log->store(chain_plug->chain().db(), block_state);
if (chain_state_log)
Expand Down
18 changes: 18 additions & 0 deletions programs/nodeos/logging.json
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,16 @@
"host": "host_name"
},
"enabled": true
},{
"name": "zip",
"type": "zipkin",
"args": {
"endpoint": "http://127.0.0.1:9411",
"path": "/api/v2/spans",
"service_name": "nodeos",
"timeout_us": 200000
},
"enabled": true
}
],
"loggers": [{
Expand All @@ -57,6 +67,14 @@
"stderr",
"net"
]
},{
"name": "zipkin",
"level": "debug",
"enabled": true,
"additivity": false,
"appenders": [
"zip"
]
},{
"name": "net_plugin_impl",
"level": "info",
Expand Down
28 changes: 26 additions & 2 deletions programs/rodeos/cloner_plugin.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
#include "cloner_plugin.hpp"
#include "ship_client.hpp"
#include "config.hpp"

#include <b1/rodeos/rodeos.hpp>

#include <fc/log/logger.hpp>
#include <fc/log/logger_config.hpp>
#include <fc/log/trace.hpp>

#include <boost/asio/connect.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/beast/core.hpp>
Expand Down Expand Up @@ -219,6 +225,12 @@ void cloner_plugin::set_program_options(options_description& cli, options_descri
clop("clone-stop,x", bpo::value<uint32_t>(), "Stop before block [arg]");
op("clone-exit-on-filter-wasm-error", bpo::bool_switch()->default_value(false),
"Shutdown application if filter wasm throws an exception");
op("telemetry-url", bpo::value<std::string>(),
"Send Zipkin spans to url. e.g. http://127.0.0.1:9411/api/v2/spans" );
op("telemetry-service-name", bpo::value<std::string>()->default_value(b1::rodeos::config::rodeos_executable_name),
"Zipkin localEndpoint.serviceName sent with each span" );
op("telemetry-timeout-us", bpo::value<uint32_t>()->default_value(200000),
"Timeout for sending Zipkin span." );
// todo: remove
op("filter-name", bpo::value<std::string>(), "Filter name");
op("filter-wasm", bpo::value<std::string>(), "Filter wasm");
Expand All @@ -243,21 +255,33 @@ void cloner_plugin::plugin_initialize(const variables_map& options) {
} else if (options.count("filter-name") || options.count("filter-wasm")) {
throw std::runtime_error("filter-name and filter-wasm must be used together");
}
if (options.count("telemetry-url")) {
fc::zipkin_config::init( options["telemetry-url"].as<std::string>(),
options["telemetry-service-name"].as<std::string>(),
options["telemetry-timeout-us"].as<uint32_t>() );
}
}
FC_LOG_AND_RETHROW()
}

void cloner_plugin::plugin_startup() { my->start(); }
void cloner_plugin::plugin_startup() {
handle_sighup();
my->start();
}

void cloner_plugin::plugin_shutdown() {
if (my->session)
my->session->connection->close(false);
my->timer.cancel();
fc::zipkin_config::shutdown();
ilog("cloner_plugin stopped");
}

void cloner_plugin::set_streamer(std::function<void(const char* data, uint64_t data_size)> streamer_func) {
my->streamer = streamer_func;
my->streamer = std::move(streamer_func);
}

void cloner_plugin::handle_sighup() {
}

} // namespace b1
1 change: 1 addition & 0 deletions programs/rodeos/cloner_plugin.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ class cloner_plugin : public appbase::plugin<cloner_plugin> {
void plugin_initialize(const appbase::variables_map& options);
void plugin_startup();
void plugin_shutdown();
void handle_sighup() override;

void set_streamer(std::function<void(const char* data, uint64_t data_size)> streamer_function);

Expand Down