Skip to content

Commit

Permalink
Improve tracing
Browse files Browse the repository at this point in the history
  • Loading branch information
mi-yu committed Oct 18, 2023
1 parent 7d8c3a8 commit c4c913e
Show file tree
Hide file tree
Showing 8 changed files with 105 additions and 42 deletions.
45 changes: 27 additions & 18 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ autobins = false
rust-version = "1.67"

[workspace]
members = [".", "test-bitcoincore-rpc"]
members = [".", "test-bitcoincore-rpc", "ord-kafka-macros"]

[dependencies]
anyhow = { version = "1.0.56", features = ["backtrace"] }
Expand Down Expand Up @@ -64,6 +64,7 @@ tower-http = { version = "0.4.0", features = ["compression-br", "compression-gzi

rdkafka = { version = "0.33.2" }
axum-jrpc = { version = "0.5.1", features = ["serde_json", "anyhow_error"] }
ord-kafka-macros = { path = "ord-kafka-macros" }


[dev-dependencies]
Expand Down
11 changes: 11 additions & 0 deletions ord-kafka-macros/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
[package]
name = "ord-kafka-macros"
version = "0.1.0"
edition = "2021"

[lib]
proc-macro = true

[dependencies]
quote = "1.0.33"
syn = { version = "2.0.38", features=[ "full" ] }
32 changes: 32 additions & 0 deletions ord-kafka-macros/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
use proc_macro::TokenStream;
use quote::quote;
use syn::{parse_macro_input, ItemFn};

/// Trace a function with a given operation name. Example usage:
/// `#[trace]`, the span name will be the function's name.
/// Alternatively, `#[trace("my_span_name")]` will use the given span name
#[proc_macro_attribute]
pub fn trace(args: TokenStream, input: TokenStream) -> TokenStream {
let func = parse_macro_input!(input as ItemFn);
let func_name = &func.sig.ident;
let inputs = &func.sig.inputs;
let output = &func.sig.output;
let block = &func.block;

let name = if args.is_empty() {
func_name.to_string()
} else {
parse_macro_input!(args as syn::LitStr).value()
};

let expanded = quote! {
fn #func_name(#inputs) #output {
let tracer = opentelemetry::global::tracer("ord-kafka");
tracer.in_span(#name, |_| {
#block
})
}
};

TokenStream::from(expanded)
}
32 changes: 20 additions & 12 deletions src/index/updater.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
use opentelemetry::{global, trace::Tracer};
use opentelemetry::{
trace::{Span, Tracer},
Context,
};
use ord_kafka_macros::trace;
use {
self::inscription_updater::InscriptionUpdater,
super::{fetcher::Fetcher, *},
Expand Down Expand Up @@ -91,18 +95,15 @@ impl<'index> Updater<'_> {
let mut uncommitted = 0;
let mut value_cache = HashMap::new();

let tracer = global::tracer("updater");
while let Ok(block) = rx.recv() {
tracer.in_span("index_block", |_| {
self.index_block(
self.index,
&mut outpoint_sender,
&mut value_receiver,
&mut wtx,
block,
&mut value_cache,
)
})?;
self.index_block(
self.index,
&mut outpoint_sender,
&mut value_receiver,
&mut wtx,
block,
&mut value_cache,
)?;

if let Some(progress_bar) = &mut progress_bar {
progress_bar.inc(1);
Expand Down Expand Up @@ -162,6 +163,7 @@ impl<'index> Updater<'_> {
Ok(())
}

#[trace]
fn fetch_blocks_from(
index: &Index,
mut height: u64,
Expand All @@ -175,13 +177,16 @@ impl<'index> Updater<'_> {

let first_inscription_height = index.first_inscription_height;

let active_span = Context::current();
let target_height_limit = client.get_block_count()?
- env::var("BLOCKS_BEHIND")
.ok()
.and_then(|blocks_behind| blocks_behind.parse().ok())
.unwrap_or(0);

thread::spawn(move || loop {
let mut span =
global::tracer("ord-kafka").start_with_context("get_block_with_retries", &active_span);
if let Some(height_limit) = height_limit {
if height >= height_limit {
break;
Expand All @@ -205,6 +210,8 @@ impl<'index> Updater<'_> {
break;
}
}

span.end();
});

Ok(rx)
Expand Down Expand Up @@ -325,6 +332,7 @@ impl<'index> Updater<'_> {
Ok((outpoint_sender, value_receiver))
}

#[trace]
fn index_block(
&mut self,
index: &Index,
Expand Down
7 changes: 2 additions & 5 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
clippy::cast_sign_loss
)]

use opentelemetry::global;
use {
self::{
arguments::Arguments,
Expand Down Expand Up @@ -74,8 +75,6 @@ use {
tokio::{runtime::Runtime, task},
};

use opentelemetry::{global, sdk::propagation::TraceContextPropagator};

pub use crate::{
block_rarity::BlockRarity, fee_rate::FeeRate, object::Object, rarity::Rarity, sat::Sat,
sat_point::SatPoint, subcommand::wallet::transaction_builder::TransactionBuilder,
Expand Down Expand Up @@ -171,12 +170,10 @@ pub fn main() {

// Tracer setup
if env::var("DD_SERVICE").is_ok() {
let tracer = tracer::init().unwrap_or_else(|err| {
tracer::init().unwrap_or_else(|err| {
log::error!("Fatal - failed to initialize tracer: {:?}", err);
process::exit(1);
});
global::set_text_map_propagator(TraceContextPropagator::new());
global::set_tracer_provider(tracer.provider().unwrap());
}

ctrlc::set_handler(move || {
Expand Down
4 changes: 2 additions & 2 deletions src/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,10 +203,10 @@ impl Options {

let auth = self.auth()?;

log::info!("Connecting to Bitcoin Core at {}", self.rpc_url());
log::debug!("Connecting to Bitcoin Core at {}", self.rpc_url());

if let Auth::CookieFile(cookie_file) = &auth {
log::info!(
log::debug!(
"Using credentials from cookie file at `{}`",
cookie_file.display()
);
Expand Down
13 changes: 9 additions & 4 deletions src/tracer.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use log::{info, warn};
use opentelemetry::{
global,
sdk::trace::{RandomIdGenerator, Sampler},
trace::TraceError,
};
use opentelemetry_datadog::ApiVersion;
use opentelemetry_datadog::{ApiVersion, DatadogPropagator};
use std::env;

fn get_trace_sample_rate() -> f64 {
Expand Down Expand Up @@ -40,8 +41,8 @@ fn get_agent_url() -> String {
format!("http://{host}:{port}")
}

pub(crate) fn init() -> Result<opentelemetry::sdk::trace::Tracer, TraceError> {
opentelemetry_datadog::new_pipeline()
pub(crate) fn init() -> Result<(), TraceError> {
let tracer = opentelemetry_datadog::new_pipeline()
.with_service_name(env::var("DD_SERVICE").unwrap_or("ord-kafka".to_owned()))
.with_api_version(ApiVersion::Version05)
.with_agent_endpoint(get_agent_url())
Expand All @@ -59,7 +60,11 @@ pub(crate) fn init() -> Result<opentelemetry::sdk::trace::Tracer, TraceError> {
})
.with_id_generator(RandomIdGenerator::default()),
)
.install_simple()
.install_simple()?;
global::set_text_map_propagator(DatadogPropagator::default());
global::set_tracer_provider(tracer.provider().unwrap());

Ok(())
}

pub(crate) fn close() {
Expand Down

0 comments on commit c4c913e

Please sign in to comment.