diff --git a/Cargo.lock b/Cargo.lock index 286c835516..08b8fb4424 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -301,7 +301,7 @@ checksum = "cc6dde6e4ed435a4c1ee4e73592f5ba9da2151af10076cc04858746af9352d09" dependencies = [ "proc-macro2", "quote", - "syn 2.0.27", + "syn 2.0.38", ] [[package]] @@ -426,24 +426,6 @@ dependencies = [ "tower-service", ] -[[package]] -name = "axum-tracing-opentelemetry" -version = "0.14.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "06985105829f176e9a3f113b1c71cc24e08f600ef0df4e70cd90d144f889e19f" -dependencies = [ - "axum", - "futures-core", - "futures-util", - "http", - "opentelemetry", - "pin-project-lite", - "tower", - "tracing", - "tracing-opentelemetry", - "tracing-opentelemetry-instrumentation-sdk", -] - [[package]] name = "backtrace" version = "0.3.68" @@ -973,7 +955,7 @@ checksum = "487585f4d0c6655fe74905e2504d8ad6908e4db67f744eb140876906c2f3175d" dependencies = [ "proc-macro2", "quote", - "syn 2.0.27", + "syn 2.0.38", ] [[package]] @@ -1175,7 +1157,7 @@ checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72" dependencies = [ "proc-macro2", "quote", - "syn 2.0.27", + "syn 2.0.38", ] [[package]] @@ -2062,7 +2044,7 @@ checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.27", + "syn 2.0.38", ] [[package]] @@ -2167,7 +2149,6 @@ dependencies = [ "ordered-float", "percent-encoding", "rand 0.8.5", - "regex", "thiserror", "tokio", "tokio-stream", @@ -2188,7 +2169,6 @@ dependencies = [ "axum", "axum-jrpc", "axum-server", - "axum-tracing-opentelemetry", "base64 0.21.2", "bech32", "bip39", @@ -2217,6 +2197,7 @@ dependencies = [ "opentelemetry", "opentelemetry-datadog", "ord-bitcoincore-rpc", + "ord-kafka-macros", "pretty_assertions", "pulldown-cmark", "rdkafka", @@ -2266,6 +2247,14 @@ dependencies = [ "serde_json", ] +[[package]] +name = "ord-kafka-macros" +version = "0.1.0" +dependencies = [ + "quote", + "syn 2.0.38", +] + [[package]] name = "ordered-float" version = "3.9.2" @@ -2350,7 +2339,7 @@ checksum = "ec2e072ecce94ec471b13398d5402c188e76ac03cf74dd1a975161b23a3f6d9c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.27", + "syn 2.0.38", ] [[package]] @@ -2454,9 +2443,9 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.66" +version = "1.0.69" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "18fb31db3f9bddb2ea821cde30a9f70117e3f119938b5ee630b7403aa6e2ead9" +checksum = "134c189feb4956b20f6f547d2cf727d4c0fe06722b20a0eec87ed445a97f92da" dependencies = [ "unicode-ident", ] @@ -2495,9 +2484,9 @@ dependencies = [ [[package]] name = "quote" -version = "1.0.32" +version = "1.0.33" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "50f3b39ccfb720540debaa0164757101c08ecb8d326b15358ce76a62c7e85965" +checksum = "5267fca4496028628a95160fc423a33e8b2e6af8a5302579e322e4b520293cae" dependencies = [ "proc-macro2", ] @@ -2800,7 +2789,7 @@ dependencies = [ "proc-macro2", "quote", "rust-embed-utils", - "syn 2.0.27", + "syn 2.0.38", "walkdir", ] @@ -3050,7 +3039,7 @@ checksum = "a4e7b8c5dc823e3b90651ff1d3808419cd14e5ad76de04feaf37da114e7a306f" dependencies = [ "proc-macro2", "quote", - "syn 2.0.27", + "syn 2.0.38", ] [[package]] @@ -3132,15 +3121,6 @@ dependencies = [ "digest", ] -[[package]] -name = "sharded-slab" -version = "0.1.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6" -dependencies = [ - "lazy_static", -] - [[package]] name = "signal-hook" version = "0.3.17" @@ -3242,9 +3222,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.27" +version = "2.0.38" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b60f673f44a8255b9c8c657daf66a596d435f2da81a555b06dc644d080ba45e0" +checksum = "e96b79aaa137db8f61e26363a0c9b47d8b4ec75da28b7d1d614c2303e232408b" dependencies = [ "proc-macro2", "quote", @@ -3350,17 +3330,7 @@ checksum = "090198534930841fab3a5d1bb637cde49e339654e606195f8d9c76eeb081dc96" dependencies = [ "proc-macro2", "quote", - "syn 2.0.27", -] - -[[package]] -name = "thread_local" -version = "1.1.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3fdd6f064ccff2d6567adcb3873ca630700f00b5ad3f060c25b5dcfd9a4ce152" -dependencies = [ - "cfg-if 1.0.0", - "once_cell", + "syn 2.0.38", ] [[package]] @@ -3442,7 +3412,7 @@ checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.27", + "syn 2.0.38", ] [[package]] @@ -3589,21 +3559,9 @@ dependencies = [ "cfg-if 1.0.0", "log", "pin-project-lite", - "tracing-attributes", "tracing-core", ] -[[package]] -name = "tracing-attributes" -version = "0.1.27" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.27", -] - [[package]] name = "tracing-core" version = "0.1.31" @@ -3611,58 +3569,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0955b8137a1df6f1a2e9a37d8a6656291ff0297c1a97c24e0d8425fe2312f79a" dependencies = [ "once_cell", - "valuable", -] - -[[package]] -name = "tracing-log" -version = "0.1.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "78ddad33d2d10b1ed7eb9d1f518a5674713876e97e5bb9b7345a7984fbb4f922" -dependencies = [ - "lazy_static", - "log", - "tracing-core", -] - -[[package]] -name = "tracing-opentelemetry" -version = "0.21.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "75327c6b667828ddc28f5e3f169036cb793c3f588d83bf0f262a7f062ffed3c8" -dependencies = [ - "once_cell", - "opentelemetry", - "opentelemetry_sdk", - "smallvec", - "tracing", - "tracing-core", - "tracing-log", - "tracing-subscriber", -] - -[[package]] -name = "tracing-opentelemetry-instrumentation-sdk" -version = "0.14.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "752ddd669b14a08036a89045e4ac4497ff7ce254e11386647751f36d7c7849ea" -dependencies = [ - "http", - "opentelemetry-http", - "opentelemetry_api", - "tracing", - "tracing-opentelemetry", -] - -[[package]] -name = "tracing-subscriber" -version = "0.3.17" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "30a651bc37f915e81f087d86e62a18eec5f79550c7faff886f7090b4ea757c77" -dependencies = [ - "sharded-slab", - "thread_local", - "tracing-core", ] [[package]] @@ -3755,12 +3661,6 @@ version = "2.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "daf8dba3b7eb870caf1ddeed7bc9d2a049f3cfdfae7cb521b087cc33ae4c49da" -[[package]] -name = "valuable" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" - [[package]] name = "value-bag" version = "1.4.1" @@ -3843,7 +3743,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.27", + "syn 2.0.38", "wasm-bindgen-shared", ] @@ -3877,7 +3777,7 @@ checksum = "54681b18a46765f095758388f2d0cf16eb8d4169b639ab575a8f5693af210c7b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.27", + "syn 2.0.38", "wasm-bindgen-backend", "wasm-bindgen-shared", ] diff --git a/Cargo.toml b/Cargo.toml index 2f9676c781..a9e0abebd4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,7 +11,7 @@ autobins = false rust-version = "1.67" [workspace] -members = [".", "test-bitcoincore-rpc"] +members = [".", "test-bitcoincore-rpc", "ord-kafka-macros"] [dependencies] anyhow = { version = "1.0.56", features = ["backtrace"] } @@ -42,7 +42,6 @@ mime = "0.3.16" mime_guess = "2.0.4" miniscript = "10.0.0" mp4 = "0.13.0" -axum-tracing-opentelemetry = "0.14.1" ord-bitcoincore-rpc = "0.17.0" opentelemetry = { version = "0.20.0", features = ["rt-tokio"] } opentelemetry-datadog = { version = "0.8.0", features = ["reqwest-blocking-client"] } @@ -64,6 +63,7 @@ tower-http = { version = "0.4.0", features = ["compression-br", "compression-gzi rdkafka = { version = "0.33.2" } axum-jrpc = { version = "0.5.1", features = ["serde_json", "anyhow_error"] } +ord-kafka-macros = { path = "ord-kafka-macros" } [dev-dependencies] diff --git a/ord-kafka-macros/Cargo.toml b/ord-kafka-macros/Cargo.toml new file mode 100644 index 0000000000..db9ce9b202 --- /dev/null +++ b/ord-kafka-macros/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "ord-kafka-macros" +version = "0.1.0" +edition = "2021" + +[lib] +proc-macro = true + +[dependencies] +quote = "1.0.33" +syn = { version = "2.0.38", features=[ "full" ] } diff --git a/ord-kafka-macros/src/lib.rs b/ord-kafka-macros/src/lib.rs new file mode 100644 index 0000000000..f4640bf564 --- /dev/null +++ b/ord-kafka-macros/src/lib.rs @@ -0,0 +1,60 @@ +use proc_macro::TokenStream; +use quote::quote; +use syn::{parse_macro_input, ItemFn}; + +/// Trace a function with a given operation name. Example usage: +/// `#[trace]`, the span name will be the function's name. +/// Alternatively, `#[trace("my_span_name")]` will use the given span name +#[proc_macro_attribute] +pub fn trace(args: TokenStream, input: TokenStream) -> TokenStream { + let func = parse_macro_input!(input as ItemFn); + let func_name = &func.sig.ident; + let inputs = &func.sig.inputs; + let output = &func.sig.output; + let block = &func.block; + let async_ident = func.sig.asyncness.is_some(); + + let name = if args.is_empty() { + func_name.to_string() + } else { + parse_macro_input!(args as syn::LitStr).value() + }; + + let expanded = if async_ident { + quote! { + async fn #func_name(#inputs) #output { + use opentelemetry::trace::Span; + let tracer = opentelemetry::global::tracer("ord-kafka"); + let cx = opentelemetry::Context::current(); + let mut span = tracer.start_with_context(#name, &cx); + + let result = { + #block + }; + + span.end(); + + result + } + } + } else { + quote! { + fn #func_name(#inputs) #output { + use opentelemetry::trace::Span; + let tracer = opentelemetry::global::tracer("ord-kafka"); + let cx = opentelemetry::Context::current(); + let mut span = tracer.start_with_context(#name, &cx); + + let result = { + #block + }; + + span.end(); + + result + } + } + }; + + TokenStream::from(expanded) +} diff --git a/src/index/updater.rs b/src/index/updater.rs index fa8690d856..6f7c92de71 100644 --- a/src/index/updater.rs +++ b/src/index/updater.rs @@ -1,4 +1,5 @@ -use opentelemetry::{global, trace::Tracer}; +use opentelemetry::trace::Tracer; +use ord_kafka_macros::trace; use { self::inscription_updater::InscriptionUpdater, super::{fetcher::Fetcher, *}, @@ -91,18 +92,15 @@ impl<'index> Updater<'_> { let mut uncommitted = 0; let mut value_cache = HashMap::new(); - let tracer = global::tracer("updater"); while let Ok(block) = rx.recv() { - tracer.in_span("index_block", |_| { - self.index_block( - self.index, - &mut outpoint_sender, - &mut value_receiver, - &mut wtx, - block, - &mut value_cache, - ) - })?; + self.index_block( + self.index, + &mut outpoint_sender, + &mut value_receiver, + &mut wtx, + block, + &mut value_cache, + )?; if let Some(progress_bar) = &mut progress_bar { progress_bar.inc(1); @@ -162,6 +160,7 @@ impl<'index> Updater<'_> { Ok(()) } + #[trace] fn fetch_blocks_from( index: &Index, mut height: u64, @@ -325,6 +324,7 @@ impl<'index> Updater<'_> { Ok((outpoint_sender, value_receiver)) } + #[trace] fn index_block( &mut self, index: &Index, diff --git a/src/lib.rs b/src/lib.rs index a9a74ac290..d9988f23df 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -74,8 +74,6 @@ use { tokio::{runtime::Runtime, task}, }; -use opentelemetry::{global, sdk::propagation::TraceContextPropagator}; - pub use crate::{ block_rarity::BlockRarity, fee_rate::FeeRate, object::Object, rarity::Rarity, sat::Sat, sat_point::SatPoint, subcommand::wallet::transaction_builder::TransactionBuilder, @@ -171,12 +169,10 @@ pub fn main() { // Tracer setup if env::var("DD_SERVICE").is_ok() { - let tracer = tracer::init().unwrap_or_else(|err| { + tracer::init().unwrap_or_else(|err| { log::error!("Fatal - failed to initialize tracer: {:?}", err); process::exit(1); }); - global::set_text_map_propagator(TraceContextPropagator::new()); - global::set_tracer_provider(tracer.provider().unwrap()); } ctrlc::set_handler(move || { diff --git a/src/options.rs b/src/options.rs index 7d2e5ef8a2..e118f67c89 100644 --- a/src/options.rs +++ b/src/options.rs @@ -203,10 +203,10 @@ impl Options { let auth = self.auth()?; - log::info!("Connecting to Bitcoin Core at {}", self.rpc_url()); + log::debug!("Connecting to Bitcoin Core at {}", self.rpc_url()); if let Auth::CookieFile(cookie_file) = &auth { - log::info!( + log::debug!( "Using credentials from cookie file at `{}`", cookie_file.display() ); diff --git a/src/subcommand/server.rs b/src/subcommand/server.rs index a60cd814b1..b841823aab 100644 --- a/src/subcommand/server.rs +++ b/src/subcommand/server.rs @@ -1,5 +1,5 @@ use axum::routing::post; -use axum_tracing_opentelemetry::middleware::{OtelAxumLayer, OtelInResponseLayer}; +// use axum_tracing_opentelemetry::middleware::{OtelAxumLayer, OtelInResponseLayer}; use { self::{ @@ -44,6 +44,7 @@ use { mod accept_json; mod error; +mod middleware; mod rpc; #[derive(Clone)] @@ -169,9 +170,6 @@ impl Server { }); let router = Router::new() - .layer(OtelInResponseLayer) - //start OpenTelemetry trace on incoming request - .layer(OtelAxumLayer::default()) .route("/", get(Self::home)) .route("/block/:query", get(Self::block)) .route("/blockcount", get(Self::block_count)) @@ -206,6 +204,7 @@ impl Server { // API routes .route("/rpc/v1", post(rpc::handler)) + .layer(axum::middleware::from_fn(middleware::tracing_layer)) .layer(Extension(index)) .layer(Extension(page_config)) .layer(Extension(Arc::new(config))) diff --git a/src/subcommand/server/middleware.rs b/src/subcommand/server/middleware.rs new file mode 100644 index 0000000000..6771378e73 --- /dev/null +++ b/src/subcommand/server/middleware.rs @@ -0,0 +1,30 @@ +use axum::{extract::MatchedPath, http::Request, middleware::Next, response::Response}; +use opentelemetry::{ + global, + trace::{Span, Tracer}, + Key, +}; + +pub(crate) async fn tracing_layer(request: Request, next: Next) -> Response { + let tracer = global::tracer("ord-kafka"); + let cx = opentelemetry::Context::current(); + let route = request + .extensions() + .get::() + .unwrap() + .as_str() + .to_string(); + + let mut span = tracer.start_with_context(route.clone(), &cx); + span.set_attribute(Key::new("http.method").string(request.method().as_str().to_string())); + span.set_attribute(Key::new("http.target").string(request.uri().path().to_string())); + span.set_attribute(Key::new("http.route").string(route)); + + let response = next.run(request).await; + + // Set http response + span.set_attribute(Key::new("http.status_code").i64(i64::from(response.status().as_u16()))); + span.end(); + + response +} diff --git a/src/subcommand/server/rpc.rs b/src/subcommand/server/rpc.rs index 0d12efee68..c5ba5ca489 100644 --- a/src/subcommand/server/rpc.rs +++ b/src/subcommand/server/rpc.rs @@ -8,6 +8,8 @@ use axum_jrpc::{ error::{JsonRpcError, JsonRpcErrorReason}, JrpcResult, JsonRpcExtractor, JsonRpcResponse, }; +use opentelemetry::trace::Tracer; +use ord_kafka_macros::trace; use serde_json::Value; use std::cmp::{max, min}; @@ -41,6 +43,7 @@ async fn get_health(value: JsonRpcExtractor) -> JrpcResult { Ok(JsonRpcResponse::success(answer_id, "OK")) } +#[trace] async fn get_sat_ranges(value: JsonRpcExtractor, index: Arc) -> JrpcResult { #[derive(Deserialize)] struct Req { diff --git a/src/tracer.rs b/src/tracer.rs index 878fb412b8..1d5977a204 100644 --- a/src/tracer.rs +++ b/src/tracer.rs @@ -1,9 +1,10 @@ use log::{info, warn}; use opentelemetry::{ + global, sdk::trace::{RandomIdGenerator, Sampler}, trace::TraceError, }; -use opentelemetry_datadog::ApiVersion; +use opentelemetry_datadog::{ApiVersion, DatadogPropagator}; use std::env; fn get_trace_sample_rate() -> f64 { @@ -40,8 +41,8 @@ fn get_agent_url() -> String { format!("http://{host}:{port}") } -pub(crate) fn init() -> Result { - opentelemetry_datadog::new_pipeline() +pub(crate) fn init() -> Result<(), TraceError> { + let tracer = opentelemetry_datadog::new_pipeline() .with_service_name(env::var("DD_SERVICE").unwrap_or("ord-kafka".to_owned())) .with_api_version(ApiVersion::Version05) .with_agent_endpoint(get_agent_url()) @@ -59,7 +60,11 @@ pub(crate) fn init() -> Result { }) .with_id_generator(RandomIdGenerator::default()), ) - .install_simple() + .install_simple()?; + global::set_text_map_propagator(DatadogPropagator::default()); + global::set_tracer_provider(tracer.provider().unwrap()); + + Ok(()) } pub(crate) fn close() {