Skip to content

Commit

Permalink
Merge pull request #2213 from carlokok/feat/multiple-triggertypes
Browse files Browse the repository at this point in the history
feat(up): Spawn multiple trigger commands
  • Loading branch information
itowlson authored Jan 23, 2024
2 parents c4fcb8f + b8490df commit 4ed8f53
Show file tree
Hide file tree
Showing 9 changed files with 154 additions and 60 deletions.
31 changes: 31 additions & 0 deletions crates/app/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#![deny(missing_docs)]

mod host_component;
use serde_json::Value;
pub use spin_locked_app::locked;
pub use spin_locked_app::values;
pub use spin_locked_app::{Error, MetadataKey, Result};
Expand Down Expand Up @@ -207,6 +208,36 @@ impl<'a, L> App<'a, L> {
.map(|locked| AppTrigger { app: self, locked })
}

/// Returns the trigger metadata for a specific trigger type.
pub fn get_trigger_metadata<'this, T: Deserialize<'this> + Default>(
&'this self,
trigger_type: &'a str,
) -> Result<Option<T>> {
let Some(value) = self.get_trigger_metadata_value(trigger_type) else {
return Ok(None);
};
let metadata = T::deserialize(value).map_err(|err| {
Error::MetadataError(format!(
"invalid metadata value for {trigger_type:?}: {err:?}"
))
})?;
Ok(Some(metadata))
}

fn get_trigger_metadata_value(&self, trigger_type: &str) -> Option<Value> {
if let Some(trigger_configs) = self.locked.metadata.get("triggers") {
// New-style: `{"triggers": {"<type>": {...}}}`
trigger_configs.get(trigger_type).cloned()
} else if self.locked.metadata["trigger"]["type"] == trigger_type {
// Old-style: `{"trigger": {"type": "<type>", ...}}`
let mut meta = self.locked.metadata["trigger"].clone();
meta.as_object_mut().unwrap().remove("type");
Some(meta)
} else {
None
}
}

/// Returns an iterator of [`AppTrigger`]s defined for this app with
/// the given `trigger_type`.
pub fn triggers_with_type(
Expand Down
2 changes: 0 additions & 2 deletions crates/http/src/trigger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ pub const METADATA_KEY: MetadataKey<Metadata> = MetadataKey::new("trigger");
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
pub struct Metadata {
// The type of trigger which should always been "http" in this case
pub r#type: String,
// The based url
#[serde(default = "default_base")]
pub base: String,
Expand Down
9 changes: 4 additions & 5 deletions crates/redis/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,11 @@ use anyhow::{anyhow, Context, Result};
use futures::{future::join_all, StreamExt};
use redis::{Client, ConnectionLike};
use serde::{de::IgnoredAny, Deserialize, Serialize};
use spin_app::MetadataKey;
use spin_core::async_trait;
use spin_trigger::{cli::NoArgs, TriggerAppEngine, TriggerExecutor};

use crate::spin::SpinRedisExecutor;

const TRIGGER_METADATA_KEY: MetadataKey<TriggerMetadata> = MetadataKey::new("trigger");

pub(crate) type RuntimeData = ();
pub(crate) type Store = spin_core::Store<RuntimeData>;

Expand Down Expand Up @@ -44,7 +41,6 @@ pub struct RedisTriggerConfig {
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
struct TriggerMetadata {
r#type: String,
address: String,
}

Expand All @@ -56,7 +52,10 @@ impl TriggerExecutor for RedisTrigger {
type RunConfig = NoArgs;

async fn new(engine: TriggerAppEngine<Self>) -> Result<Self> {
let address = engine.app().require_metadata(TRIGGER_METADATA_KEY)?.address;
let address = engine
.trigger_metadata::<TriggerMetadata>()?
.unwrap_or_default()
.address;

let mut channel_components: HashMap<String, Vec<String>> = HashMap::new();

Expand Down
5 changes: 3 additions & 2 deletions crates/trigger-http/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,10 @@ impl TriggerExecutor for HttpTrigger {

async fn new(engine: TriggerAppEngine<Self>) -> Result<Self> {
let mut base = engine
.app()
.require_metadata(spin_http::trigger::METADATA_KEY)?
.trigger_metadata::<spin_http::trigger::Metadata>()?
.unwrap_or_default()
.base;

if !base.starts_with('/') {
base = format!("/{base}");
}
Expand Down
4 changes: 4 additions & 0 deletions crates/trigger/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,10 @@ impl<Executor: TriggerExecutor> TriggerAppEngine<Executor> {
self.app.borrowed()
}

pub fn trigger_metadata<T: DeserializeOwned + Default>(&self) -> spin_app::Result<Option<T>> {
self.app().get_trigger_metadata(Executor::TRIGGER_TYPE)
}

/// Returns AppTriggers and typed TriggerConfigs for this executor type.
pub fn trigger_configs(&self) -> impl Iterator<Item = (AppTrigger, &Executor::TriggerConfig)> {
self.app()
Expand Down
13 changes: 10 additions & 3 deletions examples/spin-timer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,16 @@ pub struct TimerTrigger {
component_timings: HashMap<String, u64>,
}

// Application settings (raw serialization format)
// Picks out the timer entry from the application-level trigger settings
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
struct TriggerMetadataParent {
timer: Option<TriggerMetadata>,
}

// Application-level settings (raw serialization format)
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
struct TriggerMetadata {
r#type: String,
speedup: Option<u64>,
}

Expand All @@ -45,7 +50,7 @@ pub struct TimerTriggerConfig {
interval_secs: u64,
}

const TRIGGER_METADATA_KEY: MetadataKey<TriggerMetadata> = MetadataKey::new("trigger");
const TRIGGER_METADATA_KEY: MetadataKey<TriggerMetadataParent> = MetadataKey::new("triggers");

#[async_trait]
impl TriggerExecutor for TimerTrigger {
Expand All @@ -61,6 +66,8 @@ impl TriggerExecutor for TimerTrigger {
let speedup = engine
.app()
.require_metadata(TRIGGER_METADATA_KEY)?
.timer
.unwrap_or_default()
.speedup
.unwrap_or(1);

Expand Down
2 changes: 1 addition & 1 deletion examples/spin-timer/trigger-timer.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"description": "Run Spin components at timed intervals",
"homepage": "https://github.com/fermyon/spin/tree/main/examples/spin-timer",
"version": "0.1.0",
"spinCompatibility": ">=2.0",
"spinCompatibility": ">=2.2",
"license": "Apache-2.0",
"packages": [
{
Expand Down
143 changes: 99 additions & 44 deletions src/commands/up.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use std::{

use anyhow::{anyhow, bail, Context, Result};
use clap::{CommandFactory, Parser};
use itertools::Itertools;
use reqwest::Url;
use spin_app::locked::LockedApp;
use spin_common::ui::quoted_path;
Expand All @@ -16,6 +17,8 @@ use spin_oci::OciLoader;
use spin_trigger::cli::{SPIN_LOCAL_APP_DIR, SPIN_LOCKED_URL, SPIN_WORKING_DIR};
use tempfile::TempDir;

use futures::StreamExt;

use crate::opts::*;

use self::app_source::{AppSource, ResolvedAppSource};
Expand Down Expand Up @@ -128,9 +131,11 @@ impl UpCommand {

if app_source == AppSource::None {
if self.help {
return self
.run_trigger(trigger_command(HELP_ARGS_ONLY_TRIGGER_TYPE), None)
.await;
let mut child = self
.start_trigger(trigger_command(HELP_ARGS_ONLY_TRIGGER_TYPE), None)
.await?;
let _ = child.wait().await?;
return Ok(());
} else {
bail!("Default file '{DEFAULT_MANIFEST_FILE}' not found. Run `spin up --from <APPLICATION>`, or `spin up --help` for usage.");
}
Expand All @@ -150,28 +155,51 @@ impl UpCommand {

let resolved_app_source = self.resolve_app_source(&app_source, &working_dir).await?;

let trigger_cmd = trigger_command_for_resolved_app_source(&resolved_app_source)
let trigger_cmds = trigger_command_for_resolved_app_source(&resolved_app_source)
.with_context(|| format!("Couldn't find trigger executor for {app_source}"))?;

if self.help {
return self.run_trigger(trigger_cmd, None).await;
for cmd in trigger_cmds {
let mut help_process = self.start_trigger(cmd.clone(), None).await?;
_ = help_process.wait().await;
}
return Ok(());
}

let mut locked_app = self
.load_resolved_app_source(resolved_app_source, &working_dir)
.await?;

self.update_locked_app(&mut locked_app);
let locked_url = self.write_locked_app(&locked_app, &working_dir).await?;

let local_app_dir = app_source.local_app_dir().map(Into::into);

let run_opts = RunTriggerOpts {
locked_app,
locked_url,
working_dir,
local_app_dir,
};

self.run_trigger(trigger_cmd, Some(run_opts)).await
let mut trigger_processes = self.start_trigger_processes(trigger_cmds, run_opts).await?;

set_kill_on_ctrl_c(&trigger_processes)?;

let mut trigger_tasks = trigger_processes
.iter_mut()
.map(|ch| ch.wait())
.collect::<futures::stream::FuturesUnordered<_>>();

let first_to_finish = trigger_tasks.next().await;

if let Some(process_result) = first_to_finish {
let status = process_result?;
if !status.success() {
return Err(crate::subprocess::ExitStatusError::new(status).into());
}
}

Ok(())
}

fn get_canonical_working_dir(&self) -> Result<WorkingDirectory, anyhow::Error> {
Expand All @@ -190,57 +218,57 @@ impl UpCommand {
Ok(working_dir_holder)
}

async fn run_trigger(
async fn start_trigger_processes(
self,
trigger_cmds: Vec<Vec<String>>,
run_opts: RunTriggerOpts,
) -> anyhow::Result<Vec<tokio::process::Child>> {
let mut trigger_processes = Vec::with_capacity(trigger_cmds.len());

for cmd in trigger_cmds {
let child = self
.start_trigger(cmd.clone(), Some(run_opts.clone()))
.await
.context("Failed to start trigger process")?;
trigger_processes.push(child);
}

Ok(trigger_processes)
}

async fn start_trigger(
&self,
trigger_cmd: Vec<String>,
opts: Option<RunTriggerOpts>,
) -> Result<(), anyhow::Error> {
) -> Result<tokio::process::Child, anyhow::Error> {
// The docs for `current_exe` warn that this may be insecure because it could be executed
// via hard-link. I think it should be fine as long as we aren't `setuid`ing this binary.
let mut cmd = std::process::Command::new(std::env::current_exe().unwrap());
let mut cmd = tokio::process::Command::new(std::env::current_exe().unwrap());
cmd.args(&trigger_cmd);

if let Some(RunTriggerOpts {
locked_app,
locked_url,
working_dir,
local_app_dir,
}) = opts
{
let locked_url = self.write_locked_app(&locked_app, &working_dir).await?;

cmd.env(SPIN_LOCKED_URL, locked_url)
.env(SPIN_WORKING_DIR, &working_dir)
.args(&self.trigger_args);

if let Some(local_app_dir) = local_app_dir {
cmd.env(SPIN_LOCAL_APP_DIR, local_app_dir);
}

cmd.kill_on_drop(true);
} else {
cmd.arg("--help-args-only");
}

tracing::trace!("Running trigger executor: {:?}", cmd);

let mut child = cmd.spawn().context("Failed to execute trigger")?;

// Terminate trigger executor if `spin up` itself receives a termination signal
#[cfg(not(windows))]
{
// https://github.com/nix-rust/nix/issues/656
let pid = nix::unistd::Pid::from_raw(child.id() as i32);
ctrlc::set_handler(move || {
if let Err(err) = nix::sys::signal::kill(pid, nix::sys::signal::SIGTERM) {
tracing::warn!("Failed to kill trigger handler process: {:?}", err)
}
})?;
}

let status = child.wait()?;
if status.success() {
Ok(())
} else {
Err(crate::subprocess::ExitStatusError::new(status).into())
}
let child = cmd.spawn().context("Failed to execute trigger")?;
Ok(child)
}

fn app_source(&self) -> AppSource {
Expand Down Expand Up @@ -358,8 +386,31 @@ impl UpCommand {
}
}

#[cfg(windows)]
fn set_kill_on_ctrl_c(trigger_processes: &Vec<tokio::process::Child>) -> Result<(), anyhow::Error> {
Ok(())
}

#[cfg(not(windows))]
fn set_kill_on_ctrl_c(trigger_processes: &[tokio::process::Child]) -> Result<(), anyhow::Error> {
// https://github.com/nix-rust/nix/issues/656
let pids = trigger_processes
.iter()
.flat_map(|child| child.id().map(|id| nix::unistd::Pid::from_raw(id as i32)))
.collect_vec();
ctrlc::set_handler(move || {
for pid in &pids {
if let Err(err) = nix::sys::signal::kill(*pid, nix::sys::signal::SIGTERM) {
tracing::warn!("Failed to kill trigger handler process: {:?}", err)
}
}
})?;
Ok(())
}

#[derive(Clone)]
struct RunTriggerOpts {
locked_app: LockedApp,
locked_url: String,
working_dir: PathBuf,
local_app_dir: Option<PathBuf>,
}
Expand Down Expand Up @@ -424,16 +475,20 @@ fn trigger_command(trigger_type: &str) -> Vec<String> {
vec!["trigger".to_owned(), trigger_type.to_owned()]
}

fn trigger_command_for_resolved_app_source(resolved: &ResolvedAppSource) -> Result<Vec<String>> {
let trigger_type = resolved.trigger_type()?;

match trigger_type {
"http" | "redis" => Ok(trigger_command(trigger_type)),
_ => {
let cmd = resolve_trigger_plugin(trigger_type)?;
Ok(vec![cmd])
}
}
fn trigger_command_for_resolved_app_source(
resolved: &ResolvedAppSource,
) -> Result<Vec<Vec<String>>> {
let trigger_type = resolved.trigger_types()?;
trigger_type
.iter()
.map(|&t| match t {
"http" | "redis" => Ok(trigger_command(t)),
_ => {
let cmd = resolve_trigger_plugin(t)?;
Ok(vec![cmd])
}
})
.collect()
}

#[cfg(test)]
Expand Down
Loading

0 comments on commit 4ed8f53

Please sign in to comment.