From 62ca8fa5cb0ac47bbfbad4b1939bcfe7d4cdfb6b Mon Sep 17 00:00:00 2001 From: Jonas Platte <158304798+svix-jplatte@users.noreply.github.com> Date: Fri, 17 May 2024 15:13:45 +0200 Subject: [PATCH] Add tracing spans for sending queue messages (#86) --- omniqueue/src/backends/azure_queue_storage.rs | 8 ++++++++ omniqueue/src/backends/gcp_pubsub.rs | 5 +++++ omniqueue/src/backends/rabbitmq.rs | 10 ++++++++++ omniqueue/src/backends/redis/mod.rs | 10 ++++++++++ omniqueue/src/backends/sqs.rs | 8 ++++++++ 5 files changed, 41 insertions(+) diff --git a/omniqueue/src/backends/azure_queue_storage.rs b/omniqueue/src/backends/azure_queue_storage.rs index 0257cf5..993df46 100644 --- a/omniqueue/src/backends/azure_queue_storage.rs +++ b/omniqueue/src/backends/azure_queue_storage.rs @@ -103,6 +103,14 @@ impl AqsProducer { self.send_raw_scheduled(payload, Duration::ZERO).await } + #[tracing::instrument( + name = "send", + skip_all, + fields( + payload_size = payload.len(), + delay = (delay > Duration::ZERO).then(|| tracing::field::debug(delay)) + ) + )] pub async fn send_raw_scheduled(&self, payload: &str, delay: Duration) -> Result<()> { self.client .put_message(payload) diff --git a/omniqueue/src/backends/gcp_pubsub.rs b/omniqueue/src/backends/gcp_pubsub.rs index 9f128b3..17b4988 100644 --- a/omniqueue/src/backends/gcp_pubsub.rs +++ b/omniqueue/src/backends/gcp_pubsub.rs @@ -126,6 +126,11 @@ impl GcpPubSubProducer { }) } + #[tracing::instrument( + name = "send", + skip_all, + fields(payload_size = payload.len()) + )] pub async fn send_raw(&self, payload: &[u8]) -> Result<()> { let msg = PubsubMessage { data: payload.to_vec(), diff --git a/omniqueue/src/backends/rabbitmq.rs b/omniqueue/src/backends/rabbitmq.rs index 77c0ce5..a550b28 100644 --- a/omniqueue/src/backends/rabbitmq.rs +++ b/omniqueue/src/backends/rabbitmq.rs @@ -164,6 +164,11 @@ impl RabbitMqProducer { Ok(()) } + #[tracing::instrument( + name = "send", + skip_all, + fields(payload_size = payload.len()) + )] pub async fn send_raw(&self, payload: &[u8]) -> Result<()> { self.send_raw_with_headers(payload, None).await } @@ -173,6 +178,11 @@ impl RabbitMqProducer { self.send_raw(&payload).await } + #[tracing::instrument( + name = "send", + skip_all, + fields(payload_size = payload.len(), delay) + )] pub async fn send_raw_scheduled(&self, payload: &[u8], delay: Duration) -> Result<()> { let mut headers = FieldTable::default(); diff --git a/omniqueue/src/backends/redis/mod.rs b/omniqueue/src/backends/redis/mod.rs index 83fa1b2..22511c0 100644 --- a/omniqueue/src/backends/redis/mod.rs +++ b/omniqueue/src/backends/redis/mod.rs @@ -533,6 +533,11 @@ pub struct RedisProducer { } impl RedisProducer { + #[tracing::instrument( + name = "send", + skip_all, + fields(payload_size = payload.len()) + )] pub async fn send_raw(&self, payload: &[u8]) -> Result<()> { if self.use_redis_streams { streams::send_raw(self, payload).await @@ -546,6 +551,11 @@ impl RedisProducer { self.send_raw(&payload).await } + #[tracing::instrument( + name = "send", + skip_all, + fields(payload_size = payload.len(), delay) + )] pub async fn send_raw_scheduled(&self, payload: &[u8], delay: Duration) -> Result<()> { let timestamp = unix_timestamp(SystemTime::now() + delay).map_err(QueueError::generic)?; diff --git a/omniqueue/src/backends/sqs.rs b/omniqueue/src/backends/sqs.rs index ced81ff..35b5112 100644 --- a/omniqueue/src/backends/sqs.rs +++ b/omniqueue/src/backends/sqs.rs @@ -240,6 +240,14 @@ impl SqsProducer { self.send_raw(&payload).await } + #[tracing::instrument( + name = "send", + skip_all, + fields( + payload_size = payload.len(), + delay = (delay > Duration::ZERO).then(|| tracing::field::debug(delay)) + ) + )] pub async fn send_raw_scheduled(&self, payload: &str, delay: Duration) -> Result<()> { if payload.len() > MAX_PAYLOAD_SIZE { return Err(QueueError::PayloadTooLarge {