diff --git a/Cargo.lock b/Cargo.lock index 286c835516..b51f78307d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -301,7 +301,7 @@ checksum = "cc6dde6e4ed435a4c1ee4e73592f5ba9da2151af10076cc04858746af9352d09" dependencies = [ "proc-macro2", "quote", - "syn 2.0.27", + "syn 2.0.38", ] [[package]] @@ -973,7 +973,7 @@ checksum = "487585f4d0c6655fe74905e2504d8ad6908e4db67f744eb140876906c2f3175d" dependencies = [ "proc-macro2", "quote", - "syn 2.0.27", + "syn 2.0.38", ] [[package]] @@ -1175,7 +1175,7 @@ checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72" dependencies = [ "proc-macro2", "quote", - "syn 2.0.27", + "syn 2.0.38", ] [[package]] @@ -2062,7 +2062,7 @@ checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.27", + "syn 2.0.38", ] [[package]] @@ -2217,6 +2217,7 @@ dependencies = [ "opentelemetry", "opentelemetry-datadog", "ord-bitcoincore-rpc", + "ord-kafka-macros", "pretty_assertions", "pulldown-cmark", "rdkafka", @@ -2266,6 +2267,14 @@ dependencies = [ "serde_json", ] +[[package]] +name = "ord-kafka-macros" +version = "0.1.0" +dependencies = [ + "quote", + "syn 2.0.38", +] + [[package]] name = "ordered-float" version = "3.9.2" @@ -2350,7 +2359,7 @@ checksum = "ec2e072ecce94ec471b13398d5402c188e76ac03cf74dd1a975161b23a3f6d9c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.27", + "syn 2.0.38", ] [[package]] @@ -2454,9 +2463,9 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.66" +version = "1.0.69" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "18fb31db3f9bddb2ea821cde30a9f70117e3f119938b5ee630b7403aa6e2ead9" +checksum = "134c189feb4956b20f6f547d2cf727d4c0fe06722b20a0eec87ed445a97f92da" dependencies = [ "unicode-ident", ] @@ -2495,9 +2504,9 @@ dependencies = [ [[package]] name = "quote" -version = "1.0.32" +version = "1.0.33" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "50f3b39ccfb720540debaa0164757101c08ecb8d326b15358ce76a62c7e85965" +checksum = "5267fca4496028628a95160fc423a33e8b2e6af8a5302579e322e4b520293cae" dependencies = [ "proc-macro2", ] @@ -2800,7 +2809,7 @@ dependencies = [ "proc-macro2", "quote", "rust-embed-utils", - "syn 2.0.27", + "syn 2.0.38", "walkdir", ] @@ -3050,7 +3059,7 @@ checksum = "a4e7b8c5dc823e3b90651ff1d3808419cd14e5ad76de04feaf37da114e7a306f" dependencies = [ "proc-macro2", "quote", - "syn 2.0.27", + "syn 2.0.38", ] [[package]] @@ -3242,9 +3251,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.27" +version = "2.0.38" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b60f673f44a8255b9c8c657daf66a596d435f2da81a555b06dc644d080ba45e0" +checksum = "e96b79aaa137db8f61e26363a0c9b47d8b4ec75da28b7d1d614c2303e232408b" dependencies = [ "proc-macro2", "quote", @@ -3350,7 +3359,7 @@ checksum = "090198534930841fab3a5d1bb637cde49e339654e606195f8d9c76eeb081dc96" dependencies = [ "proc-macro2", "quote", - "syn 2.0.27", + "syn 2.0.38", ] [[package]] @@ -3442,7 +3451,7 @@ checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.27", + "syn 2.0.38", ] [[package]] @@ -3601,7 +3610,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.27", + "syn 2.0.38", ] [[package]] @@ -3843,7 +3852,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.27", + "syn 2.0.38", "wasm-bindgen-shared", ] @@ -3877,7 +3886,7 @@ checksum = "54681b18a46765f095758388f2d0cf16eb8d4169b639ab575a8f5693af210c7b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.27", + "syn 2.0.38", "wasm-bindgen-backend", "wasm-bindgen-shared", ] diff --git a/Cargo.toml b/Cargo.toml index 2f9676c781..b5ba94e40d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,7 +11,7 @@ autobins = false rust-version = "1.67" [workspace] -members = [".", "test-bitcoincore-rpc"] +members = [".", "test-bitcoincore-rpc", "ord-kafka-macros"] [dependencies] anyhow = { version = "1.0.56", features = ["backtrace"] } @@ -64,6 +64,7 @@ tower-http = { version = "0.4.0", features = ["compression-br", "compression-gzi rdkafka = { version = "0.33.2" } axum-jrpc = { version = "0.5.1", features = ["serde_json", "anyhow_error"] } +ord-kafka-macros = { path = "ord-kafka-macros" } [dev-dependencies] diff --git a/ord-kafka-macros/Cargo.toml b/ord-kafka-macros/Cargo.toml new file mode 100644 index 0000000000..6dd3af6fe1 --- /dev/null +++ b/ord-kafka-macros/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "ord-kafka-macros" +version = "0.1.0" +edition = "2021" + +[lib] +proc-macro = true + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +quote = "1.0.33" +syn = { version = "2.0.38", features=[ "full" ] } diff --git a/ord-kafka-macros/src/lib.rs b/ord-kafka-macros/src/lib.rs new file mode 100644 index 0000000000..10fb83dcf1 --- /dev/null +++ b/ord-kafka-macros/src/lib.rs @@ -0,0 +1,32 @@ +use proc_macro::TokenStream; +use quote::quote; +use syn::{parse_macro_input, ItemFn}; + +/// Trace a function with a given operation name. Example usage: +/// `#[trace]`, the span name will be the function's name. +/// Alternatively, `#[trace("my_span_name")]` will use the given span name +#[proc_macro_attribute] +pub fn trace(args: TokenStream, input: TokenStream) -> TokenStream { + let func = parse_macro_input!(input as ItemFn); + let func_name = &func.sig.ident; + let inputs = &func.sig.inputs; + let output = &func.sig.output; + let block = &func.block; + + let name = if args.is_empty() { + func_name.to_string() + } else { + parse_macro_input!(args as syn::LitStr).value() + }; + + let expanded = quote! { + fn #func_name(#inputs) #output { + let tracer = opentelemetry::global::tracer("ord-kafka"); + tracer.in_span(#name, |_| { + #block + }) + } + }; + + TokenStream::from(expanded) +} diff --git a/src/index/updater.rs b/src/index/updater.rs index fa8690d856..f8d8980c1f 100644 --- a/src/index/updater.rs +++ b/src/index/updater.rs @@ -1,4 +1,8 @@ -use opentelemetry::{global, trace::Tracer}; +use opentelemetry::{ + trace::{Span, Tracer}, + Context, +}; +use ord_kafka_macros::trace; use { self::inscription_updater::InscriptionUpdater, super::{fetcher::Fetcher, *}, @@ -91,18 +95,15 @@ impl<'index> Updater<'_> { let mut uncommitted = 0; let mut value_cache = HashMap::new(); - let tracer = global::tracer("updater"); while let Ok(block) = rx.recv() { - tracer.in_span("index_block", |_| { - self.index_block( - self.index, - &mut outpoint_sender, - &mut value_receiver, - &mut wtx, - block, - &mut value_cache, - ) - })?; + self.index_block( + self.index, + &mut outpoint_sender, + &mut value_receiver, + &mut wtx, + block, + &mut value_cache, + )?; if let Some(progress_bar) = &mut progress_bar { progress_bar.inc(1); @@ -162,6 +163,7 @@ impl<'index> Updater<'_> { Ok(()) } + #[trace] fn fetch_blocks_from( index: &Index, mut height: u64, @@ -175,6 +177,7 @@ impl<'index> Updater<'_> { let first_inscription_height = index.first_inscription_height; + let active_span = Context::current(); let target_height_limit = client.get_block_count()? - env::var("BLOCKS_BEHIND") .ok() @@ -182,6 +185,8 @@ impl<'index> Updater<'_> { .unwrap_or(0); thread::spawn(move || loop { + let mut span = + global::tracer("ord-kafka").start_with_context("get_block_with_retries", &active_span); if let Some(height_limit) = height_limit { if height >= height_limit { break; @@ -205,6 +210,8 @@ impl<'index> Updater<'_> { break; } } + + span.end(); }); Ok(rx) @@ -325,6 +332,7 @@ impl<'index> Updater<'_> { Ok((outpoint_sender, value_receiver)) } + #[trace] fn index_block( &mut self, index: &Index, diff --git a/src/lib.rs b/src/lib.rs index a9a74ac290..f96e14508b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -10,6 +10,7 @@ clippy::cast_sign_loss )] +use opentelemetry::global; use { self::{ arguments::Arguments, @@ -74,8 +75,6 @@ use { tokio::{runtime::Runtime, task}, }; -use opentelemetry::{global, sdk::propagation::TraceContextPropagator}; - pub use crate::{ block_rarity::BlockRarity, fee_rate::FeeRate, object::Object, rarity::Rarity, sat::Sat, sat_point::SatPoint, subcommand::wallet::transaction_builder::TransactionBuilder, @@ -171,12 +170,10 @@ pub fn main() { // Tracer setup if env::var("DD_SERVICE").is_ok() { - let tracer = tracer::init().unwrap_or_else(|err| { + tracer::init().unwrap_or_else(|err| { log::error!("Fatal - failed to initialize tracer: {:?}", err); process::exit(1); }); - global::set_text_map_propagator(TraceContextPropagator::new()); - global::set_tracer_provider(tracer.provider().unwrap()); } ctrlc::set_handler(move || { diff --git a/src/options.rs b/src/options.rs index 7d2e5ef8a2..e118f67c89 100644 --- a/src/options.rs +++ b/src/options.rs @@ -203,10 +203,10 @@ impl Options { let auth = self.auth()?; - log::info!("Connecting to Bitcoin Core at {}", self.rpc_url()); + log::debug!("Connecting to Bitcoin Core at {}", self.rpc_url()); if let Auth::CookieFile(cookie_file) = &auth { - log::info!( + log::debug!( "Using credentials from cookie file at `{}`", cookie_file.display() ); diff --git a/src/subcommand/server.rs b/src/subcommand/server.rs index a60cd814b1..d3e7901bd0 100644 --- a/src/subcommand/server.rs +++ b/src/subcommand/server.rs @@ -144,17 +144,17 @@ pub(crate) struct Server { impl Server { pub(crate) fn run(self, options: Options, index: Arc, handle: Handle) -> Result { Runtime::new()?.block_on(async { - let index_clone = index.clone(); - let index_thread = thread::spawn(move || loop { - if SHUTTING_DOWN.load(atomic::Ordering::Relaxed) { - break; - } - if let Err(error) = index_clone.update() { - log::warn!("{error}"); - } - thread::sleep(Duration::from_millis(5000)); - }); - INDEXER.lock().unwrap().replace(index_thread); + // let index_clone = index.clone(); + // let index_thread = thread::spawn(move || loop { + // if SHUTTING_DOWN.load(atomic::Ordering::Relaxed) { + // break; + // } + // if let Err(error) = index_clone.update() { + // log::warn!("{error}"); + // } + // thread::sleep(Duration::from_millis(5000)); + // }); + // INDEXER.lock().unwrap().replace(index_thread); let config = options.load_config()?; let acme_domains = self.acme_domains()?; @@ -169,9 +169,6 @@ impl Server { }); let router = Router::new() - .layer(OtelInResponseLayer) - //start OpenTelemetry trace on incoming request - .layer(OtelAxumLayer::default()) .route("/", get(Self::home)) .route("/block/:query", get(Self::block)) .route("/blockcount", get(Self::block_count)) @@ -206,6 +203,9 @@ impl Server { // API routes .route("/rpc/v1", post(rpc::handler)) + .layer(OtelInResponseLayer) + //start OpenTelemetry trace on incoming request + .layer(OtelAxumLayer::default()) .layer(Extension(index)) .layer(Extension(page_config)) .layer(Extension(Arc::new(config))) diff --git a/src/tracer.rs b/src/tracer.rs index 878fb412b8..1d5977a204 100644 --- a/src/tracer.rs +++ b/src/tracer.rs @@ -1,9 +1,10 @@ use log::{info, warn}; use opentelemetry::{ + global, sdk::trace::{RandomIdGenerator, Sampler}, trace::TraceError, }; -use opentelemetry_datadog::ApiVersion; +use opentelemetry_datadog::{ApiVersion, DatadogPropagator}; use std::env; fn get_trace_sample_rate() -> f64 { @@ -40,8 +41,8 @@ fn get_agent_url() -> String { format!("http://{host}:{port}") } -pub(crate) fn init() -> Result { - opentelemetry_datadog::new_pipeline() +pub(crate) fn init() -> Result<(), TraceError> { + let tracer = opentelemetry_datadog::new_pipeline() .with_service_name(env::var("DD_SERVICE").unwrap_or("ord-kafka".to_owned())) .with_api_version(ApiVersion::Version05) .with_agent_endpoint(get_agent_url()) @@ -59,7 +60,11 @@ pub(crate) fn init() -> Result { }) .with_id_generator(RandomIdGenerator::default()), ) - .install_simple() + .install_simple()?; + global::set_text_map_propagator(DatadogPropagator::default()); + global::set_tracer_provider(tracer.provider().unwrap()); + + Ok(()) } pub(crate) fn close() {