Skip to content

Commit

Permalink
refactor: renamed transaction to executioncontext in interpreter
Browse files Browse the repository at this point in the history
  • Loading branch information
jsoverson committed Aug 8, 2023
1 parent 8a435dd commit 8af480a
Show file tree
Hide file tree
Showing 20 changed files with 287 additions and 272 deletions.
2 changes: 1 addition & 1 deletion crates/bins/wick/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ async fn async_start() -> Result<(GlobalOptions, StructuredOutput), (GlobalOptio
logger_opts.global = true;

// Initialize the global logger
let logger = wick_logger::init(&logger_opts);
let mut logger = wick_logger::init(&logger_opts);

let res = async_main(cli, settings).await;

Expand Down
2 changes: 1 addition & 1 deletion crates/wick/flow-graph-interpreter/src/interpreter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ mod test {
{
}

#[test_logger::test]
#[test]
fn test_sync_send() -> Result<()> {
sync_send::<Interpreter>();
Ok(())
Expand Down
40 changes: 20 additions & 20 deletions crates/wick/flow-graph-interpreter/src/interpreter/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use wick_packet::{Invocation, PacketPayload};

pub(crate) use self::error::Error;
use super::executor::error::ExecutionError;
use crate::interpreter::executor::transaction::Transaction;
use crate::interpreter::executor::context::ExecutionContext;

static CHANNEL_SIZE: usize = 50;

Expand All @@ -15,19 +15,19 @@ const CHANNEL_UUID: Uuid = Uuid::from_bytes([

#[derive(Debug)]
pub struct Event {
pub(crate) tx_id: Uuid,
pub(crate) ctx_id: Uuid,
pub(crate) kind: EventKind,
pub(crate) span: Option<Span>,
}

impl Event {
pub(crate) fn new(tx_id: Uuid, kind: EventKind, span: Option<Span>) -> Self {
Self { tx_id, kind, span }
pub(crate) fn new(ctx_id: Uuid, kind: EventKind, span: Option<Span>) -> Self {
Self { ctx_id, kind, span }
}

#[must_use]
pub fn tx_id(&self) -> &Uuid {
&self.tx_id
pub fn ctx_id(&self) -> &Uuid {
&self.ctx_id
}

#[must_use]
Expand All @@ -44,8 +44,8 @@ impl Event {
#[must_use]
pub enum EventKind {
Ping(usize),
TransactionStart(Box<Transaction>),
TransactionDone,
ExecutionStart(Box<ExecutionContext>),
ExecutionDone,
PortData(PortReference),
Invocation(NodeIndex, Box<Invocation>),
CallComplete(CallComplete),
Expand All @@ -56,8 +56,8 @@ impl EventKind {
pub(crate) fn name(&self) -> &str {
match self {
EventKind::Ping(_) => "ping",
EventKind::TransactionStart(_) => "tx_start",
EventKind::TransactionDone => "tx_done",
EventKind::ExecutionStart(_) => "exec_start",
EventKind::ExecutionDone => "exec_done",
EventKind::PortData(_) => "port_data",
EventKind::Invocation(_, _) => "invocation",
EventKind::CallComplete(_) => "call_complete",
Expand Down Expand Up @@ -153,33 +153,33 @@ impl InterpreterDispatchChannel {
});
}

pub(crate) fn dispatch_done(&self, tx_id: Uuid) {
self.dispatch(Event::new(tx_id, EventKind::TransactionDone, self.span.clone()));
pub(crate) fn dispatch_done(&self, ctx_id: Uuid) {
self.dispatch(Event::new(ctx_id, EventKind::ExecutionDone, self.span.clone()));
}

pub(crate) fn dispatch_data(&self, tx_id: Uuid, port: PortReference) {
self.dispatch(Event::new(tx_id, EventKind::PortData(port), self.span.clone()));
pub(crate) fn dispatch_data(&self, ctx_id: Uuid, port: PortReference) {
self.dispatch(Event::new(ctx_id, EventKind::PortData(port), self.span.clone()));
}

pub(crate) fn dispatch_close(&self, error: Option<ExecutionError>) {
self.dispatch(Event::new(CHANNEL_UUID, EventKind::Close(error), self.span.clone()));
}

pub(crate) fn dispatch_start(&self, tx: Box<Transaction>) {
self.dispatch(Event::new(tx.id(), EventKind::TransactionStart(tx), self.span.clone()));
pub(crate) fn dispatch_start(&self, ctx: Box<ExecutionContext>) {
self.dispatch(Event::new(ctx.id(), EventKind::ExecutionStart(ctx), self.span.clone()));
}

pub(crate) fn dispatch_call_complete(&self, tx_id: Uuid, op_index: usize) {
pub(crate) fn dispatch_call_complete(&self, ctx_id: Uuid, op_index: usize) {
self.dispatch(Event::new(
tx_id,
ctx_id,
EventKind::CallComplete(CallComplete::new(op_index)),
self.span.clone(),
));
}

pub(crate) fn dispatch_op_err(&self, tx_id: Uuid, op_index: usize, signal: PacketPayload) {
pub(crate) fn dispatch_op_err(&self, ctx_id: Uuid, op_index: usize, signal: PacketPayload) {
self.dispatch(Event::new(
tx_id,
ctx_id,
EventKind::CallComplete(CallComplete {
index: op_index,
err: Some(signal),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -521,7 +521,7 @@ impl Operation for Op {
(CaseId::Match(CaseValue(&case.case)), &case.case_do, case.with.clone())
},
);
let span = trace_span!(parent:&invocation.span,"switch:case:handler",%condition);
let span = trace_span!(parent:&invocation.span,"switch:case:handler",otel.name=format!("case:{}",condition),%condition);
router.push(Condition::new(
condition,
condition_level,
Expand Down
25 changes: 10 additions & 15 deletions crates/wick/flow-graph-interpreter/src/interpreter/event_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ async fn event_loop(
let task = tokio::time::timeout(EventLoop::WAKE_TIMEOUT, channel.accept());
match task.await {
Ok(Some(event)) => {
let tx_id = event.tx_id;
let ctx_id = event.ctx_id;

if let Some(observer) = &observer {
observer.on_event(num, &event);
Expand All @@ -123,22 +123,17 @@ async fn event_loop(
let name = event.name().to_owned();
let tx_span = event.span.unwrap_or_else(Span::current);

tx_span.in_scope(|| debug!(event = ?event.kind, tx_id = ?tx_id));
tx_span.in_scope(|| debug!(event = ?event.kind, ctx_id = ?ctx_id));

let result = match event.kind {
EventKind::Invocation(_index, _invocation) => {
error!("invocation not supported");
panic!("invocation not supported")
}
EventKind::CallComplete(data) => state.handle_call_complete(tx_id, data).instrument(tx_span).await,
EventKind::PortData(data) => state.handle_port_data(tx_id, data).instrument(tx_span).await,
EventKind::TransactionDone => state.handle_transaction_done(tx_id).instrument(tx_span).await,
EventKind::TransactionStart(transaction) => {
state
.handle_transaction_start(*transaction, &options)
.instrument(tx_span)
.await
}
EventKind::CallComplete(data) => state.handle_call_complete(ctx_id, data).instrument(tx_span).await,
EventKind::PortData(data) => state.handle_port_data(ctx_id, data).instrument(tx_span).await,
EventKind::ExecutionDone => state.handle_exec_done(ctx_id).instrument(tx_span).await,
EventKind::ExecutionStart(context) => state.handle_exec_start(*context, &options).instrument(tx_span).await,
EventKind::Ping(ping) => {
trace!(ping);
Ok(())
Expand All @@ -156,9 +151,9 @@ async fn event_loop(
};

if let Err(e) = result {
warn!(event = %name, tx_id = ?tx_id, response_error = %e, "iteration:end");
warn!(event = %name, ctx_id = ?ctx_id, response_error = %e, "iteration:end");
} else {
trace!(event = %name, tx_id = ?tx_id, "iteration:end");
trace!(event = %name, ctx_id = ?ctx_id, "iteration:end");
}

if let Some(observer) = &observer {
Expand All @@ -171,7 +166,7 @@ async fn event_loop(
}
Err(_) => {
if let Err(error) = state.run_cleanup() {
error!(%error,"Error checking hung transactions");
error!(%error,"Error checking hung invocations");
channel.dispatcher(None).dispatch_close(Some(error));
};
}
Expand Down Expand Up @@ -204,7 +199,7 @@ mod test {
{
}

#[test_logger::test]
#[test]
fn test_sync_send() -> Result<()> {
sync_send::<EventLoop>();
Ok(())
Expand Down
Loading

0 comments on commit 8af480a

Please sign in to comment.