Skip to content

Commit

Permalink
feat: reorganized tracing span relationships
Browse files Browse the repository at this point in the history
  • Loading branch information
jsoverson committed Aug 3, 2023
1 parent 7968fb0 commit 073f6dc
Show file tree
Hide file tree
Showing 29 changed files with 568 additions and 506 deletions.
428 changes: 203 additions & 225 deletions Cargo.lock

Large diffs are not rendered by default.

11 changes: 6 additions & 5 deletions crates/wick/flow-graph-interpreter/src/interpreter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ impl Interpreter {
callback: Arc<RuntimeCallback>,
parent_span: &Span,
) -> Result<Self, Error> {
let span = trace_span!("interpreter");
span.follows_from(parent_span);
let span = trace_span!(parent: parent_span, "interpreter");

let _guard = span.enter();
let mut handlers = components.unwrap_or_default();
debug!(handlers = ?handlers.keys(), "initializing interpreter");
Expand Down Expand Up @@ -105,7 +105,7 @@ impl Interpreter {
program.validate()?;

let channel = InterpreterChannel::new();
let dispatcher = channel.dispatcher();
let dispatcher = channel.dispatcher(Some(span.clone()));

// Make the self:: component
let components = Arc::new(handlers);
Expand All @@ -117,7 +117,7 @@ impl Interpreter {

debug!(?signature, "signature");

let event_loop = EventLoop::new(channel);
let event_loop = EventLoop::new(channel, &span);
let mut handled_opts = program.operations().iter().map(|s| s.name()).collect::<Vec<_>>();
handled_opts.extend(exposed_ops.keys().map(|s: &String| s.as_str()));
debug!(
Expand Down Expand Up @@ -234,7 +234,8 @@ impl Component for Interpreter {
}
hosted
};
let span = trace_span!("invoke", tx_id = %invocation.tx_id);
let span = invocation.span.clone();

span.in_scope(|| trace!(?invocation, "invoking"));
let from_exposed = self.exposed_ops.get(invocation.target.operation_id());

Expand Down
47 changes: 31 additions & 16 deletions crates/wick/flow-graph-interpreter/src/interpreter/channel.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use flow_graph::{NodeIndex, PortReference};
use tracing::Span;
use uuid::Uuid;
use wick_packet::{Invocation, PacketPayload};

Expand All @@ -16,11 +17,12 @@ const CHANNEL_UUID: Uuid = Uuid::from_bytes([
pub struct Event {
pub(crate) tx_id: Uuid,
pub(crate) kind: EventKind,
pub(crate) span: Option<Span>,
}

impl Event {
pub(crate) fn new(tx_id: Uuid, kind: EventKind) -> Self {
Self { tx_id, kind }
pub(crate) fn new(tx_id: Uuid, kind: EventKind, span: Option<Span>) -> Self {
Self { tx_id, kind, span }
}

#[must_use]
Expand Down Expand Up @@ -108,8 +110,8 @@ impl InterpreterChannel {
Self { sender, receiver }
}

pub(crate) fn dispatcher(&self) -> InterpreterDispatchChannel {
InterpreterDispatchChannel::new(self.sender.clone())
pub(crate) fn dispatcher(&self, span: Option<Span>) -> InterpreterDispatchChannel {
InterpreterDispatchChannel::new(self.sender.clone(), span)
}

pub(crate) async fn accept(&mut self) -> Option<Event> {
Expand All @@ -119,6 +121,7 @@ impl InterpreterChannel {

#[derive(Clone)]
pub(crate) struct InterpreterDispatchChannel {
span: Option<Span>,
sender: tokio::sync::mpsc::Sender<Event>,
}

Expand All @@ -129,8 +132,15 @@ impl std::fmt::Debug for InterpreterDispatchChannel {
}

impl InterpreterDispatchChannel {
fn new(sender: tokio::sync::mpsc::Sender<Event>) -> Self {
Self { sender }
fn new(sender: tokio::sync::mpsc::Sender<Event>, span: Option<Span>) -> Self {
Self { sender, span }
}

pub(crate) fn with_span(self, span: Span) -> Self {
Self {
sender: self.sender,
span: Some(span),
}
}

pub(crate) fn dispatch(&self, event: Event) {
Expand All @@ -144,23 +154,27 @@ impl InterpreterDispatchChannel {
}

pub(crate) fn dispatch_done(&self, tx_id: Uuid) {
self.dispatch(Event::new(tx_id, EventKind::TransactionDone));
self.dispatch(Event::new(tx_id, EventKind::TransactionDone, self.span.clone()));
}

pub(crate) fn dispatch_data(&self, tx_id: Uuid, port: PortReference) {
self.dispatch(Event::new(tx_id, EventKind::PortData(port)));
self.dispatch(Event::new(tx_id, EventKind::PortData(port), self.span.clone()));
}

pub(crate) fn dispatch_close(&self, error: Option<ExecutionError>) {
self.dispatch(Event::new(CHANNEL_UUID, EventKind::Close(error)));
self.dispatch(Event::new(CHANNEL_UUID, EventKind::Close(error), self.span.clone()));
}

pub(crate) fn dispatch_start(&self, tx: Box<Transaction>) {
self.dispatch(Event::new(tx.id(), EventKind::TransactionStart(tx)));
self.dispatch(Event::new(tx.id(), EventKind::TransactionStart(tx), self.span.clone()));
}

pub(crate) fn dispatch_call_complete(&self, tx_id: Uuid, op_index: usize) {
self.dispatch(Event::new(tx_id, EventKind::CallComplete(CallComplete::new(op_index))));
self.dispatch(Event::new(
tx_id,
EventKind::CallComplete(CallComplete::new(op_index)),
self.span.clone(),
));
}

pub(crate) fn dispatch_op_err(&self, tx_id: Uuid, op_index: usize, signal: PacketPayload) {
Expand All @@ -170,6 +184,7 @@ impl InterpreterDispatchChannel {
index: op_index,
err: Some(signal),
}),
self.span.clone(),
));
}
}
Expand Down Expand Up @@ -206,9 +221,9 @@ mod test {
async fn test_channel() -> anyhow::Result<()> {
let mut channel = InterpreterChannel::new();

let child1 = channel.dispatcher();
let child2 = channel.dispatcher();
let child3 = channel.dispatcher();
let child1 = channel.dispatcher(None);
let child2 = channel.dispatcher(None);
let child3 = channel.dispatcher(None);

let join_handle = tokio::task::spawn(async move {
println!("Handling requests");
Expand All @@ -232,14 +247,14 @@ mod test {
tokio::spawn(async move {
let num = 1;
println!("Child 1 PING({})", num);
child1.dispatch(Event::new(Uuid::new_v4(), EventKind::Ping(num)));
child1.dispatch(Event::new(Uuid::new_v4(), EventKind::Ping(num), None));
})
.await?;

tokio::spawn(async move {
let num = 2;
println!("Child 2 PING({})", num);
child2.dispatch(Event::new(Uuid::new_v4(), EventKind::Ping(num)));
child2.dispatch(Event::new(Uuid::new_v4(), EventKind::Ping(num), None));
})
.await?;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -521,7 +521,7 @@ impl Operation for Op {
(CaseId::Match(CaseValue(&case.case)), &case.case_do, case.with.clone())
},
);
let span = invocation.following_span(trace_span!("switch:case:handler",%condition));
let span = trace_span!(parent:&invocation.span,"switch:case:handler",%condition);
router.push(Condition::new(
condition,
condition_level,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::sync::Arc;

use flow_component::{Component, ComponentError, RuntimeCallback};
use tracing_futures::Instrument;
use wick_interface_types::ComponentSignature;
use wick_packet::{Invocation, PacketStream, RuntimeConfig};

Expand Down Expand Up @@ -101,9 +100,8 @@ impl Component for SelfComponent {
.ok_or_else(|| Error::SchematicNotFound(operation.clone()));

Box::pin(async move {
let span = trace_span!("ns_self", name = %operation);
match fut {
Ok(fut) => fut.instrument(span).await.map_err(ComponentError::new),
Ok(fut) => fut.await.map_err(ComponentError::new),
Err(e) => Err(ComponentError::new(e)),
}
})
Expand Down
35 changes: 19 additions & 16 deletions crates/wick/flow-graph-interpreter/src/interpreter/event_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,23 +26,24 @@ impl EventLoop {
pub(crate) const STALLED_TX_TIMEOUT: Duration = Duration::from_secs(60 * 5);
pub(crate) const SLOW_TX_TIMEOUT: Duration = Duration::from_secs(15);

pub(super) fn new(channel: InterpreterChannel) -> Self {
let dispatcher = channel.dispatcher();
let span = debug_span!("event_loop");
span.follows_from(Span::current());
pub(super) fn new(channel: InterpreterChannel, span: &Span) -> Self {
let event_span = debug_span!("event_loop");
event_span.follows_from(span);
let dispatcher = channel.dispatcher(Some(event_span.clone()));

Self {
channel: Some(channel),
dispatcher,
task: Mutex::new(None),
span,
span: event_span,
}
}

pub(super) async fn start(&mut self, options: InterpreterOptions, observer: Option<Box<dyn Observer + Send + Sync>>) {
let channel = self.channel.take().unwrap();

let span = self.span.clone();
let handle = tokio::spawn(async move { event_loop(channel, options, observer).instrument(span).await });
let handle = tokio::spawn(async move { event_loop(channel, options, observer, span).await });
let mut lock = self.task.lock();
lock.replace(handle);
}
Expand Down Expand Up @@ -102,9 +103,10 @@ async fn event_loop(
mut channel: InterpreterChannel,
options: InterpreterOptions,
observer: Option<Box<dyn Observer + Send + Sync>>,
_span: Span,
) -> Result<(), ExecutionError> {
debug!(?options, "started");
let mut state = State::new(channel.dispatcher());
let mut state = State::new(channel.dispatcher(None));

let mut num: usize = 0;

Expand All @@ -118,22 +120,23 @@ async fn event_loop(
observer.on_event(num, &event);
}

let evt_span = trace_span!(parent:&Span::current(),"event", otel.name = event.name(), i = num, %tx_id);
evt_span.in_scope(|| debug!(event = ?event, tx_id = ?tx_id));
let name = event.name().to_owned();
let tx_span = event.span.unwrap_or_else(Span::current);

tx_span.in_scope(|| debug!(event = ?event.kind, tx_id = ?tx_id));

let result = match event.kind {
EventKind::Invocation(_index, _invocation) => {
error!("invocation not supported");
panic!("invocation not supported")
}
EventKind::CallComplete(data) => state.handle_call_complete(tx_id, data).instrument(evt_span).await,
EventKind::PortData(data) => state.handle_port_data(tx_id, data).instrument(evt_span).await,
EventKind::TransactionDone => state.handle_transaction_done(tx_id).instrument(evt_span).await,
EventKind::CallComplete(data) => state.handle_call_complete(tx_id, data).instrument(tx_span).await,
EventKind::PortData(data) => state.handle_port_data(tx_id, data).instrument(tx_span).await,
EventKind::TransactionDone => state.handle_transaction_done(tx_id).instrument(tx_span).await,
EventKind::TransactionStart(transaction) => {
state
.handle_transaction_start(*transaction, &options)
.instrument(evt_span)
.instrument(tx_span)
.await
}
EventKind::Ping(ping) => {
Expand All @@ -142,11 +145,11 @@ async fn event_loop(
}
EventKind::Close(error) => match error {
Some(error) => {
evt_span.in_scope(|| error!(%error,"stopped with error"));
tx_span.in_scope(|| error!(%error,"stopped with error"));
break Err(error);
}
None => {
evt_span.in_scope(|| debug!("stopping"));
tx_span.in_scope(|| debug!("stopping"));
break Ok(());
}
},
Expand All @@ -169,7 +172,7 @@ async fn event_loop(
Err(_) => {
if let Err(error) = state.run_cleanup() {
error!(%error,"Error checking hung transactions");
channel.dispatcher().dispatch_close(Some(error));
channel.dispatcher(None).dispatch_close(Some(error));
};
}
}
Expand Down
Loading

0 comments on commit 073f6dc

Please sign in to comment.